Source code for cloudos_cli.queue.queue
"""
This is the main class to create job queues.
"""
import requests
import json
import pandas as pd
from dataclasses import dataclass
from typing import Union
from cloudos_cli.clos import Cloudos
from cloudos_cli.utils.errors import BadRequestException
[docs]
@dataclass
class Queue(Cloudos):
"""Class to store and operate job queues.
Parameters
----------
cloudos_url : string
The CloudOS service url.
apikey : string
Your CloudOS API key.
cromwell_token : string
Cromwell server token.
workspace_id : string
The specific Cloudos workspace id.
verify: [bool|string]
Whether to use SSL verification or not. Alternatively, if
a string is passed, it will be interpreted as the path to
the SSL certificate file.
"""
workspace_id: str
verify: Union[bool, str] = True
[docs]
def get_job_queues(self):
"""Get all the job queues from a CloudOS workspace.
Returns
-------
r : list
A list of dicts, each corresponding to a job queue.
"""
headers = {"apikey": self.apikey}
r = requests.get("{}/api/v1/teams/aws/v2/job-queues?teamId={}".format(self.cloudos_url,
self.workspace_id),
headers=headers, verify=self.verify)
if r.status_code >= 400:
raise BadRequestException(r)
return json.loads(r.content)
[docs]
@staticmethod
def process_queue_list(r, all_fields=False):
"""Process a queue list from a self.get_job_queues call.
Parameters
----------
r : list
A list of dicts, each corresponding to a job queue.
all_fields : bool. Default=False
Whether to return a reduced version of the DataFrame containing
only the selected columns or the full DataFrame.
Returns
-------
df : pandas.DataFrame
A DataFrame with the requested columns from the job queues.
"""
COLUMNS = ['id',
'name',
'label',
'description',
'isDefault',
'resourceType',
'executor',
'status'
]
df_full = pd.json_normalize(r)
if all_fields:
df = df_full
else:
df = df_full.loc[:, COLUMNS]
return df
[docs]
def fetch_job_queue_id(self, workflow_type, batch=True, job_queue=None):
"""Fetches CloudOS ID for a given job queue.
This method will try to find the
corresponding CloudOS ID for the job_queue in a given workspace. If
job_queue=None, this method will select the available default queue in
the workspace, or the newest "ready" job queue if no default queues are
available.
Parameters
----------
workflow_type : str ['wdl'|'cromwell'|'nextflow']
The type of workflow to run.
batch: bool
Whether to create a batch job or an ignite one.
job_queue : str or None
The name of the job queue to search. If None, a default one will be selected.
Returns
-------
job_queue_id : str or None
The CloudOS ID for the selected job queue, or None if batch=False.
"""
if not batch:
return None
if workflow_type == 'wdl':
workflow_type = 'cromwell'
if workflow_type not in ['cromwell', 'nextflow']:
raise ValueError('Only nextflow or cromwell workflows are allowed when ' +
'running using AWS batch.')
job_queues = self.get_job_queues()
available_queues = [q for q in job_queues if q['status'] == 'Ready' and
q['executor'] == workflow_type]
if len(available_queues) == 0:
raise Exception(f'There are no available job queues for {workflow_type} ' +
'workflows. Consider creating one using CloudOS UI.')
default_queue = [q for q in available_queues if q['isDefault']]
if len(default_queue) > 0:
default_queue_id = default_queue[0]['id']
default_queue_name = default_queue[0]['label']
queue_as_default = 'CloudOS default'
else:
default_queue_id = available_queues[-1]['id']
default_queue_name = available_queues[-1]['label']
queue_as_default = 'most recent suitable'
if job_queue is None:
print(f'No job queue was specified, using the {queue_as_default} queue: ' +
f'{default_queue_name}.')
return default_queue_id
selected_queue = [q for q in available_queues if q['label'] == job_queue]
if len(selected_queue) == 0:
print(f'Queue \'{job_queue}\' you specified was not found, using the {queue_as_default} ' +
f'queue instead: {default_queue_name}.')
return default_queue_id
return selected_queue[0]['id']