From a14b994d62f73c272cde72651c3c8c18ac4213cf Mon Sep 17 00:00:00 2001 From: Nathan Gaberel Date: Fri, 12 May 2023 11:49:57 -0700 Subject: [PATCH] chore: Migrate Warehouse resource + datasource to new SDK (#1792) * Allow rendering parameters with aliased string types. * Allow rendering lists of strings. * Implement Warehouse sdk. * Document available clause tags. * Use SDK for warehouse resource. * Remove old Warehouse sdk. * Use SDK with warehouse datasource. * Update docs --- docs/resources/warehouse.md | 14 - pkg/datasources/warehouses.go | 23 +- .../resource_monitor_acceptance_test.go | 4 +- pkg/resources/snowflake_sweeper_test.go | 31 -- pkg/resources/warehouse.go | 379 ++++++++++---- pkg/resources/warehouse_acceptance_test.go | 20 +- pkg/resources/warehouse_test.go | 81 --- pkg/sdk/README.md | 12 + pkg/sdk/client.go | 1 + pkg/sdk/helper_test.go | 4 +- pkg/sdk/sql_builder.go | 34 +- pkg/sdk/sql_builder_test.go | 7 +- pkg/sdk/type_helpers.go | 10 + pkg/sdk/validations.go | 18 + pkg/sdk/warehouse.go | 476 ++++++++++++++++++ pkg/sdk/warehouse_integration_test.go | 451 +++++++++++++++++ pkg/sdk/warehouse_test.go | 287 +++++++++++ pkg/sdk/warehouses.go | 82 --- pkg/snowflake/warehouse.go | 139 ----- 19 files changed, 1582 insertions(+), 491 deletions(-) delete mode 100644 pkg/resources/warehouse_test.go create mode 100644 pkg/sdk/README.md create mode 100644 pkg/sdk/warehouse.go create mode 100644 pkg/sdk/warehouse_integration_test.go create mode 100644 pkg/sdk/warehouse_test.go delete mode 100644 pkg/sdk/warehouses.go delete mode 100644 pkg/snowflake/warehouse.go diff --git a/docs/resources/warehouse.md b/docs/resources/warehouse.md index 4137481f9c..317a86b78e 100644 --- a/docs/resources/warehouse.md +++ b/docs/resources/warehouse.md @@ -42,7 +42,6 @@ resource "snowflake_warehouse" "warehouse" { - `scaling_policy` (String) Specifies the policy for automatically starting and shutting down clusters in a multi-cluster warehouse running in Auto-scale mode. - `statement_queued_timeout_in_seconds` (Number) Object parameter that specifies the time, in seconds, a SQL statement (query, DDL, DML, etc.) can be queued on a warehouse before it is canceled by the system. - `statement_timeout_in_seconds` (Number) Specifies the time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system -- `tag` (Block List, Deprecated) Definitions of a tag to associate with the resource. (see [below for nested schema](#nestedblock--tag)) - `wait_for_provisioning` (Boolean) Specifies whether the warehouse, after being resized, waits for all the servers to provision before executing any queued or new queries. - `warehouse_size` (String) Specifies the size of the virtual warehouse. Larger warehouse sizes 5X-Large and 6X-Large are currently in preview and only available on Amazon Web Services (AWS). - `warehouse_type` (String) Specifies a STANDARD or SNOWPARK-OPTIMIZED warehouse @@ -51,19 +50,6 @@ resource "snowflake_warehouse" "warehouse" { - `id` (String) The ID of this resource. - -### Nested Schema for `tag` - -Required: - -- `name` (String) Tag name, e.g. department. -- `value` (String) Tag value, e.g. marketing_info. - -Optional: - -- `database` (String) Name of the database that the tag was created in. -- `schema` (String) Name of the schema that the tag was created in. - ## Import Import is supported using the following syntax: diff --git a/pkg/datasources/warehouses.go b/pkg/datasources/warehouses.go index 662dd0c445..96cb136106 100644 --- a/pkg/datasources/warehouses.go +++ b/pkg/datasources/warehouses.go @@ -1,11 +1,11 @@ package datasources import ( + "context" "database/sql" - "errors" "fmt" - "log" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) @@ -59,31 +59,24 @@ func Warehouses() *schema.Resource { func ReadWarehouses(d *schema.ResourceData, meta interface{}) error { db := meta.(*sql.DB) + client := sdk.NewClientFromDB(db) + ctx := context.Background() account, err := snowflake.ReadCurrentAccount(db) if err != nil { - log.Print("[DEBUG] unable to retrieve current account") d.SetId("") return nil } - d.SetId(fmt.Sprintf("%s.%s", account.Account, account.Region)) - currentWarehouses, err := snowflake.ListWarehouses(db) - if errors.Is(err, sql.ErrNoRows) { - // If not found, mark resource to be removed from state file during apply or refresh - log.Printf("[DEBUG] no warehouses found in account (%s)", d.Id()) - d.SetId("") - return nil - } else if err != nil { - log.Printf("[DEBUG] unable to parse warehouses in account (%s)", d.Id()) - d.SetId("") - return nil + result, err := client.Warehouses.Show(ctx, nil) + if err != nil { + return err } warehouses := []map[string]interface{}{} - for _, warehouse := range currentWarehouses { + for _, warehouse := range result { warehouseMap := map[string]interface{}{} warehouseMap["name"] = warehouse.Name diff --git a/pkg/resources/resource_monitor_acceptance_test.go b/pkg/resources/resource_monitor_acceptance_test.go index f4d99339f5..e1f546e50f 100644 --- a/pkg/resources/resource_monitor_acceptance_test.go +++ b/pkg/resources/resource_monitor_acceptance_test.go @@ -57,7 +57,7 @@ func resourceMonitorConfig(accName string) string { resource "snowflake_warehouse" "warehouse" { name = "test" comment = "foo" - warehouse_size = "small" + warehouse_size = "SMALL" } resource "snowflake_resource_monitor" "test" { @@ -77,7 +77,7 @@ func resourceMonitorConfig2(accName string) string { resource "snowflake_warehouse" "warehouse" { name = "test" comment = "foo" - warehouse_size = "small" + warehouse_size = "SMALL" } resource "snowflake_resource_monitor" "test" { diff --git a/pkg/resources/snowflake_sweeper_test.go b/pkg/resources/snowflake_sweeper_test.go index c2504e3264..be5fe624cf 100644 --- a/pkg/resources/snowflake_sweeper_test.go +++ b/pkg/resources/snowflake_sweeper_test.go @@ -16,36 +16,6 @@ func TestMain(m *testing.M) { resource.TestMain(m) } -func getWarehousesSweeper(name string) *resource.Sweeper { - return &resource.Sweeper{ - Name: name, - F: func(ununsed string) error { - db, err := provider.GetDatabaseHandleFromEnv() - if err != nil { - return fmt.Errorf("Error getting db handle: %w", err) - } - - warehouses, err := snowflake.ListWarehouses(db) - if err != nil { - return fmt.Errorf("Error listing warehouses: %w", err) - } - - for _, wh := range warehouses { - log.Printf("[DEBUG] Testing if warehouse %s starts with tst-terraform", wh.Name) - if strings.HasPrefix(wh.Name, "tst-terraform") { - log.Printf("[DEBUG] deleting warehouse %s", wh.Name) - whBuilder := snowflake.NewWarehouseBuilder(name).Builder - stmt := whBuilder.Drop() - if err := snowflake.Exec(db, stmt); err != nil { - return fmt.Errorf("Error deleting warehouse %q %w", wh.Name, err) - } - } - } - return nil - }, - } -} - func getDatabaseSweepers(name string) *resource.Sweeper { return &resource.Sweeper{ Name: name, @@ -161,7 +131,6 @@ func getIntegrationsSweeper(name string) *resource.Sweeper { // Sweepers usually go along with the tests. In TF[CE]'s case everything depends on the organization, // which means that if we delete it then all the other entities will be deleted automatically. func init() { - resource.AddTestSweepers("wh_sweeper", getWarehousesSweeper("wh_sweeper")) resource.AddTestSweepers("db_sweeper", getDatabaseSweepers("db_sweeper")) resource.AddTestSweepers("role_sweeper", getRolesSweeper("role_sweeper")) resource.AddTestSweepers("user_sweeper", getUsersSweeper("user_sweeper")) diff --git a/pkg/resources/warehouse.go b/pkg/resources/warehouse.go index 990d797697..090b3a41ce 100644 --- a/pkg/resources/warehouse.go +++ b/pkg/resources/warehouse.go @@ -1,28 +1,16 @@ package resources import ( + "context" "database/sql" - "errors" - "log" - "strconv" "strings" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" ) -// warehouseCreateProperties are only available via the CREATE statement. -var warehouseCreateProperties = []string{"initially_suspended", "wait_for_provisioning"} - -var warehouseProperties = []string{ - "comment", "warehouse_size", "max_cluster_count", "min_cluster_count", - "scaling_policy", "auto_suspend", "auto_resume", - "resource_monitor", "max_concurrency_level", "statement_queued_timeout_in_seconds", - "statement_timeout_in_seconds", "enable_query_acceleration", "query_acceleration_max_scale_factor", - "warehouse_type", -} - var warehouseSchema = map[string]*schema.Schema{ "name": { Type: schema.TypeString, @@ -39,11 +27,17 @@ var warehouseSchema = map[string]*schema.Schema{ Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{ - "XSMALL", "X-SMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE", - "X-LARGE", "XXLARGE", "X2LARGE", "2X-LARGE", "XXXLARGE", "X3LARGE", - "3X-LARGE", "X4LARGE", "4X-LARGE", "X5LARGE", "5X-LARGE", "X6LARGE", - "6X-LARGE", - }, true), + string(sdk.WarehouseSizeXSmall), + string(sdk.WarehouseSizeSmall), + string(sdk.WarehouseSizeMedium), + string(sdk.WarehouseSizeLarge), + string(sdk.WarehouseSizeXLarge), + string(sdk.WarehouseSizeXXLarge), + string(sdk.WarehouseSizeXXXLarge), + string(sdk.WarehouseSizeX4Large), + string(sdk.WarehouseSizeX5Large), + string(sdk.WarehouseSizeX6Large), + }, false), DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { normalize := func(s string) string { return strings.ToUpper(strings.ReplaceAll(s, "-", "")) @@ -57,7 +51,7 @@ var warehouseSchema = map[string]*schema.Schema{ Description: "Specifies the maximum number of server clusters for the warehouse.", Optional: true, Computed: true, - ValidateFunc: validation.IntAtLeast(1), + ValidateFunc: validation.IntBetween(1, 10), }, "min_cluster_count": { Type: schema.TypeInt, @@ -67,11 +61,14 @@ var warehouseSchema = map[string]*schema.Schema{ ValidateFunc: validation.IntBetween(1, 10), }, "scaling_policy": { - Type: schema.TypeString, - Description: "Specifies the policy for automatically starting and shutting down clusters in a multi-cluster warehouse running in Auto-scale mode.", - Optional: true, - Computed: true, - ValidateFunc: validation.StringInSlice([]string{"STANDARD", "ECONOMY"}, true), + Type: schema.TypeString, + Description: "Specifies the policy for automatically starting and shutting down clusters in a multi-cluster warehouse running in Auto-scale mode.", + Optional: true, + Computed: true, + ValidateFunc: validation.StringInSlice([]string{ + string(sdk.ScalingPolicyStandard), + string(sdk.ScalingPolicyEconomy), + }, true), }, "auto_suspend": { Type: schema.TypeInt, @@ -137,13 +134,15 @@ var warehouseSchema = map[string]*schema.Schema{ Description: "Specifies the maximum scale factor for leasing compute resources for query acceleration. The scale factor is used as a multiplier based on warehouse size.", }, "warehouse_type": { - Type: schema.TypeString, - Optional: true, - Default: "STANDARD", - ValidateFunc: validation.StringInSlice([]string{"STANDARD", "SNOWPARK-OPTIMIZED"}, true), - Description: "Specifies a STANDARD or SNOWPARK-OPTIMIZED warehouse", + Type: schema.TypeString, + Optional: true, + Default: string(sdk.WarehouseTypeStandard), + ValidateFunc: validation.StringInSlice([]string{ + string(sdk.WarehouseTypeStandard), + string(sdk.WarehouseTypeSnowparkOptimized), + }, true), + Description: "Specifies a STANDARD or SNOWPARK-OPTIMIZED warehouse", }, - "tag": tagReferenceSchema, } // Warehouse returns a pointer to the resource representing a warehouse. @@ -163,41 +162,101 @@ func Warehouse() *schema.Resource { // CreateWarehouse implements schema.CreateFunc. func CreateWarehouse(d *schema.ResourceData, meta interface{}) error { - props := append(warehouseProperties, warehouseCreateProperties...) //nolint:gocritic // todo: please fix this to pass gocritic - return CreateResource( - "warehouse", - props, - warehouseSchema, - func(name string) *snowflake.Builder { - return snowflake.NewWarehouseBuilder(name).Builder - }, - ReadWarehouse, - )(d, meta) + db := meta.(*sql.DB) + client := sdk.NewClientFromDB(db) + ctx := context.Background() + + name := d.Get("name").(string) + objectIdentifier := sdk.NewAccountObjectIdentifier(name) + + createOptions := &sdk.WarehouseCreateOptions{} + + if v, ok := d.GetOk("comment"); ok { + createOptions.Comment = sdk.String(v.(string)) + } + if v, ok := d.GetOk("warehouse_size"); ok { + size := sdk.WarehouseSize(strings.ReplaceAll(v.(string), "-", "")) + createOptions.WarehouseSize = &size + } + if v, ok := d.GetOk("max_cluster_count"); ok { + createOptions.MaxClusterCount = sdk.Uint8(uint8(v.(int))) + } + if v, ok := d.GetOk("min_cluster_count"); ok { + createOptions.MinClusterCount = sdk.Uint8(uint8(v.(int))) + } + if v, ok := d.GetOk("scaling_policy"); ok { + scalingPolicy := sdk.ScalingPolicy(v.(string)) + createOptions.ScalingPolicy = &scalingPolicy + } + if v, ok := d.GetOk("auto_suspend"); ok { + createOptions.AutoSuspend = sdk.Uint(uint(v.(int))) + } + if v, ok := d.GetOk("auto_resume"); ok { + createOptions.AutoResume = sdk.Bool(v.(bool)) + } + if v, ok := d.GetOk("initially_suspended"); ok { + createOptions.InitiallySuspended = sdk.Bool(v.(bool)) + } + if v, ok := d.GetOk("resource_monitor"); ok { + createOptions.ResourceMonitor = sdk.String(v.(string)) + } + if v, ok := d.GetOk("statement_timeout_in_seconds"); ok { + createOptions.StatementTimeoutInSeconds = sdk.Uint(uint(v.(int))) + } + if v, ok := d.GetOk("statement_queued_timeout_in_seconds"); ok { + createOptions.StatementQueuedTimeoutInSeconds = sdk.Uint(uint(v.(int))) + } + if v, ok := d.GetOk("max_concurrency_level"); ok { + createOptions.MaxConcurrencyLevel = sdk.Uint(uint(v.(int))) + } + if v, ok := d.GetOk("enable_query_acceleration"); ok { + createOptions.EnableQueryAcceleration = sdk.Bool(v.(bool)) + } + if v, ok := d.GetOk("query_acceleration_max_scale_factor"); ok { + createOptions.QueryAccelerationMaxScaleFactor = sdk.Uint8(uint8(v.(int))) + } + if v, ok := d.GetOk("warehouse_type"); ok { + whType := sdk.WarehouseType(v.(string)) + createOptions.WarehouseType = &whType + } + + err := client.Warehouses.Create(ctx, objectIdentifier, createOptions) + if err != nil { + return err + } + d.SetId(helpers.EncodeSnowflakeID(objectIdentifier)) + + return ReadWarehouse(d, meta) } // ReadWarehouse implements schema.ReadFunc. func ReadWarehouse(d *schema.ResourceData, meta interface{}) error { db := meta.(*sql.DB) - warehouseBuilder := snowflake.NewWarehouseBuilder(d.Id()) - stmt := warehouseBuilder.Show() + client := sdk.NewClientFromDB(db) + ctx := context.Background() - row := snowflake.QueryRow(db, stmt) - w, err := snowflake.ScanWarehouse(row) - if errors.Is(err, sql.ErrNoRows) { - // If not found, mark resource to be removed from state file during apply or refresh - log.Printf("[DEBUG] warehouse (%s) not found", d.Id()) - d.SetId("") - return nil - } + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.AccountObjectIdentifier) + + warehouses, err := client.Warehouses.Show(ctx, &sdk.WarehouseShowOptions{ + Like: &sdk.Like{ + Pattern: sdk.String(id.Name()), + }, + }) if err != nil { return err } + + w := warehouses[0] + if err = d.Set("name", w.Name); err != nil { return err } if err = d.Set("comment", w.Comment); err != nil { return err } + if err = d.Set("warehouse_type", w.Type); err != nil { + return err + } if err = d.Set("warehouse_size", w.Size); err != nil { return err } @@ -210,7 +269,7 @@ func ReadWarehouse(d *schema.ResourceData, meta interface{}) error { if err = d.Set("scaling_policy", w.ScalingPolicy); err != nil { return err } - if err = d.Set("auto_suspend", w.AutoSuspend.Int64); err != nil { + if err = d.Set("auto_suspend", w.AutoSuspend); err != nil { return err } if err = d.Set("auto_resume", w.AutoResume); err != nil { @@ -228,47 +287,167 @@ func ReadWarehouse(d *schema.ResourceData, meta interface{}) error { } } - log.Printf("[DEBUG] warehouse type is: %s", w.WarehouseType) - if w.WarehouseType == "STANDARD" || w.WarehouseType == "SNOWPARK-OPTIMIZED" { - err = d.Set("warehouse_type", w.WarehouseType) - if err != nil { - return err - } - } else { - err = d.Set("warehouse_type", "STANDARD") - if err != nil { - return err - } - } - - stmt = warehouseBuilder.ShowParameters() - paramRows, err := snowflake.Query(db, stmt) - if err != nil { - return err - } + return nil +} - warehouseParams, err := snowflake.ScanWarehouseParameters(paramRows) - if err != nil { - return err - } +// UpdateWarehouse implements schema.UpdateFunc. +func UpdateWarehouse(d *schema.ResourceData, meta interface{}) error { + db := meta.(*sql.DB) + client := sdk.NewClientFromDB(db) + ctx := context.Background() - for _, param := range warehouseParams { - log.Printf("[TRACE] %+v\n", param) + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.AccountObjectIdentifier) - var value interface{} - if strings.EqualFold(param.Type, "number") { - i, err := strconv.ParseInt(param.Value, 10, 64) + // Change name separately + if d.HasChange("name") { + if v, ok := d.GetOk("name"); ok { + newName := sdk.NewAccountObjectIdentifier(v.(string)) + err := client.Warehouses.Alter(ctx, id, &sdk.WarehouseAlterOptions{ + NewName: &newName, + }) if err != nil { return err } - value = i + d.SetId(helpers.EncodeSnowflakeID(newName)) + } else { + panic("name has to be set") + } + } + + // Batch SET operations and UNSET operations + var runSet bool = false + setOpts := sdk.WarehouseSetOptions{} + unsetFields := []sdk.WarehouseUnsetField{} + + if d.HasChange("comment") { + if v, ok := d.GetOk("comment"); ok { + setOpts.Comment = sdk.String(v.(string)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.CommentField) + } + } + if d.HasChange("warehouse_size") { + if v, ok := d.GetOk("warehouse_size"); ok { + size := sdk.WarehouseSize(strings.ReplaceAll(v.(string), "-", "")) + setOpts.WarehouseSize = &size + runSet = true + } else { + unsetFields = append(unsetFields, sdk.WarehouseSizeField) + } + } + if d.HasChange("max_cluster_count") { + if v, ok := d.GetOk("max_cluster_count"); ok { + setOpts.MaxClusterCount = sdk.Uint8(v.(uint8)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.MaxClusterCountField) + } + } + if d.HasChange("min_cluster_count") { + if v, ok := d.GetOk("min_cluster_count"); ok { + setOpts.MinClusterCount = sdk.Uint8(v.(uint8)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.MinClusterCountField) + } + } + if d.HasChange("scaling_policy") { + if v, ok := d.GetOk("scaling_policy"); ok { + scalingPolicy := sdk.ScalingPolicy(v.(string)) + setOpts.ScalingPolicy = &scalingPolicy + runSet = true + } else { + unsetFields = append(unsetFields, sdk.ScalingPolicyField) + } + } + if d.HasChange("auto_suspend") { + if v, ok := d.GetOk("auto_suspend"); ok { + setOpts.AutoSuspend = sdk.Uint(v.(uint)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.AutoSuspendField) + } + } + if d.HasChange("auto_resume") { + if v, ok := d.GetOk("auto_resume"); ok { + setOpts.AutoResume = sdk.Bool(v.(bool)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.AutoResumeField) + } + } + if d.HasChange("resource_monitor") { + if v, ok := d.GetOk("resource_monitor"); ok { + setOpts.ResourceMonitor = sdk.String(v.(string)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.ResourceMonitorField) + } + } + if d.HasChange("statement_timeout_in_seconds") { + if v, ok := d.GetOk("statement_timeout_in_seconds"); ok { + setOpts.StatementTimeoutInSeconds = sdk.Uint(v.(uint)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.StatementTimeoutInSecondsField) + } + } + if d.HasChange("statement_queued_timeout_in_seconds") { + if v, ok := d.GetOk("statement_queued_timeout_in_seconds"); ok { + setOpts.StatementQueuedTimeoutInSeconds = sdk.Uint(v.(uint)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.StatementQueuedTimeoutInSecondsField) + } + } + if d.HasChange("max_concurrency_level") { + if v, ok := d.GetOk("max_concurrency_level"); ok { + setOpts.MaxConcurrencyLevel = sdk.Uint(uint(v.(int))) + runSet = true } else { - value = param.Value + unsetFields = append(unsetFields, sdk.MaxConcurrencyLevelField) } + } + if d.HasChange("enable_query_acceleration") { + if v, ok := d.GetOk("enable_query_acceleration"); ok { + setOpts.EnableQueryAcceleration = sdk.Bool(v.(bool)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.EnableQueryAccelerationField) + } + } + if d.HasChange("query_acceleration_max_scale_factor") { + if v, ok := d.GetOk("query_acceleration_max_scale_factor"); ok { + setOpts.QueryAccelerationMaxScaleFactor = sdk.Uint8(v.(uint8)) + runSet = true + } else { + unsetFields = append(unsetFields, sdk.QueryAccelerationMaxScaleFactorField) + } + } + if d.HasChange("warehouse_type") { + if v, ok := d.GetOk("warehouse_type"); ok { + whType := sdk.WarehouseType(v.(string)) + setOpts.WarehouseType = &whType + runSet = true + } else { + unsetFields = append(unsetFields, sdk.WarehouseTypeField) + } + } - key := strings.ToLower(param.Key) - // lintignore:R001 - err = d.Set(key, value) + // Apply SET and UNSET changes + if runSet { + err := client.Warehouses.Alter(ctx, id, &sdk.WarehouseAlterOptions{ + Set: &setOpts, + }) + if err != nil { + return err + } + } + if len(unsetFields) > 0 { + err := client.Warehouses.Alter(ctx, id, &sdk.WarehouseAlterOptions{ + Unset: &unsetFields, + }) if err != nil { return err } @@ -277,24 +456,18 @@ func ReadWarehouse(d *schema.ResourceData, meta interface{}) error { return nil } -// UpdateWarehouse implements schema.UpdateFunc. -func UpdateWarehouse(d *schema.ResourceData, meta interface{}) error { - return UpdateResource( - "warehouse", - warehouseProperties, - warehouseSchema, - func(name string) *snowflake.Builder { - return snowflake.NewWarehouseBuilder(name).Builder - }, - ReadWarehouse, - )(d, meta) -} - // DeleteWarehouse implements schema.DeleteFunc. func DeleteWarehouse(d *schema.ResourceData, meta interface{}) error { - return DeleteResource( - "warehouse", func(name string) *snowflake.Builder { - return snowflake.NewWarehouseBuilder(name).Builder - }, - )(d, meta) + db := meta.(*sql.DB) + client := sdk.NewClientFromDB(db) + ctx := context.Background() + + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.AccountObjectIdentifier) + + err := client.Warehouses.Drop(ctx, id, nil) + if err != nil { + return err + } + + return nil } diff --git a/pkg/resources/warehouse_acceptance_test.go b/pkg/resources/warehouse_acceptance_test.go index f2ea4c9b67..5e703ec118 100644 --- a/pkg/resources/warehouse_acceptance_test.go +++ b/pkg/resources/warehouse_acceptance_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" ) @@ -48,15 +49,22 @@ func TestAcc_Warehouse(t *testing.T) { resource.TestCheckResourceAttr("snowflake_warehouse.w", "name", prefix2), resource.TestCheckResourceAttr("snowflake_warehouse.w", "comment", "test comment 2"), resource.TestCheckResourceAttr("snowflake_warehouse.w", "auto_suspend", "60"), - resource.TestCheckResourceAttr("snowflake_warehouse.w", "warehouse_size", "Small"), + resource.TestCheckResourceAttr("snowflake_warehouse.w", "warehouse_size", string(sdk.WarehouseSizeSmall)), ), }, // IMPORT { - ResourceName: "snowflake_warehouse.w", - ImportState: true, - ImportStateVerify: true, - ImportStateVerifyIgnore: []string{"initially_suspended", "wait_for_provisioning", "query_acceleration_max_scale_factor"}, + ResourceName: "snowflake_warehouse.w", + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "initially_suspended", + "wait_for_provisioning", + "query_acceleration_max_scale_factor", + "max_concurrency_level", + "statement_queued_timeout_in_seconds", + "statement_timeout_in_seconds", + }, }, }, }) @@ -85,7 +93,7 @@ func wConfig2(prefix string) string { resource "snowflake_warehouse" "w" { name = "%s" comment = "test comment 2" - warehouse_size = "small" + warehouse_size = "SMALL" auto_suspend = 60 max_cluster_count = 1 diff --git a/pkg/resources/warehouse_test.go b/pkg/resources/warehouse_test.go deleted file mode 100644 index 478497f20d..0000000000 --- a/pkg/resources/warehouse_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package resources_test - -import ( - "database/sql" - "testing" - - sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" - . "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/testhelpers" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/stretchr/testify/require" -) - -func TestWarehouse(t *testing.T) { - r := require.New(t) - err := resources.Warehouse().InternalValidate(provider.Provider().Schema, true) - r.NoError(err) -} - -func TestWarehouseCreate(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "tst-terraform-sfwh", - "comment": "great comment", - } - d := schema.TestResourceDataRaw(t, resources.Warehouse().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec(`CREATE WAREHOUSE "tst-terraform-sfwh" COMMENT='great comment`).WillReturnResult(sqlmock.NewResult(1, 1)) - expectReadWarehouse(mock) - err := resources.CreateWarehouse(d, db) - r.NoError(err) - }) -} - -func expectReadWarehouse(mock sqlmock.Sqlmock) { - rows := sqlmock.NewRows([]string{"name", "comment", "size"}).AddRow("tst-terraform-sfwh", "mock comment", "SMALL") - mock.ExpectQuery("SHOW WAREHOUSES LIKE 'tst-terraform-sfwh").WillReturnRows(rows) - - rows = sqlmock.NewRows( - []string{"key", "value", "default", "level", "description", "type"}, - ).AddRow("MAX_CONCURRENCY_LEVEL", 8, 8, "WAREHOUSE", "", "NUMBER") - mock.ExpectQuery("SHOW PARAMETERS IN WAREHOUSE \"tst-terraform-sfwh\"").WillReturnRows(rows) -} - -func TestWarehouseRead(t *testing.T) { - r := require.New(t) - - d := warehouse(t, "tst-terraform-sfwh", map[string]interface{}{"name": "tst-terraform-sfwh"}) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - expectReadWarehouse(mock) - err := resources.ReadWarehouse(d, db) - r.NoError(err) - r.Equal("mock comment", d.Get("comment").(string)) - - // Test when resource is not found, checking if state will be empty - r.NotEmpty(d.State()) - q := snowflake.NewWarehouseBuilder(d.Id()).Show() - mock.ExpectQuery(q).WillReturnError(sql.ErrNoRows) - err2 := resources.ReadWarehouse(d, db) - r.Empty(d.State()) - r.Nil(err2) - }) -} - -func TestWarehouseDelete(t *testing.T) { - r := require.New(t) - - d := warehouse(t, "tst-terraform-sfwh-dropit", map[string]interface{}{"name": "tst-terraform-sfwh-dropit"}) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec(`DROP WAREHOUSE "tst-terraform-sfwh-dropit"`).WillReturnResult(sqlmock.NewResult(1, 1)) - err := resources.DeleteWarehouse(d, db) - r.NoError(err) - }) -} diff --git a/pkg/sdk/README.md b/pkg/sdk/README.md new file mode 100644 index 0000000000..fce3f1d504 --- /dev/null +++ b/pkg/sdk/README.md @@ -0,0 +1,12 @@ +# Snowflake Go SDK + +## SQL clause types + +| ddl tag | function | output format | +| ------------------ | --------------------- | ----------------------------------------------------------------------------- | +| `ddl:"static"` | `sqlStaticClause` | `WORD` | +| `ddl:"keyword"` | `sqlKeywordClause` | `"WORD"` (quotes configurable) | +| `ddl:"identifier"` | `sqlIdentifierClause` | `"a.b.c"` or `OBJ_TYPE "a.b.c"` | +| `ddl:"parameter"` | `sqlParameterClause` | `PARAM = "value"` (quotes configurable) or `PARAM = 2` | +| `ddl:"command"` | `sqlCommandClause` | `CMD "val"` (quotes configurable) | +| `ddl:"list"` | `sqlListClause` | `WORD (, )` (WORD, parentheses, separator configurable) | diff --git a/pkg/sdk/client.go b/pkg/sdk/client.go index 20fa4a9c4e..e504eb2d2f 100644 --- a/pkg/sdk/client.go +++ b/pkg/sdk/client.go @@ -19,6 +19,7 @@ type ObjectType string const ( ObjectTypeMaskingPolicy ObjectType = "MASKING POLICY" ObjectTypePasswordPolicy ObjectType = "PASSWORD POLICY" + ObjectTypeWarehouse ObjectType = "WAREHOUSE" ) func (o ObjectType) String() string { diff --git a/pkg/sdk/helper_test.go b/pkg/sdk/helper_test.go index 4ba0b847d1..6bc534242b 100644 --- a/pkg/sdk/helper_test.go +++ b/pkg/sdk/helper_test.go @@ -125,12 +125,12 @@ func createWarehouse(t *testing.T, client *Client) (*Warehouse, func()) { return createWarehouseWithOptions(t, client, &WarehouseCreateOptions{}) } -func createWarehouseWithOptions(t *testing.T, client *Client, _ *WarehouseCreateOptions) (*Warehouse, func()) { +func createWarehouseWithOptions(t *testing.T, client *Client, opts *WarehouseCreateOptions) (*Warehouse, func()) { t.Helper() name := randomStringRange(t, 8, 28) id := NewAccountObjectIdentifier(name) ctx := context.Background() - err := client.Warehouses.Create(ctx, id, nil) + err := client.Warehouses.Create(ctx, id, opts) require.NoError(t, err) return &Warehouse{ Name: name, diff --git a/pkg/sdk/sql_builder.go b/pkg/sdk/sql_builder.go index b5df6b3851..bc30a3f663 100644 --- a/pkg/sdk/sql_builder.go +++ b/pkg/sdk/sql_builder.go @@ -147,21 +147,25 @@ func (b *sqlBuilder) parseStruct(s interface{}) ([]sqlClause, error) { for i := 0; i < value.Len(); i++ { v := value.Index(i).Interface() // test if v is an ObjectIdentifier. If it is it needs to be handled separately + objectIdentifer, ok := v.(ObjectIdentifier) - if ok { + switch { + case ok: listClauses = append(listClauses, sqlIdentifierClause{ value: objectIdentifer, }) - continue - } - structClauses, err := b.parseStruct(value.Index(i).Interface()) - if err != nil { - return nil, err + case value.Index(i).Kind() == reflect.String: + listClauses = append(listClauses, sqlStaticClause(value.Index(i).String())) + default: + structClauses, err := b.parseStruct(v) + if err != nil { + return nil, err + } + // each element of the slice needs to be pre-rendered before the commas are added + renderedStructClauses := b.sql(structClauses...) + sClause := sqlStaticClause(renderedStructClauses) + listClauses = append(listClauses, sClause) } - // each element of the slice needs to be pre-rendered before the commas are added - renderedStructClauses := b.sql(structClauses...) - sClause := sqlStaticClause(renderedStructClauses) - listClauses = append(listClauses, sClause) } if len(listClauses) < 1 { continue @@ -358,6 +362,10 @@ func (b *sqlBuilder) parseUnexportedField(field reflect.StructField, value refle return append(clauses, clause), nil } +type sqlClause interface { + String() string +} + type sqlListClause struct { keyword string clauses []sqlClause @@ -385,10 +393,6 @@ func (v sqlListClause) String() string { return s } -type sqlClause interface { - String() string -} - type sqlStaticClause string func (v sqlStaticClause) String() string { @@ -429,7 +433,7 @@ func (v sqlParameterClause) String() string { result = fmt.Sprintf("%s = ", v.key) } if vType.Kind() == reflect.String { - result += v.qt.Quote(v.value.(string)) + result += v.qt.Quote(v.value) } else { result += fmt.Sprintf("%v", v.value) } diff --git a/pkg/sdk/sql_builder_test.go b/pkg/sdk/sql_builder_test.go index c472508e5a..399e95e386 100644 --- a/pkg/sdk/sql_builder_test.go +++ b/pkg/sdk/sql_builder_test.go @@ -424,11 +424,14 @@ func TestBuilder_parseUnexportedField(t *testing.T) { }) } +type StringAlias string + type structTestHelper struct { static bool `ddl:"static" db:"EXAMPLE_STATIC"` name AccountObjectIdentifier `ddl:"identifier"` Param *string `ddl:"parameter" db:"EXAMPLE_PARAMETER"` Command *string `ddl:"command" db:"EXAMPLE_COMMAND"` + List []StringAlias `ddl:"list,no_parentheses" db:"EXAMPLE_STRING_LIST"` } func TestBuilder_parseStruct(t *testing.T) { @@ -446,14 +449,16 @@ func TestBuilder_parseStruct(t *testing.T) { name: randomAccountObjectIdentifier(t), Param: String("example"), Command: String("example"), + List: []StringAlias{"item1", "item2"}, } clauses, err := builder.parseStruct(s) assert.NoError(t, err) - assert.Len(t, clauses, 4) + assert.Len(t, clauses, 5) assert.Equal(t, "EXAMPLE_STATIC", clauses[0].String()) assert.Equal(t, s.name.FullyQualifiedName(), clauses[1].String()) assert.Equal(t, "EXAMPLE_PARAMETER = example", clauses[2].String()) assert.Equal(t, "EXAMPLE_COMMAND example", clauses[3].String()) + assert.Equal(t, "EXAMPLE_STRING_LIST item1,item2", clauses[4].String()) }) t.Run("struct with a slice field using ddl: list", func(t *testing.T) { diff --git a/pkg/sdk/type_helpers.go b/pkg/sdk/type_helpers.go index 222c3dbc45..48a079a01b 100644 --- a/pkg/sdk/type_helpers.go +++ b/pkg/sdk/type_helpers.go @@ -19,6 +19,16 @@ func Int(i int) *int { return &i } +// Uint returns a pointer to the given uint. +func Uint(i uint) *uint { + return &i +} + +// Uint returns a pointer to the given uint. +func Uint8(i uint8) *uint8 { + return &i +} + // toInt converts a string to an int. func toInt(s string) int { i, err := strconv.Atoi(s) diff --git a/pkg/sdk/validations.go b/pkg/sdk/validations.go index 6bbc4fb018..2f60a1f60e 100644 --- a/pkg/sdk/validations.go +++ b/pkg/sdk/validations.go @@ -1,6 +1,24 @@ package sdk +import ( + "fmt" +) + func IsValidDataType(v string) bool { dt := DataTypeFromString(v) return dt != DataTypeUnknown } + +func checkExclusivePointers(ptrs []interface{}) error { + count := 0 + for _, v := range ptrs { + // Types differ so we can't directly compare to `nil` + if fmt.Sprintf("%v", v) != "" { + count++ + } + } + if count != 1 { + return fmt.Errorf("%d values set", count) + } + return nil +} diff --git a/pkg/sdk/warehouse.go b/pkg/sdk/warehouse.go new file mode 100644 index 0000000000..a843f02bdd --- /dev/null +++ b/pkg/sdk/warehouse.go @@ -0,0 +1,476 @@ +package sdk + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +type Warehouses interface { + // Create creates a warehouse. + Create(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseCreateOptions) error + // Alter modifies an existing warehouse + Alter(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseAlterOptions) error + // Drop removes a warehouse. + Drop(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseDropOptions) error + // Show returns a list of warehouses. + Show(ctx context.Context, opts *WarehouseShowOptions) ([]*Warehouse, error) + // Describe returns the details of a warehouse. + Describe(ctx context.Context, id AccountObjectIdentifier) (*WarehouseDetails, error) +} + +var _ Warehouses = (*warehouses)(nil) + +type warehouses struct { + client *Client + builder *sqlBuilder +} + +type WarehouseType string + +var ( + WarehouseTypeStandard WarehouseType = "STANDARD" + WarehouseTypeSnowparkOptimized WarehouseType = "SNOWPARK-OPTIMIZED" +) + +type WarehouseSize string + +var ( + WarehouseSizeXSmall WarehouseSize = "XSMALL" + WarehouseSizeSmall WarehouseSize = "SMALL" + WarehouseSizeMedium WarehouseSize = "MEDIUM" + WarehouseSizeLarge WarehouseSize = "LARGE" + WarehouseSizeXLarge WarehouseSize = "XLARGE" + WarehouseSizeXXLarge WarehouseSize = "XXLARGE" + WarehouseSizeXXXLarge WarehouseSize = "XXXLARGE" + WarehouseSizeX4Large WarehouseSize = "X4LARGE" + WarehouseSizeX5Large WarehouseSize = "X5LARGE" + WarehouseSizeX6Large WarehouseSize = "X6LARGE" +) + +type ScalingPolicy string + +var ( + ScalingPolicyStandard ScalingPolicy = "STANDARD" + ScalingPolicyEconomy ScalingPolicy = "ECONOMY" +) + +type WarehouseCreateOptions struct { + create bool `ddl:"static" db:"CREATE"` //lint:ignore U1000 This is used in the ddl tag + OrReplace *bool `ddl:"keyword" db:"OR REPLACE"` + warehouse bool `ddl:"static" db:"WAREHOUSE"` //lint:ignore U1000 This is used in the ddl tag + IfNotExists *bool `ddl:"keyword" db:"IF NOT EXISTS"` + name AccountObjectIdentifier `ddl:"identifier"` + + // Object properties + WarehouseType *WarehouseType `ddl:"parameter,single_quotes" db:"WAREHOUSE_TYPE"` + WarehouseSize *WarehouseSize `ddl:"parameter,single_quotes" db:"WAREHOUSE_SIZE"` + MaxClusterCount *uint8 `ddl:"parameter" db:"MAX_CLUSTER_COUNT"` + MinClusterCount *uint8 `ddl:"parameter" db:"MIN_CLUSTER_COUNT"` + ScalingPolicy *ScalingPolicy `ddl:"parameter,single_quotes" db:"SCALING_POLICY"` + AutoSuspend *uint `ddl:"parameter" db:"AUTO_SUSPEND"` + AutoResume *bool `ddl:"parameter" db:"AUTO_RESUME"` + InitiallySuspended *bool `ddl:"parameter" db:"INITIALLY_SUSPENDED"` + ResourceMonitor *string `ddl:"parameter,double_quotes" db:"RESOURCE_MONITOR"` + Comment *string `ddl:"parameter,single_quotes" db:"COMMENT"` + EnableQueryAcceleration *bool `ddl:"parameter" db:"ENABLE_QUERY_ACCELERATION"` + QueryAccelerationMaxScaleFactor *uint8 `ddl:"parameter" db:"QUERY_ACCELERATION_MAX_SCALE_FACTOR"` + + // Object params + MaxConcurrencyLevel *uint `ddl:"parameter" db:"MAX_CONCURRENCY_LEVEL"` + StatementQueuedTimeoutInSeconds *uint `ddl:"parameter" db:"STATEMENT_QUEUED_TIMEOUT_IN_SECONDS"` + StatementTimeoutInSeconds *uint `ddl:"parameter" db:"STATEMENT_TIMEOUT_IN_SECONDS"` + Tags []TagAssociation `ddl:"list,parentheses" db:"TAG"` +} + +func (opts *WarehouseCreateOptions) validate() error { + if opts.MaxClusterCount != nil && ((*opts.MaxClusterCount < 1) || (10 < *opts.MaxClusterCount)) { + return fmt.Errorf("MaxClusterCount must be between 1 and 10") + } + if opts.MinClusterCount != nil && ((*opts.MinClusterCount < 1) || (10 < *opts.MinClusterCount)) { + return fmt.Errorf("MinClusterCount must be between 1 and 10") + } + if opts.MinClusterCount != nil && opts.MaxClusterCount != nil && *opts.MaxClusterCount < *opts.MinClusterCount { + return fmt.Errorf("MinClusterCount must be less than or equal to MaxClusterCount") + } + if opts.QueryAccelerationMaxScaleFactor != nil && 100 < *opts.QueryAccelerationMaxScaleFactor { + return fmt.Errorf("QueryAccelerationMaxScaleFactor must be less than or equal to 100") + } + return nil +} + +func (c *warehouses) Create(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseCreateOptions) error { + if opts == nil { + opts = &WarehouseCreateOptions{} + } + opts.name = id + if err := opts.validate(); err != nil { + return err + } + + clauses, err := c.builder.parseStruct(opts) + if err != nil { + return err + } + stmt := c.builder.sql(clauses...) + _, err = c.client.exec(ctx, stmt) + return err +} + +type WarehouseAlterOptions struct { + alter bool `ddl:"static" db:"ALTER"` //lint:ignore U1000 This is used in the ddl tag + warehouse bool `ddl:"static" db:"WAREHOUSE"` //lint:ignore U1000 This is used in the ddl tag + IfExists *bool `ddl:"keyword" db:"IF EXISTS"` + name AccountObjectIdentifier `ddl:"identifier"` + + Suspend *bool `ddl:"keyword" db:"SUSPEND"` + Resume *bool `ddl:"keyword" db:"RESUME"` + IfSuspended *bool `ddl:"keyword" db:"IF SUSPENDED"` + AbortAllQueries *bool `ddl:"keyword" db:"ABORT ALL QUERIES"` + NewName *AccountObjectIdentifier `ddl:"identifier" db:"RENAME TO"` + + Set *WarehouseSetOptions `ddl:"keyword" db:"SET"` + Unset *[]WarehouseUnsetField `ddl:"list,no_parentheses" db:"UNSET"` + + SetTags *[]TagAssociation `ddl:"list,no_parentheses" db:"SET TAG"` + UnsetTags *[]ObjectIdentifier `ddl:"list,no_parentheses" db:"UNSET TAG"` +} + +type WarehouseSetOptions struct { + // Object properties + WarehouseType *WarehouseType `ddl:"parameter,single_quotes" db:"WAREHOUSE_TYPE"` + WarehouseSize *WarehouseSize `ddl:"parameter,single_quotes" db:"WAREHOUSE_SIZE"` + WaitForCompletion *bool `ddl:"parameter" db:"WAIT_FOR_COMPLETION"` + MaxClusterCount *uint8 `ddl:"parameter" db:"MAX_CLUSTER_COUNT"` + MinClusterCount *uint8 `ddl:"parameter" db:"MIN_CLUSTER_COUNT"` + ScalingPolicy *ScalingPolicy `ddl:"parameter,single_quotes" db:"SCALING_POLICY"` + AutoSuspend *uint `ddl:"parameter" db:"AUTO_SUSPEND"` + AutoResume *bool `ddl:"parameter" db:"AUTO_RESUME"` + ResourceMonitor *string `ddl:"parameter,double_quotes" db:"RESOURCE_MONITOR"` + Comment *string `ddl:"parameter,single_quotes" db:"COMMENT"` + EnableQueryAcceleration *bool `ddl:"parameter" db:"ENABLE_QUERY_ACCELERATION"` + QueryAccelerationMaxScaleFactor *uint8 `ddl:"parameter" db:"QUERY_ACCELERATION_MAX_SCALE_FACTOR"` + + // Object params + MaxConcurrencyLevel *uint `ddl:"parameter" db:"MAX_CONCURRENCY_LEVEL"` + StatementQueuedTimeoutInSeconds *uint `ddl:"parameter" db:"STATEMENT_QUEUED_TIMEOUT_IN_SECONDS"` + StatementTimeoutInSeconds *uint `ddl:"parameter" db:"STATEMENT_TIMEOUT_IN_SECONDS"` +} + +type WarehouseUnsetField string + +const ( + WarehouseTypeField WarehouseUnsetField = "WAREHOUSE_TYPE" + WarehouseSizeField WarehouseUnsetField = "WAREHOUSE_SIZE" + WaitForCompletionField WarehouseUnsetField = "WAIT_FOR_COMPLETION" + MaxClusterCountField WarehouseUnsetField = "MAX_CLUSTER_COUNT" + MinClusterCountField WarehouseUnsetField = "MIN_CLUSTER_COUNT" + ScalingPolicyField WarehouseUnsetField = "SCALING_POLICY" + AutoSuspendField WarehouseUnsetField = "AUTO_SUSPEND" + AutoResumeField WarehouseUnsetField = "AUTO_RESUME" + ResourceMonitorField WarehouseUnsetField = "RESOURCE_MONITOR" + CommentField WarehouseUnsetField = "COMMENT" + EnableQueryAccelerationField WarehouseUnsetField = "ENABLE_QUERY_ACCELERATION" + QueryAccelerationMaxScaleFactorField WarehouseUnsetField = "QUERY_ACCELERATION_MAX_SCALE_FACTOR" + MaxConcurrencyLevelField WarehouseUnsetField = "MAX_CONCURRENCY_LEVEL" + StatementQueuedTimeoutInSecondsField WarehouseUnsetField = "STATEMENT_QUEUED_TIMEOUT_IN_SECONDS" + StatementTimeoutInSecondsField WarehouseUnsetField = "STATEMENT_TIMEOUT_IN_SECONDS" +) + +func (opts *WarehouseAlterOptions) validate() error { + if opts.name.FullyQualifiedName() == "" { + return fmt.Errorf("name must not be empty") + } + + exclusivePointers := []interface{}{ + opts.Suspend, + opts.Resume, + opts.AbortAllQueries, + opts.NewName, + opts.Set, + opts.Unset, + opts.SetTags, + opts.UnsetTags, + } + if err := checkExclusivePointers(exclusivePointers); err != nil { + return fmt.Errorf("exactly one of Suspend, Resume, AbortAllQueries, NewName, Set, Unset, SetTags and UnsetTags must be set: %w", err) + } + + if opts.Suspend != nil && *opts.Suspend && opts.Resume != nil && *opts.Resume { + return fmt.Errorf(`"Suspend" and "Resume" cannot both be true`) + } + if opts.IfSuspended != nil && *opts.IfSuspended && (opts.Resume == nil || !*opts.Resume) { + return fmt.Errorf(`"Resume" has to be set when using "IfSuspended"`) + } + if opts.Set != nil && opts.Unset != nil { + return fmt.Errorf("cannot set and unset parameters in the same ALTER statement") + } + return nil +} + +func (c *warehouses) Alter(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseAlterOptions) error { + if opts == nil { + opts = &WarehouseAlterOptions{} + } + opts.name = id + if err := opts.validate(); err != nil { + return err + } + clauses, err := c.builder.parseStruct(opts) + if err != nil { + return err + } + stmt := c.builder.sql(clauses...) + _, err = c.client.exec(ctx, stmt) + return err +} + +type WarehouseDropOptions struct { + drop bool `ddl:"static" db:"DROP"` //lint:ignore U1000 This is used in the ddl tag + warehouse bool `ddl:"static" db:"WAREHOUSE"` //lint:ignore U1000 This is used in the ddl tag + IfExists *bool `ddl:"keyword" db:"IF EXISTS"` + name AccountObjectIdentifier `ddl:"identifier"` +} + +func (opts *WarehouseDropOptions) validate() error { + if opts.name.FullyQualifiedName() == "" { + return errors.New("name must not be empty") + } + return nil +} + +func (c *warehouses) Drop(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseDropOptions) error { + if opts == nil { + opts = &WarehouseDropOptions{ + name: id, + } + } + opts.name = id + if err := opts.validate(); err != nil { + return err + } + clauses, err := c.builder.parseStruct(opts) + if err != nil { + return err + } + stmt := c.builder.sql(clauses...) + _, err = c.client.exec(ctx, stmt) + if err != nil { + return decodeDriverError(err) + } + return err +} + +type WarehouseShowOptions struct { + show bool `ddl:"static" db:"SHOW"` //lint:ignore U1000 This is used in the ddl tag + warehouses bool `ddl:"static" db:"WAREHOUSES"` //lint:ignore U1000 This is used in the ddl tag + Like *Like `ddl:"keyword" db:"LIKE"` +} + +func (opts *WarehouseShowOptions) validate() error { + return nil +} + +type Warehouse struct { + Name string + State string + Type WarehouseType + Size WarehouseSize + MinClusterCount uint8 + MaxClusterCount uint8 + StartedClusters uint + Running uint + Queued uint + IsDefault bool + IsCurrent bool + AutoSuspend uint + AutoResume bool + Available float64 + Provisioning float64 + Quiescing float64 + Other float64 + CreatedOn time.Time + ResumedOn time.Time + UpdatedOn time.Time + Owner string + Comment string + EnableQueryAcceleration bool + QueryAccelerationMaxScaleFactor uint8 + ResourceMonitor string + Actives string + Pendings string + Failed string + Suspended string + Uuid string + ScalingPolicy ScalingPolicy +} + +type warehouseDBRow struct { + Name string `db:"name"` + State string `db:"state"` + Type string `db:"type"` + Size string `db:"size"` + MinClusterCount uint8 `db:"min_cluster_count"` + MaxClusterCount uint8 `db:"max_cluster_count"` + StartedClusters uint `db:"started_clusters"` + Running uint `db:"running"` + Queued uint `db:"queued"` + IsDefault string `db:"is_default"` + IsCurrent string `db:"is_current"` + AutoSuspend sql.NullInt16 `db:"auto_suspend"` + AutoResume bool `db:"auto_resume"` + Available string `db:"available"` + Provisioning string `db:"provisioning"` + Quiescing string `db:"quiescing"` + Other string `db:"other"` + CreatedOn time.Time `db:"created_on"` + ResumedOn time.Time `db:"resumed_on"` + UpdatedOn time.Time `db:"updated_on"` + Owner string `db:"owner"` + Comment string `db:"comment"` + EnableQueryAcceleration bool `db:"enable_query_acceleration"` + QueryAccelerationMaxScaleFactor uint8 `db:"query_acceleration_max_scale_factor"` + ResourceMonitor string `db:"resource_monitor"` + Actives string `db:"actives"` + Pendings string `db:"pendings"` + Failed string `db:"failed"` + Suspended string `db:"suspended"` + Uuid string `db:"uuid"` + ScalingPolicy string `db:"scaling_policy"` +} + +func (row warehouseDBRow) toWarehouse() *Warehouse { + wh := &Warehouse{ + Name: row.Name, + State: row.State, + Type: WarehouseType(row.Type), + Size: WarehouseSize(strings.ReplaceAll(strings.ToUpper(row.Size), "-", "")), + MinClusterCount: row.MinClusterCount, + MaxClusterCount: row.MaxClusterCount, + StartedClusters: row.StartedClusters, + Running: row.Running, + Queued: row.Queued, + IsDefault: row.IsDefault == "Y", + IsCurrent: row.IsCurrent == "Y", + AutoResume: row.AutoResume, + CreatedOn: row.CreatedOn, + ResumedOn: row.ResumedOn, + UpdatedOn: row.UpdatedOn, + Owner: row.Owner, + Comment: row.Comment, + EnableQueryAcceleration: row.EnableQueryAcceleration, + QueryAccelerationMaxScaleFactor: row.QueryAccelerationMaxScaleFactor, + ResourceMonitor: row.ResourceMonitor, + Actives: row.Actives, + Pendings: row.Pendings, + Failed: row.Failed, + Suspended: row.Suspended, + Uuid: row.Uuid, + ScalingPolicy: ScalingPolicy(row.ScalingPolicy), + } + if val, err := strconv.ParseFloat(row.Available, 64); err != nil { + wh.Available = val + } + if val, err := strconv.ParseFloat(row.Provisioning, 64); err != nil { + wh.Provisioning = val + } + if val, err := strconv.ParseFloat(row.Quiescing, 64); err != nil { + wh.Quiescing = val + } + if val, err := strconv.ParseFloat(row.Other, 64); err != nil { + wh.Other = val + } + if row.AutoSuspend.Valid { + wh.AutoSuspend = uint(row.AutoSuspend.Int16) + } + return wh +} + +func (c *warehouses) Show(ctx context.Context, opts *WarehouseShowOptions) ([]*Warehouse, error) { + if opts == nil { + opts = &WarehouseShowOptions{} + } + if err := opts.validate(); err != nil { + return nil, err + } + clauses, err := c.builder.parseStruct(opts) + if err != nil { + return nil, err + } + stmt := c.builder.sql(clauses...) + dest := []warehouseDBRow{} + + err = c.client.query(ctx, &dest, stmt) + if err != nil { + return nil, decodeDriverError(err) + } + resultList := make([]*Warehouse, len(dest)) + for i, row := range dest { + resultList[i] = row.toWarehouse() + } + + return resultList, nil +} + +type warehouseDescribeOptions struct { + describe bool `ddl:"static" db:"DESCRIBE"` //lint:ignore U1000 This is used in the ddl tag + warehouse bool `ddl:"static" db:"WAREHOUSE"` //lint:ignore U1000 This is used in the ddl tag + name AccountObjectIdentifier `ddl:"identifier"` +} + +func (opts *warehouseDescribeOptions) validate() error { + if opts.name.FullyQualifiedName() == "" { + return fmt.Errorf("name is required") + } + return nil +} + +type warehouseDetailsRow struct { + CreatedOn time.Time `db:"created_on"` + Name string `db:"name"` + Kind string `db:"kind"` +} + +func (row *warehouseDetailsRow) toWarehouseDetails() *WarehouseDetails { + return &WarehouseDetails{ + CreatedOn: row.CreatedOn, + Name: row.Name, + Kind: row.Kind, + } +} + +type WarehouseDetails struct { + CreatedOn time.Time + Name string + Kind string +} + +func (c *warehouses) Describe(ctx context.Context, id AccountObjectIdentifier) (*WarehouseDetails, error) { + opts := &warehouseDescribeOptions{ + name: id, + } + if err := opts.validate(); err != nil { + return nil, err + } + + clauses, err := c.builder.parseStruct(opts) + if err != nil { + return nil, err + } + stmt := c.builder.sql(clauses...) + dest := warehouseDetailsRow{} + err = c.client.queryOne(ctx, &dest, stmt) + if err != nil { + return nil, decodeDriverError(err) + } + + return dest.toWarehouseDetails(), nil +} + +func (v *Warehouse) ID() AccountObjectIdentifier { + return NewAccountObjectIdentifier(v.Name) +} diff --git a/pkg/sdk/warehouse_integration_test.go b/pkg/sdk/warehouse_integration_test.go new file mode 100644 index 0000000000..00af5a39b3 --- /dev/null +++ b/pkg/sdk/warehouse_integration_test.go @@ -0,0 +1,451 @@ +package sdk + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInt_WarehousesShow(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + testWarehouse, warehouseCleanup := createWarehouseWithOptions(t, client, &WarehouseCreateOptions{ + WarehouseSize: &WarehouseSizeSmall, + }) + t.Cleanup(warehouseCleanup) + _, warehouse2Cleanup := createWarehouse(t, client) + t.Cleanup(warehouse2Cleanup) + + t.Run("show without options", func(t *testing.T) { + warehouses, err := client.Warehouses.Show(ctx, nil) + require.NoError(t, err) + assert.LessOrEqual(t, 2, len(warehouses)) + }) + + t.Run("show with options", func(t *testing.T) { + showOptions := &WarehouseShowOptions{ + Like: &Like{ + Pattern: &testWarehouse.Name, + }, + } + warehouses, err := client.Warehouses.Show(ctx, showOptions) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + assert.Equal(t, testWarehouse.Name, warehouses[0].Name) + assert.Equal(t, WarehouseSizeSmall, warehouses[0].Size) + }) + + t.Run("when searching a non-existent password policy", func(t *testing.T) { + showOptions := &WarehouseShowOptions{ + Like: &Like{ + Pattern: String("non-existent"), + }, + } + warehouses, err := client.Warehouses.Show(ctx, showOptions) + require.NoError(t, err) + assert.Equal(t, 0, len(warehouses)) + }) +} + +func TestInt_WarehouseCreate(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + database, dbCleanup := createDatabase(t, client) + t.Cleanup(dbCleanup) + schema, schemaCleanup := createSchema(t, client, database) + t.Cleanup(schemaCleanup) + tag, tagCleanup := createTag(t, client, database, schema) + t.Cleanup(tagCleanup) + + t.Run("test complete", func(t *testing.T) { + name := randomUUID(t) + id := NewAccountObjectIdentifier(name) + err := client.Warehouses.Create(ctx, id, &WarehouseCreateOptions{ + OrReplace: Bool(true), + WarehouseType: &WarehouseTypeStandard, + WarehouseSize: &WarehouseSizeSmall, + MaxClusterCount: Uint8(8), + MinClusterCount: Uint8(2), + ScalingPolicy: &ScalingPolicyEconomy, + AutoSuspend: Uint(1000), + AutoResume: Bool(true), + InitiallySuspended: Bool(false), + Comment: String("comment"), + EnableQueryAcceleration: Bool(true), + QueryAccelerationMaxScaleFactor: Uint8(90), + MaxConcurrencyLevel: Uint(10), + StatementQueuedTimeoutInSeconds: Uint(2000), + StatementTimeoutInSeconds: Uint(3000), + Tags: []TagAssociation{ + { + Name: tag.ID(), + Value: "myval", + }, + }, + }) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(name), + }, + }) + require.NoError(t, err) + require.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Equal(t, name, result.Name) + assert.Equal(t, WarehouseTypeStandard, result.Type) + assert.Equal(t, WarehouseSizeSmall, result.Size) + assert.Equal(t, uint8(8), result.MaxClusterCount) + assert.Equal(t, uint8(2), result.MinClusterCount) + assert.Equal(t, ScalingPolicyEconomy, result.ScalingPolicy) + assert.Equal(t, uint(1000), result.AutoSuspend) + assert.Equal(t, true, result.AutoResume) + assert.Contains(t, []string{"RESUMING", "STARTED"}, result.State) + assert.Equal(t, "comment", result.Comment) + assert.Equal(t, true, result.EnableQueryAcceleration) + assert.Equal(t, uint8(90), result.QueryAccelerationMaxScaleFactor) + + val, err := client.SystemFunctions.GetTag(ctx, tag.ID(), id, ObjectTypeWarehouse) + require.NoError(t, err) + require.Equal(t, "myval", val) + }) + + t.Run("test no options", func(t *testing.T) { + name := randomUUID(t) + id := NewAccountObjectIdentifier(name) + err := client.Warehouses.Create(ctx, id, nil) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(name), + }, + }) + require.NoError(t, err) + require.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Equal(t, name, result.Name) + assert.Equal(t, WarehouseTypeStandard, result.Type) + assert.Equal(t, WarehouseSizeXSmall, result.Size) + assert.Equal(t, uint8(1), result.MaxClusterCount) + assert.Equal(t, uint8(1), result.MinClusterCount) + assert.Equal(t, ScalingPolicyStandard, result.ScalingPolicy) + assert.Equal(t, uint(600), result.AutoSuspend) + assert.Equal(t, true, result.AutoResume) + assert.Contains(t, []string{"RESUMING", "STARTED"}, result.State) + assert.Equal(t, "", result.Comment) + assert.Equal(t, false, result.EnableQueryAcceleration) + assert.Equal(t, uint8(8), result.QueryAccelerationMaxScaleFactor) + }) +} + +func TestInt_WarehouseDescribe(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + t.Run("when warehouse exists", func(t *testing.T) { + result, err := client.Warehouses.Describe(ctx, warehouse.ID()) + require.NoError(t, err) + assert.Equal(t, warehouse.Name, result.Name) + assert.Equal(t, "WAREHOUSE", result.Kind) + assert.WithinDuration(t, time.Now(), result.CreatedOn, 5*time.Second) + }) + + t.Run("when warehouse does not exist", func(t *testing.T) { + id := NewAccountObjectIdentifier("does_not_exist") + _, err := client.Warehouses.Describe(ctx, id) + assert.ErrorIs(t, err, ErrObjectNotExistOrAuthorized) + }) +} + +func TestInt_WarehouseAlter(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + database, dbCleanup := createDatabase(t, client) + t.Cleanup(dbCleanup) + schema, schemaCleanup := createSchema(t, client, database) + t.Cleanup(schemaCleanup) + tag, tagCleanup := createTag(t, client, database, schema) + t.Cleanup(tagCleanup) + tag2, tagCleanup2 := createTag(t, client, database, schema) + t.Cleanup(tagCleanup2) + + t.Run("set", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + alterOptions := &WarehouseAlterOptions{ + Set: &WarehouseSetOptions{ + WarehouseSize: &WarehouseSizeMedium, + AutoSuspend: Uint(1234), + EnableQueryAcceleration: Bool(true), + }, + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + assert.Equal(t, 1, len(warehouses)) + result := warehouses[0] + require.NoError(t, err) + assert.Equal(t, WarehouseSizeMedium, result.Size) + assert.Equal(t, true, result.EnableQueryAcceleration) + assert.Equal(t, uint(1234), result.AutoSuspend) + }) + + t.Run("rename", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + oldID := warehouse.ID() + t.Cleanup(warehouseCleanup) + + newName := randomUUID(t) + newID := NewAccountObjectIdentifier(newName) + alterOptions := &WarehouseAlterOptions{ + NewName: &newID, + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + result, err := client.Warehouses.Describe(ctx, newID) + require.NoError(t, err) + assert.Equal(t, newName, result.Name) + + // rename back to original name so it can be cleaned up + alterOptions = &WarehouseAlterOptions{ + NewName: &oldID, + } + err = client.Warehouses.Alter(ctx, newID, alterOptions) + require.NoError(t, err) + }) + + t.Run("unset", func(t *testing.T) { + createOptions := &WarehouseCreateOptions{ + Comment: String("test comment"), + MaxClusterCount: Uint8(10), + } + warehouse, warehouseCleanup := createWarehouseWithOptions(t, client, createOptions) + t.Cleanup(warehouseCleanup) + id := warehouse.ID() + + alterOptions := &WarehouseAlterOptions{ + Unset: &[]WarehouseUnsetField{ + CommentField, + MaxClusterCountField, + }, + } + err := client.Warehouses.Alter(ctx, id, alterOptions) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Equal(t, warehouse.Name, result.Name) + assert.Equal(t, "", result.Comment) + assert.Equal(t, uint8(1), result.MaxClusterCount) + }) + + t.Run("suspend & resume", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + alterOptions := &WarehouseAlterOptions{ + Suspend: Bool(true), + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Contains(t, []string{"SUSPENDING", "SUSPENDED"}, result.State) + + alterOptions = &WarehouseAlterOptions{ + Resume: Bool(true), + } + err = client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + warehouses, err = client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result = warehouses[0] + assert.Contains(t, []string{"RESUMING", "STARTED"}, result.State) + }) + + t.Run("resume without suspending", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + alterOptions := &WarehouseAlterOptions{ + Resume: Bool(true), + IfSuspended: Bool(true), + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Contains(t, []string{"STARTED", "RESUMING"}, result.State) + }) + + t.Run("abort all queries", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + resetWarehouse := useWarehouse(t, client, warehouse.ID()) + t.Cleanup(resetWarehouse) + + // Start a long query + go client.exec(ctx, "CALL SYSTEM$WAIT(30);") //nolint:errcheck // we don't care if this eventually errors, as long as it runs for a little while + time.Sleep(5 * time.Second) + + // Check that query is running + warehouses, err := client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result := warehouses[0] + assert.Equal(t, uint(1), result.Running) + assert.Equal(t, uint(0), result.Queued) + + // Abort all queries + alterOptions := &WarehouseAlterOptions{ + AbortAllQueries: Bool(true), + } + err = client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + + // Wait for abort to be effective + time.Sleep(5 * time.Second) + + // Check no query is running + warehouses, err = client.Warehouses.Show(ctx, &WarehouseShowOptions{ + Like: &Like{ + Pattern: String(warehouse.Name), + }, + }) + require.NoError(t, err) + assert.Equal(t, 1, len(warehouses)) + result = warehouses[0] + assert.Equal(t, uint(0), result.Running) + assert.Equal(t, uint(0), result.Queued) + }) + + t.Run("set tags", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + + alterOptions := &WarehouseAlterOptions{ + SetTags: &[]TagAssociation{ + { + Name: tag.ID(), + Value: "val", + }, + { + Name: tag2.ID(), + Value: "val2", + }, + }, + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + + val, err := client.SystemFunctions.GetTag(ctx, tag.ID(), warehouse.ID(), ObjectTypeWarehouse) + require.NoError(t, err) + require.Equal(t, "val", val) + val, err = client.SystemFunctions.GetTag(ctx, tag2.ID(), warehouse.ID(), ObjectTypeWarehouse) + require.NoError(t, err) + require.Equal(t, "val2", val) + }) + + t.Run("unset tags", func(t *testing.T) { + warehouse, warehouseCleanup := createWarehouseWithOptions(t, client, &WarehouseCreateOptions{ + Tags: []TagAssociation{ + { + Name: tag.ID(), + Value: "value", + }, + { + Name: tag2.ID(), + Value: "value2", + }, + }, + }) + t.Cleanup(warehouseCleanup) + + alterOptions := &WarehouseAlterOptions{ + UnsetTags: &[]ObjectIdentifier{ + tag.ID(), + tag2.ID(), + }, + } + err := client.Warehouses.Alter(ctx, warehouse.ID(), alterOptions) + require.NoError(t, err) + + val, err := client.SystemFunctions.GetTag(ctx, tag.ID(), warehouse.ID(), ObjectTypeWarehouse) + require.Error(t, err) + require.Equal(t, "", val) + val, err = client.SystemFunctions.GetTag(ctx, tag2.ID(), warehouse.ID(), ObjectTypeWarehouse) + require.Error(t, err) + require.Equal(t, "", val) + }) +} + +func TestInt_WarehouseDrop(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + t.Run("when warehouse exists", func(t *testing.T) { + warehouse, _ := createWarehouse(t, client) + + err := client.Warehouses.Drop(ctx, warehouse.ID(), nil) + require.NoError(t, err) + _, err = client.Warehouses.Describe(ctx, warehouse.ID()) + assert.ErrorIs(t, err, ErrObjectNotExistOrAuthorized) + }) + + t.Run("when warehouse does not exist", func(t *testing.T) { + id := NewAccountObjectIdentifier("does_not_exist") + err := client.Warehouses.Drop(ctx, id, nil) + assert.ErrorIs(t, err, ErrObjectNotExistOrAuthorized) + }) + + t.Run("when warehouse exists and if exists is true", func(t *testing.T) { + warehouse, _ := createWarehouse(t, client) + + dropOptions := &WarehouseDropOptions{IfExists: Bool(true)} + err := client.Warehouses.Drop(ctx, warehouse.ID(), dropOptions) + require.NoError(t, err) + _, err = client.Warehouses.Describe(ctx, warehouse.ID()) + assert.ErrorIs(t, err, ErrObjectNotExistOrAuthorized) + }) +} diff --git a/pkg/sdk/warehouse_test.go b/pkg/sdk/warehouse_test.go new file mode 100644 index 0000000000..a1e84098ef --- /dev/null +++ b/pkg/sdk/warehouse_test.go @@ -0,0 +1,287 @@ +package sdk + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" +) + +func TestWarehouseCreate(t *testing.T) { + builder := testBuilder(t) + + t.Run("only name", func(t *testing.T) { + opts := &WarehouseCreateOptions{ + name: AccountObjectIdentifier{ + name: "mywarehouse", + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `CREATE WAREHOUSE "mywarehouse"`, + builder.sql(clauses...), + ) + }) + + t.Run("with complete options", func(t *testing.T) { + opts := &WarehouseCreateOptions{ + OrReplace: Bool(true), + name: NewAccountObjectIdentifier("completewarehouse"), + IfNotExists: Bool(true), + + WarehouseType: &WarehouseTypeStandard, + WarehouseSize: &WarehouseSizeX4Large, + MaxClusterCount: Uint8(8), + MinClusterCount: Uint8(3), + ScalingPolicy: &ScalingPolicyEconomy, + AutoSuspend: Uint(1000), + AutoResume: Bool(true), + InitiallySuspended: Bool(false), + ResourceMonitor: String("myresmon"), + Comment: String("hello"), + EnableQueryAcceleration: Bool(true), + QueryAccelerationMaxScaleFactor: Uint8(62), + + MaxConcurrencyLevel: Uint(7), + StatementQueuedTimeoutInSeconds: Uint(29), + StatementTimeoutInSeconds: Uint(89), + Tags: []TagAssociation{ + { + Name: NewSchemaObjectIdentifier("db", "schema", "tag1"), + Value: "v1", + }, + { + Name: NewSchemaObjectIdentifier("db2", "schema2", "tag2"), + Value: "v2", + }, + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `CREATE OR REPLACE WAREHOUSE IF NOT EXISTS "completewarehouse" WAREHOUSE_TYPE = 'STANDARD' WAREHOUSE_SIZE = 'X4LARGE' MAX_CLUSTER_COUNT = 8 MIN_CLUSTER_COUNT = 3 SCALING_POLICY = 'ECONOMY' AUTO_SUSPEND = 1000 AUTO_RESUME = true INITIALLY_SUSPENDED = false RESOURCE_MONITOR = "myresmon" COMMENT = 'hello' ENABLE_QUERY_ACCELERATION = true QUERY_ACCELERATION_MAX_SCALE_FACTOR = 62 MAX_CONCURRENCY_LEVEL = 7 STATEMENT_QUEUED_TIMEOUT_IN_SECONDS = 29 STATEMENT_TIMEOUT_IN_SECONDS = 89 TAG ("db"."schema"."tag1" = 'v1',"db2"."schema2"."tag2" = 'v2')`, + builder.sql(clauses...), + ) + }) +} + +func TestWarehouseAlter(t *testing.T) { + builder := testBuilder(t) + // id := randomSchemaObjectIdentifier(t) + + t.Run("with set", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + Set: &WarehouseSetOptions{ + WarehouseType: &WarehouseTypeSnowparkOptimized, + WaitForCompletion: Bool(false), + MinClusterCount: Uint8(4), + AutoSuspend: Uint(200), + ResourceMonitor: String("resmon"), + EnableQueryAcceleration: Bool(false), + StatementQueuedTimeoutInSeconds: Uint(1200), + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" SET WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED' WAIT_FOR_COMPLETION = false MIN_CLUSTER_COUNT = 4 AUTO_SUSPEND = 200 RESOURCE_MONITOR = "resmon" ENABLE_QUERY_ACCELERATION = false STATEMENT_QUEUED_TIMEOUT_IN_SECONDS = 1200`, + builder.sql(clauses...), + ) + }) + + t.Run("with unset", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + Unset: &[]WarehouseUnsetField{ + WarehouseSizeField, + MaxClusterCountField, + AutoResumeField, + // Tag: []ObjectIdentifier{ + // NewSchemaObjectIdentifier("db1", "schema1", "tag1"), + // NewSchemaObjectIdentifier("db2", "schema2", "tag2"), + // }, + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" UNSET WAREHOUSE_SIZE,MAX_CLUSTER_COUNT,AUTO_RESUME`, + builder.sql(clauses...), + ) + }) + + t.Run("rename", func(t *testing.T) { + newname := NewAccountObjectIdentifier("newname") + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("oldname"), + NewName: &newname, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `ALTER WAREHOUSE "oldname" RENAME TO "newname"`, + builder.sql(clauses...), + ) + }) + + t.Run("suspend", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + Suspend: Bool(true), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" SUSPEND`, + builder.sql(clauses...), + ) + }) + + t.Run("resume", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + Resume: Bool(true), + IfSuspended: Bool(true), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" RESUME IF SUSPENDED`, + builder.sql(clauses...), + ) + }) + + t.Run("abort all queries", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + AbortAllQueries: Bool(true), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" ABORT ALL QUERIES`, + builder.sql(clauses...), + ) + }) + + t.Run("with set tag", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + SetTags: &[]TagAssociation{ + { + Name: NewSchemaObjectIdentifier("db1", "schema1", "tag1"), + Value: "v1", + }, + { + Name: NewSchemaObjectIdentifier("db2", "schema2", "tag2"), + Value: "v2", + }, + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" SET TAG "db1"."schema1"."tag1" = 'v1',"db2"."schema2"."tag2" = 'v2'`, + builder.sql(clauses...), + ) + }) + + t.Run("with unset tag", func(t *testing.T) { + opts := &WarehouseAlterOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + UnsetTags: &[]ObjectIdentifier{ + NewSchemaObjectIdentifier("db1", "schema1", "tag1"), + NewSchemaObjectIdentifier("db2", "schema2", "tag2"), + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `ALTER WAREHOUSE "mywarehouse" UNSET TAG "db1"."schema1"."tag1","db2"."schema2"."tag2"`, + builder.sql(clauses...), + ) + }) +} + +func TestWarehouseDrop(t *testing.T) { + builder := testBuilder(t) + // id := randomSchemaObjectIdentifier(t) + + t.Run("only name", func(t *testing.T) { + opts := &WarehouseDropOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `DROP WAREHOUSE "mywarehouse"`, + builder.sql(clauses...), + ) + }) + + t.Run("with if exists", func(t *testing.T) { + opts := &WarehouseDropOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + IfExists: Bool(true), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + + assert.Equal(t, + `DROP WAREHOUSE IF EXISTS "mywarehouse"`, + builder.sql(clauses...), + ) + }) +} + +func TestWarehouseShow(t *testing.T) { + builder := testBuilder(t) + + t.Run("empty options", func(t *testing.T) { + opts := &WarehouseShowOptions{} + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + "SHOW WAREHOUSES", + builder.sql(clauses...), + ) + }) + + t.Run("with like", func(t *testing.T) { + opts := &WarehouseShowOptions{ + Like: &Like{ + Pattern: String("mywarehouse"), + }, + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + "SHOW WAREHOUSES LIKE 'mywarehouse'", + builder.sql(clauses...), + ) + }) +} + +func TestWarehouseDescribe(t *testing.T) { + builder := testBuilder(t) + + t.Run("only name", func(t *testing.T) { + opts := &warehouseDescribeOptions{ + name: NewAccountObjectIdentifier("mywarehouse"), + } + clauses, err := builder.parseStruct(opts) + require.NoError(t, err) + assert.Equal(t, + `DESCRIBE WAREHOUSE "mywarehouse"`, + builder.sql(clauses...), + ) + }) +} diff --git a/pkg/sdk/warehouses.go b/pkg/sdk/warehouses.go deleted file mode 100644 index a61627148b..0000000000 --- a/pkg/sdk/warehouses.go +++ /dev/null @@ -1,82 +0,0 @@ -package sdk - -import ( - "context" - "fmt" -) - -type Warehouses interface { - // Create creates a warehouse. - Create(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseCreateOptions) error - // Alter modifies an existing warehouse - Alter(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseAlterOptions) error - // Drop removes a warehouse. - Drop(ctx context.Context, id AccountObjectIdentifier, opts *WarehouseDropOptions) error - // Show returns a list of warehouses. - Show(ctx context.Context, opts *WarehouseShowOptions) ([]*Warehouse, error) - // Describe returns the details of a warehouse. - Describe(ctx context.Context, id AccountObjectIdentifier) (*WarehouseDetails, error) -} - -var _ Warehouses = (*warehouses)(nil) - -type warehouses struct { - client *Client - builder *sqlBuilder -} - -type Warehouse struct { - Name string -} - -// placeholder for the real implementation. -type WarehouseCreateOptions struct{} - -func (c *warehouses) Create(ctx context.Context, id AccountObjectIdentifier, _ *WarehouseCreateOptions) error { - sql := fmt.Sprintf(`CREATE WAREHOUSE %s`, id.FullyQualifiedName()) - _, err := c.client.exec(ctx, sql) - return err -} - -// placeholder for the real implementation. -type WarehouseAlterOptions struct{} - -func (c *warehouses) Alter(ctx context.Context, id AccountObjectIdentifier, _ *WarehouseAlterOptions) error { - sql := fmt.Sprintf(`ALTER WAREHOUSE %s`, id.FullyQualifiedName()) - _, err := c.client.exec(ctx, sql) - return err -} - -// placeholder for the real implementation. -type WarehouseDropOptions struct{} - -func (c *warehouses) Drop(ctx context.Context, id AccountObjectIdentifier, _ *WarehouseDropOptions) error { - sql := fmt.Sprintf(`DROP WAREHOUSE %s`, id.FullyQualifiedName()) - _, err := c.client.exec(ctx, sql) - return err -} - -// placeholder for the real implementation. -type WarehouseShowOptions struct{} - -func (c *warehouses) Show(ctx context.Context, _ *WarehouseShowOptions) ([]*Warehouse, error) { - sql := `SHOW WAREHOUSES` - var warehouses []*Warehouse - err := c.client.query(ctx, &warehouses, sql) - return warehouses, err -} - -type WarehouseDetails struct { - Name string -} - -func (c *warehouses) Describe(ctx context.Context, id AccountObjectIdentifier) (*WarehouseDetails, error) { - sql := fmt.Sprintf(`DESCRIBE WAREHOUSE %s`, id.FullyQualifiedName()) - var details WarehouseDetails - err := c.client.queryOne(ctx, &details, sql) - return &details, err -} - -func (v *Warehouse) ID() AccountObjectIdentifier { - return NewAccountObjectIdentifier(v.Name) -} diff --git a/pkg/snowflake/warehouse.go b/pkg/snowflake/warehouse.go deleted file mode 100644 index 60a1a3e957..0000000000 --- a/pkg/snowflake/warehouse.go +++ /dev/null @@ -1,139 +0,0 @@ -package snowflake - -import ( - "database/sql" - "errors" - "fmt" - "log" - "time" - - "github.com/jmoiron/sqlx" -) - -type WarehouseBuilder struct { - *Builder -} - -func (wb *WarehouseBuilder) Show() string { - return wb.Builder.Show() -} - -func (wb *WarehouseBuilder) Describe() string { - return wb.Builder.Describe() -} - -func (wb *WarehouseBuilder) Drop() string { - return wb.Builder.Drop() -} - -func (wb *WarehouseBuilder) Rename(newName string) string { - return wb.Builder.Rename(newName) -} - -func (wb *WarehouseBuilder) Alter() *AlterPropertiesBuilder { - return wb.Builder.Alter() -} - -func (wb *WarehouseBuilder) Create() *CreateBuilder { - return wb.Builder.Create() -} - -// ShowParameters returns the query to show the parameters for the warehouse. -func (wb *WarehouseBuilder) ShowParameters() string { - return fmt.Sprintf("SHOW PARAMETERS IN WAREHOUSE %q", wb.Builder.name) -} - -func NewWarehouseBuilder(name string) *WarehouseBuilder { - return &WarehouseBuilder{ - &Builder{ - name: name, - entityType: WarehouseType, - }, - } -} - -// warehouse is a go representation of a grant that can be used in conjunction -// with github.com/jmoiron/sqlx. -type Warehouse struct { - Name string `db:"name"` - State string `db:"state"` - Type string `db:"type"` - Size string `db:"size"` - MinClusterCount int64 `db:"min_cluster_count"` - MaxClusterCount int64 `db:"max_cluster_count"` - StartedClusters int64 `db:"started_clusters"` - Running int64 `db:"running"` - Queued int64 `db:"queued"` - IsDefault string `db:"is_default"` - IsCurrent string `db:"is_current"` - AutoSuspend sql.NullInt64 `db:"auto_suspend"` - AutoResume bool `db:"auto_resume"` - Available string `db:"available"` - Provisioning string `db:"provisioning"` - Quiescing string `db:"quiescing"` - Other string `db:"other"` - CreatedOn time.Time `db:"created_on"` - ResumedOn time.Time `db:"resumed_on"` - UpdatedOn time.Time `db:"updated_on"` - Owner string `db:"owner"` - Comment string `db:"comment"` - EnableQueryAcceleration bool `db:"enable_query_acceleration"` - QueryAccelerationMaxScaleFactor int `db:"query_acceleration_max_scale_factor"` - ResourceMonitor string `db:"resource_monitor"` - Actives int64 `db:"actives"` - Pendings int64 `db:"pendings"` - Failed int64 `db:"failed"` - Suspended int64 `db:"suspended"` - UUID string `db:"uuid"` - ScalingPolicy string `db:"scaling_policy"` - WarehouseType string `db:"warehouse_type"` -} - -// warehouseParams struct to represent a row of parameters. -type WarehouseParams struct { - Key string `db:"key"` - Value string `db:"value"` - DefaultValue string `db:"default"` - Level string `db:"level"` - Description string `db:"description"` - Type string `db:"type"` -} - -func ScanWarehouse(row *sqlx.Row) (*Warehouse, error) { - w := &Warehouse{} - err := row.StructScan(w) - return w, err -} - -// ScanWarehouseParameters takes a database row and converts it to a warehouse parameter pointer. -func ScanWarehouseParameters(rows *sqlx.Rows) ([]*WarehouseParams, error) { - params := []*WarehouseParams{} - - for rows.Next() { - w := &WarehouseParams{} - if err := rows.StructScan(w); err != nil { - return nil, err - } - params = append(params, w) - } - return params, nil -} - -func ListWarehouses(db *sql.DB) ([]Warehouse, error) { - stmt := "SHOW WAREHOUSES" - rows, err := Query(db, stmt) - if err != nil { - return nil, err - } - defer rows.Close() - - dbs := []Warehouse{} - if err := sqlx.StructScan(rows, &dbs); err != nil { - if errors.Is(err, sql.ErrNoRows) { - log.Println("[DEBUG] no warehouses found") - return nil, fmt.Errorf("unable to scan row for %s err = %w", stmt, err) - } - return nil, fmt.Errorf("unable to scan %s err = %w", stmt, err) - } - return dbs, nil -}