Source code for cloudos_cli.import_wf.import_wf

from abc import ABC, abstractmethod
from urllib.parse import urlsplit
from cloudos_cli.utils.errors import BadRequestException, AccountNotLinkedException
from cloudos_cli.utils.requests import retry_requests_post, retry_requests_get
import json
from requests.exceptions import RetryError
import sys


[docs] class WFImport(ABC): def __init__(self, cloudos_url, cloudos_apikey, workspace_id, platform, workflow_name, workflow_url, workflow_docs_link="", workflow_description="", cost_limit=30, main_file=None, verify=True): self.cloudos_url = cloudos_url self.workflow_url = workflow_url.rstrip('.git') self.workspace_id = workspace_id self.platform = platform self.parsed_url = urlsplit(self.workflow_url) self.main_file = main_file self.repo_name = "" self.repo_owner = "" self.repo_host = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}" self.headers = { "Content-Type": "application/json", "apikey": cloudos_apikey } self.payload = { "workflowType": "nextflow", "repository": { "platform": platform, "repositoryId": None, "name": None, "owner": { "login": None, "id": None }, "isPrivate": True, "url": self.workflow_url, "commit": "", "branch": "" }, "name": workflow_name, "description": workflow_description, "isPublic": False, "mainFile": "main.nf", "defaultContainer": None, "processes": [], "docsLink": workflow_docs_link, "team": workspace_id, "executionConfiguration": { "costLimitsInUsd": {"value": cost_limit, "editable": True} } } self.get_repo_url = "" self.get_repo_params = dict() self.get_repo_main_file_url = "" self.get_repo_main_file_params = "" self.post_request_url = f"{cloudos_url}/api/v2/workflows?teamId={workspace_id}" self.verify = verify
[docs] def get_repo_main_file(self): repo_owner_urlencode = self.repo_owner.replace("/", "%2F") get_repo_main_file_url = f"{self.cloudos_url}/api/v1/git/{self.platform}/getWorkflowConfig/{self.repo_name}/{repo_owner_urlencode}" get_repo_main_file_params = dict(host=self.repo_host, teamId=self.workspace_id) r = retry_requests_get(get_repo_main_file_url, params=get_repo_main_file_params, headers=self.headers) if r.status_code >= 400: raise BadRequestException(r) r_data = r.json() return r_data["mainFile"]
[docs] @abstractmethod def get_repo(self, *args, **kwargs): pass
[docs] def check_payload(self): for required_key in ["repositoryId", "name", ("owner", "login"), ("owner", "id")]: if isinstance(required_key, tuple): key1, key2 = required_key value = self.payload["repository"][key1][key2] str_value = f"self.payload['repository']['{key1}']['{key2}']" else: value = self.payload["repository"][required_key] str_value = f"self.payload['repository']['{required_key}']" if value is None: raise ValueError("The payload dictionary does not have the required data. " + f"Check that {str_value} is present and the method " f"self.get_repo() has been executed")
[docs] def import_workflow(self): self.get_repo() self.check_payload() r = retry_requests_post(self.post_request_url, json=self.payload, headers=self.headers, verify=self.verify) if r.status_code == 401: raise ValueError('It seems your API key is not authorised. Please check if ' + 'your workspace has support for importing workflows using cloudos-cli') elif r.status_code >= 400: raise BadRequestException(r) content = json.loads(r.content) return content["_id"]
[docs] class ImportWorflow(WFImport):
[docs] def get_repo(self): get_repo_url = f"{self.cloudos_url}/api/v1/git/{self.platform}/getPublicRepo" if self.platform == "bitbucketServer": # platform allows to add paths like /browse, so we need to check if the path ends with it if self.parsed_url.path.endswith("browse"): self.repo_name = self.parsed_url.path.split("/")[-2] else: self.repo_name = self.parsed_url.path.split("/")[-1] self.repo_owner = self.parsed_url.path.split("/")[2] else: self.repo_name = self.parsed_url.path.split("/")[-1] self.repo_owner = "/".join(self.parsed_url.path.split("/")[1:-1]) self.repo_host = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}" get_repo_params = dict(repoName=self.repo_name, repoOwner=self.repo_owner, host=self.repo_host, teamId=self.workspace_id) try: r = retry_requests_get(get_repo_url, params=get_repo_params, headers=self.headers) except RetryError as e: # RetryError getting from missing BitBucket Server credentials raise AccountNotLinkedException(self.workflow_url) # for Github and Gitlab the API gives very general errors on missing credentials # therefore we only have these at the moment if r.status_code == 404: raise AccountNotLinkedException(self.workflow_url) elif r.status_code >= 400: raise BadRequestException(r) r_data = r.json() if self.platform == "bitbucketServer": self.payload["repository"]["repositoryId"] = r_data["name"] else: self.payload["repository"]["repositoryId"] = r_data["id"] self.payload["repository"]["name"] = r_data["name"] owner_data = { "bitbucketServer": ("project", "id", "key"), "gitlab": ("namespace", "id", "full_path"), "github": ("owner", "id", "login") } key, id_field, login_field = owner_data[self.platform] self.payload["repository"]["owner"]["id"] = r_data[key][id_field] self.payload["repository"]["owner"]["login"] = r_data[key][login_field] self.payload["mainFile"] = self.main_file or self.get_repo_main_file()