import traceback
from enum import Enum
from typing import Union, List
import attr
import logging
from .. import repositories, entities, exceptions
# Module-level logger; messages from this module are emitted through the 'dtlpy' logger.
logger = logging.getLogger(name='dtlpy')
class ConsensusTaskType(str, Enum):
    """
    Consensus task types.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """
    CONSENSUS = 'consensus'
    QUALIFICATION = 'qualification'
    HONEYPOT = 'honeypot'
class TaskPriority(int, Enum):
    """
    Task priority levels.

    Members are also ``int`` instances, so they compare and order like
    their numeric values (LOW < MEDIUM < HIGH).
    """
    LOW = 1
    MEDIUM = 2
    HIGH = 3
class ItemAction:
    """
    An action (status) that can be made available for the items of a task.

    :param str action: action identifier; must be a non-empty string
    :param str display_name: label of the action; falls back to ``action`` when empty
    :param str color: color of the action, '#FFFFFF' by default
    :param icon: optional icon of the action
    """

    def __init__(self, action, display_name=None, color='#FFFFFF', icon=None):
        if not (action and isinstance(action, str)):
            raise ValueError('action should be a non-empty string')
        self.action = action
        # an empty / missing display name falls back to the action itself
        self.display_name = display_name or action
        self.color = color
        self.icon = icon

    @classmethod
    def from_json(cls, _json: dict):
        """
        Build an ItemAction from its platform json.

        :param dict _json: json with 'action' and optional 'displayName', 'color', 'icon'
        :return: ItemAction object
        """
        init_args = {'action': _json.get('action')}
        # (json key, constructor argument) pairs; falsy values are skipped so
        # the constructor defaults apply
        optional_fields = (
            ('displayName', 'display_name'),
            ('color', 'color'),
            ('icon', 'icon'),
        )
        for json_key, arg_name in optional_fields:
            value = _json.get(json_key, False)
            if value:
                init_args[arg_name] = value
        return cls(**init_args)

    def to_json(self) -> dict:
        """
        Return the platform json of the action.

        :return: dict with 'action', 'color', 'displayName' and, when set, 'icon'
        :rtype: dict
        """
        label = self.action if self.display_name is None else self.display_name
        payload = {
            'action': self.action,
            'color': self.color,
            'displayName': label
        }
        if self.icon is not None:
            payload['icon'] = self.icon
        return payload
