Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use gRPC interceptor instead of data converter for encoding/decoding #384

Closed

Conversation

dandavison
Copy link
Contributor

@dandavison dandavison commented Nov 13, 2023

Fixes #367

Depends on unmerged PR temporalio/sdk-go#1303

What was changed

  • Do payload encoding/decoding for all client requests, using a gRPC interceptor
  • Stop doing payload encoding/decoding in the dataconverter
  • Eliminate the global mutable dataconverter model

Why?

  • temporal workflow show was respecting --codec-endpoint but temporal workflow show --output json was not

Why do we think this change is correct?

For this change to maintain previous semantics, all the code paths that were using the custom dataconverter must have been operating on data that was either a response obtained using SDKClient, FrontendClient, or OperatorClient, or is being prepared for a request via one of those clients. I have added comments inline establishing that this is true.

For this change to be correct, we require that all code paths that send/receive data containing Payloads are using SDKClient, FrontendClient, or OperatorClient.

How was this tested

Checklist

  1. Addresses [Bug] Ensure remote codec used for all payloads in and out #367 w.r.t. workflow show --output json
  2. Any docs updates needed?
    No

@cretz
Copy link
Member

cretz commented Nov 13, 2023

@dandavison - Look at https://pkg.go.dev/go.temporal.io/api/proxy#VisitPayloads for visiting all payloads (be sure to exclude search attributes). https://github.com/temporalio/sdk-go/blob/v1.25.1/converter/grpc_interceptor.go#L47 is an example of it being used.

