Source code for dtlpy.repositories.datasets

"""
Datasets Repository
"""

import copy
import json
import logging
import os
import sys
import tempfile
import time
import zipfile
from pathlib import Path
from typing import Generator, Optional, Union

import tqdm

from .. import _api_reference, entities, exceptions, miscellaneous, PlatformException, repositories, services
from ..entities.dataset import ExportType, OutputExportType
from ..services import service_defaults
from ..services.api_client import ApiClient

logger = logging.getLogger(name='dtlpy')

MAX_ITEMS_PER_SUBSET = 50000
DOWNLOAD_ANNOTATIONS_MAX_ITEMS_PER_SUBSET = 1000

[docs]class Datasets: """ Datasets Repository The Datasets class allows the user to manage datasets. Read more about datasets in our `documentation <https://dataloop.ai/docs/dataset>`_ and `SDK documentation <https://developers.dataloop.ai/tutorials/data_management/manage_datasets/chapter/>`_. """ def __init__(self, client_api: ApiClient, project: entities.Project = None): self._client_api = client_api self._project = project ############ # entities # ############ @property def project(self) -> entities.Project: if self._project is None: # try get checkout project = self._client_api.state_io.get('project') if project is not None: self._project = entities.Project.from_json(_json=project, client_api=self._client_api) if self._project is None: raise exceptions.PlatformException( error='2001', message='Cannot perform action WITHOUT Project entity in Datasets repository.' ' Please checkout or set a project') assert isinstance(self._project, entities.Project) return self._project @project.setter def project(self, project: entities.Project): if not isinstance(project, entities.Project): raise ValueError('Must input a valid Project entity') self._project = project ########### # methods # ########### def __get_from_cache(self) -> entities.Dataset: dataset = self._client_api.state_io.get('dataset') if dataset is not None: dataset = entities.Dataset.from_json(_json=dataset, client_api=self._client_api, datasets=self, project=self._project) return dataset def __get_by_id(self, dataset_id) -> entities.Dataset: success, response = self._client_api.gen_request(req_type='get', path='/datasets/{}'.format(dataset_id)) if dataset_id is None or dataset_id == '': raise exceptions.PlatformException('400', 'Please checkout a dataset') if success: dataset = entities.Dataset.from_json(client_api=self._client_api, _json=response.json(), datasets=self, project=self._project) else: raise exceptions.PlatformException(response) return dataset def __get_by_identifier(self, identifier=None) -> entities.Dataset: datasets = self.list() datasets_by_name = [dataset for dataset in datasets if identifier in dataset.name or identifier in dataset.id] if len(datasets_by_name) == 1: return datasets_by_name[0] elif len(datasets_by_name) > 1: raise Exception('Multiple datasets with this name exist') else: raise Exception("Dataset not found") def _bulid_folder_filter(self, folder_path, filters=None): if filters is None: filters = entities.Filters() filters._user_query = 'false' if not folder_path.startswith('/'): folder_path = '/' + folder_path filters.add(field='dir', values=folder_path, method=entities.FiltersMethod.OR) if not folder_path.endswith('*'): if not folder_path.endswith('/'): folder_path += '/' filters.add(field='dir', values=folder_path + '*', method=entities.FiltersMethod.OR) return filters def _get_binaries_dataset(self): filters = entities.Filters(resource=entities.FiltersResource.DATASET) filters.add(field='name', values='Binaries') filters.system_space = True datasets = self.list(filters=filters) if len(datasets) == 0: # empty list raise exceptions.PlatformException('404', 'Dataset not found. Name: "Binaries"') # dataset = None elif len(datasets) > 1: raise exceptions.PlatformException('400', 'More than one dataset with same name.') else: dataset = datasets[0] return dataset def _resolve_dataset_id(self, dataset, dataset_name, dataset_id): if dataset is None and dataset_name is None and dataset_id is None: raise ValueError('Must provide dataset, dataset name or dataset id') if dataset_id is None: if dataset is None: dataset = self.get(dataset_name=dataset_name) dataset_id = dataset.id return dataset_id @staticmethod def _save_item_json_file(item_data, base_path: Path, export_version=None): """ Save a single item's JSON data to a file, creating the directory structure as needed. :param dict item_data: The item data dictionary (must have 'filename' key) :param Path base_path: Base directory path where JSON files should be saved :param entities.ExportVersion export_version: Optional export version (V1 or V2) affecting filename handling :return: Path to the saved JSON file :rtype: Path """ # Get filename and remove leading slash filename = item_data.get('filename', '') if not filename: raise ValueError("item_data must have a 'filename' key") filename = filename.lstrip('/') # Determine relative JSON path based on export version if export_version == entities.ExportVersion.V1: # V1: Replace extension with .json (e.g., "file.jpg" -> "file.json") rel_json_path = str(Path(filename).with_suffix('.json')) elif export_version == entities.ExportVersion.V2: # V2: Append .json (e.g., "file.jpg" -> "file.jpg.json") rel_json_path = filename + '.json' else: # Default/None: Replace extension with .json (backward compatible with section 1) rel_json_path = os.path.splitext(filename)[0] + '.json' # Remove leading slash if present if rel_json_path.startswith('/'): rel_json_path = rel_json_path[1:] # Build output path out_path = base_path / rel_json_path # Create parent directories out_path.parent.mkdir(parents=True, exist_ok=True) # Write JSON file try: with open(out_path, 'w') as outf: json.dump(item_data, outf, indent=2) except Exception: logger.exception(f'Failed writing export item JSON to {out_path}') raise return out_path @staticmethod def _build_payload(filters, include_feature_vectors, include_annotations, export_type, annotation_filters, feature_vector_filters, dataset_lock, lock_timeout_sec, export_summary): valid_list = [e.value for e in entities.ExportType] valid_types = ', '.join(valid_list) if export_type not in ['json', 'zip']: raise ValueError('export_type must be one of the following: {}'.format(valid_types)) payload = {'exportType': export_type} if filters is None: filters = entities.Filters() if isinstance(filters, entities.Filters): payload['itemsQuery'] = {'filter': filters.prepare()['filter'], 'join': filters.prepare().get("join", {})} elif isinstance(filters, dict): payload['itemsQuery'] = filters else: raise exceptions.BadRequest(message='filters must be of type dict or Filters', status_code=500) payload['itemsVectorQuery'] = {} if include_feature_vectors: payload['includeItemVectors'] = True payload['itemsVectorQuery']['select'] = {"datasetId": 1, 'featureSetId': 1, 'value': 1} if feature_vector_filters is not None: payload['itemsVectorQuery']['filter'] = feature_vector_filters.prepare()['filter'] payload['annotations'] = {"include": include_annotations, "convertSemantic": False} if annotation_filters is not None: payload['annotationsQuery'] = annotation_filters.prepare() if dataset_lock: payload['datasetLock'] = dataset_lock if export_summary: payload['summary'] = export_summary if lock_timeout_sec: payload['lockTimeoutSec'] = lock_timeout_sec return payload def _download_exported_item(self, item_id, export_type, local_path=None, unzip=True): logger.debug(f"start downloading exported item {item_id} with export_type {export_type} and local_path {local_path} and unzip {unzip}") export_item = repositories.Items(client_api=self._client_api).get(item_id=item_id) export_item_path = export_item.download(local_path=local_path) # Common validation check for both JSON and other export types if isinstance(export_item_path, list) or not os.path.isfile(export_item_path): raise exceptions.PlatformException( error='404', message='error downloading annotation zip file. see above for more information. item id: {!r}'.format( export_item.id)) result = None if unzip is False or export_type == entities.ExportType.JSON: result = export_item_path else: try: miscellaneous.Zipping.unzip_directory(zip_filename=export_item_path, to_directory=local_path) result = local_path except Exception as e: logger.warning("Failed to extract zip file error: {}".format(e)) finally: # cleanup only for zip files to avoid removing needed results if isinstance(export_item_path, str) and os.path.isfile(export_item_path): os.remove(export_item_path) logger.debug(f"end downloading, result {result}") return result @property def platform_url(self): return self._client_api._get_resource_url("projects/{}/datasets".format(self.project.id))
[docs] def open_in_web(self, dataset_name: str = None, dataset_id: str = None, dataset: entities.Dataset = None): """ Open the dataset in web platform. **Prerequisites**: You must be an *owner* or *developer* to use this method. :param str dataset_name: The Name of the dataset :param str dataset_id: The Id of the dataset :param dtlpy.entities.dataset.Dataset dataset: dataset object **Example**: .. code-block:: python project.datasets.open_in_web(dataset_id='dataset_id') """ if dataset_name is not None: dataset = self.get(dataset_name=dataset_name) if dataset is not None: dataset.open_in_web() elif dataset_id is not None: self._client_api._open_in_web(url=f'{self.platform_url}/{dataset_id}/items') else: self._client_api._open_in_web(url=self.platform_url)
[docs] def checkout(self, identifier: str = None, dataset_name: str = None, dataset_id: str = None, dataset: entities.Dataset = None): """ Checkout (switch) to a dataset to work on it. **Prerequisites**: You must be an *owner* or *developer* to use this method. You must provide at least ONE of the following params: dataset_id, dataset_name. :param str identifier: project name or partial id that you wish to switch :param str dataset_name: The Name of the dataset :param str dataset_id: The Id of the dataset :param dtlpy.entities.dataset.Dataset dataset: dataset object **Example**: .. code-block:: python project.datasets.checkout(dataset_id='dataset_id') """ if dataset is None: if dataset_id is not None or dataset_name is not None: try: dataset = self.project.datasets.get(dataset_name=dataset_name, dataset_id=dataset_id) except exceptions.MissingEntity: dataset = self.get(dataset_id=dataset_id, dataset_name=dataset_name) elif identifier is not None: dataset = self.__get_by_identifier(identifier=identifier) else: raise exceptions.PlatformException(error='400', message='Must provide partial/full id/name to checkout') self._client_api.state_io.put('dataset', dataset.to_json()) logger.info('Checked out to dataset {}'.format(dataset.name))
[docs] @_api_reference.add(path='/datasets/query', method='post') def list(self, name=None, creator=None, filters: entities.Filters = None) -> miscellaneous.List[entities.Dataset]: """ List all datasets. **Prerequisites**: You must be an *owner* or *developer* to use this method. :param str name: list by name :param str creator: list by :param dtlpy.entities.filters.Filters filters: Filters entity containing filters parameters :return: List of datasets :rtype: list **Example**: .. code-block:: python filters = dl.Filters(resource='datasets') filters.add(field='readonly', values=False) datasets = project.datasets.list(filters=filters) """ if filters is None: filters = entities.Filters(resource=entities.FiltersResource.DATASET) # assert type filters elif not isinstance(filters, entities.Filters): raise exceptions.PlatformException(error='400', message='Unknown filters type: {!r}'.format(type(filters))) if filters.resource != entities.FiltersResource.DATASET: raise exceptions.PlatformException( error='400', message='Filters resource must to be FiltersResource.DATASET. Got: {!r}'.format(filters.resource)) url = '/datasets/query' if name is not None: filters.add(field='name', values=name) if creator is not None: filters.add(field='creator', values=creator) if self._project is not None: filters.context = {"projects": [self._project.id]} filters.page_size = 1000 filters.page = 0 datasets = list() while True: success, response = self._client_api.gen_request(req_type='POST', json_req=filters.prepare(), path=url, headers={'user_query': filters._user_query}) if success: pool = self._client_api.thread_pools('entity.create') datasets_json = response.json()['items'] jobs = [None for _ in range(len(datasets_json))] # return triggers list for i_dataset, dataset in enumerate(datasets_json): jobs[i_dataset] = pool.submit(entities.Dataset._protected_from_json, **{'client_api': self._client_api, '_json': dataset, 'datasets': self, 'project': self.project}) # get all results results = [j.result() for j in jobs] # log errors _ = [logger.warning(r[1]) for r in results if r[0] is False] # return good jobs datasets.extend([r[1] for r in results if r[0] is True]) if response.json()['hasNextPage'] is True: filters.page += 1 else: break else: raise exceptions.PlatformException(response) datasets = miscellaneous.List(datasets) return datasets
[docs] @_api_reference.add(path='/datasets/{id}', method='get') def get(self, dataset_name: str = None, dataset_id: str = None, checkout: bool = False, fetch: bool = None ) -> entities.Dataset: """ Get dataset by name or id. **Prerequisites**: You must be an *owner* or *developer* to use this method. You must provide at least ONE of the following params: dataset_id, dataset_name. :param str dataset_name: optional - search by name :param str dataset_id: optional - search by id :param bool checkout: set the dataset as a default dataset object (cookies) :param bool fetch: optional - fetch entity from platform (True), default taken from cookie :return: Dataset object :rtype: dtlpy.entities.dataset.Dataset **Example**: .. code-block:: python dataset = project.datasets.get(dataset_id='dataset_id') """ if fetch is None: fetch = self._client_api.fetch_entities if dataset_id is None and dataset_name is None: dataset = self.__get_from_cache() if dataset is None: raise exceptions.PlatformException( error='400', message='No checked-out Dataset was found, must checkout or provide an identifier in inputs') elif fetch: if dataset_id is not None and dataset_id != '': dataset = self.__get_by_id(dataset_id) # verify input dataset name is same as the given id if dataset_name is not None and dataset.name != dataset_name: logger.warning( "Mismatch found in datasets.get: dataset_name is different then dataset.name: " "{!r} != {!r}".format( dataset_name, dataset.name)) elif dataset_name is not None: datasets = self.list(name=dataset_name) if not datasets: # empty list raise exceptions.PlatformException('404', 'Dataset not found. Name: {!r}'.format(dataset_name)) # dataset = None elif len(datasets) > 1: raise exceptions.PlatformException('400', 'More than one dataset with same name.') else: dataset = datasets[0] else: raise exceptions.PlatformException( error='404', message='No input and no checked-out found') else: dataset = entities.Dataset.from_json(_json={'id': dataset_id, 'name': dataset_id}, client_api=self._client_api, datasets=self, project=self._project, is_fetched=False) assert isinstance(dataset, entities.Dataset) if checkout: self.checkout(dataset=dataset) return dataset
[docs] @_api_reference.add(path='/datasets/{id}', method='delete') def delete(self, dataset_name: str = None, dataset_id: str = None, sure: bool = False, really: bool = False): """ Delete a dataset forever! **Prerequisites**: You must be an *owner* or *developer* to use this method. **Example**: .. code-block:: python is_deleted = project.datasets.delete(dataset_id='dataset_id', sure=True, really=True) :param str dataset_name: optional - search by name :param str dataset_id: optional - search by id :param bool sure: Are you sure you want to delete? :param bool really: Really really sure? :return: True is success :rtype: bool """ if sure and really: dataset = self.get(dataset_name=dataset_name, dataset_id=dataset_id) success, response = self._client_api.gen_request(req_type='delete', path='/datasets/{}'.format(dataset.id)) if not success: raise exceptions.PlatformException(response) logger.info('Dataset {!r} was deleted successfully'.format(dataset.name)) return True else: raise exceptions.PlatformException( error='403', message='Cant delete dataset from SDK. Please login to platform to delete')
[docs] @_api_reference.add(path='/datasets/{id}', method='patch') def update(self, dataset: entities.Dataset, system_metadata: bool = False, patch: dict = None ) -> entities.Dataset: """ Update dataset field. **Prerequisites**: You must be an *owner* or *developer* to use this method. :param dtlpy.entities.dataset.Dataset dataset: dataset object :param bool system_metadata: True, if you want to change metadata system :param dict patch: Specific patch request :return: Dataset object :rtype: dtlpy.entities.dataset.Dataset **Example**: .. code-block:: python dataset = project.datasets.update(dataset='dataset_entity') """ url_path = '/datasets/{}'.format(dataset.id) if system_metadata: url_path += '?system=true' if patch is None: patch = dataset.to_json() success, response = self._client_api.gen_request(req_type='patch', path=url_path, json_req=patch) if success: logger.info('Dataset was updated successfully') return dataset else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets/{id}/unlock', method='patch') def unlock(self, dataset: entities.Dataset ) -> entities.Dataset: """ Unlock dataset. **Prerequisites**: You must be an *owner* or *developer* to use this method. :param dtlpy.entities.dataset.Dataset dataset: dataset object :return: Dataset object :rtype: dtlpy.entities.dataset.Dataset **Example**: .. code-block:: python dataset = project.datasets.unlock(dataset='dataset_entity') """ url_path = '/datasets/{}/unlock'.format(dataset.id) success, response = self._client_api.gen_request(req_type='patch', path=url_path) if success: logger.info('Dataset was unlocked successfully') return dataset else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets/{id}/directoryTree', method='get') def directory_tree(self, dataset: entities.Dataset = None, dataset_name: str = None, dataset_id: str = None): """ Get dataset's directory tree. **Prerequisites**: You must be an *owner* or *developer* to use this method. You must provide at least ONE of the following params: dataset, dataset_name, dataset_id. :param dtlpy.entities.dataset.Dataset dataset: dataset object :param str dataset_name: The Name of the dataset :param str dataset_id: The Id of the dataset :return: DirectoryTree **Example**: .. code-block:: python directory_tree = dataset.directory_tree directory_tree = project.datasets.directory_tree(dataset='dataset_entity') """ dataset_id = self._resolve_dataset_id(dataset, dataset_name, dataset_id) url_path = '/datasets/{}/directoryTree'.format(dataset_id) success, response = self._client_api.gen_request(req_type='get', path=url_path) if success: return entities.DirectoryTree(_json=response.json()) else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets/{id}/clone', method='post') def clone(self, dataset_id: str, clone_name: str = None, filters: entities.Filters = None, with_items_annotations: bool = True, with_metadata: bool = True, with_task_annotations_status: bool = True, dst_dataset_id: str = None, target_directory: str = None): """ Clone a dataset. Read more about cloning datatsets and items in our `documentation <https://dataloop.ai/docs/clone-merge-dataset#cloned-dataset>`_ and `SDK documentation <https://developers.dataloop.ai/tutorials/data_management/data_versioning/chapter/>`_. **Prerequisites**: You must be in the role of an *owner* or *developer*. :param str dataset_id: id of the dataset you wish to clone :param str clone_name: new dataset name :param dtlpy.entities.filters.Filters filters: Filters entity or a query dict :param bool with_items_annotations: true to clone with items annotations :param bool with_metadata: true to clone with metadata :param bool with_task_annotations_status: true to clone with task annotations' status :param str dst_dataset_id: destination dataset id :param str target_directory: target directory :return: dataset object :rtype: dtlpy.entities.dataset.Dataset **Example**: .. code-block:: python dataset = project.datasets.clone(dataset_id='dataset_id', clone_name='dataset_clone_name', with_metadata=True, with_items_annotations=False, with_task_annotations_status=False) """ if clone_name is None and dst_dataset_id is None: raise exceptions.PlatformException('400', 'Must provide clone name or destination dataset id') if filters is None: filters = entities.Filters() filters._user_query = 'false' elif not isinstance(filters, entities.Filters): raise exceptions.PlatformException( error='400', message='"filters" must be a dl.Filters entity. got: {!r}'.format(type(filters))) copy_filters = copy.deepcopy(filters) if copy_filters.has_field('hidden'): copy_filters.pop('hidden') if target_directory is not None and not target_directory.startswith('/'): target_directory = '/' + target_directory payload = { "name": clone_name, "filter": copy_filters.prepare(), "cloneDatasetParams": { "withItemsAnnotations": with_items_annotations, "withMetadata": with_metadata, "withTaskAnnotationsStatus": with_task_annotations_status, "targetDirectory": target_directory } } if dst_dataset_id is not None: payload['cloneDatasetParams']['targetDatasetId'] = dst_dataset_id success, response = self._client_api.gen_request(req_type='post', path='/datasets/{}/clone'.format(dataset_id), json_req=payload, headers={'user_query': filters._user_query}) if not success: raise exceptions.PlatformException(response) command = entities.Command.from_json(_json=response.json(), client_api=self._client_api) command = command.wait() if 'returnedModelId' not in command.spec: raise exceptions.PlatformException(error='400', message="returnedModelId key is missing in command response: {!r}" .format(response)) return self.get(dataset_id=command.spec['returnedModelId'])
def _export_recursive( self, dataset: entities.Dataset = None, dataset_name: str = None, dataset_id: str = None, local_path: str = None, filters: Union[dict, entities.Filters] = None, annotation_filters: entities.Filters = None, feature_vector_filters: entities.Filters = None, include_feature_vectors: bool = False, include_annotations: bool = False, timeout: int = 0, dataset_lock: bool = False, lock_timeout_sec: int = None, export_summary: bool = False, max_items_per_subset: int = MAX_ITEMS_PER_SUBSET, export_type: ExportType = ExportType.JSON, output_export_type: OutputExportType = OutputExportType.JSON, ) -> Generator[str, None, None]: """ Export dataset items recursively by splitting large datasets into smaller subsets. Args: dataset (entities.Dataset, optional): Dataset entity to export dataset_name (str, optional): Name of the dataset to export dataset_id (str, optional): ID of the dataset to export local_path (str, optional): Local path to save the exported data filters (Union[dict, entities.Filters], optional): Filters to apply on the items annotation_filters (entities.Filters, optional): Filters to apply on the annotations feature_vector_filters (entities.Filters, optional): Filters to apply on the feature vectors include_feature_vectors (bool, optional): Whether to include feature vectors in export. Defaults to False include_annotations (bool, optional): Whether to include annotations in export. Defaults to False timeout (int, optional): Timeout in seconds for the export operation. Defaults to 0 dataset_lock (bool, optional): Whether to lock the dataset during export. Defaults to False lock_timeout_sec (int, optional): Timeout for dataset lock in seconds. Defaults to None export_summary (bool, optional): Whether to include export summary. Defaults to False max_items_per_subset (int, optional): Maximum items per subset for recursive export. Defaults to MAX_ITEMS_PER_SUBSET export_type (ExportType, optional): Type of export (JSON or ZIP). Defaults to ExportType.JSON output_export_type (OutputExportType, optional): Output format type. Defaults to OutputExportType.JSON Returns: Generator[str, None, None]: Generator yielding export paths Raises: NotImplementedError: If ZIP export type is used with JSON output type exceptions.PlatformException: If API request fails or command response is invalid """ logger.debug(f"exporting dataset with export_type {export_type} and output_export_type {output_export_type}") if export_type == ExportType.ZIP and output_export_type == OutputExportType.JSON: raise NotImplementedError( "Zip export type is not supported for JSON output type.\n" "If Json output is required, please use the export_type = JSON" ) # Get dataset entity for recursive filtering dataset_entity = self.get(dataset_id=self._resolve_dataset_id(dataset, dataset_name, dataset_id)) if export_type != ExportType.JSON: filters_list = [filters] else: # Generate filter subsets using recursive_get_filters filters_list = entities.Filters._get_split_filters( dataset=dataset_entity, filters=filters, max_items=max_items_per_subset ) # First loop: Make all API requests without waiting commands = [] logger.debug("start making all API requests without waiting") for filter_i in filters_list: # Build payload for this subset payload = self._build_payload( filters=filter_i, include_feature_vectors=include_feature_vectors, include_annotations=include_annotations, export_type=export_type, annotation_filters=annotation_filters, feature_vector_filters=feature_vector_filters, dataset_lock=dataset_lock, lock_timeout_sec=lock_timeout_sec, export_summary=export_summary, ) # Make API request for this subset success, response = self._client_api.gen_request( req_type='post', path=f'/datasets/{dataset_entity.id}/export', json_req=payload ) if not success: logger.error(f"failed to make API request /datasets/{dataset_entity.id}/export with payload {payload} response {response}") raise exceptions.PlatformException(response) # Handle command execution commands.append( entities.Command.from_json(_json=response.json(), client_api=self._client_api)) time.sleep(2) # as the command have wrong progress in the beginning logger.debug("start waiting for all commands") # Second loop: Wait for all commands and process results for command in commands: command = command.wait(timeout=timeout) if 'outputItemId' not in command.spec: raise exceptions.PlatformException( error='400', message="outputItemId key is missing in command response" ) item_id = command.spec['outputItemId'] # Download and process the exported item yield self._download_exported_item( item_id=item_id, export_type=export_type, local_path=local_path, unzip=output_export_type != OutputExportType.ZIP, )
[docs] @_api_reference.add(path='/datasets/{id}/export', method='post') def export( self, dataset: entities.Dataset = None, dataset_name: str = None, dataset_id: str = None, local_path: str = None, filters: Union[dict, entities.Filters] = None, annotation_filters: entities.Filters = None, feature_vector_filters: entities.Filters = None, include_feature_vectors: bool = False, include_annotations: bool = False, export_type: ExportType = ExportType.JSON, timeout: int = 0, dataset_lock: bool = False, lock_timeout_sec: int = None, export_summary: bool = False, output_export_type: OutputExportType = None, ) -> Optional[str]: """ Export dataset items and annotations. **Prerequisites**: You must be an *owner* or *developer* to use this method. You must provide at least ONE of the following params: dataset, dataset_name, dataset_id. **Export Behavior by Parameter Combination:** The behavior of this method depends on the combination of `export_type` and `output_export_type`: **When export_type = ExportType.JSON:** - **output_export_type = OutputExportType.JSON (default when None):** - Exports data in JSON format, split into subsets of max 500 items - Downloads all subset JSON files and concatenates them into a single `result.json` file - Returns the path to the concatenated JSON file - Cleans up individual subset files after concatenation - **output_export_type = OutputExportType.ZIP:** - Same as JSON export, but zips the final `result.json` file - Returns the path to the zipped file (`result.json.zip`) - Cleans up the unzipped JSON file after zipping - **output_export_type = OutputExportType.FOLDERS:** - Exports data in JSON format, split into subsets of max 500 items - Downloads all subset JSON files and creates individual JSON files for each item - Creates a folder structure mirroring the remote dataset structure - Returns the path to the base directory containing the folder structure - Each item gets its own JSON file named after the original filename **When export_type = ExportType.ZIP:** - **output_export_type = OutputExportType.ZIP:** - Exports data as a ZIP file containing the dataset - Returns the downloaded ZIP item directly - No additional processing or concatenation - **output_export_type = OutputExportType.JSON:** - **NOT SUPPORTED** - Raises NotImplementedError - Use export_type=ExportType.JSON instead for JSON output - **output_export_type = OutputExportType.FOLDERS:** - **NOT SUPPORTED** - Raises NotImplementedError - Use export_type=ExportType.JSON instead for folder output **When output_export_type = None (legacy behavior):** - Defaults to OutputExportType.JSON - Maintains backward compatibility with existing code :param dtlpy.entities.dataset.Dataset dataset: Dataset object :param str dataset_name: The name of the dataset :param str dataset_id: The ID of the dataset :param str local_path: Local path to save the exported dataset :param Union[dict, dtlpy.entities.filters.Filters] filters: Filters entity or a query dictionary :param dtlpy.entities.filters.Filters annotation_filters: Filters entity to filter annotations for export :param dtlpy.entities.filters.Filters feature_vector_filters: Filters entity to filter feature vectors for export :param bool include_feature_vectors: Include item feature vectors in the export :param bool include_annotations: Include item annotations in the export :param bool dataset_lock: Make dataset readonly during the export :param bool export_summary: Get Summary of the dataset export :param int lock_timeout_sec: Timeout for locking the dataset during export in seconds :param entities.ExportType export_type: Type of export ('json' or 'zip') :param entities.OutputExportType output_export_type: Output format ('json', 'zip', or 'folders'). If None, defaults to 'json' :param int timeout: Maximum time in seconds to wait for the export to complete :return: Path to exported file/directory, or None if export result is empty :rtype: Optional[str] """ export_result = list( self._export_recursive( dataset=dataset, dataset_name=dataset_name, dataset_id=dataset_id, local_path=local_path, filters=filters, annotation_filters=annotation_filters, feature_vector_filters=feature_vector_filters, include_feature_vectors=include_feature_vectors, include_annotations=include_annotations, timeout=timeout, dataset_lock=dataset_lock, lock_timeout_sec=lock_timeout_sec, export_summary=export_summary, export_type=export_type, output_export_type=output_export_type, ) ) if all(x is None for x in export_result): logger.error("export result is empty") return None if export_type == ExportType.ZIP: # if export type is zip, then return the _export_recursive result as it return export_result[0] # if user didn't provide output_export_type, keep the previous behavior if output_export_type is None: output_export_type = OutputExportType.JSON # export type is jsos : # Load all items from subset JSON files and clean them up all_items = [] logger.debug("start loading all items from subset JSON files") for json_file in export_result: if json_file is None: continue if os.path.isfile(json_file): with open(json_file, 'r') as f: items = json.load(f) if isinstance(items, list): all_items.extend(items) os.remove(json_file) base_dir = os.path.dirname(export_result[0]) if output_export_type != OutputExportType.FOLDERS: dataset_id=self._resolve_dataset_id(dataset, dataset_name, dataset_id) result_file_name = f"{dataset_id}.json" result_file = os.path.join(base_dir, result_file_name) logger.debug(f"start writing all items to result file {result_file}") with open(result_file, 'w') as f: json.dump(all_items, f) if output_export_type == OutputExportType.ZIP: # Zip the result file zip_filename = result_file + '.zip' # Create zip file logger.debug(f"start zipping result file {zip_filename}") with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zf: zf.write(result_file, arcname=os.path.basename(result_file)) # Remove original json after zipping os.remove(result_file) result_file = zip_filename return result_file logger.debug("start building per-item JSON files under local_path mirroring remote structure") # Build per-item JSON files under local_path mirroring remote structure for item in all_items: self._save_item_json_file(item_data=item, base_path=Path(base_dir), export_version=None) logger.debug("end building per-item JSON files under local_path mirroring remote structure") return base_dir
[docs] @_api_reference.add(path='/datasets/merge', method='post') def merge(self, merge_name: str, dataset_ids: list, project_ids: str, with_items_annotations: bool = True, with_metadata: bool = True, with_task_annotations_status: bool = True, wait: bool = True): """ Merge a dataset. See our `SDK docs <https://developers.dataloop.ai/tutorials/data_management/data_versioning/chapter/>`_ for more information. **Prerequisites**: You must be an *owner* or *developer* to use this method. :param str merge_name: new dataset name :param list dataset_ids: list id's of the datatsets you wish to merge :param str project_ids: the project id that include the datasets :param bool with_items_annotations: true to merge with items annotations :param bool with_metadata: true to merge with metadata :param bool with_task_annotations_status: true to merge with task annotations' status :param bool wait: wait for the command to finish :return: True if success :rtype: bool **Example**: .. code-block:: python success = project.datasets.merge(dataset_ids=['dataset_id1','dataset_id2'], merge_name='dataset_merge_name', with_metadata=True, with_items_annotations=False, with_task_annotations_status=False) """ payload = { "name": merge_name, "datasetsIds": dataset_ids, "projectIds": project_ids, "mergeDatasetParams": { "withItemsAnnotations": with_items_annotations, "withMetadata": with_metadata, "withTaskAnnotationsStatus": with_task_annotations_status }, 'asynced': wait } success, response = self._client_api.gen_request(req_type='post', path='/datasets/merge', json_req=payload) if success: command = entities.Command.from_json(_json=response.json(), client_api=self._client_api) if not wait: return command command = command.wait(timeout=0) if 'mergeDatasetsConfiguration' not in command.spec: raise exceptions.PlatformException(error='400', message="mergeDatasetsConfiguration key is missing in command response: {}" .format(response)) return True else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets/{id}/sync', method='post') def sync(self, dataset_id: str, wait: bool = True): """ Sync dataset with external storage. **Prerequisites**: You must be in the role of an *owner* or *developer*. :param str dataset_id: The Id of the dataset to sync :param bool wait: wait for the command to finish :return: True if success :rtype: bool **Example**: .. code-block:: python success = project.datasets.sync(dataset_id='dataset_id') """ success, response = self._client_api.gen_request(req_type='post', path='/datasets/{}/sync'.format(dataset_id)) if success: command = entities.Command.from_json(_json=response.json(), client_api=self._client_api) if not wait: return command command = command.wait(timeout=0) if 'datasetId' not in command.spec: raise exceptions.PlatformException(error='400', message="datasetId key is missing in command response: {}" .format(response)) return True else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets', method='post') def create(self, dataset_name: str, labels=None, attributes=None, ontology_ids=None, driver: entities.Driver = None, driver_id: str = None, checkout: bool = False, expiration_options: entities.ExpirationOptions = None, index_driver: entities.IndexDriver = None, recipe_id: str = None ) -> entities.Dataset: """ Create a new dataset **Prerequisites**: You must be in the role of an *owner* or *developer*. :param str dataset_name: The Name of the dataset :param list labels: dictionary of {tag: color} or list of label entities :param list attributes: dataset's ontology's attributes :param list ontology_ids: optional - dataset ontology :param dtlpy.entities.driver.Driver driver: optional - storage driver Driver object or driver name :param str driver_id: optional - driver id :param bool checkout: set the dataset as a default dataset object (cookies) :param ExpirationOptions expiration_options: dl.ExpirationOptions object that contain definitions for dataset like MaxItemDays :param str index_driver: dl.IndexDriver, dataset driver version :param str recipe_id: optional - recipe id :return: Dataset object :rtype: dtlpy.entities.dataset.Dataset **Example**: .. code-block:: python dataset = project.datasets.create(dataset_name='dataset_name', ontology_ids='ontology_ids') """ create_default_recipe = True if any([labels, attributes, ontology_ids, recipe_id]): create_default_recipe = False # labels to list if labels is not None: if not isinstance(labels, list): labels = [labels] if not all(isinstance(label, entities.Label) for label in labels): labels = entities.Dataset.serialize_labels(labels) else: labels = list() # get creator from token payload = {'name': dataset_name, 'projects': [self.project.id], 'createDefaultRecipe': create_default_recipe } if driver_id is None and driver is not None: if isinstance(driver, entities.Driver): driver_id = driver.id elif isinstance(driver, str): driver_id = self.project.drivers.get(driver_name=driver).id else: raise exceptions.PlatformException( error=400, message='Input arg "driver" must be Driver object or a string driver name. got type: {!r}'.format( type(driver))) if driver_id is not None: payload['driver'] = driver_id if expiration_options: payload['expirationOptions'] = expiration_options.to_json() if index_driver is not None: payload['indexDriver'] = index_driver success, response = self._client_api.gen_request(req_type='post', path='/datasets', json_req=payload) if success: dataset = entities.Dataset.from_json(client_api=self._client_api, _json=response.json(), datasets=self, project=self.project) # create ontology and recipe if not create_default_recipe: if recipe_id is not None: dataset.switch_recipe(recipe_id=recipe_id) else: dataset = dataset.recipes.create(ontology_ids=ontology_ids, labels=labels, attributes=attributes).dataset else: raise exceptions.PlatformException(response) logger.info('Dataset was created successfully. Dataset id: {!r}'.format(dataset.id)) assert isinstance(dataset, entities.Dataset) if checkout: self.checkout(dataset=dataset) return dataset
@staticmethod def _convert_single(downloader, item, img_filepath, local_path, overwrite, annotation_options, annotation_filters, thickness, with_text, progress, alpha, export_version): # this is to convert the downloaded json files to any other annotation type try: if entities.ViewAnnotationOptions.ANNOTATION_ON_IMAGE in annotation_options: if img_filepath is None: img_filepath = item.download() downloader._download_img_annotations(item=item, img_filepath=img_filepath, local_path=local_path, overwrite=overwrite, annotation_options=annotation_options, annotation_filters=annotation_filters, thickness=thickness, alpha=alpha, with_text=with_text, export_version=export_version ) except Exception: logger.error('Failed to download annotation for item: {!r}'.format(item.name)) progress.update()
[docs] @staticmethod def download_annotations(dataset: entities.Dataset, local_path: str = None, filters: entities.Filters = None, annotation_options: entities.ViewAnnotationOptions = None, annotation_filters: entities.Filters = None, overwrite: bool = False, thickness: int = 1, with_text: bool = False, remote_path: str = None, include_annotations_in_output: bool = True, export_png_files: bool = False, filter_output_annotations: bool = False, alpha: float = 1, export_version=entities.ExportVersion.V1, dataset_lock: bool = False, lock_timeout_sec: int = None, export_summary: bool = False, ) -> str: """ Download dataset's annotations by filters. You may filter the dataset both for items and for annotations and download annotations. Optional -- download annotations as: mask, instance, image mask of the item. **Prerequisites**: You must be in the role of an *owner* or *developer*. :param dtlpy.entities.dataset.Dataset dataset: dataset object :param str local_path: local folder or filename to save to. :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters :param list annotation_options: type of download annotations: list(dl.ViewAnnotationOptions) :param dtlpy.entities.filters.Filters annotation_filters: Filters entity to filter annotations for download :param bool overwrite: optional - default = False to overwrite the existing files :param bool dataset_loc: optional - default = False to make the dataset readonly :param int thickness: optional - line thickness, if -1 annotation will be filled, default =1 :param bool with_text: optional - add text to annotations, default = False :param str remote_path: DEPRECATED and ignored :param bool include_annotations_in_output: default - False , if export should contain annotations :param bool export_png_files: default - if True, semantic annotations should be exported as png files :param bool filter_output_annotations: default - False, given an export by filter - determine if to filter out annotations :param float alpha: opacity value [0 1], default 1 :param str export_version: exported items will have original extension in filename, `V1` - no original extension in filenames :return: local_path of the directory where all the downloaded item :param bool dataset_lock: optional - default = False :param bool export_summary: optional - default = False :param int lock_timeout_sec: optional :rtype: str **Example**: .. code-block:: python file_path = project.datasets.download_annotations(dataset='dataset_entity', local_path='local_path', annotation_options=dl.ViewAnnotationOptions, overwrite=False, thickness=1, with_text=False, alpha=1, dataset_lock=False, lock_timeout_sec=300, export_summary=False ) """ if annotation_options is None: annotation_options = list() elif not isinstance(annotation_options, list): annotation_options = [annotation_options] for ann_option in annotation_options: if ann_option not in entities.ViewAnnotationOptions: raise PlatformException( error='400', message=f'Unknown annotation download option: {ann_option}, please choose from: {list(entities.ViewAnnotationOptions)}', ) if remote_path is not None: logger.warning(f'"remote_path" is ignored. Use "filters=dl.Filters(field="dir, values={remote_path!r}"') if filter_output_annotations is True: logger.warning("'filter_output_annotations' is ignored but kept for legacy support") if include_annotations_in_output is False: logger.warning("include_annotations_in_output was False, but was set to True since this function downloads annotations.") include_annotations_in_output = True if local_path is None: if dataset.project is None: # by dataset name local_path = str(Path(service_defaults.DATALOOP_PATH) / "datasets" / f"{dataset.name}_{dataset.id}") else: # by dataset and project name local_path = str(Path(service_defaults.DATALOOP_PATH) / "projects" / dataset.project.name / "datasets" / dataset.name) if filters is None: filters = entities.Filters() filters._user_query = 'false' if annotation_filters is not None: for annotation_filter_and in annotation_filters.and_filter_list: filters.add_join(field=annotation_filter_and.field, values=annotation_filter_and.values, operator=annotation_filter_and.operator, method=entities.FiltersMethod.AND) for annotation_filter_or in annotation_filters.or_filter_list: filters.add_join(field=annotation_filter_or.field, values=annotation_filter_or.values, operator=annotation_filter_or.operator, method=entities.FiltersMethod.OR) downloader = repositories.Downloader(items_repository=dataset.items) # Setup for incremental processing if len(annotation_options) == 0 : pool = None progress = None jobs = [] else: # Get total count for progress bar filter_copy = copy.deepcopy(filters) filter_copy.page_size = 0 pages = dataset.items.list(filters=filter_copy) total_items = pages.items_count # Setup thread pool and progress bar pool = dataset._client_api.thread_pools(pool_name='dataset.download') progress = tqdm.tqdm( total=total_items, disable=dataset._client_api.verbose.disable_progress_bar_download_annotations, file=sys.stdout, desc='Download Annotations' ) jobs = [] # Call _export_recursive as generator export_generator = dataset.project.datasets._export_recursive( dataset=dataset, local_path=tempfile.mkdtemp(prefix='annotations_jsons_'), filters=filters, annotation_filters=annotation_filters, include_annotations=True, export_type=ExportType.JSON, dataset_lock=dataset_lock, lock_timeout_sec=lock_timeout_sec, export_summary=export_summary, timeout=0, max_items_per_subset=DOWNLOAD_ANNOTATIONS_MAX_ITEMS_PER_SUBSET ) # Process each subset JSON file incrementally for subset_json_file in export_generator: if subset_json_file is None or not Path(subset_json_file).is_file(): continue try: # Open and load the items array with open(subset_json_file, 'r') as f: items_data = json.load(f) # Process each item immediately for item_data in items_data: # Split and save individual JSON file Datasets._save_item_json_file(item_data=item_data, base_path=Path(local_path) / 'json', export_version=export_version) # If annotation_options are provided, submit to thread pool immediately if annotation_options: # Create Item entity from item_data item = entities.Item.from_json( _json=item_data, client_api=dataset._client_api, dataset=dataset ) job = pool.submit( Datasets._convert_single, **{ 'downloader': downloader, 'item': item, 'img_filepath': None, 'local_path': local_path, 'overwrite': overwrite, 'annotation_options': annotation_options, 'annotation_filters': annotation_filters, 'thickness': thickness, 'with_text': with_text, 'progress': progress, 'alpha': alpha, 'export_version': export_version } ) jobs.append(job) # Clean up temporary subset JSON file os.remove(subset_json_file) except Exception as e: logger.exception(f'Failed processing subset JSON file {subset_json_file}: {e}') # Wait for all thread pool jobs to complete if annotation_options: _ = [j.result() for j in jobs] progress.close() return local_path
def _upload_single_item_annotation(self, item, file, pbar): try: item.annotations.upload(file) except Exception as err: raise err finally: pbar.update()
[docs] def upload_annotations(self, dataset, local_path, filters: entities.Filters = None, clean=False, remote_root_path='/', export_version=entities.ExportVersion.V1 ): """ Upload annotations to dataset. Example for remote_root_path: If the item filepath is "/a/b/item" and remote_root_path is "/a" - the start folder will be b instead of a **Prerequisites**: You must have a dataset with items that are related to the annotations. The relationship between the dataset and annotations is shown in the name. You must be in the role of an *owner* or *developer*. :param dtlpy.entities.dataset.Dataset dataset: dataset to upload to :param str local_path: str - local folder where the annotations files are :param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters :param bool clean: True to remove the old annotations :param str remote_root_path: the remote root path to match remote and local items :param str export_version: exported items will have original extension in filename, `V1` - no original extension in filenames **Example**: .. code-block:: python project.datasets.upload_annotations(dataset='dataset_entity', local_path='local_path', clean=False, export_version=dl.ExportVersion.V1 ) """ if filters is None: filters = entities.Filters() filters._user_query = 'false' pages = dataset.items.list(filters=filters) total_items = pages.items_count pbar = tqdm.tqdm(total=total_items, disable=dataset._client_api.verbose.disable_progress_bar_upload_annotations, file=sys.stdout, desc='Upload Annotations') pool = self._client_api.thread_pools('annotation.upload') annotations_uploaded_count = 0 for item in pages.all(): if export_version == entities.ExportVersion.V1: _, ext = os.path.splitext(item.filename) filepath = item.filename.replace(ext, '.json') else: filepath = item.filename + '.json' # make the file path ignore the hierarchy of the files that in remote_root_path filepath = os.path.relpath(filepath, remote_root_path) json_file = os.path.join(local_path, filepath) if not os.path.isfile(json_file): pbar.update() continue annotations_uploaded_count += 1 if item.annotated and clean: item.annotations.delete(filters=entities.Filters(resource=entities.FiltersResource.ANNOTATION)) pool.submit(self._upload_single_item_annotation, **{'item': item, 'file': json_file, 'pbar': pbar}) pool.shutdown() if annotations_uploaded_count == 0: logger.warning(msg="No annotations uploaded to dataset! ") else: logger.info(msg='Found and uploaded {} annotations.'.format(annotations_uploaded_count))
[docs] def set_readonly(self, state: bool, dataset: entities.Dataset): """ Set dataset readonly mode. **Prerequisites**: You must be in the role of an *owner* or *developer*. :param bool state: state to update readonly mode :param dtlpy.entities.dataset.Dataset dataset: dataset object **Example**: .. code-block:: python project.datasets.set_readonly(dataset='dataset_entity', state=True) """ import warnings warnings.warn("`readonly` flag on dataset is deprecated, doing nothing.", DeprecationWarning)
[docs] @_api_reference.add(path='/datasets/{id}/split', method='post') def split_ml_subsets(self, dataset_id: str, items_query: entities.filters, ml_split_list: dict) -> bool: """ Split dataset items into ML subsets. :param str dataset_id: The ID of the dataset. :param dict items_query: Query to select items. :param dict ml_split_list: Dictionary with 'train', 'validation', 'test' keys and integer percentages. :return: True if the split operation was successful. :rtype: bool :raises: PlatformException on failure and ValueError if percentages do not sum to 100 or invalid keys/values. """ # Validate percentages if not ml_split_list: ml_split_list = {'train': 80, 'validation': 10, 'test': 10} if not items_query: items_query = entities.Filters() items_query_dict = items_query.prepare() required_keys = {'train', 'validation', 'test'} if set(ml_split_list.keys()) != required_keys: raise ValueError("MLSplitList must have exactly the keys 'train', 'validation', 'test'.") total = sum(ml_split_list.values()) if total != 100: raise ValueError( "Please set the Train, Validation, and Test subsets percentages to add up to 100%. " "For example: 70, 15, 15." ) for key, value in ml_split_list.items(): if not isinstance(value, int) or value < 0: raise ValueError("Percentages must be integers >= 0.") payload = { 'itemsQuery': items_query_dict, 'MLSplitList': ml_split_list } path = f'/datasets/{dataset_id}/split' success, response = self._client_api.gen_request(req_type='post', path=path, json_req=payload) if success: # Wait for the split operation to complete command = entities.Command.from_json(_json=response.json(), client_api=self._client_api) command.wait() return True else: raise exceptions.PlatformException(response)
[docs] @_api_reference.add(path='/datasets/{id}/items/bulk-update-metadata', method='post') def bulk_update_ml_subset(self, dataset_id: str, items_query: dict, subset: str = None, deleteTag: bool = False) -> bool: """ Bulk update ML subset assignment for selected items. If subset is None, remove subsets. Otherwise, assign the specified subset. :param str dataset_id: ID of the dataset :param dict items_query: DQLResourceQuery (filters) for selecting items :param str subset: 'train', 'validation', 'test' or None to remove all :return: True if success :rtype: bool """ if items_query is None: items_query = entities.Filters() items_query_dict = items_query.prepare() if not deleteTag and subset not in ['train', 'validation', 'test']: raise ValueError("subset must be one of: 'train', 'validation', 'test'") # Determine tag values based on subset tags = { 'train': True if subset == 'train' else None, 'validation': True if subset == 'validation' else None, 'test': True if subset == 'test' else None } payload = { "query": items_query_dict, "updateQuery": { "update": { "metadata": { "system": { "tags": tags } } }, "systemSpace": True } } success, response = self._client_api.gen_request( req_type='post', path=f'/datasets/{dataset_id}/items/bulk-update-metadata', json_req=payload ) if success: # Similar to split operation, a command is returned command = entities.Command.from_json(_json=response.json(), client_api=self._client_api) command.wait() return True else: raise exceptions.PlatformException(response)