Skip to content

Commit

Permalink
feat: Streams on views (#1112)
Browse files Browse the repository at this point in the history
* Allow streams on views

* Tests

* Add view name to read

* Docs
  • Loading branch information
gouline authored Jul 14, 2022
1 parent 2705970 commit 7a27b40
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 43 deletions.
1 change: 1 addition & 0 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ resource "snowflake_stream" "stream" {
- `comment` (String) Specifies a comment for the stream.
- `insert_only` (Boolean) Create an insert only stream type.
- `on_table` (String) Name of the table the stream will monitor.
- `on_view` (String) Name of the view the stream will monitor.
- `show_initial_rows` (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

### Read-Only
Expand Down
12 changes: 6 additions & 6 deletions pkg/resources/external_table_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ func ExternalTestTableIDFromString(t *testing.T) {

// Bad ID -- not enough fields
id = "database"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("3 fields allowed"), err)

// Bad ID
id = "||"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.NoError(err)

// 0 lines
id = ""
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)

// 2 lines
id = `database_name|schema_name|table
database_name|schema_name|table`
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)
}

Expand Down Expand Up @@ -65,8 +65,8 @@ func ExternalTestTableStruct(t *testing.T) {
}
sID, err = table.String()
r.NoError(err)
newTable, err := streamOnTableIDFromString(sID)
newTable, err := streamOnObjectIDFromString(sID)
r.NoError(err)
r.Equal("database|name", newTable.DatabaseName)
r.Equal("table|name", newTable.OnTableName)
r.Equal("table|name", newTable.Name)
}
89 changes: 62 additions & 27 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const (
streamIDDelimiter = '|'
streamOnTableIDDelimiter = '.'
streamIDDelimiter = '|'
streamOnObjectIDDelimiter = '.'
)

var streamSchema = map[string]*schema.Schema{
Expand Down Expand Up @@ -43,10 +43,18 @@ var streamSchema = map[string]*schema.Schema{
Description: "Specifies a comment for the stream.",
},
"on_table": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the table the stream will monitor.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the table the stream will monitor.",
ExactlyOneOf: []string{"on_table", "on_view"},
},
"on_view": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the view the stream will monitor.",
ExactlyOneOf: []string{"on_table", "on_view"},
},
"append_only": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -96,10 +104,10 @@ type streamID struct {
StreamName string
}

