# Source code for cloudos_cli.utils.details

from datetime import datetime
from rich.console import Console
from rich.table import Table
import json
import csv
import os
import sys

from cloudos_cli.constants import (
    JOB_STATUS_SYMBOLS,
    COLUMN_PRIORITY_GROUPS,
    ESSENTIAL_COLUMN_PRIORITY,
    ADDITIONAL_COLUMN_PRIORITY,
    COLUMN_CONFIGS
)


def get_path(param, param_kind_map, execution_platform, storage_provider, mode="parameters"):
    """
    Build the storage path (or plain value) that a job parameter refers to.

    Parameters
    ----------
    param : dict
        Parameter description. 'parameterKind' selects how the value is read:
        - 'dataItem': the value holds an 'item' dict with 's3BucketName' and
          's3ObjectKey' / 's3Prefix' (AWS Batch), or 'blobStorageAccountName',
          'blobContainerName' and 'blobName' (other platforms).
        - 'globPattern': `param` holds a 'folder' dict with 's3BucketName' and
          's3Prefix' (AWS Batch), or 'blobStorageAccountName',
          'blobContainerName' and 'blobPrefix' (other platforms), plus the
          'globPattern' itself.
        - any other kind present in `param_kind_map`: the mapped key of
          `param` is returned untouched.
    param_kind_map : dict
        Maps each supported parameter kind to the key of `param` holding its
        value.
    execution_platform : str
        "Batch AWS" selects the S3 layout; any other value selects the blob
        storage layout.
    storage_provider : str
        Scheme prefix (e.g. 's3://' or 'az://'), only used in "parameters" mode.
    mode : str
        "parameters" prepends `storage_provider` (and, for blob storage, uses
        the '<account>.blob.core.windows.net' host name). Any other value
        (e.g. "asis") returns the bare 'container/key' style path.

    Returns
    -------
    str
        The constructed path, the raw parameter value for simple kinds, or
        '<unsupported parameter type: KIND>' when the kind is not in
        `param_kind_map` (e.g. legacy kinds from historical jobs).
    """
    kind = param['parameterKind']
    if kind not in param_kind_map:
        # Unknown/legacy kinds are reported instead of raising
        return f"<unsupported parameter type: {kind}>"

    raw_value = param[param_kind_map[kind]]
    on_aws = execution_platform == "Batch AWS"
    with_scheme = mode == "parameters"

    if kind == 'dataItem':
        item = raw_value['item']
        if on_aws:
            # A prefix (folder) takes precedence over a single object key
            prefix = item.get('s3Prefix', None)
            object_key = item.get('s3ObjectKey', None) if prefix is None else prefix
            segments = [item['s3BucketName'], object_key]
        else:
            account = item['blobStorageAccountName']
            host = account + ".blob.core.windows.net"
            container = item['blobContainerName']
            blob = item['blobName']
            segments = [host if with_scheme else account, container, blob]
    elif kind == 'globPattern':
        folder = param['folder']
        if on_aws:
            segments = [folder['s3BucketName'], folder['s3Prefix'], param['globPattern']]
        else:
            account = folder['blobStorageAccountName']
            host = account + ".blob.core.windows.net"
            container = folder['blobContainerName']
            blob_prefix = folder['blobPrefix']
            segments = [host if with_scheme else account, container, blob_prefix,
                        param['globPattern']]
    else:
        # Plain kinds (text values, column names, ...) need no path assembly
        return raw_value

    path = '/'.join(segments)
    if with_scheme:
        return storage_provider + path
    return path
def _parse_job_timestamp(raw_value):
    """Parse an ISO 8601 timestamp from the API; return None when absent or malformed."""
    if not raw_value:
        return None
    try:
        # A trailing 'Z' is rewritten as an explicit UTC offset before parsing
        return datetime.fromisoformat(str(raw_value).replace('Z', '+00:00'))
    except (ValueError, TypeError):
        return None


def _format_job_run_time(start_dt, end_dt):
    """Render the time between two datetimes as 'Xh Ym Zs', 'Ym Zs' or 'Zs' ('N/A' if unknown)."""
    if start_dt is None or end_dt is None:
        return "N/A"
    try:
        total_seconds = int((end_dt - start_dt).total_seconds())
    except TypeError:
        # Timezone-aware and naive timestamps cannot be subtracted
        return "N/A"
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"


