-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add context propagator and use it to enrich metrics with flow metadata #2493
Conversation
c853a75
to
91652be
Compare
func (a *FlowableActivity) GetFlowMetadata(ctx context.Context, flowName string, sourceName string, | ||
destinationName string) (*shared.FlowMetadata, error) { | ||
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName)) | ||
peerTypes, err := connectors.LoadPeerTypes(ctx, a.CatalogPool, []string{sourceName, destinationName}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to cache this somewhere? not sure how often this will hit catalog with the metrics we will add
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is meant to be run once by workflows
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed the shared function my bad
could be a LocalActivity
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, was thinking the same
91652be
to
2598e7b
Compare
b01f3e4
to
64c7ee8
Compare
flow/workflows/cdc_flow.go
Outdated
@@ -568,3 +574,15 @@ func CDCFlowWorkflow( | |||
} | |||
} | |||
} | |||
|
|||
func GetFlowMetadataContext(ctx workflow.Context, flowJobName string, sourceName string, destinationName string) (workflow.Context, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's another utility method around in these workflow files (GetSideEffect?), these could be in a util.go
The context is propagated as temporal headers:
Metrics automatically have the desired attributes: