Source code for dtlpy.entities.paged_entities

import logging
import math
import time
import tqdm
import copy
import sys

import attr
from .. import miscellaneous
from ..services.api_client import ApiClient

logger = logging.getLogger(name='dtlpy')


[docs]@attr.s class PagedEntities: """ Pages object """ # api _client_api = attr.ib(type=ApiClient, repr=False) # params page_offset = attr.ib() page_size = attr.ib() filters = attr.ib() items_repository = attr.ib(repr=False) has_next_page = attr.ib(default=False) total_pages_count = attr.ib(default=0) items_count = attr.ib(default=0) # execution attribute _service_id = attr.ib(default=None, repr=False) _project_id = attr.ib(default=None, repr=False) _order_by_type = attr.ib(default=None, repr=False) _order_by_direction = attr.ib(default=None, repr=False) _execution_status = attr.ib(default=None, repr=False) _execution_resource_type = attr.ib(default=None, repr=False) _execution_resource_id = attr.ib(default=None, repr=False) _execution_function_name = attr.ib(default=None, repr=False) _list_function = attr.ib(default=None, repr=False) # items list items = attr.ib(default=miscellaneous.List(), repr=False)
[docs] def process_result(self, result): """ :param result: json object """ if 'page_offset' in result: self.page_offset = result['page_offset'] if 'page_size' in result: self.page_size = result['page_size'] if 'hasNextPage' in result: self.has_next_page = result['hasNextPage'] if 'totalItemsCount' in result: self.items_count = result['totalItemsCount'] if 'totalPagesCount' in result: self.total_pages_count = result['totalPagesCount'] if 'items' in result: items = self.items_repository._build_entities_from_response(response_items=result['items']) else: items = miscellaneous.List(list()) return items
def __getitem__(self, y): self.go_to_page(y) return self.items def __len__(self): return self.items_count def __iter__(self): pbar = tqdm.tqdm(total=self.total_pages_count, disable=self._client_api.verbose.disable_progress_bar, file=sys.stdout, desc="Iterate Pages") if self.page_offset != 0: # reset the count for page 0 self.page_offset = 0 self.get_page() while True: yield self.items pbar.update() if self.has_next_page: self.page_offset += 1 self.get_page() else: pbar.close() break def __reversed__(self): self.page_offset = self.total_pages_count - 1 while True: self.get_page() yield self.items if self.page_offset == 0: break self.page_offset -= 1
[docs] def return_page(self, page_offset=None, page_size=None): """ Return page :param page_offset: page offset :param page_size: page size """ if page_size is None: page_size = self.page_size if page_offset is None: page_offset = self.page_offset if self.filters is not None: filters = copy.copy(self.filters) filters.page = page_offset filters.page_size = page_size if self._list_function is None: result = self.items_repository._list(filters=filters) else: result = self._list_function(filters=filters) items = self.process_result(result) return items else: raise ValueError('Cant return page. Filters is empty')
[docs] def get_page(self, page_offset=None, page_size=None): """ Get page :param page_offset: page offset :param page_size: page size """ items = self.return_page(page_offset=page_offset, page_size=page_size) self.items = items
[docs] def next_page(self): """ Brings the next page of items from host :return: """ self.page_offset += 1 self.get_page()
[docs] def prev_page(self): """ Brings the previous page of items from host :return: """ self.page_offset -= 1 self.get_page()
[docs] def go_to_page(self, page=0): """ Brings specified page of items from host :param page: page number :return: """ self.page_offset = page self.get_page()
def all(self): page_offset = 0 page_size = 100 pbar = tqdm.tqdm(total=self.items_count, disable=self._client_api.verbose.disable_progress_bar, file=sys.stdout, desc='Iterate Entity') total_pages = math.ceil(self.items_count / page_size) jobs = list() pool = self._client_api.thread_pools('item.page') while True: time.sleep(0.01) # to flush the results if page_offset <= total_pages: jobs.append(pool.submit(self.return_page, **{'page_offset': page_offset, 'page_size': page_size})) page_offset += 1 for i_job, job in enumerate(jobs): if job.done(): for item in job.result(): pbar.update() yield item jobs.remove(job) if len(jobs) == 0: pbar.close() break ######## # misc # ######## def print(self, columns=None): self.items.print(columns=columns) def to_df(self, columns=None): return self.items.to_df(columns=columns)