import numpy as np
import urllib.parse
import logging
import json
import os
import io
import copy
from typing import Generator, Tuple, Optional
from collections import deque
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from bson import ObjectId
from enum import Enum
from .. import exceptions, entities
logger = logging.getLogger(name="dtlpy")
class FiltersKnownFields(str, Enum):
    """Frequently used filter field names.

    Members are ``str`` instances, so they can be passed anywhere a plain
    field-name string is accepted.
    """

    # location / naming
    DIR = "dir"
    ANNOTATED = "annotated"
    FILENAME = "filename"
    # timestamps
    CREATED_AT = "createdAt"
    UPDATED_AT = "updatedAt"
    # generic attributes
    LABEL = "label"
    NAME = "name"
    HIDDEN = "hidden"
    TYPE = "type"
class FiltersResource(str, Enum):
    """Resource names that a :class:`Filters` query can target.

    Members are ``str`` instances; ``Filters.resource`` reports the member's
    ``value``.
    """

    ITEM = "items"
    ANNOTATION = "annotations"
    EXECUTION = "executions"
    PACKAGE = "packages"
    DPK = "dpks"
    APP = "apps"
    SERVICE = "services"
    TRIGGER = "triggers"
    MODEL = "models"
    WEBHOOK = "webhooks"
    RECIPE = "recipe"
    DATASET = "datasets"
    ONTOLOGY = "ontology"
    TASK = "tasks"
    PIPELINE = "pipeline"
    PIPELINE_EXECUTION = "pipelineState"
    COMPOSITION = "composition"
    FEATURE = "feature_vectors"
    FEATURE_SET = "feature_sets"
    ORGANIZATIONS = "organizations"
    DRIVERS = "drivers"
    SETTINGS = "setting"
    RESOURCE_EXECUTION = "resourceExecution"
    # These two were written as 1-tuples (stray trailing comma). Enum unpacks a
    # tuple into the ``str`` constructor, so the resulting values are the same
    # plain strings; they are now spelled like every other member.
    METRICS = "metrics"
    SERVICE_DRIVER = "serviceDrivers"
    COMPUTE = "compute"
class FiltersOperations(str, Enum):
    """Operator names for a single filter condition.

    ``SingleFilter.prepare`` prefixes the chosen value with ``$`` when it
    builds the query dictionary.
    """

    # logical
    OR = "or"
    AND = "and"
    # membership
    IN = "in"
    # comparison
    NOT_EQUAL = "ne"
    EQUAL = "eq"
    GREATER_THAN = "gt"
    LESS_THAN = "lt"
    # other
    EXISTS = "exists"
    MATCH = "match"
    NIN = "nin"
    GREATER_THAN_OR_EQUAL = "gte"
    LESS_THAN_OR_EQUAL = "lte"
class FiltersMethod(str, Enum):
    """How a condition is combined with the others: ``or`` / ``and``."""

    OR = "or"
    AND = "and"
class FiltersOrderByDirection(str, Enum):
    """Sort direction accepted by ``Filters.sort_by``."""

    DESCENDING = "descending"
    ASCENDING = "ascending"
class Filters:
    """
    Query builder that collects filter conditions, paging and sorting options
    and turns them into the dictionary sent to the platform.
    """

    def __init__(
        self,
        field=None,
        values=None,
        operator: FiltersOperations = None,
        method: FiltersMethod = None,
        custom_filter=None,
        resource: FiltersResource = FiltersResource.ITEM,
        use_defaults=True,
        context=None,
        page_size=None,
    ):
        """
        Create a filter, optionally seeded with one condition.

        :param str field: field of an initial condition; when given, ``add`` is called with it
        :param values: values of the initial condition
        :param dl.FiltersOperations operator: operator of the initial condition
        :param dl.FiltersMethod method: or/and for the initial condition
        :param dict custom_filter: ready-made query used by ``prepare`` instead of the built one
        :param dl.FiltersResource resource: resource to query
        :param bool use_defaults: add the per-resource default conditions
        :param context: value emitted under ``context`` by ``prepare``
        :param int page_size: page size; defaults to 100 for executions,
            pipeline executions and dpks, 1000 otherwise
        """
        if page_size is None:
            small_page_resources = (
                FiltersResource.EXECUTION,
                FiltersResource.PIPELINE_EXECUTION,
                FiltersResource.DPK,
            )
            page_size = 100 if resource in small_page_resources else 1000

        # condition containers
        self.or_filter_list = []
        self.and_filter_list = []
        self._unique_fields = []
        self.custom_filter = custom_filter
        self.known_operators = ["or", "and", "in", "ne", "eq", "gt", "lt", "exists"]

        # query options
        self._resource = resource
        self.page = 0
        self.page_size = page_size
        self.method = FiltersMethod.AND
        self.sort = {}
        self.join = None
        self.recursive = True
        self.context = context

        # system only - task and assignment attributes
        self._user_query = "true"
        self._ref_task = False
        self._ref_assignment = False
        self._ref_op = None
        self._ref_assignment_id = None
        self._ref_task_id = None
        self._system_space = None

        # defaults must be added after the state above exists, because add() reads it
        self._use_defaults = use_defaults
        self.__add_defaults()

        if field is not None:
            self.add(field=field, values=values, operator=operator, method=method)
