feat: Stream on table resource #3109

Merged
merged 12 commits into from
Oct 9, 2024
34 changes: 34 additions & 0 deletions MIGRATION_GUIDE.md
@@ -4,6 +4,9 @@ This document is meant to help you migrate your Terraform config to the newer
describe deprecations or breaking changes and help you change your configuration to keep the same (or similar) behavior
across different versions.

> [!TIP]
> We highly recommend upgrading one version at a time instead of performing bulk upgrades.

## v0.96.0 ➞ v0.97.0

### *(new feature)* snowflake_stream_on_table resource
@@ -13,6 +16,37 @@ The newly introduced resources are aligned with the latest Snowflake documentation.
This segregation was based on the object on which the stream is created. The mapping between SQL statements and the resources is the following:
- `ON TABLE <table_name>` -> `snowflake_stream_on_table`

To use the new `snowflake_stream_on_table` resource, change the old `snowflake_stream` resource from
```terraform
resource "snowflake_stream" "stream" {
name = "stream"
schema = "schema"
database = "database"

on_table = snowflake_table.table.fully_qualified_name
append_only = true

comment = "A stream."
}
```

to

```terraform
resource "snowflake_stream_on_table" "stream" {
name = "stream"
schema = "schema"
database = "database"

table = snowflake_table.table.fully_qualified_name
append_only = "true"

comment = "A stream."
}
```

Then, follow our [Resource migration guide](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/resource_migration.md).
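
As a sketch of that migration (a hypothetical configuration, assuming Terraform 1.7+ for the `removed` block and assuming the import ID is the stream's quoted, fully-qualified name — verify the exact ID format against the `snowflake_stream_on_table` import documentation), the state move could look like:

```terraform
# Hypothetical sketch: drop the old resource from state without destroying
# the underlying stream, then import it into the new resource type.
removed {
  from = snowflake_stream.stream

  lifecycle {
    destroy = false
  }
}

import {
  to = snowflake_stream_on_table.stream
  id = "\"database\".\"schema\".\"stream\""
}
```

On older Terraform versions, the equivalent steps are `terraform state rm` followed by `terraform import`, as described in the linked guide.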

## v0.95.0 ➞ v0.96.0

### snowflake_masking_policies data source changes
8 changes: 4 additions & 4 deletions docs/resources/stream_on_table.md
@@ -60,11 +60,11 @@ resource "snowflake_stream_on_table" "stream" {
### Optional

- `append_only` (String) Specifies whether this is an append-only stream. Available options are: "true" or "false". When the value is not set in the configuration, the provider sets it to "default", which means the Snowflake default for this value is used.
- `at` (Block List, Max: 1) This field specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter. Due to Snowflake limitations, the provider does not detect external changes on this field. (see [below for nested schema](#nestedblock--at))
- `before` (Block List, Max: 1) This field specifies that the request refers to a point immediately preceding the specified parameter. This point in time is just before the statement, identified by its query ID, is completed. Due to Snowflake limitations, the provider does not detect external changes on this field. (see [below for nested schema](#nestedblock--before))
- `at` (Block List, Max: 1) This field specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter. Due to Snowflake limitations, the provider does not detect external changes on this field. If you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--at))
- `before` (Block List, Max: 1) This field specifies that the request refers to a point immediately preceding the specified parameter. This point in time is just before the statement, identified by its query ID, is completed. Due to Snowflake limitations, the provider does not detect external changes on this field. If you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--before))
- `comment` (String) Specifies a comment for the stream.
- `copy_grants` (Boolean) Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause.
- `show_initial_rows` (String) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed. Available options are: "true" or "false". When the value is not set in the configuration the provider will put "default" there which means to use the Snowflake default for this value.
- `copy_grants` (Boolean) Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause. Use only if the resource is already managed by Terraform.
- `show_initial_rows` (String) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed. Available options are: "true" or "false". When the value is not set in the configuration, the provider sets it to "default", which means the Snowflake default for this value is used. External changes for this field won't be detected. If you want to apply external changes, you can re-create the resource manually using "terraform taint".
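
For illustration (a hypothetical configuration, not taken from this PR), a stream pinned to a Time Travel offset via the `at` block might look like:

```terraform
resource "snowflake_stream_on_table" "stream" {
  name     = "stream"
  schema   = "schema"
  database = "database"

  table = snowflake_table.table.fully_qualified_name

  # Exactly one of timestamp, offset, statement, or stream may be set,
  # and `at` cannot be combined with `before`.
  at {
    offset = "-3600" # one hour back in Time Travel
  }
}
```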

### Read-Only

@@ -1,13 +1,15 @@
package objectassert

import (
"errors"
"fmt"
"slices"
"testing"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
)

func (s *StreamAssert) HasTableId(expected string) *StreamAssert {
func (s *StreamAssert) HasTableId(expected sdk.SchemaObjectIdentifier) *StreamAssert {
s.AddAssertion(func(t *testing.T, o *sdk.Stream) error {
t.Helper()
if o.TableName == nil {
@@ -17,7 +19,7 @@ func (s *StreamAssert) HasTableId(expected string) *StreamAssert {
if err != nil {
return err
}
if gotTableId.FullyQualifiedName() != expected {
if gotTableId.FullyQualifiedName() != expected.FullyQualifiedName() {
return fmt.Errorf("expected table name: %v; got: %v", expected, *o.TableName)
}
return nil
@@ -53,21 +55,21 @@ func (s *StreamAssert) HasSourceType(expected sdk.StreamSourceType) *StreamAsser
return s
}

func (s *StreamAssert) HasBaseTables(expected []sdk.SchemaObjectIdentifier) *StreamAssert {
func (s *StreamAssert) HasBaseTables(expected ...sdk.SchemaObjectIdentifier) *StreamAssert {
s.AddAssertion(func(t *testing.T, o *sdk.Stream) error {
t.Helper()
if o.BaseTables == nil {
return fmt.Errorf("expected base tables to have value; got: nil")
}
if len(o.BaseTables) != len(expected) {
return fmt.Errorf("expected base tables length: %v; got: %v", len(expected), len(o.BaseTables))
}
for i := range o.BaseTables {
if o.BaseTables[i].FullyQualifiedName() != expected[i].FullyQualifiedName() {
return fmt.Errorf("expected base table id: %v; got: %v", expected[i], o.BaseTables[i])
var errs []error
for _, wantId := range expected {
if !slices.ContainsFunc(o.BaseTables, func(gotId sdk.SchemaObjectIdentifier) bool {
return wantId.FullyQualifiedName() == gotId.FullyQualifiedName()
}) {
errs = append(errs, fmt.Errorf("expected id %s to be in the list of ids: %v", wantId.FullyQualifiedName(), o.BaseTables))
}
}
return nil
return errors.Join(errs...)
})
return s
}
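
The rewritten assertion above switches from index-by-index comparison to an order-insensitive membership check, collecting all misses with `errors.Join` instead of failing on the first mismatch. A minimal, self-contained sketch of that pattern (using plain strings instead of the SDK's identifier type):

```go
package main

import (
	"errors"
	"fmt"
	"slices"
)

// containsAll mirrors the assertion pattern: every wanted id must appear
// somewhere in got, regardless of order, and all misses are reported at
// once via errors.Join rather than one at a time.
func containsAll(got []string, want ...string) error {
	var errs []error
	for _, w := range want {
		if !slices.ContainsFunc(got, func(g string) bool { return g == w }) {
			errs = append(errs, fmt.Errorf("expected id %q to be in %v", w, got))
		}
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}

func main() {
	fmt.Println(containsAll([]string{"a", "b"}, "b", "a")) // no error
	fmt.Println(containsAll([]string{"a"}, "b", "c"))      // two joined errors
}
```

This makes the assertion robust to Snowflake returning base tables in a different order than the test listed them.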
76 changes: 76 additions & 0 deletions pkg/resources/stream_common.go
@@ -58,3 +58,79 @@ func DeleteStreamContext(ctx context.Context, d *schema.ResourceData, meta any)
d.SetId("")
return nil
}

var atSchema = &schema.Schema{
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: externalChangesNotDetectedFieldDescription("This field specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter. Due to Snowflake limitations, the provider does not detect external changes on this field."),
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"timestamp": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies an exact date and time to use for Time Travel. The value must be explicitly cast to a TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, or TIMESTAMP_TZ data type.",
ExactlyOneOf: []string{"at.0.timestamp", "at.0.offset", "at.0.statement", "at.0.stream"},
},
"offset": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the difference in seconds from the current time to use for Time Travel, in the form -N where N can be an integer or arithmetic expression (e.g. -120 is 120 seconds, -30*60 is 1800 seconds or 30 minutes).",
ExactlyOneOf: []string{"at.0.timestamp", "at.0.offset", "at.0.statement", "at.0.stream"},
},
"statement": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the query ID of a statement to use as the reference point for Time Travel. This parameter supports any statement of one of the following types: DML (e.g. INSERT, UPDATE, DELETE), TCL (BEGIN, COMMIT transaction), SELECT.",
ExactlyOneOf: []string{"at.0.timestamp", "at.0.offset", "at.0.statement", "at.0.stream"},
},
"stream": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the identifier (i.e. name) for an existing stream on the queried table or view. The current offset in the stream is used as the AT point in time for returning change data for the source object.",
ExactlyOneOf: []string{"at.0.timestamp", "at.0.offset", "at.0.statement", "at.0.stream"},
DiffSuppressFunc: suppressIdentifierQuoting,
ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](),
},
},
},
ConflictsWith: []string{"before"},
}

var beforeSchema = &schema.Schema{
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: externalChangesNotDetectedFieldDescription("This field specifies that the request refers to a point immediately preceding the specified parameter. This point in time is just before the statement, identified by its query ID, is completed. Due to Snowflake limitations, the provider does not detect external changes on this field."),
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"timestamp": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies an exact date and time to use for Time Travel. The value must be explicitly cast to a TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, or TIMESTAMP_TZ data type.",
ExactlyOneOf: []string{"before.0.timestamp", "before.0.offset", "before.0.statement", "before.0.stream"},
},
"offset": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the difference in seconds from the current time to use for Time Travel, in the form -N where N can be an integer or arithmetic expression (e.g. -120 is 120 seconds, -30*60 is 1800 seconds or 30 minutes).",
ExactlyOneOf: []string{"before.0.timestamp", "before.0.offset", "before.0.statement", "before.0.stream"},
},
"statement": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the query ID of a statement to use as the reference point for Time Travel. This parameter supports any statement of one of the following types: DML (e.g. INSERT, UPDATE, DELETE), TCL (BEGIN, COMMIT transaction), SELECT.",
ExactlyOneOf: []string{"before.0.timestamp", "before.0.offset", "before.0.statement", "before.0.stream"},
},
"stream": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the identifier (i.e. name) for an existing stream on the queried table or view. The current offset in the stream is used as the AT point in time for returning change data for the source object.",
ExactlyOneOf: []string{"before.0.timestamp", "before.0.offset", "before.0.statement", "before.0.stream"},
DiffSuppressFunc: suppressIdentifierQuoting,
ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](),
},
},
},
ConflictsWith: []string{"at"},
}
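
As a usage sketch (hypothetical configuration), `before` takes the same four mutually exclusive arguments as `at`, and the `ConflictsWith` entries ensure the two blocks cannot be combined:

```terraform
resource "snowflake_stream_on_table" "stream" {
  name     = "stream"
  schema   = "schema"
  database = "database"
  table    = snowflake_table.table.fully_qualified_name

  # `before` conflicts with `at`; exactly one of timestamp, offset,
  # statement, or stream may be set inside the block.
  before {
    statement = "8e5d0ca9-005e-44e6-b858-a8f5b37c5726" # hypothetical query ID
  }
}
```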
42 changes: 20 additions & 22 deletions pkg/resources/stream_on_table.go
@@ -42,7 +42,7 @@ var streamOnTableSchema = map[string]*schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: "Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause.",
Description: "Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause. Use only if the resource is already managed by Terraform.",
DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
return oldValue != "" && oldValue != newValue
},
@@ -69,15 +69,15 @@ var streamOnTableSchema = map[string]*schema.Schema{
Optional: true,
Default: BooleanDefault,
ValidateDiagFunc: validateBooleanString,
Description: booleanStringFieldDescription("Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed."),
Description: externalChangesNotDetectedFieldDescription(booleanStringFieldDescription("Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.")),
},
"comment": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies a comment for the stream.",
},
AtAttributeName: schemas.AtSchema,
BeforeAttributeName: schemas.BeforeSchema,
AtAttributeName: atSchema,
BeforeAttributeName: beforeSchema,
ShowOutputAttributeName: {
Type: schema.TypeList,
Computed: true,
@@ -139,7 +139,7 @@ func ImportStreamOnTable(ctx context.Context, d *schema.ResourceData, meta any)
if err := d.Set("schema", id.SchemaName()); err != nil {
return nil, err
}
if err := d.Set("append_only", booleanStringFromBool(*v.Mode == sdk.StreamModeAppendOnly)); err != nil {
if err := d.Set("append_only", booleanStringFromBool(v.IsAppendOnly())); err != nil {
return nil, err
}
return []*schema.ResourceData{d}, nil
@@ -162,26 +162,24 @@ func CreateStreamOnTable(orReplace bool) schema.CreateContextFunc {
req := sdk.NewCreateOnTableStreamRequest(id, tableId)
if orReplace {
req.WithOrReplace(true)
} else if d.Get("copy_grants").(bool) {
return diag.Diagnostics{
diag.Diagnostic{
Severity: diag.Error,
Summary: "COPY GRANTS cannot be used without OR REPLACE. If you are creating this object, first import it, so that it's managed by the provider, and try again.",
Detail: fmt.Sprintf("stream name: %s", id.FullyQualifiedName()),
},
}
}

if v := d.Get("copy_grants"); v.(bool) {
req.WithCopyGrants(true).WithOrReplace(true)
}

if v := d.Get("append_only").(string); v != BooleanDefault {
parsed, err := booleanStringToBool(v)
if err != nil {
return diag.FromErr(err)
}
req.WithAppendOnly(parsed)
err = booleanStringAttributeCreate(d, "append_only", &req.AppendOnly)
if err != nil {
return diag.FromErr(err)
}

if v := d.Get("show_initial_rows").(string); v != BooleanDefault {
parsed, err := booleanStringToBool(v)
if err != nil {
return diag.FromErr(err)
}
req.WithShowInitialRows(parsed)
err = booleanStringAttributeCreate(d, "show_initial_rows", &req.ShowInitialRows)
if err != nil {
return diag.FromErr(err)
}
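
The refactor above replaces the inline parse-and-set blocks with a shared `booleanStringAttributeCreate` helper (its exact signature is not shown in this diff). The underlying tri-state idea — "true", "false", or a "default" sentinel meaning "leave unset so Snowflake decides" — can be sketched as:

```go
package main

import (
	"fmt"
	"strconv"
)

// BooleanDefault is the sentinel meaning "not set in config - use the
// Snowflake default". This mirrors the provider's constant of the same name.
const BooleanDefault = "default"

// applyTriState is a hypothetical stand-in for the helper: it only sets the
// optional request field when the config holds an explicit "true"/"false",
// and leaves it nil when the sentinel is present.
func applyTriState(v string, target **bool) error {
	if v == BooleanDefault {
		return nil // field stays unset in the CREATE request
	}
	parsed, err := strconv.ParseBool(v)
	if err != nil {
		return err
	}
	*target = &parsed
	return nil
}

func main() {
	var appendOnly *bool
	_ = applyTriState("true", &appendOnly)
	fmt.Println(appendOnly != nil && *appendOnly) // true

	var showInitialRows *bool
	_ = applyTriState(BooleanDefault, &showInitialRows)
	fmt.Println(showInitialRows == nil) // true
}
```

Centralizing this avoids repeating the same parse-and-branch logic for every boolean-string attribute.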

streamTimeTravelReq := handleStreamTimeTravel(d)
@@ -253,7 +251,7 @@ func ReadStreamOnTable(withExternalChangesMarking bool) schema.ReadContextFunc {
mode = *stream.Mode
}
if err = handleExternalChangesToObjectInShow(d,
showMapping{"mode", "append_only", string(mode), booleanStringFromBool(mode == sdk.StreamModeAppendOnly), nil},
showMapping{"mode", "append_only", string(mode), booleanStringFromBool(stream.IsAppendOnly()), nil},
); err != nil {
return diag.FromErr(err)
}