import time
from typing import List
import logging
from .. import entities, repositories, exceptions, miscellaneous
from ..services.api_client import ApiClient
# Module logger, registered under the 'dtlpy' name.
logger = logging.getLogger(name='dtlpy')
# Polling schedule read by Models.wait_for_model_ready and handed to time.sleep():
# the delay starts at MIN_INTERVAL, is multiplied by BACKOFF_FACTOR after every poll,
# and is never allowed to exceed MAX_INTERVAL.
MIN_INTERVAL = 1
BACKOFF_FACTOR = 1.2
MAX_INTERVAL = 12
[docs]class Models:
"""
Models Repository
"""
def __init__(self,
             client_api: ApiClient,
             package: entities.Package = None,
             project: entities.Project = None,
             project_id: str = None):
    """
    Build a models repository bound to an API client.

    :param client_api: client used for every platform request made by this repository
    :param package: optional Package entity the repository is scoped to
    :param project: optional Project entity the repository is scoped to
    :param str project_id: project id, kept only when no Project entity is passed
    """
    self._client_api = client_api
    self._package = package
    self._project = project
    # a Project entity, when supplied, takes precedence over an explicit project_id
    self._project_id = project_id if project is None else project.id
############
# entities #
############
@property
def project(self) -> entities.Project:
    """
    Project entity of this repository, resolved lazily and cached.

    Resolution order: the cached entity, then a fetch by the stored project id,
    then the project attached to the repository's package.

    :return: the resolved Project entity
    :raises exceptions.PlatformException: error '2001' when no project can be resolved
    """
    if self._project is None and self._project_id is not None:
        project_repo = repositories.Projects(client_api=self._client_api)
        self._project = project_repo.get(project_id=self._project_id)

    # fall back to the project carried by the package, if any
    if self._project is None and self._package is not None and self._package._project is not None:
        self._project = self._package._project

    if self._project is None:
        raise exceptions.PlatformException(
            error='2001',
            message='Missing "project". need to set a Project entity or use project.models repository')

    assert isinstance(self._project, entities.Project)
    return self._project


@project.setter
def project(self, project: entities.Project):
    """
    Replace the cached project.

    :param project: Project entity to bind to the repository
    :raises ValueError: when the value is not a Project entity
    """
    if isinstance(project, entities.Project):
        self._project = project
        return
    raise ValueError('Must input a valid Project entity')
@property
def package(self) -> entities.Package:
    """
    Package entity this repository was created with.

    :return: the stored Package entity
    :raises exceptions.PlatformException: error '2001' when the repository has no package
    """
    if self._package is not None:
        assert isinstance(self._package, entities.Package)
        return self._package

    missing_package_message = 'Cannot perform action WITHOUT Package entity in {} repository.'.format(
        self.__class__.__name__) + ' Please use package.models or set a model'
    raise exceptions.PlatformException(error='2001', message=missing_package_message)
###########
# methods #
###########
def get(self, model_name=None, model_id=None) -> entities.Model:
    """
    Fetch a single model, either by id or by name.

    When ``model_id`` is given the model is requested directly; ``model_name`` is then only used to
    log a warning if it differs from the fetched model's name. Otherwise the model is searched by
    name, narrowed to the repository's project and package when those are known.

    :param model_name: name of the model to look up
    :param model_id: id of the model to fetch
    :return: dl.Model object
    :raises exceptions.PlatformException: when the request fails, when no identifier is given, or when
        the name matches no model or more than one model
    """
    if model_id is None and model_name is None:
        raise exceptions.PlatformException(
            error='400',
            message='No checked-out Model was found, must checkout or provide an identifier in inputs')

    if model_id is not None:
        success, response = self._client_api.gen_request(req_type="get",
                                                         path="/ml/models/{}".format(model_id))
        if not success:
            raise exceptions.PlatformException(response)
        fetched = entities.Model.from_json(client_api=self._client_api,
                                           _json=response.json(),
                                           project=self._project,
                                           package=self._package)
        # the id wins; a differing name is only reported
        if model_name is not None and fetched.name != model_name:
            logger.warning(
                "Mismatch found in models.get: model_name is different then model.name:"
                " {!r} != {!r}".format(model_name, fetched.name))
        return fetched

    # lookup by name
    name_filters = entities.Filters(resource=entities.FiltersResource.MODEL,
                                    field='name',
                                    values=model_name)
    scope_project_id = self._project_id if self._project is None else self._project.id
    if scope_project_id is not None:
        name_filters.add(field='projectId', values=scope_project_id)
    if self._package is not None:
        name_filters.add(field='packageId', values=self._package.id)

    matches = self.list(filters=name_filters)
    if matches.items_count == 0:
        raise exceptions.PlatformException(
            error='404',
            message='Model not found. Name: {}'.format(model_name))
    if matches.items_count > 1:
        raise exceptions.PlatformException(
            error='400',
            message='More than one Model found by the name of: {}. Try "get" by id or "list()".'.format(
                model_name))
    return matches.items[0]