def __validate_page_size(self):
    """Reduce ``page_size`` to the resource maximum, warning when it was larger."""
    limit = self.__max_page_size
    if self.page_size <= limit:
        return
    logger.warning(
        "Cannot list {} with page size greater than {}. Changing page_size to {}.".format(
            self.resource, limit, limit
        )
    )
    self.page_size = limit
@property
def __max_page_size(self):
    """Largest allowed page size: 100 for executions and pipeline executions, else 1000."""
    limited_resources = (FiltersResource.EXECUTION, FiltersResource.PIPELINE_EXECUTION)
    return 100 if self.resource in limited_resources else 1000
@property
def resource(self):
    """Resource name as a plain string (the enum value when an enum member is stored)."""
    current = self._resource
    if isinstance(current, FiltersResource):
        current = current.value
    return f"{current}"

@resource.setter
def resource(self, resource):
    """Switch resource; this resets the filter and re-adds the resource defaults."""
    self._resource = resource
    self.reset()
    self.__add_defaults()
@property
def system_space(self):
return self._system_space
@system_space.setter
def system_space(self, val: bool):
self._system_space = val
def reset(self):
    """
    Clear all conditions, the custom filter, sorting, join and references,
    and restore paging/method/recursive to their initial values.
    """
    self.or_filter_list, self.and_filter_list, self._unique_fields = [], [], []
    self.custom_filter = None
    self.join = None
    self.sort = {}
    self.page, self.page_size = 0, 1000
    self.method = FiltersMethod.AND
    self.recursive = True
    self._nullify_refs()
def _nullify_refs(self):
self._ref_task = False
self._ref_assignment = False
self._ref_op = None
self._ref_assignment_id = None
self._ref_task_id = None
def add(self, field, values, operator: FiltersOperations = None, method: FiltersMethod = None):
    """
    Add a condition to the filter.

    :param str field: Metadata field / attribute
    :param values: field values
    :param dl.FiltersOperations operator: optional - in, gt, lt, eq, ne
    :param dl.FiltersMethod method: Optional - or/and; defaults to the filter's ``method``
    :raises exceptions.PlatformException: when ``method`` is neither or nor and

    **Example**:

    .. code-block:: python

        filter.add(field='metadata.user', values=['1','2'], operator=dl.FiltersOperations.IN)
    """
    chosen_method = self.method if method is None else method

    if "metadata.system.refs.metadata" in field and self.resource == FiltersResource.ITEM:
        logger.warning(
            "Filtering by metadata.system.refs.metadata may cause incorrect results. please use match operator"
        )

    if chosen_method == FiltersMethod.AND:
        # AND conditions go through __override so unique fields are replaced, not duplicated
        self.__override(field=field, values=values, operator=operator)
        return
    if chosen_method == FiltersMethod.OR:
        condition = SingleFilter(field=field, values=values, operator=operator)
        self.or_filter_list.append(condition)
        return

    raise exceptions.PlatformException(
        error="400", message="Unknown method {}, please select from: or/and".format(chosen_method)
    )
def __override(self, field, values, operator=None):
    """
    Append an AND condition; for fields listed in ``_unique_fields`` any
    existing AND conditions on the same field are dropped first.
    """
    if field in self._unique_fields:
        # Slice assignment keeps the same list object while dropping the old entries.
        self.and_filter_list[:] = [
            existing for existing in self.and_filter_list if not (existing.field == field)
        ]
    replacement = SingleFilter(field=field, values=values, operator=operator)
    self.and_filter_list.append(replacement)
