"""
Integrations Repository
"""
import base64
import json
import logging
from .. import entities, exceptions, miscellaneous, _api_reference
from ..services.api_client import ApiClient
logger = logging.getLogger(name='dtlpy')
class Integrations:
    """
    Integrations Repository

    The Integrations class allows you to manage data integrations from your external storage (e.g., S3, GCS, Azure)
    into your Dataloop's Dataset storage, as well as sync data in your Dataloop's Datasets with data in your external
    storage.

    For more information on Organization Storage Integration see the `Dataloop documentation <https://dataloop.ai/docs/organization-integrations>`_ and `developers' docs <https://developers.dataloop.ai/tutorials/data_management/>`_.
    """

    def __init__(self, client_api: ApiClient, org: entities.Organization = None,
                 project: entities.Project = None):
        self._client_api = client_api
        self._org = org
        self._project = project

    @property
    def project(self) -> entities.Project:
        return self._project

    @project.setter
    def project(self, project: entities.Project):
        if not isinstance(project, entities.Project):
            raise ValueError('Must input a valid Project entity')
        self._project = project

    @property
    def org(self) -> entities.Organization:
        # Lazily derive the organization from the project when one was not given explicitly.
        if self._org is None:
            if self.project is not None:
                self._org = entities.Organization.from_json(_json=self.project.org, client_api=self._client_api)
        return self._org

    @org.setter
    def org(self, org: entities.Organization):
        if not isinstance(org, entities.Organization):
            raise ValueError('Must input a valid Organization entity')
        self._org = org

    def _resolve_organization_id(self, organization_id: str,
                                 error_message: str = 'Must have an organization or project') -> str:
        """
        Resolve the organization id used by the API calls.

        Resolution order: the explicit ``organization_id`` argument, then the repository's
        project organization, then the repository's organization entity.

        :param str organization_id: explicit organization id, may be None
        :param str error_message: message for the 400 error raised when no identifier is available
        :return: resolved organization id
        :rtype: str
        :raises PlatformException: 400 when no project, org, or explicit id is available
        """
        if self.project is None and self.org is None and organization_id is None:
            raise exceptions.PlatformException(
                error='400',
                message=error_message)
        if organization_id is None:
            if self.project is not None:
                organization_id = self.project.org.get('id')
            else:
                organization_id = self.org.id
        return organization_id

    @_api_reference.add(path='/orgs/{orgId}/integrations/{integrationId}', method='delete')
    def delete(self,
               integrations_id: str,
               sure: bool = False,
               really: bool = False,
               organization_id: str = None
               ) -> bool:
        """
        Delete integrations from the organization.

        **Prerequisites**: You must be an organization *owner* to delete an integration.

        :param str integrations_id: integrations id
        :param bool sure: Are you sure you want to delete?
        :param bool really: Really really sure?
        :param str organization_id: organization id
        :return: success
        :rtype: bool

        **Example**:

        .. code-block:: python

            project.integrations.delete(integrations_id='integrations_id', sure=True, really=True)
        """
        # Double confirmation guard: deletion is destructive and irreversible.
        if sure and really:
            organization_id = self._resolve_organization_id(
                organization_id, error_message='Must provide an identifier in inputs')
            url_path = '/orgs/{}/integrations/{}'.format(organization_id, integrations_id)
            success, response = self._client_api.gen_request(req_type='delete',
                                                             path=url_path)
            if not success:
                raise exceptions.PlatformException(response)
            else:
                return True
        else:
            raise exceptions.PlatformException(
                error='403',
                message='Cant delete integrations from SDK. Please login to platform to delete')

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='post')
    def create(self,
               integrations_type: entities.IntegrationType,
               name: str,
               options: dict,
               metadata: dict = None,
               organization_id: str = None,
               ):
        """
        Create an integration between an external storage and the organization.

        **Examples for options include**:
        s3 - {key: "", secret: ""};
        gcs - {key: "", secret: "", content: ""};
        azureblob - {key: "", secret: "", clientId: "", tenantId: ""};
        key_value - {key: "", value: ""}
        aws-sts - {key: "", secret: "", roleArns: ""}
        aws-cross - {}
        gcp-cross - {}
        gcp-workload-identity-federation - {"secret": "", "content": "{}", "clientId": ""}
        private-registry (ECR) - {"name": "", "spec": {"accessKeyId": "", "secretAccessKey": "", "account": "", "region": ""}}
        private-registry (GAR) - {"name": "", "spec": {"password": ""}} (can use generate_gar_options to generate the options)

        **Prerequisites**: You must be an *owner* in the organization.

        :param IntegrationType integrations_type: integrations type dl.IntegrationType
        :param str name: integrations name
        :param dict options: dict of storage secrets
        :param dict metadata: metadata
        :param str organization_id: organization id
        :return: success
        :rtype: bool

        **Example**:

        .. code-block:: python

            project.integrations.create(integrations_type=dl.IntegrationType.S3,
                            name='S3Integration',
                            options={key: "Access key ID", secret: "Secret access key"})
        """
        organization_id = self._resolve_organization_id(organization_id)
        url_path = '/orgs/{}/integrations'.format(organization_id)
        # Accept either the IntegrationType enum or a raw string for the type field.
        payload = {"type": integrations_type.value if isinstance(integrations_type,
                                                                 entities.IntegrationType) else integrations_type,
                   'name': name, 'options': options}
        if metadata is not None:
            payload['metadata'] = metadata
        success, response = self._client_api.gen_request(req_type='post',
                                                         path=url_path,
                                                         json_req=payload)
        if not success:
            raise exceptions.PlatformException(response)
        else:
            integration = entities.Integration.from_json(_json=response.json(), client_api=self._client_api)
            # Surface the backend-reported status (if present in metadata) to the user log.
            if integration.metadata and isinstance(integration.metadata, list) and len(integration.metadata) > 0:
                for m in integration.metadata:
                    if m['name'] == 'status':
                        integration_status = m['value']
                        logger.info('Integration status: {}'.format(integration_status))
            return integration

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='patch')
    def update(self,
               new_name: str = None,
               integrations_id: str = None,
               integration: entities.Integration = None,
               new_options: dict = None,
               organization_id: str = None,
               ):
        """
        Update the integration's name.

        **Prerequisites**: You must be an *owner* in the organization.

        :param str new_name: new name
        :param str integrations_id: integrations id
        :param Integration integration: integration object
        :param dict new_options: new value
        :param str organization_id: organization id
        :return: Integration object
        :rtype: dtlpy.entities.integration.Integration

        **Examples for options include**:
        s3 - {key: "", secret: ""};
        gcs - {key: "", secret: "", content: ""};
        azureblob - {key: "", secret: "", clientId: "", tenantId: ""};
        key_value - {key: "", value: ""}
        aws-sts - {key: "", secret: "", roleArns: ""}
        aws-cross - {roleArn: ""}
        gcp-cross - {"email: "", "resourceName": ""}

        **Example**:

        .. code-block:: python

            project.integrations.update(integrations_id='integrations_id', new_options={roleArn: ""})
        """
        organization_id = self._resolve_organization_id(organization_id)
        if integrations_id is None and integration is None:
            raise exceptions.PlatformException(
                error='400',
                message='Must have an integrations_id or integration')
        url_path = '/orgs/{}/integrations/'.format(organization_id)
        payload = dict(integrationId=integrations_id if integrations_id is not None else integration.id)
        if new_name is not None:
            payload['name'] = new_name
        if new_options is not None:
            # The credentials payload needs the integration type; fetch it if not supplied.
            if integration is None:
                integration = self.get(integrations_id=integrations_id)
            payload['credentials'] = dict(options=new_options, type=integration.type)
        success, response = self._client_api.gen_request(req_type='patch',
                                                         path=url_path,
                                                         json_req=payload)
        if not success:
            raise exceptions.PlatformException(response)
        return entities.Integration.from_json(_json=response.json(), client_api=self._client_api)

    @_api_reference.add(path='/orgs/{orgId}/integrations/{integrationId}', method='get')
    def get(self, integrations_id: str, organization_id: str = None):
        """
        Get organization integrations. Use this method to access your integration and be able to use it in your code.

        **Prerequisites**: You must be an *owner* in the organization.

        :param str integrations_id: integrations id
        :param str organization_id: organization id
        :return: Integration object
        :rtype: dtlpy.entities.integration.Integration

        **Example**:

        .. code-block:: python

            project.integrations.get(integrations_id='integrations_id')
        """
        organization_id = self._resolve_organization_id(organization_id)
        url_path = '/orgs/{}/integrations/{}'.format(organization_id, integrations_id)
        success, response = self._client_api.gen_request(req_type='get',
                                                         path=url_path)
        if not success:
            raise exceptions.PlatformException(response)
        return entities.Integration.from_json(_json=response.json(), client_api=self._client_api)

    @_api_reference.add(path='/orgs/{orgId}/integrations', method='get')
    def list(self, only_available=False, organization_id: str = None):
        """
        List all the organization's integrations with external storage.

        **Prerequisites**: You must be an *owner* in the organization.

        :param bool only_available: if True list only the available integrations.
        :param str organization_id: organization id
        :return: groups list
        :rtype: list

        **Example**:

        .. code-block:: python

            project.integrations.list(only_available=True)
        """
        organization_id = self._resolve_organization_id(organization_id)
        if only_available:
            url_path = '/orgs/{}/availableIntegrations'.format(organization_id)
        else:
            url_path = '/orgs/{}/integrations'.format(organization_id)
        success, response = self._client_api.gen_request(req_type='get',
                                                         path=url_path)
        if not success:
            raise exceptions.PlatformException(response)
        available_integrations = miscellaneous.List(response.json())
        return available_integrations

    @staticmethod
    def generate_gar_options(service_account: str, location: str, email: str = None) -> dict:
        """
        Generates a Google Artifact Registry JSON configuration and returns it as a base64-encoded string.

        Parameters:
            location (str): The region where the repository will be created (e.g., 'us-central1').
            service_account (str): The service_account parameter represents the Google Cloud service account credentials
                                   in the form of a JSON key file. This JSON contains the private key and other metadata
                                   required for authenticating with Google Artifact Registry. It is used to generate a Kubernetes secret
                                   that stores the credentials for pulling container images from the registry.
                                   The JSON key must include fields such as client_email, private_key, and project_id,
                                   and it is typically downloaded from the Google Cloud Console when creating the service account
            email (str): Optional - email address to embed in the registry auth entry.

        Returns:
            str: A base64-encoded string representation of the repository JSON configuration.
        """
        return IntegrationUtils.generate_gar_options(service_account=service_account, location=location, email=email)

    @staticmethod
    def generate_docker_hub_options(username: str, password: str, email: str = None) -> dict:
        """
        Generates a Docker Hub JSON configuration and returns it as a base64-encoded string.

        Parameters:
            username (str): The Docker Hub username.
            password (str): The Docker Hub password.
            email (str): Optional - Docker Hub email.

        Returns:
            str: A base64-encoded string representation of the repository JSON configuration.
        """
        return IntegrationUtils.generate_docker_hub_options(username=username, password=password, email=email)

    @staticmethod
    def generate_ecr_options(access_key_id: str, secret_access_key: str, account: str, region: str) -> dict:
        """
        Generates an Amazon Elastic Container Registry (ECR) JSON configuration and returns it as a base64-encoded string.

        Parameters:
            access_key_id (str): The AWS access key ID.
            secret_access_key (str): The AWS secret access key.
            account (str): The AWS account ID.
            region (str): The AWS region.

        Returns:
            str: A base64-encoded string representation of the repository JSON configuration.
        """
        return IntegrationUtils.generate_ecr_options(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            account=account,
            region=region
        )