def _build_entities_from_response(self, response_items) -> miscellaneous.List[entities.Model]:
    """
    Convert raw model JSON items into Model entities using the 'entity.create' thread pool.

    Each job yields a pair whose first element flags success; failed items are logged as
    warnings and left out of the result.

    :param response_items: iterable of model JSON dictionaries
    :return: miscellaneous.List with the successfully built Model entities
    """
    pool = self._client_api.thread_pools(pool_name='entity.create')
    futures = [
        pool.submit(entities.Model._protected_from_json,
                    client_api=self._client_api,
                    _json=raw_model,
                    package=self._package,
                    project=self._project)
        for raw_model in response_items
    ]
    # block until every conversion finished
    outcomes = [future.result() for future in futures]

    built = []
    for outcome in outcomes:
        if outcome[0] is False:
            logger.warning(outcome[1])
        elif outcome[0] is True:
            built.append(outcome[1])
    return miscellaneous.List(built)
def _list(self, filters: entities.Filters):
    """
    Send one models query to the platform and return the decoded JSON response.

    :param filters: Filters entity; its ``prepare()`` output is the request body
    :return: the parsed JSON of the response
    :raises exceptions.PlatformException: when the request is not successful
    """
    query = filters.prepare()
    success, response = self._client_api.gen_request(req_type='POST',
                                                     path='/ml/models/query',
                                                     json_req=query)
    if success:
        return response.json()
    raise exceptions.PlatformException(response)
def list(self, filters: entities.Filters = None) -> entities.PagedEntities:
    """
    List models and return the first page of results.

    When no filters are passed, a MODEL filter is created and narrowed to the repository's
    project and package (when set). Caller-supplied filters are used as given.

    :param dtlpy.entities.filters.Filters filters: Filters entity with resource FiltersResource.MODEL
    :return: Paged entity
    :rtype: dtlpy.entities.paged_entities.PagedEntities
    :raises exceptions.PlatformException: error '400' when ``filters`` is not a Filters entity or
        targets another resource
    """
    if filters is None:
        filters = entities.Filters(resource=entities.FiltersResource.MODEL)
        # default scope: whatever project / package the repository is bound to
        for field_name, owner in (('projectId', self._project), ('packageId', self._package)):
            if owner is not None:
                filters.add(field=field_name, values=owner.id)

    if not isinstance(filters, entities.Filters):
        raise exceptions.PlatformException(error='400',
                                           message='Unknown filters type: {!r}'.format(type(filters)))
    if filters.resource != entities.FiltersResource.MODEL:
        raise exceptions.PlatformException(
            error='400',
            message='Filters resource must to be FiltersResource.MODEL. Got: {!r}'.format(filters.resource))

    page = entities.PagedEntities(items_repository=self,
                                  filters=filters,
                                  page_offset=filters.page,
                                  page_size=filters.page_size,
                                  client_api=self._client_api)
    page.get_page()
    return page
def _set_model_filter(self,
                      metadata: dict,
                      train_filter: entities.Filters = None,
                      validation_filter: entities.Filters = None):
    """
    Store train / validation subset filters under ``metadata['system']['subsets']``.

    The given dictionary is updated in place (a new one is created when ``metadata`` is None).
    Filters entities are stored as their ``prepare()`` output; any other value is stored as is.

    :param dict metadata: metadata dictionary to update, or None
    :param train_filter: filter for the 'train' subset, skipped when None
    :param validation_filter: filter for the 'validation' subset, skipped when None
    :return: the updated metadata dictionary
    """
    if metadata is None:
        metadata = {}
    subsets = metadata.setdefault('system', {}).setdefault('subsets', {})

    for subset_name, subset_filter in (('train', train_filter), ('validation', validation_filter)):
        if subset_filter is None:
            continue
        if isinstance(subset_filter, entities.Filters):
            subsets[subset_name] = subset_filter.prepare()
        else:
            subsets[subset_name] = subset_filter
    return metadata