@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch 2 times, most recently from 1ea59a0 to 66456e5 Compare November 16, 2023 03:04
@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch 3 times, most recently from 792b278 to aea2036 Compare November 27, 2023 11:19
@@ -459,7 +459,7 @@ func DescribeSchedule(c *cli.Context) error {
if sw := s.Action.GetStartWorkflow(); sw != nil {
item.StartWorkflow = sw
item.WorkflowType = sw.WorkflowType.GetName()
item.Input = dataconverter.CustomDataConverter().ToStrings(sw.Input)
item.Input = converter.GetDefaultDataConverter().ToStrings(sw.Input)
Copy link
Contributor Author

@dandavison dandavison Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: With this PR the data converter no longer does encoding/decoding, so we must establish that it has been done already in this code path.

s is resp.Schedule, and resp was obtained using frontendClient,

resp, err := frontendClient.DescribeSchedule(ctx, req)

and frontendClient is created in a way that has the gRPC interceptor set:
frontendClient = client.Factory(c.App).FrontendClient(c)

@@ -479,13 +479,13 @@ func DescribeSchedule(c *cli.Context) error {
if fields := resp.Memo.GetFields(); len(fields) > 0 {
item.Memo = make(map[string]string, len(fields))
for k, payload := range fields {
item.Memo[k] = dataconverter.CustomDataConverter().ToString(payload)
item.Memo[k] = converter.GetDefaultDataConverter().ToString(payload)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: using the same resp as above.

}
}
if fields := resp.SearchAttributes.GetIndexedFields(); len(fields) > 0 {
item.SearchAttributes = make(map[string]string, len(fields))
for k, payload := range fields {
item.SearchAttributes[k] = dataconverter.DefaultDataConverter().ToString(payload)
item.SearchAttributes[k] = converter.GetDefaultDataConverter().ToString(payload)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: same resp as above.

@@ -656,7 +656,7 @@ func encodeMemo(memo map[string]interface{}) (*commonpb.Memo, error) {
if len(memo) == 0 {
return nil, nil
}
dc := dataconverter.CustomDataConverter()
dc := converter.GetDefaultDataConverter()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: here we are in encodeMemo(). This helper function which is used only in getMemoAndSearchAttributesForSchedule which is used only in CreateSchedule:

memo, sa, err := getMemoAndSearchAttributesForSchedule(c)

There we are creating a request, which will be sent using frontendClient, which has the gRPC interceptor configured:

frontendClient, namespace, scheduleID, err := scheduleBaseArgs(c)

@@ -672,7 +672,7 @@ func encodeSearchAttributes(sa map[string]interface{}) (*commonpb.SearchAttribut
if len(sa) == 0 {
return nil, nil
}
dc := dataconverter.DefaultDataConverter()
dc := converter.GetDefaultDataConverter()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: only used in getMemoAndSearchAttributesForSchedule, therefore correct by above.

@@ -595,7 +595,7 @@ func queryWorkflowHelper(c *cli.Context, queryType string) error {
if queryResponse.QueryRejected != nil {
fmt.Printf("Query was rejected, workflow has status: %v\n", queryResponse.QueryRejected.GetStatus())
} else {
queryResult := stringify.AnyToString(queryResponse.QueryResult, true, 0, dataconverter.CustomDataConverter())
queryResult := stringify.AnyToString(queryResponse.QueryResult, true, 0, converter.GetDefaultDataConverter())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: queryResponse was obtained using the frontend client, which was created using the usual factory, which sets the gRPC interceptor:

fclient := client.Factory(c.App).FrontendClient(c)

@@ -781,7 +781,7 @@ func convertDescribeWorkflowExecutionResponse(c *cli.Context, resp *workflowserv
}

if pendingActivity.GetHeartbeatDetails() != nil {
pendingActivityStr.HeartbeatDetails = stringify.AnyToString(pendingActivity.GetHeartbeatDetails(), true, 0, dataconverter.CustomDataConverter())
pendingActivityStr.HeartbeatDetails = stringify.AnyToString(pendingActivity.GetHeartbeatDetails(), true, 0, converter.GetDefaultDataConverter())
Copy link
Contributor Author

@dandavison dandavison Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: pendingActivity derives from a response which was passed into this function by DescribeWorkflow, which used the standard frontend client factory:

frontendClient := client.Factory(c.App).FrontendClient(c)

@@ -835,7 +835,7 @@ func printRunStatus(c *cli.Context, event *historypb.HistoryEvent) {
switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
fmt.Printf(" Status: %s\n", color.Green(c, "COMPLETED"))
result := stringify.AnyToString(event.GetWorkflowExecutionCompletedEventAttributes().GetResult(), true, 0, dataconverter.CustomDataConverter())
result := stringify.AnyToString(event.GetWorkflowExecutionCompletedEventAttributes().GetResult(), true, 0, converter.GetDefaultDataConverter())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: the event being formatted was obtained in printWorkflowProgress via a call to the SDKClient

iter := sdkClient.GetWorkflowHistory(tcCtx, wid, rid, watch, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
which was obtained via the standard factory code path that we have modified to use the gRPC interceptor:
sdkClient, err := client.GetSDKClient(c)

@@ -845,7 +845,7 @@ func printRunStatus(c *cli.Context, event *historypb.HistoryEvent) {
fmt.Printf(" Retry status: %s\n", event.GetWorkflowExecutionTimedOutEventAttributes().GetRetryState())
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
fmt.Printf(" Status: %s\n", color.Red(c, "CANCELED"))
details := stringify.AnyToString(event.GetWorkflowExecutionCanceledEventAttributes().GetDetails(), true, 0, dataconverter.CustomDataConverter())
details := stringify.AnyToString(event.GetWorkflowExecutionCanceledEventAttributes().GetDetails(), true, 0, converter.GetDefaultDataConverter())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: same argument as above.

@@ -1577,7 +1577,7 @@ func findWorkflowStatusValue(name string) (enumspb.WorkflowExecutionStatus, bool
// historyEventToString convert HistoryEvent to string
func historyEventToString(e *historypb.HistoryEvent, printFully bool, maxFieldLength int) string {
data := getEventAttributes(e)
return stringify.AnyToString(data, printFully, maxFieldLength, dataconverter.CustomDataConverter())
return stringify.AnyToString(data, printFully, maxFieldLength, converter.GetDefaultDataConverter())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: the event being formatted was obtained via the history table iterator

Details: historyEventToString(event, false, h.maxFieldLength),

The only place the history table iterator is constructed is in printWorkflowProgress, which we have already established uses the gRPC interceptor.

@dandavison dandavison changed the title Decode history events before formatting as JSON Move encoding/decoding out of data converter and into gRPC interceptor Nov 27, 2023
@dandavison dandavison changed the title Move encoding/decoding out of data converter and into gRPC interceptor Move encoding/decoding from data converter to gRPC interceptor Nov 27, 2023
@dandavison dandavison changed the title Move encoding/decoding from data converter to gRPC interceptor Use gRPC interceptor instead of data converter for encoding/decoding Nov 27, 2023
@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch from 80a17b4 to e31398e Compare November 27, 2023 12:39
@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch from e31398e to c7ba09a Compare November 27, 2023 12:43
@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch from 45a79a1 to e2684ff Compare November 27, 2023 14:25
@dandavison dandavison marked this pull request as ready for review November 27, 2023 14:25
@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch from e2684ff to 9e36d8f Compare November 27, 2023 14:38
go.mod Outdated Show resolved Hide resolved
@Quinn-With-Two-Ns
Copy link
Contributor

Quinn-With-Two-Ns commented Nov 29, 2023

@dandavison did you test search attributes? Those don't appear to be excluded by NewPayloadCodecGRPCClientInterceptor ?

Ah the proxy does it

Copy link
Contributor

@Quinn-With-Two-Ns Quinn-With-Two-Ns left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@dandavison dandavison force-pushed the sdk-1194-codec-endpoint branch from c130b77 to dbc8eb8 Compare December 3, 2023 22:22
dandavison added a commit that referenced this pull request Jan 29, 2024
Cherry-pick of #384 with vendoring
of sdk-go code.
@@ -157,3 +156,5 @@ require (
)

replace github.com/grpc-ecosystem/grpc-gateway => github.com/temporalio/grpc-gateway v1.17.0

replace go.temporal.io/sdk => github.com/dandavison/temporalio-sdk-go v1.30.0
Copy link
Member

@cretz cretz Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR need to be updated for #420 and have this removed? Or is this PR now moot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll close this one and open a PR adding the release candidate cherry pick to main.

dandavison added a commit to dandavison/temporalio-cli that referenced this pull request Jan 29, 2024
Cherry-pick of temporalio#384 with vendoring
of sdk-go code.
@dandavison
Copy link
Contributor Author

Superseded by #421

@dandavison dandavison closed this Jan 29, 2024
dandavison added a commit that referenced this pull request Jan 29, 2024
This is #420 cherry-picked onto
`main`. We'll release either from `main` or `v0.10.8.rc` but either way
we want the chanes in `main`.

This replaces #384
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Ensure remote codec used for all payloads in and out
3 participants