import threading
import logging
import time
import numpy as np
from .. import exceptions, entities, repositories, miscellaneous, services
logger = logging.getLogger(name='dtlpy')
[docs]class Executions:
"""
Service Executions Repository
The Executions class allows the users to manage executions (executions of services) and their properties. See our documentation for more information about `executions <https://dataloop.ai/docs/faas-execution>`_.
"""
def __init__(self,
client_api: services.ApiClient,
service: entities.Service = None,
project: entities.Project = None):
self._client_api = client_api
self._service = service
self._project = project
############
# entities #
############
@property
def service(self) -> entities.Service:
if self._service is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "service". need to set a Service entity or use service.executions repository')
assert isinstance(self._service, entities.Service)
return self._service
@service.setter
def service(self, service: entities.Service):
if not isinstance(service, entities.Service):
raise ValueError('Must input a valid Service entity')
self._service = service
@property
def project(self) -> entities.Project:
if self._project is None:
if self._service is not None:
self._project = self._service._project
if self._project is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "project". need to set a Project entity or use Project.executions repository')
assert isinstance(self._project, entities.Project)
return self._project
@project.setter
def project(self, project: entities.Project):
if not isinstance(project, entities.Project):
raise ValueError('Must input a valid Project entity')
self._project = project
def __get_from_entity(self, name, value):
project_id = None
try:
from_dataset = False
entity_obj = None
for input_type in entities.FunctionIO.INPUT_TYPES:
entity = input_type.lower()
param = '{}_id'.format(entity)
if isinstance(value, dict):
if param in value:
repo = getattr(repositories, '{}s'.format(input_type))(client_api=self._client_api)
entity_obj = repo.get(**{param: value[param]})
if param in ['annotation_id', 'item_id']:
from_dataset = True
if param == 'item_id':
entity_obj = entity_obj.dataset
else:
entity_obj = repositories.Datasets(client_api=self._client_api).get(
dataset_id=entity_obj.dataset_id)
elif param in ['dataset_id']:
from_dataset = True
break
elif isinstance(value, str):
if entity == name:
repo = getattr(repositories, '{}s'.format(input_type))(client_api=self._client_api)
entity_obj = repo.get(**{param: value})
if name in ['annotation', 'item']:
from_dataset = True
if param == 'item_id':
entity_obj = entity_obj.dataset
else:
entity_obj = repositories.Datasets(client_api=self._client_api).get(
dataset_id=entity_obj.dataset_id)
elif name in ['dataset']:
from_dataset = True
break
if entity_obj is not None:
if isinstance(entity_obj, entities.Project):
project_id = entity_obj.id
elif from_dataset:
project_id = entity_obj.projects[0]
else:
project_id = entity_obj.project_id
except Exception:
project_id = None
return project_id
def __get_project_id(self, project_id=None, payload=None):
if project_id is None:
inputs = payload.get('input', dict())
try:
for key, val in inputs.items():
project_id = self.__get_from_entity(name=key, value=val)
if project_id is not None:
break
except Exception:
project_id = None
if project_id is None:
# if still None - get from current repository
if self._project is not None:
project_id = self._project.id
else:
raise exceptions.PlatformException('400', 'Please provide project_id')
return project_id
###########
# methods #
###########
[docs] def create(self,
# executions info
service_id: str = None,
execution_input: list = None,
function_name: str = None,
# inputs info
resource: entities.PackageInputType = None,
item_id: str = None,
dataset_id: str = None,
annotation_id: str = None,
project_id: str = None,
# execution config
sync: bool = False,
stream_logs: bool = False,
return_output: bool = False,
# misc
return_curl_only: bool = False,
timeout: int = None) -> entities.Execution:
"""
Execute a function on an existing service
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str service_id: service id to execute on
:param List[FunctionIO] or dict execution_input: input dictionary or list of FunctionIO entities
:param str function_name: function name to run
:param str resource: input type.
:param str item_id: optional - item id as input to function
:param str dataset_id: optional - dataset id as input to function
:param str annotation_id: optional - annotation id as input to function
:param str project_id: resource's project
:param bool sync: if true, wait for function to end
:param bool stream_logs: prints logs of the new execution. only works with sync=True
:param bool return_output: if True and sync is True - will return the output directly
:param bool return_curl_only: return the cURL of the creation WITHOUT actually do it
:param int timeout: int, seconds to wait until TimeoutError is raised. if <=0 - wait until done -
by default wait take the service timeout
:return: execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.create(function_name='function_name', item_id='item_id', project_id='project_id')
"""
if service_id is None:
if self._service is None:
raise exceptions.PlatformException('400', 'Please provide service id')
service_id = self._service.id
if resource is None:
if annotation_id is not None:
resource = entities.PackageInputType.ANNOTATION
elif item_id is not None:
resource = entities.PackageInputType.ITEM
elif dataset_id is not None:
resource = entities.PackageInputType.DATASET
# payload
payload = dict()
if execution_input is None:
if resource is not None:
inputs = {resource.lower(): {
'dataset_id': dataset_id}
}
if item_id is not None:
inputs[resource.lower()]['item_id'] = item_id
if annotation_id is not None:
inputs[resource.lower()]['annotation_id'] = annotation_id
payload['input'] = inputs
else:
if isinstance(execution_input, dict):
payload['input'] = execution_input
else:
if not isinstance(execution_input, list):
execution_input = [execution_input]
if len(execution_input) > 0 and isinstance(execution_input[0], entities.FunctionIO):
payload['input'] = dict()
for single_input in execution_input:
payload['input'].update(single_input.to_json(resource='execution'))
else:
raise exceptions.PlatformException('400', 'Unknown input type')
payload['projectId'] = self.__get_project_id(project_id=project_id, payload=payload)
if function_name is not None:
payload['functionName'] = function_name
else:
payload['functionName'] = entities.package_defaults.DEFAULT_PACKAGE_FUNCTION_NAME
# request url
url_path = '/executions/{service_id}'.format(service_id=service_id)
if return_curl_only:
curl = self._client_api.export_curl_request(req_type='post',
path=url_path,
json_req=payload)
logger.warning(msg='Execution was NOT created. Exporting cURL only.')
return curl
success, response = self._client_api.gen_request(req_type='post',
path=url_path,
json_req=payload)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
execution = entities.Execution.from_json(_json=response.json(),
client_api=self._client_api,
project=self._project,
service=self._service)
if sync and not return_output and not stream_logs:
execution = self.wait(execution_id=execution.id, timeout=timeout)
if sync and (stream_logs or return_output):
thread = None
if stream_logs:
thread = threading.Thread(target=self.logs,
kwargs={'execution_id': execution.id,
'follow': True,
'until_completed': True})
thread.setDaemon(True)
thread.start()
execution = self.get(execution_id=execution.id,
sync=True)
# stream logs
if stream_logs and thread is not None:
thread.join()
if sync and return_output:
return execution.output
return execution
def _list(self, filters: entities.Filters):
"""
List service executions
:param dtlpy.entities.filters.Filters filters: dl.Filters entity to filters items
:return:
"""
url = '/query/faas'
# request
success, response = self._client_api.gen_request(req_type='POST',
path=url,
json_req=filters.prepare())
if not success:
raise exceptions.PlatformException(response)
return response.json()
[docs] def list(self, filters: entities.Filters = None) -> entities.PagedEntities:
"""
List service executions
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.filters.Filters filters: dl.Filters entity to filters items
:return: Paged entity
:rtype: dtlpy.entities.paged_entities.PagedEntities
**Example**:
.. code-block:: python
service.executions.list()
"""
# default filtersf
if filters is None:
filters = entities.Filters(resource=entities.FiltersResource.EXECUTION)
# assert type filters
elif not isinstance(filters, entities.Filters):
raise exceptions.PlatformException(
error='400',
message='Unknown filters type: {!r}'.format(type(filters)))
if filters.resource != entities.FiltersResource.EXECUTION:
raise exceptions.PlatformException(
error='400',
message='Filters resource must to be FiltersResource.EXECUTION. Got: {!r}'.format(filters.resource))
if self._project is not None:
filters.add(field='projectId', values=self._project.id)
if self._service is not None:
filters.add(field='serviceId', values=self._service.id)
paged = entities.PagedEntities(items_repository=self,
filters=filters,
page_offset=filters.page,
page_size=filters.page_size,
client_api=self._client_api)
paged.get_page()
return paged
def _build_entities_from_response(self, response_items) -> miscellaneous.List[entities.Execution]:
pool = self._client_api.thread_pools(pool_name='entity.create')
jobs = [None for _ in range(len(response_items))]
# return execution list
for i_item, item in enumerate(response_items):
jobs[i_item] = pool.submit(entities.Execution._protected_from_json,
**{'client_api': self._client_api,
'_json': item,
'project': self._project,
'service': self._service})
# get results
results = [j.result() for j in jobs]
# log errors
_ = [logger.warning(r[1]) for r in results if r[0] is False]
# return good jobs
return miscellaneous.List([r[1] for r in results if r[0] is True])
[docs] def get(self,
execution_id: str = None,
sync: bool = False
) -> entities.Execution:
"""
Get Service execution object
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str execution_id: execution id
:param bool sync: if true, wait for the execution to finish
:return: Service execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.get(execution_id='execution_id')
"""
url_path = "/executions/{}".format(execution_id)
if sync:
return self.wait(execution_id=execution_id)
success, response = self._client_api.gen_request(req_type="get",
path=url_path)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
return entities.Execution.from_json(client_api=self._client_api,
_json=response.json(),
project=self._project,
service=self._service)
[docs] def logs(self,
execution_id: str,
follow: bool = True,
until_completed: bool = True):
"""
executions logs
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str execution_id: execution id
:param bool follow: if true, keep stream future logs
:param bool until_completed: if true, wait until completed
:return: executions logs
**Example**:
.. code-block:: python
service.executions.logs(execution_id='execution_id')
"""
return self.service.log(execution_id=execution_id,
follow=follow,
until_completed=until_completed,
view=True)
[docs] def increment(self, execution: entities.Execution):
"""
Increment the number of attempts that an execution is allowed to attempt to run a service that is not responding.
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.execution.Execution execution:
:return: int
:rtype: int
**Example**:
.. code-block:: python
service.executions.increment(execution='execution_entity')
"""
# request
success, response = self._client_api.gen_request(
req_type='post',
path='/executions/{}/attempts'.format(execution.id)
)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# exception handling
if not success:
raise exceptions.PlatformException(response)
else:
return response.json()
[docs] def rerun(self, execution: entities.Execution, sync: bool = False):
"""
Rerun execution
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.execution.Execution execution:
:param bool sync: wait for the execution to finish
:return: Execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.rerun(execution='execution_entity')
"""
url_path = "/executions/{}/rerun".format(execution.id)
if sync:
url_path += '?sync=true'
# request
success, response = self._client_api.gen_request(req_type='post',
path=url_path)
# exception handling
if not success:
raise exceptions.PlatformException(response)
else:
return entities.Execution.from_json(
client_api=self._client_api,
_json=response.json(),
project=self._project,
service=self._service
)
[docs] def wait(self,
execution_id: str,
timeout: int = None):
"""
Get Service execution object.
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str execution_id: execution id
:param int timeout: seconds to wait until TimeoutError is raised. if <=0 - wait until done - by default wait take the service timeout
:return: Service execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.wait(execution_id='execution_id')
"""
url_path = "/executions/{}".format(execution_id)
elapsed = 0
start = int(time.time())
if timeout is not None and timeout <= 0:
timeout = np.inf
i = 1
while True:
success, response = self._client_api.gen_request(req_type="get",
path=url_path,
log_error=False)
if not success:
raise exceptions.PlatformException(response)
# return entity
execution = entities.Execution.from_json(client_api=self._client_api,
_json=response.json(),
project=self._project,
service=self._service)
if timeout is None:
timeout = execution.service.execution_timeout + 60
if execution.latest_status['status'] not in ['inProgress', 'created']:
break
elapsed = int(time.time()) - start
sleep_time = np.minimum(timeout - elapsed, 2 ** i)
time.sleep(sleep_time)
i += 1
if i > 18 or elapsed > timeout:
break
if execution is None:
raise ValueError('Nothing to wait for')
if elapsed >= timeout:
raise TimeoutError("execution wait() got timeout. id: {!r}, status: {}".format(
execution.id, execution.latest_status))
return execution
[docs] def terminate(self, execution: entities.Execution):
"""
Terminate Execution
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.execution.Execution execution:
:return: execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.terminate(execution='execution_entity')
"""
# request
success, response = self._client_api.gen_request(req_type='post',
path='/executions/{}/terminate'.format(execution.id))
# exception handling
if not success:
raise exceptions.PlatformException(response)
else:
return entities.Execution.from_json(_json=response.json(),
service=self._service,
project=self._project,
client_api=self._client_api)
[docs] def update(self, execution: entities.Execution) -> entities.Execution:
"""
Update execution changes to platform
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.execution.Execution execution: execution entity
:return: Service execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.update(execution='execution_entity')
"""
# payload
payload = execution.to_json()
# request
success, response = self._client_api.gen_request(req_type='patch',
path='/executions/{}'.format(execution.id),
json_req=payload)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
if self._project is not None:
project = self._project
else:
project = execution._project
# return
if self._service is not None:
service = self._service
else:
service = execution._service
return entities.Execution.from_json(_json=response.json(),
service=service,
project=self._project,
client_api=self._client_api)
[docs] def progress_update(
self,
execution_id: str,
status: entities.ExecutionStatus = None,
percent_complete: int = None,
message: str = None,
output: str = None,
service_version: str = None
):
"""
Update Execution Progress.
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str execution_id: execution id
:param str status: ExecutionStatus
:param int percent_complete: percent work done
:param str message: message
:param str output: the output of the execution
:param str service_version: service version
:return: Service execution object
:rtype: dtlpy.entities.execution.Execution
**Example**:
.. code-block:: python
service.executions.progress_update(execution_id='execution_id', status='complete', percent_complete=100)
"""
# create payload
payload = dict()
if status is not None:
payload['status'] = status
else:
if percent_complete is not None and isinstance(percent_complete, int):
if percent_complete < 100:
payload['status'] = 'inProgress'
else:
payload['status'] = 'completed'
elif output is not None:
payload['status'] = 'completed'
else:
payload['status'] = 'inProgress'
if percent_complete is not None:
payload['percentComplete'] = percent_complete
if message is not None:
payload['message'] = message
if output is not None:
payload['output'] = output
if service_version is not None:
payload['serviceVersion'] = service_version
# request
success, response = self._client_api.gen_request(
req_type="post",
path="/executions/{}/progress".format(execution_id),
json_req=payload
)
# exception handling
if success:
return entities.Execution.from_json(_json=response.json(),
client_api=self._client_api,
project=self._project,
service=self._service)
else:
raise exceptions.PlatformException(response)