Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for error notifications for Snowpipe #595

Merged
merged 12 commits into from
Jul 13, 2021
10 changes: 7 additions & 3 deletions docs/resources/notification_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ description: |-

### Required

- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications
- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management
- **name** (String)

### Optional

- **aws_sqs_arn** (String) AWS SQS queue ARN for notification integration to connect to
- **aws_sqs_external_id** (String) The external ID that Snowflake will use when assuming the AWS role
- **aws_sqs_role_arn** (String) AWS IAM role ARN for notification integration to assume
- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications
- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management
- **comment** (String)
- **direction** (String) Direction of the cloud messaging with respect to Snowflake (required only for error notifications)
- **enabled** (Boolean)
- **id** (String) The ID of this resource.
- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE)
- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS)
- **type** (String) A type of integration

### Read-Only
Expand Down
1 change: 1 addition & 0 deletions docs/resources/pipe.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ resource snowflake_pipe pipe {
- **auto_ingest** (Boolean) Specifies a auto_ingest param for the pipe.
- **aws_sns_topic_arn** (String) Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket.
- **comment** (String) Specifies a comment for the pipe.
- **error_integration** (String) Specifies the name of the notification integration used for error notifications.
- **id** (String) The ID of this resource.
- **integration** (String) Specifies an integration for the pipe.

Expand Down
74 changes: 69 additions & 5 deletions pkg/resources/notification_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,44 @@ var notificationIntegrationSchema = map[string]*schema.Schema{
ValidateFunc: validation.StringInSlice([]string{"QUEUE"}, true),
Description: "A type of integration",
},
"direction": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{"INBOUND", "OUTBOUND"}, true),
Description: "Direction of the cloud messaging with respect to Snowflake (required only for error notifications)",
},
// This part of the schema is the cloudProviderParams in the Snowflake documentation and differs between vendors
"notification_provider": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "AZURE_STORAGE_QUEUE",
ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE"}, true),
Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE)",
ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE", "AWS_SQS"}, true),
Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS)",
},
"azure_storage_queue_primary_uri": &schema.Schema{
Type: schema.TypeString,
Required: true,
Optional: true,
Description: "The queue ID for the Azure Queue Storage queue created for Event Grid notifications",
},
"azure_tenant_id": &schema.Schema{
Type: schema.TypeString,
Required: true,
Optional: true,
Description: "The ID of the Azure Active Directory tenant used for identity management",
},
"aws_sqs_external_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "The external ID that Snowflake will use when assuming the AWS role",
},
"aws_sqs_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "AWS SQS queue ARN for notification integration to connect to",
},
"aws_sqs_role_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "AWS IAM role ARN for notification integration to assume",
},
"comment": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -90,6 +111,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{})
if v, ok := data.GetOk("comment"); ok {
stmt.SetString(`COMMENT`, v.(string))
}
if v, ok := data.GetOk("direction"); ok {
stmt.SetString(`DIRECTION`, v.(string))
}
if v, ok := data.GetOk("azure_tenant_id"); ok {
stmt.SetString(`AZURE_TENANT_ID`, v.(string))
}
if v, ok := data.GetOk("notification_provider"); ok {
stmt.SetString(`NOTIFICATION_PROVIDER`, v.(string))
}
Expand All @@ -99,6 +126,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{})
if v, ok := data.GetOk("azure_tenant_id"); ok {
stmt.SetString(`AZURE_TENANT_ID`, v.(string))
}
if v, ok := data.GetOk("aws_sqs_arn"); ok {
stmt.SetString(`AWS_SQS_ARN`, v.(string))
}
if v, ok := data.GetOk("aws_sqs_role_arn"); ok {
stmt.SetString(`AWS_SQS_ROLE_ARN`, v.(string))
}

