Merge pull request #632 from MaterializeInc/feature/graceful-reconfig
Feature/graceful reconfig
jubrad authored Aug 23, 2024
2 parents f51faf1 + a692d67 commit 1273ffb
Showing 5 changed files with 163 additions and 3 deletions.
12 changes: 12 additions & 0 deletions docs/resources/cluster.md
@@ -38,6 +38,7 @@ resource "materialize_cluster" "example_cluster" {
- `replication_factor` (Number) The number of replicas of each dataflow-powered object to maintain.
- `scheduling` (Block List, Max: 1) Defines the scheduling parameters for the cluster. (see [below for nested schema](#nestedblock--scheduling))
- `size` (String) The size of the managed cluster.
- `wait_until_ready` (Block List, Max: 1) Defines the parameters for the WAIT UNTIL READY options (see [below for nested schema](#nestedblock--wait_until_ready))

### Read-Only

@@ -59,6 +60,17 @@ Optional:
- `hydration_time_estimate` (String) Estimated time to hydrate the cluster during refresh.
- `rehydration_time_estimate` (String, Deprecated) Estimated time to rehydrate the cluster during refresh. This field is deprecated and will be removed in a future release. Use `hydration_time_estimate` instead.



<a id="nestedblock--wait_until_ready"></a>
### Nested Schema for `wait_until_ready`

Optional:

- `enabled` (Boolean) Enable wait_until_ready.
- `on_timeout` (String) Action to take on timeout: COMMIT|ROLLBACK
- `timeout` (String) Max duration to wait for the new replicas to be ready.

## Import

Import is supported using the following syntax:
44 changes: 43 additions & 1 deletion pkg/materialize/cluster.go
@@ -9,6 +9,12 @@ import (
    "github.com/lib/pq"
)

type ReconfigurationOptions struct {
    enabled bool
    timeout string
    on_timeout string
}

// DDL
type ClusterBuilder struct {
    ddl Builder
@@ -87,6 +93,34 @@ func GetSchedulingConfig(v interface{}) SchedulingConfig {
    return config
}

func (b *ClusterBuilder) GetReconfigOpts(v interface{}) ReconfigurationOptions {
    if v == nil {
        return ReconfigurationOptions{}
    }
    configSlice, ok := v.([]interface{})
    if !ok || len(configSlice) == 0 {
        return ReconfigurationOptions{}
    }

    configMap, ok := configSlice[0].(map[string]interface{})
    if !ok {
        return ReconfigurationOptions{}
    }

    reconfigOpts := ReconfigurationOptions{}

    if o, ok := configMap["enabled"]; ok {
        reconfigOpts.enabled = o.(bool)
    }
    if o, ok := configMap["timeout"]; ok {
        reconfigOpts.timeout = o.(string)
    }
    if o, ok := configMap["on_timeout"]; ok {
        reconfigOpts.on_timeout = o.(string)
    }
    return reconfigOpts
}
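For orientation, the value handed to `GetReconfigOpts` is the raw Terraform representation of a `MaxItems: 1` block: a list holding a single map. The following is a minimal standalone sketch (not part of the provider; the option values are hypothetical) of that shape and the same slice-then-map unwrapping performed above.

```go
package main

import "fmt"

func main() {
    // Hypothetical raw value for a MaxItems:1 list block such as wait_until_ready:
    // a one-element slice whose only element is a map of the block's attributes.
    var raw interface{} = []interface{}{
        map[string]interface{}{
            "enabled":    true,
            "timeout":    "10m",
            "on_timeout": "COMMIT",
        },
    }

    // Same unwrapping as GetReconfigOpts: assert the slice, then the first map.
    configSlice, ok := raw.([]interface{})
    if !ok || len(configSlice) == 0 {
        fmt.Println("no wait_until_ready block configured")
        return
    }
    configMap := configSlice[0].(map[string]interface{})
    fmt.Printf("enabled=%v timeout=%v on_timeout=%v\n",
        configMap["enabled"], configMap["timeout"], configMap["on_timeout"])
}
```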

func (b *ClusterBuilder) QualifiedName() string {
    return QualifiedName(b.clusterName)
}
@@ -199,14 +233,22 @@ func (b *ClusterBuilder) AlterClusterScheduling(s SchedulingConfig) error {
    return b.ddl.exec(q.String())
}

func (b *ClusterBuilder) AlterCluster() error {
func (b *ClusterBuilder) AlterCluster(reconfig_opts ReconfigurationOptions) error {
    q := strings.Builder{}
    graceful_reconfig_statement := fmt.Sprintf(
        " WITH ( WAIT UNTIL READY ( TIMEOUT %s, ON TIMEOUT %s ) )",
        QuoteString(reconfig_opts.timeout),
        QuoteString(reconfig_opts.on_timeout),
    )

    q.WriteString(fmt.Sprintf(`ALTER CLUSTER %s`, b.QualifiedName()))
    // The only alteration to an unmanaged cluster should be converting it to a
    // managed cluster, so we assume here that we are only dealing with managed clusters.
    q.WriteString(fmt.Sprintf(` SET (%s)`, b.GenerateClusterOptions()))
    if reconfig_opts.enabled {
        q.WriteString(graceful_reconfig_statement)
    }
    q.WriteString(`;`)
    return b.ddl.exec(q.String())
}
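To see what this builder emits end to end, here is a small standalone sketch (assumed cluster name and options; the `SET` clause is hard-coded because `GenerateClusterOptions` is not shown in this diff) that assembles the same statement text.

```go
package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical inputs mirroring ReconfigurationOptions{enabled: true, timeout: "10m", on_timeout: "COMMIT"}.
    clusterName := `"example_cluster"`
    clusterOptions := "SIZE 'small', REPLICATION FACTOR 2" // stand-in for GenerateClusterOptions()
    enabled, timeout, onTimeout := true, "10m", "COMMIT"

    q := strings.Builder{}
    q.WriteString(fmt.Sprintf("ALTER CLUSTER %s", clusterName))
    q.WriteString(fmt.Sprintf(" SET (%s)", clusterOptions))
    if enabled {
        q.WriteString(fmt.Sprintf(" WITH ( WAIT UNTIL READY ( TIMEOUT '%s', ON TIMEOUT '%s' ) )", timeout, onTimeout))
    }
    q.WriteString(";")

    // Prints:
    // ALTER CLUSTER "example_cluster" SET (SIZE 'small', REPLICATION FACTOR 2) WITH ( WAIT UNTIL READY ( TIMEOUT '10m', ON TIMEOUT 'COMMIT' ) );
    fmt.Println(q.String())
}
```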
18 changes: 17 additions & 1 deletion pkg/materialize/cluster_test.go
@@ -135,7 +135,23 @@ func TestClusterUpdate(t *testing.T) {
        b.SetSize("xsmall")
        b.SetReplicationFactor(2)

        if err := b.AlterCluster(); err != nil {
        if err := b.AlterCluster(ReconfigurationOptions{}); err != nil {
            t.Fatalf("Expected no error, got %v", err)
        }
    })
}

func TestClusterUpdateWithWaitUntilReady(t *testing.T) {
    testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
        expectedSQL := `ALTER CLUSTER "cluster" SET \(SIZE 'xsmall', REPLICATION FACTOR 2\) WITH \( WAIT UNTIL READY \( TIMEOUT '10s', ON TIMEOUT 'COMMIT' \) \);`
        mock.ExpectExec(expectedSQL).WillReturnResult(sqlmock.NewResult(1, 1))

        o := MaterializeObject{Name: "cluster"}
        b := NewClusterBuilder(db, o)
        b.SetSize("xsmall")
        b.SetReplicationFactor(2)

        if err := b.AlterCluster(ReconfigurationOptions{enabled: true, timeout: "10s", on_timeout: "COMMIT"}); err != nil {
            t.Fatalf("Expected no error, got %v", err)
        }
    })
56 changes: 56 additions & 0 deletions pkg/provider/acceptance_cluster_test.go
@@ -349,6 +349,34 @@ func TestAccCluster_disappears(t *testing.T) {
    })
}

func TestAccClusterAlterGraceful(t *testing.T) {
    clusterName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
    size := "3xsmall"
    newSize := "2xsmall"
    resource.Test(t, resource.TestCase{
        PreCheck: func() { testAccPreCheck(t) },
        ProviderFactories: testAccProviderFactories,
        Steps: []resource.TestStep{
            {
                Config: testAccManagedClusterResource(clusterName, size, "1"),
                Check: resource.ComposeTestCheckFunc(
                    resource.TestCheckResourceAttr("materialize_cluster.test", "name", clusterName),
                    resource.TestCheckResourceAttr("materialize_cluster.test", "size", size),
                    resource.TestCheckResourceAttr("materialize_cluster.test", "replication_factor", "1"),
                ),
            },
            {
                Config: testAccManagedClusterResourceAlterGraceful(clusterName, newSize, "1", "COMMIT"),
                Check: resource.ComposeTestCheckFunc(
                    resource.TestCheckResourceAttr("materialize_cluster.test", "name", clusterName),
                    resource.TestCheckResourceAttr("materialize_cluster.test", "size", newSize),
                    resource.TestCheckResourceAttr("materialize_cluster.test", "replication_factor", "1"),
                ),
            },
        },
    })
}

func testAccClusterResource(
    roleName,
    cluster1Name,
@@ -410,6 +438,34 @@ func testAccClusterManagedNoReplicationResource(clusterName, clusterSize string)
        clusterName, clusterSize)
}

func testAccManagedClusterResource(clusterName, clusterSize string, replicationFactor string) string {
    return fmt.Sprintf(`
resource "materialize_cluster" "test" {
    name = "%[1]s"
    size = "%[2]s"
    replication_factor = %[3]s
}
`,
        clusterName, clusterSize, replicationFactor)
}

func testAccManagedClusterResourceAlterGraceful(clusterName, clusterSize string, replicationFactor string, onTimeoutAction string) string {
    return fmt.Sprintf(`
resource "materialize_cluster" "test" {
    name = "%[1]s"
    size = "%[2]s"
    replication_factor = %[3]s
    wait_until_ready {
        enabled = true
        timeout = "10m"
        on_timeout = "%[4]s"
    }
}
`,
        clusterName, clusterSize, replicationFactor, onTimeoutAction)
}

func testAccClusterManagedZeroReplicationResource(clusterName, clusterSize string) string {
    return fmt.Sprintf(`
resource "materialize_cluster" "test" {
36 changes: 35 additions & 1 deletion pkg/resources/resource_cluster.go
@@ -5,13 +5,15 @@ import (
    "database/sql"
    "fmt"
    "log"
    "regexp"
    "strings"

    "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize"
    "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils"

    "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)

var clusterSchema = map[string]*schema.Schema{
@@ -82,6 +84,36 @@ var clusterSchema = map[string]*schema.Schema{
        Description: "Use the cluster name as the resource identifier in your state file, rather than the internal cluster ID. This is particularly useful in scenarios like dbt-materialize blue/green deployments, where clusters are swapped but the ID changes. By identifying by name, the resource can be managed consistently even when the underlying cluster ID is updated.",
    },
    "region": RegionSchema(),
    "wait_until_ready": {
        Type: schema.TypeList,
        Optional: true,
        MaxItems: 1,
        Description: "Defines the parameters for the WAIT UNTIL READY options",
        Elem: &schema.Resource{
            Schema: map[string]*schema.Schema{
                "enabled": {
                    Type: schema.TypeBool,
                    Optional: true,
                    Default: false,
                    Description: "Enable wait_until_ready.",
                },
                "timeout": {
                    Type: schema.TypeString,
                    Optional: true,
                    Default: "0s",
                    Description: "Max duration to wait for the new replicas to be ready.",
                    ValidateFunc: validation.StringMatch(regexp.MustCompile("^\\d+[smh]{1}$"), "Must be a valid duration in the form of <int><unit> ex: 1s, 10m"),
                },
                "on_timeout": {
                    Type: schema.TypeString,
                    Optional: true,
                    Description: "Action to take on timeout: COMMIT|ROLLBACK",
                    Default: "COMMIT",
                    ValidateFunc: validation.StringInSlice([]string{"COMMIT", "ROLLBACK"}, true),
                },
            },
        },
    },
}

func Cluster() *schema.Resource {
@@ -363,7 +395,9 @@ func clusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}
    }

    if changed {
        if err := b.AlterCluster(); err != nil {
        _, reconfigOptsRaw := d.GetChange("wait_until_ready")
        reconfigOpts := b.GetReconfigOpts(reconfigOptsRaw)
        if err := b.AlterCluster(reconfigOpts); err != nil {
            return diag.FromErr(err)
        }
    }
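As a concrete illustration of the `timeout` validator defined in the schema above, this standalone sketch (hypothetical sample inputs, not provider code) shows which duration strings the `^\d+[smh]{1}$` pattern accepts.

```go
package main

import (
    "fmt"
    "regexp"
)

func main() {
    // The same pattern used by the wait_until_ready timeout ValidateFunc above.
    durationRe := regexp.MustCompile(`^\d+[smh]{1}$`)

    // Whole-number seconds/minutes/hours pass; bare numbers, fractions, and days do not.
    for _, candidate := range []string{"10s", "10m", "2h", "90", "1.5h", "1d"} {
        fmt.Printf("%-5s valid=%v\n", candidate, durationRe.MatchString(candidate))
    }
}
```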
