Skip to content

Commit

Permalink
docs: add v1p1beta1 notifications samples (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdh911 authored Feb 20, 2020
1 parent 8ccd401 commit 420e26b
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/google-cloud-securitycenter/docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
google-cloud-pubsub==1.3.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#!/usr/bin/env python
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Demos for working with notification configs."""


def create_notification_config(organization_id, notification_config_id, pubsub_topic):

# [START scc_create_notification_config]
from google.cloud import securitycenter_v1p1beta1 as securitycenter
from google.cloud.securitycenter_v1p1beta1.proto.notification_config_pb2 import (
NotificationConfig,
)

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"
# TODO: pubsub_topic = "projects/{your-project-id}/topics/{your-topic-ic}"
# Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic.

org_name = "organizations/{org_id}".format(org_id=organization_id)

created_notification_config = client.create_notification_config(
org_name,
notification_config_id,
{
"description": "Notification for active findings",
"pubsub_topic": pubsub_topic,
"event_type": NotificationConfig.FINDING,
"streaming_config": {"filter": 'state = "ACTIVE"',},
},
)

print(created_notification_config)
# [END scc_create_notification_config]
return created_notification_config


def delete_notification_config(organization_id, notification_config_id):

# [START scc_delete_notification_config]
from google.cloud import securitycenter_v1p1beta1 as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"

notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)

client.delete_notification_config(notification_config_name)
print("Deleted notification config: {}".format(notification_config_name))
# [END scc_delete_notification_config]
return True


def get_notification_config(organization_id, notification_config_id):

# [START scc_get_notification_config]
from google.cloud import securitycenter_v1p1beta1 as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"

notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)

notification_config = client.get_notification_config(notification_config_name)
print("Got notification config: {}".format(notification_config))
# [END scc_get_notification_config]
return notification_config


def list_notification_configs(organization_id):

# [START scc_list_notification_configs]
from google.cloud import securitycenter_v1p1beta1 as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
org_name = "organizations/{org_id}".format(org_id=organization_id)

notification_configs_iterator = client.list_notification_configs(org_name)
for i, config in enumerate(notification_configs_iterator):
print("{}: notification_config: {}".format(i, config))
# [END scc_list_notification_configs]
return notification_configs_iterator


def update_notification_config(organization_id, notification_config_id, pubsub_topic):
# [START scc_update_notification_config]
from google.cloud import securitycenter_v1p1beta1 as securitycenter
from google.protobuf import field_mask_pb2

client = securitycenter.SecurityCenterClient()

# TODO organization_id = "your-org-id"
# TODO notification_config_id = "config-id-to-update"
# TODO pubsub_topic = "projects/{new-project}/topics/{new-topic}"
# If updating a pubsub_topic, ensure this ServiceAccount has the
# "pubsub.topics.setIamPolicy" permission on the new topic.

notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)

updated_description = "New updated description"

# Only description and pubsub_topic can be updated.
field_mask = field_mask_pb2.FieldMask(paths=["description", "pubsub_topic"])

updated_notification_config = client.update_notification_config(
{
"name": notification_config_name,
"description": updated_description,
"pubsub_topic": pubsub_topic,
},
update_mask=field_mask,
)

print(updated_notification_config)
# [END scc_update_notification_config]
return updated_notification_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Demo for receiving notifications."""


def receive_notifications(project_id, subscription_name):
# [START scc_receive_notifications]
# Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python
from google.cloud import pubsub_v1
from google.cloud.securitycenter_v1p1beta1.proto.notification_message_pb2 import (
NotificationMessage,
)
from google.protobuf import json_format

# TODO: project_id = "your-project-id"
# TODO: subscription_name = "your-subscription-name"

def callback(message):
print("Received message")

notification_msg = NotificationMessage()
json_format.Parse(message.data, notification_msg)

print(
"Notification config name: {}".format(
notification_msg.notification_config_name
)
)
print("Finding: {}".format(notification_msg.finding))

# Ack the message to prevent it from being pulled again
message.ack()

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

print("Listening for messages on {}...\n".format(subscription_path))
try:
streaming_pull_future.result(timeout=1) # Block for 1 second
except:
streaming_pull_future.cancel()
# [END scc_receive_notifications]
return True
152 changes: 152 additions & 0 deletions packages/google-cloud-securitycenter/docs/v1p1beta1/snippets_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for snippets."""

import os
import uuid

from google.cloud import securitycenter_v1p1beta1 as securitycenter
from google.cloud.securitycenter_v1p1beta1.proto.notification_config_pb2 import (
NotificationConfig,
)
import pytest

import snippets_notification_configs
import snippets_notification_receiver

ORG_ID = os.environ["GCLOUD_ORGANIZATION"]
PROJECT_ID = os.environ["GCLOUD_PROJECT"]
PUBSUB_TOPIC = os.environ["GCLOUD_PUBSUB_TOPIC"]
PUBSUB_SUBSCRIPTION = os.environ["GCLOUD_PUBSUB_SUBSCRIPTION"]

CREATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1())
DELETE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1())
GET_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1())
UPDATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1())


def cleanup_notification_config(notification_config_id):
client = securitycenter.SecurityCenterClient()

notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=ORG_ID, config_id=notification_config_id
)
client.delete_notification_config(notification_config_name)


@pytest.fixture
def new_notification_config_for_update():
client = securitycenter.SecurityCenterClient()

org_name = "organizations/{org_id}".format(org_id=ORG_ID)

created_notification_config = client.create_notification_config(
org_name,
UPDATE_CONFIG_ID,
{
"description": "Notification for active findings",
"pubsub_topic": PUBSUB_TOPIC,
"event_type": NotificationConfig.FINDING,
"streaming_config": {"filter": "",},
},
)
yield created_notification_config
cleanup_notification_config(UPDATE_CONFIG_ID)


@pytest.fixture
def new_notification_config_for_get():
client = securitycenter.SecurityCenterClient()

org_name = "organizations/{org_id}".format(org_id=ORG_ID)

created_notification_config = client.create_notification_config(
org_name,
GET_CONFIG_ID,
{
"description": "Notification for active findings",
"pubsub_topic": PUBSUB_TOPIC,
"event_type": NotificationConfig.FINDING,
"streaming_config": {"filter": "",},
},
)
yield created_notification_config
cleanup_notification_config(GET_CONFIG_ID)


@pytest.fixture
def deleted_notification_config():
client = securitycenter.SecurityCenterClient()

org_name = "organizations/{org_id}".format(org_id=ORG_ID)

created_notification_config = client.create_notification_config(
org_name,
DELETE_CONFIG_ID,
{
"description": "Notification for active findings",
"pubsub_topic": PUBSUB_TOPIC,
"event_type": NotificationConfig.FINDING,
"streaming_config": {"filter": "",},
},
)
return created_notification_config


def test_create_notification_config():
created_notification_config = snippets_notification_configs.create_notification_config(
ORG_ID, CREATE_CONFIG_ID, PUBSUB_TOPIC
)
assert created_notification_config is not None

cleanup_notification_config(CREATE_CONFIG_ID)


def test_delete_notification_config(deleted_notification_config):
assert (
snippets_notification_configs.delete_notification_config(
ORG_ID, DELETE_CONFIG_ID
)
== True
)


def test_get_notification_config(new_notification_config_for_get):
retrieved_config = snippets_notification_configs.get_notification_config(
ORG_ID, GET_CONFIG_ID
)
assert retrieved_config is not None


def test_list_notification_configs():
iterator = snippets_notification_configs.list_notification_configs(ORG_ID)
assert iterator is not None


def test_update_notification_config(new_notification_config_for_update):
updated_config = snippets_notification_configs.update_notification_config(
ORG_ID, UPDATE_CONFIG_ID, PUBSUB_TOPIC
)
assert updated_config is not None


def test_receive_notifications():
assert (
snippets_notification_receiver.receive_notifications(
PROJECT_ID, PUBSUB_SUBSCRIPTION
)
== True
)
17 changes: 17 additions & 0 deletions packages/google-cloud-securitycenter/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,26 @@ def snippets(session):
session.env['GCLOUD_ORGANIZATION'] = '1081635000895'
else:
session.skip('Credentials must be set via environment variable.')
if not os.environ.get('GCLOUD_PROJECT', ''):
if 'KOKORO_GFILE_DIR' in os.environ:
session.env['GCLOUD_PROJECT'] = 'project-a-id'
else:
session.skip('Credentials must be set via environment variable.')
if not os.environ.get('GCLOUD_PUBSUB_TOPIC', ''):
if 'KOKORO_GFILE_DIR' in os.environ:
session.env['GCLOUD_PUBSUB_TOPIC'] = 'projects/project-a-id/topics/notifications-sample-topic'
else:
session.skip('Credentials must be set via environment variable.')
if not os.environ.get('GCLOUD_PUBSUB_SUBSCRIPTION', ''):
if 'KOKORO_GFILE_DIR' in os.environ:
session.env['GCLOUD_PUBSUB_SUBSCRIPTION'] = 'notification_sample_subscription'
else:
session.skip('Credentials must be set via environment variable.')


# Install all test dependencies, then install local packages in place.
session.install('mock', 'pytest')
session.install("-r", "docs/requirements.txt")
session.install('-e', '.')
session.run(
'py.test',
Expand All @@ -160,6 +176,7 @@ def snippets(session):
os.path.join('docs', 'snippets_orgs.py'),
os.path.join('docs', 'snippets_findings.py'),
os.path.join('docs', 'snippets_security_marks.py'),
os.path.join('docs', 'v1p1beta1', 'snippets_test.py'),


*session.posargs
Expand Down

0 comments on commit 420e26b

Please sign in to comment.