From 2833566adc6efbef9e014248bb8aca6e288c51b1 Mon Sep 17 00:00:00 2001 From: Mike Gouline <1960272+gouline@users.noreply.github.com> Date: Wed, 14 Jul 2021 02:52:24 +1000 Subject: [PATCH] feat: Add support for error notifications for Snowpipe (#595) --- docs/resources/notification_integration.md | 10 +- docs/resources/pipe.md | 1 + pkg/resources/notification_integration.go | 74 +++++++++++- ...otification_integration_acceptance_test.go | 8 +- .../notification_integration_test.go | 108 +++++++++++++----- pkg/resources/pipe.go | 23 ++++ pkg/resources/pipe_test.go | 4 +- .../notification_integration_test.go | 22 +++- pkg/snowflake/pipe.go | 33 ++++-- 9 files changed, 230 insertions(+), 53 deletions(-) diff --git a/docs/resources/notification_integration.md b/docs/resources/notification_integration.md index f25172984d..5917f1a7a1 100644 --- a/docs/resources/notification_integration.md +++ b/docs/resources/notification_integration.md @@ -17,16 +17,20 @@ description: |- ### Required -- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications -- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management - **name** (String) ### Optional +- **aws_sqs_arn** (String) AWS SQS queue ARN for notification integration to connect to +- **aws_sqs_external_id** (String) The external ID that Snowflake will use when assuming the AWS role +- **aws_sqs_role_arn** (String) AWS IAM role ARN for notification integration to assume +- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications +- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management - **comment** (String) +- **direction** (String) Direction of the cloud messaging with respect to Snowflake (required only for error notifications) - **enabled** (Boolean) - **id** (String) The ID of this resource. -- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE) +- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS) - **type** (String) A type of integration ### Read-Only diff --git a/docs/resources/pipe.md b/docs/resources/pipe.md index cb260c2a4c..a51229c4b4 100644 --- a/docs/resources/pipe.md +++ b/docs/resources/pipe.md @@ -44,6 +44,7 @@ resource snowflake_pipe pipe { - **auto_ingest** (Boolean) Specifies a auto_ingest param for the pipe. - **aws_sns_topic_arn** (String) Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket. - **comment** (String) Specifies a comment for the pipe. +- **error_integration** (String) Specifies the name of the notification integration used for error notifications. - **id** (String) The ID of this resource. - **integration** (String) Specifies an integration for the pipe. diff --git a/pkg/resources/notification_integration.go b/pkg/resources/notification_integration.go index 770dabee0e..7614f57980 100644 --- a/pkg/resources/notification_integration.go +++ b/pkg/resources/notification_integration.go @@ -30,23 +30,44 @@ var notificationIntegrationSchema = map[string]*schema.Schema{ ValidateFunc: validation.StringInSlice([]string{"QUEUE"}, true), Description: "A type of integration", }, + "direction": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringInSlice([]string{"INBOUND", "OUTBOUND"}, true), + Description: "Direction of the cloud messaging with respect to Snowflake (required only for error notifications)", + }, + // This part of the schema is the cloudProviderParams in the Snowflake documentation and differs between vendors "notification_provider": &schema.Schema{ Type: schema.TypeString, Optional: true, - Default: "AZURE_STORAGE_QUEUE", - ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE"}, true), - Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE)", + ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE", "AWS_SQS"}, true), + Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS)", }, "azure_storage_queue_primary_uri": &schema.Schema{ Type: schema.TypeString, - Required: true, + Optional: true, Description: "The queue ID for the Azure Queue Storage queue created for Event Grid notifications", }, "azure_tenant_id": &schema.Schema{ Type: schema.TypeString, - Required: true, + Optional: true, Description: "The ID of the Azure Active Directory tenant used for identity management", }, + "aws_sqs_external_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Description: "The external ID that Snowflake will use when assuming the AWS role", + }, + "aws_sqs_arn": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Description: "AWS SQS queue ARN for notification integration to connect to", + }, + "aws_sqs_role_arn": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Description: "AWS IAM role ARN for notification integration to assume", + }, "comment": &schema.Schema{ Type: schema.TypeString, Optional: true, @@ -90,6 +111,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{}) if v, ok := data.GetOk("comment"); ok { stmt.SetString(`COMMENT`, v.(string)) } + if v, ok := data.GetOk("direction"); ok { + stmt.SetString(`DIRECTION`, v.(string)) + } + if v, ok := data.GetOk("azure_tenant_id"); ok { + stmt.SetString(`AZURE_TENANT_ID`, v.(string)) + } if v, ok := data.GetOk("notification_provider"); ok { stmt.SetString(`NOTIFICATION_PROVIDER`, v.(string)) } @@ -99,6 +126,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{}) if v, ok := data.GetOk("azure_tenant_id"); ok { stmt.SetString(`AZURE_TENANT_ID`, v.(string)) } + if v, ok := data.GetOk("aws_sqs_arn"); ok { + stmt.SetString(`AWS_SQS_ARN`, v.(string)) + } + if v, ok := data.GetOk("aws_sqs_role_arn"); ok { + stmt.SetString(`AWS_SQS_ROLE_ARN`, v.(string)) + } err := snowflake.Exec(db, stmt.Statement()) if err != nil { @@ -167,6 +200,10 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er switch k { case "ENABLED": // We set this using the SHOW INTEGRATION call so let's ignore it here + case "DIRECTION": + if err = data.Set("direction", v.(string)); err != nil { + return err + } case "NOTIFICATION_PROVIDER": if err = data.Set("notification_provider", v.(string)); err != nil { return err @@ -179,6 +216,18 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er if err = data.Set("azure_tenant_id", v.(string)); err != nil { return err } + case "AWS_SQS_ARN": + if err = data.Set("aws_sqs_arn", v.(string)); err != nil { + return err + } + case "AWS_SQS_ROLE_ARN": + if err = data.Set("aws_sqs_role_arn", v.(string)); err != nil { + return err + } + case "AWS_SQS_EXTERNAL_ID": + if err = data.Set("aws_sqs_external_id", v.(string)); err != nil { + return err + } default: log.Printf("[WARN] unexpected property %v returned from Snowflake", k) } @@ -213,6 +262,11 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{}) stmt.SetBool(`ENABLED`, data.Get("enabled").(bool)) } + if data.HasChange("direction") { + runSetStatement = true + stmt.SetString("DIRECTION", data.Get("direction").(string)) + } + if data.HasChange("notification_provider") { runSetStatement = true stmt.SetString("NOTIFICATION_PROVIDER", data.Get("notification_provider").(string)) @@ -228,6 +282,16 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{}) stmt.SetString("AZURE_TENANT_ID", data.Get("azure_tenant_id").(string)) } + if data.HasChange("aws_sqs_arn") { + runSetStatement = true + stmt.SetString("AWS_SQS_ARN", data.Get("aws_sqs_arn").(string)) + } + + if data.HasChange("aws_sqs_role_arn") { + runSetStatement = true + stmt.SetString("AWS_SQS_ROLE_ARN", data.Get("aws_sqs_role_arn").(string)) + } + if runSetStatement { if err := snowflake.Exec(db, stmt.Statement()); err != nil { return fmt.Errorf("error updating notification integration: %w", err) diff --git a/pkg/resources/notification_integration_acceptance_test.go b/pkg/resources/notification_integration_acceptance_test.go index e1ef725b0b..00b6ec92aa 100644 --- a/pkg/resources/notification_integration_acceptance_test.go +++ b/pkg/resources/notification_integration_acceptance_test.go @@ -22,9 +22,10 @@ func TestAcc_NotificationIntegration(t *testing.T) { Providers: providers(), Steps: []resource.TestStep{ { - Config: notificationIntegrationConfig(accName, storageUri, tenant), + Config: azureNotificationIntegrationConfig(accName, storageUri, tenant), Check: resource.ComposeTestCheckFunc( resource.TestCheckResourceAttr("snowflake_notification_integration.test", "name", accName), + resource.TestCheckResourceAttr("snowflake_notification_integration.test", "notification_provider", "AZURE_STORAGE_QUEUE"), resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_storage_queue_primary_uri", storageUri), resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_tenant_id", tenant), ), @@ -33,13 +34,14 @@ func TestAcc_NotificationIntegration(t *testing.T) { }) } -func notificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string { +func azureNotificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string { s := ` resource "snowflake_notification_integration" "test" { name = "%s" + notification_provider = "%s" azure_storage_queue_primary_uri = "%s" azure_tenant_id = "%s" } ` - return fmt.Sprintf(s, name, azureStorageQueuePrimaryUri, azureTenantId) + return fmt.Sprintf(s, name, "AZURE_STORAGE_QUEUE", azureStorageQueuePrimaryUri, azureTenantId) } diff --git a/pkg/resources/notification_integration_test.go b/pkg/resources/notification_integration_test.go index 64f443fa87..c02edd6bdd 100644 --- a/pkg/resources/notification_integration_test.go +++ b/pkg/resources/notification_integration_test.go @@ -19,39 +19,73 @@ func TestNotificationIntegration(t *testing.T) { } func TestNotificationIntegrationCreate(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_notification_integration", - "comment": "great comment", - "azure_storage_queue_primary_uri": "azure://great-bucket/great-path/", - "azure_tenant_id": "some-guid", + testCases := []struct { + notificationProvider string + raw map[string]interface{} + expectSQL string + }{ + { + notificationProvider: "AZURE_STORAGE_QUEUE", + raw: map[string]interface{}{ + "name": "test_notification_integration", + "comment": "great comment", + "notification_provider": "AZURE_STORAGE_QUEUE", + "azure_storage_queue_primary_uri": "azure://great-bucket/great-path/", + "azure_tenant_id": "some-guid", + }, + expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`, + }, + { + notificationProvider: "AWS_SQS", + raw: map[string]interface{}{ + "name": "test_notification_integration", + "comment": "great comment", + "direction": "OUTBOUND", + "notification_provider": "AWS_SQS", + "aws_sqs_arn": "some-sqs-arn", + "aws_sqs_role_arn": "some-iam-role-arn", + }, + expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AWS_SQS_ARN='some-sqs-arn' AWS_SQS_ROLE_ARN='some-iam-role-arn' COMMENT='great comment' DIRECTION='OUTBOUND' NOTIFICATION_PROVIDER='AWS_SQS' TYPE='QUEUE' ENABLED=true$`, + }, + } + for _, testCase := range testCases { + r := require.New(t) + d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, testCase.raw) + r.NotNil(d) + + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec(testCase.expectSQL).WillReturnResult(sqlmock.NewResult(1, 1)) + expectReadNotificationIntegration(mock, testCase.notificationProvider) + + err := resources.CreateNotificationIntegration(d, db) + r.NoError(err) + }) } - d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - expectReadNotificationIntegration(mock) - - err := resources.CreateNotificationIntegration(d, db) - r.NoError(err) - }) } func TestNotificationIntegrationRead(t *testing.T) { - r := require.New(t) + testCases := []struct { + notificationProvider string + }{ + { + notificationProvider: "AZURE_STORAGE_QUEUE", + }, + { + notificationProvider: "AWS_SQS", + }, + } + for _, testCase := range testCases { + r := require.New(t) - d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"}) + d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"}) - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - expectReadNotificationIntegration(mock) + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + expectReadNotificationIntegration(mock, testCase.notificationProvider) - err := resources.ReadNotificationIntegration(d, db) - r.NoError(err) - }) + err := resources.ReadNotificationIntegration(d, db) + r.NoError(err) + }) + } } func TestNotificationIntegrationDelete(t *testing.T) { @@ -66,7 +100,7 @@ func TestNotificationIntegrationDelete(t *testing.T) { }) } -func expectReadNotificationIntegration(mock sqlmock.Sqlmock) { +func expectReadNotificationIntegration(mock sqlmock.Sqlmock, notificationProvider string) { showRows := sqlmock.NewRows([]string{ "name", "type", "category", "enabled", "created_on"}, ).AddRow("test_notification_integration", "QUEUE", "NOTIFICATION", true, "now") @@ -74,10 +108,22 @@ func expectReadNotificationIntegration(mock sqlmock.Sqlmock) { descRows := sqlmock.NewRows([]string{ "property", "property_type", "property_value", "property_default", - }).AddRow("ENABLED", "Boolean", true, false). - AddRow("NOTIFICATION_PROVIDER", "String", "AZURE_STORAGE_QUEUE", nil). - AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil). - AddRow("AZURE_TENANT_ID", "String", "some-guid", nil) + }).AddRow("ENABLED", "Boolean", true, false) + + switch notificationProvider { + case "AZURE_STORAGE_QUEUE": + descRows = descRows. + AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil). + AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil). + AddRow("AZURE_TENANT_ID", "String", "some-guid", nil) + case "AWS_SQS": + descRows = descRows. + AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil). + AddRow("DIRECTION", "String", "OUTBOUND", nil). + AddRow("AWS_SQS_ARN", "String", "some-sqs-arn", nil). + AddRow("AWS_SQS_ROLE_ARN", "String", "some-iam-role-arn", nil). + AddRow("AWS_SQS_EXTERNAL_ID", "String", "AGreatExternalID", nil) + } mock.ExpectQuery(`DESCRIBE NOTIFICATION INTEGRATION "test_notification_integration"$`).WillReturnRows(descRows) } diff --git a/pkg/resources/pipe.go b/pkg/resources/pipe.go index f8193d9c04..5d66c49ad8 100644 --- a/pkg/resources/pipe.go +++ b/pkg/resources/pipe.go @@ -75,6 +75,11 @@ var pipeSchema = map[string]*schema.Schema{ Computed: true, Description: "Name of the role that owns the pipe.", }, + "error_integration": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Description: "Specifies the name of the notification integration used for error notifications.", + }, } func Pipe() *schema.Resource { @@ -176,6 +181,10 @@ func CreatePipe(d *schema.ResourceData, meta interface{}) error { builder.WithIntegration(v.(string)) } + if v, ok := d.GetOk("error_integration"); ok { + builder.WithErrorIntegration((v.(string))) + } + q := builder.Create() err := snowflake.Exec(db, q) @@ -267,6 +276,11 @@ func ReadPipe(d *schema.ResourceData, meta interface{}) error { return err } + err = d.Set("error_integration", pipe.ErrorIntegration.String) + if err != nil { + return err + } + return nil } @@ -293,6 +307,15 @@ func UpdatePipe(d *schema.ResourceData, meta interface{}) error { } } + if d.HasChange("error_integration") { + errorIntegration := d.Get("error_integration") + q := builder.ChangeErrorIntegration(errorIntegration.(string)) + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error updating pipe error_integration on %v", d.Id()) + } + } + return ReadPipe(d, meta) } diff --git a/pkg/resources/pipe_test.go b/pkg/resources/pipe_test.go index 98eef1db29..07c514017a 100644 --- a/pkg/resources/pipe_test.go +++ b/pkg/resources/pipe_test.go @@ -66,7 +66,7 @@ func TestPipeRead(t *testing.T) { func expectReadPipe(mock sqlmock.Sqlmock) { rows := sqlmock.NewRows([]string{ - "created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment"}, - ).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment") + "created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment", "error_integration"}, + ).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment", "test_integration") mock.ExpectQuery(`^SHOW PIPES LIKE 'test_pipe' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows) } diff --git a/pkg/snowflake/notification_integration_test.go b/pkg/snowflake/notification_integration_test.go index f2595695a5..78c18f9fd3 100644 --- a/pkg/snowflake/notification_integration_test.go +++ b/pkg/snowflake/notification_integration_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestNotificationIntegration(t *testing.T) { +func TestNotificationIntegration_Azure(t *testing.T) { r := require.New(t) builder := snowflake.NotificationIntegration("azure") r.NotNil(builder) @@ -25,3 +25,23 @@ func TestNotificationIntegration(t *testing.T) { r.Equal(`CREATE NOTIFICATION INTEGRATION "azure" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://my-bucket/my-path/' AZURE_TENANT_ID='some-guid' TYPE='QUEUE' ENABLED=true`, q) } + +func TestNotificationIntegration_AWS(t *testing.T) { + r := require.New(t) + builder := snowflake.NotificationIntegration("aws_sqs") + r.NotNil(builder) + + q := builder.Show() + r.Equal("SHOW NOTIFICATION INTEGRATIONS LIKE 'aws_sqs'", q) + + c := builder.Create() + + c.SetString(`type`, `QUEUE`) + c.SetString(`direction`, `OUTBOUND`) + c.SetString(`aws_sqs_arn`, `some-sqs-arn`) + c.SetString(`aws_sqs_role_arn`, `some-iam-role-arn`) + c.SetBool(`enabled`, true) + q = c.Statement() + + r.Equal(`CREATE NOTIFICATION INTEGRATION "aws_sqs" AWS_SQS_ARN='some-sqs-arn' AWS_SQS_ROLE_ARN='some-iam-role-arn' DIRECTION='OUTBOUND' TYPE='QUEUE' ENABLED=true`, q) +} diff --git a/pkg/snowflake/pipe.go b/pkg/snowflake/pipe.go index 88b6ecd1d7..b0f5f13e14 100644 --- a/pkg/snowflake/pipe.go +++ b/pkg/snowflake/pipe.go @@ -10,14 +10,15 @@ import ( // PipeBuilder abstracts the creation of SQL queries for a Snowflake schema type PipeBuilder struct { - name string - db string - schema string - autoIngest bool - awsSnsTopicArn string - comment string - copyStatement string - integration string + name string + db string + schema string + autoIngest bool + awsSnsTopicArn string + comment string + copyStatement string + integration string + errorIntegration string } // QualifiedName prepends the db and schema if set and escapes everything nicely @@ -71,6 +72,12 @@ func (pb *PipeBuilder) WithIntegration(s string) *PipeBuilder { return pb } +/// WithErrorIntegration adds ErrorIntegration specification to the PipeBuilder +func (pb *PipeBuilder) WithErrorIntegration(s string) *PipeBuilder { + pb.errorIntegration = s + return pb +} + // Pipe returns a pointer to a Builder that abstracts the DDL operations for a pipe. // // Supported DDL operations are: @@ -103,6 +110,10 @@ func (pb *PipeBuilder) Create() string { q.WriteString(fmt.Sprintf(` INTEGRATION = '%v'`, EscapeString(pb.integration))) } + if pb.errorIntegration != "" { + q.WriteString(fmt.Sprintf(` ERROR_INTEGRATION = '%v'`, EscapeString(pb.errorIntegration))) + } + if pb.awsSnsTopicArn != "" { q.WriteString(fmt.Sprintf(` AWS_SNS_TOPIC = '%v'`, EscapeString(pb.awsSnsTopicArn))) } @@ -127,6 +138,11 @@ func (pb *PipeBuilder) RemoveComment() string { return fmt.Sprintf(`ALTER PIPE %v UNSET COMMENT`, pb.QualifiedName()) } +// ChangeErrorIntegration return SQL query that will update the error_integration on the pipe. +func (pb *PipeBuilder) ChangeErrorIntegration(c string) string { + return fmt.Sprintf(`ALTER PIPE %v SET ERROR_INTEGRATION = %v`, pb.QualifiedName(), EscapeString(c)) +} + // Drop returns the SQL query that will drop a pipe. func (pb *PipeBuilder) Drop() string { return fmt.Sprintf(`DROP PIPE %v`, pb.QualifiedName()) @@ -147,6 +163,7 @@ type pipe struct { NotificationChannel *string `db:"notification_channel"` Comment string `db:"comment"` Integration sql.NullString `db:"integration"` + ErrorIntegration sql.NullString `db:"error_integration"` } func ScanPipe(row *sqlx.Row) (*pipe, error) {