[docs] def generate_url_query_params(self, url):
"""
generate url query params
:param str url:
"""
url = "{}?".format(url)
for f in self.and_filter_list:
if isinstance(f.values, list):
url = "{}{}={}&".format(url, f.field, ",".join(f.values))
else:
url = "{}{}={}&".format(url, f.field, f.values)
return "{}&pageOffset={}&pageSize={}".format(url, self.page, self.page_size)
[docs] def has_field(self, field):
"""
is filter has field
:param str field: field to check
:return: Ture is have it
:rtype: bool
"""
for single_filter in self.or_filter_list:
if single_filter.field == field:
return True
for single_filter in self.and_filter_list:
if single_filter.field == field:
return True
return False
[docs] def pop(self, field):
"""
Pop filed
:param str field: field to pop
"""
for single_filter in self.or_filter_list:
if single_filter.field == field:
self.or_filter_list.remove(single_filter)
for single_filter in self.and_filter_list:
if single_filter.field == field:
self.and_filter_list.remove(single_filter)
[docs] def pop_join(self, field):
"""
Pop join
:param str field: field to pop
"""
if self.join is not None:
for single_filter in self.join["filter"]["$and"]:
if field in single_filter:
self.join["filter"]["$and"].remove(single_filter)
def add_join(self, field, values, operator: FiltersOperations = None, method: FiltersMethod = FiltersMethod.AND):
    """
    Add a condition on the joined resource (annotations for item filters,
    items for annotation filters).

    :param str field: Metadata field / attribute
    :param str or list values: field values
    :param dl.FiltersOperations operator: optional - in, gt, lt, eq, ne
    :param method: optional - str - FiltersMethod.AND, FiltersMethod.OR
    :raises exceptions.PlatformException: when the filter resource is not items or annotations

    **Example**:

    .. code-block:: python

        filter.add_join(field='metadata.user', values=['1','2'], operator=dl.FiltersOperations.IN)
    """
    joinable = [FiltersResource.ITEM, FiltersResource.ANNOTATION]
    if self.resource not in joinable:
        raise exceptions.PlatformException(error="400", message="Cannot join to {} filters".format(self.resource))

    if self.join is None:
        self.join = dict()

    if "on" not in self.join:
        # "forigen" is the key spelling used in the payload; do not correct it
        if self.resource == FiltersResource.ITEM:
            link = {"resource": FiltersResource.ANNOTATION.value, "local": "itemId", "forigen": "id"}
        else:
            link = {"resource": FiltersResource.ITEM.value, "local": "id", "forigen": "itemId"}
        self.join["on"] = link

    join_conditions = self.join.setdefault("filter", dict())
    method_key = "$" + method
    join_conditions.setdefault(method_key, list()).append(
        SingleFilter(field=field, values=values, operator=operator).prepare()
    )
def __add_defaults(self):
    """
    Add the default AND conditions of the current resource (when
    ``use_defaults`` is on) and register their fields as unique, so a later
    ``add`` on the same field replaces the default.
    """
    if not self._use_defaults:
        return

    target = self.resource
    if target == FiltersResource.ITEM:
        # items: visible files only
        self._unique_fields = ["type", "hidden"]
        self.add(field="hidden", values=False, method=FiltersMethod.AND)
        self.add(field="type", values="file", method=FiltersMethod.AND)
    elif target in (FiltersResource.SERVICE, FiltersResource.PACKAGE):
        # services / packages: global != True
        self._unique_fields = ["global"]
        self.add(field="global", values=True, operator=FiltersOperations.NOT_EQUAL, method=FiltersMethod.AND)
    elif target == FiltersResource.ANNOTATION:
        # annotations: every annotation type except notes
        self._unique_fields = ["type"]
        type_values = [annotation_type.value for annotation_type in entities.AnnotationType]
        type_values.remove(entities.AnnotationType.NOTE.value)
        self.add(field="type", values=type_values, operator=FiltersOperations.IN, method=FiltersMethod.AND)
def __generate_query(self):
    """
    Build the ``filter`` part of the payload from the OR and AND lists.

    :return: dict with ``$or`` and/or ``$and`` keys; empty when there are no conditions
    :rtype: dict
    """
    # filename patterns are expanded recursively only for item filters
    expand = self.recursive and self.resource == FiltersResource.ITEM

    query = dict()
    if self.or_filter_list:
        query["$or"] = [condition.prepare(recursive=expand) for condition in self.or_filter_list]
    if self.and_filter_list:
        query["$and"] = [condition.prepare(recursive=expand) for condition in self.and_filter_list]
    return query
