Source code for dtlpy.entities.package

from collections import namedtuple
from typing import Union
from enum import Enum
import importlib.util
import traceback
import logging
import inspect
import typing
import json
import os

from .package_module import PackageModule
from .package_slot import PackageSlot
from .. import repositories, entities, exceptions, services

# Module-level logger; messages from this module are emitted under the 'dtlpy' logger name.
logger = logging.getLogger(name='dtlpy')


class RequirementOperator(str, Enum):
    """
    Comparison operators that may be attached to a package requirement.

    Members are plain strings (``str`` mixin), so a member compares equal to
    its operator text, e.g. ``RequirementOperator.EQUAL == '=='``.
    """
    # NOTE: values are deliberately written without trailing commas. A trailing
    # comma turns the value into a 1-tuple, which only happens to work because
    # Enum unpacks tuples into the ``str`` constructor.
    EQUAL = '=='
    GREATER_THAN = '>'
    LESS_THAN = '<'
    EQUAL_OR_LESS_THAN = '<='
    EQUAL_OR_GREATER_THAN = '>='

    @staticmethod
    def keys():
        """
        List the operator strings of all members, in definition order.

        :return: list of operator strings, e.g. ``['==', '>', ...]``
        :rtype: list
        """
        return [operator.value for operator in RequirementOperator]
class PackageRequirement:
    """
    One dependency requirement of a package.

    Stores the requirement name with an optional version and an optional
    comparison operator, and converts to and from its dict representation.
    """

    def __init__(self, name: str, version: str = None, operator: str = None):
        """
        :param str name: requirement name
        :param str version: optional version string
        :param str operator: optional operator; when given it must be one of
            ``RequirementOperator.keys()``
        :raises Exception: if ``operator`` is given and is not a known operator
        """
        self.name = name
        self.version = version

        allowed = RequirementOperator.keys()
        operator_is_acceptable = operator is None or operator in allowed
        if not operator_is_acceptable:
            raise Exception('Illegal operator: {}. Please select from: {}'.format(operator, allowed))
        self.operator = operator

    def to_json(self):
        """
        Build the dict form of the requirement.

        ``name`` is always present; ``version`` and ``operator`` are included
        only when they are not None.

        :return: dict representation of the requirement
        :rtype: dict
        """
        optional_fields = (('version', self.version), ('operator', self.operator))
        _json = {'name': self.name}
        _json.update({key: value for key, value in optional_fields if value is not None})
        return _json

    @classmethod
    def from_json(cls, _json: dict):
        """
        Create a requirement from its dict form.

        The dict entries are passed to the constructor as keyword arguments.

        :param dict _json: dict representation of the requirement
        :return: the new requirement
        """
        requirement = cls(**_json)
        return requirement