def create_job_details(j_details_h, job_id, output_format, output_basename, parameters,
                       cloudos_url="https://cloudos.lifebit.ai"):
    """
    Output the details of a single job as a console table, a JSON file or a CSV file.

    Parameters
    ----------
    j_details_h : dict
        Job details as returned by the Lifebit Platform API. Keys read here:
        - 'jobType' (required): e.g. 'nextflowAWS', 'dockerAWS', 'nextflowAzure'.
        - 'parameters' (required): list of parameter dicts (may be empty).
        - 'revision' (required): dict with 'digest' (dockerAWS) or 'commit'.
        - 'resourceRequirements' (required): dict with 'cpu' and 'ram'.
        - 'command' (required for dockerAWS only).
        - Optional: 'status', 'name', 'project', 'user', 'workflow',
          'startTime', 'endTime', 'computeCostSpent', 'masterInstance',
          'storageSizeInGb', 'usesFusionFileSystem', 'nextflowVersion',
          'profile', 'batch', 'azureBatch'.
    job_id : str
        Unique identifier of the job.
    output_format : str
        'stdout' prints a table, 'json' writes '<output_basename>.json',
        any other value writes '<output_basename>.csv'.
    output_basename : str
        Base name (without extension) of the files that are written.
    parameters : bool
        When True and the job has parameters, also write
        '<output_basename>.config' with a Nextflow-style ``params`` block.
    cloudos_url : str, optional
        Base URL of the platform, used for the job hyperlink in the table.
        Defaults to "https://cloudos.lifebit.ai".

    Returns
    -------
    None
        The function only prints to the console and/or writes files.

    Notes
    -----
    - Files are written as UTF-8 so that non-ASCII job or project names do not
      depend on the locale's default encoding.
    - 'startTime' and 'endTime' are formatted independently; a missing or
      malformed timestamp is shown as "N/A", and the run time is "N/A" unless
      both timestamps could be parsed.
    - 'computeCostSpent' is divided by 100 before display.
    """
    job_type = j_details_h["jobType"]

    # Determine the execution platform based on jobType
    executors = {
        'nextflowAWS': 'Batch AWS',
        'nextflowAzure': 'Batch Azure',
        'nextflowGcp': 'GCP',
        'nextflowHpc': 'HPC',
        'nextflowKubernetes': 'Kubernetes',
        'dockerAWS': 'Batch AWS',
        'cromwellAWS': 'Batch AWS'
    }
    execution_platform = executors.get(job_type, "None")
    storage_provider = "s3://" if execution_platform == "Batch AWS" else "az://"

    job_parameters = j_details_h["parameters"]
    if job_parameters:
        param_kind_map = {
            'textValue': 'textValue',
            'arrayFileColumn': 'columnName',
            'globPattern': 'globPattern',
            'dataItem': 'dataItem'
        }
        # One "<prefix><name>=<value>" entry per parameter, paths left as the API reports them
        concat_string = '\n'.join(
            f"{param['prefix']}{param['name']}="
            f"{get_path(param, param_kind_map, execution_platform, storage_provider, 'asis')}"
            for param in job_parameters
        )
        if parameters:
            # The config file uses fully qualified paths (scheme + host)
            config_filename = f"{output_basename}.config"
            with open(config_filename, 'w', encoding='utf-8') as config_file:
                config_file.write("params {\n")
                for param in job_parameters:
                    config_file.write(
                        f"\t{param['name']} = "
                        f"{get_path(param, param_kind_map, execution_platform, storage_provider)}\n"
                    )
                config_file.write("}\n")
            print(f"\tJob parameters have been saved to '{config_filename}'")
    else:
        concat_string = 'No parameters provided'
        if parameters:
            print("\tNo parameters found in the job details, no config file will be created.")

    # Docker jobs are pinned by image digest, everything else by commit
    revision = j_details_h["revision"]["digest" if job_type == "dockerAWS" else "commit"]

    status = str(j_details_h.get("status", "None"))
    name = str(j_details_h.get("name", "None"))
    project = str(j_details_h.get("project", {}).get("name", "None"))
    owner = str(j_details_h.get("user", {}).get("name", "None") + " " +
                j_details_h.get("user", {}).get("surname", "None"))
    pipeline = str(j_details_h.get("workflow", {}).get("name", "None"))

    # Timestamps: each one is parsed on its own so a bad value never raises
    start_dt = _parse_job_timestamp(j_details_h.get("startTime"))
    end_dt = _parse_job_timestamp(j_details_h.get("endTime"))
    submit_time = start_dt.strftime('%Y-%m-%d %H:%M:%S') if start_dt is not None else "N/A"
    end_time = end_dt.strftime('%Y-%m-%d %H:%M:%S') if end_dt is not None else "N/A"
    run_time = _format_job_run_time(start_dt, end_dt)

    cost = j_details_h.get("computeCostSpent", None)
    if cost is not None:
        cost_display = "$" + str(round(float(cost) / 100, 4))
    else:
        cost_display = "None"

    # While the job is still starting, the instance information may be missing
    master_instance = j_details_h.get("masterInstance", {})
    used_instance = master_instance.get("usedInstance", {})
    instance_type = used_instance.get("type", "N/A")
    storage = str(j_details_h.get("storageSizeInGb", 0)) + " GB"
    pipeline_url = str(j_details_h.get("workflow", {}).get("repository", {}).get("url", "Not Specified"))
    accelerated_file_staging = str(j_details_h.get("usesFusionFileSystem", "None"))
    nextflow_version = str(j_details_h.get("nextflowVersion", "None"))
    profile = str(j_details_h.get("profile", "None"))

    # Insertion order defines the row order of the table and the CSV column order
    job_details_json = {
        "Status": status,
        "Name": name,
        "Project": project,
        "Owner": owner,
        "Pipeline": pipeline,
        "ID": str(job_id),
        "Submit time": submit_time,
        "End time": end_time,
        "Run time": str(run_time),
        "Commit": str(revision),
        "Cost": cost_display,
        "Master Instance": str(instance_type),
    }
    if job_type == "nextflowAzure":
        try:
            job_details_json["Worker Node"] = str(j_details_h["azureBatch"]["vmType"])
        except KeyError:
            job_details_json["Worker Node"] = "Not Specified"
    job_details_json["Storage"] = storage

    # Job queues only exist outside Azure; 'batch' and 'jobQueue' may be null
    if job_type != "nextflowAzure":
        job_queue = (j_details_h.get("batch") or {}).get("jobQueue") or {}
        job_details_json["Job Queue ID"] = str(job_queue.get("name", "Not Specified"))
        job_details_json["Job Queue Name"] = str(job_queue.get("label", "Not Specified"))

    job_details_json["Task Resources"] = (
        f"{str(j_details_h['resourceRequirements']['cpu'])} CPUs, "
        f"{str(j_details_h['resourceRequirements']['ram'])} GB RAM"
    )
    job_details_json["Pipeline url"] = pipeline_url
    job_details_json["Nextflow Version"] = nextflow_version
    job_details_json["Execution Platform"] = execution_platform
    job_details_json["Accelerated File Staging"] = accelerated_file_staging
    # Stored on a single line; the stdout table splits it back into one line per parameter
    job_details_json["Parameters"] = ';'.join(concat_string.split("\n"))

    if job_type == "dockerAWS":
        job_details_json["Command"] = str(j_details_h["command"])
    job_details_json["Profile"] = profile

    if output_format == 'stdout':
        console = Console()
        table = Table(title="Job Details")
        table.add_column("Field", style="cyan", no_wrap=True)
        table.add_column("Value", style="magenta", overflow="fold")
        for key, value in job_details_json.items():
            if key == "Parameters":
                table.add_row(key, "\n".join(value.split(";")))
            elif key == "ID":
                # Make the job ID clickable in terminals that support hyperlinks
                job_url = f"{cloudos_url}/app/advanced-analytics/analyses/{value}"
                table.add_row(key, f"[link={job_url}]{value}[/link]")
            else:
                table.add_row(key, str(value))
        console.print(table)
    elif output_format == 'json':
        # ensure_ascii=False emits raw Unicode, so the file must be opened as UTF-8
        with open(f"{output_basename}.json", "w", encoding="utf-8") as json_file:
            json.dump(job_details_json, json_file, indent=4, ensure_ascii=False)
        print(f"\tJob details have been saved to '{output_basename}.json'")
    else:
        with open(f"{output_basename}.csv", "w", newline='', encoding="utf-8") as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(job_details_json.keys())    # header row
            writer.writerow(job_details_json.values())  # single data row
        print(f"\tJob details have been saved to '{output_basename}.csv'")
