Source code for cloudos_cli.utils.errors
"""
Specific classes to handle errors.
"""
[docs]
class BadRequestException(Exception):
"""Handle bad request exceptions and shows improved messages.
Parameters
----------
rv : requests.Response
The request variable returned that caused the error.
"""
def __init__(self, rv):
# Try to get the message from response body first
error_message = None
try:
response_body = rv.json()
error_message = response_body.get('message')
except (ValueError, AttributeError):
# Response is not JSON or doesn't have expected structure
pass
# Prioritize message from response, fallback to reason
if error_message:
msg = "Server returned status {}. Message: {}".format(rv.status_code, error_message)
else:
msg = "Server returned status {}. Reason: {}".format(rv.status_code, rv.reason)
super(BadRequestException, self).__init__(msg)
self.rv = rv
[docs]
class TimeOutException(Exception):
"""Handle TimeOut exceptions and shows improved messages.
Parameters
----------
rv : requests.Response
The request variable returned that caused the error.
"""
def __init__(self, rv):
msg = ("Server exceeded the max time to process request. " +
"Status: {}; Reason: {}".format(rv.status_code, rv.reason))
super(TimeOutException, self).__init__(msg)
self.rv = rv
[docs]
class AccountNotLinkedException(Exception):
"""
Displays a meaningful message when the user tries to import a repository from an account that is not linked
with their cloudOS account
"""
def __init__(self, wf_url):
msg = (f"The pipeline at the URL {wf_url} cannot be imported. Check that you repository account " +
"has been linked in your cloudOS workspace")
super(AccountNotLinkedException, self).__init__(msg)
self.wf_url = wf_url
[docs]
class JoBNotCompletedException(Exception):
def __init__(self, job, status):
msg = f"Job {job} has status {status}. Results are only available for jobs with status \"completed\""
super(JoBNotCompletedException, self).__init__(msg)
self.job = job
self.status = status
[docs]
class NotAuthorisedException(Exception):
def __init__(self):
msg = ("Not authorised to run this operation. Check your API key, and that the resource you request is "
"in the same workspace as the workspace specified in the cloudOS cli")
super(NotAuthorisedException, self).__init__(msg)
[docs]
class NoCloudForWorkspaceException(Exception):
def __init__(self, workspace_id):
msg = f"Workspace ID {workspace_id} is not associated with supported cloud providers. Check the workspace ID"
super(NoCloudForWorkspaceException, self).__init__(msg)
self.workspace_id = workspace_id
[docs]
class JobAccessDeniedException(Exception):
def __init__(self, job_id, job_owner_name=None, current_user_name=None):
if job_owner_name and current_user_name:
msg = (f"Access denied to job {job_id}. This job belongs to {job_owner_name}, "
f"but you are authenticated as {current_user_name}. "
f"You can only access jobs that belong to your account.")
elif job_owner_name:
msg = (f"Access denied to job {job_id}. This job belongs to {job_owner_name}. "
f"You can only access jobs that belong to your account.")
else:
msg = (f"Access denied to job {job_id}. You can only access jobs that belong to your account. "
f"This job may belong to another user or you may not have the required permissions.")
super(JobAccessDeniedException, self).__init__(msg)
self.job_id = job_id
self.job_owner_name = job_owner_name
self.current_user_name = current_user_name