Source code for dtlpy.entities.model

import json
from collections import namedtuple
from enum import Enum
import traceback
import logging

import attr

from .. import repositories, entities
from ..services.api_client import ApiClient

# Shared package-level logger: every dtlpy module logs under the same 'dtlpy' name.
logger = logging.getLogger('dtlpy')


class DatasetSubsetType(str, Enum):
    """
    Names of the dataset subsets.

    Members are also ``str`` instances (``str`` mixin), so they compare equal to
    their plain string values.
    """

    TRAIN = 'train'
    VALIDATION = 'validation'
    TEST = 'test'
class ModelStatus(str, Enum):
    """
    Available types for model status.

    Members are also ``str`` instances (``str`` mixin), so they compare equal to
    their plain string values.
    """

    # Values are written as plain strings. A trailing comma after a value would
    # turn it into a 1-tuple, which only resolves to a string because of the
    # ``str`` mixin - keep the definitions unambiguous.
    CREATED = "created"
    PRE_TRAINED = "pre-trained"
    PENDING = "pending"
    TRAINING = "training"
    TRAINED = "trained"
    DEPLOYED = "deployed"
    FAILED = "failed"
    CLONING = "cloning"
class PlotSample:
    """A single (x, y) metric point, addressed by figure name and legend (line) name."""

    def __init__(self, figure, legend, x, y):
        """
        Create a single metric sample for Model

        :param figure: figure name identifier
        :param legend: line name identifier
        :param x: x value for the current sample
        :param y: y value for the current sample
        """
        self.figure, self.legend = figure, legend
        self.x, self.y = x, y

    def to_json(self) -> dict:
        """
        Serialize the sample to a dict.

        :return: dict with the keys 'figure', 'legend' and 'data' (holding 'x' and 'y')
        :rtype: dict
        """
        point = {'x': self.x, 'y': self.y}
        return {'figure': self.figure,
                'legend': self.legend,
                'data': point}
