From 3f3fdd1e187185cf6e1c90dc1c8f432f7349292e Mon Sep 17 00:00:00 2001
From: Ben Rosen <34146575+berosen@users.noreply.github.com>
Date: Fri, 21 May 2021 10:26:57 -0700
Subject: [PATCH] Feature/table clustering (#548)

---
 docs/resources/table.md                    |  20 ++-
 .../resources/snowflake_table/resource.tf  |  19 ++-
 pkg/resources/table.go                     |  40 ++++-
 pkg/resources/table_acceptance_test.go     | 145 ++++++++++++++++++
 pkg/snowflake/table.go                     |  58 ++++++-
 pkg/snowflake/table_test.go                |  16 ++
 6 files changed, 275 insertions(+), 23 deletions(-)

diff --git a/docs/resources/table.md b/docs/resources/table.md
index bf9058aae9..f4d13616a3 100644
--- a/docs/resources/table.md
+++ b/docs/resources/table.md
@@ -14,12 +14,14 @@ description: |-
 
 ```terraform
 resource snowflake_table table {
-  database = "database"
-  schema   = "schmea"
-  name     = "table"
-  comment  = "A table."
-  owner    = "me"
-
+  database   = "database"
+  schema     = "schema"
+  name       = "table"
+  comment    = "A table."
+  cluster_by = ["to_date(DATE)"]
+
+  owner      = "me"
+
   column {
     name = "id"
     type = "int"
@@ -29,6 +31,11 @@ resource snowflake_table table {
     name = "data"
     type = "text"
   }
+
+  column {
+    name = "DATE"
+    type = "TIMESTAMP_NTZ(9)"
+  }
 }
 ```
 
@@ -44,6 +51,7 @@ resource snowflake_table table {
 
 ### Optional
 
+- **cluster_by** (List of String) A list of one or more table columns/expressions to be used as clustering key(s) for the table
 - **comment** (String) Specifies a comment for the table.
 - **id** (String) The ID of this resource.
 
diff --git a/examples/resources/snowflake_table/resource.tf b/examples/resources/snowflake_table/resource.tf
index 32b6b2e871..ce3b9caeab 100644
--- a/examples/resources/snowflake_table/resource.tf
+++ b/examples/resources/snowflake_table/resource.tf
@@ -1,10 +1,12 @@
 resource snowflake_table table {
-  database = "database"
-  schema   = "schmea"
-  name     = "table"
-  comment  = "A table."
-  owner    = "me"
-
+  database   = "database"
+  schema     = "schema"
+  name       = "table"
+  comment    = "A table."
+  cluster_by = ["to_date(DATE)"]
+
+  owner      = "me"
+
   column {
     name = "id"
     type = "int"
@@ -14,4 +16,9 @@ resource snowflake_table table {
     name = "data"
     type = "text"
   }
+
+  column {
+    name = "DATE"
+    type = "TIMESTAMP_NTZ(9)"
+  }
 }
diff --git a/pkg/resources/table.go b/pkg/resources/table.go
index 5db15aca7e..7f1dc82450 100644
--- a/pkg/resources/table.go
+++ b/pkg/resources/table.go
@@ -36,6 +36,12 @@ var tableSchema = map[string]*schema.Schema{
 		ForceNew:    true,
 		Description: "The database in which to create the table.",
 	},
+	"cluster_by": {
+		Type:        schema.TypeList,
+		Elem:        &schema.Schema{Type: schema.TypeString},
+		Optional:    true,
+		Description: "A list of one or more table columns/expressions to be used as clustering key(s) for the table",
+	},
 	"column": {
 		Type:        schema.TypeList,
 		Required:    true,
@@ -213,6 +219,10 @@ func CreateTable(d *schema.ResourceData, meta interface{}) error {
 		builder.WithComment(v.(string))
 	}
 
+	if v, ok := d.GetOk("cluster_by"); ok {
+		builder.WithClustering(expandStringList(v.([]interface{})))
+	}
+
 	stmt := builder.Create()
 	err := snowflake.Exec(db, stmt)
 	if err != nil {
@@ -267,12 +277,13 @@ func ReadTable(d *schema.ResourceData, meta interface{}) error {
 
 	// Set the relevant data in the state
 	toSet := map[string]interface{}{
-		"name":     table.TableName.String,
-		"owner":    table.Owner.String,
-		"database": tableID.DatabaseName,
-		"schema":   tableID.SchemaName,
-		"comment":  table.Comment.String,
-		"column":   snowflake.NewColumns(tableDescription).Flatten(),
+		"name":       table.TableName.String,
+		"owner":      table.Owner.String,
+		"database":   tableID.DatabaseName,
+		"schema":     tableID.SchemaName,
+		"comment":    table.Comment.String,
+		"column":     snowflake.NewColumns(tableDescription).Flatten(),
+		"cluster_by": snowflake.ClusterStatementToList(table.ClusterBy.String),
 	}
 
 	for key, val := range toSet {
@@ -306,6 +317,23 @@ func UpdateTable(d *schema.ResourceData, meta interface{}) error {
 			return errors.Wrapf(err, "error updating table comment on %v", d.Id())
 		}
 	}
+
+	if d.HasChange("cluster_by") {
+		cb := expandStringList(d.Get("cluster_by").([]interface{}))
+
+		var q string
+		if len(cb) != 0 {
+			builder.WithClustering(cb)
+			q = builder.ChangeClusterBy(builder.GetClusterKeyString())
+		} else {
+			q = builder.DropClustering()
+		}
+
+		err := snowflake.Exec(db, q)
+		if err != nil {
+			return errors.Wrapf(err, "error updating table clustering on %v", d.Id())
+		}
+	}
 	if d.HasChange("column") {
 		old, new := d.GetChange("column")
 		removed, added, changed := getColumns(old).diffs(getColumns(new))
diff --git a/pkg/resources/table_acceptance_test.go b/pkg/resources/table_acceptance_test.go
index 74bbc3ede8..12d31d9749 100644
--- a/pkg/resources/table_acceptance_test.go
+++ b/pkg/resources/table_acceptance_test.go
@@ -12,6 +12,8 @@ import (
 
 func TestAcc_Table(t *testing.T) {
 	accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))
+	table2Name := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))
+
 	resource.ParallelTest(t, resource.TestCase{
 		Providers: providers(),
 		Steps: []resource.TestStep{
@@ -41,6 +43,53 @@ func TestAcc_Table(t *testing.T) {
 					resource.TestCheckResourceAttr("snowflake_table.test_table", "column.0.type", "VARCHAR(16777216)"),
 					resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.name", "column3"),
 					resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.type", "FLOAT"),
+					resource.TestCheckNoResourceAttr("snowflake_table.test_table", "cluster_by"),
+				),
+			},
+			{
+				Config: tableConfig3(accName, table2Name),
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "1"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "COL1"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.type", "FLOAT"),
+				),
+			},
+			{
+				Config: tableConfig4(accName, table2Name),
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.1", "\"col2\""),
+				),
+			},
+			{
+				Config: tableConfig5(accName, table2Name),
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
+					resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "\"col2\""),
 				),
 			},
 		},
@@ -108,3 +157,99 @@ resource "snowflake_table" "test_table" {
 `
 	return fmt.Sprintf(s, name, name, name)
 }
+
+func tableConfig3(name string, table2Name string) string {
+	s := `
+resource "snowflake_database" "test_database" {
+	name    = "%s"
+	comment = "Terraform acceptance test"
+}
+
+resource "snowflake_schema" "test_schema" {
+	name     = "%s"
+	database = snowflake_database.test_database.name
+	comment  = "Terraform acceptance test"
+}
+
+resource "snowflake_table" "test_table2" {
+	database   = snowflake_database.test_database.name
+	schema     = snowflake_schema.test_schema.name
+	name       = "%s"
+	comment    = "Terraform acceptance test"
+	cluster_by = ["COL1"]
+	column {
+		name = "COL1"
+		type = "VARCHAR(16777216)"
+	}
+	column {
+		name = "col2"
+		type = "FLOAT"
+	}
+}
+`
+	return fmt.Sprintf(s, name, name, table2Name)
+}
+
+func tableConfig4(name string, table2Name string) string {
+	s := `
+resource "snowflake_database" "test_database" {
+	name    = "%s"
+	comment = "Terraform acceptance test"
+}
+
+resource "snowflake_schema" "test_schema" {
+	name     = "%s"
+	database = snowflake_database.test_database.name
+	comment  = "Terraform acceptance test"
+}
+
+resource "snowflake_table" "test_table2" {
+	database   = snowflake_database.test_database.name
+	schema     = snowflake_schema.test_schema.name
+	name       = "%s"
+	comment    = "Terraform acceptance test"
+	cluster_by = ["COL1","\"col2\""]
+	column {
+		name = "COL1"
+		type = "VARCHAR(16777216)"
+	}
+	column {
+		name = "col2"
+		type = "FLOAT"
+	}
+}
+`
+	return fmt.Sprintf(s, name, name, table2Name)
+}
+
+func tableConfig5(name string, table2Name string) string {
+	s := `
+resource "snowflake_database" "test_database" {
+	name    = "%s"
+	comment = "Terraform acceptance test"
+}
+
+resource "snowflake_schema" "test_schema" {
+	name     = "%s"
+	database = snowflake_database.test_database.name
+	comment  = "Terraform acceptance test"
+}
+
+resource "snowflake_table" "test_table2" {
+	database   = snowflake_database.test_database.name
+	schema     = snowflake_schema.test_schema.name
+	name       = "%s"
+	comment    = "Terraform acceptance test"
+	cluster_by = ["\"col2\"","COL1"]
+	column {
+		name = "COL1"
+		type = "VARCHAR(16777216)"
+	}
+	column {
+		name = "col2"
+		type = "FLOAT"
+	}
+}
+`
+	return fmt.Sprintf(s, name, name, table2Name)
+}
diff --git a/pkg/snowflake/table.go b/pkg/snowflake/table.go
index 6d6b1063e3..24d3d78215 100644
--- a/pkg/snowflake/table.go
+++ b/pkg/snowflake/table.go
@@ -71,11 +71,12 @@ func (c Columns) getColumnDefinitions() string {
 
 // TableBuilder abstracts the creation of SQL queries for a Snowflake schema
 type TableBuilder struct {
-	name    string
-	db      string
-	schema  string
-	columns Columns
-	comment string
+	name      string
+	db        string
+	schema    string
+	columns   Columns
+	comment   string
+	clusterBy []string
 }
 
 // QualifiedName prepends the db and schema if set and escapes everything nicely
@@ -111,6 +112,37 @@ func (tb *TableBuilder) WithColumns(c Columns) *TableBuilder {
 	return tb
 }
 
+// WithClustering adds cluster keys/expressions to TableBuilder
+func (tb *TableBuilder) WithClustering(c []string) *TableBuilder {
+	tb.clusterBy = c
+	return tb
+}
+
+// GetClusterKeyString returns the table's clustering keys/expressions as a comma-separated string
+func (tb *TableBuilder) GetClusterKeyString() string {
+
+	return fmt.Sprint(strings.Join(tb.clusterBy[:], ", "))
+}
+
+// ClusterStatementToList takes the literal Snowflake cluster statement returned from SHOW TABLES and converts it to a list of keys.
+func ClusterStatementToList(clusterStatement string) []string {
+	if clusterStatement == "" {
+		return nil
+	}
+
+	cleanStatement := strings.TrimSuffix(strings.Replace(clusterStatement, "LINEAR(", "", 1), ")")
+	// strip the LINEAR( wrapper and the trailing parenthesis
+
+	var clean []string
+
+	for _, s := range strings.Split(cleanStatement, ",") {
+		clean = append(clean, strings.TrimSpace(s))
+	}
+
+	return clean
+
+}
+
 // Table returns a pointer to a Builder that abstracts the DDL operations for a table.
 //
 // Supported DDL operations are:
@@ -152,9 +184,20 @@ func (tb *TableBuilder) Create() string {
 		q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment)))
 	}
 
+	if tb.clusterBy != nil {
+		// add the optional clustering statement
+		q.WriteString(fmt.Sprintf(` CLUSTER BY LINEAR(%v)`, tb.GetClusterKeyString()))
+
+	}
+
 	return q.String()
 }
 
+// ChangeClusterBy returns the SQL query that will change the clustering on the table
+func (tb *TableBuilder) ChangeClusterBy(cb string) string {
+	return fmt.Sprintf(`ALTER TABLE %v CLUSTER BY LINEAR(%v)`, tb.QualifiedName(), cb)
+}
+
 // ChangeComment returns the SQL query that will update the comment on the table.
 func (tb *TableBuilder) ChangeComment(c string) string {
 	return fmt.Sprintf(`ALTER TABLE %v SET COMMENT = '%v'`, tb.QualifiedName(), EscapeString(c))
@@ -188,6 +231,11 @@ func (tb *TableBuilder) RemoveComment() string {
 	return fmt.Sprintf(`ALTER TABLE %v UNSET COMMENT`, tb.QualifiedName())
 }
 
+// DropClustering returns the SQL query that will remove data clustering from the table
+func (tb *TableBuilder) DropClustering() string {
+	return fmt.Sprintf(`ALTER TABLE %v DROP CLUSTERING KEY`, tb.QualifiedName())
+}
+
 // Drop returns the SQL query that will drop a table.
 func (tb *TableBuilder) Drop() string {
 	return fmt.Sprintf(`DROP TABLE %v`, tb.QualifiedName())
diff --git a/pkg/snowflake/table_test.go b/pkg/snowflake/table_test.go
index 8a17e40163..7251321875 100644
--- a/pkg/snowflake/table_test.go
+++ b/pkg/snowflake/table_test.go
@@ -27,6 +27,10 @@ func TestTableCreate(t *testing.T) {
 
 	s.WithComment("Test Comment")
 	r.Equal(s.Create(), `CREATE TABLE "test_db"."test_schema"."test_table" ("column1" OBJECT, "column2" VARCHAR) COMMENT = 'Test Comment'`)
+
+	s.WithClustering([]string{"column1"})
+	r.Equal(s.Create(), `CREATE TABLE "test_db"."test_schema"."test_table" ("column1" OBJECT, "column2" VARCHAR) COMMENT = 'Test Comment' CLUSTER BY LINEAR(column1)`)
+
 }
 
 func TestTableChangeComment(t *testing.T) {
@@ -59,6 +63,18 @@ func TestTableChangeColumnType(t *testing.T) {
 	r.Equal(s.ChangeColumnType("old_column", "BIGINT"), `ALTER TABLE "test_db"."test_schema"."test_table" MODIFY COLUMN "old_column" BIGINT`)
 }
 
+func TestTableChangeClusterBy(t *testing.T) {
+	r := require.New(t)
+	s := Table("test_table", "test_db", "test_schema")
+	r.Equal(s.ChangeClusterBy("column2, column3"), `ALTER TABLE "test_db"."test_schema"."test_table" CLUSTER BY LINEAR(column2, column3)`)
+}
+
+func TestTableDropClusterBy(t *testing.T) {
+	r := require.New(t)
+	s := Table("test_table", "test_db", "test_schema")
+	r.Equal(s.DropClustering(), `ALTER TABLE "test_db"."test_schema"."test_table" DROP CLUSTERING KEY`)
+}
+
 func TestTableDrop(t *testing.T) {
 	r := require.New(t)
 	s := Table("test_table", "test_db", "test_schema")
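
For reviewers, here is a minimal, self-contained sketch of how the clustering helpers added in this patch compose, using only functions that appear in `pkg/snowflake` (`Table`, `WithClustering`, `GetClusterKeyString`, `ChangeClusterBy`, `DropClustering`, `ClusterStatementToList`). The `main` wrapper and the import path are illustrative assumptions, not part of the change.

```go
package main

import (
	"fmt"

	// Import path is assumed for illustration; use the module path from go.mod.
	"github.com/chanzuckerberg/terraform-provider-snowflake/pkg/snowflake"
)

func main() {
	// Build the statements the provider now issues when cluster_by changes.
	tb := snowflake.Table("test_table", "test_db", "test_schema")
	tb.WithClustering([]string{"COL1", `"col2"`})

	// ALTER TABLE "test_db"."test_schema"."test_table" CLUSTER BY LINEAR(COL1, "col2")
	fmt.Println(tb.ChangeClusterBy(tb.GetClusterKeyString()))

	// ALTER TABLE "test_db"."test_schema"."test_table" DROP CLUSTERING KEY
	fmt.Println(tb.DropClustering())

	// ReadTable round-trips the LINEAR(...) string reported by SHOW TABLES back into a list.
	fmt.Println(snowflake.ClusterStatementToList(`LINEAR(COL1, "col2")`)) // [COL1 "col2"]
}
```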