def _build_job_row_values(job, cloudos_url, terminal_width, columns_to_show):
    """Compute the display values of one job and return those requested.

    Parameters
    ----------
    job : dict
        Job dictionary from the Lifebit Platform API
    cloudos_url : str
        Lifebit Platform service URL used to build the job hyperlink
    terminal_width : int
        Current terminal width; below 90 columns dates omit the year
    columns_to_show : list
        Column keys to return, in output order

    Returns
    -------
    list
        One value per entry of columns_to_show, in the same order
    """
    def short_timestamp(raw):
        # Missing and unparsable timestamps are both rendered as "N/A"
        if not raw:
            return "N/A"
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
            if terminal_width < 90:
                return parsed.strftime('%m-%d %H:%M')
            return parsed.strftime('%Y-%m-%d %H:%M')
        except (ValueError, TypeError):
            return "N/A"

    values = {}

    # Status: replaced by its symbol when one is defined for it
    raw_status = str(job.get("status", "N/A"))
    values['status'] = JOB_STATUS_SYMBOLS.get(raw_status.lower(), raw_status)

    values['name'] = str(job.get("name", "N/A"))
    values['project'] = str(job.get("project", {}).get("name", "N/A"))

    # Owner: "name surname", or whichever part is available
    user = job.get("user", {})
    first = user.get('name', '')
    last = user.get('surname', '')
    if first and last:
        values['owner'] = f"{first} {last}"
    else:
        values['owner'] = first or last or "N/A"

    # Pipeline: first line only, shortened to 25 characters at most
    pipeline_name = str(job.get("workflow", {}).get("name", "N/A")).split('\n')[0].strip()
    if len(pipeline_name) > 25:
        pipeline_name = pipeline_name[:22] + "..."
    values['pipeline'] = pipeline_name

    # ID rendered as a hyperlink to the job page
    identifier = str(job.get("_id", "N/A"))
    link_target = f"{cloudos_url}/app/advanced-analytics/analyses/{identifier}"
    values['id'] = f"[link={link_target}]{identifier}[/link]"

    ended_raw = job.get("endTime")
    values['submit_time'] = short_timestamp(job.get("createdAt"))
    values['end_time'] = short_timestamp(ended_raw)

    # Run time needs both a start and an end timestamp
    started_raw = job.get("startTime")
    elapsed_text = "N/A"
    if started_raw and ended_raw:
        try:
            started = datetime.fromisoformat(started_raw.replace('Z', '+00:00'))
            ended = datetime.fromisoformat(ended_raw.replace('Z', '+00:00'))
            elapsed = int((ended - started).total_seconds())
            hours, remainder = divmod(elapsed, 3600)
            minutes, seconds = divmod(remainder, 60)
            if hours > 0:
                elapsed_text = f"{hours}h {minutes}m {seconds}s"
            elif minutes > 0:
                elapsed_text = f"{minutes}m {seconds}s"
            else:
                elapsed_text = f"{seconds}s"
        except (ValueError, TypeError):
            elapsed_text = "N/A"
    values['run_time'] = elapsed_text

    # Commit (image digest for docker jobs), cut to 7 characters
    revision_info = job.get("revision", {})
    revision_key = "digest" if job.get("jobType") == "dockerAWS" else "commit"
    values['commit'] = str(revision_info.get(revision_key, "N/A"))[:7]

    # Cost: the raw value is divided by 100 before display
    raw_cost = job.get("computeCostSpent") or job.get("realInstancesExecutionCost")
    cost_text = "N/A"
    if raw_cost is not None:
        try:
            cost_text = f"${float(raw_cost) / 100:.4f}"
        except (ValueError, TypeError):
            cost_text = "N/A"
    values['cost'] = cost_text

    # Resources: instance type only
    values['resources'] = (
        job.get("masterInstance", {}).get("usedInstance", {}).get("type", "N/A") or "N/A"
    )

    # Storage type, capitalised for display
    mode_value = job.get("storageMode", "N/A")
    values['storage_type'] = "N/A" if mode_value == "N/A" else str(mode_value).capitalize()

    return [values[key] for key in columns_to_show]


