import os
from collections import namedtuple
from enum import Enum
from typing import Union
import logging
import traceback
import attr
import json
from .. import repositories, entities, exceptions, services
logger = logging.getLogger(name='dtlpy')
[docs]class RequirementOperator(str, Enum):
EQUAL = '==',
GREATER_THAN = '>',
LESS_THAN = '<',
EQUAL_OR_LESS_THAN = '<=',
EQUAL_OR_GREATER_THAN = '>='
@staticmethod
def keys():
return [key.value for key in list(RequirementOperator)]
class PackageRequirement:
def __init__(self, name: str, version: str = None, operator: str = None):
self.name = name
self.version = version
valid_operators = RequirementOperator.keys()
if operator is not None and operator not in valid_operators:
raise Exception('Illegal operator: {}. Please select from: {}'.format(operator, valid_operators))
self.operator = operator
def to_json(self):
_json = {'name': self.name}
if self.version is not None:
_json['version'] = self.version
if self.operator is not None:
_json['operator'] = self.operator
return _json
@classmethod
def from_json(cls, _json: dict):
return cls(**_json)
[docs]@attr.s
class Package(entities.BaseEntity):
"""
Package object
"""
# platform
id = attr.ib()
url = attr.ib(repr=False)
version = attr.ib()
created_at = attr.ib()
updated_at = attr.ib(repr=False)
name = attr.ib()
codebase = attr.ib()
_modules = attr.ib()
slots = attr.ib(type=list)
ui_hooks = attr.ib()
creator = attr.ib()
is_global = attr.ib()
type = attr.ib()
service_config = attr.ib()
# name change
project_id = attr.ib()
# sdk
_project = attr.ib(repr=False)
_client_api = attr.ib(type=services.ApiClient, repr=False)
_revisions = attr.ib(default=None, repr=False)
_repositories = attr.ib(repr=False)
_artifacts = attr.ib(default=None)
_codebases = attr.ib(default=None)
# defaults
requirements = attr.ib(default=None)
@property
def createdAt(self):
return self.created_at
@property
def updatedAt(self):
return self.updated_at
@property
def modules(self):
return self._modules
@property
def platform_url(self):
return self._client_api._get_resource_url("projects/{}/packages/{}/main".format(self.project.id, self.id))
@property
def codebase_id(self):
if self.codebase is not None and self.codebase.type == entities.PackageCodebaseType.ITEM:
return self.codebase.item_id
return None
@codebase_id.setter
def codebase_id(self, item_id: str):
self.codebase = entities.ItemCodebase(item_id=item_id)
@modules.setter
def modules(self, modules: list):
if not self.unique_modules(modules):
raise Exception('Cannot have 2 modules by the same name in one package.')
if not isinstance(modules, list):
raise Exception('Package modules must be a list.')
self._modules = modules
@staticmethod
def unique_modules(modules: list):
return len(modules) == len(set([module.name for module in modules]))
@staticmethod
def _protected_from_json(_json, client_api, project, is_fetched=True):
"""
Same as from_json but with try-except to catch if error
:param _json: platform json
:param client_api: ApiClient entity
:return:
"""
try:
package = Package.from_json(_json=_json,
client_api=client_api,
project=project,
is_fetched=is_fetched)
status = True
except Exception:
package = traceback.format_exc()
status = False
return status, package
[docs] @classmethod
def from_json(cls, _json, client_api, project, is_fetched=True):
"""
Turn platform representation of package into a package entity
:param dict _json: platform representation of package
:param dl.ApiClient client_api: ApiClient entity
:param dtlpy.entities.project.Project project: project entity
:param is_fetched: is Entity fetched from Platform
:return: Package entity
:rtype: dtlpy.entities.package.Package
"""
if project is not None:
if project.id != _json.get('projectId', None):
logger.warning('Package has been fetched from a project that is not belong to it')
project = None
modules = [entities.PackageModule.from_json(_module) for _module in _json.get('modules', list())]
slots = _json.get('slots', None)
if slots is not None:
slots = [entities.PackageSlot.from_json(_slot) for _slot in slots]
if 'codebase' in _json:
codebase = entities.Codebase.from_json(_json=_json['codebase'],
client_api=client_api)
else:
codebase = None
requirements = _json.get('requirements', None)
if requirements is not None:
requirements = [PackageRequirement.from_json(r) for r in requirements]
inst = cls(
project_id=_json.get('projectId', None),
codebase=codebase,
created_at=_json.get('createdAt', None),
updated_at=_json.get('updatedAt', None),
version=_json.get('version', None),
creator=_json.get('creator', None),
is_global=_json.get('global', None),
client_api=client_api,
modules=modules,
slots=slots,
ui_hooks=_json.get('uiHooks', None),
name=_json.get('name', None),
url=_json.get('url', None),
project=project,
id=_json.get('id', None),
service_config=_json.get('serviceConfig', None),
requirements=requirements,
type=_json.get('type', None)
)
inst.is_fetched = is_fetched
return inst
[docs] def to_json(self):
"""
Turn Package entity into a platform representation of Package
:return: platform json of package
:rtype: dict
"""
_json = attr.asdict(self,
filter=attr.filters.exclude(attr.fields(Package)._project,
attr.fields(Package)._repositories,
attr.fields(Package)._artifacts,
attr.fields(Package)._codebases,
attr.fields(Package)._client_api,
attr.fields(Package)._revisions,
attr.fields(Package).project_id,
attr.fields(Package)._modules,
attr.fields(Package).slots,
attr.fields(Package).is_global,
attr.fields(Package).ui_hooks,
attr.fields(Package).codebase,
attr.fields(Package).service_config,
attr.fields(Package).created_at,
attr.fields(Package).updated_at,
attr.fields(Package).requirements,
))
modules = self.modules
# check in inputs is a list
if not isinstance(modules, list):
modules = [modules]
# if is dtlpy entity convert to dict
if modules and isinstance(modules[0], entities.PackageModule):
modules = [module.to_json() for module in modules]
_json['modules'] = modules
if self.slots is not None:
slot = [slot.to_json() for slot in self.slots]
_json['slots'] = slot
_json['projectId'] = self.project_id
_json['createdAt'] = self.created_at
_json['updatedAt'] = self.updated_at
if self.is_global is not None:
_json['global'] = self.is_global
if self.codebase is not None:
_json['codebase'] = self.codebase.to_json()
if self.ui_hooks is not None:
_json['uiHooks'] = self.ui_hooks
if self.service_config is not None:
_json['serviceConfig'] = self.service_config
if self.requirements is not None:
_json['requirements'] = [r.to_json() for r in self.requirements]
return _json
############
# entities #
############
@property
def revisions(self):
if self._revisions is None:
self._revisions = self.packages.revisions(package=self)
return self._revisions
@property
def project(self):
if self._project is None:
self._project = self.projects.get(project_id=self.project_id, fetch=None)
assert isinstance(self._project, entities.Project)
return self._project
################
# repositories #
################
@_repositories.default
def set_repositories(self):
reps = namedtuple('repositories',
field_names=['executions', 'services', 'projects', 'packages'])
r = reps(executions=repositories.Executions(client_api=self._client_api, project=self._project),
services=repositories.Services(client_api=self._client_api, package=self, project=self._project,
project_id=self.project_id),
projects=repositories.Projects(client_api=self._client_api),
packages=repositories.Packages(client_api=self._client_api, project=self._project))
return r
@property
def executions(self):
assert isinstance(self._repositories.executions, repositories.Executions)
return self._repositories.executions
@property
def services(self):
assert isinstance(self._repositories.services, repositories.Services)
return self._repositories.services
@property
def projects(self):
assert isinstance(self._repositories.projects, repositories.Projects)
return self._repositories.projects
@property
def packages(self):
assert isinstance(self._repositories.packages, repositories.Packages)
return self._repositories.packages
@property
def codebases(self):
if self._codebases is None:
self._codebases = repositories.Codebases(
client_api=self._client_api,
project=self._project,
project_id=self.project_id
)
assert isinstance(self._codebases, repositories.Codebases)
return self._codebases
@property
def artifacts(self):
if self._artifacts is None:
self._artifacts = repositories.Artifacts(
client_api=self._client_api,
project=self._project,
project_id=self.project_id
)
assert isinstance(self._artifacts, repositories.Artifacts)
return self._artifacts
##############
# properties #
##############
@property
def git_status(self):
status = 'Git status unavailable'
try:
if self.codebase.type == entities.PackageCodebaseType.ITEM:
if 'git' in self.codebase.item.metadata:
status = self.codebase.item.metadata['git'].get('status', status)
except Exception:
logging.debug('Error getting codebase')
return status
@property
def git_log(self):
log = 'Git log unavailable'
try:
if self.codebase.type == entities.PackageCodebaseType.ITEM:
if 'git' in self.codebase.item.metadata:
log = self.codebase.item.metadata['git'].get('log', log)
except Exception:
logging.debug('Error getting codebase')
return log
###########
# methods #
###########
[docs] def update(self):
"""
Update Package changes to platform
:return: Package entity
"""
return self.packages.update(package=self)
[docs] def deploy(self,
service_name=None,
revision=None,
init_input=None,
runtime=None,
sdk_version=None,
agent_versions=None,
verify=True,
bot=None,
pod_type=None,
module_name=None,
run_execution_as_process=None,
execution_timeout=None,
drain_time=None,
on_reset=None,
max_attempts=None,
force=False,
**kwargs):
"""
Deploy package
:param str service_name: service name
:param str revision: package revision - default=latest
:param init_input: config to run at startup
:param dict runtime: runtime resources
:param str sdk_version: - optional - string - sdk version
:param dict agent_versions: - dictionary - - optional -versions of sdk, agent runner and agent proxy
:param str bot: bot email
:param str pod_type: pod type dl.InstanceCatalog
:param bool verify: verify the inputs
:param str module_name: module name
:param bool run_execution_as_process: run execution as process
:param int execution_timeout: execution timeout
:param int drain_time: drain time
:param str on_reset: on reset
:param int max_attempts: Maximum execution retries in-case of a service reset
:param bool force: optional - terminate old replicas immediately
:return: Service object
:rtype: dtlpy.entities.service.Service
**Example**:
.. code-block:: python
package.deploy(service_name=package_name,
execution_timeout=3 * 60 * 60,
module_name=module.name,
runtime=dl.KubernetesRuntime(
concurrency=10,
pod_type=dl.InstanceCatalog.REGULAR_S,
autoscaler=dl.KubernetesRabbitmqAutoscaler(
min_replicas=1,
max_replicas=20,
queue_length=20
)
)
)
"""
return self.project.packages.deploy(package=self,
service_name=service_name,
project_id=self.project_id,
revision=revision,
init_input=init_input,
runtime=runtime,
sdk_version=sdk_version,
agent_versions=agent_versions,
pod_type=pod_type,
bot=bot,
verify=verify,
module_name=module_name,
run_execution_as_process=run_execution_as_process,
execution_timeout=execution_timeout,
drain_time=drain_time,
on_reset=on_reset,
max_attempts=max_attempts,
force=force,
jwt_forward=kwargs.get('jwt_forward', None),
is_global=kwargs.get('is_global', None))
[docs] def checkout(self):
"""
Checkout as package
:return:
"""
return self.packages.checkout(package=self)
[docs] def delete(self):
"""
Delete Package object
:return: True
"""
return self.packages.delete(package=self)
[docs] def push(self,
codebase: Union[entities.GitCodebase, entities.ItemCodebase] = None,
src_path: str = None,
package_name: str = None,
modules: list = None,
checkout: bool = False,
revision_increment: str = None,
service_update: bool = False,
service_config: dict = None,
):
"""
Push local package
:param dtlpy.entities.codebase.Codebase codebase: PackageCode object - defines how to store the package code
:param bool checkout: save package to local checkout
:param str src_path: location of pacjage codebase folder to zip
:param str package_name: name of package
:param list modules: list of PackageModule
:param str revision_increment: optional - str - version bumping method - major/minor/patch - default = None
:param bool service_update: optional - bool - update the service
:param dict service_config: optional - json of service - a service that have config from the main service if wanted
:return: package entity
:rtype: dtlpy.entities.package.Package
**Example**:
.. code-block:: python
packages.push(package_name='package_name',
modules=[module],
version='1.0.0',
src_path=os.getcwd()
)
"""
return self.project.packages.push(
package_name=package_name if package_name is not None else self.name,
modules=modules if modules is not None else self.modules,
revision_increment=revision_increment,
codebase=codebase,
src_path=src_path,
checkout=checkout,
service_update=service_update,
service_config=service_config
)
[docs] def pull(self, version=None, local_path=None):
"""
Pull local package
:param str version: version
:param str local_path: local path
**Example**:
.. code-block:: python
package.pull(local_path='local_path')
"""
return self.packages.pull(package=self,
version=version,
local_path=local_path)
[docs] def open_in_web(self):
"""
Open the package in web platform
"""
self._client_api._open_in_web(url=self.platform_url)
[docs] def test(self,
cwd=None,
concurrency=None,
module_name=entities.package_defaults.DEFAULT_PACKAGE_MODULE_NAME,
function_name=entities.package_defaults.DEFAULT_PACKAGE_FUNCTION_NAME,
class_name=entities.package_defaults.DEFAULT_PACKAGE_CLASS_NAME,
entry_point=entities.package_defaults.DEFAULT_PACKAGE_ENTRY_POINT
):
"""
Test local package in local environment.
:param str cwd: path to the file
:param int concurrency: the concurrency of the test
:param str module_name: module name
:param str function_name: function name
:param str class_name: class name
:param str entry_point: the file to run like main.py
:return: list created by the function that tested the output
:rtype: list
**Example**:
.. code-block:: python
package.test(cwd='path_to_package',
function_name='run')
"""
return self.project.packages.test_local_package(
cwd=cwd,
concurrency=concurrency,
package=self,
module_name=module_name,
function_name=function_name,
class_name=class_name,
entry_point=entry_point
)
@staticmethod
def _mockify_input(input_type):
_json = dict()
if input_type == 'Dataset':
_json.update({'dataset_id': 'id'})
if input_type == 'Item':
_json.update({'item_id': 'id', 'dataset_id': 'id'})
if input_type == 'Annotation':
_json.update({'annotation_id': 'id', 'item_id': 'id', 'dataset_id': 'id'})
return _json
def mockify(self, local_path=None, module_name=None, function_name=None):
if local_path is None:
local_path = os.getcwd()
if module_name is None:
if self.modules:
module_name = self.modules[0].name
else:
raise exceptions.PlatformException('400', 'Package has no modules')
modules = [module for module in self.modules if module.name == module_name]
if not modules:
raise exceptions.PlatformException('404', 'Module not found: {}'.format(module_name))
module = modules[0]
if function_name is None:
funcs = [func for func in module.functions]
if funcs:
func = funcs[0]
else:
raise exceptions.PlatformException('400', 'Module: {} has no functions'.format(module_name))
else:
funcs = [func for func in module.functions if func.name == function_name]
if not funcs:
raise exceptions.PlatformException('404', 'Function not found: {}'.format(function_name))
func = funcs[0]
mock = dict()
for module in self.modules:
mock['module_name'] = module.name
mock['function_name'] = func.name
mock['init_params'] = {inpt.name: self._mockify_input(input_type=inpt.type) for inpt in module.init_inputs}
mock['inputs'] = [{'name': inpt.name, 'value': self._mockify_input(input_type=inpt.type)} for inpt in
func.inputs]
with open(os.path.join(local_path, 'mock.json'), 'w') as f:
json.dump(mock, f)