From da4a3289db82f4e6affed343ee10b46d48ad82cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Thu, 25 Jan 2024 14:57:33 +0100 Subject: [PATCH 1/4] wip --- pkg/datasources/stages.go | 71 ++++++++------ pkg/resources/stage.go | 202 +++++++++++++------------------------- 2 files changed, 108 insertions(+), 165 deletions(-) diff --git a/pkg/datasources/stages.go b/pkg/datasources/stages.go index 249149e64d..e10ec8074b 100644 --- a/pkg/datasources/stages.go +++ b/pkg/datasources/stages.go @@ -1,12 +1,12 @@ package datasources import ( + "context" "database/sql" - "errors" "fmt" - "log" - - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) @@ -56,42 +56,55 @@ var stagesSchema = map[string]*schema.Schema{ func Stages() *schema.Resource { return &schema.Resource{ - Read: ReadStages, - Schema: stagesSchema, + ReadContext: ReadStages, + Schema: stagesSchema, } } -func ReadStages(d *schema.ResourceData, meta interface{}) error { +func ReadStages(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) databaseName := d.Get("database").(string) schemaName := d.Get("schema").(string) - currentStages, err := snowflake.ListStages(databaseName, schemaName, db) - if errors.Is(err, sql.ErrNoRows) { - // If not found, mark resource to be removed from state file during apply or refresh - log.Printf("[DEBUG] stages in schema (%s) not found", d.Id()) - d.SetId("") - return nil - } else if err != nil { - log.Printf("[DEBUG] unable to parse stages in schema (%s)", d.Id()) + client := sdk.NewClientFromDB(db) + stages, err := client.Stages.Show(ctx, sdk.NewShowStageRequest().WithIn( + &sdk.In{ + Schema: sdk.NewDatabaseObjectIdentifier(databaseName, schemaName), + }, + )) + if err != nil { d.SetId("") - return nil + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Warning, + Summary: "Failed to query stages", + Detail: fmt.Sprintf("DatabaseName: %s, SchemaName: %s, Err: %s", databaseName, schemaName, err), + }, + } } - stages := []map[string]interface{}{} - - for _, stage := range currentStages { - stageMap := map[string]interface{}{} - - stageMap["name"] = stage.Name - stageMap["database"] = stage.DatabaseName - stageMap["schema"] = stage.SchemaName - stageMap["comment"] = stage.Comment - stageMap["storage_integration"] = stage.StorageIntegration + stagesList := make([]map[string]any, len(stages)) + for i, stage := range stages { + stagesList[i] = map[string]any{ + "name": stage.Name, + "database": stage.DatabaseName, + "schema": stage.SchemaName, + "comment": stage.Comment, + "storage_integration": stage.StorageIntegration, + } + } - stages = append(stages, stageMap) + if err := d.Set("stages", stagesList); err != nil { + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Failed to set stages", + Detail: fmt.Sprintf("Err: %s", err), + }, + } } - d.SetId(fmt.Sprintf(`%v|%v`, databaseName, schemaName)) - return d.Set("stages", stages) + d.SetId(helpers.EncodeSnowflakeID(databaseName, schemaName)) + + return nil } diff --git a/pkg/resources/stage.go b/pkg/resources/stage.go index 752c3de862..1010a25ab4 100644 --- a/pkg/resources/stage.go +++ b/pkg/resources/stage.go @@ -1,21 +1,14 @@ package resources import ( 
- "bytes" + "context" "database/sql" - "encoding/csv" - "errors" "fmt" - "log" - "strings" - + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/snowflakedb/gosnowflake" -) - -const ( - stageIDDelimiter = '|' ) var stageSchema = map[string]*schema.Schema{ @@ -92,58 +85,12 @@ var stageSchema = map[string]*schema.Schema{ "tag": tagReferenceSchema, } -type stageID struct { - DatabaseName string - SchemaName string - StageName string -} - -// String() takes in a stageID object and returns a pipe-delimited string: -// DatabaseName|SchemaName|StageName. -func (si *stageID) String() (string, error) { - var buf bytes.Buffer - csvWriter := csv.NewWriter(&buf) - csvWriter.Comma = stageIDDelimiter - dataIdentifiers := [][]string{{si.DatabaseName, si.SchemaName, si.StageName}} - if err := csvWriter.WriteAll(dataIdentifiers); err != nil { - return "", err - } - strStageID := strings.TrimSpace(buf.String()) - return strStageID, nil -} - -// stageIDFromString() takes in a pipe-delimited string: DatabaseName|SchemaName|StageName -// and returns a stageID object. -func stageIDFromString(stringID string) (*stageID, error) { - reader := csv.NewReader(strings.NewReader(stringID)) - reader.Comma = stageIDDelimiter - lines, err := reader.ReadAll() - if err != nil { - return nil, fmt.Errorf("not CSV compatible") - } - - if len(lines) != 1 { - return nil, fmt.Errorf("1 line per stage") - } - if len(lines[0]) != 3 { - return nil, fmt.Errorf("3 fields allowed") - } - - stageResult := &stageID{ - DatabaseName: lines[0][0], - SchemaName: lines[0][1], - StageName: lines[0][2], - } - return stageResult, nil -} - -// Stage returns a pointer to the resource representing a stage. func Stage() *schema.Resource { return &schema.Resource{ - Create: CreateStage, - Read: ReadStage, - Update: UpdateStage, - Delete: DeleteStage, + CreateContext: CreateStage, + ReadContext: ReadStage, + UpdateContext: UpdateStage, + DeleteContext: DeleteStage, Schema: stageSchema, Importer: &schema.ResourceImporter{ @@ -152,8 +99,7 @@ func Stage() *schema.Resource { } } -// CreateStage implements schema.CreateFunc. -func CreateStage(d *schema.ResourceData, meta interface{}) error { +func CreateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) name := d.Get("name").(string) database := d.Get("database").(string) @@ -202,111 +148,91 @@ func CreateStage(d *schema.ResourceData, meta interface{}) error { q := builder.Create() if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error creating stage %v", name) + return diag.Errorf("error creating stage %v", name) } - stageID := &stageID{ - DatabaseName: database, - SchemaName: schema, - StageName: name, - } - dataIDInput, err := stageID.String() - if err != nil { - return err - } - d.SetId(dataIDInput) + d.SetId(helpers.EncodeSnowflakeID(database, schema, name)) - return ReadStage(d, meta) + return ReadStage(ctx, d, meta) } -// ReadStage implements schema.ReadFunc -// credentials and encryption are omitted, they cannot be read via SHOW or DESCRIBE. 
-func ReadStage(d *schema.ResourceData, meta interface{}) error { +func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) - stageID, err := stageIDFromString(d.Id()) - if err != nil { - return err - } + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.SchemaObjectIdentifier) + client := sdk.NewClientFromDB(db) - dbName := stageID.DatabaseName - schema := stageID.SchemaName - stage := stageID.StageName - - q := snowflake.NewStageBuilder(stage, dbName, schema).Describe() - stageDesc, err := snowflake.DescStage(db, q) - if errors.Is(err, sql.ErrNoRows) { - // If not found, mark resource to be removed from state file during apply or refresh - log.Printf("[DEBUG] stage (%s) not found", d.Id()) + properties, err := client.Stages.Describe(ctx, id) + if err != nil { d.SetId("") - return nil - } - - if driverErr, ok := err.(*gosnowflake.SnowflakeError); ok { //nolint:errorlint // todo: should be fixed - // 002003 (02000): SQL compilation error: - // 'XXX' does not exist or not authorized. - if driverErr.Number == 2003 { - log.Printf("[DEBUG] stage (%s) not found", d.Id()) - d.SetId("") - return nil + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Failed to describe stage", + Detail: fmt.Sprintf("Id: %s, Err: %s", d.Id(), err), + }, } } - sq := snowflake.NewStageBuilder(stage, dbName, schema).Show() - row := snowflake.QueryRow(db, sq) - - s, err := snowflake.ScanStageShow(row) + stage, err := client.Stages.ShowByID(ctx, id) if err != nil { - return err + d.SetId("") + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Failed to show stage by id", + Detail: fmt.Sprintf("Id: %s, Err: %s", d.Id(), err), + }, + } } - if err := d.Set("name", s.Name); err != nil { - return err + if err := d.Set("name", stage.Name); err != nil { + return diag.FromErr(err) } - if err := d.Set("database", s.DatabaseName); err != nil { - return err + if err := d.Set("database", stage.DatabaseName); err != nil { + return diag.FromErr(err) } - if err := d.Set("schema", s.SchemaName); err != nil { - return err + if err := d.Set("schema", stage.SchemaName); err != nil { + return diag.FromErr(err) } if err := d.Set("url", stageDesc.URL); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("file_format", stageDesc.FileFormat); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("copy_options", stageDesc.CopyOptions); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("directory", stageDesc.Directory); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("storage_integration", s.StorageIntegration); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("comment", s.Comment); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("aws_external_id", stageDesc.AwsExternalID); err != nil { - return err + return diag.FromErr(err) } if err := d.Set("snowflake_iam_user", stageDesc.SnowflakeIamUser); err != nil { - return err + return diag.FromErr(err) } + return nil } -// UpdateStage implements schema.UpdateFunc. 
-func UpdateStage(d *schema.ResourceData, meta interface{}) error { +func UpdateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { stageID, err := stageIDFromString(d.Id()) if err != nil { return err @@ -387,27 +313,31 @@ func UpdateStage(d *schema.ResourceData, meta interface{}) error { return tagChangeErr } - return ReadStage(d, meta) + return ReadStage(ctx, d, meta) } -// DeleteStage implements schema.DeleteFunc. -func DeleteStage(d *schema.ResourceData, meta interface{}) error { +func DeleteStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) - stageID, err := stageIDFromString(d.Id()) - if err != nil { - return err - } + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.SchemaObjectIdentifier) + client := sdk.NewClientFromDB(db) - dbName := stageID.DatabaseName - schema := stageID.SchemaName - stage := stageID.StageName - - q := snowflake.NewStageBuilder(stage, dbName, schema).Drop() - if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error deleting stage %v err = %w", d.Id(), err) + err := client.Stages.Drop(ctx, sdk.NewDropStageRequest(id)) + if err != nil { + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Failed to drop stage", + Detail: fmt.Sprintf("Id: %s, Err: %s", d.Id(), err), + }, + } } d.SetId("") return nil } + +func findStagePropertyValue(properties []sdk.StageProperty, name string) string { + + return "" +} From 8e8c6a95b91abc213b9d99d6050600d34832d986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Thu, 25 Jan 2024 15:28:06 +0100 Subject: [PATCH 2/4] wip --- pkg/resources/stage.go | 94 ++++++++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/pkg/resources/stage.go b/pkg/resources/stage.go index 1010a25ab4..8fd193608e 100644 --- a/pkg/resources/stage.go +++ b/pkg/resources/stage.go @@ -9,6 +9,7 @@ import ( "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "strings" ) var stageSchema = map[string]*schema.Schema{ @@ -99,6 +100,8 @@ func Stage() *schema.Resource { } } +// TODO: Document why snowflake package is used here instead of sdk +// TODO: Add acceptance tests func CreateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) name := d.Get("name").(string) @@ -107,7 +110,6 @@ func CreateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia builder := snowflake.NewStageBuilder(name, database, schema) - // Set optionals if v, ok := d.GetOk("url"); ok { builder.WithURL(v.(string)) } @@ -197,35 +199,53 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn return diag.FromErr(err) } - if err := d.Set("url", stageDesc.URL); err != nil { + if err := d.Set("url", strings.Trim(findStagePropertyValue(properties, "URL"), "[\"]")); err != nil { return diag.FromErr(err) } - if err := d.Set("file_format", stageDesc.FileFormat); err != nil { + fileFormat := make([]string, 0) + for _, property := range properties { + if property.Parent == "STAGE_FILE_FORMAT" && property.Value != property.Default { + fileFormat = append(fileFormat, fmt.Sprintf("%s = %s", property.Name, property.Value)) + } + } + if err := d.Set("file_format", fileFormat); err != nil { return diag.FromErr(err) } - if err := d.Set("copy_options", stageDesc.CopyOptions); err != nil { + 
copyOptions := make([]string, 0) + for _, property := range properties { + if property.Parent == "STAGE_COPY_OPTIONS" && property.Value != property.Default { + copyOptions = append(copyOptions, fmt.Sprintf("%s = %s", property.Name, property.Value)) + } + } + if err := d.Set("copy_options", copyOptions); err != nil { return diag.FromErr(err) } - if err := d.Set("directory", stageDesc.Directory); err != nil { + directory := make([]string, 0) + for _, property := range properties { + if property.Parent == "DIRECTORY" && property.Value != property.Default && property.Name != "LAST_REFRESHED_ON" { + directory = append(directory, fmt.Sprintf("%s = %s", property.Name, property.Value)) + } + } + if err := d.Set("directory", directory); err != nil { return diag.FromErr(err) } - if err := d.Set("storage_integration", s.StorageIntegration); err != nil { + if err := d.Set("storage_integration", stage.StorageIntegration); err != nil { return diag.FromErr(err) } - if err := d.Set("comment", s.Comment); err != nil { + if err := d.Set("comment", stage.Comment); err != nil { return diag.FromErr(err) } - if err := d.Set("aws_external_id", stageDesc.AwsExternalID); err != nil { + if err := d.Set("aws_external_id", findStagePropertyValue(properties, "AWS_EXTERNAL_ID")); err != nil { return diag.FromErr(err) } - if err := d.Set("snowflake_iam_user", stageDesc.SnowflakeIamUser); err != nil { + if err := d.Set("snowflake_iam_user", findStagePropertyValue(properties, "SNOWFLAKE_IAM_USER")); err != nil { return diag.FromErr(err) } @@ -233,24 +253,18 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn } func UpdateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { - stageID, err := stageIDFromString(d.Id()) - if err != nil { - return err - } - - dbName := stageID.DatabaseName - schema := stageID.SchemaName - stage := stageID.StageName + id := helpers.DecodeSnowflakeID(d.Id()).(sdk.SchemaObjectIdentifier) - builder := snowflake.NewStageBuilder(stage, dbName, schema) + builder := snowflake.NewStageBuilder(id.Name(), id.DatabaseName(), id.SchemaName()) db := meta.(*sql.DB) + client := sdk.NewClientFromDB(db) if d.HasChange("credentials") { credentials := d.Get("credentials") q := builder.ChangeCredentials(credentials.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage credentials on %v", d.Id()) + return diag.Errorf("error updating stage credentials on %v", d.Id()) } } @@ -259,14 +273,14 @@ func UpdateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia url := d.Get("url") q := builder.ChangeStorageIntegrationAndUrl(si.(string), url.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage storage integration and url on %v", d.Id()) + return diag.Errorf("error updating stage storage integration and url on %v", d.Id()) } } else { if d.HasChange("storage_integration") { si := d.Get("storage_integration") q := builder.ChangeStorageIntegration(si.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage storage integration on %v", d.Id()) + return diag.Errorf("error updating stage storage integration on %v", d.Id()) } } @@ -274,7 +288,7 @@ func UpdateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia url := d.Get("url") q := builder.ChangeURL(url.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage url on %v", d.Id()) + return diag.Errorf("error updating 
stage url on %v", d.Id()) } } } @@ -283,34 +297,50 @@ func UpdateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia encryption := d.Get("encryption") q := builder.ChangeEncryption(encryption.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage encryption on %v", d.Id()) + return diag.Errorf("error updating stage encryption on %v", d.Id()) } } + if d.HasChange("file_format") { fileFormat := d.Get("file_format") q := builder.ChangeFileFormat(fileFormat.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage file formaat on %v", d.Id()) + return diag.Errorf("error updating stage file format on %v", d.Id()) } } + if d.HasChange("copy_options") { copyOptions := d.Get("copy_options") q := builder.ChangeCopyOptions(copyOptions.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage copy options on %v", d.Id()) + return diag.Errorf("error updating stage copy options on %v", d.Id()) } } + if d.HasChange("comment") { comment := d.Get("comment") q := builder.ChangeComment(comment.(string)) if err := snowflake.Exec(db, q); err != nil { - return fmt.Errorf("error updating stage comment on %v", d.Id()) + return diag.Errorf("error updating stage comment on %v", d.Id()) } } - tagChangeErr := handleTagChanges(db, d, builder) - if tagChangeErr != nil { - return tagChangeErr + if d.HasChange("tag") { + unsetTags, setTags := GetTagsDiff(d, "tag") + + if len(unsetTags) > 0 { + err := client.Stages.Alter(ctx, sdk.NewAlterStageRequest(id).WithUnsetTags(unsetTags)) + if err != nil { + return diag.Errorf("error occurred when dropping tags on stage with id: %v, err = %s", d.Id(), err) + } + } + + if len(setTags) > 0 { + err := client.Stages.Alter(ctx, sdk.NewAlterStageRequest(id).WithSetTags(setTags)) + if err != nil { + return diag.Errorf("error occurred when setting tags on stage with id: %v, err = %s", d.Id(), err) + } + } } return ReadStage(ctx, d, meta) @@ -338,6 +368,10 @@ func DeleteStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia } func findStagePropertyValue(properties []sdk.StageProperty, name string) string { - + for _, property := range properties { + if property.Name == name { + return property.Value + } + } return "" } From 61a537c5ed306c5f4c0ced6e2afad19d531aa50c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Fri, 26 Jan 2024 14:15:31 +0100 Subject: [PATCH 3/4] wip --- pkg/datasources/stages.go | 1 + pkg/datasources/stages_acceptance_test.go | 69 ++++--- pkg/resources/stage.go | 23 ++- pkg/resources/stage_acceptance_test.go | 85 ++++++++- pkg/resources/stage_test.go | 173 ------------------ pkg/resources/stream.go | 6 +- .../TestAcc_Stage_CreateAndAlter/test.tf | 20 ++ .../TestAcc_Stage_CreateAndAlter/variables.tf | 35 ++++ pkg/sdk/stages_gen.go | 17 +- pkg/snowflake/stage.go | 122 ------------ pkg/snowflake/stage_test.go | 124 ------------- 11 files changed, 208 insertions(+), 467 deletions(-) delete mode 100644 pkg/resources/stage_test.go create mode 100644 pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/test.tf create mode 100644 pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/variables.tf delete mode 100644 pkg/snowflake/stage_test.go diff --git a/pkg/datasources/stages.go b/pkg/datasources/stages.go index e10ec8074b..66b4927478 100644 --- a/pkg/datasources/stages.go +++ b/pkg/datasources/stages.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + 
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" diff --git a/pkg/datasources/stages_acceptance_test.go b/pkg/datasources/stages_acceptance_test.go index 0c5da94136..63dfd78ddf 100644 --- a/pkg/datasources/stages_acceptance_test.go +++ b/pkg/datasources/stages_acceptance_test.go @@ -5,6 +5,9 @@ import ( "strings" "testing" + acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance" + "github.com/hashicorp/terraform-plugin-testing/tfversion" + "github.com/hashicorp/terraform-plugin-testing/helper/acctest" "github.com/hashicorp/terraform-plugin-testing/helper/resource" ) @@ -12,47 +15,65 @@ import ( func TestAcc_Stages(t *testing.T) { databaseName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) schemaName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + storageIntegrationName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) stageName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) - resource.ParallelTest(t, resource.TestCase{ - Providers: providers(), - CheckDestroy: nil, + comment := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, Steps: []resource.TestStep{ { - Config: stages(databaseName, schemaName, stageName), + Config: stages(databaseName, schemaName, storageIntegrationName, stageName, comment), Check: resource.ComposeTestCheckFunc( - resource.TestCheckResourceAttr("data.snowflake_stages.t", "database", databaseName), - resource.TestCheckResourceAttr("data.snowflake_stages.t", "schema", schemaName), - resource.TestCheckResourceAttrSet("data.snowflake_stages.t", "stages.#"), - resource.TestCheckResourceAttr("data.snowflake_stages.t", "stages.#", "1"), - resource.TestCheckResourceAttr("data.snowflake_stages.t", "stages.0.name", stageName), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "database", databaseName), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "schema", schemaName), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "stages.#", "1"), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "stages.0.name", stageName), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "stages.0.storage_integration", storageIntegrationName), + resource.TestCheckResourceAttr("data.snowflake_stages.test", "stages.0.comment", comment), ), }, }, }) } -func stages(databaseName string, schemaName string, stageName string) string { +func stages(databaseName string, schemaName string, storageIntegrationName string, stageName string, comment string) string { return fmt.Sprintf(` + resource "snowflake_database" "test" { + name = "%s" + } - resource snowflake_database "d" { - name = "%v" + resource "snowflake_schema" "test"{ + name = "%s" + database = snowflake_database.test.name } - resource snowflake_schema "s"{ - name = "%v" - database = snowflake_database.d.name + resource "snowflake_storage_integration" "test" { + name = "%s" + storage_allowed_locations = ["s3://foo/"] + storage_provider = "S3" + + storage_aws_role_arn = 
"arn:aws:iam::000000000001:/role/test" } - resource snowflake_stage "t"{ - name = "%v" - database = snowflake_schema.s.database - schema = snowflake_schema.s.name + resource "snowflake_stage" "test"{ + name = "%s" + database = snowflake_schema.test.database + schema = snowflake_schema.test.name + url = "s3://foo/" + storage_integration = snowflake_storage_integration.test.name + comment = "%s" } - data snowflake_stages "t" { - database = snowflake_stage.t.database - schema = snowflake_stage.t.schema - depends_on = [snowflake_stage.t] + data "snowflake_stages" "test" { + depends_on = [snowflake_storage_integration.test, snowflake_stage.test] + + database = snowflake_stage.test.database + schema = snowflake_stage.test.schema } - `, databaseName, schemaName, stageName) + `, databaseName, schemaName, storageIntegrationName, stageName, comment) } diff --git a/pkg/resources/stage.go b/pkg/resources/stage.go index 8fd193608e..dd3af2b681 100644 --- a/pkg/resources/stage.go +++ b/pkg/resources/stage.go @@ -4,12 +4,13 @@ import ( "context" "database/sql" "fmt" + "strings" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "strings" ) var stageSchema = map[string]*schema.Schema{ @@ -100,8 +101,10 @@ func Stage() *schema.Resource { } } -// TODO: Document why snowflake package is used here instead of sdk -// TODO: Add acceptance tests +// TODO: Remove from snowflake package everything that is not used + +// TODO (SNOW-1019005): Remove snowflake package that is used in Create and Update operations + func CreateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) name := d.Get("name").(string) @@ -199,7 +202,7 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn return diag.FromErr(err) } - if err := d.Set("url", strings.Trim(findStagePropertyValue(properties, "URL"), "[\"]")); err != nil { + if err := d.Set("url", strings.Trim(findStagePropertyValueByName(properties, "URL"), "[\"]")); err != nil { return diag.FromErr(err) } @@ -209,7 +212,7 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn fileFormat = append(fileFormat, fmt.Sprintf("%s = %s", property.Name, property.Value)) } } - if err := d.Set("file_format", fileFormat); err != nil { + if err := d.Set("file_format", strings.Join(fileFormat, " ")); err != nil { return diag.FromErr(err) } @@ -219,7 +222,7 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn copyOptions = append(copyOptions, fmt.Sprintf("%s = %s", property.Name, property.Value)) } } - if err := d.Set("copy_options", copyOptions); err != nil { + if err := d.Set("copy_options", strings.Join(copyOptions, " ")); err != nil { return diag.FromErr(err) } @@ -229,7 +232,7 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn directory = append(directory, fmt.Sprintf("%s = %s", property.Name, property.Value)) } } - if err := d.Set("directory", directory); err != nil { + if err := d.Set("directory", strings.Join(directory, " ")); err != nil { return diag.FromErr(err) } @@ -241,11 +244,11 @@ func ReadStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagn return diag.FromErr(err) } - if err := 
d.Set("aws_external_id", findStagePropertyValue(properties, "AWS_EXTERNAL_ID")); err != nil { + if err := d.Set("aws_external_id", findStagePropertyValueByName(properties, "AWS_EXTERNAL_ID")); err != nil { return diag.FromErr(err) } - if err := d.Set("snowflake_iam_user", findStagePropertyValue(properties, "SNOWFLAKE_IAM_USER")); err != nil { + if err := d.Set("snowflake_iam_user", findStagePropertyValueByName(properties, "SNOWFLAKE_IAM_USER")); err != nil { return diag.FromErr(err) } @@ -367,7 +370,7 @@ func DeleteStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Dia return nil } -func findStagePropertyValue(properties []sdk.StageProperty, name string) string { +func findStagePropertyValueByName(properties []sdk.StageProperty, name string) string { for _, property := range properties { if property.Name == name { return property.Value diff --git a/pkg/resources/stage_acceptance_test.go b/pkg/resources/stage_acceptance_test.go index 6e4dce0cd6..9fa09ebdb9 100644 --- a/pkg/resources/stage_acceptance_test.go +++ b/pkg/resources/stage_acceptance_test.go @@ -2,8 +2,12 @@ package resources_test import ( "fmt" + "strings" "testing" + "github.com/hashicorp/terraform-plugin-testing/config" + "github.com/hashicorp/terraform-plugin-testing/tfversion" + acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance" "github.com/hashicorp/terraform-plugin-testing/helper/acctest" "github.com/hashicorp/terraform-plugin-testing/helper/resource" @@ -12,7 +16,7 @@ import ( func TestAcc_StageAlterWhenBothURLAndStorageIntegrationChange(t *testing.T) { name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) - resource.ParallelTest(t, resource.TestCase{ + resource.Test(t, resource.TestCase{ Providers: acc.TestAccProviders(), PreCheck: func() { acc.TestAccPreCheck(t) }, CheckDestroy: nil, @@ -23,7 +27,6 @@ func TestAcc_StageAlterWhenBothURLAndStorageIntegrationChange(t *testing.T) { resource.TestCheckResourceAttr("snowflake_stage.test", "name", name), resource.TestCheckResourceAttr("snowflake_stage.test", "url", "s3://foo/"), ), - Destroy: false, }, { Config: stageIntegrationConfig(name, "changed", "s3://changed/", acc.TestDatabaseName, acc.TestSchemaName), @@ -36,6 +39,84 @@ func TestAcc_StageAlterWhenBothURLAndStorageIntegrationChange(t *testing.T) { }) } +func TestAcc_Stage_CreateAndAlter(t *testing.T) { + if !hasExternalEnvironmentVariablesSet { + t.Skip("Skipping TestAcc_Stages_CreateOnS3 because external environment variables are not set") + } + + databaseName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + schemaName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + name := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + url := "s3://foo/" + comment := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + storageIntegration := "" + credentials := fmt.Sprintf("AWS_KEY_ID = '%s' AWS_SECRET_KEY = '%s'", awsKeyId, awsSecretKey) + encryption := "TYPE = 'NONE'" + fileFormat := "TYPE = JSON NULL_IF = []" + + changedUrl := "s3://bar/" + changedStorageIntegration := "s3_storage_integration" + changedCredentials := "" + changedEncryption := "TYPE = 'AWS_SSE_S3'" + changedFileFormat := "TYPE = CSV" + changedComment := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + + configVariables := func(url string, storageIntegration string, credentials string, encryption string, fileFormat string, comment string) 
config.Variables { + return config.Variables{ + "database": config.StringVariable(databaseName), + "schema": config.StringVariable(schemaName), + "name": config.StringVariable(name), + "url": config.StringVariable(url), + "storage_integration": config.StringVariable(storageIntegration), + "credentials": config.StringVariable(credentials), + "encryption": config.StringVariable(encryption), + "file_format": config.StringVariable(fileFormat), + "comment": config.StringVariable(comment), + } + } + + resourceName := "snowflake_stage.test" + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + Steps: []resource.TestStep{ + { + ConfigDirectory: config.TestNameDirectory(), + ConfigVariables: configVariables(url, storageIntegration, credentials, encryption, fileFormat, comment), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "database", databaseName), + resource.TestCheckResourceAttr(resourceName, "schema", schemaName), + resource.TestCheckResourceAttr(resourceName, "name", name), + resource.TestCheckResourceAttr(resourceName, "storage_integration", storageIntegration), + resource.TestCheckResourceAttr(resourceName, "credentials", credentials), + resource.TestCheckResourceAttr(resourceName, "encryption", encryption), + resource.TestCheckResourceAttr(resourceName, "file_format", fileFormat), + resource.TestCheckResourceAttr(resourceName, "url", url), + resource.TestCheckResourceAttr(resourceName, "comment", comment), + ), + }, + { + ConfigDirectory: config.TestNameDirectory(), + ConfigVariables: configVariables(changedUrl, changedStorageIntegration, changedCredentials, changedEncryption, changedFileFormat, changedComment), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "database", databaseName), + resource.TestCheckResourceAttr(resourceName, "schema", schemaName), + resource.TestCheckResourceAttr(resourceName, "name", name), + resource.TestCheckResourceAttr(resourceName, "storage_integration", changedStorageIntegration), + resource.TestCheckResourceAttr(resourceName, "credentials", changedCredentials), + resource.TestCheckResourceAttr(resourceName, "encryption", changedEncryption), + resource.TestCheckResourceAttr(resourceName, "file_format", changedFileFormat), + resource.TestCheckResourceAttr(resourceName, "url", changedUrl), + resource.TestCheckResourceAttr(resourceName, "comment", changedComment), + ), + }, + }, + }) +} + func stageIntegrationConfig(name string, siNameSuffix string, url string, databaseName string, schemaName string) string { resources := ` resource "snowflake_storage_integration" "test" { diff --git a/pkg/resources/stage_test.go b/pkg/resources/stage_test.go deleted file mode 100644 index 449300ad98..0000000000 --- a/pkg/resources/stage_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package resources_test - -import ( - "database/sql" - "testing" - - sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" - . 
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/testhelpers" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/stretchr/testify/require" -) - -func TestStage(t *testing.T) { - r := require.New(t) - err := resources.Stage().InternalValidate(provider.Provider().Schema, true) - r.NoError(err) -} - -func TestInternalStageCreate(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "schema": "test_schema", - "comment": "great comment", - } - d := schema.TestResourceDataRaw(t, resources.Stage().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE STAGE "test_db"."test_schema"."test_stage" COMMENT = 'great comment'$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - expectReadStage(mock) - expectReadStageShow(mock) - err := resources.CreateStage(d, db) - r.NoError(err) - }) -} - -func TestExternalStageCreate(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "url": "s3://com.example.bucket/prefix", - "schema": "test_schema", - "comment": "great comment", - } - d := schema.TestResourceDataRaw(t, resources.Stage().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://com.example.bucket/prefix' COMMENT = 'great comment'$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - expectReadStage(mock) - expectReadStageShow(mock) - err := resources.CreateStage(d, db) - r.NoError(err) - }) -} - -func expectReadStage(mock sqlmock.Sqlmock) { - rows := sqlmock.NewRows([]string{ - "parent_property", "property", "property_type", "property_value", "property_default", - }, - ).AddRow("STAGE_LOCATION", "URL", "string", `["s3://load/test/"]`, ""). - AddRow("STAGE_CREDENTIALS", "AWS_EXTERNAL_ID", "string", "test", ""). - AddRow("STAGE_FILE_FORMAT", "FORMAT_NAME", "string", "CSV", ""). 
- AddRow("DIRECTORY", "ENABLED", "Boolean", true, false) - mock.ExpectQuery(`^DESCRIBE STAGE "test_db"."test_schema"."test_stage"$`).WillReturnRows(rows) -} - -func expectReadStageShow(mock sqlmock.Sqlmock) { - rows := sqlmock.NewRows([]string{ - "created_on", "name", "database_name", "schema_name", "url", "has_credentials", "has_encryption_key", "owner", "comment", "region", "type", "cloud", "notification_channel", "storage_integration", - }, - ).AddRow("2019-12-23 17:20:50.088 +0000", "test_stage", "test_db", "test_schema", "s3://load/test/", "N", "Y", "test", "great comment", "us-east-1", "EXTERNAL", "AWS", "NULL", "NULL") - mock.ExpectQuery(`^SHOW STAGES LIKE 'test_stage' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows) -} - -func TestStageRead(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "schema": "test_schema", - } - d := stage(t, "test_db|test_schema|test_stage", in) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - // Test when resource is not found, checking if state will be empty - r.NotEmpty(d.State()) - q := snowflake.NewStageBuilder("test_stage", "test_db", "test_schema").Describe() - mock.ExpectQuery(q).WillReturnError(sql.ErrNoRows) - err := resources.ReadStage(d, db) - r.Empty(d.State()) - r.Nil(err) - }) -} - -func TestStageUpdateWithSIAndURL(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "schema": "test_schema", - "url": "s3://changed_url", - "storage_integration": "changed_integration", - } - - d := stage(t, "test_db|test_schema|test_stage", in) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec(`ALTER STAGE "test_db"."test_schema"."test_stage" SET STORAGE_INTEGRATION = "changed_integration" URL = 's3://changed_url'`).WillReturnResult(sqlmock.NewResult(1, 1)) - expectReadStage(mock) - expectReadStageShow(mock) - err := resources.UpdateStage(d, db) - r.NoError(err) - }) -} - -func TestStageUpdateWithJustURL(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "schema": "test_schema", - "url": "s3://changed_url", - } - - d := stage(t, "test_db|test_schema|test_stage", in) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec(`ALTER STAGE "test_db"."test_schema"."test_stage" SET URL = 's3://changed_url'`).WillReturnResult(sqlmock.NewResult(1, 1)) - expectReadStage(mock) - expectReadStageShow(mock) - err := resources.UpdateStage(d, db) - r.NoError(err) - }) -} - -func TestStageUpdateWithJustSI(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_stage", - "database": "test_db", - "schema": "test_schema", - "storage_integration": "changed_integration", - } - - d := stage(t, "test_db|test_schema|test_stage", in) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec(`ALTER STAGE "test_db"."test_schema"."test_stage" SET STORAGE_INTEGRATION = "changed_integration"`).WillReturnResult(sqlmock.NewResult(1, 1)) - expectReadStage(mock) - expectReadStageShow(mock) - err := resources.UpdateStage(d, db) - r.NoError(err) - }) -} diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go index 2633d96fd9..5af07d4b92 100644 --- a/pkg/resources/stream.go +++ b/pkg/resources/stream.go @@ -200,13 +200,11 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { if err != nil { return err } - stageBuilder := 
snowflake.NewStageBuilder(stageId.Name(), stageId.DatabaseName(), stageId.SchemaName()) - sq := stageBuilder.Describe() - stageDesc, err := snowflake.DescStage(db, sq) + stageProperties, err := client.Stages.Describe(ctx, stageId) if err != nil { return err } - if !strings.Contains(stageDesc.Directory, "ENABLE = true") { + if findStagePropertyValueByName(stageProperties, "ENABLE") != "true" { return fmt.Errorf("directory must be enabled on stage") } req := sdk.NewCreateStreamOnDirectoryTableRequest(id, stageId) diff --git a/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/test.tf b/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/test.tf new file mode 100644 index 0000000000..7a0de913f2 --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/test.tf @@ -0,0 +1,20 @@ +resource "snowflake_database" "test" { + name = var.database +} + +resource "snowflake_schema" "test" { + name = var.schema + database = snowflake_database.test.name +} + +resource "snowflake_stage" "test" { + name = var.name + schema = snowflake_schema.test.name + database = snowflake_database.test.name + comment = var.comment + url = var.url + storage_integration = var.storage_integration + credentials = var.credentials + encryption = var.encryption + file_format = var.file_format +} diff --git a/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/variables.tf b/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/variables.tf new file mode 100644 index 0000000000..944a8d90bf --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Stage_CreateAndAlter/variables.tf @@ -0,0 +1,35 @@ +variable "name" { + type = string +} + +variable "database" { + type = string +} + +variable "schema" { + type = string +} + +variable "comment" { + type = string +} + +variable "url" { + type = string +} + +variable "storage_integration" { + type = string +} + +variable "credentials" { + type = string +} + +variable "encryption" { + type = string +} + +variable "file_format" { + type = string +} diff --git a/pkg/sdk/stages_gen.go b/pkg/sdk/stages_gen.go index 9e733d0d86..6fa177b6c4 100644 --- a/pkg/sdk/stages_gen.go +++ b/pkg/sdk/stages_gen.go @@ -223,14 +223,15 @@ type AlterStageOptions struct { // AlterInternalStageStageOptions is based on https://docs.snowflake.com/en/sql-reference/sql/alter-stage. type AlterInternalStageStageOptions struct { - alter bool `ddl:"static" sql:"ALTER"` - stage bool `ddl:"static" sql:"STAGE"` - IfExists *bool `ddl:"keyword" sql:"IF EXISTS"` - name SchemaObjectIdentifier `ddl:"identifier"` - set bool `ddl:"static" sql:"SET"` - FileFormat *StageFileFormat `ddl:"list,parentheses" sql:"FILE_FORMAT ="` - CopyOptions *StageCopyOptions `ddl:"list,parentheses,no_comma" sql:"COPY_OPTIONS ="` - Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` + alter bool `ddl:"static" sql:"ALTER"` + stage bool `ddl:"static" sql:"STAGE"` + IfExists *bool `ddl:"keyword" sql:"IF EXISTS"` + name SchemaObjectIdentifier `ddl:"identifier"` + set bool `ddl:"static" sql:"SET"` + // TODO (SNOW-1019005): Move parameters below to the AlterStageOptions as they're common across stage types (+ remove from other alter option structs) + FileFormat *StageFileFormat `ddl:"list,parentheses" sql:"FILE_FORMAT ="` + CopyOptions *StageCopyOptions `ddl:"list,parentheses,no_comma" sql:"COPY_OPTIONS ="` + Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` } // AlterExternalS3StageStageOptions is based on https://docs.snowflake.com/en/sql-reference/sql/alter-stage. 
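
The ReadStage rework in this series folds DESCRIBE STAGE output back into state by grouping non-default properties under their parent: STAGE_FILE_FORMAT, STAGE_COPY_OPTIONS and DIRECTORY. A minimal sketch of that shared pattern, assuming only the sdk.StageProperty shape visible in this diff (propertiesByParent is a hypothetical helper, not part of the change; the DIRECTORY case in ReadStage additionally skips LAST_REFRESHED_ON):

package resources

import (
	"fmt"
	"strings"

	"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
)

// propertiesByParent joins the non-default DESCRIBE STAGE properties sharing
// the given parent into the "NAME = VALUE NAME = VALUE" form kept in state.
func propertiesByParent(properties []sdk.StageProperty, parent string) string {
	nonDefault := make([]string, 0)
	for _, property := range properties {
		// DESCRIBE STAGE returns every property together with its default;
		// only values that differ from the default are written to state.
		if property.Parent == parent && property.Value != property.Default {
			nonDefault = append(nonDefault, fmt.Sprintf("%s = %s", property.Name, property.Value))
		}
	}
	return strings.Join(nonDefault, " ")
}

This keeps the string shape of file_format, copy_options and directory identical to what the removed snowflake.DescStage produced (see the deletion in pkg/snowflake/stage.go below), so previously stored state remains stable.
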
diff --git a/pkg/snowflake/stage.go b/pkg/snowflake/stage.go index b96c1df016..f0d6183cbb 100644 --- a/pkg/snowflake/stage.go +++ b/pkg/snowflake/stage.go @@ -1,13 +1,8 @@ package snowflake import ( - "database/sql" - "errors" "fmt" - "log" "strings" - - "github.com/jmoiron/sqlx" ) func fixFileFormat(inputFileFormat string) string { @@ -216,120 +211,3 @@ func (sb *StageBuilder) ChangeFileFormat(f string) string { func (sb *StageBuilder) ChangeCopyOptions(c string) string { return fmt.Sprintf(`ALTER STAGE %v SET COPY_OPTIONS = (%v)`, sb.QualifiedName(), c) } - -// Drop returns the SQL query that will drop a stage. -func (sb *StageBuilder) Drop() string { - return fmt.Sprintf(`DROP STAGE %v`, sb.QualifiedName()) -} - -// Undrop returns the SQL query that will undrop a stage. -func (sb *StageBuilder) Undrop() string { - return fmt.Sprintf(`UNDROP STAGE %v`, sb.QualifiedName()) -} - -// Describe returns the SQL query that will describe a stage. -func (sb *StageBuilder) Describe() string { - return fmt.Sprintf(`DESCRIBE STAGE %v`, sb.QualifiedName()) -} - -// Show returns the SQL query that will show a stage. -func (sb *StageBuilder) Show() string { - return fmt.Sprintf(`SHOW STAGES LIKE '%v' IN SCHEMA "%v"."%v"`, sb.name, sb.db, sb.schema) -} - -type Stage struct { - Name *string `db:"name"` - DatabaseName *string `db:"database_name"` - SchemaName *string `db:"schema_name"` - Comment *string `db:"comment"` - StorageIntegration *string `db:"storage_integration"` -} - -func ScanStageShow(row *sqlx.Row) (*Stage, error) { - r := &Stage{} - err := row.StructScan(r) - return r, err -} - -type DescStageResult struct { - URL string - AwsExternalID string - SnowflakeIamUser string - FileFormat string - CopyOptions string - Directory string -} - -type descStageRow struct { - ParentProperty string `db:"parent_property"` - Property string `db:"property"` - PropertyValue string `db:"property_value"` - PropertyDefault string `db:"property_default"` -} - -func DescStage(db *sql.DB, query string) (*DescStageResult, error) { - r := &DescStageResult{} - var ff []string - var co []string - var dir []string - rows, err := Query(db, query) - if err != nil { - return r, err - } - defer rows.Close() - - for rows.Next() { - row := &descStageRow{} - if err := rows.StructScan(row); err != nil { - return r, err - } - - switch row.Property { - case "URL": - r.URL = strings.Trim(row.PropertyValue, "[\"]") - case "AWS_EXTERNAL_ID": - r.AwsExternalID = row.PropertyValue - case "SNOWFLAKE_IAM_USER": - r.SnowflakeIamUser = row.PropertyValue - } - - switch row.ParentProperty { - case "STAGE_FILE_FORMAT": - if row.PropertyValue != row.PropertyDefault { - ff = append(ff, fmt.Sprintf("%s = %s", row.Property, row.PropertyValue)) - } - case "STAGE_COPY_OPTIONS": - if row.PropertyValue != row.PropertyDefault { - co = append(co, fmt.Sprintf("%s = %s", row.Property, row.PropertyValue)) - } - case "DIRECTORY": - if row.PropertyValue != row.PropertyDefault && row.Property != "LAST_REFRESHED_ON" { - dir = append(dir, fmt.Sprintf("%s = %s", row.Property, row.PropertyValue)) - } - } - } - - r.FileFormat = strings.Join(ff, " ") - r.CopyOptions = strings.Join(co, " ") - r.Directory = strings.Join(dir, " ") - return r, nil -} - -func ListStages(databaseName string, schemaName string, db *sql.DB) ([]Stage, error) { - stmt := fmt.Sprintf(`SHOW STAGES IN SCHEMA "%s"."%v"`, databaseName, schemaName) - rows, err := Query(db, stmt) - if err != nil { - return nil, err - } - defer rows.Close() - - dbs := []Stage{} - if err := 
sqlx.StructScan(rows, &dbs); err != nil { - if errors.Is(err, sql.ErrNoRows) { - log.Println("[DEBUG] no stages found") - return nil, nil - } - return nil, fmt.Errorf("unable to scan row for %s err = %w", stmt, err) - } - return dbs, nil -} diff --git a/pkg/snowflake/stage_test.go b/pkg/snowflake/stage_test.go deleted file mode 100644 index 33074b2846..0000000000 --- a/pkg/snowflake/stage_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package snowflake - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestStageCreate(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`"test_db"."test_schema"."test_stage"`, s.QualifiedName()) - - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage"`, s.Create()) - - s.WithCredentials("aws_role='arn:aws:iam::001234567890:role/mysnowflakerole'") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole')`, s.Create()) - - s.WithEncryption("type='AWS_SSE_KMS' kms_key_id = 'aws/key'") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key')`, s.Create()) - - s.WithURL("s3://load/encrypted_files/") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key')`, s.Create()) - - s.WithFileFormat("format_name=my_csv_format") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key') FILE_FORMAT = (format_name=my_csv_format)`, s.Create()) - - s.WithCopyOptions("on_error='skip_file'") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key') FILE_FORMAT = (format_name=my_csv_format) COPY_OPTIONS = (on_error='skip_file')`, s.Create()) - - s.WithDirectory("ENABLE=TRUE") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key') FILE_FORMAT = (format_name=my_csv_format) COPY_OPTIONS = (on_error='skip_file') DIRECTORY = (ENABLE=TRUE)`, s.Create()) - - s.WithComment("Yee'haw") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key') FILE_FORMAT = (format_name=my_csv_format) COPY_OPTIONS = (on_error='skip_file') DIRECTORY = (ENABLE=TRUE) COMMENT = 'Yee\'haw'`, s.Create()) - - s.WithStorageIntegration("MY_INTEGRATION") - r.Equal(`CREATE STAGE "test_db"."test_schema"."test_stage" URL = 's3://load/encrypted_files/' CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole') STORAGE_INTEGRATION = "MY_INTEGRATION" ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key') FILE_FORMAT = (format_name=my_csv_format) COPY_OPTIONS = (on_error='skip_file') DIRECTORY = (ENABLE=TRUE) COMMENT = 'Yee\'haw'`, s.Create()) -} - -func TestStageRename(t 
*testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" RENAME TO "test_stage2"`, s.Rename("test_stage2")) -} - -func TestStageChangeComment(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET COMMENT = 'worst stage ever'`, s.ChangeComment("worst stage ever")) -} - -func TestStageChangeURL(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET URL = 's3://load/test/'`, s.ChangeURL("s3://load/test/")) -} - -func TestStageChangeFileFormat(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET FILE_FORMAT = (format_name=my_csv_format)`, s.ChangeFileFormat("format_name=my_csv_format")) -} - -func TestStageChangeFileFormatToEmptyList(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET FILE_FORMAT = (TYPE = parquet NULL_IF = () COMPRESSION = none)`, s.ChangeFileFormat("TYPE = parquet NULL_IF = [] COMPRESSION = none")) -} - -func TestStageChangeEncryption(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = 'aws/key')`, s.ChangeEncryption("type='AWS_SSE_KMS' kms_key_id = 'aws/key'")) -} - -func TestStageChangeCredentials(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET CREDENTIALS = (aws_role='arn:aws:iam::001234567890:role/mysnowflakerole')`, s.ChangeCredentials("aws_role='arn:aws:iam::001234567890:role/mysnowflakerole'")) -} - -func TestStageChangeStorageIntegration(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET STORAGE_INTEGRATION = "MY_INTEGRATION"`, s.ChangeStorageIntegration("MY_INTEGRATION")) -} - -func TestStageChangeStorageIntegrationAndUrl(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET STORAGE_INTEGRATION = "MY_INTEGRATION" URL = 's3://load/test'`, s.ChangeStorageIntegrationAndUrl("MY_INTEGRATION", "s3://load/test")) -} - -func TestStageChangeCopyOptions(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`ALTER STAGE "test_db"."test_schema"."test_stage" SET COPY_OPTIONS = (on_error='skip_file')`, s.ChangeCopyOptions("on_error='skip_file'")) -} - -func TestStageDrop(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`DROP STAGE "test_db"."test_schema"."test_stage"`, s.Drop()) -} - -func TestStageUndrop(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`UNDROP STAGE "test_db"."test_schema"."test_stage"`, s.Undrop()) -} - -func TestStageDescribe(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`DESCRIBE STAGE 
"test_db"."test_schema"."test_stage"`, s.Describe()) -} - -func TestStageShow(t *testing.T) { - r := require.New(t) - s := NewStageBuilder("test_stage", "test_db", "test_schema") - r.Equal(`SHOW STAGES LIKE 'test_stage' IN SCHEMA "test_db"."test_schema"`, s.Show()) -} From a913e63e0981a84dd61cfbf23a50d25e65a429c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Fri, 26 Jan 2024 14:41:25 +0100 Subject: [PATCH 4/4] wip --- pkg/resources/stage.go | 5 +---- pkg/resources/stage_acceptance_test.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/pkg/resources/stage.go b/pkg/resources/stage.go index dd3af2b681..f1fc964092 100644 --- a/pkg/resources/stage.go +++ b/pkg/resources/stage.go @@ -87,6 +87,7 @@ var stageSchema = map[string]*schema.Schema{ "tag": tagReferenceSchema, } +// TODO (SNOW-1019005): Remove snowflake package that is used in Create and Update operations func Stage() *schema.Resource { return &schema.Resource{ CreateContext: CreateStage, @@ -101,10 +102,6 @@ func Stage() *schema.Resource { } } -// TODO: Remove from snowflake package everything that is not used - -// TODO (SNOW-1019005): Remove snowflake package that is used in Create and Update operations - func CreateStage(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { db := meta.(*sql.DB) name := d.Get("name").(string) diff --git a/pkg/resources/stage_acceptance_test.go b/pkg/resources/stage_acceptance_test.go index 9fa09ebdb9..bec24fab77 100644 --- a/pkg/resources/stage_acceptance_test.go +++ b/pkg/resources/stage_acceptance_test.go @@ -52,13 +52,11 @@ func TestAcc_Stage_CreateAndAlter(t *testing.T) { storageIntegration := "" credentials := fmt.Sprintf("AWS_KEY_ID = '%s' AWS_SECRET_KEY = '%s'", awsKeyId, awsSecretKey) encryption := "TYPE = 'NONE'" - fileFormat := "TYPE = JSON NULL_IF = []" - changedUrl := "s3://bar/" - changedStorageIntegration := "s3_storage_integration" - changedCredentials := "" + changedUrl := awsBucketUrl + "/some-path" + changedStorageIntegration := "S3_STORAGE_INTEGRATION" changedEncryption := "TYPE = 'AWS_SSE_S3'" - changedFileFormat := "TYPE = CSV" + changedFileFormat := "TYPE = JSON NULL_IF = []" changedComment := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) configVariables := func(url string, storageIntegration string, credentials string, encryption string, fileFormat string, comment string) config.Variables { @@ -85,7 +83,7 @@ func TestAcc_Stage_CreateAndAlter(t *testing.T) { Steps: []resource.TestStep{ { ConfigDirectory: config.TestNameDirectory(), - ConfigVariables: configVariables(url, storageIntegration, credentials, encryption, fileFormat, comment), + ConfigVariables: configVariables(url, storageIntegration, credentials, encryption, "", comment), Check: resource.ComposeTestCheckFunc( resource.TestCheckResourceAttr(resourceName, "database", databaseName), resource.TestCheckResourceAttr(resourceName, "schema", schemaName), @@ -93,20 +91,20 @@ func TestAcc_Stage_CreateAndAlter(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "storage_integration", storageIntegration), resource.TestCheckResourceAttr(resourceName, "credentials", credentials), resource.TestCheckResourceAttr(resourceName, "encryption", encryption), - resource.TestCheckResourceAttr(resourceName, "file_format", fileFormat), + resource.TestCheckResourceAttr(resourceName, "file_format", ""), resource.TestCheckResourceAttr(resourceName, "url", url), resource.TestCheckResourceAttr(resourceName, "comment", comment), ), }, 
{ ConfigDirectory: config.TestNameDirectory(), - ConfigVariables: configVariables(changedUrl, changedStorageIntegration, changedCredentials, changedEncryption, changedFileFormat, changedComment), + ConfigVariables: configVariables(changedUrl, changedStorageIntegration, credentials, changedEncryption, changedFileFormat, changedComment), Check: resource.ComposeTestCheckFunc( resource.TestCheckResourceAttr(resourceName, "database", databaseName), resource.TestCheckResourceAttr(resourceName, "schema", schemaName), resource.TestCheckResourceAttr(resourceName, "name", name), resource.TestCheckResourceAttr(resourceName, "storage_integration", changedStorageIntegration), - resource.TestCheckResourceAttr(resourceName, "credentials", changedCredentials), + resource.TestCheckResourceAttr(resourceName, "credentials", credentials), resource.TestCheckResourceAttr(resourceName, "encryption", changedEncryption), resource.TestCheckResourceAttr(resourceName, "file_format", changedFileFormat), resource.TestCheckResourceAttr(resourceName, "url", changedUrl),
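
Across all four patches the hand-rolled stageID CSV codec is replaced by helpers.EncodeSnowflakeID and helpers.DecodeSnowflakeID. A rough sketch of the round-trip as used in CreateStage, ReadStage and DeleteStage above, relying only on the calls shown in the diff (the printed format is assumed to stay pipe-delimited, like the removed stageIDDelimiter):

package resources

import (
	"fmt"

	"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
	"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
)

func exampleStageID() {
	// CreateStage stores the stage coordinates as the resource ID.
	id := helpers.EncodeSnowflakeID("test_db", "test_schema", "test_stage")
	fmt.Println(id) // assumed: test_db|test_schema|test_stage

	// ReadStage, UpdateStage and DeleteStage decode it back; a three-part ID
	// yields a sdk.SchemaObjectIdentifier, hence the type assertion in the diff.
	objectID := helpers.DecodeSnowflakeID(id).(sdk.SchemaObjectIdentifier)
	fmt.Println(objectID.DatabaseName(), objectID.SchemaName(), objectID.Name())
}

Unlike the removed stageIDFromString, decoding does not surface a CSV parsing error, which is why the call sites in this series drop that error handling entirely.
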