Skip to content

Commit

Permalink
STREAM-166: fix bug in CDC resource
Browse files Browse the repository at this point in the history
This fixes an issue when the local CDC state is out of sync with the remote resource state
and the plugin crashes with an index out of bounds on the CDC status slice.

The issue happens when the user does the following
1. create CDC configs using Terraform
2. manually delete all CDC configs for that database via the UI
3. run terraform apply again

This also includes some other minor code cleanup.
  • Loading branch information
pgier committed Aug 31, 2023
1 parent e74a348 commit c0b42e3
Showing 1 changed file with 20 additions and 33 deletions.
53 changes: 20 additions & 33 deletions internal/provider/resource_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,12 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
return diag.FromErr(fmt.Errorf("failed to request organization: %w", err))
}

var org OrgId

err = json.NewDecoder(orgResp.Body).Decode(&org)
if err != nil {
var orgId OrgId
if err = json.NewDecoder(orgResp.Body).Decode(&orgId); err != nil {
return diag.FromErr(fmt.Errorf("failed to read organization: %w", err))
}

pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName)
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, orgId, streamingClient, tenantName)
if err != nil {
diag.FromErr(err)
}
Expand All @@ -222,33 +220,20 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
diag.FromErr(fmt.Errorf("failed get CDC request: %w", err))
} else if getCDCResponse.StatusCode > 299 {
body, _ := io.ReadAll(getCDCResponse.Body)
return diag.Errorf("Error getting cdc config %s", body)
return diag.Errorf("failed to get cdc status, http code: %v, message %s", getCDCResponse.StatusCode, body)
}

var cdcResult CDCStatusResponse
err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult)
if err != nil {
if err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult); err != nil {
return diag.FromErr(fmt.Errorf("failed to read CDC status: %w", err))
}

for i := 0; i < len(cdcResult); i++ {
if cdcResult[i].Keyspace == keyspace {
if cdcResult[i].DatabaseTable == table {
return nil
}
}
}

if err := resourceData.Set("connector_status", cdcResult[0].ConnectorStatus); err != nil {
return diag.FromErr(err)
}
if err := resourceData.Set("data_topic", cdcResult[0].DataTopic); err != nil {
return diag.FromErr(err)
if cdcStatus := getTableCDCStatus(databaseId, keyspace, table, cdcResult); cdcStatus != nil {
return setCDCData(resourceData, *cdcStatus)
}

// Not found. Remove from state.
resourceData.SetId("")

return nil
}

Expand Down Expand Up @@ -294,6 +279,7 @@ type StreamingTokens []struct {
var cdcEnablementMutex sync.Mutex

func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
const cdcCreateWaitTime = time.Second * 3
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
Expand Down Expand Up @@ -350,19 +336,12 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m
}

tflog.Info(ctx, fmt.Sprintf("waiting for CDC on keyspace: %s, table: %s", cdcRequestJSON.Keyspace, cdcRequestJSON.TableName))
time.Sleep(time.Second * 3)
time.Sleep(cdcCreateWaitTime)

if cdcStatus, err := waitCDCStatusReady(ctx, streamingClientv3, databaseId, keyspace, table, tenantName, getCDCParams); err != nil {
return diag.FromErr(fmt.Errorf("failed to check CDC status %w", err))
} else if cdcStatus != nil {
if err := resourceData.Set("connector_status", cdcStatus.CodStatus); err != nil {
return diag.FromErr(err)
}
if err := resourceData.Set("data_topic", cdcStatus.DataTopic); err != nil {
return diag.FromErr(err)
}
setCDCData(resourceData, fmt.Sprintf("%s/%s/%s/%s", databaseId, keyspace, table, tenantName))
return nil
return setCDCData(resourceData, *cdcStatus)
}

tflog.Warn(ctx, fmt.Sprintf("CDC not ready after max wait time, remaining retries: %v", (maxRetries-i)))
Expand Down Expand Up @@ -462,8 +441,16 @@ func getPulsarToken(ctx context.Context, pulsarCluster string, token string, org
return pulsarToken, err
}

func setCDCData(d *schema.ResourceData, id string) error {
d.SetId(id)
func setCDCData(resourceData *schema.ResourceData, cdc CDCStatus) diag.Diagnostics {
if err := resourceData.Set("connector_status", cdc.CodStatus); err != nil {
return diag.FromErr(err)
}
if err := resourceData.Set("data_topic", cdc.DataTopic); err != nil {
return diag.FromErr(err)
}

cdcId := fmt.Sprintf("%s/%s/%s/%s", cdc.DatabaseID, cdc.Keyspace, cdc.DatabaseTable, cdc.Tenant)
resourceData.SetId(cdcId)

return nil
}
Expand Down

0 comments on commit c0b42e3

Please sign in to comment.