@staticmethod
def add_subset(model: entities.Model, subset_name: str, subset_filter: entities.Filters):
    """
    Adds a subset for a model, specifying a subset of the model's dataset that could be used for
    training or validation.

    The prepared filter (``subset_filter.prepare()``) is written to
    ``model.metadata['system']['subsets'][subset_name]`` and the model is then updated with
    ``system_metadata=True``.

    :param dtlpy.entities.Model model: the model to which the subset should be added
    :param str subset_name: the name of the subset
    :param dtlpy.entities.Filters subset_filter: the filtering operation that this subset performs in the dataset.

    **Example**

    .. code-block:: python

        project.models.add_subset(model=model_entity, subset_name='train', subset_filter=dtlpy.Filters(field='dir', values='/train'))
        model_entity.metadata['system']['subsets']['train']  # the prepared filter query
    """
    system_metadata = model.metadata.setdefault('system', dict())
    subsets = system_metadata.setdefault('subsets', dict())
    subsets[subset_name] = subset_filter.prepare()
    model.update(system_metadata=True)
@staticmethod
def delete_subset(model: entities.Model, subset_name: str):
    """
    Removes a subset from a model's metadata.

    When the subset is not present (or holds None) an error is logged and nothing is changed;
    otherwise the subset is removed and the model is updated with ``system_metadata=True``.

    :param dtlpy.entities.Model model: the model from which the subset should be removed
    :param str subset_name: the name of the subset

    **Example**

    .. code-block:: python

        project.models.add_subset(model=model_entity, subset_name='train', subset_filter=dtlpy.Filters(field='dir', values='/train'))
        project.models.delete_subset(model=model_entity, subset_name='train')
        model_entity.metadata['system']['subsets']
        {}
    """
    subsets = model.metadata.get("system", dict()).get("subsets", dict())
    if subsets.get(subset_name) is None:
        logger.error(f"Model system metadata incomplete, could not delete subset {subset_name}.")
        return
    # a non-None hit means `subsets` is the dictionary stored in the model metadata
    subsets.pop(subset_name)
    model.update(system_metadata=True)
