Skip to content

Commit

Permalink
add volume support
Browse files Browse the repository at this point in the history
  • Loading branch information
DaanRademaker committed Nov 20, 2023
1 parent b0d5107 commit 8614243
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 2 deletions.
5 changes: 5 additions & 0 deletions aws-lambda/src/databricks_cdk/resources/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
create_or_update_storage_credential,
delete_storage_credential,
)
from databricks_cdk.resources.unity_catalog.volumes import VolumeProperties, create_or_update_volume, delete_volume
from databricks_cdk.utils import CnfResponse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -211,6 +212,8 @@ def create_or_update_resource(event: DatabricksEvent) -> CnfResponse:
RegisteredModelProperties(**event.ResourceProperties),
event.PhysicalResourceId,
)
elif action == "volume":
return create_or_update_volume(VolumeProperties(**event.ResourceProperties), event.PhysicalResourceId)
else:
raise RuntimeError(f"Unknown action: {action}")

Expand Down Expand Up @@ -308,6 +311,8 @@ def delete_resource(event: DatabricksEvent) -> CnfResponse:
RegisteredModelProperties(**event.ResourceProperties),
event.PhysicalResourceId,
)
elif action == "volume":
return delete_volume(VolumeProperties(**event.ResourceProperties), event.PhysicalResourceId)
else:
raise RuntimeError(f"Unknown action: {action}")

Expand Down
102 changes: 102 additions & 0 deletions aws-lambda/src/databricks_cdk/resources/unity_catalog/volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import VolumeInfo, VolumeType
from pydantic import BaseModel

from databricks_cdk.utils import CnfResponse, get_workspace_client


class VolumeCreatedError(Exception):
pass


class Volume(BaseModel):
name: str
catalog_name: str
schema_name: str
volume_type: VolumeType = VolumeType.MANAGED
comment: Optional[str] = None
storage_location: Optional[str] = None

@property
def full_name(self) -> str:
"""The three-level (fully qualified) name of the volume"""
return f"{self.catalog_name}.{self.schema_name}.{self.name}"


class VolumeProperties(BaseModel):
workspace_url: str
volume: Volume


class VolumeResponse(CnfResponse):
name: str


def create_or_update_volume(properties: VolumeProperties, physical_resource_id: Optional[str] = None) -> VolumeResponse:
"""
Create or update volume on databricks. If physical_resource_id is provided, it will update the existing volume
else it will create a new one.
"""
workspace_client = get_workspace_client(properties.workspace_url)

if physical_resource_id is not None:
# update existing volume
existing_volume = [
v
for v in workspace_client.volumes.list(
catalog_name=properties.volume.catalog_name, schema_name=properties.volume.schema_name
)
if v.volume_id == physical_resource_id
]

if len(existing_volume) == 0:
raise VolumeCreatedError(
f"Volume with id {physical_resource_id} not found but id is provided, make sure it's managed by CDK"
)

return update_volume(properties, workspace_client, existing_volume[0], physical_resource_id)

# volume doesn't exist yet so create new one
return create_volume(properties, workspace_client)


def create_volume(properties: VolumeProperties, workspace_client: WorkspaceClient) -> VolumeResponse:
"""Create volume on databricks"""
created_volume = workspace_client.volumes.create(
catalog_name=properties.volume.catalog_name,
schema_name=properties.volume.schema_name,
name=properties.volume.name,
volume_type=properties.volume.volume_type,
comment=properties.volume.comment,
storage_location=properties.volume.storage_location,
)

if created_volume.volume_id is None:
raise VolumeCreatedError("Volume creation failed, there was no id found")

return VolumeResponse(name=properties.volume.name, physical_resource_id=created_volume.volume_id)


def update_volume(
properties: VolumeProperties,
workspace_client: WorkspaceClient,
existing_volume: VolumeInfo,
physical_resource_id: str,
) -> VolumeResponse:
"""Update volume on databricks based on physical_resource_id"""
workspace_client.volumes.update(
full_name_arg=existing_volume.full_name,
name=properties.volume.name,
comment=properties.volume.comment,
)

