from collections import namedtuple
import traceback
import logging
import attr
import os
from .. import repositories, entities, services, exceptions
from .annotation import ViewAnnotationOptions, AnnotationType
logger = logging.getLogger(name='dtlpy')
[docs]class ExpirationOptions:
"""
ExpirationOptions object
"""
def __init__(self, item_max_days: int = None):
"""
:param item_max_days: int. items in dataset will be auto delete after this number id days
"""
self.item_max_days = item_max_days
def to_json(self):
_json = dict()
if self.item_max_days is not None:
_json["itemMaxDays"] = self.item_max_days
return _json
@classmethod
def from_json(cls, _json: dict):
item_max_days = _json.get('itemMaxDays', None)
if item_max_days:
return cls(item_max_days=item_max_days)
return None
[docs]@attr.s
class Dataset(entities.BaseEntity):
"""
Dataset object
"""
# dataset information
id = attr.ib()
url = attr.ib()
name = attr.ib()
annotated = attr.ib(repr=False)
creator = attr.ib()
projects = attr.ib(repr=False)
items_count = attr.ib()
metadata = attr.ib(repr=False)
directoryTree = attr.ib(repr=False)
export = attr.ib(repr=False)
expiration_options = attr.ib()
# name change when to_json
created_at = attr.ib()
items_url = attr.ib(repr=False)
readable_type = attr.ib(repr=False)
access_level = attr.ib(repr=False)
driver = attr.ib(repr=False)
_readonly = attr.ib(repr=False)
# api
_client_api = attr.ib(type=services.ApiClient, repr=False)
_instance_map = attr.ib(default=None, repr=False)
# entities
_project = attr.ib(default=None, repr=False)
# repositories
_datasets = attr.ib(repr=False, default=None)
_repositories = attr.ib(repr=False)
# defaults
_ontology_ids = attr.ib(default=None, repr=False)
_labels = attr.ib(default=None, repr=False)
_directory_tree = attr.ib(default=None, repr=False)
@property
def itemsCount(self):
return self.items_count
@staticmethod
def _protected_from_json(project: entities.Project,
_json: dict,
client_api: services.ApiClient,
datasets=None,
is_fetched=True):
"""
Same as from_json but with try-except to catch if error
:param project: dataset's project
:param _json: _json response from host
:param client_api: ApiClient entity
:param datasets: Datasets repository
:param is_fetched: is Entity fetched from Platform
:return: Dataset object
"""
try:
dataset = Dataset.from_json(project=project,
_json=_json,
client_api=client_api,
datasets=datasets,
is_fetched=is_fetched)
status = True
except Exception:
dataset = traceback.format_exc()
status = False
return status, dataset
[docs] @classmethod
def from_json(cls,
project: entities.Project,
_json: dict,
client_api: services.ApiClient,
datasets=None,
is_fetched=True):
"""
Build a Dataset entity object from a json
:param project: dataset's project
:param dict _json: _json response from host
:param client_api: ApiClient entity
:param datasets: Datasets repository
:param bool is_fetched: is Entity fetched from Platform
:return: Dataset object
:rtype: dtlpy.entities.dataset.Dataset
"""
projects = _json.get('projects', None)
if project is not None and projects is not None:
if project.id not in projects:
logger.warning('Dataset has been fetched from a project that is not in it projects list')
project = None
expiration_options = _json.get('expirationOptions', None)
if expiration_options:
expiration_options = ExpirationOptions.from_json(expiration_options)
inst = cls(metadata=_json.get('metadata', None),
directoryTree=_json.get('directoryTree', None),
readable_type=_json.get('readableType', None),
access_level=_json.get('accessLevel', None),
created_at=_json.get('createdAt', None),
items_count=_json.get('itemsCount', None),
annotated=_json.get('annotated', None),
readonly=_json.get('readonly', None),
projects=projects,
creator=_json.get('creator', None),
items_url=_json.get('items', None),
export=_json.get('export', None),
driver=_json.get('driver', None),
name=_json.get('name', None),
url=_json.get('url', None),
id=_json.get('id', None),
datasets=datasets,
client_api=client_api,
project=project,
expiration_options=expiration_options)
inst.is_fetched = is_fetched
return inst
[docs] def to_json(self):
"""
Returns platform _json format of object
:return: platform json format of object
:rtype: dict
"""
_json = attr.asdict(self, filter=attr.filters.exclude(attr.fields(Dataset)._client_api,
attr.fields(Dataset)._project,
attr.fields(Dataset)._readonly,
attr.fields(Dataset)._datasets,
attr.fields(Dataset)._repositories,
attr.fields(Dataset)._ontology_ids,
attr.fields(Dataset)._labels,
attr.fields(Dataset)._directory_tree,
attr.fields(Dataset)._instance_map,
attr.fields(Dataset).access_level,
attr.fields(Dataset).readable_type,
attr.fields(Dataset).created_at,
attr.fields(Dataset).items_url,
attr.fields(Dataset).expiration_options,
attr.fields(Dataset).items_count,
))
_json.update({'items': self.items_url})
_json['readableType'] = self.readable_type
_json['createdAt'] = self.created_at
_json['accessLevel'] = self.access_level
_json['readonly'] = self._readonly
_json['itemsCount'] = self.items_count
if self.expiration_options and self.expiration_options.to_json():
_json['expirationOptions'] = self.expiration_options.to_json()
return _json
@property
def labels(self):
if self._labels is None:
self._labels = self.recipes.list()[0].ontologies.list()[0].labels
return self._labels
@property
def readonly(self):
return self._readonly
@property
def platform_url(self):
return self._client_api._get_resource_url("projects/{}/datasets/{}".format(self.project.id, self.id))
@readonly.setter
def readonly(self, state):
raise exceptions.PlatformException(
error='400',
message='Cannot set attribute readonly. Please use "set_readonly({})" method'.format(state))
@property
def labels_flat_dict(self):
flatten_dict = dict()
def add_to_dict(tag: str, father: entities.Label):
flatten_dict[tag] = father
for child in father.children:
add_to_dict('{}.{}'.format(tag, child.tag), child)
for label in self.labels:
add_to_dict(label.tag, label)
return flatten_dict
@property
def instance_map(self):
if self._instance_map is None:
labels = [label for label in self.labels_flat_dict]
labels.sort()
# each label gets index as instance id
self._instance_map = {label: (i_label + 1) for i_label, label in enumerate(labels)}
return self._instance_map
@instance_map.setter
def instance_map(self, value: dict):
"""
instance mapping for creating instance mask
:param value: dictionary {label: map_id}
"""
if not isinstance(value, dict):
raise ValueError('input must be a dictionary of {lable_name: instance_id}')
self._instance_map = value
@property
def ontology_ids(self):
if self._ontology_ids is None:
self._ontology_ids = list()
if self.metadata is not None and 'system' in self.metadata and 'recipes' in self.metadata['system']:
recipe_ids = self.get_recipe_ids()
for rec_id in recipe_ids:
recipe = self.recipes.get(recipe_id=rec_id)
self._ontology_ids += recipe.ontology_ids
return self._ontology_ids
@_repositories.default
def set_repositories(self):
reps = namedtuple('repositories',
field_names=['items', 'recipes', 'datasets', 'assignments', 'tasks', 'annotations',
'ontologies', 'features', 'settings'])
if self._project is None:
datasets = repositories.Datasets(client_api=self._client_api, project=self._project)
features = repositories.Features(client_api=self._client_api, project=self._project)
else:
datasets = self._project.datasets
features = self._project.features
return reps(
items=repositories.Items(client_api=self._client_api, dataset=self, datasets=datasets),
recipes=repositories.Recipes(client_api=self._client_api, dataset=self),
assignments=repositories.Assignments(project=self._project, client_api=self._client_api, dataset=self),
tasks=repositories.Tasks(client_api=self._client_api, project=self._project, dataset=self),
annotations=repositories.Annotations(client_api=self._client_api, dataset=self),
datasets=datasets,
ontologies=repositories.Ontologies(client_api=self._client_api, dataset=self),
features=features,
settings=repositories.Settings(client_api=self._client_api, dataset=self),
)
@property
def settings(self):
assert isinstance(self._repositories.settings, repositories.Settings)
return self._repositories.settings
@property
def items(self):
assert isinstance(self._repositories.items, repositories.Items)
return self._repositories.items
@property
def ontologies(self):
assert isinstance(self._repositories.ontologies, repositories.Ontologies)
return self._repositories.ontologies
@property
def recipes(self):
assert isinstance(self._repositories.recipes, repositories.Recipes)
return self._repositories.recipes
@property
def datasets(self):
assert isinstance(self._repositories.datasets, repositories.Datasets)
return self._repositories.datasets
@property
def assignments(self):
assert isinstance(self._repositories.assignments, repositories.Assignments)
return self._repositories.assignments
@property
def tasks(self):
assert isinstance(self._repositories.tasks, repositories.Tasks)
return self._repositories.tasks
@property
def annotations(self):
assert isinstance(self._repositories.annotations, repositories.Annotations)
return self._repositories.annotations
@property
def features(self):
assert isinstance(self._repositories.features, repositories.Features)
return self._repositories.features
@property
def project(self):
if self._project is None:
# get from cache
project = self._client_api.state_io.get('project')
if project is not None:
# build entity from json
p = entities.Project.from_json(_json=project, client_api=self._client_api)
# check if dataset belongs to project
if p.id in self.projects:
self._project = p
if self._project is None:
self._project = repositories.Projects(client_api=self._client_api).get(project_id=self.projects[0],
fetch=None)
assert isinstance(self._project, entities.Project)
return self._project
@project.setter
def project(self, project):
if not isinstance(project, entities.Project):
raise ValueError('Must input a valid Project entity')
self._project = project
@property
def directory_tree(self):
if self._directory_tree is None:
self._directory_tree = self.project.datasets.directory_tree(dataset_id=self.id)
assert isinstance(self._directory_tree, entities.DirectoryTree)
return self._directory_tree
def __copy__(self):
return Dataset.from_json(_json=self.to_json(),
project=self._project,
client_api=self._client_api,
is_fetched=self.is_fetched,
datasets=self.datasets)
def __get_local_path__(self):
if self._project is not None:
local_path = os.path.join(services.service_defaults.DATALOOP_PATH,
'projects',
self.project.name,
'datasets',
self.name)
else:
local_path = os.path.join(services.service_defaults.DATALOOP_PATH,
'datasets',
'%s_%s' % (self.name, self.id))
return local_path
[docs] @staticmethod
def serialize_labels(labels_dict):
"""
Convert hex color format to rgb
:param labels_dict: dict of labels
:return: dict of converted labels
"""
dataset_labels_dict = dict()
for label, color in labels_dict.items():
dataset_labels_dict[label] = '#%02x%02x%02x' % color
return dataset_labels_dict
[docs] def get_recipe_ids(self):
"""
Get dataset recipe Ids
:return: list of recipe ids
"""
return self.metadata['system']['recipes']
[docs] def switch_recipe(self, recipe_id=None, recipe=None):
"""
Switch the recipe that linked to the dataset with the given one
:param recipe_id: recipe id
:param recipe: recipe entity
:return:
"""
if recipe is None and recipe_id is None:
raise exceptions.PlatformException('400', 'Must provide recipe or recipe_id')
if recipe_id is None:
if not isinstance(recipe, entities.Recipe):
raise exceptions.PlatformException('400', 'Recipe must me entities.Recipe type')
else:
recipe_id = recipe.id
# add recipe id to dataset metadata
if 'system' not in self.metadata:
self.metadata['system'] = dict()
if 'recipes' not in self.metadata['system']:
self.metadata['system']['recipes'] = list()
self.metadata['system']['recipes'] = [recipe_id]
self.update(system_metadata=True)
[docs] def delete(self, sure=False, really=False):
"""
Delete a dataset forever!
:param bool sure: are you sure you want to delete?
:param bool really: really really?
:return: True is success
:rtype: bool
"""
return self.datasets.delete(dataset_id=self.id,
sure=sure,
really=really)
[docs] def update(self, system_metadata=False):
"""
Update dataset field
:param bool system_metadata: bool - True, if you want to change metadata system
:return: Dataset object
:rtype: dtlpy.entities.dataset.Dataset
"""
return self.datasets.update(dataset=self,
system_metadata=system_metadata)
[docs] def set_readonly(self, state: bool):
"""
Set dataset readonly mode
:param bool state: state
"""
if not isinstance(state, bool):
raise exceptions.PlatformException(
error='400',
message='Argument "state" must be bool. input type: {}'.format(type(state)))
return self.datasets.set_readonly(dataset=self, state=state)
[docs] def clone(self, clone_name, filters=None, with_items_annotations=True, with_metadata=True,
with_task_annotations_status=True):
"""
Clone dataset
:param clone_name: new dataset name
:param dtlpy.entities.filters.Filters filters: Filters entity or a query dict
:param with_items_annotations: clone all item's annotations
:param with_metadata: clone metadata
:param with_task_annotations_status: clone task annotations status
:return: dataset object
:rtype: dtlpy.entities.dataset.Dataset
"""
return self.datasets.clone(dataset_id=self.id,
filters=filters,
clone_name=clone_name,
with_metadata=with_metadata,
with_items_annotations=with_items_annotations,
with_task_annotations_status=with_task_annotations_status)
[docs] def sync(self, wait=True):
"""
Sync dataset with external storage
:param wait: wait the command to finish
:return: True if success
:rtype: bool
"""
return self.datasets.sync(dataset_id=self.id, wait=wait)
[docs] def download_annotations(self,
local_path=None,
filters=None,
annotation_options: ViewAnnotationOptions = None,
annotation_filters=None,
overwrite=False,
thickness=1,
with_text=False,
remote_path=None,
include_annotations_in_output=True,
export_png_files=False,
filter_output_annotations=False,
alpha=None
):
"""
Download dataset by filters.
Filtering the dataset for items and save them local
Optional - also download annotation, mask, instance and image mask of the item
:param local_path: local folder or filename to save to.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param annotation_options: download annotations options: list(dl.ViewAnnotationOptions)
:param annotation_filters: Filters entity to filter annotations for download
:param overwrite: optional - default = False
:param thickness: optional - line thickness, if -1 annotation will be filled, default =1
:param with_text: optional - add text to annotations, default = False
:param remote_path: DEPRECATED and ignored. use filters
:param include_annotations_in_output: default - False , if export should contain annotations
:param export_png_files: default - if True, semantic annotations should be exported as png files
:param filter_output_annotations: default - False, given an export by filter - determine if to filter out annotations
:param alpha: opacity value [0 1], default 1
:return: `List` of local_path per each downloaded item
"""
return self.datasets.download_annotations(
dataset=self,
local_path=local_path,
overwrite=overwrite,
filters=filters,
annotation_options=annotation_options,
annotation_filters=annotation_filters,
thickness=thickness,
with_text=with_text,
remote_path=remote_path,
include_annotations_in_output=include_annotations_in_output,
export_png_files=export_png_files,
filter_output_annotations=filter_output_annotations,
alpha=alpha
)
[docs] def upload_annotations(self,
local_path,
filters=None,
clean=False,
remote_root_path='/'
):
"""
Upload annotations to dataset.
:param local_path: str - local folder where the annotations files is.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param clean: bool - if True it remove the old annotations
:param remote_root_path: str - the remote root path to match remote and local items
For example, if the item filepath is a/b/item and remote_root_path is /a the start folder will be b instead of a
"""
return self.datasets.upload_annotations(
dataset=self,
local_path=local_path,
filters=filters,
clean=clean,
remote_root_path=remote_root_path
)
[docs] def checkout(self):
"""
Checkout the dataset
"""
self.datasets.checkout(dataset=self)
[docs] def open_in_web(self):
"""
Open the dataset in web platform
"""
self._client_api._open_in_web(url=self.platform_url)
[docs] def add_label(self, label_name, color=None, children=None, attributes=None, display_label=None, label=None,
recipe_id=None, ontology_id=None, icon_path=None):
"""
Add single label to dataset
:param label_name: str - label name
:param color: color
:param children: children (sub labels)
:param attributes: attributes
:param display_label: display_label
:param label: label
:param recipe_id: optional recipe id
:param ontology_id: optional ontology id
:param icon_path: path to image to be display on label
:return: label entity
"""
# get recipe
if recipe_id is None:
recipe_id = self.get_recipe_ids()[0]
recipe = self.recipes.get(recipe_id=recipe_id)
# get ontology
if ontology_id is None:
ontology_id = recipe.ontology_ids[0]
ontology = recipe.ontologies.get(ontology_id=ontology_id)
# ontology._dataset = self
# add label
added_label = ontology.add_label(label_name=label_name,
color=color,
children=children,
attributes=attributes,
display_label=display_label,
label=label,
update_ontology=True,
icon_path=icon_path)
return added_label
[docs] def add_labels(self, label_list, ontology_id=None, recipe_id=None):
"""
Add labels to dataset
:param label_list: label list
:param ontology_id: optional ontology id
:param recipe_id: optional recipe id
:return: label entities
"""
# get recipe
if recipe_id is None:
recipe_id = self.get_recipe_ids()[0]
recipe = self.recipes.get(recipe_id=recipe_id)
# get ontology
if ontology_id is None:
ontology_id = recipe.ontology_ids[0]
ontology = recipe.ontologies.get(ontology_id=ontology_id)
# add labels to ontology
added_labels = ontology.add_labels(label_list=label_list, update_ontology=True)
return added_labels
[docs] def update_label(self, label_name, color=None, children=None, attributes=None, display_label=None, label=None,
recipe_id=None, ontology_id=None, upsert=False, icon_path=None):
"""
Add single label to dataset
:param label_name: label name
:param color: color
:param children: children (sub labels)
:param attributes: attributes
:param display_label: display label
:param label: label
:param recipe_id: optional recipe id
:param ontology_id: optional ontology id
:param upsert: if True will add in case it does not existing
:param icon_path: path to image to be display on label
:return: label entity
"""
# get recipe
if recipe_id is None:
recipe_id = self.get_recipe_ids()[0]
recipe = self.recipes.get(recipe_id=recipe_id)
# get ontology
if ontology_id is None:
ontology_id = recipe.ontology_ids[0]
ontology = recipe.ontologies.get(ontology_id=ontology_id)
# add label
added_label = ontology.update_label(label_name=label_name,
color=color,
children=children,
attributes=attributes,
display_label=display_label,
label=label,
update_ontology=True,
upsert=upsert,
icon_path=icon_path)
return added_label
[docs] def update_labels(self, label_list, ontology_id=None, recipe_id=None, upsert=False):
"""
Add labels to dataset
:param label_list: label list
:param ontology_id: optional ontology id
:param recipe_id: optional recipe id
:param upsert: if True will add in case it does not existing
:return: label entities
"""
# get recipe
if recipe_id is None:
recipe_id = self.get_recipe_ids()[0]
recipe = self.recipes.get(recipe_id=recipe_id)
# get ontology
if ontology_id is None:
ontology_id = recipe.ontology_ids[0]
ontology = recipe.ontologies.get(ontology_id=ontology_id)
# add labels to ontology
added_labels = ontology.update_labels(label_list=label_list, update_ontology=True, upsert=upsert)
return added_labels
[docs] def download(
self,
filters=None,
local_path=None,
file_types=None,
annotation_options: ViewAnnotationOptions = None,
annotation_filters=None,
overwrite=False,
to_items_folder=True,
thickness=1,
with_text=False,
without_relative_path=None,
alpha=None
):
"""
Download dataset by filters.
Filtering the dataset for items and save them local
Optional - also download annotation, mask, instance and image mask of the item
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param local_path: local folder or filename to save to.
:param file_types: a list of file type to download. e.g ['video/webm', 'video/mp4', 'image/jpeg', 'image/png']
:param annotation_options: download annotations options: list(dl.ViewAnnotationOptions) not relevant for JSON option
:param annotation_filters: Filters entity to filter annotations for download not relevant for JSON option
:param overwrite: optional - default = False
:param to_items_folder: Create 'items' folder and download items to it
:param thickness: optional - line thickness, if -1 annotation will be filled, default =1
:param with_text: optional - add text to annotations, default = False
:param without_relative_path: string - remote path - download items without the relative path from platform
:param alpha: opacity value [0 1], default 1
:return: `List` of local_path per each downloaded item
"""
return self.items.download(filters=filters,
local_path=local_path,
file_types=file_types,
annotation_options=annotation_options,
annotation_filters=annotation_filters,
overwrite=overwrite,
to_items_folder=to_items_folder,
thickness=thickness,
with_text=with_text,
without_relative_path=without_relative_path,
alpha=alpha)
[docs] def delete_labels(self, label_names):
"""
Delete labels from dataset's ontologies
:param label_names: label object/ label name / list of label objects / list of label names
:return:
"""
for recipe in self.recipes.list():
for ontology in recipe.ontologies.list():
ontology.delete_labels(label_names=label_names)
self._labels = None
[docs] def download_partition(self, partition, local_path=None, filters=None, annotation_options=None):
"""
Download a specific partition of the dataset to local_path
This function is commonly used with dl.ModelAdapter which implements thc convert to specific model structure
:param partition: `dl.SnapshotPartitionType` name of the partition
:param local_path: local path directory to download the data
:param dtlpy.entities.filters.Filters filters: dl.entities.Filters to add the specific partitions constraint to
:return List `str` of the new downloaded path of each item
"""
if local_path is None:
local_path = os.getcwd()
if filters is None:
filters = entities.Filters(resource=entities.FiltersResource.ITEM)
if annotation_options is None:
annotation_options = entities.ViewAnnotationOptions.JSON
if partition == 'all': # TODO: should it be all or None (all != list(SnapshotPartitions) )
logger.info("downloading all items - even without partitions")
else:
filters.add(field='metadata.system.snapshotPartition', values=partition)
return self.items.download(filters=filters,
local_path=local_path,
annotation_options=annotation_options)
[docs] def set_partition(self, partition, filters=None):
"""
Updates all items returned by filters in the dataset to specific partition
:param partition: `dl.entities.SnapshotPartitionType` to set to
:param dtlpy.entities.filters.Filters filters: dl.entities.Filters to add the specific partitions constraint to
:return: dl.PagedEntities
"""
if filters is None:
filters = entities.Filters(resource=entities.FiltersResource.ITEM)
# TODO: How to preform update using the Filter - where do i set the field - docstring should state dict key-val while arg name is only values....
return self.items.update(filters=filters,
system_update_values={'snapshotPartition': partition},
system_metadata=True)
[docs] def get_partitions(self, partitions, filters=None, batch_size: int = None):
"""
Returns PagedEntity of items from one or more partitions
:param partitions: `dl.entities.SnapshotPartitionType` or a list. Name of the partitions
:param dtlpy.entities.filters.Filters filters: dl.Filters to add the specific partitions constraint to
:param batch_size: `int` how many items per page
:return: `dl.PagedEntities` of `dl.Item` preforms items.list()
"""
# Question: do we have to give a partition? how do we get in case no partition is defined?
if isinstance(partitions, str):
partitions = [partitions]
if filters is None:
filters = entities.Filters(resource=entities.FiltersResource.ITEM)
if partitions == 'all':
logger.info("downloading all items - even without partitions")
else:
filters.add(field='metadata.system.snapshotPartition',
values=partitions,
operator=entities.FiltersOperations.IN)
return self.items.list(filters=filters, page_size=batch_size)