Source code for dtlpy.entities.pipeline

from collections import namedtuple
import logging
import traceback
import attr

from .node import PipelineNode, PipelineConnection
from .. import repositories, entities, services

logger = logging.getLogger(name='dtlpy')


[docs]@attr.s class Pipeline(entities.BaseEntity): """ Package object """ # platform id = attr.ib() name = attr.ib() creator = attr.ib() org_id = attr.ib() connections = attr.ib() # name change created_at = attr.ib() updated_at = attr.ib(repr=False) start_nodes = attr.ib() project_id = attr.ib() composition_id = attr.ib() url = attr.ib() preview = attr.ib() description = attr.ib() revisions = attr.ib() info = attr.ib() # sdk _project = attr.ib(repr=False) _client_api = attr.ib(type=services.ApiClient, repr=False) _repositories = attr.ib(repr=False) @staticmethod def _protected_from_json(_json, client_api, project, is_fetched=True): """ Same as from_json but with try-except to catch if error :param _json: platform json :param client_api: ApiClient entity :param is_fetched: is Entity fetched from Platform :return: """ try: pipeline = Pipeline.from_json( _json=_json, client_api=client_api, project=project, is_fetched=is_fetched ) status = True except Exception: pipeline = traceback.format_exc() status = False return status, pipeline
[docs] @classmethod def from_json(cls, _json, client_api, project, is_fetched=True): """ Turn platform representation of pipeline into a pipeline entity :param _json: platform representation of package :param client_api: ApiClient entity :param project: project entity :param is_fetched: is Entity fetched from Platform :return: Package entity """ if project is not None: if project.id != _json.get('projectId', None): logger.warning('Pipeline has been fetched from a project that is not belong to it') project = None connections = [PipelineConnection.from_json(_json=con) for con in _json.get('connections', list())] inst = cls( created_at=_json.get('createdAt', None), updated_at=_json.get('updatedAt', None), project_id=_json.get('projectId', None), org_id=_json.get('orgId', None), composition_id=_json.get('compositionId', None), creator=_json.get('creator', None), client_api=client_api, name=_json.get('name', None), project=project, id=_json.get('id', None), connections=connections, start_nodes=_json.get('startNodes', None), url=_json.get('url', None), preview=_json.get('preview', None), description=_json.get('description', None), revisions=_json.get('revisions', None), info=_json.get('info', None) ) for node in _json.get('nodes', list()): inst.nodes.add(node=PipelineNode.from_json(node)) inst.is_fetched = is_fetched return inst
[docs] def to_json(self): """ Turn Package entity into a platform representation of Package :return: platform json of package :rtype: dict """ _json = attr.asdict(self, filter=attr.filters.exclude(attr.fields(Pipeline)._project, attr.fields(Pipeline)._repositories, attr.fields(Pipeline)._client_api, attr.fields(Pipeline).project_id, attr.fields(Pipeline).org_id, attr.fields(Pipeline).connections, attr.fields(Pipeline).created_at, attr.fields(Pipeline).updated_at, attr.fields(Pipeline).start_nodes, attr.fields(Pipeline).project_id, attr.fields(Pipeline).composition_id, attr.fields(Pipeline).url, attr.fields(Pipeline).preview, attr.fields(Pipeline).description, attr.fields(Pipeline).revisions, attr.fields(Pipeline).info )) _json['projectId'] = self.project_id _json['createdAt'] = self.created_at _json['updatedAt'] = self.updated_at _json['compositionId'] = self.composition_id _json['startNodes'] = self.start_nodes _json['orgId'] = self.org_id _json['nodes'] = [node.to_json() for node in self.nodes] _json['connections'] = [con.to_json() for con in self.connections] _json['url'] = self.url _json['preview'] = self.preview _json['description'] = self.description _json['revisions'] = self.revisions _json['info'] = self.info return _json
######### # Props # ######### @property def platform_url(self): return self._client_api._get_resource_url("projects/{}/pipelines/{}".format(self.project_id, self.id)) @property def project(self): if self._project is None: self._project = self.projects.get(project_id=self.project_id, fetch=None) assert isinstance(self._project, entities.Project) return self._project ################ # repositories # ################ @_repositories.default def set_repositories(self): reps = namedtuple('repositories', field_names=['projects', 'pipelines', 'pipeline_executions', 'triggers', 'nodes']) r = reps( projects=repositories.Projects(client_api=self._client_api), pipelines=repositories.Pipelines(client_api=self._client_api), pipeline_executions=repositories.PipelineExecutions( client_api=self._client_api, project=self._project, pipeline=self ), triggers=repositories.Triggers(client_api=self._client_api, pipeline=self), nodes=repositories.Nodes(client_api=self._client_api, pipeline=self) ) return r @property def projects(self): assert isinstance(self._repositories.projects, repositories.Projects) return self._repositories.projects @property def triggers(self): assert isinstance(self._repositories.triggers, repositories.Triggers) return self._repositories.triggers @property def nodes(self): assert isinstance(self._repositories.nodes, repositories.Nodes) return self._repositories.nodes @property def pipelines(self): assert isinstance(self._repositories.pipelines, repositories.Pipelines) return self._repositories.pipelines @property def pipeline_executions(self): assert isinstance(self._repositories.pipeline_executions, repositories.PipelineExecutions) return self._repositories.pipeline_executions ########### # methods # ###########
[docs] def update(self): """ Update pipeline changes to platform :return: pipeline entity """ return self.pipelines.update(pipeline=self)
[docs] def delete(self): """ Delete pipeline object :return: True """ return self.pipelines.delete(pipeline=self)
[docs] def open_in_web(self): """ Open the pipeline in web platform :return: """ self._client_api._open_in_web(url=self.platform_url)
[docs] def install(self): """ install pipeline :return: Composition entity """ return self.pipelines.install(pipeline=self)
[docs] def pause(self): """ pause pipeline :return: Composition entity """ return self.pipelines.pause(pipeline=self)
[docs] def execute(self, execution_input=None): """ execute a pipeline and return the execute :param execution_input: list of the dl.FunctionIO or dict of pipeline input - example {'item': 'item_id'} :return: entities.PipelineExecution object """ execution = self.pipeline_executions.create(pipeline_id=self.id, execution_input=execution_input) return execution
[docs] def set_start_node(self, node: PipelineNode): """ Set the start node of the pipeline :param PipelineNode node: node to be the start node """ if self.start_nodes: for pipe_node in self.start_nodes: if pipe_node['type'] == 'root': pipe_node['nodeId'] = node.node_id else: self.start_nodes = [{"nodeId": node.node_id, "type": "root", }]