diff --git a/internal/provider/resource_cdc.go b/internal/provider/resource_cdc.go index 41baf76..8d21319 100644 --- a/internal/provider/resource_cdc.go +++ b/internal/provider/resource_cdc.go @@ -201,14 +201,12 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met return diag.FromErr(fmt.Errorf("failed to request organization: %w", err)) } - var org OrgId - - err = json.NewDecoder(orgResp.Body).Decode(&org) - if err != nil { + var orgId OrgId + if err = json.NewDecoder(orgResp.Body).Decode(&orgId); err != nil { return diag.FromErr(fmt.Errorf("failed to read organization: %w", err)) } - pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName) + pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, orgId, streamingClient, tenantName) if err != nil { diag.FromErr(err) } @@ -222,33 +220,20 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met diag.FromErr(fmt.Errorf("failed get CDC request: %w", err)) } else if getCDCResponse.StatusCode > 299 { body, _ := io.ReadAll(getCDCResponse.Body) - return diag.Errorf("Error getting cdc config %s", body) + return diag.Errorf("failed to get cdc status, http code: %v, message %s", getCDCResponse.StatusCode, body) } var cdcResult CDCStatusResponse - err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult) - if err != nil { + if err = json.NewDecoder(getCDCResponse.Body).Decode(&cdcResult); err != nil { return diag.FromErr(fmt.Errorf("failed to read CDC status: %w", err)) } - for i := 0; i < len(cdcResult); i++ { - if cdcResult[i].Keyspace == keyspace { - if cdcResult[i].DatabaseTable == table { - return nil - } - } - } - - if err := resourceData.Set("connector_status", cdcResult[0].ConnectorStatus); err != nil { - return diag.FromErr(err) - } - if err := resourceData.Set("data_topic", cdcResult[0].DataTopic); err != nil { - return diag.FromErr(err) + if cdcStatus := getTableCDCStatus(databaseId, keyspace, table, cdcResult); cdcStatus != nil { + return setCDCData(resourceData, *cdcStatus) } // Not found. Remove from state. resourceData.SetId("") - return nil } @@ -294,6 +279,7 @@ type StreamingTokens []struct { var cdcEnablementMutex sync.Mutex func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { + const cdcCreateWaitTime = time.Second * 3 client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) streamingClientv3 := meta.(astraClients).astraStreamingClientv3 @@ -350,19 +336,12 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m } tflog.Info(ctx, fmt.Sprintf("waiting for CDC on keyspace: %s, table: %s", cdcRequestJSON.Keyspace, cdcRequestJSON.TableName)) - time.Sleep(time.Second * 3) + time.Sleep(cdcCreateWaitTime) if cdcStatus, err := waitCDCStatusReady(ctx, streamingClientv3, databaseId, keyspace, table, tenantName, getCDCParams); err != nil { return diag.FromErr(fmt.Errorf("failed to check CDC status %w", err)) } else if cdcStatus != nil { - if err := resourceData.Set("connector_status", cdcStatus.CodStatus); err != nil { - return diag.FromErr(err) - } - if err := resourceData.Set("data_topic", cdcStatus.DataTopic); err != nil { - return diag.FromErr(err) - } - setCDCData(resourceData, fmt.Sprintf("%s/%s/%s/%s", databaseId, keyspace, table, tenantName)) - return nil + return setCDCData(resourceData, *cdcStatus) } tflog.Warn(ctx, fmt.Sprintf("CDC not ready after max wait time, remaining retries: %v", (maxRetries-i))) @@ -462,8 +441,16 @@ func getPulsarToken(ctx context.Context, pulsarCluster string, token string, org return pulsarToken, err } -func setCDCData(d *schema.ResourceData, id string) error { - d.SetId(id) +func setCDCData(resourceData *schema.ResourceData, cdc CDCStatus) diag.Diagnostics { + if err := resourceData.Set("connector_status", cdc.CodStatus); err != nil { + return diag.FromErr(err) + } + if err := resourceData.Set("data_topic", cdc.DataTopic); err != nil { + return diag.FromErr(err) + } + + cdcId := fmt.Sprintf("%s/%s/%s/%s", cdc.DatabaseID, cdc.Keyspace, cdc.DatabaseTable, cdc.Tenant) + resourceData.SetId(cdcId) return nil }