Skip to content

Commit

Permalink
feat: add context propagator and use it to enrich metrics with flow m…
Browse files Browse the repository at this point in the history
…etadata (#2493)

The context is propagated as temporal headers:
<img width="1074" alt="image"
src="https://github.com/user-attachments/assets/444f8b4d-e909-4b2e-aa3a-03fe0a1ac71e"
/>


Metrics automatically have the desired attributes:
<img width="879" alt="image"
src="https://github.com/user-attachments/assets/5dddb50a-52cb-470d-bde2-2bf963deca66"
/>
  • Loading branch information
iamKunalGupta authored Jan 29, 2025
1 parent 799af39 commit 3664789
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 31 deletions.
30 changes: 28 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,7 @@ func (a *FlowableActivity) SyncFlow(
slog.Int64("totalRecordsSynced", totalRecordsSynced.Load()))
if a.OtelManager != nil {
a.OtelManager.Metrics.RecordsSyncedGauge.Record(ctx, syncResponse.NumRecordsSynced, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, config.FlowJobName),
attribute.String(otel_metrics.BatchIdKey, strconv.FormatInt(syncResponse.CurrentSyncBatchID, 10)),
attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", srcConn)),
)))
}
if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs {
Expand Down Expand Up @@ -1110,3 +1108,31 @@ func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowN

return nil
}

func (a *FlowableActivity) GetFlowMetadata(
ctx context.Context,
flowName string,
sourceName string,
destinationName string,
) (*protos.FlowContextMetadata, error) {
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName))
peerTypes, err := connectors.LoadPeerTypes(ctx, a.CatalogPool, []string{sourceName, destinationName})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
logger.Info("loaded peer types for flow", slog.String("flowName", flowName),
slog.String("sourceName", sourceName), slog.String("destinationName", destinationName),
slog.Any("peerTypes", peerTypes))
return &protos.FlowContextMetadata{
FlowName: flowName,
Source: &protos.PeerContextMetadata{
Name: sourceName,
Type: peerTypes[sourceName],
},
Destination: &protos.PeerContextMetadata{
Name: destinationName,
Type: peerTypes[destinationName],
},
}, nil
}
8 changes: 1 addition & 7 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}

var res *model.SyncResponse
var dstConnType string
errGroup.Go(func() error {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return fmt.Errorf("failed to recreate destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)
dstConnType = fmt.Sprintf("%T", dstConn)

syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil {
Expand Down Expand Up @@ -323,11 +321,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

if a.OtelManager != nil {
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, flowName),
attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", srcConn)),
attribute.String(otel_metrics.DestinationPeerType, dstConnType),
)))
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)
}

syncState.Store(shared.Ptr("updating schema"))
Expand Down
5 changes: 5 additions & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/activities"
"github.com/PeerDB-io/peerdb/flow/alerting"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
"github.com/PeerDB-io/peerdb/flow/peerdbenv"
"github.com/PeerDB-io/peerdb/flow/shared"
Expand All @@ -30,6 +32,9 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (*WorkerSetupResponse, erro
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))),
ContextPropagators: []workflow.ContextPropagator{
shared.NewContextPropagator[*protos.FlowContextMetadata](shared.FlowMetadataKey),
},
}

