import logging
from .. import entities, exceptions, repositories, miscellaneous, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name='dtlpy')
class Items:
"""
Items Repository
The Items class allows you to manage items in your datasets.
For information on actions related to items see https://developers.dataloop.ai/tutorials/data_management/upload_and_manage_items/chapter/
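    For example, a typical way to reach this repository (a minimal sketch; the dataset id is illustrative):
    .. code-block:: python
        import dtlpy as dl
        dataset = dl.datasets.get(dataset_id='dataset_id')
        items_repository = dataset.items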
"""
def __init__(self,
client_api: ApiClient,
datasets: repositories.Datasets = None,
dataset: entities.Dataset = None,
dataset_id=None,
items_entity=None,
project=None):
self._client_api = client_api
self._dataset = dataset
self._dataset_id = dataset_id
self._datasets = datasets
self._project = project
        # set items entity to represent the item (Item, Codebase, Artifact, etc.)
        if items_entity is None:
            self.items_entity = entities.Item
        else:
            self.items_entity = items_entity
if self._dataset_id is None and self._dataset is not None:
self._dataset_id = self._dataset.id
############
# entities #
############
@property
def dataset(self) -> entities.Dataset:
if self._dataset is None:
if self._dataset_id is None:
raise exceptions.PlatformException(
error='400',
message='Cannot perform action WITHOUT Dataset entity in Items repository. Please set a dataset')
self._dataset = self.datasets.get(dataset_id=self._dataset_id, fetch=None)
assert isinstance(self._dataset, entities.Dataset)
return self._dataset
@dataset.setter
def dataset(self, dataset: entities.Dataset):
if not isinstance(dataset, entities.Dataset):
raise ValueError('Must input a valid Dataset entity')
self._dataset = dataset
@property
def project(self) -> entities.Project:
if self._project is None:
raise exceptions.PlatformException(
error='400',
message='Cannot perform action WITHOUT Project entity in Items repository. Please set a project')
        assert isinstance(self._project, entities.Project)
return self._project
@project.setter
def project(self, project: entities.Project):
if not isinstance(project, entities.Project):
            raise ValueError('Must input a valid Project entity')
self._project = project
################
# repositories #
################
@property
def datasets(self) -> repositories.Datasets:
if self._datasets is None:
self._datasets = repositories.Datasets(client_api=self._client_api)
assert isinstance(self._datasets, repositories.Datasets)
return self._datasets
###########
# methods #
###########
    def set_items_entity(self, entity):
"""
Set the item entity type to `Artifact <https://dataloop.ai/docs/auto-annotation-service?#uploading-model-weights-as-artifacts>`_, Item, or Codebase.
        :param entity: entity type to use, one of: entities.Item, entities.Artifact, entities.Codebase
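        **Example** (a minimal sketch, selecting the plain Item entity):
        .. code-block:: python
            dataset.items.set_items_entity(dl.Item)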
"""
if entity in [entities.Item, entities.Artifact, entities.Codebase]:
self.items_entity = entity
else:
raise exceptions.PlatformException(error="403",
message="Unable to set given entity. Entity give: {}".format(entity))
    def get_all_items(self, filters: entities.Filters = None) -> [entities.Item]:
"""
Get all items in dataset.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
        :param dtlpy.entities.filters.Filters filters: dl.Filters entity to filter items
:return: list of all items
:rtype: list
**Example**:
.. code-block:: python
dataset.items.get_all_items()
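        To collect only the items under a given remote folder, pass a Filters entity (the '/dogs' folder below is illustrative):
        .. code-block:: python
            filters = dl.Filters(field='dir', values='/dogs')
            items = dataset.items.get_all_items(filters=filters)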
"""
if filters is None:
filters = entities.Filters()
filters._user_query = 'false'
filters.add(field='type', values='file')
pages = self.list(filters=filters)
num_items = pages.items_count
items = [None for _ in range(num_items)]
for i_item, item in enumerate(pages.all()):
items[i_item] = item
items = [item for item in items if item is not None]
return items
def _build_entities_from_response(self, response_items) -> miscellaneous.List[entities.Item]:
pool = self._client_api.thread_pools(pool_name='entity.create')
jobs = [None for _ in range(len(response_items))]
        # create the item entities from the response, in parallel
for i_item, item in enumerate(response_items):
jobs[i_item] = pool.submit(self.items_entity._protected_from_json,
**{'client_api': self._client_api,
'_json': item,
'dataset': self.dataset})
# get all results
results = [j.result() for j in jobs]
# log errors
_ = [logger.warning(r[1]) for r in results if r[0] is False]
# return good jobs
items = miscellaneous.List([r[1] for r in results if r[0] is True])
return items
def _list(self, filters: entities.Filters):
"""
        Get dataset items list. This is a browsing endpoint: for any given path an item count is returned,
        and the user is expected to perform a follow-up request for every folder item to actually get its items list.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:return: json response
"""
# prepare request
success, response = self._client_api.gen_request(req_type="POST",
path="/datasets/{}/query".format(self.dataset.id),
json_req=filters.prepare(),
headers={'user_query': filters._user_query})
if not success:
raise exceptions.PlatformException(response)
return response.json()
    @_api_reference.add(path='/datasets/{id}/query', method='post')
def list(self,
filters: entities.Filters = None,
page_offset: int = None,
page_size: int = None
) -> entities.PagedEntities:
"""
List items in a dataset.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param int page_offset: start page
:param int page_size: page size
:return: Pages object
:rtype: dtlpy.entities.paged_entities.PagedEntities
**Example**:
.. code-block:: python
dataset.items.list(page_offset=0, page_size=100)
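        The returned Pages object can be iterated to process every item without loading the whole dataset at once:
        .. code-block:: python
            pages = dataset.items.list()
            for page in pages:
                for item in page:
                    print(item.name)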
"""
# default filters
if filters is None:
filters = entities.Filters()
filters._user_query = 'false'
# assert type filters
elif not isinstance(filters, entities.Filters):
raise exceptions.PlatformException(error='400',
message='Unknown filters type: {!r}'.format(type(filters)))
        if filters.resource != entities.FiltersResource.ITEM and filters.resource != entities.FiltersResource.ANNOTATION:
            raise exceptions.PlatformException(
                error='400',
                message='Filters resource must be FiltersResource.ITEM or FiltersResource.ANNOTATION. Got: {!r}'.format(
                    filters.resource))
# page size
if page_size is None:
# take from default
page_size = filters.page_size
else:
filters.page_size = page_size
# page offset
if page_offset is None:
# take from default
page_offset = filters.page
else:
filters.page = page_offset
if filters.resource == entities.FiltersResource.ITEM:
items_repository = self
else:
items_repository = repositories.Annotations(client_api=self._client_api,
dataset=self._dataset)
paged = entities.PagedEntities(items_repository=items_repository,
filters=filters,
page_offset=page_offset,
page_size=page_size,
client_api=self._client_api)
paged.get_page()
return paged
    @_api_reference.add(path='/items/{id}', method='get')
def get(self,
filepath: str = None,
item_id: str = None,
fetch: bool = None,
is_dir: bool = False
) -> entities.Item:
"""
Get Item object
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str filepath: optional - search by remote path
:param str item_id: optional - search by id
:param bool fetch: optional - fetch entity from platform, default taken from cookie
:param bool is_dir: True if you want to get an item from dir type
:return: Item object
:rtype: dtlpy.entities.item.Item
**Example**:
.. code-block:: python
dataset.items.get(item_id='item_id')
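        An item can also be fetched by its remote path (the filepath below is illustrative):
        .. code-block:: python
            item = dataset.items.get(filepath='/folder/image.jpg')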
"""
if fetch is None:
fetch = self._client_api.fetch_entities
if fetch:
if item_id is not None:
success, response = self._client_api.gen_request(req_type="get",
path="/items/{}".format(item_id))
if success:
item = self.items_entity.from_json(client_api=self._client_api,
_json=response.json(),
dataset=self._dataset,
project=self._project)
# verify input filepath is same as the given id
if filepath is not None and item.filename != filepath:
logger.warning(
"Mismatch found in items.get: filepath is different then item.filename: "
"{!r} != {!r}".format(
filepath,
item.filename))
else:
raise exceptions.PlatformException(response)
elif filepath is not None:
filters = entities.Filters()
filters.pop(field='hidden')
if is_dir:
filters.add(field='type', values='dir')
filters.recursive = False
filters.add(field='filename', values=filepath)
paged_entity = self.list(filters=filters)
if len(paged_entity.items) == 0:
raise exceptions.PlatformException(error='404',
message='Item not found. filepath= "{}"'.format(filepath))
elif len(paged_entity.items) > 1:
raise exceptions.PlatformException(
error='404',
message='More than one item found. Please "get" by id. filepath: "{}"'.format(filepath))
else:
item = paged_entity.items[0]
else:
raise exceptions.PlatformException(error="400",
message='Must choose by at least one. "filename" or "item_id"')
else:
item = entities.Item.from_json(_json={'id': item_id,
'filename': filepath},
client_api=self._client_api,
dataset=self._dataset,
is_fetched=False,
project=self._project)
assert isinstance(item, entities.Item)
return item
    @_api_reference.add(path='/items/{id}/clone', method='post')
def clone(self,
item_id: str,
dst_dataset_id: str,
remote_filepath: str = None,
metadata: dict = None,
with_annotations: bool = True,
with_metadata: bool = True,
with_task_annotations_status: bool = False,
allow_many: bool = False,
wait: bool = True):
"""
        Clone item. Read more about cloning datasets and items in our `documentation <https://dataloop.ai/docs/clone-merge-dataset#cloned-dataset>`_ and `SDK documentation <https://developers.dataloop.ai/tutorials/data_management/data_versioning/chapter/>`_.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str item_id: item to clone
:param str dst_dataset_id: destination dataset id
:param str remote_filepath: complete filepath
:param dict metadata: new metadata to add
:param bool with_annotations: clone annotations
:param bool with_metadata: clone metadata
:param bool with_task_annotations_status: clone task annotations status
        :param bool allow_many: if True, multiple clones of the same item in a single dataset are allowed (default=False)
:param bool wait: wait for the command to finish
:return: Item object
:rtype: dtlpy.entities.item.Item
**Example**:
.. code-block:: python
dataset.items.clone(item_id='item_id',
                                dst_dataset_id='dst_dataset_id',
with_metadata=True,
with_task_annotations_status=False,
with_annotations=False)
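        With wait=False the call returns the pending Command entity instead of the cloned item, so it can be waited on later:
        .. code-block:: python
            command = dataset.items.clone(item_id='item_id',
                                          dst_dataset_id='dst_dataset_id',
                                          wait=False)
            command = command.wait()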
"""
if metadata is None:
metadata = dict()
payload = {"targetDatasetId": dst_dataset_id,
"remoteFileName": remote_filepath,
"metadata": metadata,
"cloneDatasetParams": {
"withItemsAnnotations": with_annotations,
"withMetadata": with_metadata,
"withTaskAnnotationsStatus": with_task_annotations_status},
"allowMany": allow_many
}
success, response = self._client_api.gen_request(req_type="post",
path="/items/{}/clone".format(item_id),
json_req=payload)
# check response
if not success:
raise exceptions.PlatformException(response)
command = entities.Command.from_json(_json=response.json(),
client_api=self._client_api)
if not wait:
return command
command = command.wait()
if 'returnedModelId' not in command.spec:
raise exceptions.PlatformException(error='400',
message="returnedModelId key is missing in command response: {}"
.format(response))
cloned_item = self.get(item_id=command.spec['returnedModelId'][0])
return cloned_item
    @_api_reference.add(path='/items/{id}', method='delete')
def delete(self,
filename: str = None,
item_id: str = None,
filters: entities.Filters = None):
"""
Delete item from platform.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
You must provide at least ONE of the following params: item id, filename, filters.
:param str filename: optional - search item by remote path
:param str item_id: optional - search item by id
:param dtlpy.entities.filters.Filters filters: optional - delete items by filter
:return: True if success
:rtype: bool
**Example**:
.. code-block:: python
dataset.items.delete(item_id='item_id')
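        To delete many items at once, pass a Filters entity instead (the unannotated-items filter below is illustrative):
        .. code-block:: python
            filters = dl.Filters(field='annotated', values=False)
            dataset.items.delete(filters=filters)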
"""
if item_id is not None:
success, response = self._client_api.gen_request(req_type="delete",
path="/items/{}".format(item_id),
)
elif filename is not None:
if not filename.startswith("/"):
filename = "/" + filename
items = self.get(filepath=filename)
if not isinstance(items, list):
items = [items]
if len(items) == 0:
raise exceptions.PlatformException("404", "Item not found")
elif len(items) > 1:
raise exceptions.PlatformException(error="404", message="More the 1 item exist by the name provided")
else:
item_id = items[0].id
success, response = self._client_api.gen_request(req_type="delete",
path="/items/{}".format(item_id))
elif filters is not None:
# prepare request
success, response = self._client_api.gen_request(req_type="POST",
path="/datasets/{}/query".format(self.dataset.id),
json_req=filters.prepare(operation='delete'))
else:
raise exceptions.PlatformException("400", "Must provide item id, filename or filters")
# check response
if success:
logger.debug("Item/s deleted successfully")
return success
else:
raise exceptions.PlatformException(response)
    @_api_reference.add(path='/items/{id}', method='patch')
def update(self,
item: entities.Item = None,
filters: entities.Filters = None,
update_values=None,
system_update_values=None,
system_metadata: bool = False):
"""
Update item metadata.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
You must provide at least ONE of the following params: update_values, system_update_values.
:param dtlpy.entities.item.Item item: Item object
:param dtlpy.entities.filters.Filters filters: optional update filtered items by given filter
        :param update_values: optional - fields to update and their new values (used for bulk updates with filters)
:param system_update_values: values in system metadata to be updated
        :param bool system_metadata: True, if you want to update the system metadata
:return: Item object
:rtype: dtlpy.entities.item.Item
**Example**:
.. code-block:: python
dataset.items.update(item='item_entity')
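        For a bulk update, pass filters together with update_values (the folder and metadata key below are illustrative):
        .. code-block:: python
            filters = dl.Filters(field='dir', values='/dogs')
            dataset.items.update(filters=filters,
                                 update_values={'metadata': {'user': {'collected': True}}})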
"""
ref = filters is not None and (filters._ref_task or filters._ref_assignment)
if system_update_values and not system_metadata:
logger.warning('system metadata will not be updated because param system_metadata is False')
# check params
if item is None and filters is None:
raise exceptions.PlatformException('400', 'must provide either item or filters')
value_to_update = update_values or system_update_values
if item is None and not ref and not value_to_update:
raise exceptions.PlatformException('400',
'Must provide update_values or system_update_values')
if item is not None and value_to_update:
raise exceptions.PlatformException('400',
'Cannot provide "update_values" or "system_update_values" with a specific "item" for an individual update. '
'These parameters are intended only for bulk updates using filters.')
# update item
if item is not None:
json_req = miscellaneous.DictDiffer.diff(origin=item._platform_dict,
modified=item.to_json())
if not json_req:
return item
url_path = "/items/{}".format(item.id)
if system_metadata:
url_path += "?system=true"
success, response = self._client_api.gen_request(req_type="patch",
path=url_path,
json_req=json_req)
if success:
logger.debug("Item was updated successfully. Item id: {}".format(item.id))
return self.items_entity.from_json(client_api=self._client_api,
_json=response.json(),
dataset=self._dataset)
else:
logger.error("Error while updating item")
raise exceptions.PlatformException(response)
# update by filters
else:
# prepare request
prepared_filter = filters.prepare(operation='update',
system_update=system_update_values,
system_metadata=system_metadata,
update=update_values)
success, response = self._client_api.gen_request(req_type="POST",
path="/datasets/{}/query".format(self.dataset.id),
json_req=prepared_filter)
if not success:
raise exceptions.PlatformException(response)
else:
logger.debug("Items were updated successfully.")
return response.json()
    def download(
self,
filters: entities.Filters = None,
items=None,
# download options
local_path: str = None,
file_types: list = None,
save_locally: bool = True,
to_array: bool = False,
annotation_options: entities.ViewAnnotationOptions = None,
annotation_filters: entities.Filters = None,
overwrite: bool = False,
to_items_folder: bool = True,
thickness: int = 1,
with_text: bool = False,
without_relative_path=None,
avoid_unnecessary_annotation_download: bool = False,
include_annotations_in_output: bool = True,
export_png_files: bool = False,
filter_output_annotations: bool = False,
alpha: float = 1,
export_version=entities.ExportVersion.V1
):
"""
Download dataset items by filters.
Filters the dataset for items and saves them locally.
Optional -- download annotation, mask, instance, and image mask of the item.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
        :param List[dtlpy.entities.item.Item] or dtlpy.entities.item.Item items: download Item entity or item_id (or a list of items)
:param str local_path: local folder or filename to save to.
        :param list file_types: a list of file types to download. e.g. ['video/webm', 'video/mp4', 'image/jpeg', 'image/png']
:param bool save_locally: bool. save to disk or return a buffer
        :param bool to_array: returns a numpy.ndarray when True and save_locally is False
        :param list annotation_options: annotation download options: list(dl.ViewAnnotationOptions)
:param dtlpy.entities.filters.Filters annotation_filters: Filters entity to filter annotations for download
:param bool overwrite: optional - default = False
:param bool to_items_folder: Create 'items' folder and download items to it
:param int thickness: optional - line thickness, if -1 annotation will be filled, default =1
:param bool with_text: optional - add text to annotations, default = False
        :param bool without_relative_path: download items without the relative path from the platform
:param bool avoid_unnecessary_annotation_download: default - False
        :param bool include_annotations_in_output: default - True, whether the export should contain annotations
        :param bool export_png_files: default - False, if True, semantic annotations are exported as png files
        :param bool filter_output_annotations: default - False, when exporting by filter, determines whether the output annotations are filtered as well
        :param float alpha: opacity value [0 1], default 1
        :param str export_version: `V2` - exported items will have original extension in filename, `V1` - no original extension in filenames
        :return: generator of local_path per downloaded item
:rtype: generator or single item
**Example**:
.. code-block:: python
dataset.items.download(local_path='local_path',
annotation_options=dl.ViewAnnotationOptions,
overwrite=False,
thickness=1,
with_text=False,
alpha=1,
save_locally=True
)
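        A subset of the dataset can be downloaded by filter, e.g. only JPEG items (the mimetype filter below is illustrative):
        .. code-block:: python
            filters = dl.Filters(field='metadata.system.mimetype', values='image/jpeg')
            dataset.items.download(filters=filters, local_path='local_path')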
"""
downloader = repositories.Downloader(self)
return downloader.download(
filters=filters,
items=items,
local_path=local_path,
file_types=file_types,
save_locally=save_locally,
to_array=to_array,
annotation_options=annotation_options,
annotation_filters=annotation_filters,
overwrite=overwrite,
to_items_folder=to_items_folder,
thickness=thickness,
alpha=alpha,
with_text=with_text,
without_relative_path=without_relative_path,
avoid_unnecessary_annotation_download=avoid_unnecessary_annotation_download,
include_annotations_in_output=include_annotations_in_output,
export_png_files=export_png_files,
filter_output_annotations=filter_output_annotations,
export_version=export_version
)
    def upload(
self,
# what to upload
local_path: str,
local_annotations_path: str = None,
# upload options
remote_path: str = "/",
remote_name: str = None,
file_types: list = None,
overwrite: bool = False,
item_metadata: dict = None,
output_entity=entities.Item,
no_output: bool = False,
export_version: str = entities.ExportVersion.V1,
item_description: str = None
):
"""
Upload local file to dataset.
Local filesystem will remain unchanged.
If "*" at the end of local_path (e.g. "/images/*") items will be uploaded without the head directory.
**Prerequisites**: Any user can upload items.
        :param str local_path: local file, local folder, BufferIO, numpy.ndarray or url to upload (or a list of these)
:param str local_annotations_path: path to dataloop format annotations json files.
:param str remote_path: remote path to save.
        :param str remote_name: remote base name to save. when uploading a numpy.ndarray as local_path, a remote_name with a .jpg or .png extension is mandatory
        :param list file_types: list of file types to upload. e.g. ['.jpg', '.png']. default is all
:param dict item_metadata: metadata dict to upload to item or ExportMetadata option to export metadata from annotation file
:param bool overwrite: optional - default = False
:param output_entity: output type
:param bool no_output: do not return the items after upload
        :param str export_version: `V2` - exported items will have original extension in filename, `V1` - no original extension in filenames
:param str item_description: add a string description to the uploaded item
:return: Output (generator/single item)
:rtype: generator or single item
**Example**:
.. code-block:: python
dataset.items.upload(local_path='local_path',
local_annotations_path='local_annotations_path',
overwrite=True,
                                 item_metadata={'Hello': 'World'}
)
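        When uploading a numpy.ndarray, a remote_name with a .jpg or .png extension is mandatory (the array below is illustrative):
        .. code-block:: python
            import numpy as np
            frame = np.zeros(shape=(100, 100, 3), dtype=np.uint8)
            item = dataset.items.upload(local_path=frame, remote_name='frame.jpg')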
"""
# initiate and use uploader
uploader = repositories.Uploader(items_repository=self, output_entity=output_entity, no_output=no_output)
return uploader.upload(
local_path=local_path,
local_annotations_path=local_annotations_path,
# upload options
remote_path=remote_path,
remote_name=remote_name,
file_types=file_types,
# config
overwrite=overwrite,
# metadata to upload with items
item_metadata=item_metadata,
export_version=export_version,
item_description=item_description
)
@property
def platform_url(self):
return self._client_api._get_resource_url(
"projects/{}/datasets/{}/items".format(self.dataset.project.id, self.dataset.id))
    def open_in_web(self, filepath=None, item_id=None, item=None):
"""
        Open the item in the web platform.
        **Prerequisites**: You must be in the role of an *owner* or *developer*, or be an *annotation manager*/*annotator* with access to that item through a task.
:param str filepath: item file path
:param str item_id: item id
:param dtlpy.entities.item.Item item: item entity
**Example**:
.. code-block:: python
dataset.items.open_in_web(item_id='item_id')
"""
if filepath is not None:
item = self.get(filepath=filepath)
if item is not None:
item.open_in_web()
elif item_id is not None:
self._client_api._open_in_web(url=self.platform_url + '/' + str(item_id))
else:
self._client_api._open_in_web(url=self.platform_url)
    def update_status(self,
status: entities.ItemStatus,
items=None,
item_ids=None,
filters=None,
dataset=None,
clear=False):
"""
Update item status in task
**Prerequisites**: You must be in the role of an *owner* or *developer* or *annotation manager* who has been assigned a task with the item.
You must provide at least ONE of the following params: items, item_ids, filters.
:param str status: ItemStatus.COMPLETED, ItemStatus.APPROVED, ItemStatus.DISCARDED
:param list items: list of items
        :param list item_ids: list of item ids
:param dtlpy.entities.filters.Filters filters: Filters entity or a dictionary containing filters parameters
:param dtlpy.entities.dataset.Dataset dataset: dataset object
        :param bool clear: True to clear the status
**Example**:
.. code-block:: python
            dataset.items.update_status(item_ids=['item_id'], status=dl.ItemStatus.COMPLETED)
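        A filtered set of items can be updated in one call (the '/dogs' directory filter below is illustrative):
        .. code-block:: python
            filters = dl.Filters(field='dir', values='/dogs')
            dataset.items.update_status(status=dl.ItemStatus.APPROVED, filters=filters)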
"""
if items is None and item_ids is None and filters is None:
raise exceptions.PlatformException('400', 'Must provide either items, item_ids or filters')
if self._dataset is None and dataset is None:
raise exceptions.PlatformException('400', 'Please provide dataset')
elif dataset is None:
dataset = self._dataset
if filters is not None:
items = dataset.items.list(filters=filters)
item_count = items.items_count
elif items is not None:
if isinstance(items, entities.PagedEntities):
item_count = items.items_count
else:
if not isinstance(items, list):
items = [items]
item_count = len(items)
items = [items]
else:
if not isinstance(item_ids, list):
item_ids = [item_ids]
item_count = len(item_ids)
items = [[dataset.items.get(item_id=item_id, fetch=False) for item_id in item_ids]]
pool = self._client_api.thread_pools(pool_name='item.status_update')
jobs = [None for _ in range(item_count)]
# call multiprocess wrapper to run service on each item in list
        i_item = 0
        for page in items:
            for item in page:
                jobs[i_item] = pool.submit(item.update_status,
                                           **{'status': status,
                                              'clear': clear})
                i_item += 1
# get all results
results = [j.result() for j in jobs]
out_success = [r for r in results if r is True]
out_errors = [r for r in results if r is False]
if len(out_errors) == 0:
logger.debug('Item/s updated successfully. {}/{}'.format(len(out_success), len(results)))
else:
logger.error(out_errors)
logger.error('Item/s updated with {} errors'.format(len(out_errors)))
    def make_dir(self, directory, dataset: entities.Dataset = None) -> entities.Item:
"""
Create a directory in a dataset.
**Prerequisites**: All users.
:param str directory: name of directory
:param dtlpy.entities.dataset.Dataset dataset: dataset object
:return: Item object
:rtype: dtlpy.entities.item.Item
**Example**:
.. code-block:: python
dataset.items.make_dir(directory='directory_name')
"""
        if self._dataset_id is None and dataset is None:
            raise exceptions.PlatformException('400', 'Please provide parameter dataset')
        dataset_id = self._dataset_id if self._dataset_id is not None else dataset.id
        payload = {
            'type': 'dir',
            'path': directory
        }
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        success, response = self._client_api.gen_request(req_type="post",
                                                         headers=headers,
                                                         path="/datasets/{}/items".format(dataset_id),
                                                         data=payload)
if success:
item = self.items_entity.from_json(client_api=self._client_api,
_json=response.json(),
dataset=self._dataset)
else:
raise exceptions.PlatformException(response)
return item
    def move_items(self,
destination: str,
filters: entities.Filters = None,
items=None,
dataset: entities.Dataset = None
) -> bool:
"""
Move items to another directory.
        If the directory does not exist, it will be created.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str destination: destination directory
:param dtlpy.entities.filters.Filters filters: optional - either this or items. Query of items to move
:param items: optional - either this or filters. A list of items to move
:param dtlpy.entities.dataset.Dataset dataset: dataset object
:return: True if success
:rtype: bool
**Example**:
.. code-block:: python
dataset.items.move_items(destination='directory_name')
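        Items can also be selected by filter (the '/to_move' source folder below is illustrative):
        .. code-block:: python
            filters = dl.Filters(field='dir', values='/to_move')
            dataset.items.move_items(destination='/destination_folder', filters=filters)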
"""
if filters is None and items is None:
raise exceptions.PlatformException('400', 'Must provide either filters or items')
dest_dir_filter = entities.Filters(resource=entities.FiltersResource.ITEM, field='type', values='dir')
dest_dir_filter.recursive = False
dest_dir_filter.add(field='filename', values=destination)
dirs_page = self.list(filters=dest_dir_filter)
if dirs_page.items_count == 0:
directory = self.make_dir(directory=destination, dataset=dataset)
elif dirs_page.items_count == 1:
directory = dirs_page.items[0]
else:
raise exceptions.PlatformException('404', 'More than one directory by the name of: {}'.format(destination))
if filters is not None:
items = self.list(filters=filters)
elif isinstance(items, list):
items = [items]
elif not isinstance(items, entities.PagedEntities):
            raise exceptions.PlatformException('400', 'items must be a list of items or a PagedEntities object, not {}'.format(
                type(items)))
item_ids = list()
for page in items:
for item in page:
item_ids.append(item.id)
success, response = self._client_api.gen_request(req_type="put",
path="/datasets/{}/items/{}".format(self._dataset_id,
directory.id),
json_req=item_ids)
if not success:
raise exceptions.PlatformException(response)
return success