from copy import deepcopy
import traceback
import logging
import json
import jwt
import os
from PIL import Image
from io import BytesIO
import base64
from .. import entities, exceptions, miscellaneous, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name='dtlpy')
[docs]class Annotations:
"""
Annotations Repository
The Annotation class allows you to manage the annotations of data items. For information on annotations explore our
documentation at:
`Classification SDK <https://developers.dataloop.ai/tutorials/annotations_image/classification_point_and_pose/chapter/>`_,
`Annotation Labels and Attributes <https://developers.dataloop.ai/tutorials/data_management/upload_and_manage_annotations/chapter/#set-attributes-on-annotations>`_,
`Show Video with Annotations <https://developers.dataloop.ai/tutorials/annotations_video/video_annotations/chapter/>`_.
"""
def __init__(self, client_api: ApiClient, item=None, dataset=None, dataset_id=None):
self._client_api = client_api
self._item = item
self._dataset = dataset
self._upload_batch_size = 100
if dataset_id is None:
if dataset is not None:
dataset_id = dataset.id
elif item is not None:
dataset_id = item.dataset_id
self._dataset_id = dataset_id
############
# entities #
############
@property
def dataset(self):
if self._dataset is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "dataset". need to set a Dataset entity or use dataset.annotations repository')
assert isinstance(self._dataset, entities.Dataset)
return self._dataset
@dataset.setter
def dataset(self, dataset: entities.Dataset):
if not isinstance(dataset, entities.Dataset):
raise ValueError('Must input a valid Dataset entity')
self._dataset = dataset
@property
def item(self):
if self._item is None:
raise exceptions.PlatformException(
error='2001',
message='Missing "item". need to set an Item entity or use item.annotations repository')
assert isinstance(self._item, entities.Item)
return self._item
@item.setter
def item(self, item: entities.Item):
if not isinstance(item, entities.Item):
raise ValueError('Must input a valid Item entity')
self._item = item
###########
# methods #
###########
[docs] @_api_reference.add(path='/annotations/{annotationId}', method='get')
def get(self, annotation_id: str) -> entities.Annotation:
"""
Get a single annotation.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param str annotation_id: The id of the annotation
:return: Annotation object or None
:return: Annotation object or None
:rtype: dtlpy.entities.annotation.Annotation
**Example**:
.. code-block:: python
annotation = item.annotations.get(annotation_id='annotation_id')
"""
success, response = self._client_api.gen_request(req_type='get',
path='/annotations/{}'.format(annotation_id))
if success:
annotation = entities.Annotation.from_json(_json=response.json(),
annotations=self,
dataset=self._dataset,
client_api=self._client_api,
item=self._item)
else:
raise exceptions.PlatformException(response)
return annotation
def _build_entities_from_response(self, response_items):
pool = self._client_api.thread_pools(pool_name='entity.create')
jobs = [None for _ in range(len(response_items))]
# return triggers list
for i_json, _json in enumerate(response_items):
jobs[i_json] = pool.submit(entities.Annotation._protected_from_json,
**{'client_api': self._client_api,
'_json': _json,
'item': self._item,
'dataset': self._dataset,
'annotations': self})
# get all results
results = [j.result() for j in jobs]
# log errors
_ = [logger.warning(r[1]) for r in results if r[0] is False]
# return good jobs
return miscellaneous.List([r[1] for r in results if r[0] is True])
def _list(self, filters: entities.Filters):
"""
Get a dataset's item list. This is a browsing endpoint. For any given path, item count will be returned.
The user is then expected to perform another request for every folder to actually get its item list.
:param dtlpy.entities.filters.Filters filters: Filter entity or a dictionary containing filters parameters
:return: json response
:rtype:
"""
# prepare request
success, response = self._client_api.gen_request(req_type="POST",
path="/datasets/{}/query".format(self._dataset_id),
json_req=filters.prepare(),
headers={'user_query': filters._user_query}
)
if not success:
raise exceptions.PlatformException(response)
return response.json()
[docs] @_api_reference.add(path='/datasets/{id}/query', method='post')
def list(self, filters: entities.Filters = None, page_offset: int = None, page_size: int = None):
"""
List Annotations of a specific item. You must get the item first and then list the annotations with the desired filters.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param int page_offset: starting page
:param int page_size: size of page
:return: Pages object
:rtype: dtlpy.entities.paged_entities.PagedEntities
**Example**:
.. code-block:: python
annotations = item.annotations.list(filters=dl.Filters(
resource=dl.FiltersResource.ANNOTATION,
field='type',
values='box'),
page_size=100,
page_offset=0)
"""
if self._dataset_id is not None:
if filters is None:
filters = entities.Filters(resource=entities.FiltersResource.ANNOTATION)
filters._user_query = 'false'
if not filters.resource == entities.FiltersResource.ANNOTATION:
raise exceptions.PlatformException(error='400',
message='Filters resource must to be FiltersResource.ANNOTATION')
if self._item is not None and not filters.has_field('itemId'):
filters = deepcopy(filters)
filters.page_size = 1000
filters.add(field='itemId', values=self.item.id, method=entities.FiltersMethod.AND)
# assert type filters
if not isinstance(filters, entities.Filters):
raise exceptions.PlatformException('400', 'Unknown filters type')
# page size
if page_size is None:
# take from default
page_size = filters.page_size
else:
filters.page_size = page_size
# page offset
if page_offset is None:
# take from default
page_offset = filters.page
else:
filters.page = page_offset
paged = entities.PagedEntities(items_repository=self,
filters=filters,
page_offset=page_offset,
page_size=page_size,
client_api=self._client_api)
paged.get_page()
if self._item is not None:
if paged.total_pages_count > 1:
annotations = list()
for page in paged:
annotations += page
else:
annotations = paged.items
return entities.AnnotationCollection(annotations=annotations, item=self._item)
else:
return paged
else:
raise exceptions.PlatformException('400',
'Please use item.annotations.list() or dataset.annotations.list() '
'to perform this action.')
[docs] def show(self,
image=None,
thickness: int = 1,
with_text: bool = False,
height: float = None,
width: float = None,
annotation_format: entities.ViewAnnotationOptions = entities.ViewAnnotationOptions.MASK,
alpha: float = 1):
"""
Show annotations. To use this method, you must get the item first and then show the annotations with
the desired filters. The method returns an array showing all the annotations.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param ndarray image: empty or image to draw on
:param int thickness: optional - line thickness, default=1
:param bool with_text: add label to annotation
:param float height: item height
:param float width: item width
:param str annotation_format: the format that want to show ,options: list(dl.ViewAnnotationOptions)
:param float alpha: opacity value [0 1], default 1
:return: ndarray of the annotations
:rtype: ndarray
**Example**:
.. code-block:: python
image = item.annotations.show(image='nd array',
thickness=1,
with_text=False,
height=100,
width=100,
annotation_format=dl.ViewAnnotationOptions.MASK,
alpha=1)
"""
# get item's annotations
annotations = self.list()
return annotations.show(image=image,
width=width,
height=height,
thickness=thickness,
alpha=alpha,
with_text=with_text,
annotation_format=annotation_format)
[docs] def download(self,
filepath: str,
annotation_format: entities.ViewAnnotationOptions = entities.ViewAnnotationOptions.JSON,
img_filepath: str = None,
height: float = None,
width: float = None,
thickness: int = 1,
with_text: bool = False,
alpha: float = 1):
"""
Save annotation to file.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param str filepath: Target download directory
:param str annotation_format: the format that want to download ,options: list(dl.ViewAnnotationOptions)
:param str img_filepath: img file path - needed for img_mask
:param float height: optional - image height
:param float width: optional - image width
:param int thickness: optional - line thickness, default=1
:param bool with_text: optional - draw annotation with text, default = False
:param float alpha: opacity value [0 1], default 1
:return: file path to where save the annotations
:rtype: str
**Example**:
.. code-block:: python
file_path = item.annotations.download(
filepath='file_path',
annotation_format=dl.ViewAnnotationOptions.MASK,
img_filepath='img_filepath',
height=100,
width=100,
thickness=1,
with_text=False,
alpha=1)
"""
# get item's annotations
annotations = self.list()
if 'text' in self.item.metadata.get('system').get('mimetype', ''):
annotation_format = entities.ViewAnnotationOptions.JSON
elif 'audio' not in self.item.metadata.get('system').get('mimetype', ''):
# height/weight
if height is None:
if self.item.height is None:
raise exceptions.PlatformException('400', 'Height must be provided')
height = self.item.height
if width is None:
if self.item.width is None:
raise exceptions.PlatformException('400', 'Width must be provided')
width = self.item.width
return annotations.download(filepath=filepath,
img_filepath=img_filepath,
width=width,
height=height,
thickness=thickness,
with_text=with_text,
annotation_format=annotation_format,
alpha=alpha)
def _delete_single_annotation(self, w_annotation_id):
try:
creator = jwt.decode(self._client_api.token, algorithms=['HS256'],
verify=False, options={'verify_signature': False})['email']
payload = {'username': creator}
success, response = self._client_api.gen_request(req_type='delete',
path='/annotations/{}'.format(w_annotation_id),
json_req=payload)
if not success:
raise exceptions.PlatformException(response)
return success
except Exception:
logger.exception('Failed to delete annotation')
raise
[docs] @_api_reference.add(path='/annotations/{annotationId}', method='delete')
def delete(self, annotation: entities.Annotation = None,
annotation_id: str = None,
filters: entities.Filters = None) -> bool:
"""
Remove an annotation from item.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param dtlpy.entities.annotation.Annotation annotation: Annotation object
:param str annotation_id: The id of the annotation
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:return: True/False
:rtype: bool
**Example**:
.. code-block:: python
is_deleted = item.annotations.delete(annotation_id='annotation_id')
"""
if annotation is not None:
if isinstance(annotation, entities.Annotation):
annotation_id = annotation.id
elif isinstance(annotation, str) and annotation.lower() == 'all':
if self._item is None:
raise exceptions.PlatformException(error='400',
message='To use "all" option repository must have an item')
filters = entities.Filters(
resource=entities.FiltersResource.ANNOTATION,
field='itemId',
values=self._item.id,
method=entities.FiltersMethod.AND
)
else:
raise exceptions.PlatformException(error='400',
message='Unknown annotation type')
if annotation_id is not None:
if not isinstance(annotation_id, list):
return self._delete_single_annotation(w_annotation_id=annotation_id)
filters = entities.Filters(resource=entities.FiltersResource.ANNOTATION,
field='annotationId',
values=annotation_id,
operator=entities.FiltersOperations.IN)
filters.pop(field="type")
if filters is None:
raise exceptions.PlatformException(error='400',
message='Must input filter, annotation id or annotation entity')
if not filters.resource == entities.FiltersResource.ANNOTATION:
raise exceptions.PlatformException(error='400',
message='Filters resource must to be FiltersResource.ANNOTATION')
if self._item is not None and not filters.has_field('itemId'):
filters = deepcopy(filters)
filters.add(field='itemId', values=self._item.id, method=entities.FiltersMethod.AND)
if self._dataset is not None:
items_repo = self._dataset.items
elif self._item is not None:
items_repo = self._item.dataset.items
else:
raise exceptions.PlatformException(
error='2001',
message='Missing "dataset". need to set a Dataset entity or use dataset.annotations repository')
return items_repo.delete(filters=filters)
@staticmethod
def _update_snapshots(origin, modified):
"""
Update the snapshots if a change occurred and return a list of the new snapshots with the flag to update.
"""
update = False
origin_snapshots = list()
origin_metadata = origin['metadata'].get('system', None)
if origin_metadata:
origin_snapshots = origin_metadata.get('snapshots_', None)
if origin_snapshots is not None:
modified_snapshots = modified['metadata'].get('system', dict()).get('snapshots_', None)
# if the number of the snapshots change
if len(origin_snapshots) != len(modified_snapshots):
origin_snapshots = modified_snapshots
update = True
i = 0
# if some snapshot change
while i < len(origin_snapshots) and not update:
if origin_snapshots[i] != modified_snapshots[i]:
origin_snapshots = modified_snapshots
update = True
break
i += 1
return update, origin_snapshots
def _update_single_annotation(self, w_annotation, system_metadata):
try:
if isinstance(w_annotation, entities.Annotation):
if w_annotation.id is None:
raise exceptions.PlatformException(
'400',
'Cannot update annotation because it was not fetched'
' from platform and therefore does not have an id'
)
annotation_id = w_annotation.id
else:
raise exceptions.PlatformException('400',
'unknown annotations type: {}'.format(type(w_annotation)))
origin = w_annotation._platform_dict
modified = w_annotation.to_json()
# check snapshots
update, updated_snapshots = self._update_snapshots(origin=origin,
modified=modified)
# pop the snapshots to make the diff work with out them
origin.get('metadata', dict()).get('system', dict()).pop('snapshots_', None)
modified.get('metadata', dict()).get('system', dict()).pop('snapshots_', None)
# check diffs in the json
json_req = miscellaneous.DictDiffer.diff(origin=origin,
modified=modified)
# add the new snapshots if exist
if updated_snapshots and update:
if 'metadata' not in json_req:
json_req['metadata'] = dict()
if 'system' not in json_req['metadata']:
json_req['metadata']['system'] = dict()
json_req['metadata']['system']['snapshots_'] = updated_snapshots
# no changes happen
if not json_req and not updated_snapshots:
status = True
result = w_annotation
else:
suc, response = self._update_annotation_req(annotation_json=json_req,
system_metadata=system_metadata,
annotation_id=annotation_id)
if suc:
result = entities.Annotation.from_json(_json=response.json(),
annotations=self,
dataset=self._dataset,
item=self._item)
w_annotation._platform_dict = result._platform_dict
else:
raise exceptions.PlatformException(response)
status = True
except Exception:
status = False
result = traceback.format_exc()
return status, result
def _update_annotation_req(self, annotation_json, system_metadata, annotation_id):
url_path = '/annotations/{}'.format(annotation_id)
if system_metadata:
url_path += '?system=true'
suc, response = self._client_api.gen_request(req_type='put',
path=url_path,
json_req=annotation_json)
return suc, response
[docs] @_api_reference.add(path='/annotations/{annotationId}', method='put')
def update(self, annotations, system_metadata=False):
"""
Update an existing annotation. For example, you may change the annotation's label and then use the update method.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:param dtlpy.entities.annotation.Annotation annotations: Annotation object
:param bool system_metadata: bool - True, if you want to change metadata system
:return: True if successful or error if unsuccessful
:rtype: bool
**Example**:
.. code-block:: python
annotations = item.annotations.update(annotation='annotation')
"""
pool = self._client_api.thread_pools(pool_name='annotation.update')
if not isinstance(annotations, list):
annotations = [annotations]
jobs = [None for _ in range(len(annotations))]
for i_ann, ann in enumerate(annotations):
jobs[i_ann] = pool.submit(self._update_single_annotation,
**{'w_annotation': ann,
'system_metadata': system_metadata})
# get all results
results = [j.result() for j in jobs]
out_annotations = [r[1] for r in results if r[0] is True]
out_errors = [r[1] for r in results if r[0] is False]
if len(out_errors) == 0:
logger.debug('Annotation/s updated successfully. {}/{}'.format(len(out_annotations), len(results)))
else:
logger.error(out_errors)
logger.error('Annotation/s updated with {} errors'.format(len(out_errors)))
return out_annotations
@staticmethod
def _annotation_encoding(annotation):
metadata = annotation.get('metadata', dict())
system = metadata.get('system', dict())
snapshots = system.get('snapshots_', list())
last_frame = {
'label': annotation.get('label', None),
'attributes': annotation.get('attributes', None),
'type': annotation.get('type', None),
'data': annotation.get('coordinates', None),
'fixed': snapshots[0].get('fixed', None) if (isinstance(snapshots, list) and len(snapshots) > 0) else None,
'objectVisible': snapshots[0].get('objectVisible', None) if (
isinstance(snapshots, list) and len(snapshots) > 0) else None,
}
offset = 0
for idx, frame in enumerate(deepcopy(snapshots)):
frame.pop("frame", None)
if frame == last_frame and not frame['fixed']:
del snapshots[idx - offset]
offset += 1
else:
last_frame = frame
return annotation
def _create_batches_for_upload(self, annotations, merge=False):
"""
receives a list of annotations and split them into batches to optimize the upload
:param annotations: list of all annotations
:param merge: bool - merge the new binary annotations with the existing annotations
:return: batch_annotations: list of list of annotation. each batch with size self._upload_batch_size
"""
annotation_batches = list()
single_batch = list()
for annotation in annotations:
if isinstance(annotation, str):
annotation = json.loads(annotation)
elif isinstance(annotation, entities.Annotation):
if annotation._item is None and self._item is not None:
# if annotation is without item - set one (affects the binary annotation color)
annotation._item = self._item
annotation = annotation.to_json()
elif isinstance(annotation, dict):
pass
else:
raise exceptions.PlatformException(error='400',
message='unknown annotations type: {}'.format(type(annotation)))
annotation = self._annotation_encoding(annotation)
single_batch.append(annotation)
if len(single_batch) >= self._upload_batch_size:
annotation_batches.append(single_batch)
single_batch = list()
if len(single_batch) > 0:
annotation_batches.append(single_batch)
if merge and self.item:
annotation_batches = self._merge_new_annotations(annotation_batches)
annotation_batches = self._merge_to_exits_annotations(annotation_batches)
return annotation_batches
def _merge_binary_annotations(self, data_url1, data_url2, item_width, item_height):
# Decode base64 data
img_data1 = base64.b64decode(data_url1.split(",")[1])
img_data2 = base64.b64decode(data_url2.split(",")[1])
# Convert binary data to images
img1 = Image.open(BytesIO(img_data1))
img2 = Image.open(BytesIO(img_data2))
# Create a new image with the target item size
merged_img = Image.new('RGBA', (item_width, item_height))
# Paste both images on the new canvas at their original sizes and positions
# Adjust positioning logic if needed (assuming top-left corner for both images here)
merged_img.paste(img1, (0, 0), img1) # Use img1 as a mask to handle transparency
merged_img.paste(img2, (0, 0), img2) # Overlay img2 at the same position
# Save the merged image to a buffer
buffer = BytesIO()
merged_img.save(buffer, format="PNG")
merged_img_data = buffer.getvalue()
# Encode the merged image back to a base64 string
merged_data_url = "data:image/png;base64," + base64.b64encode(merged_img_data).decode()
return merged_data_url
def _merge_new_annotations(self, annotations_batch):
"""
Merge the new binary annotations with the existing annotations
:param annotations_batch: list of list of annotation. each batch with size self._upload_batch_size
:return: merged_annotations_batch: list of list of annotation. each batch with size self._upload_batch_size
"""
for annotations in annotations_batch:
for annotation in annotations:
if annotation['type'] == 'binary' and not annotation.get('clean', False):
to_merge = [a for a in annotations if
not a.get('clean', False) and a.get("metadata", {}).get('system', {}).get('objectId',
None) ==
annotation.get("metadata", {}).get('system', {}).get('objectId', None) and a['label'] ==
annotation['label']]
if len(to_merge) == 0:
# no annotation to merge with
continue
for a in to_merge:
if a['coordinates'] == annotation['coordinates']:
continue
merged_data_url = self._merge_binary_annotations(a['coordinates'], annotation['coordinates'],
self.item.width, self.item.height)
annotation['coordinates'] = merged_data_url
a['clean'] = True
return [[a for a in annotations if not a.get('clean', False)] for annotations in annotations_batch]
def _merge_to_exits_annotations(self, annotations_batch):
filters = entities.Filters(resource=entities.FiltersResource.ANNOTATION, field='type', values='binary')
filters.add(field='itemId', values=self.item.id, method=entities.FiltersMethod.AND)
exist_annotations = self.list(filters=filters).annotations or list()
to_delete = list()
for annotations in annotations_batch:
for ann in annotations:
if ann['type'] == 'binary':
to_merge = [a for a in exist_annotations if
a.object_id == ann.get("metadata", {}).get('system', {}).get('objectId',
None) and a.label == ann[
'label']]
if len(to_merge) == 0:
# no annotation to merge with
continue
if to_merge[0].coordinates == ann['coordinates']:
# same annotation
continue
if len(to_merge) > 1:
raise exceptions.PlatformException('400', 'Multiple annotations with the same label')
# merge
exist_annotations.remove(to_merge[0])
merged_data_url = self._merge_binary_annotations(to_merge[0].coordinates, ann['coordinates'],
self.item.width, self.item.height)
json_ann = to_merge[0].to_json()
json_ann['coordinates'] = merged_data_url
suc, response = self._update_annotation_req(annotation_json=json_ann,
system_metadata=True,
annotation_id=to_merge[0].id)
if not suc:
raise exceptions.PlatformException(response)
if suc:
result = entities.Annotation.from_json(_json=response.json(),
annotations=self,
dataset=self._dataset,
item=self._item)
exist_annotations.append(result)
to_delete.append(ann)
if len(to_delete) > 0:
annotations_batch = [[a for a in annotations if a not in to_delete] for annotations in annotations_batch]
return annotations_batch
def _upload_single_batch(self, annotation_batch):
try:
suc, response = self._client_api.gen_request(req_type='post',
path='/items/{}/annotations'.format(self.item.id),
json_req=annotation_batch)
if suc:
return_annotations = response.json()
if not isinstance(return_annotations, list):
return_annotations = [return_annotations]
else:
raise exceptions.PlatformException(response)
status = True
result = return_annotations
except Exception:
status = False
result = traceback.format_exc()
return status, result
def _upload_annotations_batches(self, annotation_batches):
if len(annotation_batches) == 1:
# no need for threads
status, result = self._upload_single_batch(annotation_batch=annotation_batches[0])
if status is False:
logger.error(result)
logger.error('Annotation/s uploaded with errors')
# TODO need to raise errors?
uploaded_annotations = result
else:
# threading
pool = self._client_api.thread_pools(pool_name='annotation.upload')
jobs = [None for _ in range(len(annotation_batches))]
for i_ann, annotations_batch in enumerate(annotation_batches):
jobs[i_ann] = pool.submit(self._upload_single_batch,
annotation_batch=annotations_batch)
# get all results
results = [j.result() for j in jobs]
uploaded_annotations = [ann for ann_list in results for ann in ann_list[1] if ann_list[0] is True]
out_errors = [r[1] for r in results if r[0] is False]
if len(out_errors) != 0:
logger.error(out_errors)
logger.error('Annotation/s uploaded with errors')
# TODO need to raise errors?
logger.info('Annotation/s uploaded successfully. num: {}'.format(len(uploaded_annotations)))
return uploaded_annotations
async def _async_upload_annotations(self, annotations, merge=False):
"""
Async function to run from the uploader. will use asyncio to not break the async
:param annotations: list of all annotations
:param merge: bool - merge the new binary annotations with the existing annotations
:return:
"""
async with self._client_api.event_loop.semaphore('annotations.upload'):
annotation_batch = self._create_batches_for_upload(annotations=annotations, merge=merge)
output_annotations = list()
for annotations_list in annotation_batch:
success, response = await self._client_api.gen_async_request(req_type='post',
path='/items/{}/annotations'
.format(self.item.id),
json_req=annotations_list)
if success:
return_annotations = response.json()
if not isinstance(return_annotations, list):
return_annotations = [return_annotations]
output_annotations.extend(return_annotations)
else:
if len(output_annotations) > 0:
logger.warning("Only {} annotations from {} annotations have been uploaded".
format(len(output_annotations), len(annotations)))
raise exceptions.PlatformException(response)
result = entities.AnnotationCollection.from_json(_json=output_annotations, item=self.item)
return result
[docs] @_api_reference.add(path='/items/{itemId}/annotations', method='post')
def upload(self, annotations, merge=False) -> entities.AnnotationCollection:
"""
Upload a new annotation/annotations. You must first create the annotation using the annotation *builder* method.
**Prerequisites**: Any user can upload annotations.
:param List[dtlpy.entities.annotation.Annotation] or dtlpy.entities.annotation.Annotation annotations: list or
single annotation of type Annotation
:param bool merge: optional - merge the new binary annotations with the existing annotations
:return: list of annotation objects
:rtype: entities.AnnotationCollection
**Example**:
.. code-block:: python
annotations = item.annotations.upload(annotations='builder')
"""
# make list if not list
if isinstance(annotations, entities.AnnotationCollection):
# get the annotation from a collection
annotations = annotations.annotations
elif isinstance(annotations, str) and os.path.isfile(annotations):
# load annotation filepath and get list of annotations
with open(annotations, 'r', encoding="utf8") as f:
annotations = json.load(f)
annotations = annotations.get('annotations', [])
# annotations = entities.AnnotationCollection.from_json_file(filepath=annotations).annotations
elif isinstance(annotations, entities.Annotation) or isinstance(annotations, dict):
# convert the single Annotation to a list
annotations = [annotations]
elif isinstance(annotations, list):
pass
else:
exceptions.PlatformException(error='400',
message='Unknown annotation format. type: {}'.format(type(annotations)))
if len(annotations) == 0:
logger.warning('Annotation upload receives 0 annotations. Not doing anything')
out_annotations = list()
else:
annotation_batches = self._create_batches_for_upload(annotations=annotations, merge=merge)
out_annotations = self._upload_annotations_batches(annotation_batches=annotation_batches)
out_annotations = entities.AnnotationCollection.from_json(_json=out_annotations,
item=self.item)
return out_annotations
[docs] def update_status(self,
annotation: entities.Annotation = None,
annotation_id: str = None,
status: entities.AnnotationStatus = entities.AnnotationStatus.ISSUE
) -> entities.Annotation:
"""
Set status on annotation.
**Prerequisites**: You must have an item that has been annotated. You must have the role of an *owner* or
*developer* or be assigned a task that includes that item as an *annotation manager*.
:param dtlpy.entities.annotation.Annotation annotation: Annotation object
:param str annotation_id: optional - annotation id to set status
:param str status: can be AnnotationStatus.ISSUE, APPROVED, REVIEW, CLEAR
:return: Annotation object
:rtype: dtlpy.entities.annotation.Annotation
**Example**:
.. code-block:: python
annotation = item.annotations.update_status(annotation_id='annotation_id', status=dl.AnnotationStatus.ISSUE)
"""
if annotation is None:
if annotation_id is None:
raise ValueError('must input on of "annotation" or "annotation_id"')
annotation = self.get(annotation_id=annotation_id)
if status not in list(entities.AnnotationStatus):
raise ValueError('status must be on of: {}'.format(', '.join(list(entities.AnnotationStatus))))
annotation.status = status
return annotation.update(system_metadata=True)
[docs] def builder(self):
"""
Create Annotation collection.
**Prerequisites**: You must have an item to be annotated. You must have the role of an *owner* or *developer*
or be assigned a task that includes that item as an *annotation manager* or *annotator*.
:return: Annotation collection object
:rtype: dtlpy.entities.annotation_collection.AnnotationCollection
**Example**:
.. code-block:: python
annotation_collection = item.annotations.builder()
"""
return entities.AnnotationCollection(item=self.item)
##################
# async function #
##################