Source code for dtlpy.repositories.codebases

import hashlib
import logging
import os
import io
import random

from .. import entities, PlatformException, exceptions, repositories, miscellaneous, services

# Module-level logger, registered under the fixed name 'dtlpy' (not this module's own name).
logger = logging.getLogger(name='dtlpy')


class Codebases:
    """
    Codebase repository.

    Item codebases are stored as zip items under ``/codebases/<name>/<version>.zip``
    in the project's "Binaries" dataset. Git codebases are cloned / pulled locally.
    """

    def __init__(self,
                 client_api: services.ApiClient,
                 project: entities.Project = None,
                 dataset: entities.Dataset = None,
                 project_id: str = None):
        """
        :param client_api: API client used by this repository
        :param project: optional - project entity the codebases belong to
        :param dataset: optional - dataset entity that holds the codebase items
        :param project_id: optional - used to fetch the project when neither project nor dataset is given
        :raises PlatformException: 400 when project, dataset and project_id are all None
        """
        self._client_api = client_api
        if project is None and dataset is None:
            if project_id is None:
                raise PlatformException('400', 'at least one must be not None: dataset, project or project_id')
            project = repositories.Projects(client_api=client_api).get(project_id=project_id)
        self._project = project
        self._dataset = dataset
        # resolved lazily from the dataset on first access
        self._items_repository = None
        self.git_utils = miscellaneous.GitUtils()

    @property
    def items_repository(self) -> repositories.Items:
        """Items repository of the codebases dataset (resolved on first access)."""
        if self._items_repository is None:
            self._items_repository = self.dataset.items
        assert isinstance(self._items_repository, repositories.Items)
        return self._items_repository

    @property
    def project(self) -> entities.Project:
        """Project entity; taken from the dataset when it was not given explicitly."""
        if self._project is None:
            self._project = self.dataset.project
        assert isinstance(self._project, entities.Project)
        return self._project

    @property
    def dataset(self) -> entities.Dataset:
        """
        Dataset that stores the codebase items.

        When no dataset was given, the project's "Binaries" dataset is used; if it does
        not exist it is created and marked with ``metadata['system']['scope'] = 'system'``.
        """
        if self._dataset is None:
            # get dataset from project
            try:
                self._dataset = self.project.datasets.get(dataset_name='Binaries')
            except exceptions.NotFound:
                self._dataset = None
            if self._dataset is None:
                logger.debug(
                    'Dataset for codebases was not found. Creating... dataset name: "Binaries". project_id={}'.format(
                        self.project.id))
                self._dataset = self.project.datasets.create(dataset_name='Binaries')
                # add system to metadata
                if 'metadata' not in self._dataset.to_json():
                    self._dataset.metadata = dict()
                if 'system' not in self._dataset.metadata:
                    self._dataset.metadata['system'] = dict()
                self._dataset.metadata['system']['scope'] = 'system'
                self.project.datasets.update(dataset=self._dataset, system_metadata=True)
        assert isinstance(self._dataset, entities.Dataset)
        return self._dataset

    @dataset.setter
    def dataset(self, dataset: entities.Dataset):
        if not isinstance(dataset, entities.Dataset):
            raise ValueError('Must input a valid Dataset entity')
        self._dataset = dataset

    @staticmethod
    def __file_hash(filepath):
        """
        Return the md5 hex digest of a file, read in 4096-byte chunks.

        :param filepath: path of the file to hash
        :return: md5 hex digest string
        """
        m = hashlib.md5()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                m.update(chunk)
        return m.hexdigest()

    def list_versions(self, codebase_name):
        """
        List all codebase versions

        :param codebase_name: code base name
        :return: list of versions
        """
        filters = entities.Filters()
        filters.add(field='filename', values='/codebases/{}/*'.format(codebase_name))
        versions = self.items_repository.list(filters=filters)
        return versions

    def list(self) -> entities.PagedEntities:
        """
        List all code bases

        :return: Paged entity
        """
        filters = entities.Filters()
        filters.add(field='filename', values='/codebases/*')
        filters.add(field='type', values='dir')
        codebases = self.items_repository.list(filters=filters)
        return codebases

    def get(self, codebase_name=None, codebase_id=None, version=None):
        """
        Get a Codebase object

        :param codebase_name: optional - search by name
        :param codebase_id: optional - search by id
        :param version: codebase version. default is latest. options: "all", "latest" or ver number - "10"
        :return: Codebase object (a list of Codebase objects when version is "all")
        :raises PlatformException: 400 when neither name nor id is given, 404 when nothing matches
        """
        if codebase_id is not None:
            matched_version = self.items_repository.get(item_id=codebase_id)
            # verify input codebase name is same as the given id
            if codebase_name is not None and matched_version.name != codebase_name:
                logger.warning(
                    "Mismatch found in codebases.get: codebase_name is different then codebase.name: "
                    "{!r} != {!r}".format(codebase_name, matched_version.name))
            return entities.ItemCodebase(item_id=matched_version.id, item=matched_version)

        if codebase_name is None:
            raise PlatformException(error='400',
                                    message='Either "codebase_name" or "codebase_id" is needed')
        if version is None:
            version = 'latest'

        if version not in ['all', 'latest']:
            # explicit version number - fetch the exact zip item
            try:
                matched_version = self.items_repository.get(
                    filepath='/codebases/{}/{}.zip'.format(codebase_name, version))
            except Exception:
                raise PlatformException(error='404',
                                        message='No matching version was found. version: {}'.format(version))
            return entities.ItemCodebase(item_id=matched_version.id, item=matched_version)

        # get all or latest
        versions_pages = self.list_versions(codebase_name=codebase_name)
        if versions_pages.items_count == 0:
            raise PlatformException(error='404',
                                    message='No codebase was found. name: {}'.format(codebase_name))

        if version == 'all':
            return [entities.ItemCodebase(item_id=mv.id, item=mv) for mv in versions_pages.all()]

        # version == 'latest': pick the item with the highest numeric file name
        max_ver = -1
        matched_version = None
        for page in versions_pages:
            for ver in page:
                if ver.type == 'dir':
                    continue
                # extract version from filepath
                ver_int = int(os.path.splitext(ver.name)[0])
                if ver_int > max_ver:
                    max_ver = ver_int
                    matched_version = ver
        if matched_version is None:
            raise PlatformException(error='404',
                                    message='No codebase was found. name: {}'.format(codebase_name))
        return entities.ItemCodebase(item_id=matched_version.id, item=matched_version)

    @staticmethod
    def get_current_version(all_versions_pages, zip_md):
        """
        Find the next version number and an existing version with the same md5, if any.

        Iteration stops at the first version whose ``metadata['system']['md5']`` equals
        ``zip_md``; in that case the returned number only reflects the versions seen so far.

        :param all_versions_pages: iterable of codebase versions (each exposing ``.item``)
        :param zip_md: md5 hex digest of the packed zip
        :return: tuple (next version number, matching version or None)
        """
        latest_version = 0
        same_version_found = None
        # go over all existing versions
        for v_item in all_versions_pages:
            # get latest version
            version_number = int(os.path.splitext(v_item.item.name)[0])
            latest_version = max(latest_version, version_number)
            # check md5 to find same codebase; items without 'system' metadata cannot match
            metadata = v_item.item.metadata
            system_metadata = metadata['system'] if 'system' in metadata else dict()
            if 'md5' in system_metadata and system_metadata['md5'] == zip_md:
                same_version_found = v_item
                break
        return latest_version + 1, same_version_found

    def pack(self, directory, name=None, description=''):
        """
        Zip a local code directory and post to codebases

        If an existing version has the same md5 as the new zip, that version is returned
        and nothing is uploaded.

        :param directory: local directory to pack
        :param name: codebase name. default - the directory's base name
        :param description: codebase description
        :return: Codebase object
        :raises PlatformException: 400 when directory is not a directory or the upload failed
        """
        # create/get .dataloop dir
        dl_dir = os.path.join(os.getcwd(), '.dataloop')
        os.makedirs(dl_dir, exist_ok=True)

        # get codebase name
        if name is None:
            # use the absolute path so a trailing separator or '.' still yields a real name
            name = os.path.basename(os.path.abspath(directory))

        # temporary zip file inside the .dataloop dir (removed in the finally clause)
        zip_filename = os.path.join(dl_dir, '{}_{}.zip'.format(name, str(random.randrange(0, 1000))))

        try:
            if not os.path.isdir(directory):
                raise PlatformException(error='400', message='Not a directory: {}'.format(directory))
            directory = os.path.abspath(directory)

            # create zipfile
            miscellaneous.Zipping.zip_directory(zip_filename=zip_filename, directory=directory)
            zip_md = self.__file_hash(zip_filename)

            # get latest version
            same_version_found = None
            try:
                all_versions_pages = self.get(codebase_name=name, version='all')
            except exceptions.NotFound:
                all_versions_pages = None
            if all_versions_pages is None:
                # no codebase with that name - create new version
                current_version = 0
            else:
                current_version, same_version_found = self.get_current_version(
                    all_versions_pages=all_versions_pages,
                    zip_md=zip_md)

            if same_version_found is not None:
                # same md5 hash file found in version - return the matched version
                codebase = same_version_found
            else:
                # no matched version was found - create a new version
                # read from zipped file
                with open(zip_filename, 'rb') as f:
                    buffer = io.BytesIO(f.read())
                # the buffer name is set to "<version>.zip" before upload
                buffer.name = str(current_version) + '.zip'

                # upload item
                item = self.items_repository.upload(local_path=buffer,
                                                    remote_path='/codebases/{}'.format(name))
                if isinstance(item, list) and len(item) == 0:
                    raise PlatformException(error='400',
                                            message='Failed upload codebase, check log file for details')

                # add source code to metadata
                if 'system' not in item.metadata:
                    item.metadata['system'] = dict()
                item.metadata['system']['description'] = description
                item.metadata['system']['md5'] = zip_md

                # add git info to metadata
                if miscellaneous.GitUtils.is_git_repo(path=directory):
                    # create 'git' field in metadata
                    if 'git' not in item.metadata:
                        item.metadata['git'] = dict()
                    # add to metadata
                    item.metadata['git']['status'] = miscellaneous.GitUtils.git_status(path=directory)
                    item.metadata['git']['log'] = miscellaneous.GitUtils.git_log(path=directory)
                    item.metadata['git']['url'] = miscellaneous.GitUtils.git_url(path=directory)

                # update item
                item = self.items_repository.update(item=item, system_metadata=True)
                codebase = entities.ItemCodebase(item_id=item.id, client_api=self._client_api)
        except Exception:
            # log with traceback and let the caller handle the error
            logger.exception('Error when packing:')
            raise
        finally:
            # cleanup
            if os.path.isfile(zip_filename):
                os.remove(zip_filename)
        return codebase

    @staticmethod
    def _extract_artifact(artifact_filepath, local_path):
        """Unzip a downloaded codebase zip into local_path and delete the zip file."""
        if not os.path.isfile(artifact_filepath):
            raise PlatformException(error='404',
                                    message='error downloading codebase. see above for more information')
        miscellaneous.Zipping.unzip_directory(zip_filename=artifact_filepath, to_directory=local_path)
        os.remove(artifact_filepath)
        # report the unpack directory - the zip itself no longer exists at this point
        logger.info('Source code was unpacked to: {}'.format(local_path))

    def _unpack_single(self, codebase, download_path, local_path):
        """
        Download and unpack (or clone / pull) a single codebase.

        :param codebase: ItemCodebase, Item or GitCodebase
        :param download_path: directory the zip is downloaded to
        :param local_path: directory the source code is unpacked / cloned to
        :return: path of the downloaded zip (item codebases) or of the git repo (git codebases)
        :raises ValueError: for an unsupported codebase type
        """
        # downloading with specific filename
        if isinstance(codebase, entities.ItemCodebase):
            artifact_filepath = self.items_repository.download(
                items=codebase.item_id,
                save_locally=True,
                local_path=os.path.join(download_path, codebase.item.name),
                to_items_folder=False)
            self._extract_artifact(artifact_filepath=artifact_filepath, local_path=local_path)
        elif isinstance(codebase, entities.Item):
            artifact_filepath = codebase.download(
                save_locally=True,
                local_path=os.path.join(download_path, codebase.name),
                to_items_folder=False)
            self._extract_artifact(artifact_filepath=artifact_filepath, local_path=local_path)
        elif isinstance(codebase, entities.GitCodebase):
            if codebase.is_git_repo(local_path) or \
                    codebase.is_git_repo(os.path.join(local_path, codebase.git_repo_name)):
                artifact_filepath = self.pull_git(codebase=codebase, local_path=local_path)
            else:
                # Clone the repo if not exist
                artifact_filepath = self.clone_git(codebase=codebase, local_path=local_path)
        else:
            # fall back to the class name so objects without a "type" attribute still get a ValueError
            codebase_type = getattr(codebase, 'type', type(codebase).__name__)
            raise ValueError('Not implemented: "_unpack_single" for codebase type: {!r}'.format(codebase_type))
        return artifact_filepath

    def clone_git(self, codebase, local_path):
        """
        Clone a git codebase into ``<local_path>/<git_repo_name>``.

        :param codebase: GitCodebase to clone
        :param local_path: parent directory of the cloned repo
        :return: path of the cloned repo
        :raises RuntimeError: when codebase is not a GitCodebase or the clone failed
        """
        if not isinstance(codebase, entities.GitCodebase):
            raise RuntimeError('only support Git Codebase')
        repo_path = os.path.join(local_path, codebase.git_repo_name)
        response = self.git_utils.git_clone(path=repo_path,
                                            git_url=codebase.git_url,
                                            tag=codebase.git_tag)
        if not response:
            raise RuntimeError('Failed cloning. See above for full log. codebase: {}'.format(codebase))
        logger.info('Source code was cloned from {}(Git) to: {}'.format(codebase.git_url, local_path))
        return repo_path

    def pull_git(self, codebase, local_path):
        """
        Pull an existing local git codebase and check out its tag.

        :param codebase: git codebase (``is_git_repo``, ``git_repo_name``, ``git_url`` and ``git_tag`` are used)
        :param local_path: the repo directory, or the directory that contains it
        :return: path of the local repo
        """
        pull_cmd = 'git pull'
        if not codebase.is_git_repo(local_path):
            local_path = os.path.join(local_path, codebase.git_repo_name)
        response = self.git_utils.git_command(path=local_path, cmd=pull_cmd)
        if response:
            logger.info('pull successful {}(Git) to: {}'.format(codebase.git_url, os.path.dirname(local_path)))
        else:
            logger.critical("Could not pull")
        # we can test if this is not the same repo if needed...
        # FIXME need to change the order - checkout new branch and pull
        checkout_ok = self.git_utils.git_command(path=local_path,
                                                 cmd='git checkout {}'.format(codebase.git_tag))
        # NOTE(review): assumes a falsy result means failure, as for the pull above - confirm
        if not checkout_ok:
            logger.warning('Could not checkout: {!r}'.format(codebase.git_tag))
        return local_path

    def unpack(self,
               codebase: entities.Codebase = None,
               codebase_name=None,
               codebase_id=None,
               local_path=None,
               version=None):
        """
        Unpack codebase locally. Download source code and unzip

        :param codebase: `dl.Codebase` object
        :param codebase_name: search by name
        :param codebase_id: search by id
        :param local_path: local path to save codebase. default - current working directory
        :param version: codebase version to unpack. default - latest
        :return: String (dirpath)
        :raises PlatformException: 404 when the codebase is of an unsupported type
        """
        # get the codebase / multiple codebase
        if codebase is None:
            codebase = self.get(codebase_name=codebase_name,
                                codebase_id=codebase_id,
                                version=version)
        elif codebase_name is not None or codebase_id is not None:
            logger.warning("Using given codebase. Does not preforming search with name {!r} / id {!r}".
                           format(codebase_name, codebase_id))

        if local_path is None:
            # joining paths with None raises TypeError - fall back to the working directory
            local_path = os.getcwd()
        download_path = local_path

        if isinstance(codebase, entities.PagedEntities):
            # every version is unpacked into its own "v.<version>" sub directory
            dir_path = download_path
            for page in codebase:
                for item in page:
                    version_dir = os.path.join(download_path, 'v.' + item.name.split('.')[0])
                    self._unpack_single(codebase=item,
                                        download_path=download_path,
                                        local_path=version_dir)
                    dir_path = os.path.dirname(version_dir)
            return dir_path
        elif isinstance(codebase, list):
            dir_path = download_path
            for item in codebase:
                version_dir = os.path.join(download_path, 'v.' + item.item.name.split('.')[0])
                self._unpack_single(codebase=item,
                                    download_path=download_path,
                                    local_path=version_dir)
                dir_path = os.path.dirname(version_dir)
            return dir_path
        elif isinstance(codebase, (entities.Codebase, entities.Item)):
            artifact_filepath = self._unpack_single(codebase=codebase,
                                                    download_path=download_path,
                                                    local_path=local_path)
            if isinstance(codebase, (entities.ItemCodebase, entities.Item)):
                dir_path = os.path.dirname(artifact_filepath)  # use the directory of the artifact
            else:
                dir_path = artifact_filepath
            logger.info('Source code was unpacked to: {}'.format(dir_path))
        else:
            raise PlatformException(
                error='404',
                message='Codebase was not found! name:{name}, id:{id}'.format(name=codebase_name,
                                                                              id=codebase_id))
        return dir_path