from datetime import datetime
from rich.console import Console
from rich.table import Table
import json
import csv
[docs]
def get_path(param, param_kind_map, execution_platform, storage_provider, mode="parameters"):
"""
Constructs a storage path based on the parameter kind and execution platform.
Parameters
----------
param : dict
A dictionary containing parameter details. Expected keys include:
- 'parameterKind': Specifies the kind of parameter (e.g., 'dataItem', 'globPattern').
- For 'dataItem': Contains nested keys such as 'item', which includes:
- 's3BucketName', 's3ObjectKey', 's3Prefix' (for AWS Batch).
- 'blobStorageAccountName', 'blobContainerName', 'blobName' (for other platforms).
- For 'globPattern': Contains nested keys such as 'folder', which includes:
- 's3BucketName', 's3Prefix' (for AWS Batch).
- 'blobStorageAccountName', 'blobContainerName', 'blobPrefix' (for other platforms).
param_kind_map : dict
A mapping of parameter kinds to their corresponding keys in the `param` dictionary.
execution_platform : str
The platform on which the execution is taking place.
Expected values include "Batch AWS" or other non-AWS platforms.
storage_provider : str
Either s3:// or az://
mode : str
For "parameters" is creating the '*.config' file and it adds the complete path, for "asis"
leaves the constructed path as generated from the API
Returns
-------
str: A constructed storage path based on the parameter kind and execution platform.
- For 'dataItem' on AWS Batch: "s3BucketName/s3ObjectKey" or "s3BucketName/s3Prefix".
- For 'dataItem' on other platforms: "blobStorageAccountName/blobContainerName/blobName".
- For 'globPattern' on AWS Batch: "s3BucketName/s3Prefix/globPattern".
- For 'globPattern' on other platforms: "blobStorageAccountName/blobContainerName/blobPrefix/globPattern".
"""
value = param[param_kind_map[param['parameterKind']]]
if param['parameterKind'] == 'dataItem':
if execution_platform == "Batch AWS":
s3_object_key = value['item'].get('s3ObjectKey', None) if value['item'].get('s3Prefix', None) is None else value['item'].get('s3Prefix', None)
if mode == "parameters":
value = storage_provider + value['item']['s3BucketName'] + '/' + s3_object_key
else:
value = value['item']['s3BucketName'] + '/' + s3_object_key
else:
account_name = value['item']['blobStorageAccountName'] + ".blob.core.windows.net"
container_name = value['item']['blobContainerName']
blob_name = value['item']['blobName']
if mode == "parameters":
value = storage_provider + account_name + '/' + container_name + '/' + blob_name
else:
value = value['item']['blobStorageAccountName'] + '/' + container_name + '/' + blob_name
elif param['parameterKind'] == 'globPattern':
if execution_platform == "Batch AWS":
if mode == "parameters":
value = storage_provider + param['folder']['s3BucketName'] + '/' + param['folder']['s3Prefix'] + '/' + param['globPattern']
else:
value = param['folder']['s3BucketName'] + '/' + param['folder']['s3Prefix'] + '/' + param['globPattern']
else:
account_name = param['folder']['blobStorageAccountName'] + ".blob.core.windows.net"
container_name = param['folder']['blobContainerName']
blob_name = param['folder']['blobPrefix']
if mode == "parameters":
value = storage_provider + account_name + '/' + container_name + '/' + blob_name + '/' + param['globPattern']
else:
value = param['folder']['blobStorageAccountName'] + '/' + container_name + '/' + blob_name + '/' + param['globPattern']
return value
[docs]
def create_job_details(j_details_h, job_id, output_format, output_basename, parameters):
"""
Creates formatted job details output from job data in multiple formats.
This function processes job details from the CloudOS API response and outputs
the information in a user-specified format (stdout table, JSON, or CSV).
It also optionally creates configuration files with job parameters.
Parameters
----------
j_details_h : dict
A dictionary containing job details from the CloudOS API. Expected keys include:
- 'jobType': The type of job executor (e.g., 'nextflowAWS', 'dockerAWS').
- 'parameters': List of parameter dictionaries for the job.
- 'status': Current status of the job.
- 'name': Name of the job.
- 'project': Dictionary containing project information with 'name' key.
- 'user': Dictionary containing user information with 'name' and 'surname' keys.
- 'workflow': Dictionary containing workflow information.
- 'startTime': ISO format timestamp of job start.
- 'endTime': ISO format timestamp of job completion.
- 'computeCostSpent': Cost in cents (optional).
- 'masterInstance': Dictionary containing instance information.
- 'storageSizeInGb': Storage size allocated to the job.
- 'resourceRequirements': Dictionary with 'cpu' and 'ram' specifications.
- Additional platform-specific keys based on jobType.
job_id : str
Unique identifier for the job.
output_format : str
Format for output display. Expected values:
- 'stdout': Display as a formatted table in the console.
- 'json': Save as a JSON file.
- 'csv': Save as a CSV file.
output_basename : str
Base name for output files (without extension). Used when output_format
is 'json' or 'csv'.
parameters : bool
Whether to create a separate configuration file containing job parameters.
If True and parameters exist, creates a '.config' file with Nextflow-style
parameter formatting.
Returns
-------
None
This function has side effects only:
- Prints formatted output to console (for 'stdout' format).
- Creates output files (for 'json' and 'csv' formats).
- Optionally creates parameter configuration files.
- Prints status messages about file creation.
Notes
-----
The function handles different job types and execution platforms:
- AWS Batch (nextflowAWS, dockerAWS, cromwellAWS)
- Azure Batch (nextflowAzure)
- Google Cloud Platform (nextflowGcp)
- HPC clusters (nextflowHpc)
- Kubernetes (nextflowKubernetes)
Parameter processing depends on the parameter kind:
- 'textValue': Simple text parameters
- 'arrayFileColumn': Column-based array parameters
- 'globPattern': File pattern matching parameters
- 'lustreFileSystem': Lustre filesystem parameters
- 'dataItem': Data file/object parameters
Time calculations assume UTC timezone and convert ISO format timestamps
to human-readable duration strings.
"""
# Determine the execution platform based on jobType
executors = {
'nextflowAWS': 'Batch AWS',
'nextflowAzure': 'Batch Azure',
'nextflowGcp': 'GCP',
'nextflowHpc': 'HPC',
'nextflowKubernetes': 'Kubernetes',
'dockerAWS': 'Batch AWS',
'cromwellAWS': 'Batch AWS'
}
execution_platform = executors.get(j_details_h["jobType"], "None")
storage_provider = "s3://" if execution_platform == "Batch AWS" else "az://"
# Check if the job details contain parameters
if j_details_h["parameters"] != []:
param_kind_map = {
'textValue': 'textValue',
'arrayFileColumn': 'columnName',
'globPattern': 'globPattern',
'lustreFileSystem': 'fileSystem',
'dataItem': 'dataItem'
}
# there are different types of parameters, arrayFileColumn, globPattern, lustreFileSystem
# get first the type of parameter, then the value based on the parameter kind
concats = []
for param in j_details_h["parameters"]:
concats.append(f"{param['prefix']}{param['name']}={get_path(param, param_kind_map, execution_platform, storage_provider, 'asis')}")
concat_string = '\n'.join(concats)
# If the user requested to save the parameters in a config file
if parameters:
# Create a config file with the parameters
config_filename = f"{output_basename}.config"
with open(config_filename, 'w') as config_file:
config_file.write("params {\n")
for param in j_details_h["parameters"]:
config_file.write(f"\t{param['name']} = {get_path(param, param_kind_map, execution_platform, storage_provider)}\n")
config_file.write("}\n")
print(f"\tJob parameters have been saved to '{config_filename}'")
else:
concat_string = 'No parameters provided'
if parameters:
print("\tNo parameters found in the job details, no config file will be created.")
# revision
if j_details_h["jobType"] == "dockerAWS":
revision = j_details_h["revision"]["digest"]
else:
revision = j_details_h["revision"]["commit"]
# Output the job details
status = str(j_details_h.get("status", "None"))
name = str(j_details_h.get("name", "None"))
project = str(j_details_h.get("project", {}).get("name", "None"))
owner = str(j_details_h.get("user", {}).get("name", "None") + " " + j_details_h.get("user", {}).get("surname", "None"))
pipeline = str(j_details_h.get("workflow", {}).get("name", "None"))
# calculate the run time
start_dt = datetime.fromisoformat(str(j_details_h["startTime"]).replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(str(j_details_h["endTime"]).replace('Z', '+00:00'))
duration = end_dt - start_dt
# Format duration as hours:minutes:seconds
total_seconds = int(duration.total_seconds())
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours > 0:
run_time = f"{hours}h {minutes}m {seconds}s"
elif minutes > 0:
run_time = f"{minutes}m {seconds}s"
else:
run_time = f"{seconds}s"
# determine cost
cost = j_details_h.get("computeCostSpent", None)
if cost is not None:
cost_display = "$" + str(round(float(cost) / 100, 4))
else:
cost_display = "None"
# when the job is just running this value might not be present
master_instance = j_details_h.get("masterInstance", {})
used_instance = master_instance.get("usedInstance", {})
instance_type = used_instance.get("type", "N/A")
storage = str(j_details_h.get("storageSizeInGb", 0)) + " GB"
pipeline_url = str(j_details_h.get("workflow", {}).get("repository", {}).get("url", "Not Specified"))
accelerated_file_staging = str(j_details_h.get("usesFusionFileSystem", "None"))
nextflow_version = str(j_details_h.get("nextflowVersion", "None"))
profile = str(j_details_h.get("profile", "None"))
# Create a JSON object with the key-value pairs
# make these separation to preserve order
job_details_json = {
"Status": status,
"Name": name,
"Project": project,
"Owner": owner,
"Pipeline": pipeline,
"ID": str(job_id),
"Submit time": str(start_dt.strftime('%Y-%m-%d %H:%M:%S')),
"End time": str(end_dt.strftime('%Y-%m-%d %H:%M:%S')),
"Run time": str(run_time),
"Commit": str(revision),
"Cost": cost_display,
"Master Instance": str(instance_type),
}
if j_details_h["jobType"] == "nextflowAzure":
try:
job_details_json["Worker Node"] = str(j_details_h["azureBatch"]["vmType"])
except KeyError:
job_details_json["Worker Node"] = "Not Specified"
job_details_json["Storage"] = storage
# Conditionally add the "Job Queue" key if the jobType is not "nextflowAzure"
if j_details_h["jobType"] != "nextflowAzure":
try:
job_details_json["Job Queue ID"] = str(j_details_h["batch"]["jobQueue"]["name"])
job_details_json["Job Queue Name"] = str(j_details_h["batch"]["jobQueue"]["label"])
except KeyError:
job_details_json["Job Queue"] = "Master Node"
job_details_json["Task Resources"] = f"{str(j_details_h['resourceRequirements']['cpu'])} CPUs, " + \
f"{str(j_details_h['resourceRequirements']['ram'])} GB RAM"
job_details_json["Pipeline url"] = pipeline_url
job_details_json["Nextflow Version"] = nextflow_version
job_details_json["Execution Platform"] = execution_platform
job_details_json["Accelerated File Staging"] = accelerated_file_staging
job_details_json["Parameters"] = ';'.join(concat_string.split("\n"))
# Conditionally add the "Command" key if the jobType is "dockerAWS"
if j_details_h["jobType"] == "dockerAWS":
job_details_json["Command"] = str(j_details_h["command"])
job_details_json["Profile"] = profile
if output_format == 'stdout':
# Generate a table for stdout output
console = Console()
table = Table(title="Job Details")
table.add_column("Field", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta", overflow="fold")
for key, value in job_details_json.items():
if key == "Parameters":
table.add_row(key, "\n".join(value.split(";")))
else:
table.add_row(key, str(value))
console.print(table)
elif output_format == 'json':
# Write the JSON object to a file
with open(f"{output_basename}.json", "w") as json_file:
json.dump(job_details_json, json_file, indent=4, ensure_ascii=False)
print(f"\tJob details have been saved to '{output_basename}.json'")
else:
# Write the same details to a CSV file
with open(f"{output_basename}.csv", "w", newline='') as csv_file:
writer = csv.writer(csv_file)
# Write headers (fields) in the first row
writer.writerow(job_details_json.keys())
# Write values in the second row
writer.writerow(job_details_json.values())
print(f"\tJob details have been saved to '{output_basename}.csv'")