Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add volume support #1056

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aws-lambda/src/databricks_cdk/resources/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
create_or_update_storage_credential,
delete_storage_credential,
)
from databricks_cdk.resources.unity_catalog.volumes import VolumeProperties, create_or_update_volume, delete_volume
from databricks_cdk.utils import CnfResponse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -211,6 +212,8 @@ def create_or_update_resource(event: DatabricksEvent) -> CnfResponse:
RegisteredModelProperties(**event.ResourceProperties),
event.PhysicalResourceId,
)
elif action == "volume":
return create_or_update_volume(VolumeProperties(**event.ResourceProperties), event.PhysicalResourceId)
else:
raise RuntimeError(f"Unknown action: {action}")

Expand Down Expand Up @@ -308,6 +311,8 @@ def delete_resource(event: DatabricksEvent) -> CnfResponse:
RegisteredModelProperties(**event.ResourceProperties),
event.PhysicalResourceId,
)
elif action == "volume":
return delete_volume(VolumeProperties(**event.ResourceProperties), event.PhysicalResourceId)
else:
raise RuntimeError(f"Unknown action: {action}")

Expand Down
103 changes: 103 additions & 0 deletions aws-lambda/src/databricks_cdk/resources/unity_catalog/volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import VolumeInfo, VolumeType
from pydantic import BaseModel

from databricks_cdk.utils import CnfResponse, get_workspace_client


class VolumeCreationError(Exception):
pass


class Volume(BaseModel):
name: str
catalog_name: str
schema_name: str
volume_type: VolumeType = VolumeType.MANAGED
comment: Optional[str] = None
storage_location: Optional[str] = None

@property
def full_name(self) -> str:
"""The three-level (fully qualified) name of the volume"""
return f"{self.catalog_name}.{self.schema_name}.{self.name}"


class VolumeProperties(BaseModel):
workspace_url: str
volume: Volume


class VolumeResponse(CnfResponse):
name: str


def create_or_update_volume(properties: VolumeProperties, physical_resource_id: Optional[str] = None) -> VolumeResponse:
"""
Create or update volume on databricks. If physical_resource_id is provided, it will update the existing volume
else it will create a new one.
"""
workspace_client = get_workspace_client(properties.workspace_url)

if physical_resource_id is None:
# volume doesn't exist yet so create new one
return create_volume(properties, workspace_client)

# update existing volume
existing_volume = [
v
for v in workspace_client.volumes.list(
catalog_name=properties.volume.catalog_name, schema_name=properties.volume.schema_name
)
if v.volume_id == physical_resource_id
]

if len(existing_volume) == 0:
raise VolumeCreationError(
f"Volume with id {physical_resource_id} not found but id is provided, make sure it's managed by CDK"
)

return update_volume(properties, workspace_client, existing_volume[0], physical_resource_id)


def create_volume(properties: VolumeProperties, workspace_client: WorkspaceClient) -> VolumeResponse:
"""Create volume on databricks"""

created_volume = workspace_client.volumes.create(
catalog_name=properties.volume.catalog_name,
schema_name=properties.volume.schema_name,
name=properties.volume.name,
volume_type=properties.volume.volume_type,
comment=properties.volume.comment,
storage_location=properties.volume.storage_location,
)

if created_volume.volume_id is None:
raise VolumeCreationError("Volume creation failed, there was no id found")

return VolumeResponse(name=properties.volume.name, physical_resource_id=created_volume.volume_id)


def update_volume(
properties: VolumeProperties,
workspace_client: WorkspaceClient,
existing_volume: VolumeInfo,
physical_resource_id: str,
) -> VolumeResponse:
"""Update volume on databricks based on physical_resource_id"""
workspace_client.volumes.update(
full_name_arg=existing_volume.full_name,
name=properties.volume.name,
comment=properties.volume.comment,
)
DaanRademaker marked this conversation as resolved.
Show resolved Hide resolved

return VolumeResponse(name=properties.volume.name, physical_resource_id=physical_resource_id)


def delete_volume(properties: VolumeProperties, physical_resource_id: str) -> CnfResponse:
"""Delete a volume on databricks"""
workspace_client = get_workspace_client(properties.workspace_url)
workspace_client.volumes.delete(full_name_arg=properties.volume.full_name)
return CnfResponse(physical_resource_id=physical_resource_id)
3 changes: 2 additions & 1 deletion aws-lambda/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest.mock import MagicMock

import pytest
from databricks.sdk import ExperimentsAPI, ModelRegistryAPI, WorkspaceClient
from databricks.sdk import ExperimentsAPI, ModelRegistryAPI, VolumesAPI, WorkspaceClient


@pytest.fixture(scope="function", autouse=True)
Expand All @@ -23,5 +23,6 @@ def workspace_client():
# mock all of the underlying service api's
workspace_client.model_registry = MagicMock(spec=ModelRegistryAPI)
workspace_client.experiments = MagicMock(spec=ExperimentsAPI)
workspace_client.volumes = MagicMock(spec=VolumesAPI)

return workspace_client
145 changes: 145 additions & 0 deletions aws-lambda/tests/resources/unity_catalog/test_volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from unittest.mock import patch

import pytest

from databricks_cdk.resources.unity_catalog.volumes import (
Volume,
VolumeCreationError,
VolumeInfo,
VolumeProperties,
VolumeResponse,
VolumeType,
create_or_update_volume,
create_volume,
delete_volume,
update_volume,
)
from databricks_cdk.utils import CnfResponse


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
@patch("databricks_cdk.resources.unity_catalog.volumes.update_volume")
@patch("databricks_cdk.resources.unity_catalog.volumes.create_volume")
def test_create_or_update_volume_create(
patched_create_volume, patched_update_volume, patched_get_workspace_client, workspace_client
):
patched_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