err := snowflake.Exec(db, stmt.Statement())
if err != nil {
Expand Down Expand Up @@ -167,6 +200,10 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er
switch k {
case "ENABLED":
// We set this using the SHOW INTEGRATION call so let's ignore it here
case "DIRECTION":
if err = data.Set("direction", v.(string)); err != nil {
return err
}
case "NOTIFICATION_PROVIDER":
if err = data.Set("notification_provider", v.(string)); err != nil {
return err
Expand All @@ -179,6 +216,18 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er
if err = data.Set("azure_tenant_id", v.(string)); err != nil {
return err
}
case "AWS_SQS_ARN":
if err = data.Set("aws_sqs_arn", v.(string)); err != nil {
return err
}
case "AWS_SQS_ROLE_ARN":
if err = data.Set("aws_sqs_role_arn", v.(string)); err != nil {
return err
}
case "AWS_SQS_EXTERNAL_ID":
if err = data.Set("aws_sqs_external_id", v.(string)); err != nil {
return err
}
default:
log.Printf("[WARN] unexpected property %v returned from Snowflake", k)
}
Expand Down Expand Up @@ -213,6 +262,11 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{})
stmt.SetBool(`ENABLED`, data.Get("enabled").(bool))
}

if data.HasChange("direction") {
runSetStatement = true
stmt.SetString("DIRECTION", data.Get("direction").(string))
}

if data.HasChange("notification_provider") {
runSetStatement = true
stmt.SetString("NOTIFICATION_PROVIDER", data.Get("notification_provider").(string))
Expand All @@ -228,6 +282,16 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{})
stmt.SetString("AZURE_TENANT_ID", data.Get("azure_tenant_id").(string))
}

if data.HasChange("aws_sqs_arn") {
runSetStatement = true
stmt.SetString("AWS_SQS_ARN", data.Get("aws_sqs_arn").(string))
}

if data.HasChange("aws_sqs_role_arn") {
runSetStatement = true
stmt.SetString("AWS_SQS_ROLE_ARN", data.Get("aws_sqs_role_arn").(string))
}

if runSetStatement {
if err := snowflake.Exec(db, stmt.Statement()); err != nil {
return fmt.Errorf("error updating notification integration: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/resources/notification_integration_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ func TestAcc_NotificationIntegration(t *testing.T) {
Providers: providers(),
Steps: []resource.TestStep{
{
Config: notificationIntegrationConfig(accName, storageUri, tenant),
Config: azureNotificationIntegrationConfig(accName, storageUri, tenant),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "name", accName),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "notification_provider", "AZURE_STORAGE_QUEUE"),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_storage_queue_primary_uri", storageUri),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_tenant_id", tenant),
),
Expand All @@ -33,13 +34,14 @@ func TestAcc_NotificationIntegration(t *testing.T) {
})
}

func notificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string {
func azureNotificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string {
s := `
resource "snowflake_notification_integration" "test" {
name = "%s"
notification_provider = "%s"
azure_storage_queue_primary_uri = "%s"
azure_tenant_id = "%s"
}
`
return fmt.Sprintf(s, name, azureStorageQueuePrimaryUri, azureTenantId)
return fmt.Sprintf(s, name, "AZURE_STORAGE_QUEUE", azureStorageQueuePrimaryUri, azureTenantId)
}
108 changes: 77 additions & 31 deletions pkg/resources/notification_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,73 @@ func TestNotificationIntegration(t *testing.T) {
}

func TestNotificationIntegrationCreate(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"azure_storage_queue_primary_uri": "azure://great-bucket/great-path/",
"azure_tenant_id": "some-guid",
testCases := []struct {
notificationProvider string
raw map[string]interface{}
expectSQL string
}{
{
notificationProvider: "AZURE_STORAGE_QUEUE",
raw: map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"notification_provider": "AZURE_STORAGE_QUEUE",
"azure_storage_queue_primary_uri": "azure://great-bucket/great-path/",
"azure_tenant_id": "some-guid",
},
expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`,
},
{
notificationProvider: "AWS_SQS",
raw: map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"direction": "OUTBOUND",
"notification_provider": "AWS_SQS",
"aws_sqs_arn": "some-sqs-arn",
"aws_sqs_role_arn": "some-iam-role-arn",
},
expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AWS_SQS_ARN='some-sqs-arn' AWS_SQS_ROLE_ARN='some-iam-role-arn' COMMENT='great comment' DIRECTION='OUTBOUND' NOTIFICATION_PROVIDER='AWS_SQS' TYPE='QUEUE' ENABLED=true$`,
},
}
for _, testCase := range testCases {
r := require.New(t)
d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, testCase.raw)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(testCase.expectSQL).WillReturnResult(sqlmock.NewResult(1, 1))
expectReadNotificationIntegration(mock, testCase.notificationProvider)

err := resources.CreateNotificationIntegration(d, db)
r.NoError(err)
})
}
d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, in)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`,
).WillReturnResult(sqlmock.NewResult(1, 1))
expectReadNotificationIntegration(mock)

err := resources.CreateNotificationIntegration(d, db)
r.NoError(err)
})
}

func TestNotificationIntegrationRead(t *testing.T) {
r := require.New(t)
testCases := []struct {
notificationProvider string
}{
{
notificationProvider: "AZURE_STORAGE_QUEUE",
},
{
notificationProvider: "AWS_SQS",
},
}
for _, testCase := range testCases {
r := require.New(t)

d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"})
d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"})

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
expectReadNotificationIntegration(mock)
WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
expectReadNotificationIntegration(mock, testCase.notificationProvider)

err := resources.ReadNotificationIntegration(d, db)
r.NoError(err)
})
err := resources.ReadNotificationIntegration(d, db)
r.NoError(err)
})
}
}

func TestNotificationIntegrationDelete(t *testing.T) {
Expand All @@ -66,18 +100,30 @@ func TestNotificationIntegrationDelete(t *testing.T) {
})
}

func expectReadNotificationIntegration(mock sqlmock.Sqlmock) {
func expectReadNotificationIntegration(mock sqlmock.Sqlmock, notificationProvider string) {
showRows := sqlmock.NewRows([]string{
"name", "type", "category", "enabled", "created_on"},
).AddRow("test_notification_integration", "QUEUE", "NOTIFICATION", true, "now")
mock.ExpectQuery(`^SHOW NOTIFICATION INTEGRATIONS LIKE 'test_notification_integration'$`).WillReturnRows(showRows)

descRows := sqlmock.NewRows([]string{
"property", "property_type", "property_value", "property_default",
}).AddRow("ENABLED", "Boolean", true, false).
AddRow("NOTIFICATION_PROVIDER", "String", "AZURE_STORAGE_QUEUE", nil).
AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil).
AddRow("AZURE_TENANT_ID", "String", "some-guid", nil)
}).AddRow("ENABLED", "Boolean", true, false)

switch notificationProvider {
case "AZURE_STORAGE_QUEUE":
descRows = descRows.
AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil).
AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil).
AddRow("AZURE_TENANT_ID", "String", "some-guid", nil)
case "AWS_SQS":
descRows = descRows.
AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil).
AddRow("DIRECTION", "String", "OUTBOUND", nil).
AddRow("AWS_SQS_ARN", "String", "some-sqs-arn", nil).
AddRow("AWS_SQS_ROLE_ARN", "String", "some-iam-role-arn", nil).
AddRow("AWS_SQS_EXTERNAL_ID", "String", "AGreatExternalID", nil)
}

mock.ExpectQuery(`DESCRIBE NOTIFICATION INTEGRATION "test_notification_integration"$`).WillReturnRows(descRows)
}
23 changes: 23 additions & 0 deletions pkg/resources/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ var pipeSchema = map[string]*schema.Schema{
Computed: true,
Description: "Name of the role that owns the pipe.",
},
"error_integration": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "Specifies the name of the notification integration used for error notifications.",
},
}

func Pipe() *schema.Resource {
Expand Down Expand Up @@ -176,6 +181,10 @@ func CreatePipe(d *schema.ResourceData, meta interface{}) error {
builder.WithIntegration(v.(string))
}

if v, ok := d.GetOk("error_integration"); ok {
builder.WithErrorIntegration((v.(string)))
}

q := builder.Create()

err := snowflake.Exec(db, q)
Expand Down Expand Up @@ -267,6 +276,11 @@ func ReadPipe(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("error_integration", pipe.ErrorIntegration.String)
if err != nil {
return err
}

return nil
}

Expand All @@ -293,6 +307,15 @@ func UpdatePipe(d *schema.ResourceData, meta interface{}) error {
}
}

if d.HasChange("error_integration") {
errorIntegration := d.Get("error_integration")
q := builder.ChangeErrorIntegration(errorIntegration.(string))
err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating pipe error_integration on %v", d.Id())
}
}

return ReadPipe(d, meta)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestPipeRead(t *testing.T) {

func expectReadPipe(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{
"created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment"},
).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment")
"created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment", "error_integration"},
).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment", "test_integration")
mock.ExpectQuery(`^SHOW PIPES LIKE 'test_pipe' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows)
}
Loading