Source code for cloudos_cli.jobs.job

"""
This is the main class to create jobs.
"""

import base64
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Union

import click

from cloudos_cli.clos import Cloudos
from cloudos_cli.utils.array_job import classify_pattern, get_file_or_folder_id, extract_project
from cloudos_cli.utils.errors import BadRequestException
from cloudos_cli.utils.requests import retry_requests_get, retry_requests_post


@dataclass
class Job(Cloudos):
    """Class to store and operate jobs.

    Parameters
    ----------
    cloudos_url : string
        The CloudOS service url.
    apikey : string
        Your CloudOS API key.
    cromwell_token : string
        Cromwell server token.
    workspace_id : string
        The specific CloudOS workspace id.
    project_name : string
        The name of a CloudOS project.
    workflow_name : string
        The name of a CloudOS workflow or pipeline.
    last : bool
        Whether to use the last imported workflow when several workflows
        share the same name.
    verify : [bool|string]
        Whether to use SSL verification or not. Alternatively, if
        a string is passed, it will be interpreted as the path to
        the SSL certificate file.
    mainfile : string
        The name of the mainFile used by the workflow. Required for
        WDL pipelines, as different mainFiles could be loaded for a
        single pipeline.
    importsfile : string
        The name of the importsFile used by the workflow. Optional and
        only used for WDL pipelines, as different importsFiles could be
        loaded for a single pipeline.
    repository_platform : string
        The name of the repository platform of the workflow.
    project_id : string
        The CloudOS project id for a given project name.
    workflow_id : string
        The CloudOS workflow id for a given workflow_name.
    """
    workspace_id: str
    project_name: str
    workflow_name: str
    last: bool = False
    verify: Union[bool, str] = True
    mainfile: str = None
    importsfile: str = None
    repository_platform: str = 'github'
    project_id: str = None
    workflow_id: str = None

    @property
    def project_id(self) -> str:
        return self._project_id

    @project_id.setter
    def project_id(self, v) -> None:
        if isinstance(v, property):
            # Fetch the value as not defined by user.
            self._project_id = self.fetch_cloudos_id(
                self.apikey,
                self.cloudos_url,
                'projects',
                self.workspace_id,
                self.project_name,
                verify=self.verify)
        else:
            # Let the user define the value.
            self._project_id = v

    @property
    def workflow_id(self) -> str:
        return self._workflow_id

    @workflow_id.setter
    def workflow_id(self, v) -> None:
        if isinstance(v, property):
            # Fetch the value as not defined by user.
            self._workflow_id = self.fetch_cloudos_id(
                self.apikey,
                self.cloudos_url,
                'workflows',
                self.workspace_id,
                self.workflow_name,
                self.mainfile,
                self.importsfile,
                self.repository_platform,
                self.verify)
        else:
            # Let the user define the value.
            self._workflow_id = v
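
    # Example (illustrative sketch, not part of the class): constructing a Job.
    # The url, API key, workspace and names below are placeholders; the inherited
    # fields (cloudos_url, apikey, cromwell_token) are assumed from the docstring.
    #
    #     job = Job(cloudos_url='https://cloudos.example.com',
    #               apikey='my-api-key',
    #               cromwell_token=None,
    #               workspace_id='my-workspace-id',
    #               project_name='my-project',
    #               workflow_name='my-pipeline')
    #     print(job.project_id, job.workflow_id)  # ids auto-fetched by the setters
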
    def fetch_cloudos_id(self,
                         apikey,
                         cloudos_url,
                         resource,
                         workspace_id,
                         name,
                         mainfile=None,
                         importsfile=None,
                         repository_platform='github',
                         verify=True):
        """Fetch the CloudOS id for a given name.

        Parameters
        ----------
        apikey : string
            Your CloudOS API key.
        cloudos_url : string
            The CloudOS service url.
        resource : string
            The resource you want to fetch from. E.g.: projects.
        workspace_id : string
            The specific CloudOS workspace id.
        name : string
            The name of a CloudOS resource element.
        mainfile : string
            The name of the mainFile used by the workflow. Only used
            when resource == 'workflows'. Required for WDL pipelines,
            as different mainFiles could be loaded for a single pipeline.
        importsfile : string
            The name of the importsFile used by the workflow. Optional and
            only used for WDL pipelines, as different importsFiles could be
            loaded for a single pipeline.
        repository_platform : string
            The name of the repository platform where the workflow resides.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        cloudos_id : string
            The CloudOS id for the given resource name (a project or
            workflow id, depending on `resource`).
        """
        allowed_resources = ['projects', 'workflows']
        if resource not in allowed_resources:
            raise ValueError('Your specified resource is not supported. ' +
                             f'Use one of the following: {allowed_resources}')
        if resource == 'workflows':
            content = self.get_workflow_content(workspace_id, name, verify=verify, last=self.last)
            for element in content["workflows"]:
                if (element["name"] == name and
                        element["workflowType"] == "docker" and
                        not element["archived"]["status"]):
                    return element["_id"]
                # no mainfile or importsfile
                if (element["name"] == name and
                        element["repository"]["platform"] == repository_platform and
                        not element["archived"]["status"]):
                    if mainfile is None:
                        return element["_id"]
                    elif element["mainFile"] == mainfile:
                        if importsfile is None and "importsFile" not in element.keys():
                            return element["_id"]
                        elif "importsFile" in element.keys() and element["importsFile"] == importsfile:
                            return element["_id"]
        elif resource == 'projects':
            return self.get_project_id_from_name(workspace_id, self.project_name, verify=verify)
        if mainfile is not None:
            raise ValueError(f'A workflow named \'{name}\' with a mainFile \'{mainfile}\'' +
                             f' and an importsFile \'{importsfile}\' was not found')
        else:
            raise ValueError(f'No {name} element in {resource} was found')
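
    # Example (illustrative; assumes the `job` instance sketched above):
    #
    #     project_id = job.fetch_cloudos_id(job.apikey, job.cloudos_url,
    #                                       'projects', job.workspace_id,
    #                                       'my-project')
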
    def convert_nextflow_to_json(self,
                                 job_config,
                                 parameter,
                                 array_parameter,
                                 array_file_header,
                                 is_module,
                                 example_parameters,
                                 git_commit,
                                 git_tag,
                                 git_branch,
                                 project_id,
                                 workflow_id,
                                 job_name,
                                 resumable,
                                 save_logs,
                                 batch,
                                 job_queue_id,
                                 nextflow_profile,
                                 nextflow_version,
                                 instance_type,
                                 instance_disk,
                                 storage_mode,
                                 lustre_size,
                                 execution_platform,
                                 hpc_id,
                                 workflow_type,
                                 cromwell_id,
                                 azure_worker_instance_type,
                                 azure_worker_instance_disk,
                                 azure_worker_instance_spot,
                                 cost_limit,
                                 use_mountpoints,
                                 accelerate_saving_results,
                                 docker_login,
                                 command,
                                 cpus,
                                 memory):
        """Convert a nextflow.config file into a JSON-formatted dict.

        Parameters
        ----------
        job_config : string
            Path to a nextflow.config file with parameters scope.
        parameter : tuple
            Tuple of strings indicating the parameters to pass to the pipeline
            call. They are in the following form:
            ('param1=param1val', 'param2=param2val', ...)
        array_parameter : tuple
            Tuple of strings indicating the parameters to pass to the pipeline
            call for array jobs. They are in the following form:
            ('param1=param1val', 'param2=param2val', ...)
        array_file_header : string
            The header of the file containing the array parameters. It is used
            to add the necessary column index for array file columns.
        is_module : bool
            Whether the job is a module or not. If True, the job will be
            submitted as a module.
        example_parameters : list
            A list of dicts with the parameters required for the API request,
            in JSON format. It is typically used to run curated pipelines using
            the already available example parameters.
        git_commit : string
            The git commit hash of the pipeline to use. Equivalent to the -r
            option in Nextflow. If not specified, the last commit of the
            default branch will be used.
        git_tag : string
            The tag of the pipeline to use. If not specified, the last commit
            of the default branch will be used.
        git_branch : string
            The branch of the pipeline to use. If not specified, the last
            commit of the default branch will be used.
        project_id : string
            The CloudOS project id for a given project name.
        workflow_id : string
            The CloudOS workflow id for a given workflow_name.
        job_name : string
            The name to assign to the job.
        resumable : bool
            Whether to create a resumable job or not.
        save_logs : bool
            Whether to save job logs or not.
        batch : bool
            Whether to create an AWS batch job or not.
        job_queue_id : string
            Job queue id to use in the batch job.
        nextflow_profile : string
            A comma separated string with the profiles to be used.
        nextflow_version : string
            Nextflow version to use when executing the workflow in CloudOS.
        instance_type : string
            Name of the instance type to be used for the job master node,
            e.g. the AWS EC2 c5.xlarge instance type.
        instance_disk : int
            The disk space of the master node instance, in GB.
        storage_mode : string
            Either 'lustre' or 'regular'. Indicates if the user wants to select
            regular or lustre storage.
        lustre_size : int
            The lustre storage to be used when --storage-mode=lustre, in GB. It
            should be 1200 or a multiple of it.
        execution_platform : string ['aws'|'azure'|'hpc']
            The execution platform implemented in your CloudOS.
        hpc_id : string
            The ID of your HPC in CloudOS.
        workflow_type : str
            The type of workflow to run. It could be 'nextflow', 'wdl' or
            'docker'.
        cromwell_id : str
            Cromwell server ID.
        azure_worker_instance_type : str
            The worker node instance type to be used in Azure.
        azure_worker_instance_disk : int
            The disk size in GB for the worker node to be used in Azure.
        azure_worker_instance_spot : bool
            Whether the Azure worker nodes have to be spot instances or not.
        cost_limit : float
            Job cost limit. -1 means no cost limit.
        use_mountpoints : bool
            Whether or not to use AWS S3 mountpoints for quicker file staging.
        accelerate_saving_results : bool
            Whether to save results directly to cloud storage, bypassing the
            master node.
        docker_login : bool
            Whether to use private docker images, provided the users have
            linked their docker.io accounts.
        command : string
            The command to run in bash jobs.
        cpus : int
            The number of CPUs to use for the bash job task's master node.
        memory : int
            The amount of memory, in GB, to use for the bash job task's
            master node.

        Returns
        -------
        params : dict
            A JSON-formatted dict.
        """
        workflow_params = []
        if workflow_type == 'wdl':
            # This is required as non-resumable jobs always fail when using WDL workflows.
            resumable = True
        if (nextflow_profile is None and
                job_config is None and
                len(parameter) == 0 and
                len(example_parameters) == 0 and
                workflow_type != 'docker'):
            raise ValueError('No --job-config, --nextflow_profile, --parameter or ' +
                             '--example_parameters were specified, ' +
                             'please use at least one of these options.')
        if workflow_type == 'wdl' and job_config is None and len(parameter) == 0:
            raise ValueError('No --job-config or --parameter were provided. At least one of ' +
                             'these is required for WDL workflows.')
        if job_config is not None:
            with open(job_config, 'r') as p:
                reading = False
                for p_l in p:
                    if 'params' in p_l.lower():
                        reading = True
                    else:
                        if reading:
                            if workflow_type == 'wdl':
                                p_l_strip = p_l.strip().replace(' ', '')
                            else:
                                p_l_strip = p_l.strip().replace(' ', '').replace('\"', '').replace('\'', '')
                            if len(p_l_strip) == 0:
                                continue
                            elif p_l_strip[0] == '/' or p_l_strip[0] == '#':
                                continue
                            elif p_l_strip == '}':
                                reading = False
                            else:
                                p_list = p_l_strip.split('=')
                                p_name = p_list[0]
                                p_value = '='.join(p_list[1:])
                                if len(p_list) < 2:
                                    raise ValueError('Please, specify your parameters in ' +
                                                     f'{job_config} using the \'=\' as spacer. ' +
                                                     'E.g: name = my_name')
                                elif workflow_type == 'wdl':
                                    param = {"prefix": "",
                                             "name": p_name,
                                             "parameterKind": "textValue",
                                             "textValue": p_value}
                                    workflow_params.append(param)
                                else:
                                    param = {"prefix": "--",
                                             "name": p_name,
                                             "parameterKind": "textValue",
                                             "textValue": p_value}
                                    workflow_params.append(param)
            if len(workflow_params) == 0:
                raise ValueError(f'The {job_config} file did not contain any valid parameter')
        # array file specific parameters (from --array-parameter)
        if array_parameter is not None and len(array_parameter) > 0:
            ap_param = Job.split_array_file_params(array_parameter, workflow_type,
                                                   array_file_header)
            workflow_params.append(ap_param)
        elif array_file_header is not None and (array_parameter is None or len(array_parameter) == 0):
            raise ValueError('At least one array file column must be added to the parameters')
        # general parameters (from --parameter)
        if len(parameter) > 0:
            for p in parameter:
                p_split = p.split('=')
                if len(p_split) < 2:
                    raise ValueError('Please, specify -p / --parameter using a single \'=\' ' +
                                     'as spacer. E.g: input=value')
                p_name = p_split[0]
                p_value = '='.join(p_split[1:])
                if workflow_type == 'docker':
                    # will differentiate between text, data items and glob patterns
                    workflow_params.append(self.docker_workflow_param_processing(p, self.project_name))
                elif workflow_type == 'wdl':
                    param = {"prefix": "",
                             "name": p_name,
                             "parameterKind": "textValue",
                             "textValue": p_value}
                    workflow_params.append(param)
                else:
                    param = {"prefix": "--",
                             "name": p_name,
                             "parameterKind": "textValue",
                             "textValue": p_value}
                    workflow_params.append(param)
            if len(workflow_params) == 0:
                raise ValueError(f'The provided parameters are not valid: {parameter}')
        if len(example_parameters) > 0:
            for example_param in example_parameters:
                workflow_params.append(example_param)
        if storage_mode == "lustre":
            click.secho('\nLustre storage has been selected. Please, be sure that this kind of ' +
                        'storage is available in your CloudOS workspace.\n',
                        fg='yellow', bold=True)
            if lustre_size % 1200:
                raise ValueError('Please, specify a lustre storage size of 1200 or a multiple ' +
                                 f'of it. {lustre_size} is not a valid number.')
        if storage_mode not in ['lustre', 'regular']:
            raise ValueError('Please, use either \'lustre\' or \'regular\' for --storage-mode. ' +
                             f'{storage_mode} is not allowed')
        params = {
            "parameters": workflow_params,
            "project": project_id,
            "workflow": workflow_id,
            "name": job_name,
            "resumable": resumable,
            "saveProcessLogs": save_logs,
            "executionPlatform": execution_platform,
            "hpc": hpc_id,
            "storageSizeInGb": instance_disk,
            "execution": {
                "computeCostLimit": cost_limit,
                "optim": "test"
            },
            "lusterFsxStorageSizeInGb": lustre_size,
            "storageMode": storage_mode,
            "instanceType": instance_type,
            "usesFusionFileSystem": use_mountpoints,
            "accelerateSavingResults": accelerate_saving_results
        }
        if workflow_type != 'docker':
            params["nextflowVersion"] = nextflow_version
        if execution_platform != 'hpc':
            params['masterInstance'] = {
                "requestedInstance": {
                    "type": instance_type
                }
            }
            params['batch'] = {
                "enabled": batch
            }
            if job_queue_id is not None:
                params['batch'] = {
                    "dockerLogin": docker_login,
                    "enabled": batch,
                    "jobQueue": job_queue_id
                }
        if execution_platform == 'azure' and not is_module:
            params['azureBatch'] = {
                "vmType": azure_worker_instance_type,
                "spot": azure_worker_instance_spot,
                "diskSizeInGb": azure_worker_instance_disk
            }
        if workflow_type == 'docker':
            params = params | command  # add command to params as dict (python 3.9+)
            params["resourceRequirements"] = {
                "cpu": cpus,
                "ram": memory
            }
        if workflow_type == 'wdl':
            params['cromwellCloudResources'] = cromwell_id
        git_flag = [x is not None for x in [git_tag, git_commit, git_branch]]
        if sum(git_flag) > 1:
            raise ValueError('Please, specify none or only one of --git-tag, ' +
                             '--git-branch or --git-commit options.')
        elif sum(git_flag) == 1:
            revision_type = 'tag' if git_tag is not None else \
                'commit' if git_commit is not None else 'branch'
            params['revision'] = {
                "revisionType": revision_type,
                "tag": git_tag,
                "commit": git_commit,
                "branch": git_branch
            }
        if nextflow_profile is not None:
            params['profile'] = nextflow_profile
        return params
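
    # Illustrative nextflow.config fragment that the job_config parser above
    # accepts (file contents shown as comments; names are placeholders):
    #
    #     params {
    #         input = 'my_input.csv'
    #         outdir = 'results'
    #     }
    #
    # For Nextflow workflows, each 'key = value' line becomes
    # {"prefix": "--", "name": key, "parameterKind": "textValue",
    #  "textValue": value} in the request payload.
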
    def send_job(self,
                 job_config=None,
                 parameter=(),
                 array_parameter=(),
                 array_file_header=None,
                 is_module=False,
                 example_parameters=[],
                 git_commit=None,
                 git_tag=None,
                 git_branch=None,
                 job_name='new_job',
                 resumable=False,
                 save_logs=True,
                 batch=True,
                 job_queue_id=None,
                 nextflow_profile=None,
                 nextflow_version='22.10.8',
                 instance_type='c5.xlarge',
                 instance_disk=500,
                 storage_mode='regular',
                 lustre_size=1200,
                 execution_platform='aws',
                 hpc_id=None,
                 workflow_type='nextflow',
                 cromwell_id=None,
                 azure_worker_instance_type='Standard_D4as_v4',
                 azure_worker_instance_disk=100,
                 azure_worker_instance_spot=False,
                 cost_limit=30.0,
                 use_mountpoints=False,
                 accelerate_saving_results=False,
                 docker_login=False,
                 verify=True,
                 command=None,
                 cpus=1,
                 memory=4):
        """Send a job to CloudOS.

        Parameters
        ----------
        job_config : string
            Path to a nextflow.config file with parameters scope.
        parameter : tuple
            Tuple of strings indicating the parameters to pass to the pipeline
            call. They are in the following form:
            ('param1=param1val', 'param2=param2val', ...)
        array_parameter : tuple
            Tuple of strings indicating the parameters to pass to the pipeline
            call for array jobs. They are in the following form:
            ('param1=param1val', 'param2=param2val', ...)
        array_file_header : string
            The header of the file containing the array parameters. It is used
            to add the necessary column index for array file columns.
        is_module : bool
            Whether the job is a module or not. If True, the job will be
            submitted as a module.
        example_parameters : list
            A list of dicts with the parameters required for the API request,
            in JSON format. It is typically used to run curated pipelines using
            the already available example parameters.
        git_commit : string
            The git commit hash of the pipeline to use. Equivalent to the -r
            option in Nextflow. If not specified, the last commit of the
            default branch will be used.
        git_tag : string
            The tag of the pipeline to use. If not specified, the last commit
            of the default branch will be used.
        git_branch : string
            The branch of the pipeline to use. If not specified, the last
            commit of the default branch will be used.
        job_name : string
            The name to assign to the job.
        resumable : bool
            Whether to create a resumable job or not.
        save_logs : bool
            Whether to save job logs or not.
        batch : bool
            Whether to create an AWS batch job or not.
        job_queue_id : string
            Job queue id to use in the batch job.
        nextflow_profile : string
            A comma separated string with the profiles to be used.
        nextflow_version : string
            Nextflow version to use when executing the workflow in CloudOS.
        instance_type : string
            Name of the instance type to be used for the job master node,
            e.g. the AWS EC2 c5.xlarge instance type.
        instance_disk : int
            The disk space of the master node instance, in GB.
        storage_mode : string
            Either 'lustre' or 'regular'. Indicates if the user wants to select
            regular or lustre storage.
        lustre_size : int
            The lustre storage to be used when --storage-mode=lustre, in GB. It
            should be 1200 or a multiple of it.
        execution_platform : string ['aws'|'azure'|'hpc']
            The execution platform implemented in your CloudOS.
        hpc_id : string
            The ID of your HPC in CloudOS.
        workflow_type : str
            The type of workflow to run. It could be 'nextflow', 'wdl' or
            'docker'.
        cromwell_id : str
            Cromwell server ID.
        azure_worker_instance_type : str
            The worker node instance type to be used in Azure.
        azure_worker_instance_disk : int
            The disk size in GB for the worker node to be used in Azure.
        azure_worker_instance_spot : bool
            Whether the Azure worker nodes have to be spot instances or not.
        cost_limit : float
            Job cost limit. -1 means no cost limit.
        use_mountpoints : bool
            Whether or not to use AWS S3 mountpoints for quicker file staging.
        accelerate_saving_results : bool
            Whether to save results directly to cloud storage, bypassing the
            master node.
        docker_login : bool
            Whether to use private docker images, provided the users have
            linked their docker.io accounts.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        command : string
            The command to run in bash jobs.
        cpus : int
            The number of CPUs to use for the bash job task's master node.
        memory : int
            The amount of memory, in GB, to use for the bash job task's
            master node.

        Returns
        -------
        j_id : string
            The CloudOS job id of the job just launched.
        """
        apikey = self.apikey
        cloudos_url = self.cloudos_url
        workspace_id = self.workspace_id
        workflow_id = self.workflow_id
        project_id = self.project_id
        # Prepare api request for CloudOS to run a job
        headers = {
            "Content-type": "application/json",
            "apikey": apikey
        }
        params = self.convert_nextflow_to_json(
            job_config, parameter, array_parameter, array_file_header,
            is_module, example_parameters, git_commit, git_tag, git_branch,
            project_id, workflow_id, job_name, resumable, save_logs, batch,
            job_queue_id, nextflow_profile, nextflow_version, instance_type,
            instance_disk, storage_mode, lustre_size, execution_platform,
            hpc_id, workflow_type, cromwell_id, azure_worker_instance_type,
            azure_worker_instance_disk, azure_worker_instance_spot,
            cost_limit, use_mountpoints, accelerate_saving_results,
            docker_login, command=command, cpus=cpus, memory=memory)
        r = retry_requests_post("{}/api/v2/jobs?teamId={}".format(cloudos_url, workspace_id),
                                data=json.dumps(params),
                                headers=headers,
                                verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        j_id = json.loads(r.content)["jobId"]
        print('\tJob successfully launched to CloudOS, please check the ' +
              f'following link: {cloudos_url}/app/advanced-analytics/analyses/{j_id}')
        return j_id
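
    # Example (illustrative sketch; assumes the `job` instance from above and a
    # local nextflow.config with a params scope):
    #
    #     j_id = job.send_job(job_config='nextflow.config',
    #                         job_name='my-analysis',
    #                         resumable=True)
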
    def retrieve_cols_from_array_file(self, array_file, ds, separator, verify_ssl):
        """Retrieve metadata for columns from an array file stored in a directory.

        This method fetches the metadata of an array file by interacting with a
        directory service and making an API call to retrieve the file's metadata.

        Parameters
        ----------
        array_file : str
            The path to the array file whose metadata is to be retrieved.
        ds : object
            The directory service object used to list folder content.
        separator : str
            The separator used in the array file.
        verify_ssl : bool
            Whether to verify SSL certificates during the API request.

        Raises
        ------
        ValueError
            If the specified file is not found in the directory.
        BadRequestException
            If the API request to retrieve metadata fails with a status code >= 400.

        Returns
        -------
        Response
            The HTTP response object containing the metadata of the array file.
        """
        # Split the array_file path to get the directory and file name
        p = Path(array_file)
        directory = str(p.parent)
        file_name = p.name
        # fetch the content of the directory
        result = ds.list_folder_content(directory)
        # retrieve the S3 bucket name and object key for the specified file
        for file in result['files']:
            if file.get("name") == file_name:
                self.array_file_id = file.get("_id")
                s3_bucket_name = file.get("s3BucketName")
                s3_object_key = file.get("s3ObjectKey")
                s3_object_key_b64 = base64.b64encode(s3_object_key.encode()).decode()
                break
        else:
            raise ValueError(f'File "{file_name}" not found in the "{directory}" folder ' +
                             f'of the project "{self.project_name}".')
        # retrieve the metadata of the array file
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        url = (
            f"{self.cloudos_url}/api/v1/jobs/array-file/metadata"
            f"?separator={separator}"
            f"&s3BucketName={s3_bucket_name}"
            f"&s3ObjectKey={s3_object_key_b64}"
            f"&teamId={self.workspace_id}"
        )
        r = retry_requests_get(url, headers=headers, verify=verify_ssl)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return r
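
    # Example (illustrative; 'ds' is assumed to be a directory-service object
    # exposing list_folder_content(), as described in the docstring):
    #
    #     r = job.retrieve_cols_from_array_file('Data/arrays/samples.csv', ds,
    #                                           separator=',', verify_ssl=True)
    #     metadata = json.loads(r.content)
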
    def setup_params_array_file(self, custom_script_path, ds_custom, command, separator):
        """Set up a dictionary representing command parameters, including support for
        custom scripts and array files, to be used in job execution.

        Parameters
        ----------
        custom_script_path : str
            Path to the custom script file. If None, the command is treated as text.
        ds_custom : object
            An object providing access to folder content listing functionality.
        command : str
            The command to be executed, either as text or the name of a custom script.
        separator : str
            The separator to be used for the array file.

        Returns
        -------
        dict
            A dictionary containing the command parameters, including:
            - "command": The command name or text.
            - "customScriptFile" (optional): Details of the custom script file if provided.
            - "arrayFile": Details of the array file and its separator.
        """
        if custom_script_path is not None:
            command_path = Path(custom_script_path)
            command_dir = str(command_path.parent)
            command_name = command_path.name
            result_script = ds_custom.list_folder_content(command_dir)
            for file in result_script['files']:
                if file.get("name") == command_name:
                    custom_script_item = file.get("_id")
                    break
            # use this in case the command is in a custom script
            cmd = {
                "command": f"{command_name}",
                "customScriptFile": {
                    "dataItem": {
                        "kind": "File",
                        "item": f"{custom_script_item}"
                    }
                }
            }
        else:
            # use this for text commands
            cmd = {"command": command}
        # add array-file
        cmd = cmd | {
            "arrayFile": {
                "dataItem": {"kind": "File", "item": f"{self.array_file_id}"},
                "separator": f"{separator}"
            }
        }
        return cmd
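
    # For a text command, the dict built above looks like this (the file id is
    # a placeholder):
    #
    #     {
    #         "command": "echo hello",
    #         "arrayFile": {
    #             "dataItem": {"kind": "File", "item": "<array_file_id>"},
    #             "separator": ","
    #         }
    #     }
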
    @staticmethod
    def split_array_file_params(array_parameter, workflow_type, array_file_header):
        """Split and process array parameters for a given workflow type and array file header.

        Parameters
        ----------
        array_parameter : list
            A list of strings representing array parameters in the format "key=value".
        workflow_type : str
            The type of workflow, e.g., 'docker'.
        array_file_header : list
            A list of dictionaries representing the header of the array file.
            Each dictionary should contain "name" and "index" keys.

        Returns
        -------
        dict
            A dictionary containing processed parameter details, including:
            - prefix (str): The prefix for the parameter (e.g., "--" or "-").
            - name (str): The name of the parameter with leading dashes stripped.
            - parameterKind (str): The kind of parameter, set to "arrayFileColumn".
            - columnName (str): The name of the column derived from the parameter value.
            - columnIndex (int): The index of the column in the array file header.

        Raises
        ------
        ValueError
            If an array parameter does not contain a '=' character or is
            improperly formatted.
        """
        ap_param = dict()
        for ap in array_parameter:
            ap_split = ap.split('=')
            if len(ap_split) < 2:
                raise ValueError('Please, specify -a / --array-parameter using a single \'=\' ' +
                                 'as spacer. E.g: input=value')
            ap_name = ap_split[0]
            ap_value = '='.join(ap_split[1:])
            if workflow_type == 'docker':
                ap_prefix = "--" if ap_name.startswith('--') else \
                    ("-" if ap_name.startswith('-') else '')
                ap_param = {
                    "prefix": ap_prefix,
                    "name": ap_name.lstrip('-'),
                    "parameterKind": "arrayFileColumn",
                    "columnName": ap_value,
                    "columnIndex": next((item["index"] for item in array_file_header
                                         if item["name"] == "id"), 0)
                }
        return ap_param
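
    # Example (illustrative): for a docker workflow,
    #
    #     Job.split_array_file_params(
    #         ['--sample=sample_id'], 'docker',
    #         [{"name": "id", "index": 0}, {"name": "sample_id", "index": 1}])
    #
    # returns {"prefix": "--", "name": "sample", "parameterKind": "arrayFileColumn",
    #          "columnName": "sample_id", "columnIndex": 0}; note that columnIndex
    # is taken from the header entry named "id" (0 if absent).
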
    def docker_workflow_param_processing(self, param, project_name):
        """Process a Docker workflow parameter and determine its type and associated metadata.

        Parameters
        ----------
        param : str
            The parameter string in the format '--param_name=value'. It can represent
            a file path, a glob pattern, or a simple text value.
        project_name : str
            The name of the current project to use if no specific project is
            extracted from the parameter.

        Returns
        -------
        dict
            A dictionary containing the processed parameter details. The structure
            of the dictionary depends on the type of the parameter:
            - For glob patterns:
                {
                    "name": str,          # Parameter name without leading dashes.
                    "prefix": str,        # Prefix ('--' or '-') based on the parameter format.
                    "globPattern": str,   # The glob pattern extracted from the parameter.
                    "parameterKind": str, # Always "globPattern".
                    "folder": str         # Folder ID associated with the glob pattern.
                }
            - For file paths:
                {
                    "name": str,          # Parameter name without leading dashes.
                    "prefix": str,        # Prefix ('--' or '-') based on the parameter format.
                    "parameterKind": str, # Always "dataItem".
                    "dataItem": {
                        "kind": str,      # Always "File".
                        "item": str       # File ID associated with the file path.
                    }
                }
            - For text values:
                {
                    "name": str,          # Parameter name without leading dashes.
                    "prefix": str,        # Prefix ('--' or '-') based on the parameter format.
                    "parameterKind": str, # Always "textValue".
                    "textValue": str      # The text value extracted from the parameter.
                }

        Notes
        -----
        - The function uses the helper methods `extract_project`, `classify_pattern`
          and `get_file_or_folder_id` to process the parameter.
        - If the parameter represents a file path or glob pattern, the function
          retrieves the corresponding file or folder ID from the cloud workspace.
        - If the parameter does not match any specific pattern or file extension,
          it is treated as a simple text value.
        """
        # split '--param_name=example_test'
        # name -> '--param_name'
        # rest -> 'example_test'
        name, rest = param.split('=', 1)
        # e.g. "/Project/Subproject/file.csv", project is "Project"
        # e.g. "Data/input.csv", project is '', leaving the global project name
        # e.g. "-p --test=value", project is ''
        project, file_path = extract_project(rest)
        current_project = project if project != '' else project_name
        # e.g. "/Project/Subproject/file.csv"
        command_path = Path(file_path)
        command_dir = str(command_path.parent)
        command_name = command_path.name
        _, ext = os.path.splitext(command_name)
        prefix = "--" if name.startswith('--') else ("-" if name.startswith('-') else "")
        if classify_pattern(rest) in ["regex", "glob"]:
            if not (file_path.startswith('/Data') or file_path.startswith('Data')):
                raise ValueError("The file path inside the project must start with "
                                 "'/Data' or 'Data'.")
            folder = get_file_or_folder_id(self.cloudos_url, self.apikey, self.workspace_id,
                                           current_project, self.verify, command_dir,
                                           command_name, is_file=False)
            return {
                "name": f"{name.lstrip('-')}",
                "prefix": f"{prefix}",
                "globPattern": command_name,
                "parameterKind": "globPattern",
                "folder": f"{folder}"
            }
        elif ext:
            if not (file_path.startswith('/Data') or file_path.startswith('Data')):
                raise ValueError("The file path inside the project must start with "
                                 "'/Data' or 'Data'.")
            file = get_file_or_folder_id(self.cloudos_url, self.apikey, self.workspace_id,
                                         current_project, self.verify, command_dir,
                                         command_name, is_file=True)
            return {
                "name": f"{name.lstrip('-')}",
                "prefix": f"{prefix}",
                "parameterKind": "dataItem",
                "dataItem": {
                    "kind": "File",
                    "item": f"{file}"
                }
            }
        else:
            return {
                "name": f"{name.lstrip('-')}",
                "prefix": f"{prefix}",
                "parameterKind": "textValue",
                "textValue": f"{rest}"
            }
    def get_job_request_payload(self, job_id, verify=True):
        """Get the original request payload for a job.

        Parameters
        ----------
        job_id : str
            The CloudOS job ID to get the payload for.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        dict
            The original job request payload.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        url = f"{self.cloudos_url}/api/v1/jobs/{job_id}/request-payload?teamId={self.workspace_id}"
        r = retry_requests_get(url, headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        return json.loads(r.content)
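
    # Example (illustrative; the job id is a placeholder):
    #
    #     payload = job.get_job_request_payload('my-job-id')
    #     print(payload['name'], len(payload['parameters']))
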
    def update_parameter_value(self, parameters, param_name, new_value):
        """Update a parameter value in the parameters list.

        Parameters
        ----------
        parameters : list
            List of parameter dictionaries.
        param_name : str
            Name of the parameter to update.
        new_value : str
            New value for the parameter.

        Returns
        -------
        bool
            True if the parameter was found and updated, False otherwise.
        """
        for param in parameters:
            if param.get('name') == param_name:
                # Handle different parameter kinds
                if param.get('parameterKind') == 'textValue':
                    param['textValue'] = new_value
                elif param.get('parameterKind') == 'dataItem':
                    # For data items, we need to process the value to get file/folder ID
                    # This is a simplified version - in practice you'd need more logic
                    if new_value.startswith('s3://') or new_value.startswith('az://'):
                        param['textValue'] = new_value
                        param['parameterKind'] = 'textValue'
                    else:
                        # Try to process as file/data item
                        processed_param = self.docker_workflow_param_processing(
                            f"--{param_name}={new_value}", self.project_name)
                        param.update(processed_param)
                return True
        return False
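
    # Example (illustrative): updating an existing text parameter in place.
    #
    #     params = [{"name": "input", "parameterKind": "textValue",
    #                "textValue": "old.csv"}]
    #     job.update_parameter_value(params, 'input', 'new.csv')  # -> True
    #     params[0]['textValue']                                  # -> 'new.csv'
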
    def get_field_from_jobs_endpoint(self, job_id, field=None, verify=True):
        """Get a given field from the jobs endpoint for a job.

        Parameters
        ----------
        job_id : str
            The CloudOS job ID to get the field for.
        field : str
            The name of the field to retrieve from the jobs endpoint
            response, e.g. 'resumeWorkDir' or 'status'.
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.

        Returns
        -------
        The value of the requested field.
        """
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        url = f"{self.cloudos_url}/api/v1/jobs/{job_id}?teamId={self.workspace_id}"
        r = retry_requests_get(url, headers=headers, verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        content = json.loads(r.content)
        if field in content.keys():
            return content[field]
        else:
            raise ValueError(f"Field '{field}' not found in endpoint 'jobs'.")
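
    # Example (illustrative; the job id is a placeholder):
    #
    #     status = job.get_field_from_jobs_endpoint('my-job-id', field='status')
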
    def clone_or_resume_job(self, source_job_id, queue_name=None, cost_limit=None,
                            master_instance=None, job_name=None, nextflow_version=None,
                            branch=None, profile=None, do_not_save_logs=None,
                            use_fusion=None, accelerate_saving_results=None,
                            resumable=None, project_name=None, parameters=None,
                            verify=True, mode=None):
        """Clone or resume an existing job with optional parameter overrides.

        Parameters
        ----------
        source_job_id : str
            The CloudOS job ID to clone/resume from.
        queue_name : str, optional
            Name of the job queue to use.
        cost_limit : float, optional
            Job cost limit override.
        master_instance : str, optional
            Master instance type override.
        job_name : str, optional
            New job name.
        nextflow_version : str, optional
            Nextflow version override.
        branch : str, optional
            Git branch override.
        profile : str, optional
            Nextflow profile override.
        do_not_save_logs : bool, optional
            Whether to save logs override.
        use_fusion : bool, optional
            Whether to use fusion filesystem override.
        accelerate_saving_results : bool, optional
            Whether to accelerate saving results override.
        resumable : bool, optional
            Whether to make the job resumable or not.
        project_name : str, optional
            Project name override (will look up the new project ID).
        parameters : list, optional
            List of parameter overrides in the format
            ['param1=value1', 'param2=value2'].
        verify : [bool|string]
            Whether to use SSL verification or not. Alternatively, if
            a string is passed, it will be interpreted as the path to
            the SSL certificate file.
        mode : str, optional
            The mode to use for the job (e.g. "clone", "resume").

        Returns
        -------
        str
            The CloudOS job ID of the cloned/resumed job.
        """
        # Get the original job payload
        original_payload = self.get_job_request_payload(source_job_id, verify=verify)
        # Create a copy of the payload for modification
        cloned_payload = json.loads(json.dumps(original_payload))
        # remove unwanted fields
        del cloned_payload['_id']
        del cloned_payload['resourceId']
        if mode == "resume":
            try:
                cloned_payload["resumeWorkDir"] = self.get_field_from_jobs_endpoint(
                    source_job_id, field="resumeWorkDir", verify=verify
                )
            except Exception:
                raise ValueError("The job was not set as resumable when originally run")
            try:
                status = self.get_field_from_jobs_endpoint(
                    source_job_id, field="status", verify=verify
                )
            except Exception as e:
                raise ValueError(f"The job status cannot be retrieved: {e}")
            allowed_statuses = {"completed", "aborted", "failed"}
            if status not in allowed_statuses:
                raise ValueError(
                    f"Only jobs with status {', '.join(allowed_statuses)} can be resumed. "
                    f"Current job status is '{status}'"
                )
        # Override job name if provided
        if job_name:
            cloned_payload['name'] = job_name
        # Override cost limit if provided
        if cost_limit is not None:
            if 'execution' not in cloned_payload:
                cloned_payload['execution'] = {}
            cloned_payload['execution']['computeCostLimit'] = cost_limit
        # Override master instance if provided
        if master_instance:
            if 'masterInstance' not in cloned_payload:
                cloned_payload['masterInstance'] = {'requestedInstance': {}}
            cloned_payload['masterInstance']['requestedInstance']['type'] = master_instance
        # Override nextflow version if provided (only for non-docker workflows)
        if nextflow_version and 'nextflowVersion' in cloned_payload:
            cloned_payload['nextflowVersion'] = nextflow_version
        elif nextflow_version and cloned_payload['executionPlatform'] == 'azure' and \
                nextflow_version not in ['22.11.1-edge', 'latest']:
            print("Azure workspace only uses Nextflow version 22.11.1-edge, "
                  "option '--nextflow-version' is ignored.\n")
        # Override branch if provided
        # sometimes revision is missing from the 'request-payload' API, make sure it is present
        if 'revision' not in cloned_payload or not cloned_payload.get('revision'):
            cloned_payload['revision'] = self.get_field_from_jobs_endpoint(source_job_id,
                                                                           field="revision",
                                                                           verify=verify)
            cloned_payload['revision']['revisionType'] = 'digest'
        if branch:
            cloned_payload['revision']['revisionType'] = 'branch'
            cloned_payload['revision']['branch'] = branch
            # Clear other revision types
            cloned_payload['revision'].pop('commit', None)
            cloned_payload['revision'].pop('tag', None)
        # Override profile if provided
        if profile:
            cloned_payload['profile'] = profile
        # Override save logs if provided
        if do_not_save_logs:
            cloned_payload['saveProcessLogs'] = do_not_save_logs
        # Override use fusion if provided
        if use_fusion and cloned_payload['executionPlatform'] != 'azure':
            cloned_payload['usesFusionFileSystem'] = use_fusion
        elif use_fusion and cloned_payload['executionPlatform'] == 'azure':
            print("Azure workspace does not use fusion filesystem, "
                  "option '--accelerate-file-staging' is ignored.\n")
        # Override accelerate saving results if provided
        if accelerate_saving_results:
            cloned_payload['accelerateSavingResults'] = accelerate_saving_results
        # Override resumable if provided
        if resumable and mode == "clone":
            cloned_payload['resumable'] = resumable
        elif resumable and mode == "resume":
            print("'resumable' option is only applicable when cloning a job, "
                  "ignoring '--resumable' flag.\n")
        if 'command' in cloned_payload:
            cloned_payload['batch'] = {"enabled": False}
            if resumable:
                print("'resumable' option is not applicable when resuming a bash job, "
                      "ignoring '--resumable' flag.\n")
        # Handle job queue override
        if queue_name:
            if cloned_payload['executionPlatform'] != 'azure':
                try:
                    from cloudos_cli.queue.queue import Queue
                    queue_api = Queue(self.cloudos_url, self.apikey, self.cromwell_token,
                                      self.workspace_id, verify)
                    queues = queue_api.get_job_queues()
                    queue_id = None
                    for queue in queues:
                        if queue.get("label") == queue_name or queue.get("name") == queue_name:
                            queue_id = queue.get("id") or queue.get("_id")
                            break
                    cloned_payload['batch']['jobQueue'] = queue_id
                    if not queue_id:
                        raise ValueError(f"Queue with name '{queue_name}' not found " +
                                         f"in workspace '{self.workspace_id}'")
                except Exception as e:
                    raise ValueError(f"Error filtering by queue '{queue_name}': {str(e)}")
            else:
                print("Azure workspace does not use job queues, option '--job-queue' is ignored.\n")
        # Handle parameter overrides
        # The columnIndex is retrieved from request-payload as a string, but needs to be an int
        for param in cloned_payload.get('parameters', []):
            if param.get('parameterKind') == 'arrayFileColumn' and \
                    isinstance(param.get('columnIndex'), str):
                try:
                    param['columnIndex'] = int(param['columnIndex'])
                except ValueError:
                    raise ValueError(f"Invalid columnIndex value '{param['columnIndex']}' " +
                                     f"for parameter '{param.get('name')}'")
        if parameters:
            cloned_parameters = cloned_payload.get('parameters', [])
            for param_override in parameters:
                param_name, param_value = param_override.split('=', 1)
                param_name = param_name.lstrip('-')  # Remove leading dashes
                if not self.update_parameter_value(cloned_parameters, param_name, param_value):
                    # Parameter not found, add as new parameter
                    # Determine workflow type to set proper prefix and format
                    prefix = "--" if param_override.startswith('--') else \
                        ("-" if param_override.startswith('-') else "")
                    new_param = {
                        "prefix": prefix,
                        "name": param_name,
                        "parameterKind": "textValue",
                        "textValue": param_value
                    }
                    cloned_parameters.append(new_param)
            cloned_payload['parameters'] = cloned_parameters
        # setup project name
        if project_name:
            # get project ID
            project_id = self.get_project_id_from_name(self.workspace_id, project_name,
                                                       verify=verify)
            cloned_payload['project'] = project_id
        # Send the cloned job
        headers = {
            "Content-type": "application/json",
            "apikey": self.apikey
        }
        r = retry_requests_post(f"{self.cloudos_url}/api/v2/jobs?teamId={self.workspace_id}",
                                data=json.dumps(cloned_payload),
                                headers=headers,
                                verify=verify)
        if r.status_code >= 400:
            raise BadRequestException(r)
        j_id = json.loads(r.content)["jobId"]
        print(f'\tJob successfully {mode}d and launched to CloudOS, please check the ' +
              f"following link: {self.cloudos_url}/app/advanced-analytics/analyses/{j_id}\n")
        return j_id
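
# Example (illustrative sketch; assumes the `job` instance from above and a
# placeholder source job id):
#
#     new_id = job.clone_or_resume_job('my-job-id',
#                                      job_name='my-analysis-rerun',
#                                      cost_limit=10.0,
#                                      parameters=['--input=Data/new_input.csv'],
#                                      mode='clone')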