Source code for dtlpy.entities.pipeline_execution

from collections import namedtuple
import logging
import traceback
import attr

from .. import repositories, entities, services

logger = logging.getLogger(name='dtlpy')


class PipelineExecutionNode:
    def __init__(self, name, node_id, ports, metadata, node_type, namespace, project_id, status):
        self.node_id = node_id
        self.namespace = namespace
        self.node_type = node_type
        self.status = status
        self.ports = ports
        self.metadata = metadata
        self.project_id = project_id
        self.name = name

    @staticmethod
    def from_json(_json: dict):
        ports = [entities.PipelineNodeIO.from_json(_json=i_input) for i_input in _json.get('ports', list())]
        return PipelineExecutionNode(
            node_id=_json.get('id', None),
            namespace=_json.get('namespace', None),
            node_type=_json.get('type', None),
            status=_json.get('status', None),
            ports=ports,
            metadata=_json.get('metadata', None),
            project_id=_json.get('projectId', None),
            name=_json.get('name', None),
        )

    def to_json(self):
        _json = {
            'id': self.node_id,
            'namespace': self.namespace,
            'type': self.node_type,
            'status': self.status,
            'ports': [_io.to_json() for _io in self.ports],
            'metadata': self.metadata,
            'projectId': self.project_id,
            'name': self.name,
        }

        return _json


[docs]@attr.s class PipelineExecution(entities.BaseEntity): """ Package object """ # platform id = attr.ib() nodes = attr.ib(repr=False) executions = attr.ib(repr=False) status = attr.ib() # name change created_at = attr.ib() updated_at = attr.ib(repr=False) pipeline_id = attr.ib() max_attempts = attr.ib() # sdk _pipeline = attr.ib(repr=False) _client_api = attr.ib(type=services.ApiClient, repr=False) _repositories = attr.ib(repr=False) @staticmethod def _protected_from_json(_json, client_api, pipeline, is_fetched=True): """ Same as from_json but with try-except to catch if error :param _json: platform json :param client_api: ApiClient entity :param pipeline: Pipeline entity :param is_fetched: is Entity fetched from Platform :return: """ try: pipeline = PipelineExecution.from_json( _json=_json, client_api=client_api, pipeline=pipeline, is_fetched=is_fetched ) status = True except Exception: pipeline = traceback.format_exc() status = False return status, pipeline
[docs] @classmethod def from_json(cls, _json, client_api, pipeline, is_fetched=True): """ Turn platform representation of pipeline_execution into a pipeline_execution entity :param dict _json: platform representation of package :param dl.ApiClient client_api: ApiClient entity :param dtlpy.entities.pipeline.Pipeline pipeline: Pipeline entity :param bool is_fetched: is Entity fetched from Platform :return: Pipeline entity :rtype: dtlpy.entities.pipeline.Pipeline """ if pipeline is not None: if pipeline.id != _json.get('pipelineId', None): logger.warning('Pipeline has been fetched from a project that is not belong to it') pipeline = None nodes = [PipelineExecutionNode.from_json(_json=node) for node in _json.get('nodes', list())] executions = _json.get('executions', dict()) for node_id, executions_list in executions.items(): if len(executions_list) > 0 and isinstance(executions_list[0], dict): executions[node_id] = [entities.Execution.from_json(_json=execution, client_api=client_api ) for execution in executions_list] inst = cls( id=_json.get('id', None), created_at=_json.get('createdAt', None), updated_at=_json.get('updatedAt', None), pipeline_id=_json.get('pipelineId', None), status=_json.get('status', None), max_attempts=_json.get('maxAttempts', None), nodes=nodes, executions=executions, pipeline=pipeline, client_api=client_api, ) inst.is_fetched = is_fetched return inst
[docs] def to_json(self): """ Turn Package entity into a platform representation of Package :return: platform json of package :rtype: dict """ _json = attr.asdict(self, filter=attr.filters.exclude(attr.fields(PipelineExecution)._repositories, attr.fields(PipelineExecution)._client_api, attr.fields(PipelineExecution)._pipeline, attr.fields(PipelineExecution).nodes, attr.fields(PipelineExecution).created_at, attr.fields(PipelineExecution).updated_at, attr.fields(PipelineExecution).pipeline_id, attr.fields(PipelineExecution).executions, attr.fields(PipelineExecution).max_attempts, )) executions = dict() for node_id, executions_list in self.executions.items(): if len(executions_list) > 0 and isinstance(executions_list[0], entities.Execution): executions[node_id] = [e.to_json() for e in executions_list] else: executions[node_id] = executions_list _json['pipelineId'] = self.pipeline_id _json['maxAttempts'] = self.max_attempts _json['createdAt'] = self.created_at _json['updatedAt'] = self.updated_at _json['nodes'] = [node.to_json() for node in self.nodes] _json['executions'] = executions return _json
######### # Props # ######### @property def pipeline(self): if self._pipeline is None: self._pipeline = self.pipelines.get(pipeline_id=self.pipeline_id, fetch=None) assert isinstance(self._pipeline, entities.Pipeline) return self._pipeline @property def project(self): return self.pipeline.project ################ # repositories # ################ @_repositories.default def set_repositories(self): reps = namedtuple('repositories', field_names=['projects', 'pipelines']) r = reps( projects=repositories.Projects(client_api=self._client_api), pipelines=repositories.Pipelines(client_api=self._client_api), ) return r @property def projects(self): assert isinstance(self._repositories.projects, repositories.Projects) return self._repositories.projects @property def pipelines(self): assert isinstance(self._repositories.pipelines, repositories.Pipelines) return self._repositories.pipelines