def create(
        self,
        model_name: str,
        dataset_id: str = None,
        labels: list = None,
        ontology_id: str = None,
        description: str = None,
        model_artifacts: List[entities.Artifact] = None,
        project_id=None,
        tags: List[str] = None,
        package: entities.Package = None,
        configuration: dict = None,
        status: str = None,
        scope: entities.EntityScopeLevel = entities.EntityScopeLevel.PROJECT,
        version: str = '1.0.0',
        input_type=None,
        output_type=None,
        train_filter: entities.Filters = None,
        validation_filter: entities.Filters = None,
        app: entities.App = None
) -> entities.Model:
    """
    Create a Model entity

    :param str model_name: name of the model
    :param str dataset_id: dataset id
    :param list labels: list of labels from ontology (must match ontology id) can be a subset.
        Replaced by the ontology's labels when ``ontology_id`` is given; defaults to an empty list
    :param str ontology_id: ontology to connect to the model
    :param str description: description
    :param model_artifacts: optional list of dl.Artifact. Can be ItemArtifact, LocaArtifact or LinkArtifact
    :param str project_id: project that owns the model. Defaults to the repository's project
        (entity or stored id)
    :param list tags: list of string tags
    :param package: optional - Package object. Defaults to the repository's package
    :param dict configuration: optional - model configuration - dict
    :param str status: optional model status, sent to the platform as given
    :param str scope: the scope level of the model dl.EntityScopeLevel
    :param str version: version of the model
    :param str input_type: the file type the model expect as input (image, video, txt, etc). Defaults to 'image'
    :param str output_type: dl.AnnotationType - the type of annotations the model produces (class, box segment, text, etc).
        Defaults to dl.AnnotationType.CLASSIFICATION
    :param dtlpy.entities.filters.Filters train_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model train
    :param dtlpy.entities.filters.Filters validation_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model validation
    :param dtlpy.entities.App app: App entity to connect the model to
    :return: Model Entity
    :raises exceptions.PlatformException: when no package or project id can be resolved, or the request fails
    :raises ValueError: when ``model_artifacts`` is not a list, or ``app`` does not match the package

    **Example**:

    .. code-block:: python

        project.models.create(model_name='model_name', dataset_id='dataset_id', labels=['label1', 'label2'], train_filter={filter: {$and: [{dir: "/10K short videos"}]},page: 0,pageSize: 1000,resource: "items"}})
    """
    if ontology_id is not None:
        # take labels from ontology
        ontologies = repositories.Ontologies(client_api=self._client_api)
        labels = [label.tag for label in ontologies.get(ontology_id=ontology_id).labels]
    if labels is None:
        # dont have to have labels. can use an empty list
        labels = []

    if input_type is None:
        input_type = 'image'
    if output_type is None:
        output_type = entities.AnnotationType.CLASSIFICATION

    if package is None:
        if self._package is None:
            raise exceptions.PlatformException('Must provide a Package or create from package.models')
        package = self._package

    # TODO need to remove the entire project id user interface - need to take it from dataset id (in BE)
    if project_id is None:
        # same fallback order as `get`: the Project entity first, then the stored project id
        if self._project is not None:
            project_id = self._project.id
        elif self._project_id is not None:
            project_id = self._project_id
        else:
            raise exceptions.PlatformException('Please provide project_id')
    elif project_id != self._project_id:
        # warn only for packages that are not shared across projects: a non-global Package
        # or a Dpk whose scope is not 'public'
        private_package = isinstance(package, entities.Package) and not package.is_global
        private_dpk = isinstance(package, entities.Dpk) and package.scope != 'public'
        if private_package or private_dpk:
            logger.warning(
                "Note! you are specified project_id {!r} which is different from repository context: {!r}".format(
                    project_id, self._project_id))

    if model_artifacts is None:
        model_artifacts = []
    if not isinstance(model_artifacts, list):
        raise ValueError('`model_artifacts` must be a list of dl.Artifact entities')

    # create payload for request
    payload = {
        'packageId': package.id,
        'name': model_name,
        'projectId': project_id,
        'datasetId': dataset_id,
        'labels': labels,
        'artifacts': [artifact.to_json(as_artifact=True) for artifact in model_artifacts],
        'scope': scope,
        'version': version,
        'inputType': input_type,
        'outputType': output_type,
    }

    if app is not None:
        if not isinstance(package, entities.Dpk):
            raise ValueError('package must be a Dpk entity')
        if app.dpk_name != package.name or app.dpk_version != package.version:
            raise ValueError('App and package must be the same')
        # the model must be declared as a component of the package
        component_name = None
        compute_config = None
        for component in package.components.models:
            if component['name'] == model_name:
                component_name = component['name']
                compute_config = component.get('computeConfigs', None)
                break
        if component_name is None:
            raise ValueError('Model name not found in package')
        payload['app'] = {
            "id": app.id,
            "componentName": component_name,
            "dpkName": package.name,
            "dpkVersion": package.version
        }
        if compute_config is not None:
            payload['app']['computeConfig'] = compute_config

    # optional fields are only sent when provided
    if configuration is not None:
        payload['configuration'] = configuration
    if tags is not None:
        payload['tags'] = tags
    if description is not None:
        payload['description'] = description
    if status is not None:
        payload['status'] = status
    if train_filter or validation_filter:
        payload['metadata'] = self._set_model_filter(metadata={},
                                                     train_filter=train_filter,
                                                     validation_filter=validation_filter)

    # request
    success, response = self._client_api.gen_request(req_type='post',
                                                     path='/ml/models',
                                                     json_req=payload)
    # exception handling
    if not success:
        raise exceptions.PlatformException(response)

    return entities.Model.from_json(_json=response.json(),
                                    client_api=self._client_api,
                                    project=self._project,
                                    package=package)