def __generate_custom_query(self):
    """
    Build the payload from ``custom_filter``, filling in whatever it lacks.

    A custom filter without a ``filter`` key is treated as the filter body
    and wrapped; otherwise the custom dict itself is completed in place.
    Missing ``resource``, ``page``, ``pageSize`` and ``join`` are taken from
    this object, and a join without ``on`` gets the default link.

    :return: query dictionary
    :rtype: dict
    """
    custom = self.custom_filter
    query_dict = custom if "filter" in custom else {"filter": custom}

    # setdefault only writes keys the custom filter did not provide
    query_dict.setdefault("resource", self.resource)
    query_dict.setdefault("page", self.page)
    query_dict.setdefault("pageSize", self.page_size)

    if self.join is not None and 'join' not in query_dict:
        query_dict["join"] = self.join

    if "join" in query_dict and "on" not in query_dict["join"]:
        # "forigen" is the key spelling used in the payload; do not correct it
        if self.resource == FiltersResource.ITEM:
            link = {
                "resource": FiltersResource.ANNOTATION.value,
                "local": "itemId",
                "forigen": "id",
            }
        else:
            link = {"resource": FiltersResource.ITEM.value, "local": "id", "forigen": "itemId"}
        query_dict["join"]["on"] = link

    return query_dict
def __generate_ref_query(self):
refs = list()
if self._ref_task:
task_refs = list()
if not isinstance(self._ref_task_id, list):
self._ref_task_id = [self._ref_task_id]
for ref_id in self._ref_task_id:
task_refs.append({"type": "task", "id": ref_id})
refs += task_refs
if self._ref_assignment:
assignment_refs = list()
if not isinstance(self._ref_assignment_id, list):
self._ref_assignment_id = [self._ref_assignment_id]
for ref_id in self._ref_assignment_id:
assignment_refs.append({"type": "assignment", "id": ref_id})
refs += assignment_refs
return refs
def prepare(self, operation=None, update=None, query_only=False, system_update=None, system_metadata=False):
    """
    Build the dictionary sent to the platform.

    :param str operation: ``"update"`` or ``"delete"``; ignored when task/assignment references are set
    :param update: user metadata written under ``metadata.user`` for an update
    :param bool query_only: when True, sort/page/pageSize/resource are left out
    :param system_update: system metadata written under ``metadata.system`` for an update
    :param system_metadata: True, if you want to change metadata system
    :return: dict of the filter
    :rtype: dict
    :raises exceptions.PlatformException: for an unknown ``operation``
    """
    # With a custom filter the and/or condition lists and the options below are not used.
    if self.custom_filter is not None:
        return self.__generate_custom_query()

    payload = {"filter": self.__generate_query()}

    # paging / sorting options
    if not query_only:
        if self.sort:
            payload["sort"] = self.sort
        self.__validate_page_size()
        payload["page"] = self.page
        payload["pageSize"] = self.page_size
        payload["resource"] = self.resource

    if self.join is not None:
        payload["join"] = self.join

    # references take precedence over an explicit operation
    if self._ref_assignment or self._ref_task:
        payload["references"] = {"operation": self._ref_op, "refs": self.__generate_ref_query()}
    elif operation == "update":
        payload[operation] = {"metadata": {"user": update}} if update else dict()
        if system_metadata and system_update:
            payload["systemSpace"] = True
            update_body = payload[operation]
            update_body["metadata"] = update_body.get("metadata", dict())
            update_body["metadata"]["system"] = system_update
    elif operation == "delete":
        payload[operation] = True
        payload.pop("sort", None)
        if self.resource == FiltersResource.ITEM:
            # item deletes are sent without paging
            payload.pop("page", None)
            payload.pop("pageSize", None)
    elif operation is not None:
        raise exceptions.PlatformException(error="400", message="Unknown operation: {}".format(operation))

    if self.context is not None:
        payload["context"] = self.context
    # an explicit system_space setting overrides the flag set by a system update
    if self._system_space is not None:
        payload["systemSpace"] = self._system_space
    return payload
def print(self, indent=2):
    """Print the prepared query as indented JSON."""
    rendered = json.dumps(self.prepare(), indent=indent)
    print(rendered)
