From c0b42e3f53c4206b5a8a43aea5053426d98939d1 Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Thu, 31 Aug 2023 12:03:49 -0500 Subject: [PATCH] STREAM-166: fix bug in CDC resource This fixes an issue when the local CDC state is out of sync with the remote resource state and the plugin crashes with an index out of bounds on the CDC status slice. The issue happens when the user does the following 1. create CDC configs using Terraform 2. manually delete all CDC configs for that database via the UI 3. run terraform apply again This also includes some other minor code cleanup. --- internal/provider/resource_cdc.go | 53 ++++++++++++------------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/internal/provider/resource_cdc.go b/internal/provider/resource_cdc.go index 41baf76..8d21319 100644 --- a/internal/provider/resource_cdc.go +++ b/internal/provider/resource_cdc.go @@ -201,14 +201,12 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met return diag.FromErr(fmt.Errorf("failed to request organization: %w", err)) } - var org OrgId - - err = json.NewDecoder(orgResp.Body).Decode(&org) - if err != nil { + var orgId OrgId + if err = json.NewDecoder(orgResp.Body).Decode(&orgId); err != nil { return diag.FromErr(fmt.Errorf("failed to read organization: %w", err)) } - pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName) + pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, orgId, streamingClient, tenantName) if err != nil { diag.FromErr(err) } @@ -222,33 +220,20 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met diag.FromErr(fmt.Errorf("failed get CDC request: %w", err)) } else if getCDCResponse.StatusCode > 299 { body, _ := io.ReadAll(getCDCResponse.Body) - return diag.Errorf("Error getting cdc config %s", body) + return diag.Errorf("failed to get cdc status, http code: %v, message %s", getCDCResponse.StatusCode, body) } var cdcResult CDCStatusResponse - err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult) - if err != nil { + if err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult); err != nil { return diag.FromErr(fmt.Errorf("failed to read CDC status: %w", err)) } - for i := 0; i < len(cdcResult); i++ { - if cdcResult[i].Keyspace == keyspace { - if cdcResult[i].DatabaseTable == table { - return nil - } - } - } - - if err := resourceData.Set("connector_status", cdcResult[0].ConnectorStatus); err != nil { - return diag.FromErr(err) - } - if err := resourceData.Set("data_topic", cdcResult[0].DataTopic); err != nil { - return diag.FromErr(err) + if cdcStatus := getTableCDCStatus(databaseId, keyspace, table, cdcResult); cdcStatus != nil { + return setCDCData(resourceData, *cdcStatus) } // Not found. Remove from state. resourceData.SetId("") - return nil } @@ -294,6 +279,7 @@ type StreamingTokens []struct { var cdcEnablementMutex sync.Mutex func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { + const cdcCreateWaitTime = time.Second * 3 client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) streamingClientv3 := meta.(astraClients).astraStreamingClientv3 @@ -350,19 +336,12 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m } tflog.Info(ctx, fmt.Sprintf("waiting for CDC on keyspace: %s, table: %s", cdcRequestJSON.Keyspace, cdcRequestJSON.TableName)) - time.Sleep(time.Second * 3) + time.Sleep(cdcCreateWaitTime) if cdcStatus, err := waitCDCStatusReady(ctx, streamingClientv3, databaseId, keyspace, table, tenantName, getCDCParams); err != nil { return diag.FromErr(fmt.Errorf("failed to check CDC status %w", err)) } else if cdcStatus != nil { - if err := resourceData.Set("connector_status", cdcStatus.CodStatus); err != nil { - return diag.FromErr(err) - } - if err := resourceData.Set("data_topic", cdcStatus.DataTopic); err != nil { - return diag.FromErr(err) - } - setCDCData(resourceData, fmt.Sprintf("%s/%s/%s/%s", databaseId, keyspace, table, tenantName)) - return nil + return setCDCData(resourceData, *cdcStatus) } tflog.Warn(ctx, fmt.Sprintf("CDC not ready after max wait time, remaining retries: %v", (maxRetries-i))) @@ -462,8 +441,16 @@ func getPulsarToken(ctx context.Context, pulsarCluster string, token string, org return pulsarToken, err } -func setCDCData(d *schema.ResourceData, id string) error { - d.SetId(id) +func setCDCData(resourceData *schema.ResourceData, cdc CDCStatus) diag.Diagnostics { + if err := resourceData.Set("connector_status", cdc.CodStatus); err != nil { + return diag.FromErr(err) + } + if err := resourceData.Set("data_topic", cdc.DataTopic); err != nil { + return diag.FromErr(err) + } + + cdcId := fmt.Sprintf("%s/%s/%s/%s", cdc.DatabaseID, cdc.Keyspace, cdc.DatabaseTable, cdc.Tenant) + resourceData.SetId(cdcId) return nil }