import logging
import re
from .. import entities, miscellaneous, exceptions, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name="dtlpy")
[docs]class Drivers:
"""
Drivers Repository
The Drivers class allows users to manage drivers that are used to connect with external storage.
Read more about external storage in our `documentation <https://dataloop.ai/docs/overview-1>`_ and `developers' docs <https://developers.dataloop.ai/tutorials/data_management/>`_.
"""
def __init__(self, client_api: ApiClient, project: entities.Project = None):
"""
Initialize the Drivers repository.
:param ApiClient client_api: The API client instance
:param Project project: Optional project entity. If not provided, will try to get from checkout
"""
self._client_api = client_api
self._project = project
############
# entities #
############
@property
def project(self) -> entities.Project:
if self._project is None:
# try get checkout
project = self._client_api.state_io.get("project")
if project is not None:
self._project = entities.Project.from_json(_json=project, client_api=self._client_api)
if self._project is None:
raise exceptions.PlatformException(
error="2001",
message="Cannot perform action WITHOUT Project entity in Drivers repository."
" Please checkout or set a project",
)
assert isinstance(self._project, entities.Project)
return self._project
@project.setter
def project(self, project: entities.Project):
"""
Set the project for this Drivers repository.
:param Project project: The project entity to set
:raises ValueError: If the provided project is not a valid Project entity
"""
if not isinstance(project, entities.Project):
raise ValueError("Must input a valid Project entity")
self._project = project
###########
# methods #
###########
def __get_by_id(self, driver_id) -> entities.Driver:
"""
Get a driver by its ID from the platform.
:param str driver_id: The driver ID
:return: Driver object
:rtype: dtlpy.entities.driver.Driver
:raises PlatformException: If the driver is not found or there's an API error
"""
success, response = self._client_api.gen_request(req_type="get", path="/drivers/{}".format(driver_id))
if success:
_json = response.json()
driver = self._getDriverClass(_json).from_json(client_api=self._client_api, _json=_json)
else:
raise exceptions.PlatformException(response)
return driver
[docs] @_api_reference.add(path="/drivers", method="get")
def list(self) -> miscellaneous.List[entities.Driver]:
"""
Get the project's drivers list.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:return: List of Drivers objects
:rtype: list
**Example**:
.. code-block:: python
project.drivers.list()
"""
success, response = self._client_api.gen_request(
req_type="get", path="/drivers?projectId={}".format(self.project.id)
)
if not success:
raise exceptions.PlatformException(response)
drivers = miscellaneous.List(
[
self._getDriverClass(_json).from_json(_json=_json, client_api=self._client_api)
for _json in response.json()
]
)
return drivers
def _getDriverClass(self, _json):
"""
Determine the appropriate driver class based on the driver type in the JSON response.
:param dict _json: The JSON response containing driver information
:return: The appropriate driver class (S3Driver, GcsDriver, AzureBlobDriver, or Driver)
:rtype: type
"""
driver_type = _json.get("type", None)
if driver_type == entities.ExternalStorage.S3:
driver_class = entities.S3Driver
elif driver_type == entities.ExternalStorage.GCS:
driver_class = entities.GcsDriver
elif driver_type in [entities.ExternalStorage.AZUREBLOB, entities.ExternalStorage.AZURE_DATALAKE_GEN2]:
driver_class = entities.AzureBlobDriver
else:
driver_class = entities.Driver
return driver_class
[docs] @_api_reference.add(path="/drivers/{id}", method="get")
def get(self, driver_name: str = None, driver_id: str = None) -> entities.Driver:
"""
Get a Driver object to use in your code.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
You must provide at least ONE of the following params: driver_name, driver_id.
:param str driver_name: optional - search by name
:param str driver_id: optional - search by id
:return: Driver object
:rtype: dtlpy.entities.driver.Driver
**Example**:
.. code-block:: python
project.drivers.get(driver_id='driver_id')
"""
if driver_id is not None:
driver = self.__get_by_id(driver_id)
elif driver_name is not None:
drivers = self.list()
driver = [driver for driver in drivers if driver.name == driver_name]
if not driver:
# list is empty
raise exceptions.PlatformException(
error="404", message="Driver not found. Name: {}".format(driver_name)
)
# driver = None
elif len(driver) > 1:
# more than one matching driver
raise exceptions.PlatformException(
error="404", message='More than one driver with same name. Please "get" by id'
)
else:
driver = driver[0]
else:
raise exceptions.PlatformException(error="400", message="Must provide an identifier (name or id) in inputs")
return driver
@_api_reference.add(path="/drivers", method="post")
def _create_driver(self, payload: dict):
"""
Create a driver on the platform using the provided payload.
:param dict payload: The driver creation payload
:return: The created driver object
:rtype: dtlpy.entities.driver.Driver
:raises PlatformException: If the driver creation fails
"""
success, response = self._client_api.gen_request(req_type="post", path="/drivers", json_req=payload)
if not success:
raise exceptions.PlatformException(response)
else:
_json = response.json()
return self._getDriverClass(_json).from_json(_json=_json, client_api=self._client_api)
[docs] def create_powerscale_s3(
self,
name: str,
integration: entities.Integration,
elastic_index: str,
endpoint: str,
bucket_name: str,
region: str,
elastic_index_path: str,
path: str = None,
project_id: str = None,
allow_external_delete: bool = False
):
"""
Create a PowerScale S3 driver.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str name: The driver name
:param Integration integration: The S3 integration to use
:param str elastic_index: The elastic index for PowerScale S3 driver
:param str project_id: Optional project ID. If not provided, uses the current project
:param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
:param str endpoint: The endpoint URL. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'
:param str bucket_name: The external bucket name
:param str region: The bucket region (relevant for S3)
:param str elastic_index_path: The elastic index path for PowerScale S3 driver
:param str path: The path to the NFS driver
:return: The created PowerScale S3 driver object
:rtype: dtlpy.entities.driver.Driver
:raises ValueError: If the integration type is not S3
:raises PlatformException: If the driver creation fails
**Example**:
.. code-block:: python
project.drivers.create_powerscale_s3(
name='powerscale_driver',
integration=integration,
elastic_index='my_index',
elastic_index_path='my_index_path',
endpoint='http://location:8765',
bucket_name='my_bucket',
region='us-west-1',
path='my_path'
)
"""
if not integration.type == entities.IntegrationType.S3:
raise ValueError("Integration type must be S3 for PowerScale S3 driver")
payload = {
"integrationId": integration.id,
"integrationType": integration.type,
"name": name,
"metadata": {"system": {"projectId": self.project.id if project_id is None else project_id}},
"type": entities.ExternalStorage.POWERSCALE_S3,
"payload": {
"elasticIndex": elastic_index,
"endpoint": endpoint,
"path": path,
"elasticIndexPath": elastic_index_path,
"bucketName": bucket_name,
"region": region
},
"allowExternalDelete": allow_external_delete,
"creator": self._client_api.info().get("user_email"),
}
return self._create_driver(payload)
[docs] def create_powerscale_nfs(
self,
name: str,
elastic_index: str,
elastic_index_path: str,
integration: entities.Integration = None,
project_id: str = None,
allow_external_delete: bool = False
):
"""
Create a PowerScale NFS driver.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str name: The driver name
:param str elastic_index: The elastic index for PowerScale NFS driver
:param Integration integration (optional): The S3 integration to use. If not provided, integration id will be 'system'
:param str project_id: Optional project ID. If not provided, uses the current project
:param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
:param str elastic_index_path: The elastic index path for PowerScale NFS driver
:return: The created PowerScale NFS driver object
:rtype: dtlpy.entities.driver.Driver
:raises PlatformException: If the driver creation fails
**Example**:
.. code-block:: python
project.drivers.create_powerscale_nfs(
name='powerscale_driver',
elastic_index='my_index',
elastic_index_path='my_index_path',
)
"""
integration_id = 'system'
integration_type = None
if integration is not None:
if not integration.type == entities.IntegrationType.S3:
raise ValueError("Integration type must be S3 for PowerScale NFS driver")
integration_id = integration.id
integration_type = integration.type
payload = {
"integrationId": integration_id,
"integrationType": integration_type,
"name": name,
"metadata": {"system": {"projectId": self.project.id if project_id is None else project_id}},
"type": entities.ExternalStorage.POWERSCALE_NFS,
"payload": {"elasticIndex": elastic_index, "elasticIndexPath": elastic_index_path},
"allowExternalDelete": allow_external_delete,
"creator": self._client_api.info().get("user_email"),
}
return self._create_driver(payload)
[docs] def create_min_io(
self,
name: str,
integration: entities.Integration,
bucket_name: str,
endpoint: str,
project_id: str = None,
allow_external_delete: bool = True,
region: str = None,
storage_class: str = "",
path: str = "",
):
"""
Create a MinIO driver.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str name: The driver name
:param Integration integration: The S3 integration to use
:param str bucket_name: The external bucket name
:param str endpoint: The MinIO endpoint URL. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'
:param str project_id: Optional project ID. If not provided, uses the current project
:param bool allow_external_delete: True to allow deleting files from external storage when files are deleted in your Dataloop storage
:param str region: The bucket region (relevant for S3)
:param str storage_class: The storage class (relevant for S3)
:param str path: Optional path. By default path is the root folder. Path is case sensitive
:return: The created MinIO driver object
:rtype: dtlpy.entities.driver.Driver
:raises ValueError: If the integration type is not S3 or if the endpoint URL format is invalid
:raises PlatformException: If the driver creation fails
**Example**:
.. code-block:: python
project.drivers.create_min_io(
name='minio_driver',
integration=integration,
bucket_name='my_bucket',
endpoint='http://localhost:9000'
)
"""
if not integration.type == entities.IntegrationType.S3:
raise ValueError("Integration type must be S3 for Minio driver")
if not re.match(r"^https?://[A-Za-z0-9.-]+:\d+$", endpoint):
raise ValueError(
f"Invalid endpoint URL '{endpoint}'. Must be 'http://<hostname>:<port>' or 'https://<hostname>:<port>'."
)
payload = {
"integrationId": integration.id,
"integrationType": integration.type,
"name": name,
"metadata": {"system": {"projectId": self.project.id if project_id is None else project_id}},
"type": entities.ExternalStorage.MIN_IO,
"payload": {"bucketName": bucket_name, "storageClass": storage_class, "region": region, "path": path, "endpoint": endpoint},
"allowExternalDelete": allow_external_delete,
"creator": self._client_api.info().get("user_email"),
}
return self._create_driver(payload)
[docs] def create(
self,
name: str,
driver_type: entities.ExternalStorage,
integration_id: str,
bucket_name: str,
integration_type: entities.IntegrationType,
project_id: str = None,
allow_external_delete: bool = True,
region: str = None,
storage_class: str = "",
path: str = "",
endpoint: str = None
):
"""
Create a storage driver.
**Prerequisites**: You must be in the role of an *owner* or *developer*.
:param str name: the driver name
:param ExternalStorage driver_type: dl.ExternalStorage (Enum). For all options run: list(dl.ExsternalStorage)
:param str integration_id: the integration id
:param str bucket_name: the external bucket name
:param IntegrationType integration_type: dl.IntegrationType (Enum). For all options run: list(dl.IntegrationType)
:param str project_id: project id
:param bool allow_external_delete: true to allow deleting files from external storage when files are deleted in your Dataloop storage
:param str region: relevant only for s3 - the bucket region
:param str storage_class: relevant only for s3
:param str path: Optional. By default path is the root folder. Path is case sensitive integration
:param str endpoint: Optional. Custom endpoint for minio storage. Must be in the format 'http://<hostname>:<port>' or 'https://<hostname>:<port>'.
:return: driver object
:rtype: dtlpy.entities.driver.Driver
**Example**:
.. code-block:: python
project.drivers.create(name='driver_name',
driver_type=dl.ExternalStorage.S3,
integration_id='integration_id',
bucket_name='bucket_name',
project_id='project_id',
region='ey-west-1')
"""
if integration_type is None:
integration_type = driver_type
if driver_type == entities.ExternalStorage.S3:
bucket_payload = "bucketName"
if endpoint:
if not re.match(r"^https?://[A-Za-z0-9.-]+:\d+$", endpoint):
raise ValueError(
f"Invalid endpoint URL '{endpoint}'. Must be 'http://<hostname>:<port>' or 'https://<hostname>:<port>'."
)
elif driver_type == entities.ExternalStorage.GCS:
bucket_payload = "bucket"
else:
bucket_payload = "containerName"
payload = {
"integrationId": integration_id,
"integrationType": integration_type,
"name": name,
"metadata": {"system": {"projectId": self.project.id if project_id is None else project_id}},
"type": driver_type,
"payload": {bucket_payload: bucket_name, "storageClass": storage_class, "region": region, "path": path},
"allowExternalDelete": allow_external_delete,
"creator": self._client_api.info().get("user_email"),
}
if endpoint and driver_type == entities.ExternalStorage.S3:
payload["payload"]["endpoint"] = endpoint
return self._create_driver(payload)
[docs] @_api_reference.add(path="/drivers/{id}", method="delete")
def delete(self, driver_name: str = None, driver_id: str = None, sure: bool = False, really: bool = False):
"""
Delete a driver forever!
**Prerequisites**: You must be an *owner* or *developer* to use this method.
**Example**:
.. code-block:: python
project.drivers.delete(dataset_id='dataset_id', sure=True, really=True)
:param str driver_name: optional - search by name
:param str driver_id: optional - search by id
:param bool sure: Are you sure you want to delete?
:param bool really: Really really sure?
:return: True if success
:rtype: bool
"""
if sure and really:
driver = self.get(driver_name=driver_name, driver_id=driver_id)
success, response = self._client_api.gen_request(req_type="delete", path="/drivers/{}".format(driver.id))
if not success:
raise exceptions.PlatformException(response)
logger.info("Driver {!r} was deleted successfully".format(driver.name))
return True
else:
raise exceptions.PlatformException(
error="403", message="Cant delete driver from SDK. Please login to platform to delete"
)