Source code for cloudos_cli.datasets.datasets

"""
This is the main class for file explorer (datasets).
"""

from dataclasses import dataclass
from typing import Union
from cloudos_cli.clos import Cloudos
from cloudos_cli.utils.errors import BadRequestException
from cloudos_cli.utils.requests import retry_requests_get, retry_requests_put, retry_requests_post, retry_requests_delete
import json


@dataclass
class Datasets(Cloudos):
    """Class for file explorer.

    Parameters
    ----------
    cloudos_url : string
        The CloudOS service url.
    apikey : string
        Your CloudOS API key.
    workspace_id : string
        The specific Cloudos workspace id.
    project_name : string
        The name of a CloudOS project.
    verify: [bool|string]
        Whether to use SSL verification or not. Alternatively, if
        a string is passed, it will be interpreted as the path to
        the SSL certificate file.
    project_id : string
        The CloudOS project id for a given project name.
    """
    workspace_id: str
    project_name: str
    verify: Union[bool, str] = True
    project_id: str = None

    @property
    def project_id(self) -> str:
        return self._project_id

    @project_id.setter
    def project_id(self, v) -> None:
        if isinstance(v, property):
            # Value was not defined by the user: fetch it from the API.
            self._project_id = self.fetch_project_id(self.workspace_id,
                                                     self.project_name,
                                                     verify=self.verify)
        else:
            # Let the user define the value.
            self._project_id = v
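
    # Usage sketch (not part of the original module; URL, key and names are
    # hypothetical, and any extra fields required by the Cloudos base class
    # are omitted). Leaving project_id unset triggers the property setter
    # above, which resolves the id from project_name:
    #
    #   ds = Datasets(cloudos_url="https://cloudos.example.com",
    #                 apikey="MY_API_KEY",
    #                 workspace_id="ws-1234",
    #                 project_name="my-project")
    #   print(ds.project_id)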

    def fetch_project_id(self, workspace_id, project_name, verify=True):
        """Fetch the project id for a given name.

        Parameters
        ----------
        workspace_id : string
            The specific Cloudos workspace id.
        project_name : string
            The name of a CloudOS project element.
        verify: [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        project_id : string
            The CloudOS project id for a given project name.
        """
        return self.get_project_id_from_name(workspace_id, project_name, verify=verify)

    def list_project_content(self):
        """Fetch the information of the directories present in the project.

        Uses
        ----------
        apikey : string
            Your CloudOS API key.
        cloudos_url : string
            The CloudOS service url.
        workspace_id : string
            The specific Cloudos workspace id.
        project_id : string
            The specific project id.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_get(
            "{}/api/v2/datasets?projectId={}&teamId={}".format(
                self.cloudos_url, self.project_id, self.workspace_id),
            headers=headers, verify=self.verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        raw = r.json()
        datasets = raw.get("datasets", [])
        # Normalize the response: mark every dataset as a folder
        for item in datasets:
            item["folderType"] = True
        response = {
            "folders": datasets,
            "files": []
        }
        return response
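
    # Sketch of the normalized result (hypothetical values; ds as in the
    # class-level sketch): every dataset is surfaced as a folder so callers
    # can treat the top level uniformly.
    #
    #   top = ds.list_project_content()
    #   # -> {"folders": [{"name": "Data", "_id": "...", "folderType": True}],
    #   #     "files": []}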

    def list_datasets_content(self, folder_name):
        """List the contents of a top-level dataset folder.

        Uses
        ----------
        apikey : string
            Your CloudOS API key.
        cloudos_url : string
            The CloudOS service url.
        workspace_id : string
            The specific Cloudos workspace id.
        project_id : string
            The specific project id.
        folder_name : string
            The requested folder name.
        """
        # Prepare api request for CloudOS to fetch dataset info
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        pro_fol = self.list_project_content()
        folder_id = None
        if folder_name == 'AnalysesResults':
            folder_name = 'Analyses Results'
        for folder in pro_fol.get("folders", []):
            if folder['name'] == folder_name:
                folder_id = folder['_id']
                break
        if not folder_id:
            raise ValueError(f"Folder '{folder_name}' not found in project '{self.project_name}'.")
        r = retry_requests_get(
            "{}/api/v1/datasets/{}/items?teamId={}".format(
                self.cloudos_url, folder_id, self.workspace_id),
            headers=headers, verify=self.verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r.json()
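
    # Sketch: list a dataset by name; 'AnalysesResults' is accepted as an
    # alias for the 'Analyses Results' dataset (ds as in the sketch above).
    #
    #   items = ds.list_datasets_content("AnalysesResults")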

    def list_s3_folder_content(self, s3_bucket_name, s3_relative_path):
        """List the contents of an S3 folder path.

        Uses
        ----------
        apikey : string
            Your CloudOS API key.
        cloudos_url : string
            The CloudOS service url.
        workspace_id : string
            The specific Cloudos workspace id.
        project_id : string
            The specific project id.
        s3_bucket_name : string
            The s3 bucket name.
        s3_relative_path : string
            The relative path in the s3 bucket.
        """
        # Prepare api request for CloudOS to fetch dataset info
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_get(
            "{}/api/v1/data-access/s3/bucket-contents?bucket={}&path={}&teamId={}".format(
                self.cloudos_url, s3_bucket_name, s3_relative_path, self.workspace_id),
            headers=headers, verify=self.verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        raw = r.json()
        # Normalize response
        normalized = {"folders": [], "files": []}
        for item in raw.get("contents", []):
            if item.get("isDir"):
                # Inject the folderType expected by downstream consumers
                item["folderType"] = "S3Folder"
                item["s3BucketName"] = s3_bucket_name
                item["s3Prefix"] = item['path']
                normalized["folders"].append(item)
            else:
                item["s3Prefix"] = item['path']
                item["s3BucketName"] = s3_bucket_name
                item["fileType"] = "S3File"
                normalized["files"].append(item)
        return normalized
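
    # Sketch (hypothetical bucket and prefix): entries come back already
    # tagged as S3 folders or files, in the {"folders", "files"} shape.
    #
    #   listing = ds.list_s3_folder_content("my-bucket", "runs/run-1/")
    #   for folder in listing["folders"]:
    #       print(folder["name"], folder["s3Prefix"])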

    def list_virtual_folder_content(self, folder_id):
        """List the contents of a virtual folder.

        Uses
        ----------
        apikey : string
            Your CloudOS API key.
        cloudos_url : string
            The CloudOS service url.
        workspace_id : string
            The specific Cloudos workspace id.
        project_id : string
            The specific project id.
        folder_id : string
            The id of the folder whose contents are to be listed.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_get(
            "{}/api/v1/folders/virtual/{}/items?teamId={}".format(
                self.cloudos_url, folder_id, self.workspace_id),
            headers=headers, verify=self.verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r.json()

    def list_azure_container_content(self, container_name: str, storage_account_name: str, path: str):
        """List contents of an Azure Blob container path."""
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        url = f"{self.cloudos_url}/api/v1/data-access/azure/container-contents"
        url += f"?containerName={container_name}&storageAccountName={storage_account_name}"
        url += f"&path={path}&teamId={self.workspace_id}"
        r = retry_requests_get(url, headers=headers, verify=self.verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        raw = r.json()
        # Normalize response to match existing expectations
        normalized = {"folders": [], "files": []}
        for item in raw.get("contents", []):
            is_dir = item.get("isDir", False)
            # Set a name field based on the last part of the blob path
            path_str = item.get("path", "")
            name = item.get("name") or path_str.rstrip("/").split("/")[-1]
            # Inject the structure expected by downstream consumers
            if is_dir:
                normalized["folders"].append({
                    "_id": item.get("_id"),
                    "name": name,
                    "folderType": "AzureBlobFolder",
                    "blobPrefix": path_str,
                    "blobContainerName": container_name,
                    "blobStorageAccountName": storage_account_name,
                    "kind": "Folder"
                })
            else:
                normalized["files"].append({
                    "_id": item.get("_id"),
                    "name": name,
                    "fileType": "AzureBlobFile",
                    "blobName": path_str,
                    "blobContainerName": container_name,
                    "blobStorageAccountName": storage_account_name,
                    "sizeInBytes": item.get("size", 0),
                    "updatedAt": item.get("lastModified"),
                    "kind": "File"
                })
        return normalized
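
    # Sketch (hypothetical names): Azure listings are rebuilt into the same
    # {"folders": [...], "files": [...]} shape as the S3 helper returns.
    #
    #   listing = ds.list_azure_container_content("my-container",
    #                                             "mystorageaccount", "runs/")
    #   print([f["blobName"] for f in listing["files"]])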

    def list_folder_content(self, path=None):
        """Wrapper to list contents of a CloudOS folder.

        Parameters
        ----------
        path : str, optional
            A path like 'TopFolder', 'TopFolder/Subfolder', or deeper.
            If None, lists all top-level datasets in the project.

        Returns
        -------
        dict
            JSON response from the appropriate CloudOS endpoint.
        """
        if not path:
            return self.list_project_content()
        parts = path.strip('/').split('/')
        if len(parts) == 1:
            return self.list_datasets_content(parts[0])
        dataset_name = parts[0]
        folder_content = self.list_datasets_content(dataset_name)
        path_depth = 1
        while path_depth < len(parts):
            job_name = parts[path_depth]
            found = False
            for job_folder in folder_content.get("folders", []):
                if job_folder["name"] == job_name:
                    found = True
                    folder_type = job_folder.get("folderType")
                    if folder_type == "S3Folder":
                        s3_bucket_name = job_folder['s3BucketName']
                        s3_relative_path = job_folder['s3Prefix']
                        if path_depth == len(parts) - 1:
                            return self.list_s3_folder_content(s3_bucket_name, s3_relative_path)
                        else:
                            sub_path = '/'.join(parts[0:path_depth+1])
                            folder_content = self.list_folder_content(sub_path)
                            path_depth += 1
                            break
                    elif folder_type == "VirtualFolder":
                        folder_id = job_folder['_id']
                        if path_depth == len(parts) - 1:
                            return self.list_virtual_folder_content(folder_id)
                        else:
                            sub_path = '/'.join(parts[0:path_depth+1])
                            folder_content = self.list_folder_content(sub_path)
                            path_depth += 1
                            break
                    elif folder_type == "AzureBlobFolder":
                        container_name = job_folder['blobContainerName']
                        storage_account_name = job_folder['blobStorageAccountName']
                        blob_prefix = job_folder['blobPrefix']
                        # The trailing slash is mandatory for Azure; without it
                        # only the folder itself is listed, not its contents.
                        if not blob_prefix.endswith('/'):
                            blob_prefix += '/'
                        if path_depth == len(parts) - 1:
                            return self.list_azure_container_content(container_name,
                                                                     storage_account_name,
                                                                     blob_prefix)
                        else:
                            sub_path = '/'.join(parts[0:path_depth+1])
                            folder_content = self.list_folder_content(sub_path)
                            path_depth += 1
                            break
                    else:
                        raise ValueError(f"Unsupported folder type '{folder_type}' for path '{path}'")
            if not found:
                raise ValueError(f"Folder '{job_name}' not found under dataset '{dataset_name}'")
        return folder_content
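
    # Sketch: a slash-separated path is resolved one level at a time, each
    # level dispatched to the matching backend (dataset, S3, virtual, Azure).
    #
    #   ds.list_folder_content()                        # top-level datasets
    #   ds.list_folder_content("Data")                  # one dataset
    #   ds.list_folder_content("Data/samples/batch-1")  # nested folder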

    def move_files_and_folders(self, source_id: str, source_kind: str, target_id: str, target_kind: str):
        """Move a file or folder to another location in CloudOS.

        Parameters
        ----------
        source_id : str
            The ID of the file or folder to move.
        source_kind : str
            The kind of the item to move, e.g. "File" or "Folder".
        target_id : str
            The ID of the target parent to move the item into.
        target_kind : str
            The kind of the target parent, e.g. "Dataset" or "Folder".

        Returns
        -------
        response : requests.Response
            The response object from the CloudOS API.
        """
        url = f"{self.cloudos_url}/api/v1/dataItems/move?teamId={self.workspace_id}"
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "ApiKey": self.apikey
        }
        payload = {
            "dataItemToMove": {
                "kind": source_kind,
                "item": source_id
            },
            "toDataItemParent": {
                "kind": target_kind,
                "item": target_id
            }
        }
        response = retry_requests_put(url, headers=headers, data=json.dumps(payload), verify=self.verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        return response
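
    # Sketch (hypothetical ids): move a file into a folder by passing both
    # ids together with their kinds.
    #
    #   ds.move_files_and_folders(source_id="64ab...", source_kind="File",
    #                             target_id="64cd...", target_kind="Folder")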

    def rename_item(self, item_id: str, new_name: str, kind: str):
        """Rename a file or folder in CloudOS.

        Parameters
        ----------
        item_id : str
            The ID of the file or folder to rename.
        new_name : str
            The new name to assign to the item.
        kind : str
            Either "File" or "Folder".

        Returns
        -------
        response : requests.Response
            The response object from the CloudOS API.
        """
        if kind not in ("File", "Folder"):
            raise ValueError("Invalid kind provided. Must be 'File' or 'Folder'.")
        endpoint = "files" if kind == "File" else "folders"
        url = f"{self.cloudos_url}/api/v1/{endpoint}/{item_id}?teamId={self.workspace_id}"
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "ApiKey": self.apikey
        }
        payload = {
            "name": new_name
        }
        response = retry_requests_put(url, headers=headers, data=json.dumps(payload), verify=self.verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        return response
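
    # Sketch (hypothetical id):
    #
    #   ds.rename_item("64ab...", "renamed.txt", kind="File")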

    def copy_item(self, item, destination_id, destination_kind):
        """Copy a file or folder (S3, Azure or Virtual) to a destination in CloudOS."""
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "ApiKey": self.apikey
        }
        parent = {"kind": destination_kind, "id": destination_id}
        # Virtual folder
        if item.get("folderType") == "VirtualFolder":
            payload = {
                "copyContentsFrom": item["_id"],
                "name": item["name"],
                "parent": parent
            }
            url = f"{self.cloudos_url}/api/v1/folders/virtual?teamId={self.workspace_id}"
        # S3 folder
        elif item.get("folderType") == "S3Folder":
            payload = {
                "s3BucketName": item["s3BucketName"],
                "s3Prefix": item.get("s3Prefix"),
                "name": item["name"],
                "parent": parent,
                "isManagedByLifebit": item.get("isManagedByLifebit", False)
            }
            url = f"{self.cloudos_url}/api/v1/folders/s3?teamId={self.workspace_id}"
        # S3 file
        elif item.get("fileType") == "S3File":
            payload = {
                "s3BucketName": item["s3BucketName"],
                "s3ObjectKey": item.get("s3ObjectKey") or item.get("s3Prefix"),
                "name": item["name"],
                "parent": parent,
                "isManagedByLifebit": item.get("isManagedByLifebit", False),
                "sizeInBytes": item.get("sizeInBytes", 0)
            }
            url = f"{self.cloudos_url}/api/v1/files/s3?teamId={self.workspace_id}"
        # Azure folder
        elif item.get("folderType") == "AzureBlobFolder":
            payload = {
                "blobContainerName": item["blobContainerName"],
                "blobPrefix": item["blobPrefix"],
                "blobStorageAccountName": item["blobStorageAccountName"],
                "name": item["name"],
                "parent": parent
            }
            url = f"{self.cloudos_url}/api/v1/folders/azure-blob?teamId={self.workspace_id}"
        # Azure file
        elif item.get("fileType") == "AzureBlobFile":
            payload = {
                "blobContainerName": item["blobContainerName"],
                "blobName": item["blobName"],
                "blobStorageAccountName": item["blobStorageAccountName"],
                "name": item["name"],
                "parent": parent,
                "isManagedByLifebit": item.get("isManagedByLifebit", False),
                "sizeInBytes": item.get("sizeInBytes", 0)
            }
            url = f"{self.cloudos_url}/api/v1/files/azure-blob?teamId={self.workspace_id}"
        else:
            raise ValueError(f"Unknown item type for copy: {item.get('name')}")
        response = retry_requests_post(url, headers=headers, json=payload, verify=self.verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        return response
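
    # Sketch (hypothetical values): copy_item expects an item dict shaped like
    # the entries produced by the listing helpers above.
    #
    #   item = {"fileType": "S3File", "name": "a.txt", "_id": "64ab...",
    #           "s3BucketName": "my-bucket", "s3Prefix": "runs/a.txt"}
    #   ds.copy_item(item, destination_id="64cd...", destination_kind="Folder")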

    def create_virtual_folder(self, name: str, parent_id: str, parent_kind: str):
        """Create a new virtual folder in CloudOS under a given parent.

        Parameters
        ----------
        name : str
            The name of the new folder.
        parent_id : str
            The ID of the parent (can be a Dataset or a Folder).
        parent_kind : str
            The type of the parent: either "Dataset" or "Folder".

        Returns
        -------
        response : requests.Response
            The response object from the CloudOS API.
        """
        if parent_kind not in ("Dataset", "Folder"):
            raise ValueError("Invalid parent_kind. Must be 'Dataset' or 'Folder'.")
        url = f"{self.cloudos_url}/api/v1/folders/virtual?teamId={self.workspace_id}"
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "ApiKey": self.apikey
        }
        payload = {
            "name": name,
            "parent": {
                "kind": parent_kind,
                "id": parent_id
            }
        }
        response = retry_requests_post(url, headers=headers, json=payload, verify=self.verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        return response
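
    # Sketch (hypothetical id):
    #
    #   ds.create_virtual_folder("results-v2", parent_id="64cd...",
    #                            parent_kind="Dataset")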

    def delete_item(self, item_id: str, kind: str):
        """Delete a file or folder in CloudOS.

        Parameters
        ----------
        item_id : str
            The ID of the file or folder to delete.
        kind : str
            Must be either "File" or "Folder".

        Returns
        -------
        response : requests.Response
            The response object from the CloudOS API.
        """
        if kind not in ("File", "Folder"):
            raise ValueError("Invalid kind provided. Must be 'File' or 'Folder'.")
        endpoint = "files" if kind == "File" else "folders"
        url = f"{self.cloudos_url}/api/v1/{endpoint}/{item_id}?teamId={self.workspace_id}"
        headers = {
            "accept": "application/json",
            "ApiKey": self.apikey
        }
        response = retry_requests_delete(url, headers=headers, verify=self.verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        return response
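
    # Sketch (hypothetical id):
    #
    #   ds.delete_item("64ab...", kind="Folder")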