Source code for cloudos_cli.clos

"""
This is the main class of the package.
"""

import requests
import time
import json
from dataclasses import dataclass
from cloudos_cli.utils.cloud import find_cloud
from cloudos_cli.utils.errors import BadRequestException, JoBNotCompletedException, NotAuthorisedException, JobAccessDeniedException
from cloudos_cli.utils.requests import retry_requests_get, retry_requests_post, retry_requests_put
import pandas as pd
from cloudos_cli.utils.last_wf import youngest_workflow_id_by_name
from datetime import datetime


# GLOBAL VARS
JOB_COMPLETED = 'completed'
JOB_FAILED = 'failed'
JOB_ABORTED = 'aborted'


@dataclass
class Cloudos:
    """A simple class to contain the required connection information.

    Parameters
    ----------
    cloudos_url : string
        The CloudOS service url.
    apikey : string
        Your CloudOS API key.
    cromwell_token : string
        Cromwell server token. If None, apikey will be used instead.
    """
    cloudos_url: str
    apikey: str
    cromwell_token: str

    def get_job_status(self, j_id, verify=True):
        """Get job status from CloudOS.

        Parameters
        ----------
        j_id : string
            The CloudOS job id of the job just launched.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        r : requests.models.Response
            The server response
        """
        cloudos_url = self.cloudos_url
        apikey = self.apikey
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        r = retry_requests_get("{}/api/v1/jobs/{}".format(cloudos_url, j_id),
                               headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r

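    # Usage sketch for get_job_status. Illustrative only: the URL, API key and
    # job id below are placeholders, not real values.
    #
    #     cl = Cloudos(cloudos_url='https://cloudos.example.com',
    #                  apikey='MY_API_KEY',
    #                  cromwell_token=None)
    #     r = cl.get_job_status('JOB_ID')
    #     print(json.loads(r.content)["status"])
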
    def wait_job_completion(self, job_id, wait_time=3600, request_interval=30, verbose=False,
                            verify=True):
        """Checks job status from CloudOS and waits for its completion.

        Parameters
        ----------
        job_id : string
            The CloudOS job id of the job just launched.
        wait_time : int
            Max time to wait (in seconds) to job completion.
        request_interval : int
            Time interval (in seconds) to request job status.
        verbose : bool
            Whether to output status on every request or not.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        : dict
            A dict with three elements collected from the job status:
            'name', 'id', 'status'.
        """
        j_url = f'{self.cloudos_url}/app/advanced-analytics/analyses/{job_id}'
        elapsed = 0
        j_status_h_old = ''
        # make sure user doesn't surpass the wait time
        if request_interval > wait_time:
            request_interval = wait_time
        while elapsed < wait_time:
            j_status = self.get_job_status(job_id, verify)
            j_status_content = json.loads(j_status.content)
            j_status_h = j_status_content["status"]
            j_name = j_status_content["name"]
            if j_status_h == JOB_COMPLETED:
                if verbose:
                    print(f'\tYour job "{j_name}" (ID: {job_id}) took {elapsed} seconds to complete ' +
                          'successfully.')
                return {'name': j_name, 'id': job_id, 'status': j_status_h}
            elif j_status_h == JOB_FAILED:
                if verbose:
                    print(f'\tYour job "{j_name}" (ID: {job_id}) took {elapsed} seconds to fail.')
                return {'name': j_name, 'id': job_id, 'status': j_status_h}
            elif j_status_h == JOB_ABORTED:
                if verbose:
                    print(f'\tYour job "{j_name}" (ID: {job_id}) took {elapsed} seconds to abort.')
                return {'name': j_name, 'id': job_id, 'status': j_status_h}
            else:
                elapsed += request_interval
                if j_status_h != j_status_h_old:
                    if verbose:
                        print(f'\tYour current job "{j_name}" (ID: {job_id}) status is: {j_status_h}.')
                    j_status_h_old = j_status_h
                time.sleep(request_interval)
        j_status = self.get_job_status(job_id, verify)
        j_status_content = json.loads(j_status.content)
        j_status_h = j_status_content["status"]
        j_name = j_status_content["name"]
        if j_status_h != JOB_COMPLETED and verbose:
            print(f'\tYour current job "{j_name}" (ID: {job_id}) status is: {j_status_h}. The ' +
                  f'selected wait-time of {wait_time} was exceeded. Please, ' +
                  'consider setting a longer wait-time.')
            print('\tTo further check your job status you can either go to ' +
                  f'{j_url} or use the following command:\n' +
                  '\tcloudos job status \\\n' +
                  '\t\t--apikey $MY_API_KEY \\\n' +
                  f'\t\t--cloudos-url {self.cloudos_url} \\\n' +
                  f'\t\t--job-id {job_id}\n')
        return {'name': j_name, 'id': job_id, 'status': j_status_h}

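    # Usage sketch for wait_job_completion, assuming 'cl' is a Cloudos instance
    # as in the sketch above and 'JOB_ID' is a placeholder job id. Polls every
    # 60 seconds for up to 30 minutes.
    #
    #     result = cl.wait_job_completion('JOB_ID', wait_time=1800,
    #                                     request_interval=60, verbose=True)
    #     if result['status'] != JOB_COMPLETED:
    #         print(f"Job {result['id']} ended with status {result['status']}")
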
    def get_storage_contents(self, cloud_name, cloud_meta, container, path, workspace_id, verify):
        """
        Retrieves the contents of a storage container from the specified cloud service.

        This method fetches the contents of a specified path within a storage container
        on a cloud service (e.g., AWS S3 or Azure Blob). The request is authenticated
        using an API key and requires valid parameters such as the workspace ID and path.

        Parameters:
            cloud_name (str): The name of the cloud service (e.g., 'aws' or 'azure').
            cloud_meta (dict): Cloud provider metadata, as returned by find_cloud().
            container (str): The name of the storage container or bucket.
            path (str): The file path or directory within the storage container.
            workspace_id (str): The identifier of the workspace or team.
            verify (bool): Whether to verify SSL certificates for the request.

        Returns:
            list: A list of contents retrieved from the specified cloud storage.

        Raises:
            BadRequestException: If the request to retrieve the contents fails with
                a status code indicating an error.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        cloud_data = {
            "aws": {
                "url": f"{self.cloudos_url}/api/v1/data-access/s3/bucket-contents",
                "params": {
                    "bucket": container,
                    "path": path,
                    "teamId": workspace_id
                }
            },
            "azure": {
                "url": f"{self.cloudos_url}/api/v1/data-access/azure/container-contents",
                "container": "containerName",
                "params": {
                    "containerName": container,
                    "path": path + "/",
                    "storageAccountName": "",
                    "teamId": workspace_id
                }
            }
        }
        if cloud_name == "azure":
            cloud_data[cloud_name]["params"]["storageAccountName"] = cloud_meta["storage"]["storageAccount"]
        params = cloud_data[cloud_name]["params"]
        contents_req = retry_requests_get(cloud_data[cloud_name]["url"],
                                          params=params,
                                          headers=headers,
                                          verify=verify)
        if contents_req.status_code >= 400:
            raise BadRequestException(contents_req)
        return contents_req.json()["contents"]

    def get_job_workdir(self, j_id, workspace_id, verify=True):
        """
        Get the working directory for the specified job.
        """
        cloudos_url = self.cloudos_url
        apikey = self.apikey
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        r = retry_requests_get(f"{cloudos_url}/api/v1/jobs/{j_id}",
                               headers=headers, verify=verify)
        if r.status_code == 401:
            raise NotAuthorisedException
        elif r.status_code == 403:
            # Handle 403 with more informative error message
            self._handle_job_access_denied(j_id, workspace_id, verify)
        elif r.status_code >= 400:
            raise BadRequestException(r)
        r_json = r.json()
        job_workspace = r_json["team"]
        if job_workspace != workspace_id:
            raise ValueError("Workspace provided or configured is different from workspace where the job was executed")
        if "resumeWorkDir" not in r_json:
            print("Working directories are not available. This may be because the analysis was run without resumable mode enabled, or because intermediate results have since been removed.")
        # Check if logs field exists, if not fall back to original folder-based approach
        elif "logs" in r_json:
            # Get workdir information from logs object using the same pattern as get_job_logs
            logs_obj = r_json["logs"]
            cloud_name, cloud_meta, cloud_storage = find_cloud(self.cloudos_url, self.apikey,
                                                               workspace_id, logs_obj)
            container_name = cloud_storage["container"]
            prefix_name = cloud_storage["prefix"]
            logs_bucket = logs_obj[container_name]
            logs_path = logs_obj[prefix_name]
            # Construct workdir path by replacing '/logs' with '/work' in the logs path
            workdir_path_suffix = logs_path.replace('/logs', '/work')
            if cloud_name == "aws":
                workdir_path = f"s3://{logs_bucket}/{workdir_path_suffix}"
            elif cloud_name == "azure":
                storage_account_prefix = ''
                cloude_scheme = cloud_storage["scheme"]
                if cloude_scheme == 'az':
                    storage_account_prefix = f"az://{cloud_meta['storage']['storageAccount']}.blob.core.windows.net"
                workdir_path = f"{storage_account_prefix}/{logs_bucket}/{workdir_path_suffix}"
            else:
                raise ValueError("Unsupported cloud provider")
            return workdir_path
        else:
            # Fallback to original folder-based approach for backward compatibility
            workdir_id = r_json["resumeWorkDir"]
            # This will fail, as the API endpoint is not open. This works when adding
            # the authorisation bearer token manually to the headers
            workdir_bucket_r = retry_requests_get(f"{cloudos_url}/api/v1/folders",
                                                  params=dict(id=workdir_id, teamId=workspace_id),
                                                  headers=headers, verify=verify)
            if workdir_bucket_r.status_code == 401:
                raise NotAuthorisedException
            elif workdir_bucket_r.status_code >= 400:
                raise BadRequestException(workdir_bucket_r)
            workdir_bucket_o = workdir_bucket_r.json()
            if len(workdir_bucket_o) > 1:
                raise ValueError(f"Request returned more than one result for folder id {workdir_id}")
            workdir_bucket_info = workdir_bucket_o[0]
            if workdir_bucket_info["folderType"] == "S3Folder":
                cloud_name = "aws"
            elif workdir_bucket_info["folderType"] == "AzureBlobFolder":
                cloud_name = "azure"
            else:
                raise ValueError("Unsupported cloud provider")
            if cloud_name == "aws":
                bucket_name = workdir_bucket_info["s3BucketName"]
                bucket_path = workdir_bucket_info["s3Prefix"]
                workdir_path = f"s3://{bucket_name}/{bucket_path}"
            elif cloud_name == "azure":
                storage_account = f"az://{workspace_id}.blob.core.windows.net"
                container_name = workdir_bucket_info["blobContainerName"]
                blob_prefix = workdir_bucket_info["blobPrefix"]
                workdir_path = f"{storage_account}/{container_name}/{blob_prefix}"
            else:
                raise ValueError("Unsupported cloud provider")
            return workdir_path

    def _handle_job_access_denied(self, job_id, workspace_id, verify=True):
        """
        Handle 403 errors with more informative messages by checking job ownership.
        """
        try:
            # Try to get current user info
            current_user = self.get_user_info(verify)
            current_user_name = f"{current_user.get('name', '')} {current_user.get('surname', '')}".strip()
            if not current_user_name:
                current_user_name = current_user.get('email', 'Unknown')
        except Exception:
            current_user_name = None
        try:
            # Try to get job info from job list to see the owner
            jobs = self.get_job_list(workspace_id, last_n_jobs='all', verify=verify)
            job_owner_name = None
            for job in jobs:
                if job.get('_id') == job_id:
                    user_info = job.get('user', {})
                    job_owner_name = f"{user_info.get('name', '')} {user_info.get('surname', '')}".strip()
                    if not job_owner_name:
                        job_owner_name = user_info.get('email', 'Unknown')
                    break
            raise JobAccessDeniedException(job_id, job_owner_name, current_user_name)
        except JobAccessDeniedException:
            # Re-raise the specific exception
            raise
        except Exception:
            # If we can't get detailed info, fall back to generic message
            raise JobAccessDeniedException(job_id)

    def get_job_logs(self, j_id, workspace_id, verify=True):
        """
        Get the location of the logs for the specified job.
        """
        cloudos_url = self.cloudos_url
        apikey = self.apikey
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        r = retry_requests_get(f"{cloudos_url}/api/v1/jobs/{j_id}",
                               headers=headers, verify=verify)
        if r.status_code == 401:
            raise NotAuthorisedException
        elif r.status_code >= 400:
            raise BadRequestException(r)
        r_json = r.json()
        logs_obj = r_json["logs"]
        job_workspace = r_json["team"]
        if job_workspace != workspace_id:
            raise ValueError("Workspace provided or configured is different from workspace where the job was executed")
        cloud_name, cloud_meta, cloud_storage = find_cloud(self.cloudos_url, self.apikey,
                                                           workspace_id, logs_obj)
        container_name = cloud_storage["container"]
        prefix_name = cloud_storage["prefix"]
        logs_bucket = logs_obj[container_name]
        logs_path = logs_obj[prefix_name]
        contents_obj = self.get_storage_contents(cloud_name, cloud_meta, logs_bucket, logs_path,
                                                 workspace_id, verify)
        logs = {}
        cloude_scheme = cloud_storage["scheme"]
        storage_account_prefix = ''
        if cloude_scheme == 'az':
            storage_account_prefix = f'{workspace_id}.blob.core.windows.net/'
        for item in contents_obj:
            if not item["isDir"]:
                filename = item["name"]
                if filename == "stdout.txt":
                    filename = "Nextflow standard output"
                if filename == ".nextflow.log":
                    filename = "Nextflow log"
                if filename == "trace.txt":
                    filename = "Trace file"
                logs[filename] = f"{cloude_scheme}://{storage_account_prefix}{logs_bucket}/{item['path']}"
        return logs

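    # Usage sketch for get_job_logs, assuming 'cl' is a Cloudos instance and the
    # job and workspace ids are placeholders. Prints the cloud location of each
    # log file.
    #
    #     for name, uri in cl.get_job_logs('JOB_ID', 'WORKSPACE_ID').items():
    #         print(f'{name}: {uri}')
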
    def get_job_results(self, j_id, workspace_id, verify=True):
        """
        Get the location of the results for the specified job.
        """
        cloudos_url = self.cloudos_url
        apikey = self.apikey
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        status = self.get_job_status(j_id, verify).json()["status"]
        if status != JOB_COMPLETED:
            raise JoBNotCompletedException(j_id, status)
        r = retry_requests_get(f"{cloudos_url}/api/v1/jobs/{j_id}",
                               headers=headers, verify=verify)
        if r.status_code == 401:
            raise NotAuthorisedException
        if r.status_code >= 400:
            raise BadRequestException(r)
        req_obj = r.json()
        job_workspace = req_obj["team"]
        if job_workspace != workspace_id:
            raise ValueError("Workspace provided or configured is different from workspace where the job was executed")
        cloud_name, meta, cloud_storage = find_cloud(self.cloudos_url, self.apikey,
                                                     workspace_id, req_obj["logs"])
        results_obj = req_obj["results"]
        results_container = results_obj[cloud_storage["container"]]
        results_path = results_obj[cloud_storage["prefix"]]
        scheme = cloud_storage["scheme"]
        contents_obj = self.get_storage_contents(cloud_name, meta, results_container, results_path,
                                                 workspace_id, verify)
        storage_account_prefix = ''
        if scheme == 'az':
            storage_account_prefix = f'{workspace_id}.blob.core.windows.net/'
        results = dict()
        for item in contents_obj:
            if item["isDir"]:
                filename = item["name"]
                results[filename] = f"{scheme}://{storage_account_prefix}{results_container}/{item['path']}"
        return results

    def _create_cromwell_header(self):
        """Generates the Cromwell header.

        This method is responsible for using the personal API key instead of the
        Cromwell-specific token when the latter is not provided.

        Returns
        -------
        headers : dict
            The correct headers based on using the Cromwell-specific token or the
            personal API key.
        """
        if self.cromwell_token is None:
            headers = {
                "Accept": "application/json",
                "apikey": self.apikey
            }
        else:
            headers = {
                "Accept": "application/json",
                "Authorization": f'Bearer {self.cromwell_token}'
            }
        return headers

    def resolve_user_id(self, filter_owner, workspace_id, verify=True):
        """Resolve a username or display name to a user ID.

        Parameters
        ----------
        filter_owner : str
            The username or display name to search for.
        workspace_id : str
            The CloudOS workspace ID.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        str
            The user ID corresponding to the filter_owner.

        Raises
        ------
        ValueError
            If the user cannot be found or if there's an error during the search.
        """
        try:
            search_headers = {
                "Content-type": "application/json",
                "apikey": self.apikey
            }
            search_params = {
                "q": filter_owner,
                "teamId": workspace_id
            }
            # Note: this endpoint may not be open in all CloudOS instances
            user_search_r = retry_requests_get(f"{self.cloudos_url}/api/v1/users/search-assist",
                                               params=search_params,
                                               headers=search_headers,
                                               verify=verify)
            if user_search_r.status_code >= 400:
                raise ValueError(f"Error searching for user '{filter_owner}'")
            user_search_content = user_search_r.json()
            user_items = user_search_content.get('items', [])
            if user_items and len(user_items) > 0:
                user_match = None
                for user in user_items:
                    if user.get("username") == filter_owner or user.get("name") == filter_owner:
                        user_match = user
                        break
                if user_match:
                    return user_match.get("id")
                else:
                    raise ValueError(f"User '{filter_owner}' not found.")
            else:
                raise ValueError(f"User '{filter_owner}' not found.")
        except Exception as e:
            raise ValueError(f"Error resolving user '{filter_owner}': {str(e)}")

    def get_cromwell_status(self, workspace_id, verify=True):
        """Get Cromwell server status from CloudOS.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id from which to check the Cromwell status.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        r : requests.models.Response
            The server response
        """
        cloudos_url = self.cloudos_url
        headers = self._create_cromwell_header()
        r = retry_requests_get("{}/api/v1/cromwell?teamId={}".format(cloudos_url, workspace_id),
                               headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r

    def cromwell_switch(self, workspace_id, action, verify=True):
        """Restart or stop the Cromwell server.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id in which to restart or stop the Cromwell server.
        action : string [restart|stop]
            The action to perform.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        r : requests.models.Response
            The server response
        """
        cloudos_url = self.cloudos_url
        headers = self._create_cromwell_header()
        r = requests.put("{}/api/v1/cromwell/{}?teamId={}".format(cloudos_url, action, workspace_id),
                         headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r

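    # Usage sketch for the Cromwell helpers above, assuming 'cl' is a Cloudos
    # instance and 'WORKSPACE_ID' is a placeholder.
    #
    #     cl.cromwell_switch('WORKSPACE_ID', 'restart')
    #     print(json.loads(cl.get_cromwell_status('WORKSPACE_ID').content))
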
    def get_job_list(self, workspace_id, last_n_jobs=None, page=None, page_size=None,
                     archived=False, verify=True, filter_status=None, filter_job_name=None,
                     filter_project=None, filter_workflow=None, filter_job_id=None,
                     filter_only_mine=False, filter_owner=None, filter_queue=None, last=False):
        """Get jobs from a CloudOS workspace with optional filtering.

        Fetches jobs page by page and applies all filters after fetching. Stops
        when enough jobs are collected or no more jobs are available.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id from which to collect the jobs.
        last_n_jobs : [int | 'all'], default=None
            How many of the last jobs from the user to retrieve. You can specify a
            very large int or 'all' to get all user's jobs. When specified, page and
            page_size parameters are ignored.
        page : int, default=None
            Response page to get when not using last_n_jobs.
        page_size : int, default=None
            Number of jobs to retrieve per page when not using last_n_jobs.
            Maximum allowed value is 100.
        archived : bool, default=False
            When True, only the archived jobs are retrieved.
        verify : [bool|string], default=True
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        filter_status : string, optional
            Filter jobs by status (e.g., 'completed', 'running', 'failed').
        filter_job_name : string, optional
            Filter jobs by name.
        filter_project : string, optional
            Filter jobs by project name (will be resolved to project ID).
        filter_workflow : string, optional
            Filter jobs by workflow name (will be resolved to workflow ID).
        filter_job_id : string, optional
            Filter jobs by specific job ID.
        filter_only_mine : bool, optional
            Filter to show only jobs belonging to the current user.
        filter_owner : string, optional
            Filter jobs by owner username (will be resolved to user ID).
        filter_queue : string, optional
            Filter jobs by queue name (will be resolved to queue ID). Only applies
            to jobs running in a batch environment. Non-batch jobs are preserved in
            results as they don't use queues.
        last : bool, optional
            When workflows are duplicated, use the latest imported workflow (by date).

        Returns
        -------
        r : list
            A list of dicts, each corresponding to a job from the user and the workspace.
        """
        # Validate workspace_id
        if not workspace_id or not isinstance(workspace_id, str):
            raise ValueError("Invalid workspace_id: must be a non-empty string")
        # Validate last_n_jobs
        if last_n_jobs is not None:
            if isinstance(last_n_jobs, str):
                if last_n_jobs != 'all':
                    try:
                        last_n_jobs = int(last_n_jobs)
                    except ValueError:
                        raise ValueError("last_n_jobs must be a positive integer or 'all'")
            # Validate that integer last_n_jobs is positive
            if isinstance(last_n_jobs, int) and last_n_jobs <= 0:
                raise ValueError("last_n_jobs must be a positive integer or 'all'")
        # Validate page and page_size
        if page is not None and (page <= 0 or not isinstance(page, int)):
            raise ValueError('Please, use a positive integer (>= 1) for the --page parameter')
        if page_size is not None and (page_size <= 0 or not isinstance(page_size, int)):
            raise ValueError('Please, use a positive integer (>= 1) for the --page-size parameter')
        # Handle parameter interaction and set defaults
        # If last_n_jobs is provided, use pagination mode with last_n_jobs
        # If page/page_size are provided without last_n_jobs, use direct pagination mode
        if last_n_jobs is not None:
            # When last_n_jobs is specified, warn if page/page_size are also specified
            print('[Warning] When using --last-n-jobs option, --page and --page-size are ignored. ' +
                  'To use --page and --page-size, please remove --last-n-jobs option.\n')
            # Use pagination to fetch last_n_jobs, starting from page 1
            use_pagination_mode = True
            target_job_count = last_n_jobs
            current_page = 1
            current_page_size = min(100, int(last_n_jobs)) if last_n_jobs != 'all' else 100
        else:
            # Direct pagination mode - use page and page_size as specified
            use_pagination_mode = False
            target_job_count = page_size  # Only get jobs for this page
            current_page = page if page is not None else 1
            current_page_size = page_size if page_size is not None else 10
            # Validate page_size limit for direct pagination
            if current_page_size > 100:
                raise ValueError('Please, use a page_size value <= 100')
        # Validate filter_status values
        if filter_status:
            valid_statuses = ['completed', 'running', 'failed', 'aborted', 'queued',
                              'pending', 'initializing']
            if filter_status.lower() not in valid_statuses:
                raise ValueError(f"Invalid filter_status '{filter_status}'. "
                                 f"Valid values: {', '.join(valid_statuses)}")
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        # Build query parameters for server-side filtering
        params = {
            "teamId": workspace_id,
            "archived.status": str(archived).lower(),
            "page": current_page,
            "limit": current_page_size
        }
        # --- Resolve IDs once (before pagination loop) ---
        # Add simple server-side filters
        if filter_status:
            params["status"] = filter_status.lower()
        if filter_job_name:
            params["name"] = filter_job_name
        if filter_job_id:
            params["id"] = filter_job_id
        # Resolve project name to ID
        if filter_project:
            try:
                project_id = self.get_project_id_from_name(workspace_id, filter_project, verify=verify)
                if project_id:
                    params["project.id"] = project_id
                else:
                    raise ValueError(f"Project '{filter_project}' not found.")
            except Exception as e:
                raise ValueError(f"Error resolving project '{filter_project}': {str(e)}")
        # Resolve workflow name to ID
        if filter_workflow:
            try:
                workflow_content = self.get_workflow_content(workspace_id, filter_workflow,
                                                             verify=verify, last=last)
                if workflow_content and workflow_content.get("workflows"):
                    # Extract the first (and should be only) workflow from the list
                    workflow = workflow_content["workflows"][0]
                    workflow_id = workflow.get("_id")
                    if workflow_id:
                        params["workflow.id"] = workflow_id
                    else:
                        raise ValueError(f"Workflow '{filter_workflow}' not found.")
                else:
                    raise ValueError(f"Workflow '{filter_workflow}' not found.")
            except Exception as e:
                raise ValueError(f"Error resolving workflow '{filter_workflow}': {str(e)}")
        # Get current user ID for filter_only_mine
        if filter_only_mine:
            try:
                user_info = self.get_user_info(verify=verify)
                user_id = user_info.get("id") or user_info.get("_id")
                if user_id:
                    params["user.id"] = user_id
                else:
                    raise ValueError("Could not retrieve current user information.")
            except Exception as e:
                raise ValueError(f"Error getting current user info: {str(e)}")
        # Resolve owner username to user ID
        if filter_owner:
            user_id = self.resolve_user_id(filter_owner, workspace_id, verify)
            params["user.id"] = user_id
        # --- Fetch jobs page by page ---
        all_jobs = []
        params["limit"] = current_page_size
        while True:
            params["page"] = current_page
            r = retry_requests_get(f"{self.cloudos_url}/api/v2/jobs",
                                   params=params, headers=headers, verify=verify)
            if r.status_code >= 400:
                raise BadRequestException(r)
            content = r.json()
            page_jobs = content.get('jobs', [])
            # No jobs returned, we've reached the end
            if not page_jobs:
                break
            all_jobs.extend(page_jobs)
            # Check stopping conditions based on mode
            if use_pagination_mode:
                # In pagination mode (last_n_jobs), continue until we have enough jobs
                if target_job_count != 'all' and len(all_jobs) >= target_job_count:
                    break
            else:
                # In direct mode (page/page_size), only get one page
                break
            # Check if we reached the last page (fewer jobs than requested page size)
            if len(page_jobs) < params["limit"]:
                break  # Last page
            current_page += 1
        # --- Local queue filtering (not supported by API) ---
        if filter_queue:
            try:
                batch_jobs = [job for job in all_jobs if job.get("batch", {})]
                if batch_jobs:
                    from cloudos_cli.queue.queue import Queue
                    queue_api = Queue(self.cloudos_url, self.apikey, self.cromwell_token,
                                      workspace_id, verify)
                    queues = queue_api.get_job_queues()
                    queue_id = None
                    for queue in queues:
                        if queue.get("label") == filter_queue or queue.get("name") == filter_queue:
                            queue_id = queue.get("id") or queue.get("_id")
                            break
                    if not queue_id:
                        raise ValueError(f"Queue with name '{filter_queue}' not found in workspace '{workspace_id}'")
                    all_jobs = [job for job in all_jobs
                                if job.get("batch", {}).get("jobQueue", {}).get("id") == queue_id]
                else:
                    raise ValueError("The environment is not a batch environment so queues do not exist. "
                                     "Please remove the --filter-queue option.")
            except Exception as e:
                raise ValueError(f"Error filtering by queue '{filter_queue}': {str(e)}")
        # --- Apply limit after all filtering ---
        if use_pagination_mode and target_job_count != 'all' and isinstance(target_job_count, int) and target_job_count > 0:
            all_jobs = all_jobs[:target_job_count]
        return all_jobs

    @staticmethod
    def process_job_list(r, all_fields=False):
        """Process a job list from a self.get_job_list call.

        Parameters
        ----------
        r : list
            A list of dicts, each corresponding to a job from the user and the workspace.
        all_fields : bool. Default=False
            Whether to return a reduced version of the DataFrame containing
            only the selected columns or the full DataFrame.

        Returns
        -------
        df : pandas.DataFrame
            A DataFrame with the requested columns from the jobs.
        """
        COLUMNS = ['status', 'name', 'project.name', 'user.name', 'user.surname',
                   'workflow.name', '_id', 'startTime', 'endTime', 'createdAt',
                   'updatedAt', 'revision.commit', 'realInstancesExecutionCost',
                   'masterInstance.usedInstance.type', 'storageMode',
                   'workflow.repository.url', 'nextflowVersion', 'batch.enabled',
                   'storageSizeInGb', 'batch.jobQueue.id', 'usesFusionFileSystem']
        df_full = pd.json_normalize(r)
        if df_full.empty:
            return df_full
        if all_fields:
            df = df_full
        else:
            # Only select columns that actually exist in the DataFrame
            existing_columns = [col for col in COLUMNS if col in df_full.columns]
            if existing_columns:
                df = df_full.loc[:, existing_columns]
            else:
                # If none of the predefined columns exist, raise a missing-columns error
                raise ValueError(f"None of the predefined columns {COLUMNS} exist in "
                                 f"retrieved columns: {list(df_full.columns)}")
        return df

    def reorder_job_list(self, my_jobs_df, filename='my_jobs.csv'):
        """Rename, reformat and reorder the columns of a job list DataFrame.

        Parameters
        ----------
        my_jobs_df : pandas.DataFrame
            A DataFrame containing job information from process_job_list.
        filename : str
            The name of the file to write when the DataFrame is empty.
            Default is 'my_jobs.csv'.

        Returns
        -------
        pandas.DataFrame
            The DataFrame with renamed, reformatted and reordered columns.
            If the input DataFrame is empty, an empty CSV file is written to
            `filename` and None is returned.
        """
        # Handle empty DataFrame
        if my_jobs_df.empty:
            print("Warning: DataFrame is empty. Creating empty CSV file.")
            empty_df = pd.DataFrame()
            empty_df.to_csv(filename, index=False)
            return
        # Create a copy to avoid modifying the original DataFrame
        jobs_df = my_jobs_df.copy()
        # 1. Fuse user.name and user.surname into user
        if 'user.name' in jobs_df.columns and 'user.surname' in jobs_df.columns:
            jobs_df['user'] = jobs_df.apply(
                lambda row: f"{row.get('user.name', '')} {row.get('user.surname', '')}".strip()
                if pd.notna(row.get('user.name')) or pd.notna(row.get('user.surname')) else None,
                axis=1
            )
            # Remove original columns
            jobs_df = jobs_df.drop(columns=['user.name', 'user.surname'], errors='ignore')
        # 2. Convert time fields to human-readable format
        time_columns = ['startTime', 'endTime', 'createdAt', 'updatedAt']
        for col in time_columns:
            if col in jobs_df.columns:
                def format_time(x):
                    if pd.notna(x) and isinstance(x, str) and x:
                        try:
                            return datetime.fromisoformat(x.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S UTC')
                        except (ValueError, TypeError):
                            return x  # Return original value if parsing fails
                    return None
                jobs_df[col] = jobs_df[col].apply(format_time)
        # 3. Format realInstancesExecutionCost (divide by 100, show 4 decimals)
        if 'realInstancesExecutionCost' in jobs_df.columns:
            def format_cost(x):
                if pd.notna(x) and x != '' and x is not None:
                    try:
                        return f"{float(x) / 100:.4f}"
                    except (ValueError, TypeError):
                        return x  # Return original value if conversion fails
                return None
            jobs_df['realInstancesExecutionCost'] = jobs_df['realInstancesExecutionCost'].apply(format_cost)
        # 4. Calculate Run time (endTime - startTime)
        if 'startTime' in jobs_df.columns and 'endTime' in jobs_df.columns:
            def calculate_runtime(row):
                start_time = row.get('startTime')
                end_time = row.get('endTime')
                if pd.notna(start_time) and pd.notna(end_time) and start_time and end_time:
                    # Use original times from the original DataFrame for calculation
                    original_start = my_jobs_df.iloc[row.name].get('startTime') if row.name < len(my_jobs_df) else start_time
                    original_end = my_jobs_df.iloc[row.name].get('endTime') if row.name < len(my_jobs_df) else end_time
                    if pd.notna(original_start) and pd.notna(original_end) and original_start and original_end:
                        try:
                            start_dt = datetime.fromisoformat(str(original_start).replace('Z', '+00:00'))
                            end_dt = datetime.fromisoformat(str(original_end).replace('Z', '+00:00'))
                            duration = end_dt - start_dt
                            # Format duration as hours:minutes:seconds
                            total_seconds = int(duration.total_seconds())
                            hours = total_seconds // 3600
                            minutes = (total_seconds % 3600) // 60
                            seconds = total_seconds % 60
                            if hours > 0:
                                return f"{hours}h {minutes}m {seconds}s"
                            elif minutes > 0:
                                return f"{minutes}m {seconds}s"
                            else:
                                return f"{seconds}s"
                        except (ValueError, TypeError):
                            return None
                return None
            jobs_df['Run time'] = jobs_df.apply(calculate_runtime, axis=1)
        # 5. Format batch.enabled (True -> "Batch", else "N/A")
        if 'batch.enabled' in jobs_df.columns:
            jobs_df['batch.enabled'] = jobs_df['batch.enabled'].apply(
                lambda x: "Batch" if x is True else "N/A"
            )
        # 6. Rename columns using the provided dictionary
        column_name_mapping = {
            "status": "Status",
            "name": "Name",
            "project.name": "Project",
            "user": "Owner",
            "workflow.name": "Pipeline",
            "_id": "ID",
            "createdAt": "Submit time",
            "updatedAt": "End time",
            "revision.commit": "Commit",
            "realInstancesExecutionCost": "Cost",
            "masterInstance.usedInstance.type": "Resources",
            "storageMode": "Storage type",
            "workflow.repository.url": "Pipeline url",
            "nextflowVersion": "Nextflow version",
            "batch.enabled": "Executor",
            "storageSizeInGb": "Storage size",
            "batch.jobQueue.id": "Job queue ID",
            "usesFusionFileSystem": "Accelerated file staging"
        }
        # Rename columns that exist in the DataFrame
        jobs_df = jobs_df.rename(columns=column_name_mapping)
        # Remove the original startTime and endTime columns since we now have
        # Submit time, End time and Run time
        jobs_df = jobs_df.drop(columns=['startTime', 'endTime'], errors='ignore')
        # 7. Define the desired order of columns
        desired_order = [
            "Status", "Name", "Project", "Owner", "Pipeline", "ID", "Submit time",
            "End time", "Run time", "Commit", "Cost", "Resources", "Storage type",
            "Pipeline url", "Nextflow version", "Executor", "Storage size",
            "Job queue ID", "Accelerated file staging"
        ]
        # Reorder columns - only include columns that exist in the DataFrame
        available_columns = [col for col in desired_order if col in jobs_df.columns]
        # Add any remaining columns that aren't in the desired order
        remaining_columns = [col for col in jobs_df.columns if col not in desired_order]
        final_column_order = available_columns + remaining_columns
        # Reorder the DataFrame
        jobs_df = jobs_df[final_column_order]
        return jobs_df

    def save_job_list_to_csv(self, my_jobs_df, filename='my_jobs.csv'):
        """Save a processed job list DataFrame to a CSV file.

        The DataFrame is first passed through reorder_job_list to rename,
        reformat and reorder its columns, then written to `filename`.
        """
        # Save to CSV
        jobs_df = self.reorder_job_list(my_jobs_df, filename)
        jobs_df.to_csv(filename, index=False)
        print(f'\tJob list collected with a total of {len(jobs_df)} jobs.')
        print(f'\tJob list saved to {filename}')

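    # Usage sketch chaining get_job_list, process_job_list and
    # save_job_list_to_csv, assuming 'cl' is a Cloudos instance and
    # 'WORKSPACE_ID' is a placeholder.
    #
    #     jobs = cl.get_job_list('WORKSPACE_ID', last_n_jobs=100)
    #     jobs_df = Cloudos.process_job_list(jobs)
    #     cl.save_job_list_to_csv(jobs_df, filename='my_jobs.csv')
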
    def get_workflow_list(self, workspace_id, verify=True, get_all=True,
                          page=1, page_size=10, max_page_size=100,
                          archived_status=False):
        """Get all the workflows from a CloudOS workspace.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id from which to collect the workflows.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        get_all : bool
            Whether to get all available curated workflows or just the
            indicated page.
        page : int
            The page number to retrieve, from the paginated response.
        page_size : int
            The number of workflows by page. From 1 to 1000.
        max_page_size : int
            Max page size defined by the API server. It is currently 1000.
        archived_status : bool
            Whether to retrieve archived workflows or not.

        Returns
        -------
        r : list
            A list of dicts, each corresponding to a workflow.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        archived_status = str(archived_status).lower()
        r = retry_requests_get(
            "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
                self.cloudos_url, workspace_id, page_size, page, archived_status),
            headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        content = json.loads(r.content)
        if get_all:
            total_workflows = content['paginationMetadata']['Pagination-Count']
            if total_workflows <= max_page_size:
                r = retry_requests_get(
                    "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
                        self.cloudos_url, workspace_id, total_workflows, 1, archived_status),
                    headers=headers, verify=verify)
                if r.status_code >= 400:
                    raise BadRequestException(r)
                return json.loads(r.content)['workflows']
            else:
                n_pages = (total_workflows // max_page_size) + int((total_workflows % max_page_size) > 0)
                for p in range(n_pages):
                    p += 1
                    r = retry_requests_get(
                        "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
                            self.cloudos_url, workspace_id, max_page_size, p, archived_status),
                        headers=headers, verify=verify)
                    if r.status_code >= 400:
                        raise BadRequestException(r)
                    if p == 1:
                        all_content = json.loads(r.content)['workflows']
                    else:
                        all_content += json.loads(r.content)['workflows']
                return all_content
        else:
            return content['workflows']

    @staticmethod
    def process_workflow_list(r, all_fields=False):
        """Process a server response from a self.get_workflow_list call.

        Parameters
        ----------
        r : list
            A list of dicts, each corresponding to a workflow.
        all_fields : bool. Default=False
            Whether to return a reduced version of the DataFrame containing
            only the selected columns or the full DataFrame.

        Returns
        -------
        df : pandas.DataFrame
            A DataFrame with the requested columns from the workflows.
        """
        COLUMNS = ['_id', 'name', 'archived.status', 'mainFile', 'workflowType',
                   'group', 'repository.name', 'repository.platform',
                   'repository.url', 'repository.isPrivate']
        df_full = pd.json_normalize(r)
        if all_fields:
            df = df_full
        else:
            present_columns = []
            for column in COLUMNS:
                if column in df_full.columns:
                    present_columns.append(column)
            df = df_full.loc[:, present_columns]
        return df

    def detect_workflow(self, workflow_name, workspace_id, verify=True, last=False):
        """Detects workflow type: nextflow or wdl.

        Parameters
        ----------
        workflow_name : string
            Name of the workflow.
        workspace_id : string
            The CloudOS workspace id from which to collect the workflows.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        last : bool
            When workflows are duplicated, use the latest imported workflow (by date).

        Returns
        -------
        wt : string ['nextflow'|'wdl']
            The workflow type detected
        """
        # get list with workflow types
        wt_all = self.workflow_content_query(workspace_id, workflow_name, verify=verify,
                                             query="workflowType", last=last)
        # make unique
        wt = list(dict.fromkeys(wt_all))
        if len(wt) > 1:
            raise ValueError(f'More than one workflow type detected for {workflow_name}: {wt}')
        return str(wt[0])

    def is_module(self, workflow_name, workspace_id, verify=True, last=False):
        """Detects whether the workflow is a system module or not.

        System modules use fixed queues, so this check is important to
        properly manage queue selection.

        Parameters
        ----------
        workflow_name : string
            Name of the workflow.
        workspace_id : string
            The CloudOS workspace id from which to collect the workflows.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        bool
            True if the workflow is a system module, False otherwise.
        """
        # get a list of all groups
        group = self.workflow_content_query(workspace_id, workflow_name, verify=verify,
                                            query="group", last=last)
        module_groups = ['system-tools',
                         'data-factory-data-connection-etl',
                         'data-factory',
                         'data-factory-omics-etl',
                         'drug-discovery',
                         'data-factory-omics-insights',
                         'intermediate']
        if group[0] in module_groups:
            return True
        else:
            return False

    def get_project_list(self, workspace_id, verify=True, get_all=True,
                         page=1, page_size=10, max_page_size=100):
        """Get all the projects from a CloudOS workspace.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id from which to collect the projects.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        get_all : bool
            Whether to get all available projects or just the indicated page.
        page : int
            The page number to retrieve, from the paginated response.
        page_size : int
            The number of projects by page. From 1 to 1000.
        max_page_size : int
            Max page size defined by the API server. It is currently 1000.

        Returns
        -------
        r : list
            A list of dicts, each corresponding to a project.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_get("{}/api/v2/projects?teamId={}&pageSize={}&page={}".format(
                self.cloudos_url, workspace_id, page_size, page),
            headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        content = json.loads(r.content)
        if get_all:
            total_projects = content['total']
            if total_projects <= max_page_size:
                r = retry_requests_get("{}/api/v2/projects?teamId={}&pageSize={}&page={}".format(
                        self.cloudos_url, workspace_id, total_projects, 1),
                    headers=headers, verify=verify)
                if r.status_code >= 400:
                    raise BadRequestException(r)
                return json.loads(r.content)['projects']
            else:
                n_pages = (total_projects // max_page_size) + int((total_projects % max_page_size) > 0)
                for p in range(n_pages):
                    p += 1
                    r = retry_requests_get(
                        "{}/api/v2/projects?teamId={}&pageSize={}&page={}".format(
                            self.cloudos_url, workspace_id, max_page_size, p),
                        headers=headers, verify=verify)
                    if r.status_code >= 400:
                        raise BadRequestException(r)
                    if p == 1:
                        all_content_p = json.loads(r.content)['projects']
                    else:
                        all_content_p += json.loads(r.content)['projects']
                return all_content_p
        else:
            return content['projects']

    @staticmethod
    def process_project_list(r, all_fields=False):
        """Process a server response from a self.get_project_list call.

        Parameters
        ----------
        r : list
            A list of dicts, each corresponding to a project.
        all_fields : bool. Default=False
            Whether to return a reduced version of the DataFrame containing
            only the selected columns or the full DataFrame.

        Returns
        -------
        df : pandas.DataFrame
            A DataFrame with the requested columns from the projects.
        """
        COLUMNS = ['_id', 'name', 'user.id', 'user.name', 'user.surname', 'user.email',
                   'createdAt', 'updatedAt', 'workflowCount', 'jobCount',
                   'notebookSessionCount']
        df_full = pd.json_normalize(r)
        if df_full.empty:
            return df_full
        if all_fields:
            df = df_full
        else:
            df = df_full.loc[:, COLUMNS]
        return df

    def workflow_import(self, workspace_id, workflow_url, workflow_name,
                        repository_project_id, workflow_docs_link='',
                        repository_id=None, verify=True):
        """Imports workflows to CloudOS.

        Parameters
        ----------
        workspace_id : string
            The CloudOS workspace id in which the workflow will be imported.
        workflow_url : string
            The URL of the workflow. Only GitHub or Bitbucket are allowed.
        workflow_name : string
            A name for the imported pipeline in CloudOS.
        repository_project_id : int
            The repository project ID.
        workflow_docs_link : string
            Link to the documentation URL.
        repository_id : int
            The repository ID. Only required for GitHub repositories.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        workflow_id : string
            The newly imported workflow ID.
        """
        platform_url = workflow_url.split('/')[2].split('.')[0]
        repository_name = workflow_url.split('/')[-1]
        if platform_url == 'github':
            platform = 'github'
            repository_project = workflow_url.split('/')[3]
            if repository_id is None:
                raise ValueError('Please, specify --repository-id when importing a GitHub repository')
        elif platform_url == 'bitbucket':
            platform = 'bitbucketServer'
            repository_project = workflow_url.split('/')[4]
            repository_id = repository_name
        else:
            raise ValueError(f'Your repository platform is not supported: {platform_url}. ' +
                             'Please use either GitHub or BitbucketServer.')
        data = {
            "workflowType": "nextflow",
            "repository": {
                "platform": platform,
                "repositoryId": repository_id,
                "name": repository_name,
                "owner": {
                    "login": repository_project,
                    "id": repository_project_id},
                "isPrivate": True,
                "url": workflow_url,
                "commit": "",
                "branch": ""
            },
            "name": workflow_name,
            "description": "",
            "isPublic": False,
            "mainFile": "main.nf",
            "defaultContainer": None,
            "processes": [],
            "docsLink": workflow_docs_link,
            "team": workspace_id
        }
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_post("{}/api/v1/workflows?teamId={}".format(self.cloudos_url, workspace_id),
                                json=data, headers=headers, verify=verify)
        if r.status_code == 401:
            raise ValueError('It seems your API key is not authorised. Please check if ' +
                             'your workspace has support for importing workflows using cloudos-cli')
        elif r.status_code >= 400:
            raise BadRequestException(r)
        content = json.loads(r.content)
        return content['_id']

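    # Usage sketch for workflow_import, assuming 'cl' is a Cloudos instance; the
    # repository URL and numeric ids are placeholders.
    #
    #     new_wf_id = cl.workflow_import(
    #         'WORKSPACE_ID',
    #         workflow_url='https://github.com/example-org/example-pipeline',
    #         workflow_name='example-pipeline',
    #         repository_project_id=12345,
    #         repository_id=67890)
    #     print(f'Imported workflow with ID {new_wf_id}')
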
    def get_user_info(self, verify=True):
        """Gets user information from the users/me endpoint.

        Parameters
        ----------
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        r : requests.models.Response.content
            The server response content
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_get("{}/api/v1/users/me".format(self.cloudos_url),
                               headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return json.loads(r.content)

    def abort_job(self, job, workspace_id, verify=True):
        """Abort a job.

        Parameters
        ----------
        job : string
            The CloudOS job id of the job to abort.
        workspace_id : string
            The CloudOS workspace id in which the job is running.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        r : requests.models.Response
            The server response
        """
        cloudos_url = self.cloudos_url
        apikey = self.apikey
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        r = retry_requests_put("{}/api/v1/jobs/{}/abort?teamId={}".format(cloudos_url, job, workspace_id),
                               headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r

    def get_project_id_from_name(self, workspace_id, project_name, verify=True):
        """Retrieve the project ID from its name.

        Parameters
        ----------
        workspace_id : str
            The CloudOS workspace ID to search for the project.
        project_name : str
            The name of the project to search for.
        verify : [bool | str], optional
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file. Default is True.

        Returns
        -------
        str
            The ID of the project matching the given name.

        Raises
        ------
        BadRequestException
            If the request to retrieve the project fails with a status code
            indicating an error.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        url = f"{self.cloudos_url}/api/v2/projects?teamId={workspace_id}&search={project_name}"
        response = retry_requests_get(url, headers=headers, verify=verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        content = json.loads(response.content)
        project_id = next((p.get("_id") for p in content.get("projects", [])
                           if p.get("name") == project_name), None)
        if project_id is None:
            raise ValueError(f"Project '{project_name}' was not found in workspace '{workspace_id}'")
        return project_id

    def create_project(self, workspace_id, project_name, verify=True):
        """Create a new project in CloudOS.

        Parameters
        ----------
        workspace_id : str
            The CloudOS workspace ID where the project will be created.
        project_name : str
            The name for the new project.
        verify : [bool | str], optional
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file. Default is True.

        Returns
        -------
        str
            The ID of the newly created project.

        Raises
        ------
        BadRequestException
            If the request to create the project fails with a status code
            indicating an error.
        """
        data = {
            "name": project_name
        }
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_post("{}/api/v1/projects?teamId={}".format(self.cloudos_url, workspace_id),
                                json=data, headers=headers, verify=verify)
        if r.status_code == 401:
            raise ValueError('It seems your API key is not authorised. Please check if ' +
                             'you have used the correct API key for the selected workspace')
        elif r.status_code == 409:
            raise ValueError(f'It seems that there is another project named "{project_name}" ' +
                             'in your workspace, please use another name for the new project')
        elif r.status_code >= 400:
            raise BadRequestException(r)
        content = json.loads(r.content)
        return content['_id']

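    # Usage sketch combining get_project_id_from_name and create_project,
    # assuming 'cl' is a Cloudos instance; the workspace id and project name are
    # placeholders. Reuses the project if it exists, creates it otherwise.
    #
    #     try:
    #         project_id = cl.get_project_id_from_name('WORKSPACE_ID', 'my-project')
    #     except ValueError:
    #         project_id = cl.create_project('WORKSPACE_ID', 'my-project')
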
    def get_workflow_max_pagination(self, workspace_id, workflow_name, verify=True):
        """Retrieve the max pagination count for a workflow search from the API.

        Parameters
        ----------
        workspace_id : str
            The CloudOS workspace ID to search for the workflow.
        workflow_name : str
            The name of the workflow to search for.
        verify : [bool | str], optional
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file. Default is True.

        Returns
        -------
        int
            The max pagination count for the matching workflows.

        Raises
        ------
        BadRequestException
            If the request to retrieve the workflows fails with a status code
            indicating an error.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        # determine pagination, there might be a lot with the same name
        url = f"{self.cloudos_url}/api/v3/workflows?teamId={workspace_id}&search={workflow_name}"
        response = retry_requests_get(url, headers=headers, verify=verify)
        if response.status_code >= 400:
            raise BadRequestException(response)
        pag_content = json.loads(response.content)
        max_pagination = pag_content["paginationMetadata"]["Pagination-Count"]
        if max_pagination == 0:
            raise ValueError(f'No workflow found with name: {workflow_name} in workspace: {workspace_id}')
        return max_pagination

    def get_workflow_content(self, workspace_id, workflow_name, verify=True, last=False,
                             max_page_size=100):
        """Retrieve the workflow content from the API.

        Parameters
        ----------
        workspace_id : str
            The CloudOS workspace ID to search for the workflow.
        workflow_name : str
            The name of the workflow to search for.
        verify : [bool | str], optional
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file. Default is True.
        last : bool, optional
            When workflows are duplicated, use the latest imported workflow (by date).
        max_page_size : int, optional
            Maximum page size to request per API call. Default is 100.

        Returns
        -------
        dict
            The server response containing workflow details.

        Raises
        ------
        BadRequestException
            If the request to retrieve the workflow fails with a status code
            indicating an error.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        max_pagination = self.get_workflow_max_pagination(workspace_id, workflow_name, verify=verify)
        # get all the matching content
        if max_pagination > max_page_size:
            content = {"workflows": []}
            for page_start in range(0, max_pagination, max_page_size):
                page_size = min(max_page_size, max_pagination - page_start)
                url = (f"{self.cloudos_url}/api/v3/workflows?teamId={workspace_id}"
                       f"&search={workflow_name}&pageSize={page_size}&page={page_start // max_page_size + 1}")
                response = retry_requests_get(url, headers=headers, verify=verify)
                # keep structure as a dict
                content["workflows"].extend(json.loads(response.content).get("workflows", []))
        else:
            url = (f"{self.cloudos_url}/api/v3/workflows?teamId={workspace_id}"
                   f"&search={workflow_name}&pageSize={max_pagination}")
            response = retry_requests_get(url, headers=headers, verify=verify)
            # return all content
            content = json.loads(response.content)
        if response.status_code >= 400:
            raise BadRequestException(response)
        # check for duplicates
        wf = [wf.get("name") for wf in content.get("workflows", []) if wf.get("name") == workflow_name]
        if len(wf) == 0 or len(content["workflows"]) == 0:
            raise ValueError(f'No workflow found with name: {workflow_name} in workspace: {workspace_id}')
        if len(wf) > 1 and not last:
            raise ValueError(f'More than one workflow found with name: {workflow_name}. ' +
                             "To run the last imported workflow use '--last' flag.")
        else:
            content = youngest_workflow_id_by_name(content, workflow_name)
        return content

    def workflow_content_query(self, workspace_id, workflow_name, verify=True,
                               query="workflowType", last=False):
        """Query a single field (e.g. 'workflowType' or 'group') from the matching workflow content."""
        content = self.get_workflow_content(workspace_id, workflow_name, verify=verify, last=last)
        # use 'query' to look in the content
        return [wf.get(query) for wf in content.get("workflows", []) if wf.get("name") == workflow_name]

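    # Usage sketch for the workflow query helpers, assuming 'cl' is a Cloudos
    # instance; the workspace id and pipeline name are placeholders.
    #
    #     wf_type = cl.detect_workflow('example-pipeline', 'WORKSPACE_ID')
    #     groups = cl.workflow_content_query('WORKSPACE_ID', 'example-pipeline',
    #                                        query='group')
    #     print(wf_type, groups)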