def _create_status_legend():
    """Return the one-line legend explaining the job status symbols.

    Returns
    -------
    str
        Rich-markup string listing each symbol followed by its meaning.
    """
    entries = (
        ("cyan", "◷", "Scheduled"),
        ("bright_black", "○", "Initialising"),
        ("bright_black", "◐", "Running"),
        ("green", "✓", "Completed"),
        ("red", "✗", "Failed"),
        ("orange3", "⊡", "Aborting"),
        ("orange3", "■", "Aborted"),
        ("bright_black", "?", "Unknown"),
    )
    rendered = [
        f"[bold {colour}]{symbol}[/bold {colour}] = {meaning}"
        for colour, symbol, meaning in entries
    ]
    return "[cyan]Legend:[/cyan] " + " | ".join(rendered)


def _build_job_table(jobs, cloudos_url, terminal_width, columns_to_show, column_configs):
    """Create the Rich table holding one row per job.

    Parameters
    ----------
    jobs : list
        Job dictionaries from the Lifebit Platform API
    cloudos_url : str
        Lifebit Platform service URL used to build job hyperlinks
    terminal_width : int
        Current terminal width, forwarded to the row builder
    columns_to_show : list
        Column keys to include, in display order
    column_configs : dict
        Display configuration of every known column, keyed by column key

    Returns
    -------
    Table
        Rich Table object populated with the job rows
    """
    job_table = Table()

    for key in columns_to_show:
        settings = column_configs[key]
        job_table.add_column(
            settings["header"],
            style=settings.get("style"),
            no_wrap=settings.get("no_wrap", False),
            overflow=settings.get("overflow"),
            min_width=settings.get("min_width"),
            max_width=settings.get("max_width")
        )

    for entry in jobs:
        job_table.add_row(
            *_build_job_row_values(entry, cloudos_url, terminal_width, columns_to_show)
        )

    return job_table


def _calculate_table_width(column_list, col_configs):
    """Estimate the rendered width of a table made of the given columns."""
    # Two outer borders plus one separator between neighbouring columns
    total = 2 + (len(column_list) - 1)
    for key in column_list:
        settings = col_configs[key]
        # Each column adds its configured width plus 2 extra characters
        total += settings.get('max_width', settings.get('min_width', 10)) + 2
    # Safety buffer
    return total + 2


