Skip to content

Commit

Permalink
feat: Add support for creation of streams on external tables (#999)
Browse files Browse the repository at this point in the history
Allow the creation of `STREAMS` on `EXTERNAL TABLE`s.
  • Loading branch information
adamantike authored Jun 7, 2022
1 parent e1e23b9 commit 0ee1d55
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ description: |-
## Example Usage

```terraform
resource snowflake_stream stream {
resource "snowflake_stream" "stream" {
comment = "A stream."
database = "db"
Expand Down
2 changes: 1 addition & 1 deletion examples/resources/snowflake_stream/resource.tf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource snowflake_stream stream {
resource "snowflake_stream" "stream" {
comment = "A stream."

database = "db"
Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/external_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ resource "snowflake_external_table" "test_table" {
column {
name = "column1"
type = "STRING"
as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')"
as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')"
}
column {
name = "column2"
type = "TIMESTAMP_NTZ(9)"
as = "($1:'CreatedDate'::timestamp)"
as = "($1:\"CreatedDate\"::timestamp)"
}
file_format = "TYPE = CSV"
location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}"
Expand Down
11 changes: 10 additions & 1 deletion pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,16 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
return err
}

builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
tq := snowflake.Table(resultOnTable.OnTableName, resultOnTable.DatabaseName, resultOnTable.SchemaName).Show()
tableRow := snowflake.QueryRow(db, tq)

t, err := snowflake.ScanTable(tableRow)
if err != nil {
return err
}

builder.WithExternalTable(t.IsExternal.String == "Y")
builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
builder.WithShowInitialRows(showInitialRows)
Expand Down
75 changes: 75 additions & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package resources_test

import (
"fmt"
"os"
"strings"
"testing"

Expand All @@ -10,7 +11,11 @@ import (
)

func TestAcc_Stream(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_EXTERNAL_TABLE_TESTS"); ok {
t.Skip("Skipping TestAccStream")
}
accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))
accNameExternalTable := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

resource.ParallelTest(t, resource.TestCase{
Providers: providers(),
Expand Down Expand Up @@ -41,6 +46,18 @@ func TestAcc_Stream(t *testing.T) {
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
Config: externalTableStreamConfig(accNameExternalTable),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accNameExternalTable, accNameExternalTable, "STREAM_ON_EXTERNAL_TABLE")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
ResourceName: "snowflake_stream.test_stream",
ImportState: true,
Expand Down Expand Up @@ -96,3 +113,61 @@ resource "snowflake_stream" "test_stream" {
`
return fmt.Sprintf(s, name, name, name, append_only_config)
}

func externalTableStreamConfig(name string) string {
// Refer to external_table_acceptance_test.go for the original source on
// external table resources and dependents (modified slightly here).
locations := []string{"s3://com.example.bucket/prefix"}
s := `
resource "snowflake_database" "test" {
name = "%v"
comment = "Terraform acceptance test"
}
resource "snowflake_schema" "test" {
name = "%v"
database = snowflake_database.test.name
comment = "Terraform acceptance test"
}
resource "snowflake_stage" "test" {
name = "%v"
url = "s3://com.example.bucket/prefix"
database = snowflake_database.test.name
schema = snowflake_schema.test.name
comment = "Terraform acceptance test"
storage_integration = snowflake_storage_integration.external_table_stream_integration.name
}
resource "snowflake_storage_integration" "external_table_stream_integration" {
name = "%v"
storage_allowed_locations = %q
storage_provider = "S3"
storage_aws_role_arn = "arn:aws:iam::000000000001:/role/test"
}
resource "snowflake_external_table" "test_external_stream_table" {
database = snowflake_database.test.name
schema = snowflake_schema.test.name
name = "STREAM_ON_EXTERNAL_TABLE"
comment = "Terraform acceptance test"
column {
name = "column1"
type = "STRING"
as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')"
}
column {
name = "column2"
type = "TIMESTAMP_NTZ(9)"
as = "($1:\"CreatedDate\"::timestamp)"
}
file_format = "TYPE = CSV"
location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}"
}
resource "snowflake_stream" "test_external_table_stream" {
database = snowflake_database.test.name
schema = snowflake_schema.test.name
name = "%s"
comment = "Terraform acceptance test"
on_table = "${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_external_table.test_external_stream_table.name}"
}
`

return fmt.Sprintf(s, name, name, name, name, locations, name)
}
36 changes: 36 additions & 0 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ func TestStreamCreate(t *testing.T) {
WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
expectOnTableRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
})
}

func TestStreamCreateOnExternalTable(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON EXTERNAL TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
expectOnExternalTableRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
Expand All @@ -47,6 +73,16 @@ func expectStreamRead(mock sqlmock.Sqlmock) {
mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN SCHEMA "database_name"."schema_name"`).WillReturnRows(rows)
}

func expectOnTableRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_table", "target_db", "target_schema", "TABLE", "mock comment", "", "", "", "", 1, "OFF", "OFF", "N")
mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func expectOnExternalTableRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_table", "target_db", "target_schema", "TABLE", "mock comment", "", "", "", "", 1, "OFF", "OFF", "Y")
mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func TestStreamRead(t *testing.T) {
r := require.New(t)

Expand Down
14 changes: 13 additions & 1 deletion pkg/snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type StreamBuilder struct {
name string
db string
schema string
externalTable bool
onTable string
appendOnly bool
insertOnly bool
Expand Down Expand Up @@ -53,6 +54,11 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde
return sb
}

func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder {
sb.externalTable = b
return sb
}

func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
sb.appendOnly = b
return sb
Expand Down Expand Up @@ -90,7 +96,13 @@ func (sb *StreamBuilder) Create() string {
q := strings.Builder{}
q.WriteString(fmt.Sprintf(`CREATE STREAM %v`, sb.QualifiedName()))

q.WriteString(fmt.Sprintf(` ON TABLE %v`, sb.onTable))
q.WriteString(` ON`)

if sb.externalTable {
q.WriteString(` EXTERNAL`)
}

q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable))

if sb.comment != "" {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment)))
Expand Down
3 changes: 3 additions & 0 deletions pkg/snowflake/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func TestStreamCreate(t *testing.T) {

s.WithInsertOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)

s.WithExternalTable(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON EXTERNAL TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)
}

func TestStreamChangeComment(t *testing.T) {
Expand Down

0 comments on commit 0ee1d55

Please sign in to comment.