import attr
import traceback
from enum import Enum
from collections import namedtuple
from .. import repositories, entities
from ..services.api_client import ApiClient
[docs]class CommandsStatus(str, Enum):
CREATED = 'created'
MAKING_CHILDREN = 'making-children'
WAITING_CHILDREN = 'waiting-children'
IN_PROGRESS = 'in-progress'
ABORTED = 'aborted'
CANCELED = 'canceled'
FINALIZING = 'finalizing'
SUCCESS = 'success'
FAILED = 'failed'
TIMEOUT = 'timeout'
[docs]@attr.s
class Command(entities.BaseEntity):
"""
Com entity
"""
# platform
id = attr.ib()
url = attr.ib(repr=False)
status = attr.ib()
created_at = attr.ib()
updated_at = attr.ib()
type = attr.ib()
progress = attr.ib()
spec = attr.ib()
error = attr.ib()
# sdk
_client_api = attr.ib(type=ApiClient, repr=False)
_repositories = attr.ib(repr=False)
################
# repositories #
################
@_repositories.default
def set_repositories(self):
reps = namedtuple('repositories',
field_names=['commands'])
commands_repo = repositories.Commands(client_api=self._client_api)
r = reps(commands=commands_repo)
return r
@property
def commands(self):
assert isinstance(self._repositories.commands, repositories.Commands)
return self._repositories.commands
@staticmethod
def _protected_from_json(_json, client_api, is_fetched=True):
"""
Same as from_json but with try-except to catch if error
:param _json: platform json
:param client_api: ApiClient entity
:param is_fetched: is Entity fetched from Platform
:return:
"""
try:
command = Command.from_json(_json=_json,
client_api=client_api,
is_fetched=is_fetched)
status = True
except Exception:
command = traceback.format_exc()
status = False
return status, command
[docs] @classmethod
def from_json(cls, _json, client_api, is_fetched=True):
"""
Build a Command entity object from a json
:param _json: _json response from host
:param client_api: ApiClient entity
:param is_fetched: is Entity fetched from Platform
:return: Command object
"""
inst = cls(
id=_json.get('id', None),
url=_json.get('url', None),
status=_json.get('status', None),
created_at=_json.get('createdAt', None),
updated_at=_json.get('updatedAt', None),
type=_json.get('type', None),
progress=_json.get('progress', None),
spec=_json.get('spec', None),
error=_json.get('error', None),
client_api=client_api
)
inst.is_fetched = is_fetched
return inst
[docs] def to_json(self):
"""
Returns platform _json format of object
:return: platform json format of object
:rtype: dict
"""
# get excluded
_json = attr.asdict(self, filter=attr.filters.exclude(attr.fields(Command)._client_api,
attr.fields(Command).created_at,
attr.fields(Command).updated_at))
# rename
_json['createdAt'] = self.created_at
_json['updatedAt'] = self.updated_at
return _json
[docs] def abort(self):
"""
abort command
:return:
"""
return self.commands.abort(command_id=self.id)
[docs] def in_progress(self):
"""
Check if command is still in one of the in progress statuses
:return: True if command still in progress
:rtype: bool
"""
return self.status in [entities.CommandsStatus.CREATED,
entities.CommandsStatus.MAKING_CHILDREN,
entities.CommandsStatus.WAITING_CHILDREN,
entities.CommandsStatus.FINALIZING,
entities.CommandsStatus.IN_PROGRESS]
[docs] def wait(self, timeout=0, step=None, backoff_factor=1):
"""
Wait for Command to finish
:param int timeout: int, seconds to wait until TimeoutError is raised. if 0 - wait until done
:param int step: int, seconds between polling
:param float backoff_factor: A backoff factor to apply between attempts after the second try
:return: Command object
"""
if not self.in_progress():
return self
return self.commands.wait(command_id=self.id,
timeout=timeout,
step=step,
url=self.url,
backoff_factor=backoff_factor
)