Source code for dtlpy.entities.dataset

from collections import namedtuple
import traceback
import logging
import attr
import os

from .. import repositories, entities, services, exceptions
from .annotation import ViewAnnotationOptions, AnnotationType

logger = logging.getLogger(name='dtlpy')


[docs]class ExpirationOptions:
    """
    ExpirationOptions object
    """

    def __init__(self, item_max_days: int = None):
        """
        :param item_max_days: int. items in the dataset will be automatically deleted after this number of days
        """
        self.item_max_days = item_max_days

    def to_json(self):
        _json = dict()
        if self.item_max_days is not None:
            _json["itemMaxDays"] = self.item_max_days
        return _json

    @classmethod
    def from_json(cls, _json: dict):
        item_max_days = _json.get('itemMaxDays', None)
        if item_max_days:
            return cls(item_max_days=item_max_days)
        return None
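A minimal round-trip sketch of ExpirationOptions (the value is illustrative):

# hypothetical usage: auto-delete items after 30 days
opts = ExpirationOptions(item_max_days=30)
assert opts.to_json() == {'itemMaxDays': 30}
# from_json returns None when 'itemMaxDays' is missing or falsy
assert ExpirationOptions.from_json({}) is None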
[docs]@attr.s
class Dataset(entities.BaseEntity):
    """
    Dataset object
    """
    # dataset information
    id = attr.ib()
    url = attr.ib()
    name = attr.ib()
    annotated = attr.ib(repr=False)
    creator = attr.ib()
    projects = attr.ib(repr=False)
    items_count = attr.ib()
    metadata = attr.ib(repr=False)
    directoryTree = attr.ib(repr=False)
    export = attr.ib(repr=False)
    expiration_options = attr.ib()

    # names change when to_json is called
    created_at = attr.ib()
    items_url = attr.ib(repr=False)
    readable_type = attr.ib(repr=False)
    access_level = attr.ib(repr=False)
    driver = attr.ib(repr=False)
    _readonly = attr.ib(repr=False)

    # api
    _client_api = attr.ib(type=services.ApiClient, repr=False)
    _instance_map = attr.ib(default=None, repr=False)

    # entities
    _project = attr.ib(default=None, repr=False)

    # repositories
    _datasets = attr.ib(repr=False, default=None)
    _repositories = attr.ib(repr=False)

    # defaults
    _ontology_ids = attr.ib(default=None, repr=False)
    _labels = attr.ib(default=None, repr=False)
    _directory_tree = attr.ib(default=None, repr=False)

    @property
    def itemsCount(self):
        return self.items_count

    @staticmethod
    def _protected_from_json(project: entities.Project,
                             _json: dict,
                             client_api: services.ApiClient,
                             datasets=None,
                             is_fetched=True):
        """
        Same as from_json but with try-except to catch errors

        :param project: dataset's project
        :param _json: _json response from host
        :param client_api: ApiClient entity
        :param datasets: Datasets repository
        :param is_fetched: is Entity fetched from Platform
        :return: Dataset object
        """
        try:
            dataset = Dataset.from_json(project=project,
                                        _json=_json,
                                        client_api=client_api,
                                        datasets=datasets,
                                        is_fetched=is_fetched)
            status = True
        except Exception:
            dataset = traceback.format_exc()
            status = False
        return status, dataset
[docs]    @classmethod
    def from_json(cls,
                  project: entities.Project,
                  _json: dict,
                  client_api: services.ApiClient,
                  datasets=None,
                  is_fetched=True):
        """
        Build a Dataset entity object from a json

        :param project: dataset's project
        :param dict _json: _json response from host
        :param client_api: ApiClient entity
        :param datasets: Datasets repository
        :param bool is_fetched: is Entity fetched from Platform
        :return: Dataset object
        :rtype: dtlpy.entities.dataset.Dataset
        """
        projects = _json.get('projects', None)
        if project is not None and projects is not None:
            if project.id not in projects:
                logger.warning('Dataset has been fetched from a project that is not in its projects list')
                project = None

        expiration_options = _json.get('expirationOptions', None)
        if expiration_options:
            expiration_options = ExpirationOptions.from_json(expiration_options)

        inst = cls(metadata=_json.get('metadata', None),
                   directoryTree=_json.get('directoryTree', None),
                   readable_type=_json.get('readableType', None),
                   access_level=_json.get('accessLevel', None),
                   created_at=_json.get('createdAt', None),
                   items_count=_json.get('itemsCount', None),
                   annotated=_json.get('annotated', None),
                   readonly=_json.get('readonly', None),
                   projects=projects,
                   creator=_json.get('creator', None),
                   items_url=_json.get('items', None),
                   export=_json.get('export', None),
                   driver=_json.get('driver', None),
                   name=_json.get('name', None),
                   url=_json.get('url', None),
                   id=_json.get('id', None),
                   datasets=datasets,
                   client_api=client_api,
                   project=project,
                   expiration_options=expiration_options)
        inst.is_fetched = is_fetched
        return inst
[docs]    def to_json(self):
        """
        Returns platform _json format of object

        :return: platform json format of object
        :rtype: dict
        """
        _json = attr.asdict(self,
                            filter=attr.filters.exclude(attr.fields(Dataset)._client_api,
                                                        attr.fields(Dataset)._project,
                                                        attr.fields(Dataset)._readonly,
                                                        attr.fields(Dataset)._datasets,
                                                        attr.fields(Dataset)._repositories,
                                                        attr.fields(Dataset)._ontology_ids,
                                                        attr.fields(Dataset)._labels,
                                                        attr.fields(Dataset)._directory_tree,
                                                        attr.fields(Dataset)._instance_map,
                                                        attr.fields(Dataset).access_level,
                                                        attr.fields(Dataset).readable_type,
                                                        attr.fields(Dataset).created_at,
                                                        attr.fields(Dataset).items_url,
                                                        attr.fields(Dataset).expiration_options,
                                                        attr.fields(Dataset).items_count,
                                                        ))
        _json.update({'items': self.items_url})
        _json['readableType'] = self.readable_type
        _json['createdAt'] = self.created_at
        _json['accessLevel'] = self.access_level
        _json['readonly'] = self._readonly
        _json['itemsCount'] = self.items_count
        if self.expiration_options and self.expiration_options.to_json():
            _json['expirationOptions'] = self.expiration_options.to_json()
        return _json
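A minimal sketch of the json round-trip, assuming an already-fetched `dataset` entity; note that the snake_case attributes are re-exported under the platform's camelCase keys:

_json = dataset.to_json()                      # 'dataset' is assumed to exist
assert 'readableType' in _json and 'createdAt' in _json
copy = Dataset.from_json(_json=_json,
                         project=dataset.project,
                         client_api=dataset._client_api)
assert copy.id == dataset.id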
    @property
    def labels(self):
        if self._labels is None:
            self._labels = self.recipes.list()[0].ontologies.list()[0].labels
        return self._labels

    @property
    def readonly(self):
        return self._readonly

    @property
    def platform_url(self):
        return self._client_api._get_resource_url("projects/{}/datasets/{}".format(self.project.id, self.id))

    @readonly.setter
    def readonly(self, state):
        raise exceptions.PlatformException(
            error='400',
            message='Cannot set attribute readonly. Please use "set_readonly({})" method'.format(state))

    @property
    def labels_flat_dict(self):
        flatten_dict = dict()

        def add_to_dict(tag: str, father: entities.Label):
            flatten_dict[tag] = father
            for child in father.children:
                add_to_dict('{}.{}'.format(tag, child.tag), child)

        for label in self.labels:
            add_to_dict(label.tag, label)
        return flatten_dict

    @property
    def instance_map(self):
        if self._instance_map is None:
            labels = [label for label in self.labels_flat_dict]
            labels.sort()
            # each label gets its index as instance id
            self._instance_map = {label: (i_label + 1) for i_label, label in enumerate(labels)}
        return self._instance_map

    @instance_map.setter
    def instance_map(self, value: dict):
        """
        instance mapping for creating instance mask

        :param value: dictionary {label: map_id}
        """
        if not isinstance(value, dict):
            raise ValueError('input must be a dictionary of {label_name: instance_id}')
        self._instance_map = value

    @property
    def ontology_ids(self):
        if self._ontology_ids is None:
            self._ontology_ids = list()
            if self.metadata is not None and 'system' in self.metadata and 'recipes' in self.metadata['system']:
                recipe_ids = self.get_recipe_ids()
                for rec_id in recipe_ids:
                    recipe = self.recipes.get(recipe_id=rec_id)
                    self._ontology_ids += recipe.ontology_ids
        return self._ontology_ids

    @_repositories.default
    def set_repositories(self):
        reps = namedtuple('repositories',
                          field_names=['items', 'recipes', 'datasets', 'assignments', 'tasks',
                                       'annotations', 'ontologies', 'features', 'settings'])
        if self._project is None:
            datasets = repositories.Datasets(client_api=self._client_api, project=self._project)
            features = repositories.Features(client_api=self._client_api, project=self._project)
        else:
            datasets = self._project.datasets
            features = self._project.features
        return reps(
            items=repositories.Items(client_api=self._client_api, dataset=self, datasets=datasets),
            recipes=repositories.Recipes(client_api=self._client_api, dataset=self),
            assignments=repositories.Assignments(project=self._project, client_api=self._client_api, dataset=self),
            tasks=repositories.Tasks(client_api=self._client_api, project=self._project, dataset=self),
            annotations=repositories.Annotations(client_api=self._client_api, dataset=self),
            datasets=datasets,
            ontologies=repositories.Ontologies(client_api=self._client_api, dataset=self),
            features=features,
            settings=repositories.Settings(client_api=self._client_api, dataset=self),
        )

    @property
    def settings(self):
        assert isinstance(self._repositories.settings, repositories.Settings)
        return self._repositories.settings

    @property
    def items(self):
        assert isinstance(self._repositories.items, repositories.Items)
        return self._repositories.items

    @property
    def ontologies(self):
        assert isinstance(self._repositories.ontologies, repositories.Ontologies)
        return self._repositories.ontologies

    @property
    def recipes(self):
        assert isinstance(self._repositories.recipes, repositories.Recipes)
        return self._repositories.recipes

    @property
    def datasets(self):
        assert isinstance(self._repositories.datasets, repositories.Datasets)
        return self._repositories.datasets

    @property
    def assignments(self):
        assert isinstance(self._repositories.assignments, repositories.Assignments)
        return self._repositories.assignments

    @property
    def tasks(self):
        assert isinstance(self._repositories.tasks, repositories.Tasks)
        return self._repositories.tasks

    @property
    def annotations(self):
        assert isinstance(self._repositories.annotations, repositories.Annotations)
        return self._repositories.annotations

    @property
    def features(self):
        assert isinstance(self._repositories.features, repositories.Features)
        return self._repositories.features

    @property
    def project(self):
        if self._project is None:
            # get from cache
            project = self._client_api.state_io.get('project')
            if project is not None:
                # build entity from json
                p = entities.Project.from_json(_json=project, client_api=self._client_api)
                # check if dataset belongs to project
                if p.id in self.projects:
                    self._project = p
        if self._project is None:
            self._project = repositories.Projects(client_api=self._client_api).get(project_id=self.projects[0],
                                                                                   fetch=None)
        assert isinstance(self._project, entities.Project)
        return self._project

    @project.setter
    def project(self, project):
        if not isinstance(project, entities.Project):
            raise ValueError('Must input a valid Project entity')
        self._project = project

    @property
    def directory_tree(self):
        if self._directory_tree is None:
            self._directory_tree = self.project.datasets.directory_tree(dataset_id=self.id)
        assert isinstance(self._directory_tree, entities.DirectoryTree)
        return self._directory_tree

    def __copy__(self):
        return Dataset.from_json(_json=self.to_json(),
                                 project=self._project,
                                 client_api=self._client_api,
                                 is_fetched=self.is_fetched,
                                 datasets=self.datasets)

    def __get_local_path__(self):
        if self._project is not None:
            local_path = os.path.join(services.service_defaults.DATALOOP_PATH,
                                      'projects',
                                      self.project.name,
                                      'datasets',
                                      self.name)
        else:
            local_path = os.path.join(services.service_defaults.DATALOOP_PATH,
                                      'datasets',
                                      '%s_%s' % (self.name, self.id))
        return local_path
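For example, if the flattened labels of a hypothetical dataset are 'cat', 'dog' and the child 'dog.puppy', sorting yields instance ids starting at 1:

# derived automatically: {'cat': 1, 'dog': 2, 'dog.puppy': 3}
print(dataset.instance_map)        # 'dataset' is assumed to exist
# or override with an explicit mapping:
dataset.instance_map = {'cat': 1, 'dog': 2, 'dog.puppy': 3}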
[docs]    @staticmethod
    def serialize_labels(labels_dict):
        """
        Convert a dictionary of RGB color tuples to hex color format

        :param labels_dict: dict of labels {label: (r, g, b)}
        :return: dict of converted labels {label: '#rrggbb'}
        """
        dataset_labels_dict = dict()
        for label, color in labels_dict.items():
            dataset_labels_dict[label] = '#%02x%02x%02x' % color
        return dataset_labels_dict
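For instance (colors are illustrative):

hex_labels = Dataset.serialize_labels({'cat': (255, 0, 0), 'dog': (0, 128, 255)})
# -> {'cat': '#ff0000', 'dog': '#0080ff'}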
[docs]    def get_recipe_ids(self):
        """
        Get dataset recipe IDs

        :return: list of recipe ids
        """
        return self.metadata['system']['recipes']
[docs]    def switch_recipe(self, recipe_id=None, recipe=None):
        """
        Switch the recipe linked to the dataset with the given one

        :param recipe_id: recipe id
        :param recipe: recipe entity
        :return:
        """
        if recipe is None and recipe_id is None:
            raise exceptions.PlatformException('400', 'Must provide recipe or recipe_id')
        if recipe_id is None:
            if not isinstance(recipe, entities.Recipe):
                raise exceptions.PlatformException('400', 'Recipe must be of entities.Recipe type')
            else:
                recipe_id = recipe.id
        # add recipe id to dataset metadata
        if 'system' not in self.metadata:
            self.metadata['system'] = dict()
        if 'recipes' not in self.metadata['system']:
            self.metadata['system']['recipes'] = list()
        self.metadata['system']['recipes'] = [recipe_id]
        self.update(system_metadata=True)
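Usage sketch (the id is hypothetical):

dataset.switch_recipe(recipe_id='my-recipe-id')   # hypothetical id
# or pass a Recipe entity directly:
dataset.switch_recipe(recipe=recipe)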
[docs]    def delete(self, sure=False, really=False):
        """
        Delete a dataset forever!

        :param bool sure: are you sure you want to delete?
        :param bool really: really really?
        :return: True if success
        :rtype: bool
        """
        return self.datasets.delete(dataset_id=self.id,
                                    sure=sure,
                                    really=really)
[docs]    def update(self, system_metadata=False):
        """
        Update dataset fields

        :param bool system_metadata: True, if you want to update the system metadata
        :return: Dataset object
        :rtype: dtlpy.entities.dataset.Dataset
        """
        return self.datasets.update(dataset=self, system_metadata=system_metadata)
[docs]    def set_readonly(self, state: bool):
        """
        Set dataset readonly mode

        :param bool state: state
        """
        if not isinstance(state, bool):
            raise exceptions.PlatformException(
                error='400',
                message='Argument "state" must be bool. input type: {}'.format(type(state)))
        return self.datasets.set_readonly(dataset=self, state=state)
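Since assigning `dataset.readonly` directly raises, use this method instead; a short sketch:

dataset.set_readonly(True)    # lock the dataset
dataset.set_readonly(False)   # unlock it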
[docs]    def clone(self,
              clone_name,
              filters=None,
              with_items_annotations=True,
              with_metadata=True,
              with_task_annotations_status=True):
        """
        Clone dataset

        :param clone_name: new dataset name
        :param dtlpy.entities.filters.Filters filters: Filters entity or a query dict
        :param with_items_annotations: clone all item's annotations
        :param with_metadata: clone metadata
        :param with_task_annotations_status: clone task annotations status
        :return: dataset object
        :rtype: dtlpy.entities.dataset.Dataset
        """
        return self.datasets.clone(dataset_id=self.id,
                                   filters=filters,
                                   clone_name=clone_name,
                                   with_metadata=with_metadata,
                                   with_items_annotations=with_items_annotations,
                                   with_task_annotations_status=with_task_annotations_status)
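A hedged usage sketch, assuming dtlpy is imported as `dl` and only one directory should be cloned (names are illustrative):

import dtlpy as dl

filters = dl.Filters(field='dir', values='/train')        # illustrative filter
clone = dataset.clone(clone_name='my-dataset-clone',      # hypothetical name
                      filters=filters,
                      with_items_annotations=True)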
[docs]    def sync(self, wait=True):
        """
        Sync dataset with external storage

        :param wait: wait for the command to finish
        :return: True if success
        :rtype: bool
        """
        return self.datasets.sync(dataset_id=self.id, wait=wait)
[docs]    def download_annotations(self,
                             local_path=None,
                             filters=None,
                             annotation_options: ViewAnnotationOptions = None,
                             annotation_filters=None,
                             overwrite=False,
                             thickness=1,
                             with_text=False,
                             remote_path=None,
                             include_annotations_in_output=True,
                             export_png_files=False,
                             filter_output_annotations=False,
                             alpha=None):
        """
        Download dataset annotations by filters.
        Filter the dataset for items and save them locally.
        Optional - also download the annotation, mask, instance and image mask of the item

        :param local_path: local folder or filename to save to.
        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param annotation_options: download annotations options: list(dl.ViewAnnotationOptions)
        :param annotation_filters: Filters entity to filter annotations for download
        :param overwrite: optional - default = False
        :param thickness: optional - line thickness, if -1 annotation will be filled, default = 1
        :param with_text: optional - add text to annotations, default = False
        :param remote_path: DEPRECATED and ignored. use filters
        :param include_annotations_in_output: default - True, if the export should contain annotations
        :param export_png_files: default - False; if True, semantic annotations will be exported as png files
        :param filter_output_annotations: default - False, given an export by filter - determines whether to filter out annotations
        :param alpha: opacity value [0 1], default 1
        :return: `List` of local_path per each downloaded item
        """
        return self.datasets.download_annotations(
            dataset=self,
            local_path=local_path,
            overwrite=overwrite,
            filters=filters,
            annotation_options=annotation_options,
            annotation_filters=annotation_filters,
            thickness=thickness,
            with_text=with_text,
            remote_path=remote_path,
            include_annotations_in_output=include_annotations_in_output,
            export_png_files=export_png_files,
            filter_output_annotations=filter_output_annotations,
            alpha=alpha
        )
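A sketch downloading JSON annotations plus rendered masks, assuming dtlpy is imported as `dl` (the path is hypothetical):

import dtlpy as dl

dataset.download_annotations(local_path='/tmp/annotations',   # hypothetical path
                             annotation_options=[dl.ViewAnnotationOptions.JSON,
                                                 dl.ViewAnnotationOptions.MASK],
                             overwrite=True)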
[docs]    def upload_annotations(self,
                           local_path,
                           filters=None,
                           clean=False,
                           remote_root_path='/'):
        """
        Upload annotations to dataset.

        :param local_path: str - local folder where the annotation files are.
        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param clean: bool - if True, remove the old annotations
        :param remote_root_path: str - the remote root path to match remote and local items

        For example, if the item filepath is a/b/item and remote_root_path is /a, the start folder will be b instead of a
        """
        return self.datasets.upload_annotations(
            dataset=self,
            local_path=local_path,
            filters=filters,
            clean=clean,
            remote_root_path=remote_root_path
        )
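A sketch, following the docstring's example: with remote_root_path='/a', a local file under a/b/ is matched against the remote item under /b/ (paths are illustrative):

dataset.upload_annotations(local_path='/tmp/annotations',   # hypothetical path
                           remote_root_path='/a',
                           clean=False)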
[docs]    def checkout(self):
        """
        Checkout the dataset
        """
        self.datasets.checkout(dataset=self)
[docs]    def open_in_web(self):
        """
        Open the dataset in the web platform
        """
        self._client_api._open_in_web(url=self.platform_url)
[docs]    def add_label(self,
                  label_name,
                  color=None,
                  children=None,
                  attributes=None,
                  display_label=None,
                  label=None,
                  recipe_id=None,
                  ontology_id=None,
                  icon_path=None):
        """
        Add a single label to the dataset

        :param label_name: str - label name
        :param color: color
        :param children: children (sub labels)
        :param attributes: attributes
        :param display_label: display_label
        :param label: label
        :param recipe_id: optional recipe id
        :param ontology_id: optional ontology id
        :param icon_path: path to image to be displayed on the label
        :return: label entity
        """
        # get recipe
        if recipe_id is None:
            recipe_id = self.get_recipe_ids()[0]
        recipe = self.recipes.get(recipe_id=recipe_id)
        # get ontology
        if ontology_id is None:
            ontology_id = recipe.ontology_ids[0]
        ontology = recipe.ontologies.get(ontology_id=ontology_id)
        # ontology._dataset = self
        # add label
        added_label = ontology.add_label(label_name=label_name,
                                         color=color,
                                         children=children,
                                         attributes=attributes,
                                         display_label=display_label,
                                         label=label,
                                         update_ontology=True,
                                         icon_path=icon_path)
        return added_label
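A short sketch adding a label with an RGB color (values are illustrative):

label = dataset.add_label(label_name='dog', color=(34, 6, 231))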
[docs]    def add_labels(self, label_list, ontology_id=None, recipe_id=None):
        """
        Add labels to dataset

        :param label_list: label list
        :param ontology_id: optional ontology id
        :param recipe_id: optional recipe id
        :return: label entities
        """
        # get recipe
        if recipe_id is None:
            recipe_id = self.get_recipe_ids()[0]
        recipe = self.recipes.get(recipe_id=recipe_id)
        # get ontology
        if ontology_id is None:
            ontology_id = recipe.ontology_ids[0]
        ontology = recipe.ontologies.get(ontology_id=ontology_id)
        # add labels to ontology
        added_labels = ontology.add_labels(label_list=label_list, update_ontology=True)
        return added_labels
[docs]    def update_label(self,
                     label_name,
                     color=None,
                     children=None,
                     attributes=None,
                     display_label=None,
                     label=None,
                     recipe_id=None,
                     ontology_id=None,
                     upsert=False,
                     icon_path=None):
        """
        Update a single label in the dataset

        :param label_name: label name
        :param color: color
        :param children: children (sub labels)
        :param attributes: attributes
        :param display_label: display label
        :param label: label
        :param recipe_id: optional recipe id
        :param ontology_id: optional ontology id
        :param upsert: if True, the label will be added if it does not exist
        :param icon_path: path to image to be displayed on the label
        :return: label entity
        """
        # get recipe
        if recipe_id is None:
            recipe_id = self.get_recipe_ids()[0]
        recipe = self.recipes.get(recipe_id=recipe_id)
        # get ontology
        if ontology_id is None:
            ontology_id = recipe.ontology_ids[0]
        ontology = recipe.ontologies.get(ontology_id=ontology_id)
        # update label
        added_label = ontology.update_label(label_name=label_name,
                                            color=color,
                                            children=children,
                                            attributes=attributes,
                                            display_label=display_label,
                                            label=label,
                                            update_ontology=True,
                                            upsert=upsert,
                                            icon_path=icon_path)
        return added_label
[docs]    def update_labels(self, label_list, ontology_id=None, recipe_id=None, upsert=False):
        """
        Update labels in the dataset

        :param label_list: label list
        :param ontology_id: optional ontology id
        :param recipe_id: optional recipe id
        :param upsert: if True, labels will be added if they do not exist
        :return: label entities
        """
        # get recipe
        if recipe_id is None:
            recipe_id = self.get_recipe_ids()[0]
        recipe = self.recipes.get(recipe_id=recipe_id)
        # get ontology
        if ontology_id is None:
            ontology_id = recipe.ontology_ids[0]
        ontology = recipe.ontologies.get(ontology_id=ontology_id)
        # update labels in ontology
        added_labels = ontology.update_labels(label_list=label_list, update_ontology=True, upsert=upsert)
        return added_labels
[docs]    def download(self,
                 filters=None,
                 local_path=None,
                 file_types=None,
                 annotation_options: ViewAnnotationOptions = None,
                 annotation_filters=None,
                 overwrite=False,
                 to_items_folder=True,
                 thickness=1,
                 with_text=False,
                 without_relative_path=None,
                 alpha=None):
        """
        Download dataset by filters.
        Filter the dataset for items and save them locally.
        Optional - also download the annotation, mask, instance and image mask of the item

        :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param local_path: local folder or filename to save to.
        :param file_types: a list of file types to download. e.g ['video/webm', 'video/mp4', 'image/jpeg', 'image/png']
        :param annotation_options: download annotations options: list(dl.ViewAnnotationOptions) - not relevant for JSON option
        :param annotation_filters: Filters entity to filter annotations for download - not relevant for JSON option
        :param overwrite: optional - default = False
        :param to_items_folder: create 'items' folder and download items to it
        :param thickness: optional - line thickness, if -1 annotation will be filled, default = 1
        :param with_text: optional - add text to annotations, default = False
        :param without_relative_path: string - remote path - download items without the relative path from platform
        :param alpha: opacity value [0 1], default 1
        :return: `List` of local_path per each downloaded item
        """
        return self.items.download(filters=filters,
                                   local_path=local_path,
                                   file_types=file_types,
                                   annotation_options=annotation_options,
                                   annotation_filters=annotation_filters,
                                   overwrite=overwrite,
                                   to_items_folder=to_items_folder,
                                   thickness=thickness,
                                   with_text=with_text,
                                   without_relative_path=without_relative_path,
                                   alpha=alpha)
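A hedged sketch downloading only PNG items together with instance masks, assuming dtlpy is imported as `dl` (filter and path are illustrative):

import dtlpy as dl

filters = dl.Filters(field='metadata.system.mimetype', values='image/png')
paths = dataset.download(filters=filters,
                         local_path='/tmp/my-dataset',      # hypothetical path
                         annotation_options=dl.ViewAnnotationOptions.INSTANCE)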
[docs]    def delete_labels(self, label_names):
        """
        Delete labels from the dataset's ontologies

        :param label_names: label object / label name / list of label objects / list of label names
        :return:
        """
        for recipe in self.recipes.list():
            for ontology in recipe.ontologies.list():
                ontology.delete_labels(label_names=label_names)
        self._labels = None
[docs]    def download_partition(self, partition, local_path=None, filters=None, annotation_options=None):
        """
        Download a specific partition of the dataset to local_path.
        This function is commonly used with dl.ModelAdapter, which implements the conversion to a specific model structure

        :param partition: `dl.SnapshotPartitionType` name of the partition
        :param local_path: local path directory to download the data
        :param dtlpy.entities.filters.Filters filters: dl.entities.Filters to add the specific partition constraint to
        :param annotation_options: download annotations options: list(dl.ViewAnnotationOptions)
        :return: List `str` of the new downloaded path of each item
        """
        if local_path is None:
            local_path = os.getcwd()
        if filters is None:
            filters = entities.Filters(resource=entities.FiltersResource.ITEM)
        if annotation_options is None:
            annotation_options = entities.ViewAnnotationOptions.JSON

        if partition == 'all':
            # TODO: should it be all or None (all != list(SnapshotPartitions))
            logger.info("downloading all items - even without partitions")
        else:
            filters.add(field='metadata.system.snapshotPartition', values=partition)
        return self.items.download(filters=filters, local_path=local_path, annotation_options=annotation_options)
[docs]    def set_partition(self, partition, filters=None):
        """
        Update all items returned by filters in the dataset to a specific partition

        :param partition: `dl.entities.SnapshotPartitionType` to set to
        :param dtlpy.entities.filters.Filters filters: dl.entities.Filters to add the specific partition constraint to
        :return: dl.PagedEntities
        """
        if filters is None:
            filters = entities.Filters(resource=entities.FiltersResource.ITEM)
        # TODO: how to perform the update using the Filter - where do I set the field?
        #       the docstring should state dict key-val while the arg name is only values...
        return self.items.update(filters=filters,
                                 system_update_values={'snapshotPartition': partition},
                                 system_metadata=True)
[docs]    def get_partitions(self, partitions, filters=None, batch_size: int = None):
        """
        Returns a PagedEntities of items from one or more partitions

        :param partitions: `dl.entities.SnapshotPartitionType` or a list. Name of the partitions
        :param dtlpy.entities.filters.Filters filters: dl.Filters to add the specific partition constraint to
        :param batch_size: `int` how many items per page
        :return: `dl.PagedEntities` of `dl.Item` - performs items.list()
        """
        # Question: do we have to give a partition? how do we get in case no partition is defined?
        if filters is None:
            filters = entities.Filters(resource=entities.FiltersResource.ITEM)
        if partitions == 'all':
            logger.info("downloading all items - even without partitions")
        else:
            # check 'all' before wrapping a single partition in a list,
            # otherwise the comparison above can never match
            if isinstance(partitions, str):
                partitions = [partitions]
            filters.add(field='metadata.system.snapshotPartition',
                        values=partitions,
                        operator=entities.FiltersOperations.IN)
        return self.items.list(filters=filters, page_size=batch_size)
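A sketch of tagging and reading back a partition, assuming SnapshotPartitionType exposes a TRAIN member as the docstrings suggest:

train = entities.SnapshotPartitionType.TRAIN   # assumed enum member
dataset.set_partition(partition=train)         # tag all filtered items
pages = dataset.get_partitions(partitions=train)
for item in pages.all():
    print(item.name)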