always spawn heartbeatRoutine asap in activity (#2490)
spawning later means the routine is shut down before earlier defers;
if those take a long time, the activity will time out
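
In Go, deferred calls run last-in, first-out, so a heartbeat routine whose shutdown is deferred after other defers stops heartbeating before those earlier cleanups run. A minimal standalone sketch of the failure mode (hypothetical prints, not repo code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Old ordering: the slow cleanup is deferred BEFORE the heartbeat shutdown.
	defer func() {
		fmt.Println("closing connector (slow)...")
		time.Sleep(2 * time.Second) // simulates a connector close that takes a long time
		fmt.Println("connector closed")
	}()
	defer fmt.Println("heartbeat shutdown") // deferred last, so it runs FIRST

	fmt.Println("activity work done")
	// Prints: activity work done, heartbeat shutdown, closing connector (slow)..., connector closed.
	// No heartbeats cover the slow close; spawning the heartbeat routine first
	// makes its shutdown defer run LAST, after every other cleanup.
}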
serprex authored Jan 27, 2025
1 parent cb8d16c commit 6626128
Showing 2 changed files with 44 additions and 42 deletions.
76 changes: 40 additions & 36 deletions flow/activities/flowable.go
@@ -165,6 +165,11 @@ func (a *FlowableActivity) SetupTableSchema(
 	ctx context.Context,
 	config *protos.SetupTableSchemaBatchInput,
 ) error {
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "getting table schema"
+	})
+	defer shutdown()
+
 	logger := activity.GetLogger(ctx)
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
 	srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
@@ -173,10 +178,6 @@ func (a *FlowableActivity) SetupTableSchema(
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
-	heartbeatRoutine(ctx, func() string {
-		return "getting table schema"
-	})
-
 	tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.System, config.TableIdentifiers)
 	if err != nil {
 		return fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)
@@ -214,6 +215,14 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	ctx context.Context,
 	config *protos.SetupNormalizedTableBatchInput,
 ) (*protos.SetupNormalizedTableBatchOutput, error) {
+	numTablesSetup := atomic.Uint32{}
+	numTablesToSetup := atomic.Int32{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load())
+	})
+	defer shutdown()
+
 	logger := activity.GetLogger(ctx)
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
 	conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
@@ -236,13 +245,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	if err != nil {
 		return nil, err
 	}
-
-	numTablesSetup := atomic.Uint32{}
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("setting up normalized tables - %d of %d done",
-			numTablesSetup.Load(), len(tableNameSchemaMapping))
-	})
-	defer shutdown()
+	numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
 
 	tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
 	for tableIdentifier, tableSchema := range tableNameSchemaMapping {
@@ -281,17 +284,13 @@ func (a *FlowableActivity) SyncFlow(
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
 ) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	logger := activity.GetLogger(ctx)
-
 	var currentSyncFlowNum atomic.Int32
 	var totalRecordsSynced atomic.Int64
 	var normalizingBatchID atomic.Int64
 	var normalizeWaiting atomic.Bool
 	var syncingBatchID atomic.Int64
 	var syncState atomic.Pointer[string]
 	syncState.Store(shared.Ptr("setup"))
-
 	shutdown := heartbeatRoutine(ctx, func() string {
 		// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
 		sBatchID := syncingBatchID.Load()
@@ -308,6 +307,9 @@ func (a *FlowableActivity) SyncFlow(
 	})
 	defer shutdown()
 
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
+	logger := activity.GetLogger(ctx)
+
 	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -482,6 +484,11 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	last *protos.QRepPartition,
 	runUUID string,
 ) (*protos.QRepParitionResult, error) {
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "getting partitions for job"
+	})
+	defer shutdown()
+
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 	if err := monitoring.InitializeQRepRun(ctx, logger, a.CatalogPool, config, runUUID, nil, config.ParentMirrorName); err != nil {
@@ -493,10 +500,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "getting partitions for job"
-	})
-	defer shutdown()
 	partitions, err := srcConn.GetQRepPartitions(ctx, config, last)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -590,6 +593,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
 func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
 	runUUID string,
 ) error {
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "consolidating partitions for job"
+	})
+	defer shutdown()
+
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	dstConn, err := connectors.GetByNameAs[connectors.QRepConsolidateConnector](ctx, config.Env, a.CatalogPool, config.DestinationName)
 	if errors.Is(err, errors.ErrUnsupported) {
@@ -599,11 +607,6 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "consolidating partitions for job"
-	})
-	defer shutdown()
-
 	if err := dstConn.ConsolidateQRepPartitions(ctx, config); err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		return err
@@ -811,6 +814,11 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
 func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
 	config *protos.QRepConfig, last *protos.QRepPartition,
 ) (bool, error) {
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "scanning for new rows"
+	})
+	defer shutdown()
+
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
@@ -825,11 +833,6 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "scanning for new rows"
-	})
-	defer shutdown()
-
 	logger.Info(fmt.Sprintf("current last partition value is %v", last))
 
 	maxValue, err := srcConn.GetMaxValue(ctx, config, last)
@@ -859,6 +862,11 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
 }
 
 func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "renaming tables for job"
+	})
+	defer shutdown()
+
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	conn, err := connectors.GetByNameAs[connectors.RenameTablesConnector](ctx, nil, a.CatalogPool, config.PeerName)
 	if err != nil {
@@ -867,11 +875,6 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
 	}
 	defer connectors.CloseConnector(ctx, conn)
 
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "renaming tables for job"
-	})
-	defer shutdown()
-
 	tableNameSchemaMapping := make(map[string]*protos.TableSchema, len(config.RenameTableOptions))
 	for _, option := range config.RenameTableOptions {
 		schema, err := shared.LoadTableSchemaFromCatalog(
@@ -915,12 +918,13 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
 }
 
 func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
-	logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName))
 	shutdown := heartbeatRoutine(ctx, func() string {
 		return "deleting mirror stats"
 	})
 	defer shutdown()
+
+	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
+	logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName))
 	if err := monitoring.DeleteMirrorStats(ctx, logger, a.CatalogPool, flowName); err != nil {
 		logger.Warn("was not able to delete mirror stats", slog.Any("error", err))
 		return err
10 changes: 4 additions & 6 deletions flow/activities/flowable_core.go
@@ -508,18 +508,16 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
 	) (int, int64, error),
 	syncRecords func(TSync, context.Context, *protos.QRepConfig, *protos.QRepPartition, TRead) (int, error),
 ) (int64, error) {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	logger := activity.GetLogger(ctx)
-
-	startTime := time.Now()
-
-	logger.Info("replicating xmin")
 	shutdown := heartbeatRoutine(ctx, func() string {
 		return "syncing xmin"
 	})
 	defer shutdown()
 
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
+	logger := activity.GetLogger(ctx)
+	logger.Info("replicating xmin")
 	errGroup, errCtx := errgroup.WithContext(ctx)
+	startTime := time.Now()
 
 	var currentSnapshotXmin int64
 	var rowsSynced int
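For reference, heartbeatRoutine (defined elsewhere in this repo) follows the usual Temporal Go SDK pattern: a goroutine records an activity heartbeat on a timer until the returned shutdown function is called. Below is a minimal sketch of that pattern matching the signature used in the diff; the interval and details are assumptions, not the repo's actual implementation:

package activities

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

// heartbeatRoutine (sketch): records a Temporal activity heartbeat on every tick
// until the returned shutdown function is invoked, typically via defer shutdown().
func heartbeatRoutine(ctx context.Context, message func() string) func() {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(15 * time.Second) // assumed interval
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Resets the activity's heartbeat timeout on the Temporal server.
				activity.RecordHeartbeat(ctx, message())
			}
		}
	}()
	return cancel
}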