def sort_by(self, field, value: FiltersOrderByDirection = FiltersOrderByDirection.ASCENDING):
    """
    Set the sort direction for a field.

    :param str field: field to sort by
    :param dl.FiltersOrderByDirection value: FiltersOrderByDirection.ASCENDING, FiltersOrderByDirection.DESCENDING
    :raises exceptions.PlatformException: when ``value`` is not a valid direction

    **Example**:

    .. code-block:: python

        filter.sort_by(field='metadata.user', value=dl.FiltersOrderByDirection.ASCENDING)
    """
    allowed_directions = [FiltersOrderByDirection.ASCENDING, FiltersOrderByDirection.DESCENDING]
    if value not in allowed_directions:
        raise exceptions.PlatformException(error="400", message="Sort can be by ascending or descending order only")

    # store the plain string so the sort dict stays JSON-friendly
    direction = value
    if isinstance(direction, FiltersOrderByDirection):
        direction = direction.value
    self.sort[field] = direction
def open_in_web(self, resource):
    """
    Open the filter in the platform data browser (in a new web browser)

    :param str resource: dl entity to apply filter on. currently only supports dl.Dataset
    :raises NotImplementedError: for any other resource type
    """
    if not isinstance(resource, entities.Dataset):
        raise NotImplementedError("Not implemented for resource type: {}".format(type(resource)))
    # NOTE(review): platform_url is not defined in the visible part of this class - confirm it exists
    target_url = self.platform_url(resource=resource)
    resource._client_api._open_in_web(url=target_url)
def save(self, project: entities.Project, filter_name: str):
    """
    Save the current DQL filter to the project

    The filter is serialised to JSON and uploaded to the project's binaries
    dataset under ``/.dataloop/dqlfilters/items``.

    A filter without a join is stored with ``"join": null``; previously
    ``prepare()`` produced no ``join`` key in that case and saving raised
    ``KeyError``.

    :param project: dl.Project
    :param filter_name: the saved filter's name
    :return: True if success
    """
    prepared = self.prepare()
    shebang_dict = {
        "type": "dql",
        "shebang": "dataloop",
        "metadata": {
            "version": "1.0.0",
            "system": {"mimetype": "dql"},
            "dltype": "filter",
            "filterFieldsState": [],
            "resource": "items",
            "filter": prepared.pop("filter"),
            # join is only present in the prepared query when one was added
            "join": prepared.pop("join", None),
        },
    }
    b_dataset = project.datasets._get_binaries_dataset()
    # upload from memory; the buffer's name is set to the filter name
    byte_io = io.BytesIO()
    byte_io.name = filter_name
    byte_io.write(json.dumps(shebang_dict).encode())
    byte_io.seek(0)
    b_dataset.items.upload(local_path=byte_io, remote_path="/.dataloop/dqlfilters/items", remote_name=filter_name)
    return True
@classmethod
def load(cls, project: entities.Project, filter_name: str) -> "Filters":
    """
    Load a saved filter from the project by name

    The stored join is attached only when the saved metadata actually has
    one; a missing or null ``join`` no longer raises ``KeyError`` or puts
    ``None`` into the query. The file is read as UTF-8.

    :param project: dl.Project entity
    :param filter_name: filter name
    :return: dl.Filters
    :raises exceptions.NotFound: when no saved filter has that name
    """
    b_dataset = project.datasets._get_binaries_dataset()
    lookup = entities.Filters(
        custom_filter={
            "filter": {"$and": [{"filename": f"/.dataloop/dqlfilters/items/{filter_name}"}]},
            "page": 0,
            "pageSize": 1000,
            "resource": "items",
        }
    )
    pages = b_dataset.items.list(filters=lookup)
    if pages.items_count == 0:
        raise exceptions.NotFound(
            f"Saved filter not found: {filter_name}. Run `Filters.list()` to list existing filters"
        )
    # download() returns a path that is opened and parsed as JSON
    with open(pages.items[0].download(), encoding="utf-8") as saved_file:
        data = json.load(saved_file)
    metadata = data["metadata"]
    custom_filter = metadata["filter"]
    join = metadata.get("join")
    if join is not None:
        custom_filter["join"] = join
    return cls(custom_filter=custom_filter)