class Package(entities.DlEntity):
    """
    Package object

    Entity wrapper around the platform json of a package. Simple fields are
    exposed through ``DlProperty`` descriptors, while ``codebase``, ``modules``,
    ``slots`` and ``requirements`` are parsed into entities by ``from_json``.
    """
    # platform
    id = entities.DlProperty(location=['id'], type=str)
    url = entities.DlProperty(location=['url'], type=str)
    name = entities.DlProperty(location=['name'], type=str)
    version = entities.DlProperty(location=['version'], type=str)
    created_at = entities.DlProperty(location=['createdAt'], type=str)
    updated_at = entities.DlProperty(location=['updatedAt'], type=str)
    project_id = entities.DlProperty(location=['projectId'], type=str)
    creator = entities.DlProperty(location=['creator'], type=str)
    type = entities.DlProperty(location=['type'], type=str)
    metadata = entities.DlProperty(location=['metadata'], type=dict)
    ui_hooks = entities.DlProperty(location=['uiHooks'], type=str)
    service_config = entities.DlProperty(location=['serviceConfig'], type=str)
    is_global = entities.DlProperty(location=['global'], type=str)

    # parsed entities; assigned on the instance by from_json
    codebase: entities.ItemCodebase
    modules: typing.List[PackageModule]
    slots: typing.List[PackageSlot]
    requirements: typing.List[PackageRequirement]

    # sdk
    _client_api: services.ApiClient
    _revisions = None
    __repositories = None
    _project = None

    def __init__(self, _dict):
        """
        :param dict _dict: platform json backing this entity
        """
        self._dict = _dict

    def __repr__(self):
        # TODO need to move to DlEntity
        return "Package(id={id}, name={name}, creator={creator}, project_id={project_id}, type={type}, version={version})".format(
            id=self.id,
            name=self.name,
            version=self.version,
            type=self.type,
            project_id=self.project_id,
            creator=self.creator)

    @property
    def createdAt(self):
        """Alias of ``created_at``."""
        return self.created_at

    @property
    def updatedAt(self):
        """Alias of ``updated_at``."""
        return self.updated_at

    @property
    def revisions(self):
        """Package revisions, fetched through the packages repository on first access and cached."""
        if self._revisions is None:
            self._revisions = self.packages.revisions(package=self)
        return self._revisions

    @property
    def platform_url(self):
        """Resource url of this package, built by the api client from the project id and package id."""
        return self._client_api._get_resource_url("projects/{}/packages/{}/main".format(self.project.id, self.id))

    @property
    def codebase_id(self):
        """Item id of the codebase when the codebase type is ITEM, otherwise None."""
        if self.codebase is not None and self.codebase.type == entities.PackageCodebaseType.ITEM:
            return self.codebase.item_id
        return None

    @codebase_id.setter
    def codebase_id(self, item_id: str):
        # setting an id replaces the codebase with a new item codebase
        self.codebase = entities.ItemCodebase(item_id=item_id)

    @staticmethod
    def _protected_from_json(_json, client_api, project, is_fetched=True):
        """
        Same as from_json but with try-except to catch if error

        :param _json: platform json
        :param client_api: ApiClient entity
        :param project: project entity
        :param is_fetched: is Entity fetched from Platform
        :return: tuple ``(status, package)``; on failure ``status`` is False and
            ``package`` holds the formatted traceback string instead of an entity
        """
        try:
            package = Package.from_json(_json=_json,
                                        client_api=client_api,
                                        project=project,
                                        is_fetched=is_fetched)
            status = True
        except Exception:
            package = traceback.format_exc()
            status = False
        return status, package

    @classmethod
    def from_json(cls, _json, client_api, project, is_fetched=True):
        """
        Turn platform representation of package into a package entity

        :param dict _json: platform representation of package
        :param dl.ApiClient client_api: ApiClient entity
        :param dtlpy.entities.project.Project project: project entity
        :param is_fetched: is Entity fetched from Platform
        :return: Package entity
        :rtype: dtlpy.entities.package.Package
        """
        if project is not None:
            if project.id != _json.get('projectId', None):
                logger.warning('Package has been fetched from a project that is not belong to it')
                # do not keep a project the package does not belong to
                project = None

        modules = _json.get('modules', None)
        if modules is not None:
            modules = [entities.PackageModule.from_json(module) for module in modules]

        slots = _json.get('slots', None)
        if slots is not None:
            slots = [entities.PackageSlot.from_json(_slot) for _slot in slots]

        codebase = _json.get('codebase', None)
        # test the parsed value (not the string literal 'codebase', which is always
        # not None) so a missing codebase stays None instead of being parsed
        if codebase is not None:
            codebase = entities.Codebase.from_json(_json=codebase, client_api=client_api)

        requirements = _json.get('requirements', None)
        if requirements is not None:
            requirements = [PackageRequirement.from_json(r) for r in requirements]

        # Entity
        inst = cls(_dict=_json)
        inst.codebase = codebase
        inst.modules = modules
        inst.requirements = requirements
        inst.slots = slots

        # Platform
        inst._project = project
        inst._client_api = client_api
        inst.is_fetched = is_fetched
        return inst

    def to_json(self):
        """
        Turn Package entity into a platform representation of Package

        The entity's backing dict is updated in place and returned.

        :return: platform json of package
        :rtype: dict
        :raises Exception: if modules is not a list, or two modules share a name
        """
        _json = self._dict

        # Sanity - check the type first so a non-list (e.g. None) gets the
        # intended message rather than failing inside len()/iteration
        modules = self.modules
        if not isinstance(modules, list):
            raise Exception('Package modules must be a list.')
        module_names = [module.name for module in modules]
        if len(module_names) != len(set(module_names)):
            raise Exception('Cannot have 2 modules by the same name in one package.')

        _json['modules'] = [m.to_json() for m in modules]
        if self.slots is not None:
            _json['slots'] = [m.to_json() for m in self.slots]
        if self.codebase is not None:
            _json['codebase'] = self.codebase.to_json()
        if self.requirements is not None:
            _json['requirements'] = [r.to_json() for r in self.requirements]
        return _json

    ############
    # entities #
    ############
    @property
    def project(self):
        """Project entity of the package; fetched by ``project_id`` on first access and cached."""
        if self._project is None:
            self._project = self.projects.get(project_id=self.project_id, fetch=None)
        assert isinstance(self._project, entities.Project)
        return self._project

    @project.setter
    def project(self, project):
        # validate the incoming value (not the currently stored one)
        assert isinstance(project, entities.Project), "Unknwon 'project' type: {}".format(type(project))
        self._project = project

    ################
    # repositories #
    ################
    @property
    def _repositories(self):
        """Named tuple of the repositories bound to this package; created on first access and cached."""
        if self.__repositories is None:
            reps = namedtuple('repositories',
                              field_names=['executions', 'services', 'projects', 'packages', 'artifacts', 'codebases',
                                           'models'])

            self.__repositories = reps(
                executions=repositories.Executions(client_api=self._client_api,
                                                   project=self._project),
                services=repositories.Services(client_api=self._client_api,
                                               package=self,
                                               project=self._project,
                                               project_id=self.project_id),
                projects=repositories.Projects(client_api=self._client_api),
                packages=repositories.Packages(client_api=self._client_api,
                                               project=self._project),
                artifacts=repositories.Artifacts(client_api=self._client_api,
                                                 project=self._project,
                                                 project_id=self.project_id,
                                                 package=self),
                codebases=repositories.Codebases(client_api=self._client_api,
                                                 project=self._project,
                                                 project_id=self.project_id),
                models=repositories.Models(client_api=self._client_api,
                                           project=self._project,
                                           package=self,
                                           project_id=self.project_id)
            )
        return self.__repositories

    @property
    def executions(self):
        assert isinstance(self._repositories.executions, repositories.Executions)
        return self._repositories.executions

    @property
    def services(self):
        assert isinstance(self._repositories.services, repositories.Services)
        return self._repositories.services

    @property
    def projects(self):
        assert isinstance(self._repositories.projects, repositories.Projects)
        return self._repositories.projects

    @property
    def packages(self):
        assert isinstance(self._repositories.packages, repositories.Packages)
        return self._repositories.packages

    @property
    def codebases(self):
        assert isinstance(self._repositories.codebases, repositories.Codebases)
        return self._repositories.codebases

    @property
    def artifacts(self):
        assert isinstance(self._repositories.artifacts, repositories.Artifacts)
        return self._repositories.artifacts

    @property
    def models(self):
        assert isinstance(self._repositories.models, repositories.Models)
        return self._repositories.models

    ##############
    # properties #
    ##############
    def _git_metadata_value(self, key, default):
        """
        Read one entry of the 'git' metadata of an ITEM codebase.

        Best effort: any error while reaching the codebase is logged at debug
        level and ``default`` is returned.

        :param str key: key inside the 'git' metadata dict
        :param default: value returned when the entry is unavailable
        :return: the stored value, or ``default``
        """
        value = default
        try:
            if self.codebase.type == entities.PackageCodebaseType.ITEM:
                item_metadata = self.codebase.item.metadata
                if 'git' in item_metadata:
                    value = item_metadata['git'].get(key, default)
        except Exception:
            # use the module logger; the root-level logging.debug() may
            # implicitly configure the root logger
            logger.debug('Error getting codebase')
        return value

    @property
    def git_status(self):
        """Git status stored in the codebase item metadata, or a fallback message."""
        return self._git_metadata_value(key='status', default='Git status unavailable')

    @property
    def git_log(self):
        """Git log stored in the codebase item metadata, or a fallback message."""
        return self._git_metadata_value(key='log', default='Git log unavailable')

    ###########
    # methods #
    ###########
    def update(self):
        """
        Update Package changes to platform

        :return: Package entity
        """
        return self.packages.update(package=self)

    def deploy(self,
               service_name=None,
               revision=None,
               init_input=None,
               runtime=None,
               sdk_version=None,
               agent_versions=None,
               verify=True,
               bot=None,
               pod_type=None,
               module_name=None,
               run_execution_as_process=None,
               execution_timeout=None,
               drain_time=None,
               on_reset=None,
               max_attempts=None,
               force=False,
               secrets: list = None,
               **kwargs):
        """
        Deploy package

        :param str service_name: service name
        :param str revision: package revision - default=latest
        :param init_input: config to run at startup
        :param dict runtime: runtime resources
        :param str sdk_version: - optional - string - sdk version
        :param dict agent_versions: - dictionary - - optional -versions of sdk, agent runner and agent proxy
        :param str bot: bot email
        :param str pod_type: pod type dl.InstanceCatalog
        :param bool verify: verify the inputs
        :param str module_name: module name
        :param bool run_execution_as_process: run execution as process
        :param int execution_timeout: execution timeout
        :param int drain_time: drain time
        :param str on_reset: on reset
        :param int max_attempts: Maximum execution retries in-case of a service reset
        :param bool force: optional - terminate old replicas immediately
        :param list secrets: list of the integrations ids
        :param kwargs: only ``jwt_forward`` and ``is_global`` are read and forwarded
        :return: Service object
        :rtype: dtlpy.entities.service.Service

        **Example**:

        .. code-block:: python

            service: dl.Service = package.deploy(service_name=package_name,
                                                 execution_timeout=3 * 60 * 60,
                                                 module_name=module.name,
                                                 runtime=dl.KubernetesRuntime(
                                                     concurrency=10,
                                                     pod_type=dl.InstanceCatalog.REGULAR_S,
                                                     autoscaler=dl.KubernetesRabbitmqAutoscaler(
                                                         min_replicas=1,
                                                         max_replicas=20,
                                                         queue_length=20
                                                     )
                                                 )
                                                 )
        """
        return self.project.packages.deploy(package=self,
                                            service_name=service_name,
                                            project_id=self.project_id,
                                            revision=revision,
                                            init_input=init_input,
                                            runtime=runtime,
                                            sdk_version=sdk_version,
                                            agent_versions=agent_versions,
                                            pod_type=pod_type,
                                            bot=bot,
                                            verify=verify,
                                            module_name=module_name,
                                            run_execution_as_process=run_execution_as_process,
                                            execution_timeout=execution_timeout,
                                            drain_time=drain_time,
                                            on_reset=on_reset,
                                            max_attempts=max_attempts,
                                            force=force,
                                            jwt_forward=kwargs.get('jwt_forward', None),
                                            is_global=kwargs.get('is_global', None),
                                            secrets=secrets)

    def checkout(self):
        """
        Checkout as package

        :return:
        """
        return self.packages.checkout(package=self)

    def delete(self) -> bool:
        """
        Delete Package object

        :return: True
        """
        return self.packages.delete(package=self)

    def push(self,
             codebase: Union[entities.GitCodebase, entities.ItemCodebase] = None,
             src_path: str = None,
             package_name: str = None,
             modules: list = None,
             checkout: bool = False,
             revision_increment: str = None,
             service_update: bool = False,
             service_config: dict = None,
             package_type='faas'
             ):
        """
        Push local package

        ``package_name`` and ``modules`` fall back to this package's own name
        and modules when not given.

        :param dtlpy.entities.codebase.Codebase codebase: PackageCode object - defines how to store the package code
        :param bool checkout: save package to local checkout
        :param str src_path: location of package codebase folder to zip
        :param str package_name: name of package
        :param list modules: list of PackageModule
        :param str revision_increment: optional - str - version bumping method - major/minor/patch - default = None
        :param bool service_update: optional - bool - update the service
        :param dict service_config: optional - json of service - a service that have config from the main service if wanted
        :param str package_type: default is "faas", one of "app", "ml"
        :return: package entity
        :rtype: dtlpy.entities.package.Package

        **Example**:

        .. code-block:: python

            package = package.push(modules=[module],
                                   src_path=os.getcwd())
        """
        return self.project.packages.push(
            package_name=package_name if package_name is not None else self.name,
            modules=modules if modules is not None else self.modules,
            revision_increment=revision_increment,
            codebase=codebase,
            src_path=src_path,
            checkout=checkout,
            service_update=service_update,
            service_config=service_config,
            package_type=package_type
        )

    def pull(self, version=None, local_path=None) -> str:
        """
        Pull local package

        :param str version: version
        :param str local_path: local path

        **Example**:

        .. code-block:: python

            path = package.pull(local_path='local_path')
        """
        return self.packages.pull(package=self,
                                  version=version,
                                  local_path=local_path)

    def build(self, local_path=None, from_local=None, module_name=None):
        """
        Build the package through the packages repository

        :param local_path: local path where the model code should be. if model is downloaded - this will be the point it will be downloaded
                            (if from_local=False - codebase will be downloaded)
        :param module_name: Name of the module to build the runner class
        :param from_local: bool. use current directory to build
        :return: ModelAdapter (dl.BaseModelAdapter)
        """
        return self.packages.build(package=self,
                                   module_name=module_name,
                                   local_path=local_path,
                                   from_local=from_local)

    def open_in_web(self):
        """
        Open the package in web platform
        """
        self._client_api._open_in_web(url=self.platform_url)

    def test(self,
             cwd=None,
             concurrency=None,
             module_name=entities.package_defaults.DEFAULT_PACKAGE_MODULE_NAME,
             function_name=entities.package_defaults.DEFAULT_PACKAGE_FUNCTION_NAME,
             class_name=entities.package_defaults.DEFAULT_PACKAGE_CLASS_NAME,
             entry_point=entities.package_defaults.DEFAULT_PACKAGE_ENTRY_POINT
             ):
        """
        Test local package in local environment.

        :param str cwd: path to the file
        :param int concurrency: the concurrency of the test
        :param str module_name: module name
        :param str function_name: function name
        :param str class_name: class name
        :param str entry_point: the file to run like main.py
        :return: list created by the function that tested the output
        :rtype: list

        **Example**:

        .. code-block:: python

            package.test(cwd='path_to_package',
                         function_name='run')
        """
        return self.project.packages.test_local_package(
            cwd=cwd,
            concurrency=concurrency,
            package=self,
            module_name=module_name,
            function_name=function_name,
            class_name=class_name,
            entry_point=entry_point
        )

    @staticmethod
    def _mockify_input(input_type):
        """
        Build a placeholder value for an input of the given type.

        :param str input_type: input type name; 'Dataset', 'Item' and 'Annotation'
            get placeholder ids, any other type gets an empty dict
        :return: dict of placeholder ids
        :rtype: dict
        """
        _json = dict()
        if input_type == 'Dataset':
            _json.update({'dataset_id': 'id'})
        if input_type == 'Item':
            _json.update({'item_id': 'id', 'dataset_id': 'id'})
        if input_type == 'Annotation':
            _json.update({'annotation_id': 'id', 'item_id': 'id', 'dataset_id': 'id'})
        return _json

    def mockify(self, local_path=None, module_name=None, function_name=None):
        """
        Write a ``mock.json`` file with placeholder inputs for one function of one module.

        :param str local_path: directory to write ``mock.json`` into - default is the current working directory
        :param str module_name: module to mock - default is the first module of the package
        :param str function_name: function to mock - default is the first function of the module
        :raises exceptions.PlatformException: '400' if the package has no modules or the
            module has no functions, '404' if the named module / function is not found
        """
        if local_path is None:
            local_path = os.getcwd()

        # treat a missing modules list as empty so lookups fail with a clear error
        package_modules = self.modules or []

        if module_name is None:
            if package_modules:
                module_name = package_modules[0].name
            else:
                raise exceptions.PlatformException('400', 'Package has no modules')

        matching_modules = [module for module in package_modules if module.name == module_name]
        if not matching_modules:
            raise exceptions.PlatformException('404', 'Module not found: {}'.format(module_name))
        module = matching_modules[0]

        if function_name is None:
            funcs = list(module.functions)
            if funcs:
                func = funcs[0]
            else:
                raise exceptions.PlatformException('400', 'Module: {} has no functions'.format(module_name))
        else:
            funcs = [func for func in module.functions if func.name == function_name]
            if not funcs:
                raise exceptions.PlatformException('404', 'Function not found: {}'.format(function_name))
            func = funcs[0]

        # describe the selected module only; looping over every module would
        # overwrite the mock and pair the last module with the selected function
        mock = {
            'module_name': module.name,
            'function_name': func.name,
            'init_params': {inpt.name: self._mockify_input(input_type=inpt.type)
                            for inpt in module.init_inputs},
            'inputs': [{'name': inpt.name, 'value': self._mockify_input(input_type=inpt.type)}
                       for inpt in func.inputs],
        }

        with open(os.path.join(local_path, 'mock.json'), 'w') as f:
            json.dump(mock, f)

    @staticmethod
    def get_ml_metadata(cls=None,
                        output_type=entities.AnnotationType.CLASSIFICATION,
                        default_configuration: dict = None):
        """
        Build the ``system.ml`` metadata dict of an ml package.

        :param cls: optional adapter class. When given, ``supportedMethods`` is a list of
            single-entry dicts ``{method_name: bool}``; the flag is True when the method's
            ``__qualname__`` does not contain 'BaseModelAdapter'. When omitted,
            ``supportedMethods`` is the plain list of method names.
        :param output_type: value stored under ``outputType``
        :param dict default_configuration: value stored under ``defaultConfiguration`` - default is an empty dict
        :return: metadata dict
        :rtype: dict
        """
        method_names = ['predict', 'train', 'evaluate']
        available_methods = list(method_names)
        if cls is not None:
            available_methods = [
                {name: 'BaseModelAdapter' not in getattr(cls, name).__qualname__}
                for name in method_names
            ]
        if default_configuration is None:
            default_configuration = dict()
        metadata = {
            'system': {'ml': {'defaultConfiguration': default_configuration,
                              'outputType': output_type,
                              'supportedMethods': available_methods
                              }}}
        return metadata

    class decorators:
        """Decorators that attach a ``__dtlpy__`` spec dict to classes and functions."""

        @staticmethod
        def module(name='default-module', description='', init_inputs=None):
            """
            Class decorator: store a module spec on ``cls.__dtlpy__``.

            The spec collects the ``__dtlpy__`` spec of every function member of
            the class that has one.

            :param str name: module name
            :param str description: module description
            :param dict init_inputs: optional mapping of init input name to type
            :return: the decorator
            """
            def wrapper(cls: typing.Callable):
                # package_module_dict = package_module.to_json()
                package_module_dict = {"name": name,
                                       "description": description,
                                       "functions": list(),
                                       "className": cls.__name__}
                if init_inputs is not None:
                    package_module_dict.update(initInputs=Package.decorators.parse_io(io_list=init_inputs))
                for _, member in inspect.getmembers(cls, predicate=inspect.isfunction):
                    spec = getattr(member, '__dtlpy__', None)
                    if spec is not None:
                        package_module_dict["functions"].append(spec)
                cls.__dtlpy__ = package_module_dict
                return cls

            return wrapper

        @staticmethod
        def function(display_name=None, inputs=None, outputs=None):
            """
            Function decorator: store a function spec on ``func.__dtlpy__``.

            :param str display_name: display name - default is the function's own name
            :param dict inputs: optional mapping of input name to type
            :param dict outputs: optional mapping of output name to type
            :return: the decorator
            """
            def wrapper(func: typing.Callable):
                if display_name is None:
                    d_name = func.__name__
                else:
                    d_name = display_name
                func.__dtlpy__ = {"name": func.__name__,
                                  "displayName": d_name,
                                  "input": Package.decorators.parse_io(io_list=inputs),
                                  "output": Package.decorators.parse_io(io_list=outputs)}
                return func

            return wrapper

        @staticmethod
        def parse_io(io_list: dict):
            """
            Turn a ``{name: type}`` mapping into a list of ``{"name", "type"}`` dicts.

            Enum members are replaced by their member name, classes by their
            ``__name__``, anything else by ``str()`` of the value.

            :param dict io_list: mapping of io name to type; None gives an empty list
            :return: list of io dicts
            :rtype: list
            """
            output = list()
            if io_list is not None:
                for io_name, io_type in io_list.items():
                    if isinstance(io_type, Enum):
                        io_type = io_type.name
                    if isinstance(io_type, type):
                        io_type = io_type.__name__
                    else:
                        io_type = str(io_type)
                    output.append({"name": io_name, "type": str(io_type)})
            return output