Feature/table clustering (Snowflake-Labs#548)
berosen authored and jtzero committed Aug 19, 2021
1 parent c1d394c commit 3f3fdd1
Showing 6 changed files with 275 additions and 23 deletions.
20 changes: 14 additions & 6 deletions docs/resources/table.md
@@ -14,12 +14,14 @@ description: |-

```terraform
resource snowflake_table table {
database = "database"
schema   = "schema"
name = "table"
comment = "A table."
owner = "me"
database = "database"
schema     = "schema"
name = "table"
comment = "A table."
cluster_by = ["to_date(DATE)"]
owner = "me"
column {
name = "id"
type = "int"
@@ -29,6 +31,11 @@ resource snowflake_table table {
name = "data"
type = "text"
}
column {
name = "DATE"
type = "TIMESTAMP_NTZ(9)"
}
}
```

@@ -44,6 +51,7 @@ resource snowflake_table table {

### Optional

- **cluster_by** (List of String) A list of one or more table columns/expressions to be used as clustering key(s) for the table
- **comment** (String) Specifies a comment for the table.
- **id** (String) The ID of this resource.

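For orientation, the documented `cluster_by` list is joined into a `CLUSTER BY LINEAR(...)` clause when the table DDL is built (see the `pkg/snowflake/table.go` changes later in this commit). The snippet below is a minimal standalone sketch of that shaping for the example above; it is an illustration, not the provider's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// cluster_by value from the example resource above
	clusterBy := []string{"to_date(DATE)"}

	// the builder appends the keys as a comma-separated list inside LINEAR(...)
	clause := fmt.Sprintf("CLUSTER BY LINEAR(%s)", strings.Join(clusterBy, ", "))
	fmt.Println(clause) // CLUSTER BY LINEAR(to_date(DATE))
}
```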
19 changes: 13 additions & 6 deletions examples/resources/snowflake_table/resource.tf
@@ -1,10 +1,12 @@
resource snowflake_table table {
database = "database"
schema   = "schema"
name = "table"
comment = "A table."
owner = "me"

database = "database"
schema     = "schema"
name = "table"
comment = "A table."
cluster_by = ["to_date(DATE)"]

owner = "me"

column {
name = "id"
type = "int"
@@ -14,4 +16,9 @@ resource snowflake_table table {
name = "data"
type = "text"
}

column {
name = "DATE"
type = "TIMESTAMP_NTZ(9)"
}
}
40 changes: 34 additions & 6 deletions pkg/resources/table.go
@@ -36,6 +36,12 @@ var tableSchema = map[string]*schema.Schema{
ForceNew: true,
Description: "The database in which to create the table.",
},
"cluster_by": {
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
Description: "A list of one or more table columns/expressions to be used as clustering key(s) for the table",
},
"column": {
Type: schema.TypeList,
Required: true,
@@ -213,6 +219,10 @@ func CreateTable(d *schema.ResourceData, meta interface{}) error {
builder.WithComment(v.(string))
}

if v, ok := d.GetOk("cluster_by"); ok {
builder.WithClustering(expandStringList(v.([]interface{})))
}

stmt := builder.Create()
err := snowflake.Exec(db, stmt)
if err != nil {
@@ -267,12 +277,13 @@ func ReadTable(d *schema.ResourceData, meta interface{}) error {

// Set the relevant data in the state
toSet := map[string]interface{}{
"name": table.TableName.String,
"owner": table.Owner.String,
"database": tableID.DatabaseName,
"schema": tableID.SchemaName,
"comment": table.Comment.String,
"column": snowflake.NewColumns(tableDescription).Flatten(),
"name": table.TableName.String,
"owner": table.Owner.String,
"database": tableID.DatabaseName,
"schema": tableID.SchemaName,
"comment": table.Comment.String,
"column": snowflake.NewColumns(tableDescription).Flatten(),
"cluster_by": snowflake.ClusterStatementToList(table.ClusterBy.String),
}

for key, val := range toSet {
@@ -306,6 +317,23 @@ func UpdateTable(d *schema.ResourceData, meta interface{}) error {
return errors.Wrapf(err, "error updating table comment on %v", d.Id())
}
}

if d.HasChange("cluster_by") {
cb := expandStringList(d.Get("cluster_by").([]interface{}))

var q string
if len(cb) != 0 {
builder.WithClustering(cb)
q = builder.ChangeClusterBy(builder.GetClusterKeyString())
} else {
q = builder.DropClustering()
}

err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating table clustering on %v", d.Id())
}
}
if d.HasChange("column") {
old, new := d.GetChange("column")
removed, added, changed := getColumns(old).diffs(getColumns(new))
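Both CreateTable and UpdateTable convert the `[]interface{}` that the Terraform SDK returns for `cluster_by` into a `[]string` via the provider's existing `expandStringList` helper. That helper's body is not part of this diff; the sketch below is an illustrative assumption of what such a conversion typically looks like, not the repository's code.

```go
// Illustrative sketch only: the real expandStringList lives elsewhere in the
// provider and is not shown in this commit.
func expandStringList(raw []interface{}) []string {
	out := make([]string, 0, len(raw))
	for _, v := range raw {
		// keep only non-empty string elements
		if s, ok := v.(string); ok && s != "" {
			out = append(out, s)
		}
	}
	return out
}
```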
145 changes: 145 additions & 0 deletions pkg/resources/table_acceptance_test.go
@@ -12,6 +12,8 @@ import (
func TestAcc_Table(t *testing.T) {
accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

table2Name := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

resource.ParallelTest(t, resource.TestCase{
Providers: providers(),
Steps: []resource.TestStep{
@@ -41,6 +43,53 @@ func TestAcc_Table(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.name", "column3"),
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.type", "FLOAT"),
resource.TestCheckNoResourceAttr("snowflake_table.test_table", "cluster_by"),
),
},
{
Config: tableConfig3(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.type", "FLOAT"),
),
},
{
Config: tableConfig4(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.1", "\"col2\""),
),
},
{
Config: tableConfig5(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "\"col2\""),
),
},
},
@@ -108,3 +157,99 @@ resource "snowflake_table" "test_table" {
`
return fmt.Sprintf(s, name, name, name)
}

func tableConfig3(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["COL1"]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}

func tableConfig4(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["COL1","\"col2\""]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}

func tableConfig5(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["\"col2\"","COL1"]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}
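Between the tableConfig3, tableConfig4, and tableConfig5 steps only the clustering keys change, so the update path should issue ALTER statements shaped like the ones printed below. This is a hedged illustration based on `ChangeClusterBy` in `pkg/snowflake/table.go` (shown below); the `DB`, `SCHEMA`, and `TABLE2` identifiers are placeholders, since the test uses randomly generated names.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// cluster_by values applied by successive test steps
	steps := [][]string{
		{"COL1", `"col2"`}, // tableConfig3 -> tableConfig4
		{`"col2"`, "COL1"}, // tableConfig4 -> tableConfig5
	}
	for _, cb := range steps {
		// statement shape follows ChangeClusterBy; names are placeholders
		fmt.Printf("ALTER TABLE %q.%q.%q CLUSTER BY LINEAR(%s)\n",
			"DB", "SCHEMA", "TABLE2", strings.Join(cb, ", "))
	}
}
```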
58 changes: 53 additions & 5 deletions pkg/snowflake/table.go
@@ -71,11 +71,12 @@ func (c Columns) getColumnDefinitions() string {

// TableBuilder abstracts the creation of SQL queries for a Snowflake schema
type TableBuilder struct {
name string
db string
schema string
columns Columns
comment string
name string
db string
schema string
columns Columns
comment string
clusterBy []string
}

// QualifiedName prepends the db and schema if set and escapes everything nicely
@@ -111,6 +112,37 @@ func (tb *TableBuilder) WithColumns(c Columns) *TableBuilder {
return tb
}

// WithClustering adds cluster keys/expressions to TableBuilder
func (tb *TableBuilder) WithClustering(c []string) *TableBuilder {
tb.clusterBy = c
return tb
}

// GetClusterKeyString returns the clustering keys/expressions as a comma-separated string
func (tb *TableBuilder) GetClusterKeyString() string {
return strings.Join(tb.clusterBy, ", ")
}

// ClusterStatementToList takes the literal cluster statement returned by SHOW TABLES and converts it to a list of keys
func ClusterStatementToList(clusterStatement string) []string {
if clusterStatement == "" {
return nil
}

// strip the leading "LINEAR(" and the trailing parenthesis
cleanStatement := strings.TrimSuffix(strings.Replace(clusterStatement, "LINEAR(", "", 1), ")")

var clean []string

for _, s := range strings.Split(cleanStatement, ",") {
clean = append(clean, strings.TrimSpace(s))
}

return clean
}

// Table returns a pointer to a Builder that abstracts the DDL operations for a table.
//
// Supported DDL operations are:
@@ -152,9 +184,20 @@ func (tb *TableBuilder) Create() string {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment)))
}

if tb.clusterBy != nil {
// add the optional clustering clause
q.WriteString(fmt.Sprintf(` CLUSTER BY LINEAR(%v)`, tb.GetClusterKeyString()))
}

return q.String()
}

// ChangeClusterBy returns the SQL query that will change the clustering key on the table
func (tb *TableBuilder) ChangeClusterBy(cb string) string {
return fmt.Sprintf(`ALTER TABLE %v CLUSTER BY LINEAR(%v)`, tb.QualifiedName(), cb)
}

// ChangeComment returns the SQL query that will update the comment on the table.
func (tb *TableBuilder) ChangeComment(c string) string {
return fmt.Sprintf(`ALTER TABLE %v SET COMMENT = '%v'`, tb.QualifiedName(), EscapeString(c))
@@ -188,6 +231,11 @@ func (tb *TableBuilder) RemoveComment() string {
return fmt.Sprintf(`ALTER TABLE %v UNSET COMMENT`, tb.QualifiedName())
}

// DropClustering returns the SQL query that will remove the clustering key from the table
func (tb *TableBuilder) DropClustering() string {
return fmt.Sprintf(`ALTER TABLE %v DROP CLUSTERING KEY`, tb.QualifiedName())
}

// Drop returns the SQL query that will drop a table.
func (tb *TableBuilder) Drop() string {
return fmt.Sprintf(`DROP TABLE %v`, tb.QualifiedName())
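To round out the round-trip: SHOW TABLES reports the clustering key as a `LINEAR(...)` expression, and `ClusterStatementToList` converts it back into the list stored in Terraform state. The sketch below mirrors that function so it is self-contained; the sample input string is an assumption about Snowflake's output format, consistent with the quoting exercised in the acceptance tests.

```go
package main

import (
	"fmt"
	"strings"
)

// clusterStatementToList mirrors ClusterStatementToList above so this sketch
// runs on its own.
func clusterStatementToList(clusterStatement string) []string {
	if clusterStatement == "" {
		return nil
	}
	// strip the leading "LINEAR(" and the trailing parenthesis
	clean := strings.TrimSuffix(strings.Replace(clusterStatement, "LINEAR(", "", 1), ")")
	var keys []string
	for _, s := range strings.Split(clean, ",") {
		keys = append(keys, strings.TrimSpace(s))
	}
	return keys
}

func main() {
	fmt.Println(clusterStatementToList(`LINEAR(COL1, "col2")`))
	// Output: [COL1 "col2"]
}
```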