def clone(self,
          from_model: entities.Model,
          model_name: str,
          dataset: entities.Dataset = None,
          configuration: dict = None,
          status=None,
          scope=None,
          project_id: str = None,
          labels: list = None,
          description: str = None,
          tags: list = None,
          train_filter: entities.Filters = None,
          validation_filter: entities.Filters = None,
          wait=True,
          ) -> entities.Model:
    """
    Clones and creates a new model out of existing one

    The source model entity is not modified: its configuration is copied before the label maps
    are dropped or the ``configuration`` overrides are applied.

    :param from_model: existing model to clone from
    :param str model_name: `str` new model name
    :param dtlpy.entities.Dataset dataset: dataset object for the cloned model
    :param dict configuration: `dict` (optional) entries merged over the source model's configuration
    :param str status: `str` (optional) set the new status
    :param str scope: `str` (optional) set the new scope. default is "project"
    :param str project_id: `str` specify the project id to create the new model on (if other than the source model).
        Defaults to the dataset's project when a dataset is given, otherwise the repository's project
    :param list labels: `list` of `str` - label of the model. Defaults to the dataset's labels when a dataset is given
    :param str description: `str` description of the new model
    :param list tags: `list` of `str` - tags of the new model
    :param dtlpy.entities.filters.Filters train_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model train
    :param dtlpy.entities.filters.Filters validation_filter: Filters entity or a dictionary to define the items' scope in the specified dataset_id for the model validation
    :param bool wait: `bool` wait for model to be ready
    :return: dl.Model which is a clone version of the existing model
    :raises exceptions.PlatformException: when the clone request fails
    """

    def _inherited_subset(subset_name):
        # subset filter stored on the source model, or None
        return from_model.metadata.get('system', {}).get('subsets', {}).get(subset_name, None)

    # Work on a copy: the configuration is edited below and the source model must stay untouched.
    cloned_configuration = from_model.configuration
    if cloned_configuration is not None:
        cloned_configuration = dict(cloned_configuration)

    if project_id is None:
        # take dataset project when available, otherwise the model's project
        project_id = dataset.project.id if dataset is not None else self.project.id

    if dataset is not None and labels is None:
        labels = list(dataset.labels_flat_dict.keys())

    if labels is not None and cloned_configuration is not None:
        # if there are new labels - pop the mapping from the original
        cloned_configuration.pop('id_to_label_map', None)
        cloned_configuration.pop('label_to_id_map', None)

    if configuration is not None:
        if cloned_configuration is None:
            cloned_configuration = {}
        cloned_configuration.update(configuration)

    from_json = {"name": model_name,
                 "packageId": from_model.package_id,
                 "configuration": cloned_configuration,
                 "outputType": from_model.output_type,
                 "inputType": from_model.input_type,
                 "projectId": project_id}
    if dataset is not None:
        from_json['datasetId'] = dataset.id
    if labels is not None:
        from_json['labels'] = labels
    if description is not None:
        from_json['description'] = description
    if tags is not None:
        from_json['tags'] = tags
    if scope is not None:
        from_json['scope'] = scope
    if status is not None:
        from_json['status'] = status

    # explicit filters win; otherwise inherit the subsets stored on the source model
    if train_filter is None:
        train_filter = _inherited_subset('train')
    if validation_filter is None:
        validation_filter = _inherited_subset('validation')
    metadata = self._set_model_filter(metadata={},
                                      train_filter=train_filter,
                                      validation_filter=validation_filter)
    if metadata:
        from_json['metadata'] = metadata

    success, response = self._client_api.gen_request(req_type='post',
                                                     path='/ml/models/{}/clone'.format(from_model.id),
                                                     json_req=from_json)
    if not success:
        raise exceptions.PlatformException(response)

    new_model = entities.Model.from_json(_json=response.json(),
                                         client_api=self._client_api,
                                         project=self._project,
                                         package=from_model._package)
    if wait:
        new_model = self.wait_for_model_ready(model=new_model)
    return new_model
[docs] def wait_for_model_ready(self, model: entities.Model):
"""
Wait for model to be ready
While ``model.status`` equals ``entities.ModelStatus.CLONING`` the model is fetched again by id
through ``self.get`` and the thread sleeps. The delay starts at ``MIN_INTERVAL`` and is multiplied
by ``BACKOFF_FACTOR`` after each poll, capped at ``MAX_INTERVAL``. No timeout is applied here.
:param model: Model entity
:return: the most recently fetched Model entity (the given entity when it was not cloning)
"""
sleep_time = MIN_INTERVAL
while model.status == entities.ModelStatus.CLONING:
# refresh the entity, then back off before the next status check
model = self.get(model_id=model.id)
time.sleep(sleep_time)
sleep_time = min(sleep_time * BACKOFF_FACTOR, MAX_INTERVAL)
# NOTE(review): indentation is lost in this copy of the file, so it is unclear whether this
# sleep belongs to the loop body or runs once after the loop - confirm against the original source
time.sleep(sleep_time)
return model
@property
def platform_url(self):
    """
    URL of the models page of this repository's project, as built by the API client.

    :return: the value returned by the client's ``_get_resource_url`` for the project's models path
    """
    resource_path = "projects/{}/models".format(self.project.id)
    return self._client_api._get_resource_url(resource_path)
