From 477431986be943f1af4c4a64b6c42de154f0192a Mon Sep 17 00:00:00 2001
From: Mo Omer <beancinematics@gmail.com>
Date: Mon, 23 Aug 2021 17:02:28 -0500
Subject: [PATCH] feat: Add INSERT_ONLY option to streams (#655)

---
 docs/data-sources/functions.md                | 50 +++++++++++++++++++
 docs/data-sources/procedures.md               | 50 +++++++++++++++++++
 docs/resources/stream.md                      |  2 +
 .../resources/snowflake_stream/resource.tf    |  1 +
 pkg/resources/stream.go                       | 14 ++++++
 pkg/resources/stream_acceptance_test.go       |  1 +
 pkg/resources/stream_test.go                  |  4 +-
 pkg/snowflake/stream.go                       |  9 ++++
 pkg/snowflake/stream_test.go                  | 11 ++--
 9 files changed, 137 insertions(+), 5 deletions(-)
 create mode 100644 docs/data-sources/functions.md
 create mode 100644 docs/data-sources/procedures.md

diff --git a/docs/data-sources/functions.md b/docs/data-sources/functions.md
new file mode 100644
index 0000000000..e8fbeb93d2
--- /dev/null
+++ b/docs/data-sources/functions.md
@@ -0,0 +1,50 @@
+---
+# generated by https://github.com/hashicorp/terraform-plugin-docs
+page_title: "snowflake_functions Data Source - terraform-provider-snowflake"
+subcategory: ""
+description: |-
+  
+---
+
+# snowflake_functions (Data Source)
+
+
+
+## Example Usage
+
+```terraform
+data "snowflake_functions" "current" {
+    database = "MYDB"
+    schema   = "MYSCHEMA"
+}
+```
+
+<!-- schema generated by tfplugindocs -->
+## Schema
+
+### Required
+
+- **database** (String) The database from which to return the schemas from.
+- **schema** (String) The schema from which to return the functions from.
+
+### Optional
+
+- **id** (String) The ID of this resource.
+
+### Read-Only
+
+- **functions** (List of Object) The functions in the schema (see [below for nested schema](#nestedatt--functions))
+
+<a id="nestedatt--functions"></a>
+### Nested Schema for `functions`
+
+Read-Only:
+
+- **argument_types** (List of String)
+- **comment** (String)
+- **database** (String)
+- **name** (String)
+- **return_type** (String)
+- **schema** (String)
+
+
diff --git a/docs/data-sources/procedures.md b/docs/data-sources/procedures.md
new file mode 100644
index 0000000000..403d28b676
--- /dev/null
+++ b/docs/data-sources/procedures.md
@@ -0,0 +1,50 @@
+---
+# generated by https://github.com/hashicorp/terraform-plugin-docs
+page_title: "snowflake_procedures Data Source - terraform-provider-snowflake"
+subcategory: ""
+description: |-
+  
+---
+
+# snowflake_procedures (Data Source)
+
+
+
+## Example Usage
+
+```terraform
+data "snowflake_procedures" "current" {
+    database = "MYDB"
+    schema   = "MYSCHEMA"
+}
+```
+
+<!-- schema generated by tfplugindocs -->
+## Schema
+
+### Required
+
+- **database** (String) The database from which to return the schemas from.
+- **schema** (String) The schema from which to return the procedures from.
+
+### Optional
+
+- **id** (String) The ID of this resource.
+
+### Read-Only
+
+- **procedures** (List of Object) The procedures in the schema (see [below for nested schema](#nestedatt--procedures))
+
+<a id="nestedatt--procedures"></a>
+### Nested Schema for `procedures`
+
+Read-Only:
+
+- **argument_types** (List of String)
+- **comment** (String)
+- **database** (String)
+- **name** (String)
+- **return_type** (String)
+- **schema** (String)
+
+
diff --git a/docs/resources/stream.md b/docs/resources/stream.md
index e303aa78c2..88440de244 100644
--- a/docs/resources/stream.md
+++ b/docs/resources/stream.md
@@ -22,6 +22,7 @@ resource snowflake_stream stream {
 
   on_table    = "table"
   append_only = false
+  insert_only = false
 
   owner = "role1"
 }
@@ -41,6 +42,7 @@ resource snowflake_stream stream {
 - **append_only** (Boolean) Type of the stream that will be created.
 - **comment** (String) Specifies a comment for the stream.
 - **id** (String) The ID of this resource.
+- **insert_only** (Boolean) Create an insert only stream type.
 - **on_table** (String) Name of the table the stream will monitor.
 - **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.
 
diff --git a/examples/resources/snowflake_stream/resource.tf b/examples/resources/snowflake_stream/resource.tf
index 3668d37dd8..d490d3433a 100644
--- a/examples/resources/snowflake_stream/resource.tf
+++ b/examples/resources/snowflake_stream/resource.tf
@@ -7,6 +7,7 @@ resource snowflake_stream stream {
 
   on_table    = "table"
   append_only = false
+  insert_only = false
 
   owner = "role1"
 }
diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go
index f97acd6a15..f8bc7aaf91 100644
--- a/pkg/resources/stream.go
+++ b/pkg/resources/stream.go
@@ -55,6 +55,13 @@ var streamSchema = map[string]*schema.Schema{
 		Default:     false,
 		Description: "Type of the stream that will be created.",
 	},
+	"insert_only": {
+		Type:        schema.TypeBool,
+		Optional:    true,
+		ForceNew:    true,
+		Default:     false,
+		Description: "Create an insert only stream type.",
+	},
 	"show_initial_rows": {
 		Type:        schema.TypeBool,
 		Optional:    true,
@@ -169,6 +176,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
 	name := d.Get("name").(string)
 	onTable := d.Get("on_table").(string)
 	appendOnly := d.Get("append_only").(bool)
+	insertOnly := d.Get("insert_only").(bool)
 	showInitialRows := d.Get("show_initial_rows").(bool)
 
 	builder := snowflake.Stream(name, database, schema)
@@ -180,6 +188,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
 
 	builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
 	builder.WithAppendOnly(appendOnly)
+	builder.WithInsertOnly(insertOnly)
 	builder.WithShowInitialRows(showInitialRows)
 
 	// Set optionals
@@ -247,6 +256,11 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error {
 		return err
 	}
 
+	err = d.Set("insert_only", stream.InsertOnly)
+	if err != nil {
+		return err
+	}
+
 	err = d.Set("show_initial_rows", stream.ShowInitialRows)
 	if err != nil {
 		return err
diff --git a/pkg/resources/stream_acceptance_test.go b/pkg/resources/stream_acceptance_test.go
index 883463a42d..d271c377bf 100644
--- a/pkg/resources/stream_acceptance_test.go
+++ b/pkg/resources/stream_acceptance_test.go
@@ -24,6 +24,7 @@ func TestAcc_Stream(t *testing.T) {
 					resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_TABLE")),
 					resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
 					checkBool("snowflake_stream.test_stream", "append_only", false),
+					checkBool("snowflake_stream.test_stream", "insert_only", false),
 					checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
 				),
 			},
diff --git a/pkg/resources/stream_test.go b/pkg/resources/stream_test.go
index 48ffd4fbdb..7a532049fc 100644
--- a/pkg/resources/stream_test.go
+++ b/pkg/resources/stream_test.go
@@ -28,12 +28,13 @@ func TestStreamCreate(t *testing.T) {
 		"comment":           "great comment",
 		"on_table":          "target_db.target_schema.target_table",
 		"append_only":       true,
+		"insert_only":       false,
 		"show_initial_rows": true,
 	}
 	d := stream(t, "database_name|schema_name|stream_name", in)
 
 	WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
-		mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
 		expectStreamRead(mock)
 		err := resources.CreateStream(d, db)
 		r.NoError(err)
@@ -118,6 +119,7 @@ func TestStreamUpdate(t *testing.T) {
 		"comment":     "new stream comment",
 		"on_table":    "target_table",
 		"append_only": true,
+		"insert_only": false,
 	}
 
 	d := stream(t, "database_name|schema_name|stream_name", in)
diff --git a/pkg/snowflake/stream.go b/pkg/snowflake/stream.go
index 7e51e6acae..429d8de473 100644
--- a/pkg/snowflake/stream.go
+++ b/pkg/snowflake/stream.go
@@ -17,6 +17,7 @@ type StreamBuilder struct {
 	schema          string
 	onTable         string
 	appendOnly      bool
+	insertOnly      bool
 	showInitialRows bool
 	comment         string
 }
@@ -57,6 +58,11 @@ func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
 	return sb
 }
 
+func (sb *StreamBuilder) WithInsertOnly(b bool) *StreamBuilder {
+	sb.insertOnly = b
+	return sb
+}
+
 func (sb *StreamBuilder) WithShowInitialRows(b bool) *StreamBuilder {
 	sb.showInitialRows = b
 	return sb
@@ -92,6 +98,8 @@ func (sb *StreamBuilder) Create() string {
 
 	q.WriteString(fmt.Sprintf(` APPEND_ONLY = %v`, sb.appendOnly))
 
+	q.WriteString(fmt.Sprintf(` INSERT_ONLY = %v`, sb.insertOnly))
+
 	q.WriteString(fmt.Sprintf(` SHOW_INITIAL_ROWS = %v`, sb.showInitialRows))
 
 	return q.String()
@@ -125,6 +133,7 @@ type descStreamRow struct {
 	Owner           sql.NullString `db:"owner"`
 	Comment         sql.NullString `db:"comment"`
 	AppendOnly      bool           `db:"append_only"`
+	InsertOnly      bool           `db:"insert_only"`
 	ShowInitialRows bool           `db:"show_initial_rows"`
 	TableName       sql.NullString `db:"table_name"`
 	Type            sql.NullString `db:"type"`
diff --git a/pkg/snowflake/stream_test.go b/pkg/snowflake/stream_test.go
index 9ebadf6680..b8a5ad86c2 100644
--- a/pkg/snowflake/stream_test.go
+++ b/pkg/snowflake/stream_test.go
@@ -11,16 +11,19 @@ func TestStreamCreate(t *testing.T) {
 	s := Stream("test_stream", "test_db", "test_schema")
 
 	s.WithOnTable("test_db", "test_schema", "test_target_table")
-	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)
+	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = false`)
 
 	s.WithComment("Test Comment")
-	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)
+	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = false`)
 
 	s.WithShowInitialRows(true)
-	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = true`)
+	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = true`)
 
 	s.WithAppendOnly(true)
-	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`)
+	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`)
+
+	s.WithInsertOnly(true)
+	r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)
 }
 
 func TestStreamChangeComment(t *testing.T) {