Source code for dtlpy.repositories.codebases

import hashlib
import logging
import os
import io
import random
from typing import List

from .. import entities, PlatformException, exceptions, repositories, miscellaneous
from ..services.api_client import ApiClient
from ..services.api_client import client as client_api

logger = logging.getLogger(name='dtlpy')


[docs]class Codebases: """ Codebase Repository The Codebases class allows the user to manage codebases and their properties. The codebase is the code the user uploads for the user's packages to run. Read more about `codebase <https://developers.dataloop.ai/tutorials/faas/introduction/chapter/>`_ in our FaaS (function as a service). """ def __init__(self, client_api: ApiClient, project: entities.Project = None, dataset: entities.Dataset = None, project_id: str = None): self._client_api = client_api self._project = project self._project_id = project_id self._dataset = dataset self._items_repository = None self.git_utils = miscellaneous.GitUtils() @property def items_repository(self) -> repositories.Items: if self._items_repository is None: if self._dataset is not None or self._project is not None: self._items_repository = self.dataset.items else: self._items_repository = repositories.Items(client_api=client_api) assert isinstance(self._items_repository, repositories.Items) return self._items_repository @property def project(self) -> entities.Project: if self._project is None: # project is None - try dataset if self._dataset is None: # dataset is None - try from project id if self._project_id is None: raise PlatformException(error='400', message='Cannot perform this action without a project') else: self._project = repositories.Projects(client_api=self._client_api).get(project_id=self._project_id) else: # dataset is not None - take project from there self._project = self.dataset.project assert isinstance(self._project, entities.Project) return self._project @property def dataset(self) -> entities.Dataset: if self._dataset is None: # get dataset from project try: self._dataset = self.project.datasets._get_binaries_dataset() except exceptions.NotFound: raise ValueError('Missing "Binaries" dataset in the project. Please contact support for help') assert isinstance(self._dataset, entities.Dataset) return self._dataset @dataset.setter def dataset(self, dataset: entities.Dataset): if not isinstance(dataset, entities.Dataset): raise ValueError('Must input a valid Dataset entity') self._dataset = dataset @staticmethod def __file_hash(filepath): m = hashlib.md5() with open(filepath, 'rb') as f: for chunk in iter(lambda: f.read(4096), b''): m.update(chunk) return m.hexdigest()
[docs] def list_versions(self, codebase_name: str) -> entities.PagedEntities: """ List all codebase versions. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. **Example**: .. code-block:: python package.codebases.list_versions(codebase_name='codebase_name') :param str codebase_name: code base name :return: list of versions :rtype: list """ filters = entities.Filters() filters.add(field='filename', values='/codebases/{}/*'.format(codebase_name)) versions = self.items_repository.list(filters=filters) return versions
[docs] def list(self) -> entities.PagedEntities: """ List all codebases. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. **Example**: .. code-block:: python package.codebases.list() :return: Paged entity :rtype: dtlpy.entities.paged_entities.PagedEntities """ filters = entities.Filters() filters.add(field='filename', values='/codebases/*') filters.add(field='type', values='dir') codebases = self.items_repository.list(filters=filters) return codebases
[docs] def get(self, codebase_name: str = None, codebase_id: str = None, version: str = None): """ Get a Codebase object to use in your code. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. **Example**: .. code-block:: python package.codebases.get(codebase_name='codebase_name') :param str codebase_name: optional - search by name :param str codebase_id: optional - search by id :param str version: codebase version. default is latest. options: "all", "latest" or ver number - "10" :return: Codebase object """ if codebase_id is not None: matched_version = self.items_repository.get(item_id=codebase_id) # verify input codebase name is same as the given id if codebase_name is not None and matched_version.name != codebase_name: logger.warning( "Mismatch found in codebases.get: codebase_name is different then codebase.name: " "{!r} != {!r}".format( codebase_name, matched_version.name)) codebase = entities.Codebase(type='item', client_api=self._client_api, itemId=matched_version.id, item=matched_version) return codebase if codebase_name is None: raise PlatformException(error='400', message='Either "codebase_name" or "codebase_id" is needed') if version is None: version = 'latest' if version not in ['all', 'latest']: try: matched_version = self.items_repository.get( filepath='/codebases/{}/{}.zip'.format(codebase_name, version)) except Exception: raise PlatformException(error='404', message='No matching version was found. version: {}'.format(version)) codebase = entities.Codebase(type='item', client_api=self._client_api, itemId=matched_version.id, item=matched_version) return codebase # get all or latest versions_pages = self.list_versions(codebase_name=codebase_name) if versions_pages.items_count == 0: raise PlatformException(error='404', message='No codebase was found. name: {}'.format(codebase_name)) else: if version == 'all': codebase = [entities.Codebase(type='item', client_api=self._client_api, itemId=mv.id, item=mv) for mv in versions_pages.all()] elif version == 'latest': max_ver = -1 matched_version = None for page in versions_pages: for ver in page: if ver.type == 'dir': continue # extract version from filepath ver_int = int(os.path.splitext(ver.name)[0]) if ver_int > max_ver: max_ver = ver_int matched_version = ver if matched_version is None: raise PlatformException(error='404', message='No codebase was found. name: {}'.format(codebase_name)) else: codebase = entities.Codebase(type='item', client_api=self._client_api, itemId=matched_version.id, item=matched_version) else: raise PlatformException(error='404', message='Unknown version string: {}'.format(version)) return codebase
[docs] @staticmethod def get_current_version(all_versions_pages, zip_md): """ This method returns the current version of the codebase and other versions found. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. :param codebase all_versions_pages: codebase object :param zip_md: zipped file of codebase :return: current version and all versions found of codebase :rtype: int, int **Example**: .. code-block:: python package.codebases.get_current_version(all_versions_pages='codebase_entity', zip_md='path') """ latest_version = 0 same_version_found = None # go over all existing versions for v_item in all_versions_pages: # get latest version if int(os.path.splitext(v_item.item.name)[0]) > latest_version: latest_version = int(os.path.splitext(v_item.item.name)[0]) # check md5 to find same codebase if 'md5' in v_item.item.metadata['system'] and v_item.item.metadata['system']['md5'] == zip_md: same_version_found = v_item break return latest_version + 1, same_version_found
[docs] def pack(self, directory: str, name: str = None, extension: str = 'zip', description: str = '', ignore_directories: List[str] = None, ignore_max_file_size: bool = False): """ Zip a local code directory and post to codebases. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. :param str directory: local directory to pack :param str name: codebase name :param str extension: the extension of the file :param str description: codebase description :param list[str] ignore_directories: directories to ignore. :return: Codebase object :rtype: dtlpy.entities.codebase.Codebase **Example**: .. code-block:: python package.codebases.pack(directory='path_dir', name='codebase_name') """ # create/get .dataloop dir cwd = os.getcwd() dl_dir = os.path.join(cwd, '.dataloop') if not os.path.isdir(dl_dir): os.mkdir(dl_dir) # get codebase name if name is None: name = os.path.basename(directory) # create/get dist folder zip_filename = os.path.join(dl_dir, '{}_{}.{}'.format(name, str(random.randrange(0, 1000)), extension)) try: if not os.path.isdir(directory): raise PlatformException(error='400', message='Not a directory: {}'.format(directory)) directory = os.path.abspath(directory) # create zipfile miscellaneous.Zipping.zip_directory(zip_filename=zip_filename, directory=directory, ignore_directories=ignore_directories, ignore_max_file_size=ignore_max_file_size) zip_md = self.__file_hash(zip_filename) # get latest version same_version_found = None try: all_versions_pages = self.get(codebase_name=name, version='all') except exceptions.NotFound: all_versions_pages = None if all_versions_pages is None: # no codebase with that name - create new version current_version = 0 else: current_version, same_version_found = self.get_current_version(all_versions_pages=all_versions_pages, zip_md=zip_md) if same_version_found is not None: # same md5 hash file found in version - return the matched version codebase = same_version_found else: # no matched version was found - create a new version # read from zipped file with open(zip_filename, 'rb') as f: buffer = io.BytesIO(f.read()) buffer.name = '{}.{}'.format(str(current_version), extension) # upload item item = self.items_repository.upload(local_path=buffer, remote_path='/codebases/{}'.format(name)) if isinstance(item, list) and len(item) == 0: raise PlatformException(error='400', message='Failed upload codebase, check log file for details') # add source code to metadata if 'system' not in item.metadata: item.metadata['system'] = dict() item.metadata['system']['description'] = description item.metadata['system']['md5'] = zip_md # add git info to metadata if miscellaneous.GitUtils.is_git_repo(path=directory): # create 'git' field in metadata if 'git' not in item.metadata: item.metadata['git'] = dict() # add to metadata item.metadata['git']['status'] = miscellaneous.GitUtils.git_status(path=directory) item.metadata['git']['log'] = miscellaneous.GitUtils.git_log(path=directory) item.metadata['git']['url'] = miscellaneous.GitUtils.git_url(path=directory) # update item item = self.items_repository.update(item=item, system_metadata=True) codebase = entities.Codebase(type='item', client_api=self._client_api, itemId=item.id, item=item) except Exception: logger.error('Error when packing:') raise finally: # cleanup if zip_filename is not None: if os.path.isfile(zip_filename): os.remove(zip_filename) return codebase
def _unpack_single(self, codebase, download_path: str, local_path: str): """ :param dtlpy.entities.codebase.Codebase codebase: codebase object :param str download_path: :param str local_path: """ # downloading with specific filename if isinstance(codebase, entities.ItemCodebase): artifact_filepath = self.items_repository.download(items=codebase.item_id, save_locally=True, local_path=os.path.join(download_path, codebase.item.name), to_items_folder=False) if not os.path.isfile(artifact_filepath): raise PlatformException(error='404', message='error downloading codebase. see above for more information') miscellaneous.Zipping.unzip_directory(zip_filename=artifact_filepath, to_directory=local_path) os.remove(artifact_filepath) logger.info('Source code was unpacked to: {}'.format(artifact_filepath)) elif isinstance(codebase, entities.Item): artifact_filepath = codebase.download(save_locally=True, local_path=os.path.join(download_path, codebase.name), to_items_folder=False) if not os.path.isfile(artifact_filepath): raise PlatformException(error='404', message='error downloading codebase. see above for more information') miscellaneous.Zipping.unzip_directory(zip_filename=artifact_filepath, to_directory=local_path) os.remove(artifact_filepath) logger.info('Source code was unpacked to: {}'.format(artifact_filepath)) elif isinstance(codebase, entities.GitCodebase): if codebase.is_git_repo(local_path) or \ codebase.is_git_repo(os.path.join(local_path, codebase.git_repo_name)): artifact_filepath = self.pull_git(codebase=codebase, local_path=local_path) else: # Clone the repo if not exist artifact_filepath = self.clone_git(codebase=codebase, local_path=local_path) else: raise ValueError('Not implemented: "_unpack_single" for codebase type: {!r}'.format(codebase.type)) return artifact_filepath def _get_single(self, codebase: entities.GitCodebase, name: str): value = None integration_id = codebase.credentials.get(name, {}).get('id', None) key = codebase.credentials.get(name, {}).get('key', None) if integration_id is not None: try: key = self.project.integrations.get(integrations_id=integration_id).name except Exception: pass value = os.environ.get(key, None) return value def _get_credential(self, codebase: entities.GitCodebase): username = password = None if codebase.credentials: try: username = self._get_single(codebase=codebase, name='username') password = self._get_single(codebase=codebase, name='password') except Exception: logger.exception('Failed to get credentials from codebase') return username, password
[docs] def clone_git(self, codebase: entities.Codebase, local_path: str): """ Clone code base **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. :param dtlpy.entities.codebase.Codebase codebase: codebase object :param str local_path: local path :return: path where the clone will be :rtype: str **Example**: .. code-block:: python package.codebases.clone_git(codebase='codebase_entity', local_path='local_path') """ if not isinstance(codebase, entities.GitCodebase): raise RuntimeError('only support Git Codebase') username, password = self._get_credential(codebase=codebase) response = self.git_utils.git_clone(path=local_path, git_url=codebase.git_url, tag=codebase.git_tag, username=username, password=password) if response: logger.info('Source code was cloned from {}(Git) to: {}'.format(codebase.git_url, local_path)) else: raise RuntimeError('Failed cloning. See above for full log. codebase: {}'.format(codebase)) return local_path
[docs] def pull_git(self, codebase, local_path): """ Pull (download) a codebase. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. :param dtlpy.entities.codebase.Codebase codebase: codebase object :param str local_path: local path :return: path where the Pull will be :rtype: str **Example**: .. code-block:: python package.codebases.pull_git(codebase='codebase_entity', local_path='local_path') """ pull_cmd = 'git pull' if not codebase.is_git_repo(local_path): local_path = os.path.join(local_path, codebase.git_repo_name) response = self.git_utils.git_command(path=local_path, cmd=pull_cmd) if response: logger.info('pull successful {}(Git) to: {}'.format(codebase.git_url, os.path.dirname(local_path))) else: logger.critical("Could not pull") # we can test if this is not the same repo if needed... # FIXME need to change the order - checkout new branch and pull response_2 = self.git_utils.git_command(path=local_path, cmd='git checkout {}'.format(codebase.git_tag)) return local_path
[docs] def unpack(self, codebase: entities.Codebase = None, codebase_name: str = None, codebase_id: str = None, local_path: str = None, version: str = None): """ Unpack codebase locally. Download source code and unzip. **Prerequisites**: You must be in the role of an *owner* or *developer*. You must have a package. :param dtlpy.entities.codebase.Codebase codebase: `dl.Codebase` object :param str codebase_name: search by name :param str codebase_id: search by id :param str local_path: local path to save codebase :param str version: codebase version to unpack. default - latest :return: String (dirpath) :rtype: str **Example**: .. code-block:: python package.codebases.unpack(codebase='codebase_entity', local_path='local_path') """ # get the codebase / multiple codebase if codebase is None: codebase = self.get(codebase_name=codebase_name, codebase_id=codebase_id, version=version) elif codebase_name is not None or codebase_id is not None: logger.warning("Using given codebase. Does not preforming search with name {!r} / id {!r}". format(codebase_name, codebase_id)) download_path = local_path if isinstance(codebase, entities.PagedEntities): for page in codebase: for item in page: local_path = os.path.join(download_path, 'v.' + item.name.split('.')[0]) self._unpack_single(codebase=item, download_path=download_path, local_path=local_path) return os.path.dirname(local_path) elif isinstance(codebase, list): for item in codebase: local_path = os.path.join(download_path, 'v.' + item.item.name.split('.')[0]) self._unpack_single(codebase=item, download_path=download_path, local_path=local_path) return os.path.dirname(local_path) elif isinstance(codebase, (entities.ItemCodebase, entities.Item, entities.GitCodebase)): artifact_filepath = self._unpack_single(codebase=codebase, download_path=download_path, local_path=local_path) if isinstance(codebase, (entities.ItemCodebase, entities.Item)): dir_path = os.path.dirname(artifact_filepath) # use the directory of the artifact else: dir_path = artifact_filepath logger.info('Source code was unpacked to: {}'.format(dir_path)) else: raise PlatformException( error='404', message='Codebase was not found! name:{name}, id:{id}'.format(name=codebase_name, id=codebase_id)) return dir_path