Source code for dtlpy.repositories.drivers

import logging
import re

from .. import entities, miscellaneous, exceptions, _api_reference
from ..services.api_client import ApiClient

# Module logger, obtained under the fixed name "dtlpy" (not __name__).
logger = logging.getLogger(name="dtlpy")


class Drivers:
    """
    Drivers Repository

    The Drivers class allows users to manage drivers that are used to connect with external storage.
    Read more about external storage in our `documentation <https://dataloop.ai/docs/overview-1>`_
    and `developers' docs <https://developers.dataloop.ai/tutorials/data_management/>`_.
    """

    # Accepted endpoint shape: http(s)://<hostname>:<port>. Compiled once and used with
    # ``fullmatch`` because ``$`` in ``re.match`` also matches just before a trailing
    # newline, which would let 'http://host:9000\n' through.
    _ENDPOINT_PATTERN = re.compile(r"https?://[A-Za-z0-9.-]+:\d+")

    def __init__(self, client_api: ApiClient, project: entities.Project = None):
        """
        Initialize the Drivers repository.

        :param ApiClient client_api: The API client instance
        :param Project project: Optional project entity. If not provided, will try to get from checkout
        """
        self._client_api = client_api
        self._project = project

    ############
    # entities #
    ############
    @property
    def project(self) -> entities.Project:
        """
        The project this repository works on.

        When no project was set, the checked-out project is read from the client's
        ``state_io`` under the ``"project"`` key and cached on the repository.

        :return: The project entity
        :rtype: dtlpy.entities.project.Project
        :raises PlatformException: If no project was set and none is checked out
        """
        if self._project is None:
            # Fall back to the checked-out project, if any.
            checked_out = self._client_api.state_io.get("project")
            if checked_out is not None:
                self._project = entities.Project.from_json(_json=checked_out, client_api=self._client_api)
        if self._project is None:
            raise exceptions.PlatformException(
                error="2001",
                message="Cannot perform action WITHOUT Project entity in Drivers repository."
                " Please checkout or set a project",
            )
        assert isinstance(self._project, entities.Project)
        return self._project

    @project.setter
    def project(self, project: entities.Project):
        """
        Set the project for this Drivers repository.

        :param Project project: The project entity to set
        :raises ValueError: If the provided project is not a valid Project entity
        """
        if not isinstance(project, entities.Project):
            raise ValueError("Must input a valid Project entity")
        self._project = project

    ###########
    # helpers #
    ###########
    @classmethod
    def _validate_endpoint(cls, endpoint: str) -> None:
        """
        Check that an endpoint looks like ``http(s)://<hostname>:<port>``.

        The whole string must match, so trailing whitespace or newlines are rejected.

        :param str endpoint: The endpoint URL to check
        :raises ValueError: If the endpoint is not a string or does not match the expected format
        """
        if not isinstance(endpoint, str) or cls._ENDPOINT_PATTERN.fullmatch(endpoint) is None:
            raise ValueError(
                f"Invalid endpoint URL '{endpoint}'. Must be 'http://<hostname>:<port>' or 'https://<hostname>:<port>'."
            )

    def _build_payload(
        self,
        name: str,
        driver_type,
        integration_id,
        integration_type,
        storage_payload: dict,
        project_id: str = None,
        allow_external_delete: bool = False,
    ) -> dict:
        """
        Assemble the request body shared by all driver-creation methods.

        :param str name: The driver name
        :param driver_type: Value sent as the driver ``type``
        :param integration_id: Value sent as ``integrationId``
        :param integration_type: Value sent as ``integrationType``
        :param dict storage_payload: Storage-specific settings, sent under ``payload``
        :param str project_id: Optional project ID. If not provided, uses the current project
        :param bool allow_external_delete: Value sent as ``allowExternalDelete``
        :return: The request body for ``POST /drivers``
        :rtype: dict
        """
        if project_id is None:
            project_id = self.project.id
        return {
            "integrationId": integration_id,
            "integrationType": integration_type,
            "name": name,
            "metadata": {"system": {"projectId": project_id}},
            "type": driver_type,
            "payload": storage_payload,
            "allowExternalDelete": allow_external_delete,
            "creator": self._client_api.info().get("user_email"),
        }

    ###########
    # methods #
    ###########
    def __get_by_id(self, driver_id) -> entities.Driver:
        """
        Get a driver by its ID from the platform.

        :param str driver_id: The driver ID
        :return: Driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises PlatformException: If the driver is not found or there's an API error
        """
        success, response = self._client_api.gen_request(req_type="get", path="/drivers/{}".format(driver_id))
        if not success:
            raise exceptions.PlatformException(response)
        _json = response.json()
        return self._getDriverClass(_json).from_json(client_api=self._client_api, _json=_json)

    @_api_reference.add(path="/drivers", method="get")
    def list(self) -> miscellaneous.List[entities.Driver]:
        """
        Get the project's drivers list.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        :return: List of Drivers objects
        :rtype: list
        :raises PlatformException: If the request fails

        **Example**:

        .. code-block:: python

            project.drivers.list()
        """
        success, response = self._client_api.gen_request(
            req_type="get", path="/drivers?projectId={}".format(self.project.id)
        )
        if not success:
            raise exceptions.PlatformException(response)
        return miscellaneous.List(
            [
                self._getDriverClass(_json).from_json(_json=_json, client_api=self._client_api)
                for _json in response.json()
            ]
        )

    def _getDriverClass(self, _json):
        """
        Determine the appropriate driver class based on the driver type in the JSON response.

        :param dict _json: The JSON response containing driver information
        :return: The appropriate driver class (S3Driver, GcsDriver, AzureBlobDriver, or Driver)
        :rtype: type
        """
        driver_type = _json.get("type", None)
        if driver_type == entities.ExternalStorage.S3:
            return entities.S3Driver
        if driver_type == entities.ExternalStorage.GCS:
            return entities.GcsDriver
        if driver_type in [entities.ExternalStorage.AZUREBLOB, entities.ExternalStorage.AZURE_DATALAKE_GEN2]:
            return entities.AzureBlobDriver
        # Any other (or missing) type falls back to the generic driver entity.
        return entities.Driver

    @_api_reference.add(path="/drivers/{id}", method="get")
    def get(self, driver_name: str = None, driver_id: str = None) -> entities.Driver:
        """
        Get a Driver object to use in your code.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        You must provide at least ONE of the following params: driver_name, driver_id.
        When both are given, ``driver_id`` takes precedence.

        :param str driver_name: optional - search by name
        :param str driver_id: optional - search by id
        :return: Driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises PlatformException: If no identifier is given, no driver matches the name,
            or more than one driver has the given name

        **Example**:

        .. code-block:: python

            project.drivers.get(driver_id='driver_id')
        """
        if driver_id is not None:
            return self.__get_by_id(driver_id)

        if driver_name is None:
            raise exceptions.PlatformException(error="400", message="Must provide an identifier (name or id) in inputs")

        # Lookup by name: filter the project's drivers list on the client side.
        matches = [candidate for candidate in self.list() if candidate.name == driver_name]
        if not matches:
            raise exceptions.PlatformException(
                error="404", message="Driver not found. Name: {}".format(driver_name)
            )
        if len(matches) > 1:
            raise exceptions.PlatformException(
                error="404", message='More than one driver with same name. Please "get" by id'
            )
        return matches[0]

    @_api_reference.add(path="/drivers", method="post")
    def _create_driver(self, payload: dict):
        """
        Create a driver on the platform using the provided payload.

        :param dict payload: The driver creation payload
        :return: The created driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises PlatformException: If the driver creation fails
        """
        success, response = self._client_api.gen_request(req_type="post", path="/drivers", json_req=payload)
        if not success:
            raise exceptions.PlatformException(response)
        _json = response.json()
        return self._getDriverClass(_json).from_json(_json=_json, client_api=self._client_api)

    def create_powerscale_s3(
        self,
        name: str,
        integration: entities.Integration,
        elastic_index: str,
        endpoint: str,
        bucket_name: str,
        region: str,
        elastic_index_path: str,
        path: str = None,
        project_id: str = None,
        allow_external_delete: bool = False
    ):
        """
        Create a PowerScale S3 driver.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        :param str name: The driver name
        :param Integration integration: The S3 integration to use
        :param str elastic_index: The elastic index for PowerScale S3 driver
        :param str endpoint: The endpoint URL. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'
        :param str bucket_name: The external bucket name
        :param str region: The bucket region (relevant for S3)
        :param str elastic_index_path: The elastic index path for PowerScale S3 driver
        :param str path: Optional path value sent in the driver payload
        :param str project_id: Optional project ID. If not provided, uses the current project
        :param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
        :return: The created PowerScale S3 driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises ValueError: If the integration type is not S3
        :raises PlatformException: If the driver creation fails

        **Example**:

        .. code-block:: python

            project.drivers.create_powerscale_s3(
                name='powerscale_driver',
                integration=integration,
                elastic_index='my_index',
                elastic_index_path='my_index_path',
                endpoint='http://location:8765',
                bucket_name='my_bucket',
                region='us-west-1',
                path='my_path'
            )
        """
        if integration.type != entities.IntegrationType.S3:
            raise ValueError("Integration type must be S3 for PowerScale S3 driver")

        storage_payload = {
            "elasticIndex": elastic_index,
            "endpoint": endpoint,
            "path": path,
            "elasticIndexPath": elastic_index_path,
            "bucketName": bucket_name,
            "region": region,
        }
        payload = self._build_payload(
            name=name,
            driver_type=entities.ExternalStorage.POWERSCALE_S3,
            integration_id=integration.id,
            integration_type=integration.type,
            storage_payload=storage_payload,
            project_id=project_id,
            allow_external_delete=allow_external_delete,
        )
        return self._create_driver(payload)

    def create_powerscale_nfs(
        self,
        name: str,
        elastic_index: str,
        elastic_index_path: str,
        integration: entities.Integration = None,
        project_id: str = None,
        allow_external_delete: bool = False
    ):
        """
        Create a PowerScale NFS driver.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        :param str name: The driver name
        :param str elastic_index: The elastic index for PowerScale NFS driver
        :param str elastic_index_path: The elastic index path for PowerScale NFS driver
        :param Integration integration: Optional S3 integration to use. If not provided, integration id will be 'system' and the integration type is sent as None
        :param str project_id: Optional project ID. If not provided, uses the current project
        :param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
        :return: The created PowerScale NFS driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises ValueError: If an integration is given and its type is not S3
        :raises PlatformException: If the driver creation fails

        **Example**:

        .. code-block:: python

            project.drivers.create_powerscale_nfs(
                name='powerscale_driver',
                elastic_index='my_index',
                elastic_index_path='my_index_path',
            )
        """
        if integration is None:
            integration_id, integration_type = 'system', None
        else:
            if integration.type != entities.IntegrationType.S3:
                raise ValueError("Integration type must be S3 for PowerScale NFS driver")
            integration_id, integration_type = integration.id, integration.type

        payload = self._build_payload(
            name=name,
            driver_type=entities.ExternalStorage.POWERSCALE_NFS,
            integration_id=integration_id,
            integration_type=integration_type,
            storage_payload={"elasticIndex": elastic_index, "elasticIndexPath": elastic_index_path},
            project_id=project_id,
            allow_external_delete=allow_external_delete,
        )
        return self._create_driver(payload)

    def create_min_io(
        self,
        name: str,
        integration: entities.Integration,
        bucket_name: str,
        endpoint: str,
        project_id: str = None,
        allow_external_delete: bool = True,
        region: str = None,
        storage_class: str = "",
        path: str = "",
    ):
        """
        Create a MinIO driver.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        :param str name: The driver name
        :param Integration integration: The S3 integration to use
        :param str bucket_name: The external bucket name
        :param str endpoint: The MinIO endpoint URL. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'
        :param str project_id: Optional project ID. If not provided, uses the current project
        :param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
        :param str region: The bucket region (relevant for S3)
        :param str storage_class: The storage class (relevant for S3)
        :param str path: Optional path. By default path is the root folder. Path is case sensitive
        :return: The created MinIO driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises ValueError: If the integration type is not S3 or if the endpoint URL format is invalid
        :raises PlatformException: If the driver creation fails

        **Example**:

        .. code-block:: python

            project.drivers.create_min_io(
                name='minio_driver',
                integration=integration,
                bucket_name='my_bucket',
                endpoint='http://localhost:9000'
            )
        """
        if integration.type != entities.IntegrationType.S3:
            raise ValueError("Integration type must be S3 for Minio driver")
        self._validate_endpoint(endpoint)

        storage_payload = {
            "bucketName": bucket_name,
            "storageClass": storage_class,
            "region": region,
            "path": path,
            "endpoint": endpoint,
        }
        payload = self._build_payload(
            name=name,
            driver_type=entities.ExternalStorage.MIN_IO,
            integration_id=integration.id,
            integration_type=integration.type,
            storage_payload=storage_payload,
            project_id=project_id,
            allow_external_delete=allow_external_delete,
        )
        return self._create_driver(payload)

    def create(
        self,
        name: str,
        driver_type: entities.ExternalStorage,
        integration_id: str,
        bucket_name: str,
        integration_type: entities.IntegrationType,
        project_id: str = None,
        allow_external_delete: bool = True,
        region: str = None,
        storage_class: str = "",
        path: str = "",
        endpoint: str = None
    ):
        """
        Create a storage driver.

        **Prerequisites**: You must be in the role of an *owner* or *developer*.

        :param str name: the driver name
        :param ExternalStorage driver_type: dl.ExternalStorage (Enum). For all options run: list(dl.ExternalStorage)
        :param str integration_id: the integration id
        :param str bucket_name: the external bucket name
        :param IntegrationType integration_type: dl.IntegrationType (Enum). For all options run: list(dl.IntegrationType). If None, the driver type is used
        :param str project_id: project id. If not provided, uses the current project
        :param bool allow_external_delete: true to allow deleting files from external storage when files are deleted in your Dataloop storage
        :param str region: relevant only for s3 - the bucket region
        :param str storage_class: relevant only for s3
        :param str path: Optional. By default path is the root folder. Path is case sensitive
        :param str endpoint: Optional. Custom endpoint for minio storage. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'. Only validated and sent when the driver type is S3
        :return: driver object
        :rtype: dtlpy.entities.driver.Driver
        :raises ValueError: If the driver type is S3 and the given endpoint URL format is invalid
        :raises PlatformException: If the driver creation fails

        **Example**:

        .. code-block:: python

            project.drivers.create(name='driver_name',
                                   driver_type=dl.ExternalStorage.S3,
                                   integration_id='integration_id',
                                   bucket_name='bucket_name',
                                   project_id='project_id',
                                   region='eu-west-1')
        """
        if integration_type is None:
            integration_type = driver_type

        # The key carrying the bucket/container name depends on the storage type.
        is_s3 = driver_type == entities.ExternalStorage.S3
        if is_s3:
            bucket_key = "bucketName"
            if endpoint:
                self._validate_endpoint(endpoint)
        elif driver_type == entities.ExternalStorage.GCS:
            bucket_key = "bucket"
        else:
            bucket_key = "containerName"

        storage_payload = {
            bucket_key: bucket_name,
            "storageClass": storage_class,
            "region": region,
            "path": path,
        }
        # A custom endpoint is only forwarded for S3 drivers; it is ignored otherwise.
        if is_s3 and endpoint:
            storage_payload["endpoint"] = endpoint

        payload = self._build_payload(
            name=name,
            driver_type=driver_type,
            integration_id=integration_id,
            integration_type=integration_type,
            storage_payload=storage_payload,
            project_id=project_id,
            allow_external_delete=allow_external_delete,
        )
        return self._create_driver(payload)

    @_api_reference.add(path="/drivers/{id}", method="delete")
    def delete(self, driver_name: str = None, driver_id: str = None, sure: bool = False, really: bool = False):
        """
        Delete a driver forever!

        **Prerequisites**: You must be an *owner* or *developer* to use this method.

        **Example**:

        .. code-block:: python

            project.drivers.delete(driver_id='driver_id', sure=True, really=True)

        :param str driver_name: optional - search by name
        :param str driver_id: optional - search by id
        :param bool sure: Are you sure you want to delete?
        :param bool really: Really really sure?
        :return: True if success
        :rtype: bool
        :raises PlatformException: If ``sure`` and ``really`` are not both True, if the driver
            cannot be resolved, or if the delete request fails
        """
        # Both confirmation flags are required before anything is looked up or deleted.
        if not (sure and really):
            raise exceptions.PlatformException(
                error="403", message="Cant delete driver from SDK. Please login to platform to delete"
            )

        driver = self.get(driver_name=driver_name, driver_id=driver_id)
        success, response = self._client_api.gen_request(req_type="delete", path="/drivers/{}".format(driver.id))
        if not success:
            raise exceptions.PlatformException(response)
        logger.info("Driver %r was deleted successfully", driver.name)
        return True