"""
Integrations Repository
"""
import logging
from .. import entities, exceptions, miscellaneous, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name='dtlpy')
class Integrations:
    """
    Integrations Repository

    The Integrations class allows you to manage data integrations from your external storage (e.g., S3, GCS, Azure)
    into your Dataloop's Dataset storage, as well as sync data in your Dataloop's Datasets with data in your external
    storage.

    For more information on Organization Storage Integration see the `Dataloop documentation <https://dataloop.ai/docs/organization-integrations>`_ and `developers' docs <https://developers.dataloop.ai/tutorials/data_management/>`_.
    """

    def __init__(self, client_api: ApiClient, org: entities.Organization = None,
                 project: entities.Project = None):
        self._client_api = client_api
        self._org = org
        self._project = project

    def _resolve_organization_id(self, error_message: str = 'Must have an organization or project'):
        """
        Return the id of the organization that requests should be sent to.

        The project takes precedence: when a project is set, the id is read from the project's
        ``org`` mapping; otherwise it is read from the organization entity.

        :param str error_message: message of the exception raised when neither a project nor an organization is set
        :return: organization id
        :raises exceptions.PlatformException: error '400' when neither a project nor an organization is set
        """
        project = self.project
        if project is not None:
            return project.org.get('id')
        # No project: the org property can only hand back the explicitly provided organization (or None).
        org = self.org
        if org is None:
            raise exceptions.PlatformException(
                error='400',
                message=error_message)
        return org.id

    @property
    def project(self) -> entities.Project:
        """The project this repository is bound to (may be None)."""
        return self._project

    @project.setter
    def project(self, project: entities.Project):
        if not isinstance(project, entities.Project):
            raise ValueError('Must input a valid Project entity')
        self._project = project

    @property
    def org(self) -> entities.Organization:
        """
        The organization this repository is bound to.

        When no organization was given but a project is set, the organization entity is built
        from the project's ``org`` json on first access and cached. May be None.
        """
        if self._org is None and self.project is not None:
            self._org = entities.Organization.from_json(_json=self.project.org, client_api=self._client_api)
        return self._org

    @org.setter
    def org(self, org: entities.Organization):
        if not isinstance(org, entities.Organization):
            raise ValueError('Must input a valid Organization entity')
        self._org = org

    @_api_reference.add(path='/orgs/{orgId}/integrations/{integrationId}', method='delete')
    def delete(self,
               integrations_id: str,
               sure: bool = False,
               really: bool = False) -> bool:
        """
        Delete integrations from the organization.

        **Prerequisites**: You must be an organization *owner* to delete an integration.

        :param str integrations_id: integrations id
        :param bool sure: Are you sure you want to delete?
        :param bool really: Really really sure?
        :return: success
        :rtype: bool
        :raises exceptions.PlatformException: error '403' unless both ``sure`` and ``really`` are True;
            error '400' when neither a project nor an organization is set; or when the request fails

        **Example**:

        .. code-block:: python

            project.integrations.delete(integrations_id='integrations_id', sure=True, really=True)
        """
        # Deletion requires a double confirmation; refuse before resolving anything else.
        if not (sure and really):
            raise exceptions.PlatformException(
                error='403',
                message='Cant delete integrations from SDK. Please login to platform to delete')

        organization_id = self._resolve_organization_id(error_message='Must provide an identifier in inputs')
        url_path = '/orgs/{}/integrations/{}'.format(organization_id, integrations_id)
        success, response = self._client_api.gen_request(req_type='delete',
                                                         path=url_path)
        if not success:
            raise exceptions.PlatformException(response)
        return True

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='post')
    def create(self,
               integrations_type: entities.IntegrationType,
               name: str,
               options: dict):
        """
        Create an integration between an external storage and the organization.

        **Examples for options include**:

        s3 - {key: "", secret: ""};

        gcs - {key: "", secret: "", content: ""};

        azureblob - {key: "", secret: "", clientId: "", tenantId: ""};

        key_value - {key: "", value: ""}

        aws-sts - {key: "", secret: "", roleArns: ""}

        aws-cross - {}

        gcp-cross - {}

        gcp-workload-identity-federation - {"secret": "", "content": "{}", "clientId": ""}

        **Prerequisites**: You must be an *owner* in the organization.

        :param IntegrationType integrations_type: integrations type dl.IntegrationType
        :param str name: integrations name
        :param dict options: dict of storage secrets
        :return: Integration object
        :rtype: dtlpy.entities.integration.Integration
        :raises exceptions.PlatformException: error '400' when neither a project nor an organization is set,
            or when the request fails

        **Example**:

        .. code-block:: python

            project.integrations.create(integrations_type=dl.IntegrationType.S3,
                                        name='S3Integration',
                                        options={"key": "Access key ID", "secret": "Secret access key"})
        """
        organization_id = self._resolve_organization_id()
        url_path = '/orgs/{}/integrations'.format(organization_id)

        # Accept either the enum member or its raw value.
        if isinstance(integrations_type, entities.IntegrationType):
            type_value = integrations_type.value
        else:
            type_value = integrations_type
        payload = {"type": type_value, 'name': name, 'options': options}

        success, response = self._client_api.gen_request(req_type='post',
                                                         path=url_path,
                                                         json_req=payload)
        if not success:
            raise exceptions.PlatformException(response)

        integration = entities.Integration.from_json(_json=response.json(), client_api=self._client_api)
        if integration.metadata and isinstance(integration.metadata, list) and len(integration.metadata) > 0:
            for metadata in integration.metadata:
                if metadata['name'] == 'status':
                    # Log only when a status entry exists, so the value is always bound here.
                    integration_status = metadata['value']
                    logger.info('Integration status: {}'.format(integration_status))
        return integration

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='patch')
    def update(self,
               new_name: str = None,
               integrations_id: str = None,
               integration: entities.Integration = None,
               new_options: dict = None):
        """
        Update the integration's name and/or its credential options.

        **Prerequisites**: You must be an *owner* in the organization.

        :param str new_name: new name
        :param str integrations_id: integrations id
        :param Integration integration: integration object
        :param dict new_options: new value
        :return: Integration object
        :rtype: dtlpy.entities.integration.Integration
        :raises exceptions.PlatformException: error '400' when neither a project nor an organization is set,
            or when neither ``integrations_id`` nor ``integration`` is given; or when the request fails

        **Examples for options include**:

        s3 - {key: "", secret: ""};

        gcs - {key: "", secret: "", content: ""};

        azureblob - {key: "", secret: "", clientId: "", tenantId: ""};

        key_value - {key: "", value: ""}

        aws-sts - {key: "", secret: "", roleArns: ""}

        aws-cross - {roleArn: ""}

        gcp-cross - {"email": "", "resourceName": ""}

        **Example**:

        .. code-block:: python

            project.integrations.update(integrations_id='integrations_id', new_options={"roleArn": ""})
        """
        organization_id = self._resolve_organization_id()
        if integrations_id is None and integration is None:
            raise exceptions.PlatformException(
                error='400',
                message='Must have an integrations_id or integration')

        url_path = '/orgs/{}/integrations/'.format(organization_id)
        # An explicit id wins over the id of the passed entity.
        payload = dict(integrationId=integrations_id if integrations_id is not None else integration.id)
        if new_name is not None:
            payload['name'] = new_name
        if new_options is not None:
            # The credentials payload carries the integration type, so fetch the entity when only an id was given.
            if integration is None:
                integration = self.get(integrations_id=integrations_id)
            payload['credentials'] = dict(options=new_options, type=integration.type)

        success, response = self._client_api.gen_request(req_type='patch',
                                                         path=url_path,
                                                         json_req=payload)
        if not success:
            raise exceptions.PlatformException(response)
        return entities.Integration.from_json(_json=response.json(), client_api=self._client_api)

    @_api_reference.add(path='/orgs/{orgId}/integrations/{integrationId}', method='get')
    def get(self, integrations_id: str):
        """
        Get organization integrations. Use this method to access your integration and be able to use it in your code.

        **Prerequisites**: You must be an *owner* in the organization.

        :param str integrations_id: integrations id
        :return: Integration object
        :rtype: dtlpy.entities.integration.Integration
        :raises exceptions.PlatformException: error '400' when neither a project nor an organization is set,
            or when the request fails

        **Example**:

        .. code-block:: python

            project.integrations.get(integrations_id='integrations_id')
        """
        organization_id = self._resolve_organization_id()
        url_path = '/orgs/{}/integrations/{}'.format(organization_id, integrations_id)
        success, response = self._client_api.gen_request(req_type='get',
                                                         path=url_path)
        if not success:
            raise exceptions.PlatformException(response)
        return entities.Integration.from_json(_json=response.json(), client_api=self._client_api)

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='get')
    def list(self, only_available=False):
        """
        List all the organization's integrations with external storage.

        **Prerequisites**: You must be an *owner* in the organization.

        :param bool only_available: if True list only the available integrations.
        :return: integrations list (the response json wrapped in ``miscellaneous.List``)
        :rtype: list
        :raises exceptions.PlatformException: error '400' when neither a project nor an organization is set,
            or when the request fails

        **Example**:

        .. code-block:: python

            project.integrations.list(only_available=True)
        """
        organization_id = self._resolve_organization_id()
        if only_available:
            url_path = '/orgs/{}/availableIntegrations'.format(organization_id)
        else:
            url_path = '/orgs/{}/integrations'.format(organization_id)
        success, response = self._client_api.gen_request(req_type='get',
                                                         path=url_path)
        if not success:
            raise exceptions.PlatformException(response)
        available_integrations = miscellaneous.List(response.json())
        return available_integrations