if opts.EnableOtelMetrics {
Expand Down
5 changes: 5 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/activities"
"github.com/PeerDB-io/peerdb/flow/alerting"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
"github.com/PeerDB-io/peerdb/flow/peerdbenv"
"github.com/PeerDB-io/peerdb/flow/shared"
Expand Down Expand Up @@ -99,6 +101,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*WorkerSetupResponse, error) {
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))),
ContextPropagators: []workflow.ContextPropagator{
shared.NewContextPropagator[*protos.FlowContextMetadata](shared.FlowMetadataKey),
},
}
if opts.EnableOtelMetrics {
metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker")
Expand Down
23 changes: 23 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,29 @@ func LoadPeerType(ctx context.Context, catalogPool *pgxpool.Pool, peerName strin
return dbtype, err
}

func LoadPeerTypes(ctx context.Context, catalogPool *pgxpool.Pool, peerNames []string) (map[string]protos.DBType, error) {
if len(peerNames) == 0 {
return nil, nil
}

rows, err := catalogPool.Query(ctx, "SELECT name, type FROM peers WHERE name = ANY($1)", peerNames)
if err != nil {
return nil, err
}
defer rows.Close()

peerTypes := make(map[string]protos.DBType)
for rows.Next() {
var peerName string
var dbtype protos.DBType
if err := rows.Scan(&peerName, &dbtype); err != nil {
return nil, err
}
peerTypes[peerName] = dbtype
}
return peerTypes, nil
}

func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) (*protos.Peer, error) {
row := catalogPool.QueryRow(ctx, `
SELECT type, options, enc_key_id
Expand Down
7 changes: 1 addition & 6 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"

connmetadata "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata"
Expand Down Expand Up @@ -473,10 +471,7 @@ func PullCdcRecords[Items model.Items](
return shared.LogError(logger, fmt.Errorf("received Postgres WAL error: %+v", msg))
case *pgproto3.CopyData:
if p.otelManager != nil {
p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(msg.Data)), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, req.FlowJobName),
attribute.String(otel_metrics.SourcePeerType, fmt.Sprintf("%T", p)),
)))
p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(msg.Data)))
}
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
Expand Down
2 changes: 2 additions & 0 deletions flow/otel_metrics/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const (
BatchIdKey = "batchId"
SourcePeerType = "sourcePeerType"
DestinationPeerType = "destinationPeerType"
SourcePeerName = "sourcePeerName"
DestinationPeerName = "destinationPeerName"
)

const (
Expand Down
78 changes: 78 additions & 0 deletions flow/otel_metrics/observables.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/shared"
)

type ObservationMapValue[V comparable] struct {
Expand Down Expand Up @@ -99,6 +102,52 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl
return &Float64SyncGauge{syncGauge: syncGauge}, nil
}

func buildFlowMetadataAttributes(flowMetadata *protos.FlowContextMetadata) metric.MeasurementOption {
return metric.WithAttributeSet(attribute.NewSet(
attribute.String(FlowNameKey, flowMetadata.FlowName),
attribute.String(SourcePeerType, flowMetadata.Source.Type.String()),
attribute.String(DestinationPeerType, flowMetadata.Destination.Type.String()),
attribute.String(SourcePeerName, flowMetadata.Source.Name),
attribute.String(DestinationPeerName, flowMetadata.Destination.Name),
))
}

type ContextAwareInt64SyncGauge struct {
metric.Int64Gauge
}

func (a *ContextAwareInt64SyncGauge) Record(ctx context.Context, value int64, options ...metric.RecordOption) {
flowMetadata := shared.GetFlowMetadata(ctx)
if flowMetadata != nil {
options = append(options, buildFlowMetadataAttributes(flowMetadata))
}
a.Int64Gauge.Record(ctx, value, options...)
}

type ContextAwareFloat64SyncGauge struct {
metric.Float64Gauge
}

func (a *ContextAwareFloat64SyncGauge) Record(ctx context.Context, value float64, options ...metric.RecordOption) {
flowMetadata := shared.GetFlowMetadata(ctx)
if flowMetadata != nil {
options = append(options, buildFlowMetadataAttributes(flowMetadata))
}
a.Float64Gauge.Record(ctx, value, options...)
}

type ContextAwareInt64Counter struct {
metric.Int64Counter
}

func (a *ContextAwareInt64Counter) Add(ctx context.Context, value int64, options ...metric.AddOption) {
flowMetadata := shared.GetFlowMetadata(ctx)
if flowMetadata != nil {
options = append(options, buildFlowMetadataAttributes(flowMetadata))
}
a.Int64Counter.Add(ctx, value, options...)
}

func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
gaugeConfig := metric.NewInt64GaugeConfig(opts...)
return NewInt64SyncGauge(meter, name,
Expand All @@ -107,10 +156,39 @@ func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption
)
}

func ContextAwareInt64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
gauge, err := Int64Gauge(meter, name, opts...)
if err != nil {
return nil, err
}
return &ContextAwareInt64SyncGauge{Int64Gauge: gauge}, nil
}

func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
gaugeConfig := metric.NewFloat64GaugeConfig(opts...)
return NewFloat64SyncGauge(meter, name,
metric.WithDescription(gaugeConfig.Description()),
metric.WithUnit(gaugeConfig.Unit()),
)
}

func ContextAwareFloat64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
gauge, err := Float64Gauge(meter, name, opts...)
if err != nil {
return nil, err
}
return &ContextAwareFloat64SyncGauge{Float64Gauge: gauge}, nil
}

func NewContextAwareInt64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) {
counterConfig := metric.NewInt64CounterConfig(opts...)
counter, err := meter.Int64Counter(name,
metric.WithDescription(counterConfig.Description()),
metric.WithUnit(counterConfig.Unit()),
)
if err != nil {
return nil, err
}

return &ContextAwareInt64Counter{Int64Counter: counter}, nil
}
6 changes: 3 additions & 3 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,16 @@ func getOrInitMetric[M any, O any](

func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
// Once fixed, replace first argument below with metric.Meter.Int64Gauge
return getOrInitMetric(Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...)
return getOrInitMetric(ContextAwareInt64Gauge, om.Meter, om.Int64GaugesCache, name, opts...)
}

func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
// Once fixed, replace first argument below with metric.Meter.Float64Gauge
return getOrInitMetric(Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...)
return getOrInitMetric(ContextAwareFloat64Gauge, om.Meter, om.Float64GaugesCache, name, opts...)
}

func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) {
return getOrInitMetric(metric.Meter.Int64Counter, om.Meter, om.Int64CountersCache, name, opts...)
return getOrInitMetric(NewContextAwareInt64Counter, om.Meter, om.Int64CountersCache, name, opts...)
}

func (om *OtelManager) setupMetrics() error {
Expand Down
84 changes: 84 additions & 0 deletions flow/shared/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package shared

import (
"context"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
)

type TemporalContextKey string

func (k TemporalContextKey) HeaderKey() string {
return string(k)
}

const (
FlowMetadataKey TemporalContextKey = "x-peerdb-flow-metadata"
)

type PeerMetadata struct {
Name string
Type protos.DBType
}

func GetFlowMetadata(ctx context.Context) *protos.FlowContextMetadata {
if metadata, ok := ctx.Value(FlowMetadataKey).(*protos.FlowContextMetadata); ok {
return metadata
}
return nil
}

type ContextPropagator[V any] struct {
Key TemporalContextKey
}

func NewContextPropagator[V any](key TemporalContextKey) workflow.ContextPropagator {
return &ContextPropagator[V]{Key: key}
}

func (c *ContextPropagator[V]) Inject(ctx context.Context, writer workflow.HeaderWriter) error {
value := ctx.Value(c.Key)
payload, err := converter.GetDefaultDataConverter().ToPayload(value)
if err != nil {
return err
}
writer.Set(c.Key.HeaderKey(), payload)
return nil
}

func (c *ContextPropagator[V]) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
if payload, ok := reader.Get(c.Key.HeaderKey()); ok {
var value V
if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
return ctx, nil
}
ctx = context.WithValue(ctx, c.Key, value)
}

return ctx, nil
}

func (c *ContextPropagator[V]) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error {
value := ctx.Value(c.Key)
payload, err := converter.GetDefaultDataConverter().ToPayload(value)
if err != nil {
return err
}
writer.Set(c.Key.HeaderKey(), payload)
return nil
}

func (c *ContextPropagator[V]) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
if payload, ok := reader.Get(c.Key.HeaderKey()); ok {
var value V
if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
return ctx, nil
}
ctx = workflow.WithValue(ctx, c.Key, value)
}

return ctx, nil
}
Loading

0 comments on commit 3664789

Please sign in to comment.