def _fit_columns_to_terminal(cols, terminal_w, col_configs, preserve_order=False):
    """Select, one by one, the columns that fit into the terminal width.

    Parameters
    ----------
    cols : list
        Candidate column keys
    terminal_w : int
        Width the table has to fit into
    col_configs : dict
        Column configuration dictionary
    preserve_order : bool
        True keeps the order of cols (explicit user selection); False
        reorders the columns by priority.

    Returns
    -------
    list
        The columns that fit, in display order
    """
    if len(cols) == 0:
        return cols

    fitted = []

    def add_if_fits(candidate):
        # A column is accepted only when the table including it still fits
        if _calculate_table_width(fitted + [candidate], col_configs) <= terminal_w:
            fitted.append(candidate)
            return True
        return False

    if preserve_order:
        # A column that does not fit is skipped; later ones are still tried
        for candidate in cols:
            add_if_fits(candidate)
        if not fitted:
            # Never return an empty selection: fall back to the narrowest column
            fitted.append(min(
                cols,
                key=lambda c: col_configs[c].get('max_width', col_configs[c].get('min_width', 10))
            ))
        return fitted

    # Automatic selection: essential columns first, then the additional ones
    essential = [c for c in ESSENTIAL_COLUMN_PRIORITY if c in cols]
    extras = [c for c in cols if c not in ESSENTIAL_COLUMN_PRIORITY]
    ordered_extras = [c for c in ADDITIONAL_COLUMN_PRIORITY if c in extras]
    ordered_extras += [c for c in extras if c not in ADDITIONAL_COLUMN_PRIORITY]

    for candidate in essential:
        # 'status' is forced in when nothing else has been placed yet
        if not add_if_fits(candidate) and not fitted and candidate == 'status':
            fitted.append(candidate)

    for candidate in ordered_extras:
        add_if_fits(candidate)

    return fitted
def _total_page_count(total_jobs, page_size):
    """Return the number of pages needed for total_jobs (at least 1, even for a zero page size)."""
    if total_jobs <= 0 or page_size <= 0:
        return 1
    return (total_jobs + page_size - 1) // page_size


