Source code for cloudos_cli.utils.last_wf
from datetime import datetime, timezone
def _parse_iso8601_z(dt_str):
"""
Parse an ISO8601 timestamp string that may end with 'Z' (UTC).
Returns a timezone-aware datetime, or None if parsing fails.
"""
if not dt_str:
return None
if dt_str.endswith('Z'):
dt_str = dt_str[:-1] + '+00:00'
try:
return datetime.fromisoformat(dt_str)
except ValueError:
return None
[docs]
def youngest_workflow_id_by_name(content, target_name, ignore_case=False, return_workflow=True):
"""
Find the most recently created workflow whose .name matches `target_name`.
Parameters
----------
content : dict
API response dict containing top-level "workflows".
target_name : str
Name to match.
ignore_case : bool, default False
If True, case-insensitive comparison.
return_workflow : bool, default False
If True, return the whole workflow dict; else return its _id.
Returns
-------
str | dict | None
_id (default), full workflow dict (if return_workflow=True), or None if no match.
"""
workflows = content.get('workflows') or []
if ignore_case:
target_cmp = target_name.lower()
matches = [wf for wf in workflows if wf.get('name', '').lower() == target_cmp]
else:
matches = [wf for wf in workflows if wf.get('name') == target_name]
if not matches:
return None
def sort_key(wf):
created = _parse_iso8601_z(wf.get('createdAt'))
updated = _parse_iso8601_z(wf.get('updatedAt'))
# Prefer createdAt; fall back to updatedAt; else epoch 0
return created or updated or datetime.fromtimestamp(0, tz=timezone.utc)
youngest = max(matches, key=sort_key)
# keep structure as dictionary, will return inner dictionary just for the selected workflow
youngest_d = {"workflows":[ youngest ]}
youngest_id = youngest.get('_id')
return youngest_d if return_workflow else youngest_id