Skip to content

Commit

Permalink
feat(governance): msk brokers (#784)
Browse files Browse the repository at this point in the history
* add MSK bootstrap brokers URL to DataZone asset type metadata
  • Loading branch information
vgkowski authored Nov 8, 2024
1 parent aa44a29 commit b4705bd
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 26 deletions.
3 changes: 2 additions & 1 deletion examples/datazone-msk-governance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ cdk deploy
## Create a custom environment in the consumer project

1. Enable the [custom AWS service blueprint](https://docs.aws.amazon.com/datazone/latest/userguide/enable-custom-blueprint.html)
2. Create a [custom environment](https://docs.aws.amazon.com/datazone/latest/userguide/create-custom-environment.html) in the `consumer` project
2. Create a [custom environment](https://docs.aws.amazon.com/datazone/latest/userguide/create-custom-environment.html) in the `consumer` project.
You can use any IAM Role that can be assumed by DataZone but for simplicity the CDK Stack provides the `StreamingGovernanceStack-ConsumerRole`.

## Create a subscription target on the custom environment

Expand Down
18 changes: 0 additions & 18 deletions examples/datazone-msk-governance/resources/lambda/config.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion examples/datazone-msk-governance/tests/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ def test_nag_errors(results):
errors = Annotations.from_stack(results[0]).find_error('*', Match.string_like_regexp('AwsSolutions-.*'))
for error in errors:
print(error)
assert(len(errors) == 0)
assert(len(errors) == 0)
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ export class DataZoneGsrMskDataSource extends TrackedConstruct {
effect: Effect.ALLOW,
actions: [
'kafka:ListClustersV2',
'kafka:GetBootstrapBrokers',
],
resources: ['*'],
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ export class DataZoneMskAssetType extends TrackedConstruct {
type: 'String',
required: true,
},
{
name: 'bootstrap_brokers',
type: 'String',
required: true,
},
],
required: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { DataZoneClient, GetAssetCommand, CreateAssetCommand, CreateAssetRevisionCommand, DeleteAssetCommand } from "@aws-sdk/client-datazone";
import { GlueClient, ListSchemasCommand, GetSchemaVersionCommand, GetSchemaCommand } from "@aws-sdk/client-glue";
import { KafkaClient, ListClustersV2Command, DescribeClusterV2Command } from "@aws-sdk/client-kafka";
import { KafkaClient, ListClustersV2Command, DescribeClusterV2Command, GetBootstrapBrokersCommand } from "@aws-sdk/client-kafka";
import { SSMClient, GetParametersByPathCommand, DeleteParameterCommand, PutParameterCommand } from "@aws-sdk/client-ssm";


Expand Down Expand Up @@ -34,6 +34,7 @@ export const handler = async () => {
let clusterArn;
let clusterUuid;
let clusterType;
let bootstrapBrokers;

try {
// Step 1: Retrieve existing parameters
Expand All @@ -53,7 +54,7 @@ export const handler = async () => {
}
console.log(assetMap);

// Step 2: List all Kafka clusters and find the ARN for the specified cluster
// Step 2: List all Kafka clusters, find the ARN and the bootstrap brokers for the specified cluster
try {
const listClustersCommand = new ListClustersV2Command({});
const listClustersResponse = await kafkaClient.send(listClustersCommand);
Expand All @@ -77,6 +78,10 @@ export const handler = async () => {
}

console.log(`Cluster type for ${clusterName} is ${clusterType}`);

const getBootstrapBrokersCommand = new GetBootstrapBrokersCommand({ ClusterArn: clusterArn });
const getBootstrapBrokersResponse = await kafkaClient.send(getBootstrapBrokersCommand);
bootstrapBrokers = getBootstrapBrokersResponse.BootstrapBrokerStringSaslIam;

} catch (err) {
console.error('Error handling Kafka cluster:', err);
Expand Down Expand Up @@ -139,7 +144,8 @@ export const handler = async () => {
typeIdentifier: 'MskSourceReferenceFormType',
content: JSON.stringify({
cluster_arn: clusterArn,
cluster_type: clusterType // Ensure clusterType is correctly included
cluster_type: clusterType,
bootstrap_brokers: bootstrapBrokers
}),
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ export const handler = async(event) => {
Region: consumerRegion,
Account: consumerAccountId,
RolesArn: consumerRolesArn,
}
},
MetadataVersion: 1
};

console.log(`Metadata collection results: ${JSON.stringify({ results }, null, 2)}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ describe('Creating a DataZone-GSR-MSK-Datasource with default configuration', ()
},
}),
Match.objectLike({
Action: 'kafka:ListClustersV2',
Action: [
'kafka:ListClustersV2',
'kafka:GetBootstrapBrokers',
],
Effect: 'Allow',
Resource: '*',
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe ('Creating a DataZoneMskAssetType with default configuration', () => {
},
{
name: 'MskSourceReferenceFormType',
model: '\n structure MskSourceReferenceFormType {\n @required\ncluster_arn: String\n@required\ncluster_type: String\n }\n ',
model: '\n structure MskSourceReferenceFormType {\n @required\ncluster_arn: String\n@required\ncluster_type: String\n@required\nbootstrap_brokers: String\n }\n ',
required: true,
},
{
Expand Down

0 comments on commit b4705bd

Please sign in to comment.