def create_job_list_table(jobs, cloudos_url, pagination_metadata=None, selected_columns=None,
                          fetch_page_callback=None):
    """Print the job list as a table adapted to the terminal width, with optional paging.

    Parameters
    ----------
    jobs : list
        Job dictionaries of the page to display.
    cloudos_url : str
        Lifebit Platform service URL used to build job hyperlinks.
    pagination_metadata : dict, optional
        Paging information. Keys read: 'Pagination-Count' (total jobs),
        'Pagination-Page' (current page) and 'Pagination-Limit' (page size).
    selected_columns : str or list, optional
        Columns to show, either as a comma-separated string (entries are
        stripped and lower-cased) or as a list of column keys. The given
        order is kept. When None, columns are chosen from the terminal width.
    fetch_page_callback : callable, optional
        Called with a page number; must return a dict with the keys 'jobs'
        and 'pagination_metadata'. Interactive paging is only offered when
        both this callback and `pagination_metadata` are given.

    Returns
    -------
    None
        Everything is printed to the console.

    Raises
    ------
    ValueError
        If `selected_columns` contains a name that is not in COLUMN_CONFIGS.
    """
    try:
        terminal_width = os.get_terminal_size().columns
    except OSError:
        terminal_width = 80  # No terminal attached (e.g. output is piped)

    requested_count = 0
    if selected_columns is None:
        groups = COLUMN_PRIORITY_GROUPS
        if terminal_width < 80:
            columns_to_show = groups['minimal']
        elif terminal_width <= 100:
            columns_to_show = groups['essential']
        elif terminal_width < 150:
            columns_to_show = groups['essential'] + groups['important']
        elif terminal_width < 180:
            columns_to_show = groups['essential'] + groups['important'] + groups['useful']
        else:
            columns_to_show = (groups['essential'] + groups['important'] +
                               groups['useful'] + groups['extended'])
    else:
        if isinstance(selected_columns, str):
            selected_columns = [col.strip().lower() for col in selected_columns.split(',')]

        # Drop repeated columns (first occurrence wins) and tell the user which ones
        seen = set()
        repeated = set()
        deduplicated = []
        for col in selected_columns:
            if col in seen:
                repeated.add(col)
            else:
                seen.add(col)
                deduplicated.append(col)
        if repeated:
            repeated_in_order = [col for col in deduplicated if col in repeated]
            Console().print(
                f"[yellow]Warning: Duplicate columns removed: {', '.join(repeated_in_order)}[/yellow]"
            )
            selected_columns = deduplicated

        invalid_cols = [col for col in selected_columns if col not in COLUMN_CONFIGS]
        if invalid_cols:
            raise ValueError(f"Invalid column names: {', '.join(invalid_cols)}. "
                             f"Valid columns are: {', '.join(COLUMN_CONFIGS.keys())}")
        columns_to_show = selected_columns
        # Counted after deduplication: removed duplicates are not columns that "did not fit"
        requested_count = len(selected_columns)

    effective_width = terminal_width - 5
    console = Console(width=terminal_width)

    # An explicit selection keeps its order; automatic selections are ordered by priority
    preserve_order = selected_columns is not None
    columns_to_show = _fit_columns_to_terminal(columns_to_show, effective_width,
                                               COLUMN_CONFIGS, preserve_order)

    if preserve_order and selected_columns and len(columns_to_show) < requested_count:
        console.print(f"[yellow]Warning: Terminal too narrow. Showing {len(columns_to_show)} "
                      f"of {requested_count} requested columns.[/yellow]")
        console.print("[yellow]Increase terminal width to see all columns.[/yellow]\n")

    if not jobs:
        console.print("\n[yellow]No jobs found matching the criteria.[/yellow]")
        return

    # The real terminal width (not the reduced one) drives the date format
    table = _build_job_table(jobs, cloudos_url, terminal_width, columns_to_show, COLUMN_CONFIGS)
    legend = _create_status_legend()

    # Static output: no way to fetch further pages
    if not fetch_page_callback or not pagination_metadata:
        console.print(table)
        console.print(f"\n{legend}\n")
        if pagination_metadata:
            total_jobs = pagination_metadata.get('Pagination-Count', 0)
            current_page = pagination_metadata.get('Pagination-Page', 1)
            page_size = pagination_metadata.get('Pagination-Limit', 10)
            total_pages = _total_page_count(total_jobs, page_size)
            console.print(f"\n[cyan]Showing {len(jobs)} of {total_jobs} total jobs | "
                          f"Page {current_page} of {total_pages}[/cyan]")
        return

    current_page = pagination_metadata.get('Pagination-Page', 1) or 1
    total_jobs = pagination_metadata.get('Pagination-Count', 0)
    page_size_value = pagination_metadata.get('Pagination-Limit', 10)
    total_pages = _total_page_count(total_jobs, page_size_value)
    show_error = None

    while True:
        console.clear()
        console.print(table)
        console.print(f"{legend}\n")
        console.print(f"\n[cyan]Total jobs:[/cyan] {total_jobs}")
        if total_pages > 1:
            console.print(f"[cyan]Page:[/cyan] {current_page} of {total_pages}")
        console.print(f"[cyan]Jobs on this page:[/cyan] {len(jobs)}")

        if show_error:
            console.print(show_error)
            show_error = None

        # Nothing to navigate with a single page
        if total_pages <= 1:
            break

        if not sys.stdin.isatty():
            console.print(f"\n[yellow]Note: Pagination not available in non-interactive mode. "
                          f"Showing page {current_page} of {total_pages}.[/yellow]")
            console.print("[yellow]Run in an interactive terminal to navigate through all pages.[/yellow]")
            break

        console.print("\n[bold cyan]n[/] = next, [bold cyan]p[/] = prev, [bold cyan]q[/] = quit")
        try:
            choice = input(">>> ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            console.print("\n[yellow]Pagination interrupted.[/yellow]")
            break

        if choice in ("q", "quit"):
            break
        if choice in ("n", "next"):
            target_page = current_page + 1
        elif choice in ("p", "prev", "previous"):
            target_page = current_page - 1
        else:
            show_error = "[yellow]Invalid choice. Use 'n' (next), 'p' (previous), or 'q' (quit)[/yellow]"
            continue

        if target_page > total_pages:
            show_error = "[yellow]Already on last page[/yellow]"
            continue
        if target_page < 1:
            show_error = "[yellow]Already on first page[/yellow]"
            continue

        try:
            result = fetch_page_callback(target_page)
            new_jobs = result.get('jobs', [])
            new_metadata = result.get('pagination_metadata', {})
            new_table = _build_job_table(new_jobs, cloudos_url, terminal_width,
                                         columns_to_show, COLUMN_CONFIGS)
        except Exception as e:
            # Keep showing the current page; the error is displayed on the next redraw
            show_error = f"[red]Error fetching page: {str(e)}[/red]"
            continue

        # Commit the new page only once everything succeeded
        jobs, pagination_metadata, table = new_jobs, new_metadata, new_table
        current_page = pagination_metadata.get('Pagination-Page', target_page)
        total_pages = pagination_metadata.get(
            'totalPages',
            _total_page_count(pagination_metadata.get('Pagination-Count', 0), page_size_value)
        )
def create_workflow_list_table(workflows, cloudos_url="https://cloudos.lifebit.ai", page_size=10):
    """Display workflows in a rich formatted table with pagination.

    Parameters
    ----------
    workflows : list
        A list of dicts, each corresponding to a workflow. Keys read:
        '_id', 'name' and 'repository' (a dict with 'name' and 'url').
    cloudos_url : str
        The Lifebit Platform URL for creating hyperlinks.
    page_size : int
        Number of workflows to display per page. Default is 10.

    Returns
    -------
    None
        The table and the paging prompts are printed to the console.

    Raises
    ------
    ValueError
        If `page_size` is smaller than 1 (and there are workflows to show).
    """
    console = Console()

    if len(workflows) == 0:
        console.print("\n[yellow]No workflows found in this workspace.[/yellow]")
        return

    # A zero or negative page size cannot be paginated
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    # Pre-render every row once; paging only slices this list
    rows = []
    for workflow in workflows:
        workflow_id = str(workflow.get("_id", "N/A"))
        workflow_url = f"{cloudos_url}/app/advanced-analytics/pipelines-and-tools/workspace/{workflow_id}"
        name = str(workflow.get("name", "N/A"))
        name_with_link = f"[link={workflow_url}]{name}[/link]"

        repository = workflow.get("repository", {})
        repo_name = str(repository.get("name", "N/A"))
        repo_url = str(repository.get("url", "N/A"))
        # Link the repository name only when a URL is known
        if repo_url != "N/A" and repo_url:
            repo_name_with_link = f"[link={repo_url}]{repo_name}[/link]"
        else:
            repo_name_with_link = repo_name

        rows.append([name_with_link, repo_name_with_link])

    current_page = 0  # zero-based index of the page being shown
    total_pages = (len(rows) + page_size - 1) // page_size
    show_error = None

    while True:
        start = current_page * page_size
        page_rows = rows[start:start + page_size]

        console.clear()

        table = Table(title="Workflow List")
        table.add_column("Name", style="green", overflow="fold")
        table.add_column("Repository", style="cyan", overflow="fold")
        for row in page_rows:
            table.add_row(*row)
        console.print(table)

        console.print(f"\n[cyan]Total workflows:[/cyan] {len(workflows)}")
        if total_pages > 1:
            console.print(f"[cyan]Page:[/cyan] {current_page + 1} of {total_pages}")
        console.print(f"[cyan]Workflows on this page:[/cyan] {len(page_rows)}")

        # Show the message produced by the previous input, once
        if show_error:
            console.print(show_error)
            show_error = None

        # Only one page, no need for input
        if total_pages <= 1:
            break

        # Without an interactive stdin there is nobody to answer the prompt
        if not sys.stdin.isatty():
            console.print("\n[yellow]Note: Pagination not available in non-interactive mode. Showing page 1 of {0}.[/yellow]".format(total_pages))
            console.print("[yellow]Run in an interactive terminal to navigate through all pages.[/yellow]")
            break

        console.print("\n[bold cyan]n[/] = next, [bold cyan]p[/] = prev, [bold cyan]q[/] = quit")
        try:
            choice = input(">>> ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            console.print("\n[yellow]Pagination interrupted.[/yellow]")
            break

        if choice in ("q", "quit"):
            break
        elif choice in ("n", "next"):
            if current_page < total_pages - 1:
                current_page += 1
            else:
                show_error = "[red]Invalid choice. Already on the last page.[/red]"
        elif choice in ("p", "prev", "previous"):
            if current_page > 0:
                current_page -= 1
            else:
                show_error = "[red]Invalid choice. Already on the first page.[/red]"
        else:
            show_error = "[red]Invalid choice. Please enter 'n' (next), 'p' (prev), or 'q' (quit).[/red]"
def create_queue_list_table(queues, cloudos_url="https://cloudos.lifebit.ai"):
    """Display job queues in a rich formatted table.

    Parameters
    ----------
    queues : list
        A list of dicts, each corresponding to a job queue. An empty (or
        ``None``) value prints a "no job queues" notice and returns.
    cloudos_url : str
        The Lifebit Platform URL for context (currently not used for hyperlinks).

    Returns
    -------
    None
        Prints the formatted table to console.
    """
    # Local import: `rich` is already a dependency of this module, and keeping
    # the import here makes the function self-contained.
    from rich.markup import escape

    console = Console()

    # Handle empty queue list
    if not queues:
        console.print("\n[yellow]No job queues found in this workspace.[/yellow]")
        return

    # Create table
    table = Table(title="Job Queue List")
    table.add_column("Label", style="green", overflow="fold", min_width=10)
    table.add_column("Default", style="cyan", no_wrap=True, min_width=7, justify="center")
    table.add_column("Resource Type", style="magenta", overflow="fold", min_width=12)
    table.add_column("Status", style="yellow", no_wrap=True, min_width=8, justify="center")

    # Display names for the resource types this table knows about;
    # any other value is shown as-is.
    resource_type_labels = {
        "teamBatchJobQueue": "Batch Queues",
        "systemBatchJobQueue": "System Queue",
    }

    for queue in queues:
        # Escape user-provided text so that square brackets in a label are
        # rendered literally instead of being parsed as console markup.
        label = escape(str(queue.get("label", "N/A")))

        # Default queue is flagged with the word "Default", others with a dash
        if queue.get("isDefault", False):
            default_display = "[bold green]Default[/bold green]"
        else:
            default_display = "[dim]—[/dim]"

        # Resource type: missing, null and empty values all display as "N/A"
        # (str(None) would otherwise leak the text "None" into the table).
        raw_type = queue.get("resourceType")
        resource_type = "N/A" if raw_type in (None, "") else str(raw_type)
        resource_type = escape(resource_type_labels.get(resource_type, resource_type))

        # Status: only "ready" (case-insensitive) counts as ready
        if str(queue.get("status", "N/A")).lower() == "ready":
            status = "[bold green]Ready[/bold green]"
        else:
            status = "[bold red]Not Ready[/bold red]"

        table.add_row(label, default_display, resource_type, status)

    # Print table
    console.print(table)

    # Display total count
    console.print(f"\n[cyan]Total job queues:[/cyan] {len(queues)}")
def _format_project_date(value):
    """Return an ISO 8601 timestamp formatted as ``yyyy.mm.dd``, or "N/A" if missing/unparseable."""
    if not value:
        return "N/A"
    try:
        # fromisoformat does not accept a trailing 'Z' on older Python versions
        parsed = datetime.fromisoformat(str(value).replace('Z', '+00:00'))
    except (ValueError, TypeError):
        return "N/A"
    return parsed.strftime('%Y.%m.%d')


def create_project_list_table(projects, cloudos_url="https://cloudos.lifebit.ai", page_size=10):
    """Display projects in a rich formatted table with pagination.

    Parameters
    ----------
    projects : list
        A list of dicts, each corresponding to a project. An empty (or
        ``None``) value prints a "no projects" notice and returns.
    cloudos_url : str
        The Lifebit Platform URL for creating hyperlinks.
    page_size : int
        Number of projects to display per page. Default is 10. Values
        below 1 are treated as 1.

    Returns
    -------
    None
        The table, the counters and (when there is more than one page and
        stdin is a terminal) the navigation prompt are printed to the console.
    """
    # Local import: `rich` is already a dependency of this module, and keeping
    # the import here makes the function self-contained.
    from rich.markup import escape

    console = Console()

    # Handle empty project list
    if not projects:
        console.print("\n[yellow]No projects found in this workspace.[/yellow]")
        return

    # A page size below 1 would divide by zero or produce nonsensical slices.
    page_size = max(1, page_size)

    # Prepare rows data
    rows = []
    for project in projects:
        # Name, hyperlinked to the project page on the platform
        project_id = str(project.get("_id", "N/A"))
        project_url = f"{cloudos_url}/app/data-science/datasets/projects/{project_id}"
        # Escape user-provided text so that square brackets in a name are
        # rendered literally instead of being parsed as console markup.
        name = escape(str(project.get("name", "N/A")))
        name_with_link = f"[link={project_url}]{name}[/link]"

        # User (combine name and surname, whichever are present).
        # The key may be present but null, so do not rely on the .get default.
        user_info = project.get("user")
        if not isinstance(user_info, dict):
            user_info = {}
        name_parts = [
            str(part)
            for part in (user_info.get("name", ""), user_info.get("surname", ""))
            if part
        ]
        user = escape(" ".join(name_parts)) if name_parts else "N/A"

        # Created / updated dates (format: yyyy.mm.dd)
        created = _format_project_date(project.get("createdAt"))
        updated = _format_project_date(project.get("updatedAt"))

        # Job and notebook session counts
        job_count = str(project.get("jobCount", 0))
        notebook_count = str(project.get("notebookSessionCount", 0))

        rows.append([name_with_link, user, created, updated, job_count, notebook_count])

    # Pagination (rows is non-empty here, so there is at least one page)
    current_page = 0
    total_pages = (len(rows) + page_size - 1) // page_size
    show_error = None  # Error message to display under the next rendered page

    while True:
        start = current_page * page_size
        page_rows = rows[start:start + page_size]

        # Clear console first so each page replaces the previous one
        console.clear()

        # Create and display table
        table = Table(title="Project List")
        table.add_column("Name", style="green", overflow="fold", min_width=15)
        table.add_column("User", style="cyan", overflow="ellipsis", min_width=12, max_width=20)
        table.add_column("Created", style="magenta", no_wrap=True, min_width=10)
        table.add_column("Updated", style="blue", no_wrap=True, min_width=10)
        table.add_column("Jobs", style="yellow", no_wrap=True, min_width=4, justify="right")
        table.add_column("Notebooks", style="white", no_wrap=True, min_width=9, justify="right")
        for row in page_rows:
            table.add_row(*row)
        console.print(table)

        # Display total count and page info
        console.print(f"\n[cyan]Total projects:[/cyan] {len(projects)}")
        if total_pages > 1:
            console.print(f"[cyan]Page:[/cyan] {current_page + 1} of {total_pages}")
            console.print(f"[cyan]Projects on this page:[/cyan] {len(page_rows)}")

        # Show error message if any, then reset it
        if show_error:
            console.print(show_error)
            show_error = None

        # Only one page: no need for input, just exit
        if total_pages <= 1:
            break

        # Navigation needs an interactive terminal; otherwise stop after page 1
        if not sys.stdin.isatty():
            console.print(
                "\n[yellow]Note: Pagination not available in non-interactive mode. "
                f"Showing page 1 of {total_pages}.[/yellow]"
            )
            console.print("[yellow]Run in an interactive terminal to navigate through all pages.[/yellow]")
            break

        console.print("\n[bold cyan]n[/] = next, [bold cyan]p[/] = prev, [bold cyan]q[/] = quit")

        # Get user input for navigation
        try:
            choice = input(">>> ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # Input stream closed or user interrupt
            console.print("\n[yellow]Pagination interrupted.[/yellow]")
            break

        if choice in ("q", "quit"):
            break
        if choice in ("n", "next"):
            if current_page < total_pages - 1:
                current_page += 1
            else:
                show_error = "[red]Invalid choice. Already on the last page.[/red]"
        elif choice in ("p", "prev"):
            if current_page > 0:
                current_page -= 1
            else:
                show_error = "[red]Invalid choice. Already on the first page.[/red]"
        else:
            show_error = "[red]Invalid choice. Please enter 'n' (next), 'p' (prev), or 'q' (quit).[/red]"