import logging
from .. import entities, miscellaneous, exceptions, repositories, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name='dtlpy')
[docs]class Triggers:
"""
Triggers Repository
The Triggers class allows users to manage triggers and their properties. Triggers activate services.
See our documentation for more information on `triggers <https://developers.dataloop.ai/tutorials/faas/concept/chapter/>`_.
"""
def __init__(self,
client_api: ApiClient,
project: entities.Project = None,
service: entities.Service = None,
project_id: str = None,
pipeline: entities.Pipeline = None):
self._client_api = client_api
self._project = project
self._service = service
self._pipeline = pipeline
if project_id is None:
if self._project is not None:
project_id = self._project.id
elif self._service is not None:
project_id = self._service.project_id
self._project_id = project_id
############
# entities #
############
@property
def service(self) -> entities.Service:
if self._service is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "service". need to set a Service entity or use service.triggers repository')
assert isinstance(self._service, entities.Service)
return self._service
@service.setter
def service(self, service: entities.Service):
if not isinstance(service, entities.Service):
raise ValueError('Must input a valid Service entity')
self._service = service
@property
def pipeline(self) -> entities.Pipeline:
if self._pipeline is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "pipeline". need to set a Pipeline entity or use pipeline.triggers repository')
assert isinstance(self._pipeline, entities.Pipeline)
return self._pipeline
@pipeline.setter
def pipeline(self, pipeline: entities.Pipeline):
if not isinstance(pipeline, entities.Pipeline):
raise ValueError('Must input a valid Service entity')
self._pipeline = pipeline
@property
def project(self) -> entities.Project:
if self._project is None:
if self._service is not None:
self._project = self._service._project
if self._project is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "project". need to set a Project entity or use project.triggers repository')
assert isinstance(self._project, entities.Project)
return self._project
@project.setter
def project(self, project: entities.Project):
if not isinstance(project, entities.Project):
raise ValueError('Must input a valid Project entity')
self._project = project
[docs] def name_validation(self, name: str):
"""
This method validates the trigger name. If name is not valid, this method will return an error. Otherwise, it will not return anything.
:param str name: trigger name
"""
url = '/piper-misc/naming/triggers/{}'.format(name)
# request
success, response = self._client_api.gen_request(req_type='get',
path=url)
if not success:
raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/triggers', method='post')
def create(self,
# for both trigger types
service_id: str = None,
trigger_type: entities.TriggerType = entities.TriggerType.EVENT,
name: str = None,
webhook_id=None,
function_name=entities.package_defaults.DEFAULT_PACKAGE_FUNCTION_NAME,
project_id=None,
active=True,
# for event trigger
filters=None,
resource: entities.TriggerResource = entities.TriggerResource.ITEM,
actions: entities.TriggerAction = None,
execution_mode: entities.TriggerExecutionMode = entities.TriggerExecutionMode.ONCE,
# for cron triggers
start_at=None,
end_at=None,
inputs=None,
cron=None,
pipeline_id=None,
pipeline=None,
pipeline_node_id=None,
root_node_namespace=None,
**kwargs) -> entities.BaseTrigger:
"""
Create a Trigger. Can create two types: a cron trigger or an event trigger.
Inputs are different for each type
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
Inputs for all types:
:param str service_id: Id of services to be triggered
:param str trigger_type: can be cron or event. use enum dl.TriggerType for the full list
:param str name: name of the trigger
:param str webhook_id: id for webhook to be called
:param str function_name: the function name to be called when triggered (must be defined in the package)
:param str project_id: project id where trigger will work
:param bool active: optional - True/False, default = True, if true trigger is active
Inputs for event trigger:
:param dtlpy.entities.filters.Filters filters: optional - Item/Annotation metadata filters, default = none
:param str resource: optional - Dataset/Item/Annotation/ItemStatus, default = Item
:param str actions: optional - Created/Updated/Deleted, default = create
:param str execution_mode: how many times trigger should be activated; default is "Once". enum dl.TriggerExecutionMode
Inputs for cron trigger:
:param start_at: iso format date string to start activating the cron trigger
:param end_at: iso format date string to end the cron activation
:param inputs: dictionary "name":"val" of inputs to the function
:param str cron: cron spec specifying when it should run. more information: https://en.wikipedia.org/wiki/Cron
:param str pipeline_id: Id of pipeline to be triggered
:param pipeline: pipeline entity to be triggered
:param str pipeline_node_id: Id of pipeline root node to be triggered
:param root_node_namespace: namespace of pipeline root node to be triggered
:return: Trigger entity
:rtype: dtlpy.entities.trigger.Trigger
**Example**:
.. code-block:: python
service.triggers.create(name='triggername',
execution_mode=dl.TriggerExecutionMode.ONCE,
resource='Item',
actions='Created',
function_name='run',
filters={'$and': [{'hidden': False},
{'type': 'file'}]}
)
"""
scope = kwargs.get('scope', None)
if service_id is None and webhook_id is None and pipeline_id is None and pipeline is None:
if self._service is not None:
service_id = self._service.id
elif self._pipeline is not None:
pipeline = self._pipeline
pipeline_id = self._pipeline.id
if pipeline is not None:
pipeline_id = pipeline.id
# type
input_num = sum(input_id is not None for input_id in [service_id, webhook_id, pipeline_id])
if input_num != 1:
raise exceptions.PlatformException('400',
'Must provide only one of service id, webhook id, pipeline id or pipeline')
if pipeline_id is not None:
if pipeline is None:
pipeline = repositories.Pipelines(client_api=self._client_api).get(pipeline_id=pipeline_id)
if pipeline_node_id is None:
if pipeline.start_nodes:
for pipe_node in pipeline.start_nodes:
if pipe_node['type'] == 'root':
pipeline_node_id = pipe_node['nodeId']
if pipeline_node_id is None:
raise exceptions.PlatformException('400', 'Must provide pipeline node id')
if not actions:
actions = [entities.TriggerAction.CREATED]
pipeline.nodes.get(node_id=pipeline_node_id).add_trigger(
trigger_type=trigger_type,
filters=filters,
resource=resource,
actions=actions,
execution_mode=execution_mode,
cron=cron,
)
logger.info("The trigger will not create until pipeline is install")
pipeline.update()
return True
else:
if name is None:
if self._service is not None:
name = self._service.name
else:
name = 'defaulttrigger'
if filters is None:
filters = dict()
elif isinstance(filters, entities.Filters):
filters = filters.prepare(query_only=True).get('filter', dict())
if webhook_id is not None:
operation = {
'type': 'webhook',
'webhookId': webhook_id
}
else:
operation = {
'type': 'function',
'serviceId': service_id,
'functionName': function_name
}
if actions is not None:
if not isinstance(actions, list):
actions = [actions]
else:
actions = [entities.TriggerAction.CREATED]
if len(actions) == 0:
actions = [entities.TriggerAction.CREATED]
if trigger_type == entities.TriggerType.EVENT:
spec = {
'filter': filters,
'resource': resource,
'executionMode': execution_mode,
'actions': actions
}
elif trigger_type == entities.TriggerType.CRON:
spec = {
'endAt': end_at,
'startAt': start_at,
'cron': cron,
}
else:
raise ValueError('Unknown trigger type: "{}". Use dl.TriggerType for known types'.format(trigger_type))
spec['input'] = dict() if inputs is None else inputs
spec['operation'] = operation
# payload
if self._project_id is None and project_id is None:
raise exceptions.PlatformException('400', 'Please provide a project id')
elif project_id is None:
project_id = self._project_id
payload = {
'type': trigger_type,
'active': active,
'projectId': project_id,
'name': name,
'spec': spec
}
if scope is not None:
logger.warning(
"Only superuser is allowed to define a trigger's scope. "
"If you are not a superuser you will not be able to perform this action")
payload['scope'] = scope
# request
success, response = self._client_api.gen_request(req_type='post',
path='/triggers',
json_req=payload)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
return entities.BaseTrigger.from_json(_json=response.json(),
client_api=self._client_api,
project=self._project if self._project_id == project_id else None,
service=self._service)
[docs] @_api_reference.add(path='/triggers/{id}', method='get')
def get(self, trigger_id=None, trigger_name=None) -> entities.BaseTrigger:
"""
Get Trigger object
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str trigger_id: trigger id
:param str trigger_name: trigger name
:return: Trigger entity
:rtype: dtlpy.entities.trigger.Trigger
**Example**:
.. code-block:: python
service.triggers.get(trigger_id='trigger_id')
"""
# request
if trigger_id is not None:
success, response = self._client_api.gen_request(
req_type="get",
path="/triggers/{}".format(trigger_id)
)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
trigger = entities.BaseTrigger.from_json(client_api=self._client_api,
_json=response.json(),
project=self._project,
service=self._service)
# verify input trigger name is same as the given id
if trigger_name is not None and trigger.name != trigger_name:
logger.warning(
"Mismatch found in triggers.get: trigger_name is different then trigger.name:"
" {!r} != {!r}".format(
trigger_name,
trigger.name))
else:
if trigger_name is None:
raise exceptions.PlatformException('400', 'Must provide either trigger name or trigger id')
else:
filters = self.__generate_default_filter()
filters.add(field='name', values=trigger_name)
triggers = self.list(filters)
if triggers.items_count == 0:
raise exceptions.PlatformException('404', 'Trigger not found')
elif triggers.items_count == 1:
trigger = triggers.items[0]
else:
raise exceptions.PlatformException('404',
'More than one trigger by name {} exist'.format(trigger_name))
return trigger
[docs] @_api_reference.add(path='/triggers/{id}', method='delete')
def delete(self, trigger_id=None, trigger_name=None):
"""
Delete Trigger object
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param str trigger_id: trigger id
:param str trigger_name: trigger name
:return: True is successful error if not
:rtype: bool
**Example**:
.. code-block:: python
service.triggers.delete(trigger_id='trigger_id')
"""
if trigger_id is None:
if trigger_name is None:
raise exceptions.PlatformException('400', 'Must provide either trigger name or trigger id')
else:
trigger_id = self.get(trigger_name=trigger_name).id
# request
success, response = self._client_api.gen_request(
req_type="delete",
path="/triggers/{}".format(trigger_id)
)
# exception handling
if not success:
raise exceptions.PlatformException(response)
return True
[docs] @_api_reference.add(path='/triggers/{id}', method='patch')
def update(self, trigger: entities.BaseTrigger) -> entities.BaseTrigger:
"""
Update trigger
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.trigger.Trigger trigger: Trigger entity
:return: Trigger entity
:rtype: dtlpy.entities.trigger.Trigger
**Example**:
.. code-block:: python
service.triggers.update(trigger='trigger_entity')
"""
# payload
payload = trigger.to_json()
# request
success, response = self._client_api.gen_request(req_type='patch',
path='/triggers/{}'.format(trigger.id),
json_req=payload)
# exception handling
if not success:
raise exceptions.PlatformException(response)
# return entity
return entities.BaseTrigger.from_json(_json=response.json(),
client_api=self._client_api,
project=self._project,
service=self._service)
def _build_entities_from_response(self, response_items) -> miscellaneous.List[entities.BaseTrigger]:
pool = self._client_api.thread_pools(pool_name='entity.create')
jobs = [None for _ in range(len(response_items))]
# return triggers list
for i_trigger, trigger in enumerate(response_items):
jobs[i_trigger] = pool.submit(entities.BaseTrigger._protected_from_json,
**{'client_api': self._client_api,
'_json': trigger,
'project': self._project,
'service': self._service})
# get all results
results = [j.result() for j in jobs]
# log errors
_ = [logger.warning(r[1]) for r in results if r[0] is False]
# return good jobs
triggers = miscellaneous.List([r[1] for r in results if r[0] is True])
return triggers
def _list(self, filters: entities.Filters):
"""
List project triggers
:return:
"""
url = '/query/faas'
success, response = self._client_api.gen_request(req_type='POST',
path=url,
json_req=filters.prepare())
if not success:
raise exceptions.PlatformException(response)
return response.json()
def __generate_default_filter(self):
filters = entities.Filters(resource=entities.FiltersResource.TRIGGER)
if self._project is not None:
filters.add(field='projectId', values=self._project.id)
if self._service is not None:
filters.add(field='spec.operation.serviceId', values=self._service.id)
if self._pipeline is not None:
filters.add(field='spec.operation.id', values=self._pipeline.id)
return filters
[docs] @_api_reference.add(path='/query/faas', method='post')
def list(self, filters: entities.Filters = None) -> entities.PagedEntities:
"""
List triggers of a project, package, or service.
**Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a service.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:return: Paged entity
:rtype: dtlpy.entities.paged_entities.PagedEntities
**Example**:
.. code-block:: python
service.triggers.list()
"""
if filters is None:
filters = self.__generate_default_filter()
# assert type filters
elif not isinstance(filters, entities.Filters):
raise exceptions.PlatformException(error='400',
message='Unknown filters type: {!r}'.format(type(filters)))
if filters.resource != entities.FiltersResource.TRIGGER:
raise exceptions.PlatformException(
error='400',
message='Filters resource must to be FiltersResource.TRIGGER. Got: {!r}'.format(filters.resource))
paged = entities.PagedEntities(items_repository=self,
filters=filters,
page_offset=filters.page,
page_size=filters.page_size,
client_api=self._client_api)
paged.get_page()
return paged