@attr.s
class Model(entities.BaseEntity):
    """
    Model object

    Holds the platform fields of a model plus lazily-fetched SDK entities
    (project, package, dataset, ontology) and the repositories used to reach them.
    The attribute order below defines the order of the generated ``__init__`` arguments.
    """
    # platform
    id = attr.ib()
    creator = attr.ib()
    created_at = attr.ib()
    updated_at = attr.ib(repr=False)
    model_artifacts = attr.ib()
    name = attr.ib()
    description = attr.ib()
    ontology_id = attr.ib(repr=False)
    labels = attr.ib()
    status = attr.ib()
    tags = attr.ib()
    configuration = attr.ib()
    metadata = attr.ib()
    input_type = attr.ib()
    output_type = attr.ib()
    module_name = attr.ib()

    url = attr.ib()
    scope = attr.ib()
    version = attr.ib()
    context = attr.ib()

    # name change
    package_id = attr.ib(repr=False)
    project_id = attr.ib()
    dataset_id = attr.ib(repr=False)

    # sdk
    _project = attr.ib(repr=False)
    _package = attr.ib(repr=False)
    _dataset = attr.ib(repr=False)
    _client_api = attr.ib(type=ApiClient, repr=False)
    _repositories = attr.ib(repr=False)  # default is built by set_repositories (see below)
    _ontology = attr.ib(repr=False, default=None)
    updated_by = attr.ib(default=None)

    @staticmethod
    def _protected_from_json(_json, client_api, project, package, is_fetched=True):
        """
        Same as from_json but with try-except to catch if error

        :param _json: platform representation of Model
        :param client_api: ApiClient entity
        :param project: project that owns the model
        :param package: package entity of the model
        :param is_fetched: is Entity fetched from Platform
        :return: tuple ``(status, model)``; on success ``status`` is True and ``model`` is the
                 Model entity, on failure ``status`` is False and ``model`` is the formatted traceback string
        """
        try:
            model = Model.from_json(_json=_json,
                                    client_api=client_api,
                                    project=project,
                                    package=package,
                                    is_fetched=is_fetched)
            status = True
        except Exception:
            # Deliberately broad: the caller receives the traceback text instead of an exception
            model = traceback.format_exc()
            status = False
        return status, model

    @classmethod
    def from_json(cls, _json, client_api, project, package, is_fetched=True):
        """
        Turn platform representation of model into a model entity

        A ``project`` / ``package`` whose id does not match the ids found in ``_json`` is
        discarded (with a warning), so the entity never holds a reference to the wrong
        parent; the matching entity is fetched lazily by the ``project`` / ``package`` properties.

        :param _json: platform representation of model
        :param client_api: ApiClient entity
        :param project: project that owns the model
        :param package: package entity of the model
        :param is_fetched: is Entity fetched from Platform
        :return: Model entity
        """
        if project is not None:
            if project.id != _json.get('context', {}).get('project', None):
                logger.warning('Model has been fetched from a project that is not in it projects list')
                project = None

        if package is not None:
            if package.id != _json.get('packageId', None):
                logger.warning('Model has been fetched from a package that does not match its packageId')
                # Drop the mismatching package (mirrors the project handling above)
                package = None

        model_artifacts = [entities.Artifact.from_json(_json=artifact,
                                                       client_api=client_api,
                                                       project=project)
                           for artifact in _json.get('artifacts', list())]

        inst = cls(
            configuration=_json.get('configuration', None),
            description=_json.get('description', None),
            status=_json.get('status', None),
            tags=_json.get('tags', None),
            metadata=_json.get('metadata', dict()),
            project_id=_json.get('context', {}).get('project', None),
            dataset_id=_json.get('datasetId', None),
            package_id=_json.get('packageId', None),
            model_artifacts=model_artifacts,
            labels=_json.get('labels', None),
            # NOTE(review): snake_case key, unlike the camelCase ids above - confirm against the platform schema
            ontology_id=_json.get('ontology_id', None),
            created_at=_json.get('createdAt', None),
            updated_at=_json.get('updatedAt', None),
            creator=_json.get('context', {}).get('creator', None),
            client_api=client_api,
            name=_json.get('name', None),
            project=project,
            package=package,
            dataset=None,
            id=_json.get('id', None),
            url=_json.get('url', None),
            scope=_json.get('scope', entities.EntityScopeLevel.PROJECT),
            version=_json.get('version', '1.0.0'),
            context=_json.get('context', {}),
            input_type=_json.get('inputType', None),
            output_type=_json.get('outputType', None),
            module_name=_json.get('moduleName', None),
            updated_by=_json.get('updatedBy', None)
        )
        inst.is_fetched = is_fetched
        return inst

    def to_json(self):
        """
        Get the dict of Model

        SDK-only attributes and the fields that use a different (camelCase) platform name
        are excluded from the attrs dump and the renamed ones are added back explicitly.

        :return: platform json of model
        :rtype: dict
        """
        _json = attr.asdict(self,
                            filter=attr.filters.exclude(attr.fields(Model)._project,
                                                        attr.fields(Model)._package,
                                                        attr.fields(Model)._dataset,
                                                        attr.fields(Model)._ontology,
                                                        attr.fields(Model)._repositories,
                                                        attr.fields(Model)._client_api,
                                                        attr.fields(Model).package_id,
                                                        attr.fields(Model).project_id,
                                                        attr.fields(Model).dataset_id,
                                                        attr.fields(Model).ontology_id,
                                                        attr.fields(Model).model_artifacts,
                                                        attr.fields(Model).created_at,
                                                        attr.fields(Model).updated_at,
                                                        attr.fields(Model).input_type,
                                                        attr.fields(Model).output_type,
                                                        attr.fields(Model).updated_by
                                                        ))
        _json['packageId'] = self.package_id
        _json['datasetId'] = self.dataset_id
        _json['createdAt'] = self.created_at
        _json['updatedAt'] = self.updated_at
        _json['inputType'] = self.input_type
        _json['outputType'] = self.output_type
        # NOTE(review): `module_name` is not in the exclude list above, so the result carries both
        # 'module_name' and 'moduleName' - confirm whether the snake_case key is intended.
        _json['moduleName'] = self.module_name

        model_artifacts = list()
        for artifact in self.model_artifacts:
            if artifact.type in ['file', 'dir']:
                # file/dir artifacts are sent as a reference to their item id
                artifact = {'type': 'item',
                            'itemId': artifact.id}
            else:
                artifact = artifact.to_json(as_artifact=True)
            model_artifacts.append(artifact)
        _json['artifacts'] = model_artifacts

        # updatedBy is only emitted when it has a truthy value
        if self.updated_by:
            _json['updatedBy'] = self.updated_by

        return _json

    ############
    # entities #
    ############
    @property
    def project(self):
        """Project entity of the model; fetched by ``project_id`` on first access and cached."""
        if self._project is None:
            self._project = self.projects.get(project_id=self.project_id, fetch=None)
            self._repositories = self.set_repositories()  # update the repos with the new fetched entity
        assert isinstance(self._project, entities.Project)
        return self._project

    @property
    def package(self):
        """
        Package (or Dpk) entity of the model; fetched by ``package_id`` on first access and cached.

        The packages repository is tried first and the dpks repository second; if both
        fail, the error of the first attempt is raised.
        """
        if self._package is None:
            try:
                self._package = self.packages.get(package_id=self.package_id)
            except Exception as e:
                error = e
                try:
                    self._package = self.dpks.get(dpk_id=self.package_id)
                except Exception:
                    raise error
            self._repositories = self.set_repositories()  # update the repos with the new fetched entity
        assert isinstance(self._package, (entities.Package, entities.Dpk))
        return self._package

    @property
    def dataset(self):
        """
        Dataset entity of the model; fetched by ``dataset_id`` on first access and cached.

        :raises RuntimeError: if the model has no ``dataset_id``
        """
        if self._dataset is None:
            if self.dataset_id is None:
                raise RuntimeError("Model {!r} has no dataset. Can be used only for inference".format(self.id))
            self._dataset = self.datasets.get(dataset_id=self.dataset_id, fetch=None)
            self._repositories = self.set_repositories()  # update the repos with the new fetched entity
        assert isinstance(self._dataset, entities.Dataset)
        return self._dataset

    @property
    def ontology(self):
        """
        Ontology entity of the model; fetched by ``ontology_id`` on first access and cached.

        :raises RuntimeError: if the model has no ``ontology_id``
        """
        if self._ontology is None:
            if self.ontology_id is None:
                raise RuntimeError("Model {!r} has no ontology.".format(self.id))
            self._ontology = self.ontologies.get(ontology_id=self.ontology_id)
        assert isinstance(self._ontology, entities.Ontology)
        return self._ontology

    ################
    # repositories #
    ################
    @_repositories.default
    def set_repositories(self):
        """
        Build the repositories namedtuple from the current client api and cached entities.

        Registered as the attrs default of ``_repositories`` and called again by the entity
        properties after a lazy fetch, so the repositories see the newly fetched entity.

        :return: namedtuple with the fields projects, datasets, packages, models, ontologies,
                 artifacts, metrics, dpks and services
        """
        reps = namedtuple('repositories',
                          field_names=['projects', 'datasets', 'packages', 'models', 'ontologies', 'artifacts',
                                       'metrics', 'dpks', 'services'])

        r = reps(projects=repositories.Projects(client_api=self._client_api),
                 datasets=repositories.Datasets(client_api=self._client_api,
                                                project=self._project),
                 models=repositories.Models(client_api=self._client_api,
                                            project=self._project,
                                            project_id=self.project_id,
                                            package=self._package),
                 packages=repositories.Packages(client_api=self._client_api,
                                                project=self._project),
                 ontologies=repositories.Ontologies(client_api=self._client_api,
                                                    project=self._project,
                                                    dataset=self._dataset),
                 artifacts=repositories.Artifacts(client_api=self._client_api,
                                                  project=self._project,
                                                  project_id=self.project_id,
                                                  model=self),
                 metrics=repositories.Metrics(client_api=self._client_api,
                                              model=self),
                 dpks=repositories.Dpks(client_api=self._client_api),
                 services=repositories.Services(client_api=self._client_api,
                                                project=self._project,
                                                project_id=self.project_id,
                                                model_id=self.id,
                                                model=self),
                 )
        return r

    @property
    def platform_url(self):
        """URL of the model resource, built by the client api from the project id and model id."""
        return self._client_api._get_resource_url("projects/{}/model/{}".format(self.project_id, self.id))

    @property
    def projects(self):
        assert isinstance(self._repositories.projects, repositories.Projects)
        return self._repositories.projects

    @property
    def datasets(self):
        assert isinstance(self._repositories.datasets, repositories.Datasets)
        return self._repositories.datasets

    @property
    def models(self):
        assert isinstance(self._repositories.models, repositories.Models)
        return self._repositories.models

    @property
    def packages(self):
        assert isinstance(self._repositories.packages, repositories.Packages)
        return self._repositories.packages

    @property
    def dpks(self):
        assert isinstance(self._repositories.dpks, repositories.Dpks)
        return self._repositories.dpks

    @property
    def ontologies(self):
        assert isinstance(self._repositories.ontologies, repositories.Ontologies)
        return self._repositories.ontologies

    @property
    def artifacts(self):
        assert isinstance(self._repositories.artifacts, repositories.Artifacts)
        return self._repositories.artifacts

    @property
    def metrics(self):
        assert isinstance(self._repositories.metrics, repositories.Metrics)
        return self._repositories.metrics

    @property
    def services(self):
        assert isinstance(self._repositories.services, repositories.Services)
        return self._repositories.services

    def _configuration_dict(self) -> dict:
        """Return ``self.configuration``, first replacing a ``None`` value with an empty dict."""
        # from_json defaults configuration to None; the label maps below need a dict to cache into
        if self.configuration is None:
            self.configuration = dict()
        return self.configuration

    @property
    def id_to_label_map(self):
        """
        Mapping of integer id to label, cached under ``configuration['id_to_label_map']``.

        When the configuration has no map yet, one is built by enumerating ``self.labels``
        (no ontology id) or the tags of ``self.ontology.labels``. An existing map is
        re-keyed with ``int`` keys on every access.
        """
        configuration = self._configuration_dict()
        if 'id_to_label_map' not in configuration:
            # default
            if self.ontology_id == 'null' or self.ontology_id is None:
                configuration['id_to_label_map'] = {int(idx): lbl for idx, lbl in enumerate(self.labels)}
            else:
                configuration['id_to_label_map'] = {int(idx): lbl.tag for idx, lbl in
                                                    enumerate(self.ontology.labels)}
        else:
            configuration['id_to_label_map'] = {int(idx): lbl for idx, lbl in
                                                configuration['id_to_label_map'].items()}
        return configuration['id_to_label_map']

    @id_to_label_map.setter
    def id_to_label_map(self, mapping: dict):
        self._configuration_dict()['id_to_label_map'] = {int(idx): lbl for idx, lbl in mapping.items()}

    @property
    def label_to_id_map(self):
        """
        Mapping of label to integer id, cached under ``configuration['label_to_id_map']``.

        When missing, it is built by inverting ``id_to_label_map``.
        """
        configuration = self._configuration_dict()
        if 'label_to_id_map' not in configuration:
            configuration['label_to_id_map'] = {v: int(k) for k, v in self.id_to_label_map.items()}
        return configuration['label_to_id_map']

    @label_to_id_map.setter
    def label_to_id_map(self, mapping: dict):
        # NOTE(review): the given mapping is inverted (its keys become int ids, its values become
        # the keys), i.e. an id->label dict is expected here - confirm this is the intended input.
        self._configuration_dict()['label_to_id_map'] = {v: int(k) for k, v in mapping.items()}

    ###########
    # methods #
    ###########

    def add_subset(self, subset_name: str, subset_filter: entities.Filters):
        """
        Adds a subset for the model, specifying a subset of the model's dataset that could be used for training or
        validation.

        :param str subset_name: the name of the subset
        :param dtlpy.entities.Filters subset_filter: the filtering operation that this subset performs in the dataset.

        **Example**

        .. code-block:: python

            model.add_subset(subset_name='train', subset_filter=dtlpy.Filters(field='dir', values='/train'))
            model.metadata['system']['subsets']
                {'train': <dtlpy.entities.filters.Filters object at 0x1501dfe20>}

        """
        self.models.add_subset(self, subset_name, subset_filter)

    def delete_subset(self, subset_name: str):
        """
        Removes a subset from the model's metadata.

        :param str subset_name: the name of the subset

        **Example**

        .. code-block:: python

            model.add_subset(subset_name='train', subset_filter=dtlpy.Filters(field='dir', values='/train'))
            model.metadata['system']['subsets']
                {'train': <dtlpy.entities.filters.Filters object at 0x1501dfe20>}
            model.delete_subset(subset_name='train')
            model.metadata['system']['subsets']
                {}

        """
        self.models.delete_subset(self, subset_name)

    def update(self, system_metadata=False):
        """
        Update Models changes to platform

        :param bool system_metadata: bool - True, if you want to change metadata system
        :return: Models entity
        """
        return self.models.update(model=self,
                                  system_metadata=system_metadata)

    def open_in_web(self):
        """
        Open the model in web platform

        :return:
        """
        self._client_api._open_in_web(url=self.platform_url)

    def delete(self):
        """
        Delete Model object

        :return: True
        """
        return self.models.delete(model=self)

    def clone(self,
              model_name: str,
              dataset: entities.Dataset = None,
              configuration: dict = None,
              status=None,
              scope=None,
              project_id: str = None,
              labels: list = None,
              description: str = None,
              tags: list = None,
              train_filter: entities.Filters = None,
              validation_filter: entities.Filters = None,
              wait=True
              ):
        """
        Clones and creates a new model out of existing one

        :param str model_name: `str` new model name
        :param dtlpy.entities.Dataset dataset: dataset object for the cloned model
        :param dict configuration: `dict` (optional) if passed replaces the current configuration
        :param str status: `str` (optional) set the new status
        :param str scope: `str` (optional) set the new scope. default is "project"
        :param str project_id: `str` specify the project id to create the new model on (if other than the source model)
        :param list labels: `list` of `str` - label of the model
        :param str description: `str` description of the new model
        :param list tags: `list` of `str` - tags of the new model
        :param dtlpy.entities.filters.Filters train_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model train
        :param dtlpy.entities.filters.Filters validation_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model validation
        :param bool wait: `bool` wait for the model to be ready before returning

        :return: dl.Model which is a clone version of the existing model
        """
        return self.models.clone(from_model=self,
                                 model_name=model_name,
                                 project_id=project_id,
                                 dataset=dataset,
                                 scope=scope,
                                 status=status,
                                 configuration=configuration,
                                 labels=labels,
                                 description=description,
                                 tags=tags,
                                 train_filter=train_filter,
                                 validation_filter=validation_filter,
                                 wait=wait
                                 )

    def train(self, service_config=None):
        """
        Train the model in the cloud. This will create a service and will run the adapter's train function as an execution

        :param dict service_config : Service object as dict. Contains the spec of the default service to create.
        :return:
        """
        return self.models.train(model_id=self.id, service_config=service_config)

    def evaluate(self, dataset_id, filters: entities.Filters = None, service_config=None):
        """
        Evaluate Model, provide data to evaluate the model on
        You can also provide specific config for the deployed service

        :param dict service_config : Service object as dict. Contains the spec of the default service to create.
        :param str dataset_id: ID of the dataset to evaluate
        :param entities.Filters filters: dl.Filter entity to run the predictions on
        :return:
        """
        return self.models.evaluate(model_id=self.id,
                                    dataset_id=dataset_id,
                                    filters=filters,
                                    service_config=service_config)

    def predict(self, item_ids):
        """
        Run model prediction with items

        :param item_ids: a list of item id to run the prediction.
        :return:
        """
        return self.models.predict(model=self, item_ids=item_ids)

    def deploy(self, service_config=None) -> entities.Service:
        """
        Deploy a trained model. This will create a service that will execute predictions

        :param dict service_config : Service object as dict. Contains the spec of the default service to create.

        :return: dl.Service: The deployed service
        """
        return self.models.deploy(model_id=self.id, service_config=service_config)

    def wait_for_model_ready(self):
        """
        Wait for model to be ready

        :return:
        """
        return self.models.wait_for_model_ready(model=self)

    def log(self,
            service=None,
            size=None,
            checkpoint=None,
            start=None,
            end=None,
            follow=False,
            text=None,
            execution_id=None,
            function_name=None,
            replica_id=None,
            system=False,
            view=True,
            until_completed=True,
            model_operation: str = None,
            ):
        """
        Get service logs of this model (the call is scoped with the model id and its project id)

        :param service: service object
        :param int size: size
        :param dict checkpoint: the information from the last point checked in the service
        :param str start: iso format time
        :param str end: iso format time
        :param bool follow: if true, keep stream future logs
        :param str text: text
        :param str execution_id: execution id
        :param str function_name: function name
        :param str replica_id: replica id
        :param bool system: system
        :param bool view: if true, print out all the logs
        :param bool until_completed: wait until completed
        :param str model_operation: model operation action
        :return: ServiceLog entity
        :rtype: ServiceLog

        **Example**:

        .. code-block:: python

            service_log = model.log()
        """
        return self.services.log(service=service,
                                 size=size,
                                 checkpoint=checkpoint,
                                 start=start,
                                 end=end,
                                 follow=follow,
                                 execution_id=execution_id,
                                 function_name=function_name,
                                 replica_id=replica_id,
                                 system=system,
                                 text=text,
                                 view=view,
                                 until_completed=until_completed,
                                 model_id=self.id,
                                 model_operation=model_operation,
                                 project_id=self.project_id)
[docs] def log(self, service=None, size=None, checkpoint=None, start=None, end=None, follow=False, text=None, execution_id=None, function_name=None, replica_id=None, system=False, view=True, until_completed=True, model_operation: str = None, ): """ Get service logs :param service: service object :param int size: size :param dict checkpoint: the information from the lst point checked in the service :param str start: iso format time :param str end: iso format time :param bool follow: if true, keep stream future logs :param str text: text :param str execution_id: execution id :param str function_name: function name :param str replica_id: replica id :param bool system: system :param bool view: if true, print out all the logs :param bool until_completed: wait until completed :param str model_operation: model operation action :return: ServiceLog entity :rtype: ServiceLog **Example**: .. code-block:: python service_log = service.log() """ return self.services.log(service=service, size=size, checkpoint=checkpoint, start=start, end=end, follow=follow, execution_id=execution_id, function_name=function_name, replica_id=replica_id, system=system, text=text, view=view, until_completed=until_completed, model_id=self.id, model_operation=model_operation, project_id=self.project_id)