@attr.s
class Task:
    """
    Task object

    Holds the platform fields of a task together with lazily created SDK
    helpers (parent project / dataset entities and related repositories).
    """
    # platform
    name = attr.ib()
    status = attr.ib()
    project_id = attr.ib()
    metadata = attr.ib(repr=False)
    id = attr.ib()
    url = attr.ib(repr=False)
    task_owner = attr.ib(repr=False)
    item_status = attr.ib(repr=False)
    creator = attr.ib()
    due_date = attr.ib()
    dataset_id = attr.ib()
    spec = attr.ib()
    recipe_id = attr.ib(repr=False)
    query = attr.ib(repr=False)
    assignmentIds = attr.ib(repr=False)
    annotation_status = attr.ib(repr=False)
    progress = attr.ib()
    for_review = attr.ib()
    issues = attr.ib()
    updated_at = attr.ib()
    created_at = attr.ib()
    available_actions = attr.ib()
    total_items = attr.ib()
    priority = attr.ib()
    _description = attr.ib()

    # sdk
    _client_api = attr.ib(repr=False)
    _current_assignments = attr.ib(default=None, repr=False)
    _assignments = attr.ib(default=None, repr=False)
    _project = attr.ib(default=None, repr=False)
    _dataset = attr.ib(default=None, repr=False)
    _tasks = attr.ib(default=None, repr=False)
    _settings = attr.ib(default=None, repr=False)

    @property
    def description(self):
        """
        The stored description value of the task, returned as-is.
        """
        return self._description

    @description.setter
    def description(self, description):
        """
        Set the description text; it is stored under the 'content' key.

        :param str description: the description text
        :raises ValueError: if description is not a string
        """
        if not isinstance(description, str):
            raise ValueError('description should be a string')
        if self._description is None:
            self._description = {}
        self._description['content'] = description

    @staticmethod
    def _protected_from_json(_json, client_api, project, dataset):
        """
        Same as from_json but with try-except to catch if error

        :param dict _json: platform json that describe the task
        :param client_api: ApiClient object
        :param dtlpy.entities.project.Project project: project object where task will create
        :param dtlpy.entities.dataset.Dataset dataset: dataset object that refer to the task
        :return: (True, Task object) on success, (False, formatted traceback string) on failure
        :rtype: tuple
        """
        try:
            task = Task.from_json(
                _json=_json,
                client_api=client_api,
                project=project,
                dataset=dataset
            )
            status = True
        except Exception:
            # deliberate best-effort: the caller receives the traceback text instead of an exception
            task = traceback.format_exc()
            status = False
        return status, task

    @classmethod
    def from_json(cls, _json, client_api, project=None, dataset=None):
        """
        Return the task object from the json

        :param dict _json: platform json that describe the task
        :param client_api: ApiClient object
        :param dtlpy.entities.project.Project project: project object where task will create
        :param dtlpy.entities.dataset.Dataset dataset: dataset object that refer to the task
        :return: Task object
        """
        # a project / dataset whose id does not match the json is dropped (with a warning)
        if project is not None:
            if project.id != _json.get('projectId', None):
                logger.warning('Task has been fetched from a project that is not belong to it')
                project = None

        if dataset is not None:
            if dataset.id != _json.get('datasetId', None):
                logger.warning('Task has been fetched from a dataset that is not belong to it')
                dataset = None

        actions = [ItemAction.from_json(_json=action) for action in _json.get('availableActions', list())]

        return cls(
            name=_json.get('name', None),
            status=_json.get('status', None),
            project_id=_json.get('projectId', None),
            metadata=_json.get('metadata', dict()),
            url=_json.get('url', None),
            spec=_json.get('spec', None),
            id=_json['id'],
            creator=_json.get('creator', None),
            due_date=_json.get('dueDate', 0),
            dataset_id=_json.get('datasetId', None),
            recipe_id=_json.get('recipeId', None),
            query=_json.get('query', None),
            task_owner=_json.get('taskOwner', None),
            item_status=_json.get('itemStatus', None),
            assignmentIds=_json.get('assignmentIds', list()),
            dataset=dataset,
            project=project,
            client_api=client_api,
            annotation_status=_json.get('annotationStatus', None),
            progress=_json.get('progress', None),
            for_review=_json.get('forReview', None),
            issues=_json.get('issues', None),
            updated_at=_json.get('updatedAt', None),
            created_at=_json.get('createdAt', None),
            available_actions=actions,
            total_items=_json.get('totalItems', None),
            priority=_json.get('priority', None),
            description=_json.get('description', None)
        )

    def to_json(self):
        """
        Returns platform _json format of object

        :return: platform json format of object
        :rtype: dict
        """
        # SDK-only fields and fields that need a camelCase key are excluded
        # here; the camelCase ones are added back explicitly below.
        _json = attr.asdict(
            self, filter=attr.filters.exclude(
                attr.fields(Task)._client_api,
                attr.fields(Task)._project,
                attr.fields(Task).project_id,
                attr.fields(Task).dataset_id,
                attr.fields(Task).recipe_id,
                attr.fields(Task).task_owner,
                attr.fields(Task).available_actions,
                attr.fields(Task).item_status,
                attr.fields(Task).due_date,
                attr.fields(Task)._tasks,
                attr.fields(Task)._dataset,
                attr.fields(Task)._current_assignments,
                attr.fields(Task)._assignments,
                attr.fields(Task).annotation_status,
                attr.fields(Task).for_review,
                attr.fields(Task).issues,
                attr.fields(Task).updated_at,
                attr.fields(Task).created_at,
                attr.fields(Task).total_items,
                attr.fields(Task)._settings,
                attr.fields(Task)._description
            )
        )
        _json['projectId'] = self.project_id
        _json['datasetId'] = self.dataset_id
        _json['recipeId'] = self.recipe_id
        _json['taskOwner'] = self.task_owner
        _json['dueDate'] = self.due_date
        _json['totalItems'] = self.total_items
        _json['forReview'] = self.for_review
        _json['description'] = self.description

        if self.available_actions is not None:
            _json['availableActions'] = [action.to_json() for action in self.available_actions]

        return _json

    @property
    def platform_url(self):
        """
        URL of the task, built by the client api from the project id and the task id.
        """
        return self._client_api._get_resource_url("projects/{}/tasks/{}".format(self.project.id, self.id))

    @property
    def current_assignments(self):
        """
        Assignment entities for every id in ``assignmentIds`` (fetched once and cached).
        """
        if self._current_assignments is None:
            # Build the complete list before caching it: if one lookup raises,
            # the cache stays empty and the next access retries instead of
            # silently returning a truncated list.
            self._current_assignments = [
                self.assignments.get(assignment_id=assignment_id)
                for assignment_id in self.assignmentIds
            ]
        return self._current_assignments

    @property
    def assignments(self):
        """
        Assignments repository bound to this task (created on first access).
        """
        if self._assignments is None:
            self._assignments = repositories.Assignments(client_api=self._client_api, dataset=self._dataset,
                                                         project=self.project, task=self, project_id=self.project_id)
        assert isinstance(self._assignments, repositories.Assignments)
        return self._assignments

    @property
    def tasks(self):
        """
        Tasks repository bound to this task's project and dataset (created on first access).
        """
        if self._tasks is None:
            self._tasks = repositories.Tasks(client_api=self._client_api, project=self.project, dataset=self.dataset)
        assert isinstance(self._tasks, repositories.Tasks)
        return self._tasks

    @property
    def settings(self):
        """
        Settings repository bound to this task (created on first access).
        """
        if self._settings is None:
            self._settings = repositories.Settings(
                client_api=self._client_api,
                project=self.project,
                dataset=self.dataset,
                task=self
            )
        assert isinstance(self._settings, repositories.Settings)
        return self._settings

    @property
    def project(self):
        """
        Project entity of the task; fetched by ``project_id`` when not already set.

        :raises exceptions.PlatformException: if the project is still missing after the fetch
        """
        if self._project is None:
            self.get_project()
            if self._project is None:
                raise exceptions.PlatformException(error='2001',
                                                   message='Missing entity "project". need to "get_project()" ')
        assert isinstance(self._project, entities.Project)
        return self._project

    @property
    def dataset(self):
        """
        Dataset entity of the task; fetched by ``dataset_id`` when not already set.

        :raises exceptions.PlatformException: if the dataset is still missing after the fetch
        """
        if self._dataset is None:
            self.get_dataset()
            if self._dataset is None:
                raise exceptions.PlatformException(error='2001',
                                                   message='Missing entity "dataset". need to "get_dataset()" ')
        assert isinstance(self._dataset, entities.Dataset)
        return self._dataset

    def get_project(self):
        """
        Fetch the project by ``project_id`` and store it on the task, if not already set.
        """
        if self._project is None:
            self._project = repositories.Projects(client_api=self._client_api).get(project_id=self.project_id)

    def get_dataset(self):
        """
        Fetch the dataset by ``dataset_id`` and store it on the task, if not already set.
        """
        if self._dataset is None:
            # uses the stored project as-is (may be None); it is not fetched here
            self._dataset = repositories.Datasets(client_api=self._client_api, project=self._project).get(
                dataset_id=self.dataset_id)

    def open_in_web(self):
        """
        Open the task in web platform

        :return:
        """
        self._client_api._open_in_web(url=self.platform_url)

    def delete(self, wait=True):
        """
        Delete task from platform

        :param bool wait: wait until delete task finish
        :return: True
        :rtype: bool
        """
        return self.tasks.delete(task_id=self.id, wait=wait)

    def update(self, system_metadata=False):
        """
        Update an Annotation Task

        :param bool system_metadata: True, if you want to change metadata system
        :return: the result of ``tasks.update``
        """
        return self.tasks.update(task=self, system_metadata=system_metadata)

    def create_qa_task(self,
                       due_date,
                       assignee_ids,
                       filters=None,
                       items=None,
                       query=None,
                       workload=None,
                       metadata=None,
                       available_actions=None,
                       wait=True,
                       batch_size=None,
                       max_batch_workload=None,
                       allowed_assignees=None,
                       priority=TaskPriority.MEDIUM
                       ):
        """
        Create a new QA Task

        :param float due_date: date by which the QA task should be finished; for example, due_date=datetime.datetime(day=1, month=1, year=2029).timestamp()
        :param list assignee_ids: list the QA task assignees (contributors) that should be working on the task. Provide a list of users' emails
        :param entities.Filters filters: dl.Filters entity to filter items for the task
        :param List[entities.Item] items: list of items (item Id or objects) to insert to the task
        :param dict query: DQL query to filter items for the task
        :param List[WorkloadUnit] workload: list of WorkloadUnit objects. Customize distribution (percentage) between the task assignees. For example: [dl.WorkloadUnit(annotator@hi.com, 80), dl.WorkloadUnit(annotator2@hi.com, 20)]
        :param dict metadata: metadata for the task
        :param list available_actions: list of available actions (statuses) that will be available for the task items; The default statuses are: "approved" and "discard"
        :param bool wait: wait until create task finish
        :param int batch_size: Pulling batch size (items), use with pulling allocation method. Restrictions - Min 3, max 100
        :param int max_batch_workload: Max items in assignment, use with pulling allocation method. Restrictions - Min batchSize + 2, max batchSize * 2
        :param list allowed_assignees: list the task assignees (contributors) that should be working on the task. Provide a list of users' emails
        :param entities.TaskPriority priority: priority of the task options in entities.TaskPriority
        :return: task object
        :rtype: dtlpy.entities.task.Task

        **Example**:

        .. code-block:: python

            task = task.create_qa_task(due_date = datetime.datetime(day= 1, month= 1, year= 2029).timestamp(),
                                       assignee_ids =[ 'annotator1@dataloop.ai', 'annotator2@dataloop.ai'])
        """
        return self.tasks.create_qa_task(task=self,
                                         due_date=due_date,
                                         assignee_ids=assignee_ids,
                                         filters=filters,
                                         items=items,
                                         query=query,
                                         workload=workload,
                                         metadata=metadata,
                                         available_actions=available_actions,
                                         wait=wait,
                                         batch_size=batch_size,
                                         max_batch_workload=max_batch_workload,
                                         allowed_assignees=allowed_assignees,
                                         priority=priority
                                         )

    def create_assignment(self, assignment_name, assignee_id, items=None, filters=None):
        """
        Create a new assignment

        :param str assignment_name: assignment name
        :param str assignee_id: the assignment assignees (contributors) that should be working on the task. Provide a user email
        :param List[entities.Item] items: list of items (item Id or objects) to insert to the task
        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :return: Assignment object
        :rtype: dtlpy.entities.assignment.Assignment assignment

        **Example**:

        .. code-block:: python

            assignment = task.create_assignment(assignment_name='my-assignment',
                                                assignee_id='annotator1@dataloop.ai')
        """
        # NOTE(review): assignment_name is required by the signature but is not
        # forwarded to assignments.create - confirm whether the repository accepts it.
        assignment = self.assignments.create(assignee_id=assignee_id,
                                             filters=filters,
                                             items=items)

        # link the new assignment to this task, then register it on the task
        assignment.metadata['system']['taskId'] = self.id
        assignment.update(system_metadata=True)
        self.assignmentIds.append(assignment.id)
        self.update()
        self.add_items(filters=filters, items=items)
        return assignment

    def add_items(self, filters=None, items=None, assignee_ids=None, workload=None, limit=None, wait=True, query=None):
        """
        Add items to Task

        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param list items: list of items (item Ids or objects) to add to the task
        :param list assignee_ids: list of the assignees who work on the task
        :param list workload: list of WorkloadUnit objects. Customize distribution (percentage) between the task assignees. For example: [dl.WorkloadUnit(annotator@hi.com, 80), dl.WorkloadUnit(annotator2@hi.com, 20)]
        :param int limit: the limit items that task can include
        :param bool wait: wait until adding the items finishes
        :param dict query: query to filter the items for the task
        :return: task entity
        :rtype: dtlpy.entities.task.Task
        """
        return self.tasks.add_items(task=self,
                                    filters=filters,
                                    items=items,
                                    assignee_ids=assignee_ids,
                                    workload=workload,
                                    limit=limit,
                                    wait=wait,
                                    query=query)

    def remove_items(self,
                     filters: entities.Filters = None,
                     query=None,
                     items=None,
                     wait=True):
        """
        Remove items from Task.

        **Prerequisites**: You must be in the role of an *owner*, *developer*, or *annotation manager* who has been assigned to be *owner* of the annotation task.

        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param dict query: query to filter the items to remove
        :param list items: list of items to remove from the task
        :param bool wait: wait until remove items finish
        :return: True if success and an error if failed
        :rtype: bool
        """
        return self.tasks.remove_items(task=self,
                                       query=query,
                                       filters=filters,
                                       items=items,
                                       wait=wait)

    def get_items(self, filters=None, get_consensus_items: bool = False):
        """
        Get the task items

        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param bool get_consensus_items: forwarded as-is to ``tasks.get_items``; False by default
        :return: list of the items or PagedEntity output of items
        :rtype: list or dtlpy.entities.paged_entities.PagedEntities
        """
        return self.tasks.get_items(task_id=self.id,
                                    dataset=self.dataset,
                                    filters=filters,
                                    get_consensus_items=get_consensus_items)

    def set_status(self, status: str, operation: str, item_ids: List[str]):
        """
        Update item status within task

        :param str status: string that describes the status
        :param str operation: the status action, 'create' or 'delete'
        :param list item_ids: List[str] of item ids
        :return: True if success
        :rtype: bool
        """
        return self.tasks.set_status(status=status, operation=operation, item_ids=item_ids, task_id=self.id)