return VolumeResponse(name=properties.volume.name, physical_resource_id=physical_resource_id)


def delete_volume(properties: VolumeProperties, physical_resource_id: str) -> CnfResponse:
"""Delete a volume on databricks""" ""
workspace_client = get_workspace_client(properties.workspace_url)
workspace_client.volumes.delete(full_name_arg=properties.volume.full_name)
return CnfResponse(physical_resource_id=physical_resource_id)
3 changes: 2 additions & 1 deletion aws-lambda/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest.mock import MagicMock

import pytest
from databricks.sdk import ExperimentsAPI, ModelRegistryAPI, WorkspaceClient
from databricks.sdk import ExperimentsAPI, ModelRegistryAPI, VolumesAPI, WorkspaceClient


@pytest.fixture(scope="function", autouse=True)
Expand All @@ -23,5 +23,6 @@ def workspace_client():
# mock all of the underlying service api's
workspace_client.model_registry = MagicMock(spec=ModelRegistryAPI)
workspace_client.experiments = MagicMock(spec=ExperimentsAPI)
workspace_client.volumes = MagicMock(spec=VolumesAPI)

return workspace_client
145 changes: 145 additions & 0 deletions aws-lambda/tests/resources/unity_catalog/test_volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from unittest.mock import patch

import pytest

from databricks_cdk.resources.unity_catalog.volumes import (
Volume,
VolumeCreatedError,
VolumeInfo,
VolumeProperties,
VolumeResponse,
VolumeType,
create_or_update_volume,
create_volume,
delete_volume,
update_volume,
)
from databricks_cdk.utils import CnfResponse


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
@patch("databricks_cdk.resources.unity_catalog.volumes.update_volume")
@patch("databricks_cdk.resources.unity_catalog.volumes.create_volume")
def test_create_or_update_volume_create(
patched_create_volume, patched_update_volume, patched_get_workspace_client, workspace_client
):
patched_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

create_or_update_volume(properties=mock_properties, physical_resource_id=None)

patched_create_volume.assert_called_once_with(mock_properties, workspace_client)
patched_update_volume.assert_not_called()


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
@patch("databricks_cdk.resources.unity_catalog.volumes.update_volume")
@patch("databricks_cdk.resources.unity_catalog.volumes.create_volume")
def test_create_or_update_volume_update(
patched_create_volume, patched_update_volume, patched_get_workspace_client, workspace_client
):
patched_get_workspace_client.return_value = workspace_client
mock_physical_resource_id = "some_id"
existing_volume = [VolumeInfo(volume_id=mock_physical_resource_id, name="mock_name", comment="some comment")]

workspace_client.volumes.list.return_value = existing_volume

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

create_or_update_volume(properties=mock_properties, physical_resource_id=mock_physical_resource_id)

patched_update_volume.assert_called_once_with(
mock_properties, workspace_client, existing_volume[0], mock_physical_resource_id
)
patched_create_volume.assert_not_called()


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
def test_create_or_update_volume_error(patch_get_workspace_client, workspace_client):
patch_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.list.return_value = []

with pytest.raises(VolumeCreatedError):
create_or_update_volume(properties=mock_properties, physical_resource_id="some_id")


def test_create_volume(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.create.return_value = VolumeInfo(volume_id="some_id")

response = create_volume(mock_properties, workspace_client)

assert response == VolumeResponse(name="mock_name", physical_resource_id="some_id")
workspace_client.volumes.create.assert_called_once_with(
catalog_name="catalog",
schema_name="schema",
name="mock_name",
volume_type=VolumeType.MANAGED,
comment="some comment",
storage_location=None,
)


def test_create_volume_error(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)

workspace_client.volumes.create.return_value = VolumeInfo(volume_id=None)

with pytest.raises(VolumeCreatedError):
create_volume(mock_properties, workspace_client)


def test_update_volume(workspace_client):
mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)
mock_volume_info = VolumeInfo(volume_id="some_id", full_name="catalog.schema.mock_name")

