Skip to content

Commit

Permalink
fix: refactor ReadWarehouse function to correctly read object paramet…
Browse files Browse the repository at this point in the history
…ers (#745)
  • Loading branch information
sonmezonur authored Nov 29, 2021
1 parent 3ed2805 commit d83c499
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/datasources/materialized_views_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestAccMaterializedViews(t *testing.T) {
func materializedViews(warehouseName string, databaseName string, schemaName string, tableName string, viewName string) string {
return fmt.Sprintf(`
resource "snowflake_warehouse" "w" {
name = "%v"
name = "%v"
initially_suspended = false
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/datasources/tasks_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func tasks(databaseName string, schemaName string, taskName string) string {
}
resource snowflake_warehouse "test" {
name = snowflake_database.test.name
name = snowflake_database.test.name
max_concurrency_level = 8
statement_timeout_in_seconds = 172800
}
resource snowflake_task "test" {
Expand Down
10 changes: 6 additions & 4 deletions pkg/datasources/warehouses_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ func TestAccWarehouses(t *testing.T) {
func warehouses(warehouseName string) string {
return fmt.Sprintf(`
resource snowflake_warehouse "s"{
name = "%v"
warehouse_size = "XSMALL"
initially_suspended = true
auto_suspend = 60
name = "%v"
warehouse_size = "XSMALL"
initially_suspended = true
auto_suspend = 60
max_concurrency_level = 8
statement_timeout_in_seconds = 172800
}
data snowflake_warehouses "s" {
Expand Down
31 changes: 27 additions & 4 deletions pkg/resources/task_grant_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,41 @@ func TestAcc_TaskGrant(t *testing.T) {
Providers: providers(),
Steps: []resource.TestStep{
{
Config: taskGrantConfig(accName),
Config: taskGrantConfig(accName, 8),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_task_grant.test", "database_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "schema_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "task_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "with_grant_option", "false"),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "privilege", "OPERATE"),
resource.TestCheckResourceAttr("snowflake_warehouse.test", "max_concurrency_level", "8"),
resource.TestCheckResourceAttr("snowflake_warehouse.test", "statement_timeout_in_seconds", "86400"),
),
},
// UPDATE MAX_CONCURRENCY_LEVEL
{
Config: taskGrantConfig(accName, 10),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_task_grant.test", "database_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "schema_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "task_name", accName),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "with_grant_option", "false"),
resource.TestCheckResourceAttr("snowflake_task_grant.test", "privilege", "OPERATE"),
resource.TestCheckResourceAttr("snowflake_warehouse.test", "max_concurrency_level", "10"),
resource.TestCheckResourceAttr("snowflake_warehouse.test", "statement_timeout_in_seconds", "86400"),
),
},
// IMPORT
{
ResourceName: "snowflake_task_grant.test",
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func taskGrantConfig(name string) string {
func taskGrantConfig(name string, concurrency int32) string {
s := `
resource "snowflake_database" "test" {
name = "%v"
Expand All @@ -47,7 +68,9 @@ resource "snowflake_role" "test" {
}
resource "snowflake_warehouse" "test" {
name = snowflake_database.test.name
name = snowflake_database.test.name
max_concurrency_level = %d
statement_timeout_in_seconds = 86400
}
resource "snowflake_task" "test" {
Expand All @@ -71,5 +94,5 @@ resource "snowflake_task_grant" "test" {
privilege = "OPERATE"
}
`
return fmt.Sprintf(s, name, name)
return fmt.Sprintf(s, name, name, concurrency)
}
67 changes: 56 additions & 11 deletions pkg/resources/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package resources
import (
"database/sql"
"log"
"strconv"
"strings"

"github.com/chanzuckerberg/terraform-provider-snowflake/pkg/snowflake"
Expand Down Expand Up @@ -105,7 +106,7 @@ var warehouseSchema = map[string]*schema.Schema{
"statement_timeout_in_seconds": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
Default: 172800,
Description: "Specifies the time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system",
},
"statement_queued_timeout_in_seconds": {
Expand All @@ -117,7 +118,7 @@ var warehouseSchema = map[string]*schema.Schema{
"max_concurrency_level": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
Default: 8,
Description: "Object parameter that specifies the concurrency level for SQL statements (i.e. queries and DML) executed by a warehouse.",
},
"tag": tagReferenceSchema,
Expand All @@ -141,13 +142,22 @@ func Warehouse() *schema.Resource {
// CreateWarehouse implements schema.CreateFunc
func CreateWarehouse(d *schema.ResourceData, meta interface{}) error {
props := append(warehouseProperties, warehouseCreateProperties...)
return CreateResource("warehouse", props, warehouseSchema, snowflake.Warehouse, ReadWarehouse)(d, meta)
return CreateResource(
"warehouse",
props,
warehouseSchema,
func(name string) *snowflake.Builder {
return snowflake.Warehouse(name).Builder
},
ReadWarehouse,
)(d, meta)
}

// ReadWarehouse implements schema.ReadFunc
func ReadWarehouse(d *schema.ResourceData, meta interface{}) error {
db := meta.(*sql.DB)
stmt := snowflake.Warehouse(d.Id()).Show()
warehouseBuilder := snowflake.Warehouse(d.Id())
stmt := warehouseBuilder.Show()

row := snowflake.QueryRow(db, stmt)
w, err := snowflake.ScanWarehouse(row)
Expand Down Expand Up @@ -193,29 +203,64 @@ func ReadWarehouse(d *schema.ResourceData, meta interface{}) error {
if err != nil {
return err
}
err = d.Set("statement_timeout_in_seconds", w.StatementTimeoutInSeconds)
err = d.Set("resource_monitor", w.ResourceMonitor)
if err != nil {
return err
}
err = d.Set("statement_queued_timeout_in_seconds", w.StatementQueuedTimeoutInSeconds)

stmt = warehouseBuilder.ShowParameters()
paramRows, err := snowflake.Query(db, stmt)
if err != nil {
return err
}
err = d.Set("max_concurrency_level", w.MaxConcurrencyLevel)

warehouseParams, err := snowflake.ScanWarehouseParameters(paramRows)
if err != nil {
return err
}
err = d.Set("resource_monitor", w.ResourceMonitor)

return err
for _, param := range warehouseParams {
log.Printf("[TRACE] %+v\n", param)

var value interface{} = param.DefaultValue
if strings.EqualFold(param.Type, "number") {
i, err := strconv.ParseInt(param.Value, 10, 64)
if err != nil {
return err
}
value = i
} else {
value = param.Value
}

key := strings.ToLower(param.Key)
err = d.Set(key, value)
if err != nil {
return err
}
}

return nil
}

// UpdateWarehouse implements schema.UpdateFunc
func UpdateWarehouse(d *schema.ResourceData, meta interface{}) error {
return UpdateResource("warehouse", warehouseProperties, warehouseSchema, snowflake.Warehouse, ReadWarehouse)(d, meta)
return UpdateResource(
"warehouse",
warehouseProperties,
warehouseSchema,
func(name string) *snowflake.Builder {
return snowflake.Warehouse(name).Builder
},
ReadWarehouse,
)(d, meta)
}

// DeleteWarehouse implements schema.DeleteFunc
func DeleteWarehouse(d *schema.ResourceData, meta interface{}) error {
return DeleteResource("warehouse", snowflake.Warehouse)(d, meta)
return DeleteResource(
"warehouse", func(name string) *snowflake.Builder {
return snowflake.Warehouse(name).Builder
},
)(d, meta)
}
5 changes: 5 additions & 0 deletions pkg/resources/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func TestWarehouseCreate(t *testing.T) {
func expectReadWarehouse(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"name", "comment", "size"}).AddRow("good_name", "mock comment", "SMALL")
mock.ExpectQuery("SHOW WAREHOUSES LIKE 'good_name'").WillReturnRows(rows)

rows = sqlmock.NewRows(
[]string{"key", "value", "default", "level", "description", "type"},
).AddRow("MAX_CONCURRENCY_LEVEL", 8, 8, "WAREHOUSE", "", "NUMBER")
mock.ExpectQuery("SHOW PARAMETERS IN WAREHOUSE good_name").WillReturnRows(rows)
}

func TestWarehouseRead(t *testing.T) {
Expand Down
130 changes: 94 additions & 36 deletions pkg/snowflake/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,98 @@ package snowflake

import (
"database/sql"
"fmt"
"log"
"time"

"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)

func Warehouse(name string) *Builder {
return &Builder{
name: name,
entityType: WarehouseType,
type WarehouseBuilder struct {
*Builder
}

func (wb *WarehouseBuilder) Show() string {
return wb.Builder.Show()
}

func (wb *WarehouseBuilder) Describe() string {
return wb.Builder.Describe()
}

func (wb *WarehouseBuilder) Drop() string {
return wb.Builder.Drop()
}

func (wb *WarehouseBuilder) Rename(newName string) string {
return wb.Builder.Rename(newName)
}

func (wb *WarehouseBuilder) Alter() *AlterPropertiesBuilder {
return wb.Builder.Alter()
}

func (wb *WarehouseBuilder) Create() *CreateBuilder {
return wb.Builder.Create()
}

// ShowParameters returns the query to show the parameters for the warehouse
func (wb *WarehouseBuilder) ShowParameters() string {
return fmt.Sprintf("SHOW PARAMETERS IN WAREHOUSE %v", wb.Builder.name)
}

func Warehouse(name string) *WarehouseBuilder {
return &WarehouseBuilder{
&Builder{
name: name,
entityType: WarehouseType,
},
}
}

// warehouse is a go representation of a grant that can be used in conjunction
// with github.com/jmoiron/sqlx
type warehouse struct {
Name string `db:"name"`
State string `db:"state"`
Type string `db:"type"`
Size string `db:"size"`
MinClusterCount int64 `db:"min_cluster_count"`
MaxClusterCount int64 `db:"max_cluster_count"`
StartedClusters int64 `db:"started_clusters"`
Running int64 `db:"running"`
Queued int64 `db:"queued"`
IsDefault string `db:"is_default"`
IsCurrent string `db:"is_current"`
AutoSuspend int64 `db:"auto_suspend"`
AutoResume bool `db:"auto_resume"`
Available string `db:"available"`
Provisioning string `db:"provisioning"`
Quiescing string `db:"quiescing"`
Other string `db:"other"`
CreatedOn time.Time `db:"created_on"`
ResumedOn time.Time `db:"resumed_on"`
UpdatedOn time.Time `db:"updated_on"`
Owner string `db:"owner"`
Comment string `db:"comment"`
ResourceMonitor string `db:"resource_monitor"`
StatementTimeoutInSeconds int64 `db:"statement_timeout_in_seconds"`
StatementQueuedTimeoutInSeconds int64 `db:"statement_queued_timeout_in_seconds"`
MaxConcurrencyLevel int64 `db:"max_concurrency_level"`
Actives int64 `db:"actives"`
Pendings int64 `db:"pendings"`
Failed int64 `db:"failed"`
Suspended int64 `db:"suspended"`
UUID string `db:"uuid"`
ScalingPolicy string `db:"scaling_policy"`
Name string `db:"name"`
State string `db:"state"`
Type string `db:"type"`
Size string `db:"size"`
MinClusterCount int64 `db:"min_cluster_count"`
MaxClusterCount int64 `db:"max_cluster_count"`
StartedClusters int64 `db:"started_clusters"`
Running int64 `db:"running"`
Queued int64 `db:"queued"`
IsDefault string `db:"is_default"`
IsCurrent string `db:"is_current"`
AutoSuspend int64 `db:"auto_suspend"`
AutoResume bool `db:"auto_resume"`
Available string `db:"available"`
Provisioning string `db:"provisioning"`
Quiescing string `db:"quiescing"`
Other string `db:"other"`
CreatedOn time.Time `db:"created_on"`
ResumedOn time.Time `db:"resumed_on"`
UpdatedOn time.Time `db:"updated_on"`
Owner string `db:"owner"`
Comment string `db:"comment"`
ResourceMonitor string `db:"resource_monitor"`
Actives int64 `db:"actives"`
Pendings int64 `db:"pendings"`
Failed int64 `db:"failed"`
Suspended int64 `db:"suspended"`
UUID string `db:"uuid"`
ScalingPolicy string `db:"scaling_policy"`
}

// warehouseParams struct to represent a row of parameters
type warehouseParams struct {
Key string `db:"key"`
Value string `db:"value"`
DefaultValue string `db:"default"`
Level string `db:"level"`
Description string `db:"description"`
Type string `db:"type"`
}

func ScanWarehouse(row *sqlx.Row) (*warehouse, error) {
Expand All @@ -59,6 +102,21 @@ func ScanWarehouse(row *sqlx.Row) (*warehouse, error) {
return w, err
}

// ScanWarehouseParameters takes a database row and converts it to a warehouse parameter pointer
func ScanWarehouseParameters(rows *sqlx.Rows) ([]*warehouseParams, error) {
params := []*warehouseParams{}

for rows.Next() {
w := &warehouseParams{}
err := rows.StructScan(w)
if err != nil {
return nil, err
}
params = append(params, w)
}
return params, nil
}

func ListWarehouses(db *sql.DB) ([]warehouse, error) {
stmt := "SHOW WAREHOUSES"
rows, err := Query(db, stmt)
Expand Down

0 comments on commit d83c499

Please sign in to comment.