def open_in_web(self, model=None, model_id=None):
    """
    Open the model in web platform

    A model entity takes precedence over an id; with neither, the project's models page is opened.

    :param model: model entity
    :param str model_id: model id
    """
    if model is not None:
        model.open_in_web()
        return

    if model_id is None:
        target_url = self.platform_url
    else:
        target_url = self.platform_url + '/' + str(model_id) + '/main'
    self._client_api._open_in_web(url=target_url)
def delete(self, model: entities.Model = None, model_name=None, model_id=None):
    """
    Delete Model object

    The id is resolved in this order: ``model_id``, then ``model.id``, then a lookup by ``model_name``.

    :param model: Model entity to delete
    :param str model_name: delete by model name
    :param str model_id: delete by model id
    :return: True
    :rtype: bool
    :raises exceptions.PlatformException: when no identifier is given or the request fails
    """
    if model_id is None:
        if model is None and model_name is None:
            raise exceptions.PlatformException(error='400',
                                               message='Must input at least one parameter to models.delete')
        if model is None:
            model = self.get(model_name=model_name)
        model_id = model.id

    success, response = self._client_api.gen_request(req_type="delete",
                                                     path="/ml/models/{}".format(model_id))
    if not success:
        raise exceptions.PlatformException(response)
    return True
def update(self,
           model: entities.Model,
           system_metadata: bool = False) -> entities.Model:
    """
    Update Model changes to platform

    Sends ``model.to_json()`` as a PATCH request and builds a new entity from the response.

    :param model: Model entity
    :param bool system_metadata: True, if you want to change metadata system
    :return: Model entity
    :raises exceptions.PlatformException: when the request fails
    """
    payload = model.to_json()
    # the system flag is passed as a query parameter on the same endpoint
    query_suffix = '?system=true' if system_metadata else ''
    success, response = self._client_api.gen_request(req_type='patch',
                                                     path='/ml/models/{}'.format(model.id) + query_suffix,
                                                     json_req=payload)
    if not success:
        raise exceptions.PlatformException(response)

    return entities.Model.from_json(_json=response.json(),
                                    client_api=self._client_api,
                                    project=self._project,
                                    package=model._package)
def train(self, model_id: str, service_config=None):
    """
    Train the model in the cloud. This will create a service and will run the adapter's train function as an execution

    :param model_id: id of the model to train
    :param dict service_config : Service object as dict. Contains the spec of the default service to create.
    :return: Execution entity built from the platform response
    :raises exceptions.PlatformException: when the request fails
    """
    body = {} if service_config is None else {'serviceConfig': service_config}
    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model_id}/train",
                                                     json_req=body)
    if success:
        return entities.Execution.from_json(_json=response.json(),
                                            client_api=self._client_api,
                                            project=self._project)
    raise exceptions.PlatformException(response)
def evaluate(self, model_id: str, dataset_id: str, filters: entities.Filters = None, service_config=None):
    """
    Evaluate Model, provide data to evaluate the model on You can also provide specific config for the deployed service

    :param str model_id: Model id to predict
    :param dict service_config : Service object as dict. Contains the spec of the default service to create.
    :param str dataset_id: ID of the dataset to evaluate
    :param entities.Filters filters: dl.Filter entity to run the predictions on. A default
        ``entities.Filters()`` is used when omitted
    :return: Execution entity built from the platform response
    :raises exceptions.PlatformException: when the request fails
    """
    body = {'input': {'datasetId': dataset_id}}
    if service_config is not None:
        body['config'] = {'serviceConfig': service_config}

    # a dataset query is always sent; without filters the default Filters entity is used
    dataset_filters = entities.Filters() if filters is None else filters
    body['input']['datasetQuery'] = dataset_filters.prepare()

    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model_id}/evaluate",
                                                     json_req=body)
    if success:
        return entities.Execution.from_json(_json=response.json(),
                                            client_api=self._client_api,
                                            project=self._project)
    raise exceptions.PlatformException(response)