workspace_client.volumes.update.return_value = mock_volume_info

response = update_volume(mock_properties, workspace_client, mock_volume_info, "some_id")

assert response == VolumeResponse(name="mock_name", physical_resource_id="some_id")
workspace_client.volumes.update.assert_called_once_with(
full_name_arg="catalog.schema.mock_name",
name="mock_name",
comment="some comment",
)


@patch("databricks_cdk.resources.unity_catalog.volumes.get_workspace_client")
def test_delete_volume(patched_get_workspace_client, workspace_client):

patched_get_workspace_client.return_value = workspace_client

mock_properties = VolumeProperties(
workspace_url="https://test.cloud.databricks.com",
volume=Volume(catalog_name="catalog", schema_name="schema", name="mock_name", comment="some comment"),
)
response = delete_volume(mock_properties, "some_id")

assert response == CnfResponse(physical_resource_id="some_id")
workspace_client.volumes.delete.assert_called_once_with(
full_name_arg="catalog.schema.mock_name",
)
10 changes: 9 additions & 1 deletion typescript/src/resources/deploy-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {WarehousePermissions, WarehousePermissionsProperties} from "./permission
import {Construct} from "constructs";
import {DockerImage} from "../docker-image";
import {
UnityCatalogVolume, UnityCatalogVolumeProperties,
UnityCatalogCatalog,
UnityCatalogCatalogProperties, UnityCatalogExternalLocation, UnityCatalogExternalLocationProperties,
UnityCatalogMetastore,
Expand Down Expand Up @@ -198,6 +199,13 @@ export abstract class IDatabricksDeployLambda extends Construct {
});
}

public createUnityCatalogVolume(scope: Construct, id: string, props: UnityCatalogVolumeProperties): UnityCatalogVolume {
return new UnityCatalogVolume(scope, id, {
...props,
serviceToken: this.serviceToken
});
}

public createUnityCatalogMetastore(scope: Construct, id: string, props: UnityCatalogMetastoreProperties): UnityCatalogMetastore {
return new UnityCatalogMetastore(scope, id, {
...props,
Expand Down Expand Up @@ -298,7 +306,7 @@ export class DatabricksDeployLambda extends IDatabricksDeployLambda {

this.lambdaRole.addToPrincipalPolicy(new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: [ "secretsmanager:ListSecrets"],
actions: ["secretsmanager:ListSecrets"],
resources: ["*"] // AWS doesn't support providing specific resources for the ListSecrets action
}));

Expand Down
1 change: 1 addition & 0 deletions typescript/src/resources/unity-catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from "./unityCatalogSchema";
export * from "./unityCatalogPermission";
export * from "./unityCatalogStorageCredentials";
export * from "./unityCatalogExternalLocation";
export * from "./unityCatalogVolume";
40 changes: 40 additions & 0 deletions typescript/src/resources/unity-catalog/unityCatalogVolume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {CustomResource} from "aws-cdk-lib";
import {Construct} from "constructs";

enum volumeType {
EXTERNAL = "EXTERNAL",
MANAGED = "MANAGED",
}


export interface UnityCatalogVolumeSettings {
name: string
schema_name: string
catalog_name: string
volume_type?: volumeType
comment?: string
storage_location?: string

}

export interface UnityCatalogVolumeProperties {
workspace_url: string
volume: UnityCatalogVolumeSettings
}

export interface UnityCatalogVolumeProps extends UnityCatalogVolumeProperties {
readonly serviceToken: string
}

export class UnityCatalogVolume extends CustomResource {
constructor(scope: Construct, id: string, props: UnityCatalogVolumeProps) {
super(scope, id, {
serviceToken: props.serviceToken,
properties: {
action: "unity-catalog",
workspace_url: props.workspace_url,
catalog: props.volume,
}
});
}
}

0 comments on commit 8614243

Please sign in to comment.