type streamOnTableID struct {
type streamOnObjectID struct {
DatabaseName string
SchemaName string
OnTableName string
Name string
}

//String() takes in a streamID object and returns a pipe-delimited string:
Expand Down Expand Up @@ -142,11 +150,11 @@ func streamIDFromString(stringID string) (*streamID, error) {
return streamResult, nil
}

// streamOnTableIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName
// and returns a streamOnTableID object
func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) {
// streamOnObjectIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName
// and returns a streamOnObjectID object
func streamOnObjectIDFromString(stringID string) (*streamOnObjectID, error) {
reader := csv.NewReader(strings.NewReader(stringID))
reader.Comma = streamOnTableIDDelimiter
reader.Comma = streamOnObjectIDDelimiter
lines, err := reader.ReadAll()
if err != nil {
return nil, fmt.Errorf("Not CSV compatible")
Expand All @@ -160,10 +168,10 @@ func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) {
return nil, fmt.Errorf("invalid format for on_table: %v , expected: <database_name.schema_name.target_table_name>", strings.Join(lines[0], "."))
}

streamOnTableResult := &streamOnTableID{
streamOnTableResult := &streamOnObjectID{
DatabaseName: lines[0][0],
SchemaName: lines[0][1],
OnTableName: lines[0][2],
Name: lines[0][2],
}
return streamOnTableResult, nil
}
Expand All @@ -174,28 +182,50 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
database := d.Get("database").(string)
schema := d.Get("schema").(string)
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
appendOnly := d.Get("append_only").(bool)
insertOnly := d.Get("insert_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)

builder := snowflake.Stream(name, database, schema)

resultOnTable, err := streamOnTableIDFromString(onTable)
if err != nil {
return err
}
onTable, onTableSet := d.GetOk("on_table")
onView, onViewSet := d.GetOk("on_view")

tq := snowflake.Table(resultOnTable.OnTableName, resultOnTable.DatabaseName, resultOnTable.SchemaName).Show()
tableRow := snowflake.QueryRow(db, tq)
if (onTableSet && onViewSet) || !(onTableSet || onViewSet) {
return fmt.Errorf("exactly one of 'on_table' or 'on_view' expected")
} else if onTableSet {
id, err := streamOnObjectIDFromString(onTable.(string))
if err != nil {
return err
}

t, err := snowflake.ScanTable(tableRow)
if err != nil {
return err
tq := snowflake.Table(id.Name, id.DatabaseName, id.SchemaName).Show()
tableRow := snowflake.QueryRow(db, tq)

t, err := snowflake.ScanTable(tableRow)
if err != nil {
return err
}

builder.WithExternalTable(t.IsExternal.String == "Y")
builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String)
} else if onViewSet {
id, err := streamOnObjectIDFromString(onView.(string))
if err != nil {
return err
}

tq := snowflake.View(id.Name).WithDB(id.DatabaseName).WithSchema(id.SchemaName).Show()
viewRow := snowflake.QueryRow(db, tq)

t, err := snowflake.ScanView(viewRow)
if err != nil {
return err
}

builder.WithOnView(t.DatabaseName.String, t.SchemaName.String, t.Name.String)
}

builder.WithExternalTable(t.IsExternal.String == "Y")
builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
builder.WithShowInitialRows(showInitialRows)
Expand All @@ -206,7 +236,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
}

stmt := builder.Create()
err = snowflake.Exec(db, stmt)
err := snowflake.Exec(db, stmt)
if err != nil {
return errors.Wrapf(err, "error creating stream %v", name)
}
Expand Down Expand Up @@ -270,6 +300,11 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("on_view", stream.ViewName.String)
if err != nil {
return err
}

err = d.Set("append_only", stream.Mode.String == "APPEND_ONLY")
if err != nil {
return err
Expand Down
68 changes: 68 additions & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ func TestAcc_Stream(t *testing.T) {
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
Config: viewStreamConfig(accName, false),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_view", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_VIEW")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "insert_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
ResourceName: "snowflake_stream.test_stream",
ImportState: true,
Expand Down Expand Up @@ -192,3 +205,58 @@ resource "snowflake_stream" "test_external_table_stream" {

return fmt.Sprintf(s, name, name, name, name, locations, name, insert_only_config)
}

func viewStreamConfig(name string, append_only bool) string {
append_only_config := ""
if append_only {
append_only_config = "append_only = true"
}

s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}
resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}
resource "snowflake_table" "test_stream_on_view" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "STREAM_ON_VIEW_TABLE"
comment = "Terraform acceptance test"
change_tracking = true
column {
name = "column1"
type = "VARIANT"
}
column {
name = "column2"
type = "VARCHAR(16777216)"
}
}
resource "snowflake_view" "test_stream_on_view" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "STREAM_ON_VIEW"
statement = "select * from ${snowflake_table.test_stream_on_view.name}"
}
resource "snowflake_stream" "test_stream" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
on_view = "${snowflake_database.test_database.name}.${snowflake_schema.test_schema.name}.${snowflake_view.test_stream_on_view.name}"
%s
}
`
return fmt.Sprintf(s, name, name, name, append_only_config)
}
12 changes: 6 additions & 6 deletions pkg/resources/stream_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,30 @@ func TestStreamOnTableIDFromString(t *testing.T) {
r := require.New(t)
// Vanilla
id := "database_name.schema_name.target_table_name"
streamOnTable, err := streamOnTableIDFromString(id)
streamOnTable, err := streamOnObjectIDFromString(id)
r.NoError(err)
r.Equal("database_name", streamOnTable.DatabaseName)
r.Equal("schema_name", streamOnTable.SchemaName)
r.Equal("target_table_name", streamOnTable.OnTableName)
r.Equal("target_table_name", streamOnTable.Name)

// Bad ID -- not enough fields
id = "database.schema"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("invalid format for on_table: database.schema , expected: <database_name.schema_name.target_table_name>"), err)

// Bad ID
id = ".."
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.NoError(err)

// 0 lines
id = ""
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)

// 2 lines
id = `database_name.schema_name.target_table_name
database_name.schema_name.target_table_name`
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)
}
52 changes: 52 additions & 0 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,53 @@ func TestStreamCreateOnExternalTable(t *testing.T) {
})
}

func TestStreamCreateOnView(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_view": "target_db.target_schema.target_view",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON VIEW "target_db"."target_schema"."target_view" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
expectOnViewRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
})
}

func TestStreamOnTableOrView(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"on_view": "target_db.target_schema.target_view",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
err := resources.CreateStream(d, db)
r.ErrorContains(err, "exactly one of")
})
}

func expectStreamRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY")
mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN SCHEMA "database_name"."schema_name"`).WillReturnRows(rows)
Expand All @@ -83,6 +130,11 @@ func expectOnExternalTableRead(mock sqlmock.Sqlmock) {
mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func expectOnViewRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_view", "target_db", "target_schema", "VIEW", "mock comment", "", "", "", "", 1, "OFF", "OFF", "Y")
mock.ExpectQuery(`SHOW VIEWS LIKE 'target_view' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func TestStreamRead(t *testing.T) {
r := require.New(t)

Expand Down
Loading

0 comments on commit 7a27b40

Please sign in to comment.