def predict(self, model, item_ids, dataset_id=None):
    """
    Run model prediction with items

    The request is sent to the first service listed under
    ``model.metadata['system']['deploy']['services']``.

    :param model: dl.Model entity to run the prediction.
    :param item_ids: a list of item id to run the prediction.
    :param dataset_id: a dataset id to run the prediction.
    :return: Execution entity built from the platform response
    :raises ValueError: when the model has no deployed service (including a model without any
        system metadata), or when neither ``item_ids`` nor ``dataset_id`` is given
    :raises exceptions.PlatformException: when the request fails
    """
    # tolerate a missing 'system' section: it simply means nothing was deployed yet
    services = model.metadata.get('system', {}).get('deploy', {}).get('services', [])
    if not services:
        # no services for model
        raise ValueError("Model doesnt have any associated services. Need to deploy before predicting")
    if item_ids is None and dataset_id is None:
        raise ValueError("Need to provide either item_ids or dataset_id")

    payload = {'input': {'itemIds': item_ids, 'datasetId': dataset_id},
               'config': {'serviceId': services[0]}}
    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model.id}/predict",
                                                     json_req=payload)
    if not success:
        raise exceptions.PlatformException(response)
    return entities.Execution.from_json(_json=response.json(),
                                        client_api=self._client_api,
                                        project=self._project)
def embed(self, model, item_ids=None, dataset_id=None):
    """
    Run model embed with items

    The request is sent to the first service listed under
    ``model.metadata['system']['deploy']['services']``.

    :param model: dl.Model entity to run the embed.
    :param item_ids: a list of item id to run the embed.
    :param dataset_id: a dataset id to run the embed.
    :return: Execution
    :rtype: dtlpy.entities.execution.Execution
    :raises ValueError: when the model has no deployed service (including a model without any
        system metadata), or when neither ``item_ids`` nor ``dataset_id`` is given
    :raises exceptions.PlatformException: when the request fails
    """
    # tolerate a missing 'system' section: it simply means nothing was deployed yet
    services = model.metadata.get('system', {}).get('deploy', {}).get('services', [])
    if not services:
        # no services for model
        raise ValueError("Model doesnt have any associated services. Need to deploy before predicting")
    if item_ids is None and dataset_id is None:
        raise ValueError("Need to provide either item_ids or dataset_id")

    payload = {'input': {'itemIds': item_ids, 'datasetId': dataset_id},
               'config': {'serviceId': services[0]}}
    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model.id}/embed",
                                                     json_req=payload)
    if not success:
        raise exceptions.PlatformException(response)
    return entities.Execution.from_json(_json=response.json(),
                                        client_api=self._client_api,
                                        project=self._project)
def embed_datasets(self, model, dataset_ids, attach_trigger=False):
    """
    Run model embed with datasets

    The request is sent to the first service listed under
    ``model.metadata['system']['deploy']['services']``; the returned command is waited on
    before it is handed back.

    :param model: dl.Model entity to run the embed.
    :param dataset_ids: a list of dataset id to run the embed.
    :param attach_trigger: bool, if True will activate the trigger
    :return: the Command entity returned by ``command.wait()``
    :raises ValueError: when the model has no deployed service (including a model without any
        system metadata), or when ``dataset_ids`` is None
    :raises exceptions.PlatformException: when the request fails
    """
    # tolerate a missing 'system' section: it simply means nothing was deployed yet
    services = model.metadata.get('system', {}).get('deploy', {}).get('services', [])
    if not services:
        # no services for model
        raise ValueError("Model doesnt have any associated services. Need to deploy before predicting")
    if dataset_ids is None:
        raise ValueError("Need to provide either dataset_id")

    payload = {'datasetIds': dataset_ids,
               'config': {'serviceId': services[0]},
               'attachTrigger': attach_trigger
               }
    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model.id}/embed/datasets",
                                                     json_req=payload)
    if not success:
        raise exceptions.PlatformException(response)
    command = entities.Command.from_json(_json=response.json(),
                                         client_api=self._client_api)
    # block until the platform command completes
    return command.wait()
def deploy(self, model_id: str, service_config=None) -> entities.Service:
    """
    Deploy a trained model. This will create a service that will execute predictions

    :param model_id: id of the model to deploy
    :param dict service_config : Service object as dict. Contains the spec of the default service to create.
    :return: dl.Service: the deployed service
    :raises exceptions.PlatformException: when the request fails
    """
    body = {} if service_config is None else {'serviceConfig': service_config}
    success, response = self._client_api.gen_request(req_type="post",
                                                     path=f"/ml/models/{model_id}/deploy",
                                                     json_req=body)
    if success:
        return entities.Service.from_json(_json=response.json(),
                                          client_api=self._client_api,
                                          project=self._project,
                                          package=self._package)
    raise exceptions.PlatformException(response)