class IntegrationUtils:
    """Helpers that build private-registry integration option payloads (GAR, Docker Hub, ECR)."""

    @staticmethod
    def encode(st: str) -> str:
        """
        Base64-encode a UTF-8 string and return the result as a str.

        :param str st: plain text to encode
        :return: base64 representation of ``st``
        :rtype: str
        """
        # b64encode returns bytes; decode to str directly instead of the
        # fragile str(...)[2:-1] repr-stripping hack (output is identical:
        # the base64 alphabet contains no characters repr() would escape).
        return base64.b64encode(st.encode('utf-8')).decode('ascii')

    @staticmethod
    def generate_json_key_options(location: str, username: str, password: str, auth: str, email: str = None) -> dict:
        """
        Build a docker-registry ``_json_key`` secret payload.

        The inner document follows the Docker ``config.json`` "auths" layout and is
        base64-encoded into the ``spec.password`` field.

        :param str location: registry host the credentials apply to
        :param str username: registry username
        :param str password: registry password / key content
        :param str auth: pre-computed base64 of ``username:password``
        :param str email: optional email added to the auth entry
        :return: options dict with ``name`` and ``spec.password``
        :rtype: dict
        """
        auth_entry = {
            "username": username,
            "password": password,
            "auth": auth
        }
        if email:
            auth_entry['email'] = email
        docker_config = {"auths": {location: auth_entry}}
        return {
            "name": "_json_key",
            "spec": {
                "password": IntegrationUtils.encode(json.dumps(docker_config))
            }
        }

    @staticmethod
    def generate_gar_options(service_account: str, location: str, email: str = None) -> dict:
        """
        Build Google Artifact Registry options from a service-account JSON key.

        :param str service_account: service-account JSON key content (used as the password)
        :param str location: registry region/host (e.g. 'us-central1')
        :param str email: optional email added to the auth entry
        :return: options dict suitable for a private-registry integration
        :rtype: dict
        :raises ValueError: when service_account or location is missing
        """
        if not service_account:
            raise ValueError('Missing Service Account')
        if not location:
            raise ValueError('Missing Location')
        # GAR key-file auth always uses the literal username "_json_key".
        username = "_json_key"
        auth = IntegrationUtils.encode(f"{username}:{service_account}")
        return IntegrationUtils.generate_json_key_options(
            location=location,
            username=username,
            password=service_account,
            auth=auth,
            email=email
        )

    @staticmethod
    def generate_docker_hub_options(username: str, password: str, email: str = None) -> dict:
        """
        Build Docker Hub registry options.

        :param str username: Docker Hub username
        :param str password: Docker Hub password
        :param str email: optional Docker Hub email
        :return: options dict suitable for a private-registry integration
        :rtype: dict
        :raises ValueError: when username or password is missing
        """
        if not username:
            raise ValueError('Missing Username')
        if not password:
            raise ValueError('Missing Password')
        auth = IntegrationUtils.encode('{}:{}'.format(username, password))
        return IntegrationUtils.generate_json_key_options(
            location='docker.io',
            username=username,
            password=password,
            auth=auth,
            email=email
        )

    @staticmethod
    def generate_ecr_options(access_key_id: str, secret_access_key: str, account: str, region: str) -> dict:
        """
        Build Amazon ECR registry options.

        :param str access_key_id: AWS access key ID
        :param str secret_access_key: AWS secret access key
        :param str account: AWS account ID
        :param str region: AWS region
        :return: options dict suitable for a private-registry integration
        :rtype: dict
        """
        return {
            "name": "AWS",
            "spec": {
                "accessKeyId": access_key_id,
                "secretAccessKey": secret_access_key,
                "account": account,
                "region": region,
            }
        }