create_or_update_volume(properties=mock_properties, physical_resource_id=None)

patched_create_volume.assert_called_once_with(mock_properties, workspace_client)
patched_update_volume.assert_not_called()


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
@patch("databricks_cdk.resources.unity_catalog.volumes.update_volume")
@patch("databricks_cdk.resources.unity_catalog.volumes.create_volume")
def test_create_or_update_volume_update(
patched_create_volume, patched_update_volume, patched_get_workspace_client, workspace_client
):
patched_get_workspace_client.return_value = workspace_client
mock_physical_resource_id = "some_id"
existing_volume = [VolumeInfo(volume_id=mock_physical_resource_id, name="mock_name", comment="some comment")]

workspace_client.volumes.list.return_value = existing_volume

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

create_or_update_volume(properties=mock_properties, physical_resource_id=mock_physical_resource_id)

patched_update_volume.assert_called_once_with(
mock_properties, workspace_client, existing_volume[0], mock_physical_resource_id
)
patched_create_volume.assert_not_called()


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
def test_create_or_update_volume_error(patch_get_workspace_client, workspace_client):
patch_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.list.return_value = []

with pytest.raises(VolumeCreationError):
create_or_update_volume(properties=mock_properties, physical_resource_id="some_id")


def test_create_volume(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.create.return_value = VolumeInfo(volume_id="some_id")

response = create_volume(mock_properties, workspace_client)

assert response == VolumeResponse(name="mock_name", physical_resource_id="some_id")
workspace_client.volumes.create.assert_called_once_with(
catalog_name="catalog",
schema_name="schema",
name="mock_name",
volume_type=VolumeType.MANAGED,
comment="some comment",
storage_location=None,
)


def test_create_volume_error(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.create.return_value = VolumeInfo(volume_id=None)

with pytest.raises(VolumeCreationError):
create_volume(mock_properties, workspace_client)


def test_update_volume(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)
mock_volume_info = VolumeInfo(volume_id="some_id", full_name="catalog.schema.mock_name")

workspace_client.volumes.update.return_value = mock_volume_info

response = update_volume(mock_properties, workspace_client, mock_volume_info, "some_id")

assert response == VolumeResponse(name="mock_name", physical_resource_id="some_id")
workspace_client.volumes.update.assert_called_once_with(
full_name_arg="catalog.schema.mock_name",
name="mock_name",
comment="some comment",
)


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
def test_delete_volume(patched_get_workspace_client, workspace_client):

patched_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)
response = delete_volume(mock_properties, "some_id")

assert response == CnfResponse(physical_resource_id="some_id")
workspace_client.volumes.delete.assert_called_once_with(
full_name_arg="catalog.schema.mock_name",
)
10 changes: 9 additions & 1 deletion typescript/src/resources/deploy-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {WarehousePermissions, WarehousePermissionsProperties} from "./permission
import {Construct} from "constructs";
import {DockerImage} from "../docker-image";
import {
UnityCatalogVolume, UnityCatalogVolumeProperties,
UnityCatalogCatalog,
UnityCatalogCatalogProperties, UnityCatalogExternalLocation, UnityCatalogExternalLocationProperties,
UnityCatalogMetastore,
Expand Down Expand Up @@ -198,6 +199,13 @@ export abstract class IDatabricksDeployLambda extends Construct {
});
}

public createUnityCatalogVolume(scope: Construct, id: string, props: UnityCatalogVolumeProperties): UnityCatalogVolume {
return new UnityCatalogVolume(scope, id, {
...props,
serviceToken: this.serviceToken
});
}

public createUnityCatalogMetastore(scope: Construct, id: string, props: UnityCatalogMetastoreProperties): UnityCatalogMetastore {
return new UnityCatalogMetastore(scope, id, {
...props,
Expand Down Expand Up @@ -298,7 +306,7 @@ export class DatabricksDeployLambda extends IDatabricksDeployLambda {

this.lambdaRole.addToPrincipalPolicy(new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: [ "secretsmanager:ListSecrets"],
actions: ["secretsmanager:ListSecrets"],
resources: ["*"] // AWS doesn't support providing specific resources for the ListSecrets action
}));

Expand Down
1 change: 1 addition & 0 deletions typescript/src/resources/unity-catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from "./unityCatalogSchema";
export * from "./unityCatalogPermission";
export * from "./unityCatalogStorageCredentials";
export * from "./unityCatalogExternalLocation";
export * from "./unityCatalogVolume";
40 changes: 40 additions & 0 deletions typescript/src/resources/unity-catalog/unityCatalogVolume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {CustomResource} from "aws-cdk-lib";
import {Construct} from "constructs";

export enum VolumeType {
EXTERNAL = "EXTERNAL",
MANAGED = "MANAGED",
}


export interface UnityCatalogVolumeSettings {
name: string
schema_name: string
catalog_name: string
volume_type?: VolumeType
comment?: string
storage_location?: string

}

export interface UnityCatalogVolumeProperties {
workspace_url: string
volume: UnityCatalogVolumeSettings
}

export interface UnityCatalogVolumeProps extends UnityCatalogVolumeProperties {
readonly serviceToken: string
}

export class UnityCatalogVolume extends CustomResource {
constructor(scope: Construct, id: string, props: UnityCatalogVolumeProps) {
super(scope, id, {
serviceToken: props.serviceToken,
properties: {
action: "volume",
workspace_url: props.workspace_url,
catalog: props.volume,
}
});
}
}
Loading
Loading