@staticmethod
def list(project: entities.Project) -> list:
    """
    List all saved filters for a project

    :param project: dl.Project entity
    :return: a list of all the saved filters' names
    """
    binaries_dataset = project.datasets._get_binaries_dataset()
    saved_dir_filter = entities.Filters(use_defaults=False, field="dir", values="/.dataloop/dqlfilters/items")
    pages = binaries_dataset.items.list(filters=saved_dir_filter)
    return [saved_item.name for saved_item in pages.all()]
@staticmethod
def _get_split_filters(dataset, filters, max_items, max_workers=4, max_depth=None) -> Generator[dict, None, None]:
    """
    Generator that yields filter chunks for large datasets using a bounded
    thread pool. Splits ranges by id until each subset holds <= max_items.

    Starting from the full id range matched by ``filters``, each range is
    counted; a range that is still too large is split at the midpoint of its
    first and last ids and both halves are queued again. Ranges are yielded
    in completion order, not id order. A range whose task raises is logged
    and skipped.

    :param dataset: Dataset object to get filters for
    :param filters: Base filters to apply
    :param max_items: Maximum number of items per filter chunk
    :param max_workers: Maximum number of threads for parallel processing
    :param max_depth: Maximum depth of the filter tree. Default calculated by the formula: np.ceil(np.log2(count/max_items) + 3).
    :yield: Filter payloads covering subsets of items
    :raises ValueError: when ``max_items`` is not positive
    """
    if max_items <= 0:
        raise ValueError("_get_split_filters : max_items must be greater than 0")
    if filters is None:
        filters = entities.Filters()
    # Lowest and highest matching ids bound the initial range; count is the total match count.
    from_id, count = Filters._get_first_last_item(
        items_repo=dataset.items, filters=filters, order_by_direction=FiltersOrderByDirection.ASCENDING
    )
    to_id, count = Filters._get_first_last_item(
        items_repo=dataset.items, filters=filters, order_by_direction=FiltersOrderByDirection.DESCENDING
    )
    # Nothing matched (or the lookup failed): the generator yields nothing.
    if from_id is None or to_id is None or count == 0:
        return
    max_depth = max_depth if max_depth is not None else np.ceil(np.log2(count / max_items) + 3)

    def make_filter_dict(range_from_id, range_to_id, strict_from: bool = False):
        # Copy of the base query with an id range added to its $and conditions.
        fdict = copy.deepcopy(filters.prepare())
        lower_op = "$gt" if strict_from else "$gte"
        fdict["filter"].setdefault("$and", []).extend(
            [{"id": {lower_op: range_from_id}}, {"id": {"$lte": range_to_id}}]
        )
        return fdict

    def task(range_from_id, range_to_id, depth, strict_from: bool):
        # Returns (kind, filter_dict, child_ranges) where kind is "none", "yield" or "split".
        fdict = make_filter_dict(range_from_id, range_to_id, strict_from)
        range_filters = entities.Filters(custom_filter=fdict, page_size=1)
        actual_from, count = Filters._get_first_last_item(
            dataset.items, range_filters, FiltersOrderByDirection.ASCENDING
        )
        if count == 0:
            return ("none", None, None)
        # Small enough, or the depth limit was reached: emit the range as is.
        if count <= max_items or depth >= max_depth:
            return ("yield", fdict, None)
        actual_to, count = Filters._get_first_last_item(
            dataset.items, range_filters, FiltersOrderByDirection.DESCENDING
        )
        # A range that cannot be narrowed further is emitted even if it is over max_items.
        if not actual_from or not actual_to or actual_from == actual_to:
            return ("yield", fdict, None)
        mid = Filters._get_middle_id(actual_from, actual_to)
        if not mid or mid == actual_from or mid == actual_to:
            return ("yield", fdict, None)
        # Left child: [actual_from, mid] inclusive; Right child: (mid, actual_to] exclusive lower bound
        return (
            "split",
            None,
            (
                (actual_from, mid, depth + 1, False),  # left child includes lower bound
                (mid, actual_to, depth + 1, True),  # right child excludes midpoint
            ),
        )

    # pending holds ranges not yet submitted; futures holds tasks in flight.
    pending = deque([(from_id, to_id, 0, False)])
    futures = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while futures or pending:
            # Submit all pending tasks
            while pending:
                fr, to, d, strict = pending.popleft()
                futures.add(pool.submit(task, fr, to, d, strict))
            if not futures:
                break
            # wait() returns (done, not_done); the not-done set becomes the new in-flight set.
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            for fut in done:
                try:
                    kind, fdict, ranges = fut.result()
                except Exception as e:
                    # Best effort: the failed range is dropped, the rest continue.
                    logger.warning(f"split filters task failed: {e}")
                    continue
                if kind == "yield" and fdict is not None:
                    yield fdict
                elif kind == "split" and ranges is not None:
                    left, right = ranges
                    pending.append(left)
                    pending.append(right)