class Metrics:
    """
    Repository of metric samples (plot points) published for a model.

    The model can be identified either by a Model entity or by a bare model id.
    """

    def __init__(self, client_api, model=None, model_id=None):
        """
        :param client_api: client used for every platform request
        :param model: optional Model entity the metrics belong to
        :param model_id: optional model id, used when no Model entity is given
        """
        self._client_api = client_api
        self._model_id = model_id
        self._model = model

    @property
    def model(self):
        """Model entity given at construction, or None."""
        return self._model

    def _resolve_model_id(self):
        # Id of the model these metrics belong to: the entity's id, else the stored id, else None.
        if self._model is not None:
            return self._model.id
        return self._model_id

    def create(self, samples, dataset_id) -> bool:
        """
        Add Samples for model analytics and metrics

        Every sample is serialized with ``to_json()`` and tagged with the repository's model id
        and the given dataset id before being published in a single request.

        :param samples: a dl.PlotSample or a list of dl.PlotSample - must contain: figure, legend, x, y
        :param dataset_id: dataset id stored on every published sample
        :return: bool: True if success
        :raises exceptions.PlatformException: when the repository has neither a model nor a model id,
            or when the request fails
        """
        model_id = self._resolve_model_id()
        if model_id is None:
            raise exceptions.PlatformException(
                error='400',
                message='Must provide a model or a model_id to publish metrics')

        if not isinstance(samples, list):
            samples = [samples]

        payload = []
        for sample in samples:
            _json = sample.to_json()
            _json['modelId'] = model_id
            _json['datasetId'] = dataset_id
            payload.append(_json)

        # request
        success, response = self._client_api.gen_request(req_type='post',
                                                         path='/ml/metrics/publish',
                                                         json_req=payload)
        # exception handling
        if not success:
            raise exceptions.PlatformException(response)
        return True

    def _list(self, filters: entities.Filters):
        """
        Send one metrics query to the platform and return the decoded JSON response.

        :param filters: Filters entity; its ``prepare()`` output is the request body
        :raises exceptions.PlatformException: when the request fails
        """
        success, response = self._client_api.gen_request(req_type='POST',
                                                         path='/ml/metrics/query',
                                                         json_req=filters.prepare())
        if not success:
            raise exceptions.PlatformException(response)
        return response.json()

    def _build_entities_from_response(self, response_items) -> miscellaneous.List[entities.PlotSample]:
        """
        Convert raw metric JSON items into PlotSample entities using the 'entity.create' thread pool.

        :param response_items: iterable of metric sample dictionaries
        :return: miscellaneous.List of PlotSample entities, in response order
        """
        pool = self._client_api.thread_pools(pool_name='entity.create')
        jobs = [
            pool.submit(entities.PlotSample,
                        x=sample.get('data', dict()).get('x', None),
                        y=sample.get('data', dict()).get('y', None),
                        legend=sample.get('legend', ''),
                        figure=sample.get('figure', ''))
            for sample in response_items
        ]
        # get all results
        return miscellaneous.List([job.result() for job in jobs])

    def list(self, filters=None) -> entities.PagedEntities:
        """
        List Samples for model analytics and metrics

        When the repository knows its model (entity or id), a ``modelId`` condition is added to
        the filters, including caller-supplied ones.

        :param filters: dl.Filter query entity with resource FiltersResource.METRICS
        :return: the first page of results as a PagedEntities
        :raises exceptions.PlatformException: error '400' when ``filters`` is not a Filters entity or
            targets another resource
        """
        if filters is None:
            filters = entities.Filters(resource=entities.FiltersResource.METRICS)
        if not isinstance(filters, entities.Filters):
            raise exceptions.PlatformException(error='400',
                                               message='Unknown filters type: {!r}'.format(type(filters)))
        if filters.resource != entities.FiltersResource.METRICS:
            raise exceptions.PlatformException(
                error='400',
                message='Filters resource must to be FiltersResource.METRICS. Got: {!r}'.format(filters.resource))

        model_id = self._resolve_model_id()
        if model_id is not None:
            filters.add(field='modelId', values=model_id)

        paged = entities.PagedEntities(items_repository=self,
                                       filters=filters,
                                       page_offset=filters.page,
                                       page_size=filters.page_size,
                                       client_api=self._client_api)
        paged.get_page()
        return paged