From a692d678fa641dec8f3c9aaedd332d9f2cc7b4c1 Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Tue, 13 Aug 2024 11:07:21 -0500 Subject: [PATCH] Add graceful reconfig, Wait Until Ready introduces wait_until_ready option for clusters. This only effects the behavior of cluster updates, allowing a create-before-delete type strategy that increases cluster availability during cluster updates. --- docs/resources/cluster.md | 12 ++++++ pkg/materialize/cluster.go | 44 ++++++++++++++++++- pkg/materialize/cluster_test.go | 18 +++++++- pkg/provider/acceptance_cluster_test.go | 56 +++++++++++++++++++++++++ pkg/resources/resource_cluster.go | 36 +++++++++++++++- 5 files changed, 163 insertions(+), 3 deletions(-) diff --git a/docs/resources/cluster.md b/docs/resources/cluster.md index b0b23491..bc9e9d24 100644 --- a/docs/resources/cluster.md +++ b/docs/resources/cluster.md @@ -38,6 +38,7 @@ resource "materialize_cluster" "example_cluster" { - `replication_factor` (Number) The number of replicas of each dataflow-powered object to maintain. - `scheduling` (Block List, Max: 1) Defines the scheduling parameters for the cluster. (see [below for nested schema](#nestedblock--scheduling)) - `size` (String) The size of the managed cluster. +- `wait_until_ready` (Block List, Max: 1) Defines the parameters for the WAIT UNTIL READY options (see [below for nested schema](#nestedblock--wait_until_ready)) ### Read-Only @@ -59,6 +60,17 @@ Optional: - `hydration_time_estimate` (String) Estimated time to hydrate the cluster during refresh. - `rehydration_time_estimate` (String, Deprecated) Estimated time to rehydrate the cluster during refresh. This field is deprecated and will be removed in a future release. Use `hydration_time_estimate` instead. + + + +### Nested Schema for `wait_until_ready` + +Optional: + +- `enabled` (Boolean) Enable wait_until_ready. +- `on_timeout` (String) Action to take on timeout: COMMIT|ROLLBACK +- `timeout` (String) Max duration to wait for the new replicas to be ready. + ## Import Import is supported using the following syntax: diff --git a/pkg/materialize/cluster.go b/pkg/materialize/cluster.go index d978d50f..56ac8279 100644 --- a/pkg/materialize/cluster.go +++ b/pkg/materialize/cluster.go @@ -9,6 +9,12 @@ import ( "github.com/lib/pq" ) +type ReconfigurationOptions struct { + enabled bool + timeout string + on_timeout string +} + // DDL type ClusterBuilder struct { ddl Builder @@ -87,6 +93,34 @@ func GetSchedulingConfig(v interface{}) SchedulingConfig { return config } +func (b *ClusterBuilder) GetReconfigOpts(v interface{}) ReconfigurationOptions { + if v == nil { + return ReconfigurationOptions{} + } + configSlice, ok := v.([]interface{}) + if !ok || len(configSlice) == 0 { + return ReconfigurationOptions{} + } + + configMap, ok := configSlice[0].(map[string]interface{}) + if !ok { + return ReconfigurationOptions{} + } + + reconfigOpts := ReconfigurationOptions{} + + if o, ok := configMap["enabled"]; ok { + reconfigOpts.enabled = o.(bool) + } + if o, ok := configMap["timeout"]; ok { + reconfigOpts.timeout = o.(string) + } + if o, ok := configMap["on_timeout"]; ok { + reconfigOpts.on_timeout = o.(string) + } + return reconfigOpts +} + func (b *ClusterBuilder) QualifiedName() string { return QualifiedName(b.clusterName) } @@ -199,14 +233,22 @@ func (b *ClusterBuilder) AlterClusterScheduling(s SchedulingConfig) error { return b.ddl.exec(q.String()) } -func (b *ClusterBuilder) AlterCluster() error { +func (b *ClusterBuilder) AlterCluster(reconfig_opts ReconfigurationOptions) error { q := strings.Builder{} + graceful_reconfig_statement := fmt.Sprintf( + " WITH ( WAIT UNTIL READY ( TIMEOUT %s, ON TIMEOUT %s ) )", + QuoteString(reconfig_opts.timeout), + QuoteString(reconfig_opts.on_timeout), + ) q.WriteString(fmt.Sprintf(`ALTER CLUSTER %s`, b.QualifiedName())) // The only alterations to unmanaged clusters should be to // move them to maanged clusters, we will assume here that we are only // dealing with managed clusters q.WriteString(fmt.Sprintf(` SET (%s)`, b.GenerateClusterOptions())) + if reconfig_opts.enabled { + q.WriteString(graceful_reconfig_statement) + } q.WriteString(`;`) return b.ddl.exec(q.String()) } diff --git a/pkg/materialize/cluster_test.go b/pkg/materialize/cluster_test.go index 2ce298e3..ccc90642 100644 --- a/pkg/materialize/cluster_test.go +++ b/pkg/materialize/cluster_test.go @@ -135,7 +135,23 @@ func TestClusterUpdate(t *testing.T) { b.SetSize("xsmall") b.SetReplicationFactor(2) - if err := b.AlterCluster(); err != nil { + if err := b.AlterCluster(ReconfigurationOptions{}); err != nil { + t.Fatalf("Expected no error, got %v", err) + } + }) +} + +func TestClusterUpdateWithWaitUntilReady(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + expectedSQL := `ALTER CLUSTER "cluster" SET \(SIZE 'xsmall', REPLICATION FACTOR 2\) WITH \( WAIT UNTIL READY \( TIMEOUT '10s', ON TIMEOUT 'COMMIT' \) \);` + mock.ExpectExec(expectedSQL).WillReturnResult(sqlmock.NewResult(1, 1)) + + o := MaterializeObject{Name: "cluster"} + b := NewClusterBuilder(db, o) + b.SetSize("xsmall") + b.SetReplicationFactor(2) + + if err := b.AlterCluster(ReconfigurationOptions{enabled: true, timeout: "10s", on_timeout: "COMMIT"}); err != nil { t.Fatalf("Expected no error, got %v", err) } }) diff --git a/pkg/provider/acceptance_cluster_test.go b/pkg/provider/acceptance_cluster_test.go index 48a88f43..0ab70632 100644 --- a/pkg/provider/acceptance_cluster_test.go +++ b/pkg/provider/acceptance_cluster_test.go @@ -349,6 +349,34 @@ func TestAccCluster_disappears(t *testing.T) { }) } +func TestAccClusterAlterGraceful(t *testing.T) { + clusterName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + size := "3xsmall" + newSize := "2xsmall" + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + Steps: []resource.TestStep{ + { + Config: testAccManagedClusterResource(clusterName, size, "1"), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("materialize_cluster.test", "name", clusterName), + resource.TestCheckResourceAttr("materialize_cluster.test", "size", size), + resource.TestCheckResourceAttr("materialize_cluster.test", "replication_factor", "1"), + ), + }, + { + Config: testAccManagedClusterResourceAlterGraceful(clusterName, newSize, "1", "COMMIT"), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("materialize_cluster.test", "name", clusterName), + resource.TestCheckResourceAttr("materialize_cluster.test", "size", newSize), + resource.TestCheckResourceAttr("materialize_cluster.test", "replication_factor", "1"), + ), + }, + }, + }) +} + func testAccClusterResource( roleName, cluster1Name, @@ -410,6 +438,34 @@ func testAccClusterManagedNoReplicationResource(clusterName, clusterSize string) clusterName, clusterSize) } +func testAccManagedClusterResource(clusterName, clusterSize string, replicationFactor string) string { + return fmt.Sprintf(` + resource "materialize_cluster" "test" { + name = "%[1]s" + size = "%[2]s" + replication_factor = %[3]s + } + `, + clusterName, clusterSize, replicationFactor) +} + +func testAccManagedClusterResourceAlterGraceful(clusterName, clusterSize string, replicationFactor string, onTimeoutAction string) string { + return fmt.Sprintf(` + resource "materialize_cluster" "test" { + name = "%[1]s" + size = "%[2]s" + replication_factor = %[3]s + wait_until_ready { + enabled = true + timeout = "10m" + on_timeout = "%[4]s" + + } + } + `, + clusterName, clusterSize, replicationFactor, onTimeoutAction) +} + func testAccClusterManagedZeroReplicationResource(clusterName, clusterSize string) string { return fmt.Sprintf(` resource "materialize_cluster" "test" { diff --git a/pkg/resources/resource_cluster.go b/pkg/resources/resource_cluster.go index 31884476..31c03d0b 100644 --- a/pkg/resources/resource_cluster.go +++ b/pkg/resources/resource_cluster.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log" + "regexp" "strings" "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" @@ -12,6 +13,7 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" ) var clusterSchema = map[string]*schema.Schema{ @@ -82,6 +84,36 @@ var clusterSchema = map[string]*schema.Schema{ Description: "Use the cluster name as the resource identifier in your state file, rather than the internal cluster ID. This is particularly useful in scenarios like dbt-materialize blue/green deployments, where clusters are swapped but the ID changes. By identifying by name, the resource can be managed consistently even when the underlying cluster ID is updated.", }, "region": RegionSchema(), + "wait_until_ready": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Description: "Defines the parameters for the WAIT UNTIL READY options", + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "enabled": { + Type: schema.TypeBool, + Optional: true, + Default: false, + Description: "Enable wait_until_ready.", + }, + "timeout": { + Type: schema.TypeString, + Optional: true, + Default: "0s", + Description: "Max duration to wait for the new replicas to be ready.", + ValidateFunc: validation.StringMatch(regexp.MustCompile("^\\d+[smh]{1}$"), "Must be a valid duration in the form of ex: 1s, 10m"), + }, + "on_timeout": { + Type: schema.TypeString, + Optional: true, + Description: "Action to take on timeout: COMMIT|ROLLBACK", + Default: "COMMIT", + ValidateFunc: validation.StringInSlice([]string{"COMMIT", "ROLLBACK"}, true), + }, + }, + }, + }, } func Cluster() *schema.Resource { @@ -363,7 +395,9 @@ func clusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{} } if changed { - if err := b.AlterCluster(); err != nil { + _, reconfigOptsRaw := d.GetChange("wait_until_ready") + reconfigOpts := b.GetReconfigOpts(reconfigOptsRaw) + if err := b.AlterCluster(reconfigOpts); err != nil { return diag.FromErr(err) } }