diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index a7c017f71..a5198c89b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -378,9 +378,7 @@ func (a *FlowableActivity) SyncFlow( slog.Int64("totalRecordsSynced", totalRecordsSynced.Load())) if a.OtelManager != nil { a.OtelManager.Metrics.RecordsSyncedGauge.Record(ctx, syncResponse.NumRecordsSynced, metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, config.FlowJobName), attribute.String(otel_metrics.BatchIdKey, strconv.FormatInt(syncResponse.CurrentSyncBatchID, 10)), - attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", srcConn)), ))) } if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs { @@ -1110,3 +1108,31 @@ func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowN return nil } + +func (a *FlowableActivity) GetFlowMetadata( + ctx context.Context, + flowName string, + sourceName string, + destinationName string, +) (*protos.FlowContextMetadata, error) { + logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName)) + peerTypes, err := connectors.LoadPeerTypes(ctx, a.CatalogPool, []string{sourceName, destinationName}) + if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) + return nil, err + } + logger.Info("loaded peer types for flow", slog.String("flowName", flowName), + slog.String("sourceName", sourceName), slog.String("destinationName", destinationName), + slog.Any("peerTypes", peerTypes)) + return &protos.FlowContextMetadata{ + FlowName: flowName, + Source: &protos.PeerContextMetadata{ + Name: sourceName, + Type: peerTypes[sourceName], + }, + Destination: &protos.PeerContextMetadata{ + Name: destinationName, + Type: peerTypes[destinationName], + }, + }, nil +} diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 011ddfa96..260615c0c 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -230,14 +230,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } var res *model.SyncResponse - var dstConnType string errGroup.Go(func() error { dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName) if err != nil { return fmt.Errorf("failed to recreate destination connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) - dstConnType = fmt.Sprintf("%T", dstConn) syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) if err != nil { @@ -323,11 +321,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) if a.OtelManager != nil { - a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID, metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, flowName), - attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", srcConn)), - attribute.String(otel_metrics.DestinationPeerType, dstConnType), - ))) + a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID) } syncState.Store(shared.Ptr("updating schema")) diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 7eae1955d..987411ec2 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -10,9 +10,11 @@ import ( "go.temporal.io/sdk/client" temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/activities" "github.com/PeerDB-io/peerdb/flow/alerting" + "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/otel_metrics" "github.com/PeerDB-io/peerdb/flow/peerdbenv" "github.com/PeerDB-io/peerdb/flow/shared" @@ -30,6 +32,9 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (*WorkerSetupResponse, erro HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), + ContextPropagators: []workflow.ContextPropagator{ + shared.NewContextPropagator[*protos.FlowContextMetadata](shared.FlowMetadataKey), + }, } if opts.EnableOtelMetrics { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index c3e8fe85b..2648d3114 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -13,9 +13,11 @@ import ( "go.temporal.io/sdk/client" temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/activities" "github.com/PeerDB-io/peerdb/flow/alerting" + "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/otel_metrics" "github.com/PeerDB-io/peerdb/flow/peerdbenv" "github.com/PeerDB-io/peerdb/flow/shared" @@ -99,6 +101,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*WorkerSetupResponse, error) { HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), + ContextPropagators: []workflow.ContextPropagator{ + shared.NewContextPropagator[*protos.FlowContextMetadata](shared.FlowMetadataKey), + }, } if opts.EnableOtelMetrics { metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker") diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 1d0bcd8cb..2a6b48758 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -294,6 +294,29 @@ func LoadPeerType(ctx context.Context, catalogPool *pgxpool.Pool, peerName strin return dbtype, err } +func LoadPeerTypes(ctx context.Context, catalogPool *pgxpool.Pool, peerNames []string) (map[string]protos.DBType, error) { + if len(peerNames) == 0 { + return nil, nil + } + + rows, err := catalogPool.Query(ctx, "SELECT name, type FROM peers WHERE name = ANY($1)", peerNames) + if err != nil { + return nil, err + } + defer rows.Close() + + peerTypes := make(map[string]protos.DBType) + for rows.Next() { + var peerName string + var dbtype protos.DBType + if err := rows.Scan(&peerName, &dbtype); err != nil { + return nil, err + } + peerTypes[peerName] = dbtype + } + return peerTypes, nil +} + func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) (*protos.Peer, error) { row := catalogPool.QueryRow(ctx, ` SELECT type, options, enc_key_id diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index ba14deed4..971952c35 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -15,8 +15,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/activity" connmetadata "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata" @@ -473,10 +471,7 @@ func PullCdcRecords[Items model.Items]( return shared.LogError(logger, fmt.Errorf("received Postgres WAL error: %+v", msg)) case *pgproto3.CopyData: if p.otelManager != nil { - p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(msg.Data)), metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, req.FlowJobName), - attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", p)), - ))) + p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(msg.Data))) } switch msg.Data[0] { case pglogrepl.PrimaryKeepaliveMessageByteID: diff --git a/flow/otel_metrics/attributes.go b/flow/otel_metrics/attributes.go index f19969f64..5604850d0 100644 --- a/flow/otel_metrics/attributes.go +++ b/flow/otel_metrics/attributes.go @@ -11,6 +11,8 @@ const ( BatchIdKey = "batchId" SourcePeerType = "sourcePeerType" DestinationPeerType = "destinationPeerType" + SourcePeerName = "sourcePeerName" + DestinationPeerName = "destinationPeerName" ) const ( diff --git a/flow/otel_metrics/observables.go b/flow/otel_metrics/observables.go index 6d9f97017..f9b6254d4 100644 --- a/flow/otel_metrics/observables.go +++ b/flow/otel_metrics/observables.go @@ -8,6 +8,9 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/shared" ) type ObservationMapValue[V comparable] struct { @@ -99,6 +102,52 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl return &Float64SyncGauge{syncGauge: syncGauge}, nil } +func buildFlowMetadataAttributes(flowMetadata *protos.FlowContextMetadata) metric.MeasurementOption { + return metric.WithAttributeSet(attribute.NewSet( + attribute.String(FlowNameKey, flowMetadata.FlowName), + attribute.String(SourcePeerType, flowMetadata.Source.Type.String()), + attribute.String(DestinationPeerType, flowMetadata.Destination.Type.String()), + attribute.String(SourcePeerName, flowMetadata.Source.Name), + attribute.String(DestinationPeerName, flowMetadata.Destination.Name), + )) +} + +type ContextAwareInt64SyncGauge struct { + metric.Int64Gauge +} + +func (a *ContextAwareInt64SyncGauge) Record(ctx context.Context, value int64, options ...metric.RecordOption) { + flowMetadata := shared.GetFlowMetadata(ctx) + if flowMetadata != nil { + options = append(options, buildFlowMetadataAttributes(flowMetadata)) + } + a.Int64Gauge.Record(ctx, value, options...) +} + +type ContextAwareFloat64SyncGauge struct { + metric.Float64Gauge +} + +func (a *ContextAwareFloat64SyncGauge) Record(ctx context.Context, value float64, options ...metric.RecordOption) { + flowMetadata := shared.GetFlowMetadata(ctx) + if flowMetadata != nil { + options = append(options, buildFlowMetadataAttributes(flowMetadata)) + } + a.Float64Gauge.Record(ctx, value, options...) +} + +type ContextAwareInt64Counter struct { + metric.Int64Counter +} + +func (a *ContextAwareInt64Counter) Add(ctx context.Context, value int64, options ...metric.AddOption) { + flowMetadata := shared.GetFlowMetadata(ctx) + if flowMetadata != nil { + options = append(options, buildFlowMetadataAttributes(flowMetadata)) + } + a.Int64Counter.Add(ctx, value, options...) +} + func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { gaugeConfig := metric.NewInt64GaugeConfig(opts...) return NewInt64SyncGauge(meter, name, @@ -107,6 +156,14 @@ func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption ) } +func ContextAwareInt64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + gauge, err := Int64Gauge(meter, name, opts...) + if err != nil { + return nil, err + } + return &ContextAwareInt64SyncGauge{Int64Gauge: gauge}, nil +} + func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { gaugeConfig := metric.NewFloat64GaugeConfig(opts...) return NewFloat64SyncGauge(meter, name, @@ -114,3 +171,24 @@ func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOp metric.WithUnit(gaugeConfig.Unit()), ) } + +func ContextAwareFloat64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + gauge, err := Float64Gauge(meter, name, opts...) + if err != nil { + return nil, err + } + return &ContextAwareFloat64SyncGauge{Float64Gauge: gauge}, nil +} + +func NewContextAwareInt64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + counterConfig := metric.NewInt64CounterConfig(opts...) + counter, err := meter.Int64Counter(name, + metric.WithDescription(counterConfig.Description()), + metric.WithUnit(counterConfig.Unit()), + ) + if err != nil { + return nil, err + } + + return &ContextAwareInt64Counter{Int64Counter: counter}, nil +} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 1040d35a1..5d121666a 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -117,16 +117,16 @@ func getOrInitMetric[M any, O any]( func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { // Once fixed, replace first argument below with metric.Meter.Int64Gauge - return getOrInitMetric(Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) + return getOrInitMetric(ContextAwareInt64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) } func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { // Once fixed, replace first argument below with metric.Meter.Float64Gauge - return getOrInitMetric(Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) + return getOrInitMetric(ContextAwareFloat64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) } func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - return getOrInitMetric(metric.Meter.Int64Counter, om.Meter, om.Int64CountersCache, name, opts...) + return getOrInitMetric(NewContextAwareInt64Counter, om.Meter, om.Int64CountersCache, name, opts...) } func (om *OtelManager) setupMetrics() error { diff --git a/flow/shared/context.go b/flow/shared/context.go new file mode 100644 index 000000000..718e1e08c --- /dev/null +++ b/flow/shared/context.go @@ -0,0 +1,84 @@ +package shared + +import ( + "context" + + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +type TemporalContextKey string + +func (k TemporalContextKey) HeaderKey() string { + return string(k) +} + +const ( + FlowMetadataKey TemporalContextKey = "x-peerdb-flow-metadata" +) + +type PeerMetadata struct { + Name string + Type protos.DBType +} + +func GetFlowMetadata(ctx context.Context) *protos.FlowContextMetadata { + if metadata, ok := ctx.Value(FlowMetadataKey).(*protos.FlowContextMetadata); ok { + return metadata + } + return nil +} + +type ContextPropagator[V any] struct { + Key TemporalContextKey +} + +func NewContextPropagator[V any](key TemporalContextKey) workflow.ContextPropagator { + return &ContextPropagator[V]{Key: key} +} + +func (c *ContextPropagator[V]) Inject(ctx context.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(c.Key) + payload, err := converter.GetDefaultDataConverter().ToPayload(value) + if err != nil { + return err + } + writer.Set(c.Key.HeaderKey(), payload) + return nil +} + +func (c *ContextPropagator[V]) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) { + if payload, ok := reader.Get(c.Key.HeaderKey()); ok { + var value V + if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil { + return ctx, nil + } + ctx = context.WithValue(ctx, c.Key, value) + } + + return ctx, nil +} + +func (c *ContextPropagator[V]) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(c.Key) + payload, err := converter.GetDefaultDataConverter().ToPayload(value) + if err != nil { + return err + } + writer.Set(c.Key.HeaderKey(), payload) + return nil +} + +func (c *ContextPropagator[V]) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) { + if payload, ok := reader.Get(c.Key.HeaderKey()); ok { + var value V + if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil { + return ctx, nil + } + ctx = workflow.WithValue(ctx, c.Key, value) + } + + return ctx, nil +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 4755cb0f9..d811dd869 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -52,18 +52,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow } } -func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T { - sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return f(ctx) - }) - - var result T - if err := sideEffect.Get(&result); err != nil { - panic(err) - } - return result -} - func GetUUID(ctx workflow.Context) string { return GetSideEffect(ctx, func(_ workflow.Context) string { return uuid.New().String() @@ -373,6 +361,12 @@ func CDCFlowWorkflow( originalRunID := workflow.GetInfo(ctx).OriginalRunID + var err error + ctx, err = GetFlowMetadataContext(ctx, cfg.FlowJobName, cfg.SourceName, cfg.DestinationName) + if err != nil { + return state, fmt.Errorf("failed to get flow metadata context: %w", err) + } + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index c0131743d..970035c18 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -2,6 +2,7 @@ package peerflow import ( "errors" + "fmt" "log/slog" "time" @@ -85,7 +86,12 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), input.FlowJobName)) - + var err error + ctx, err = GetFlowMetadataContext(ctx, + input.FlowJobName, input.FlowConnectionConfigs.SourceName, input.FlowConnectionConfigs.DestinationName) + if err != nil { + return fmt.Errorf("failed to get flow metadata context: %w", err) + } if input.FlowConnectionConfigs != nil { if input.DropFlowStats { dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ diff --git a/flow/workflows/util.go b/flow/workflows/util.go new file mode 100644 index 000000000..58bd42481 --- /dev/null +++ b/flow/workflows/util.go @@ -0,0 +1,34 @@ +package peerflow + +import ( + "time" + + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/shared" +) + +func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T { + sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return f(ctx) + }) + + var result T + if err := sideEffect.Get(&result); err != nil { + panic(err) + } + return result +} + +func GetFlowMetadataContext(ctx workflow.Context, flowJobName string, sourceName string, destinationName string) (workflow.Context, error) { + metadataCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: 30 * time.Second, + }) + getMetadataFuture := workflow.ExecuteLocalActivity(metadataCtx, flowable.GetFlowMetadata, flowJobName, sourceName, destinationName) + var metadata *protos.FlowContextMetadata + if err := getMetadataFuture.Get(metadataCtx, &metadata); err != nil { + return nil, err + } + return workflow.WithValue(ctx, shared.FlowMetadataKey, metadata), nil +} diff --git a/protos/flow.proto b/protos/flow.proto index 3880e10b6..351b70cb7 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -488,3 +488,14 @@ message MaintenanceMirror { message MaintenanceMirrors { repeated MaintenanceMirror mirrors = 1; } + +message PeerContextMetadata { + string name = 1; + peerdb_peers.DBType type = 2; +} + +message FlowContextMetadata{ + string flow_name = 1; + PeerContextMetadata source = 2; + PeerContextMetadata destination = 3; +}