@staticmethod
def _get_first_last_item(
    items_repo, filters, order_by_direction=FiltersOrderByDirection.ASCENDING
) -> Tuple[Optional[str], int]:
    """
    Fetch the id of the first item in the given id order, plus the match count.

    The query is a deep copy of ``filters.prepare()`` sorted by id with a
    page size of 1, so ``filters`` itself is left untouched.

    Listing failures are still treated as "nothing found", but they are now
    logged instead of being swallowed silently.

    :param items_repo: items repository exposing ``list(filters=...)``
    :param filters: base filters
    :param order_by_direction: ASCENDING for the lowest id, DESCENDING for the highest
    :return: ``(item id or None, total matching count)``; ``(None, 0)`` when listing fails
    """
    filters_dict = copy.deepcopy(filters.prepare())
    filters_dict["sort"] = {"id": order_by_direction.value}
    filters_dict["page"] = 0
    filters_dict["pageSize"] = 1
    cloned_filters = entities.Filters(custom_filter=filters_dict)
    try:
        pages = items_repo.list(filters=cloned_filters)
        return (pages.items[0].id if pages.items else None, pages.items_count)
    except Exception as err:
        # Keep the best-effort fallback, but leave a trace of what went wrong.
        logger.warning("failed to fetch %s boundary item: %s", order_by_direction.value, err)
        return None, 0
@staticmethod
def _get_middle_id(from_id, to_id):
    """Return the ObjectId string halfway between two ObjectIds.

    The midpoint is taken over the whole 12-byte ObjectId value, not only
    its timestamp part. ``from_id`` is returned unchanged when it is not
    below ``to_id`` or when the ids cannot be parsed.
    """
    try:
        # Interpret each 12-byte id as one big-endian integer
        low = int.from_bytes(ObjectId(from_id).binary, byteorder="big")
        high = int.from_bytes(ObjectId(to_id).binary, byteorder="big")
        if low >= high:
            return from_id
        # Keep the midpoint strictly above the lower bound and no higher than the upper one
        middle = max((low + high) // 2, low + 1)
        middle = min(middle, high)
        return str(ObjectId(middle.to_bytes(length=12, byteorder="big")))
    except Exception:
        return from_id  # Fallback to from_id if calculation fails
class SingleFilter:
    """One condition of a filter: a field, its value(s) and an optional operator."""

    def __init__(self, field, values, operator: "FiltersOperations" = None):
        """
        :param str field: field the condition applies to
        :param values: value or list of values
        :param dl.FiltersOperations operator: optional operator; without it the
            condition is a plain ``{field: values}`` match
        """
        self.field = field
        self.values = values
        self.operator = operator

    @staticmethod
    def __add_recursive(value):
        """
        Turn a directory-like filename into a recursive pattern.

        Values that already end with ``*`` or whose last path component has a
        file extension are returned unchanged; otherwise ``/**`` is appended
        (only ``**`` when the value already ends with ``/``).
        """
        if not value.endswith("*") and not os.path.splitext(value)[-1].startswith("."):
            if value.endswith("/"):
                value = value + "**"
            else:
                value = value + "/**"
        return value

    def prepare(self, recursive=False):
        """
        To dictionary for platform call

        With ``recursive`` on, ``filename`` values are expanded into recursive
        patterns. The expansion is done on a new list, so ``self.values`` (and
        the list the caller passed in) is no longer modified.

        :param recursive: expand ``filename`` values recursively
        :return: ``{field: values}`` or ``{field: {"$<operator>": values}}``
        :rtype: dict
        """
        values = self.values
        if recursive and self.field == "filename":
            if isinstance(values, str):
                values = self.__add_recursive(value=values)
            elif isinstance(values, list):
                values = [self.__add_recursive(value=value) for value in values]

        if self.operator is None:
            return {self.field: values}

        # accept both the enum member and a plain operator string
        op = self.operator.value if isinstance(self.operator, FiltersOperations) else self.operator
        return {self.field: {"${}".format(op): values}}

    def print(self, indent=2):
        """Print the prepared condition as indented JSON."""
        print(json.dumps(self.prepare(), indent=indent))