Source code for dtlpy.entities.paged_entities

import logging
import math
import time
import tqdm
import copy
import sys
from typing import Optional, List, Any

import attr

from .filters import FiltersOperations, FiltersOrderByDirection, FiltersResource
from .. import miscellaneous
from ..services.api_client import ApiClient

logger = logging.getLogger(name='dtlpy')


[docs]@attr.s class PagedEntities: """ Pages object for efficient API pagination. Defaults to offset-based pagination for compatibility with all operations. Switches to keyset/cursor-based pagination (using 'id' as the cursor) during iteration for performance. Falls back to offset-based pagination if keyset is not possible (e.g., custom sort). """ # api _client_api: ApiClient = attr.ib(repr=False) # params page_offset: int = attr.ib() page_size: int = attr.ib() filters: Any = attr.ib() items_repository: Any = attr.ib(repr=False) has_next_page: bool = attr.ib(default=False) total_pages_count: int = attr.ib(default=0) items_count: int = attr.ib(default=0) # hybrid pagination use_id_based_paging: bool = attr.ib(default=False) # Default to False for offset-based pagination last_seen_id: Optional[Any] = attr.ib(default=None) # execution attribute _service_id = attr.ib(default=None, repr=False) _project_id = attr.ib(default=None, repr=False) _list_function = attr.ib(default=None, repr=False) # items list items: List[Any] = attr.ib(default=miscellaneous.List(), repr=False) @staticmethod def _has_explicit_sort(flt) -> bool: """ Check if the filter has custom sort fields defined (not id/createdAt). """ prepared = flt.prepare() if flt else {} sort_fields = list(prepared.get("sort", {}).keys()) if isinstance(sort_fields, list) and len(sort_fields) > 0: return sort_fields[0] not in {"id", "createdAt"} return False def _should_use_keyset_pagination(self) -> bool: """ Determine whether to use keyset pagination based on page offset and resource type. Keyset pagination can only be used when page_offset is 0 (first page). :param page_offset: The page offset to check :return: True if keyset pagination should be used, False otherwise """ # Keyset pagination only works for page 0 (first page) if self.page_offset != 0: return False # can't use add to custom filter if self.filters.custom_filter is not None: return False # Check if the resource supports keyset pagination enable_id_based_paging = getattr(self.filters, "resource", None) in [ FiltersResource.ITEM, FiltersResource.ANNOTATION, FiltersResource.FEATURE, ] if not enable_id_based_paging: return False # Check if there's no explicit sort that would prevent keyset pagination if self._has_explicit_sort(self.filters): return False return True
[docs] def process_result(self, result: dict) -> List[Any]: """ Process the API result and update pagination state. :param result: json object :return: list of items """ # Only update page_offset if using offset-based pagination if not self.use_id_based_paging and 'page_offset' in result: self.page_offset = result['page_offset'] if 'page_size' in result: self.page_size = result['page_size'] if 'hasNextPage' in result: self.has_next_page = result['hasNextPage'] if 'totalItemsCount' in result: self.items_count = result['totalItemsCount'] if 'totalPagesCount' in result: self.total_pages_count = result['totalPagesCount'] if 'items' in result: items = self.items_repository._build_entities_from_response(response_items=result['items']) else: items = miscellaneous.List(list()) return items
def __getitem__(self, y: int) -> List[Any]: # If we're already on the requested page, return current items if y == self.page_offset: return self.items # Otherwise, go to the requested page self.go_to_page(y) return self.items def __len__(self) -> int: return self.items_count def __iter__(self): # Use keyset/cursor-based pagination for iteration when possible self.last_seen_id = None self.page_offset = 0 # Start from the first page for iteration self.use_id_based_paging = self._should_use_keyset_pagination() self.has_next_page = True # Start with assumption that there are more pages self.page_size = self.page_size or 100 pbar = tqdm.tqdm(total=self.items_count, disable=self._client_api.verbose.disable_progress_bar_iterate_pages, file=sys.stdout, desc="Iterate Pages") # Get the first page self.get_page() if self.items: yield self.items pbar.update() # Continue with next pages while self.has_next_page: if self.use_id_based_paging: # For keyset pagination, just get the next page self.page_offset = 0 self.get_page() else: # For offset pagination, increment the offset self._move_page_offset(1) self.get_page() if not self.items: break yield self.items pbar.update() pbar.close() def __reversed__(self): # Force offset-based pagination for reverse iteration self.use_id_based_paging = False self.page_offset = self.total_pages_count - 1 while True: self.get_page() yield self.items if self.page_offset == 0: break self._move_page_offset(-1) def _move_page_offset(self, offset: int) -> None: """ Move the page offset by a given step. :param offset: offset to move """ self.page_offset += offset if self.filters.custom_filter is not None: if 'page' in self.filters.custom_filter and self.filters.custom_filter['page'] != self.page_offset: self.filters.custom_filter['page'] = self.page_offset
[docs] def return_page(self, page_offset: Optional[int] = None, page_size: Optional[int] = None) -> List[Any]: """ Return a page of results using offset-based pagination by default. Switches to keyset/cursor-based pagination when supported and beneficial. :param page_offset: page offset (for offset-based) :param page_size: page size :return: list of items """ if page_size is not None: self.page_size = page_size if page_offset is not None: self.page_offset = page_offset if self.filters is None: raise ValueError("Can't return page. Filters is empty") self.filters.page_size = self.page_size self.filters.page = self.page_offset req = copy.deepcopy(self.filters) # Determine pagination method based on page offset and resource type self.use_id_based_paging = self._should_use_keyset_pagination() if self.use_id_based_paging: # Use keyset/cursor-based pagination prepared = req.prepare() sort_spec = prepared.get("sort", {}) order = next(iter(sort_spec.values()), None) if order is None: order = FiltersOrderByDirection.ASCENDING if order == FiltersOrderByDirection.DESCENDING: operator_value = FiltersOperations.LESS_THAN else: operator_value = FiltersOperations.GREATER_THAN req.sort_by(field="id", value=order) req.page = 0 # always fetch from the start for keyset # Only add last_seen_id filter if we're not explicitly requesting page 0 if self.last_seen_id: req.add( field="id", values=self.last_seen_id, operator=operator_value, method=FiltersOperations.AND, ) # Fetch data if self._list_function is None: result = self.items_repository._list(filters=req) else: result = self._list_function(filters=req) items = self.process_result(result) # Update last_seen_id for keyset if self.use_id_based_paging and items and hasattr(items[-1], "id"): self.last_seen_id = items[-1].id elif self.use_id_based_paging and not items: self.last_seen_id = None return items
[docs] def get_page(self, page_offset: Optional[int] = None, page_size: Optional[int] = None) -> None: """ Get a page of results and update self.items. :param page_offset: page offset (for offset-based) :param page_size: page size """ items = self.return_page(page_offset=page_offset, page_size=page_size) self.items = items
[docs] def next_page(self) -> None: """ Brings the next page of items from host. """ if self.use_id_based_paging: # For keyset pagination, just get the next page self.get_page() else: # For offset pagination, increment the offset self._move_page_offset(1) self.get_page()
[docs] def prev_page(self) -> None: """ Brings the previous page of items from host. Only works with offset-based pagination. """ if self.use_id_based_paging: raise NotImplementedError("prev_page is not supported for keyset pagination.") self._move_page_offset(-1) self.get_page()
[docs] def go_to_page(self, page: int = 0) -> None: """ Brings specified page of items from host. For page 0, uses keyset pagination if supported. For other pages, uses offset-based pagination. :param page: page number """ # Reset last_seen_id when going to page 0 to ensure we get all items if page == 0: self.last_seen_id = None self.page_offset = page self.get_page()
[docs] def all(self): """ Iterate over all items in all pages efficiently. Uses the iterator implementation (__iter__). """ for items in self: for item in items: yield item
######## # misc # ######## def print(self, columns=None): self.items.print(columns=columns) def to_df(self, columns=None): return self.items.to_df(columns=columns)