Skip to content

Commit

Permalink
Use Payload for SearchAttributes, Memo, and Headers (temporalio#346)
Browse files Browse the repository at this point in the history
* Rename payload to payloads.
  • Loading branch information
alexshtin authored May 2, 2020
1 parent 0d05393 commit c0ce239
Show file tree
Hide file tree
Showing 72 changed files with 824 additions and 765 deletions.
2 changes: 1 addition & 1 deletion common/archiver/filestore/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
Status: executionpb.WorkflowExecutionStatus_Failed,
HistoryLength: int64(101),
Memo: &commonpb.Memo{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"testFields": payload.EncodeBytes([]byte{1, 2, 3}),
},
},
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/s3store/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
Status: executionpb.WorkflowExecutionStatus_Failed,
HistoryLength: int64(101),
Memo: &commonpb.Memo{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"testFields": payload.EncodeBytes([]byte{1, 2, 3}),
},
},
Expand Down
4 changes: 2 additions & 2 deletions common/archiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func ValidateQueryRequest(request *QueryVisibilityRequest) error {
}

// ConvertSearchAttrToPayload converts search attribute value from string back to byte array
func ConvertSearchAttrToPayload(searchAttrStr map[string]string) map[string]*commonpb.Payloads {
searchAttr := make(map[string]*commonpb.Payloads)
func ConvertSearchAttrToPayload(searchAttrStr map[string]string) map[string]*commonpb.Payload {
searchAttr := make(map[string]*commonpb.Payload)
for k, v := range searchAttrStr {
searchAttr[k] = payload.EncodeBytes([]byte(v))
}
Expand Down
7 changes: 2 additions & 5 deletions common/elasticsearch/validator/searchAttrValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (sv *SearchAttributesValidator) ValidateSearchAttributes(input *commonpb.Se
return serviceerror.NewInvalidArgument(fmt.Sprintf("%s is read-only Temporal reservered attribute", key))
}
// verify: size of single value <= limit
dataSize := 0
for _, payloadItem := range val.GetPayloads() {
dataSize += len(payloadItem.GetData())
}
dataSize := len(val.GetData())
if dataSize > sv.searchAttributesSizeOfValueLimit(namespace) {
sv.logger.WithTags(tag.ESKey(key), tag.Number(int64(dataSize)), tag.WorkflowNamespace(namespace)).
Error("value size of search attribute exceed limit")
Expand Down Expand Up @@ -142,7 +139,7 @@ func (sv *SearchAttributesValidator) isValidSearchAttributesKey(
func (sv *SearchAttributesValidator) isValidSearchAttributesValue(
validAttr map[string]interface{},
key string,
value *commonpb.Payloads,
value *commonpb.Payload,
) bool {
valueType := common.ConvertIndexedValueTypeToProtoType(validAttr[key], sv.logger)
_, err := common.DeserializeSearchAttributeValue(value, valueType)
Expand Down
16 changes: 8 additions & 8 deletions common/elasticsearch/validator/searchAttrValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {

intPayload, err := payload.Encode(1)
s.NoError(err)
fields := map[string]*commonpb.Payloads{
fields := map[string]*commonpb.Payload{
"CustomIntField": intPayload,
}
attr = &commonpb.SearchAttributes{
Expand All @@ -73,7 +73,7 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
err = validator.ValidateSearchAttributes(attr, namespace)
s.Nil(err)

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"CustomIntField": intPayload,
"CustomKeywordField": payload.EncodeString("keyword"),
"CustomBoolField": payload.EncodeString("true"),
Expand All @@ -82,14 +82,14 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
err = validator.ValidateSearchAttributes(attr, namespace)
s.Equal("number of keys 3 exceed limit", err.Error())

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"InvalidKey": payload.EncodeString("1"),
}
attr.IndexedFields = fields
err = validator.ValidateSearchAttributes(attr, namespace)
s.Equal("InvalidKey is not valid search attribute key", err.Error())

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"CustomStringField": payload.EncodeString("1"),
"CustomBoolField": payload.EncodeString("123"),
}
Expand All @@ -99,28 +99,28 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {

intArrayPayload, err := payload.Encode([]int{1, 2})
s.NoError(err)
fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"CustomIntField": intArrayPayload,
}
attr.IndexedFields = fields
err = validator.ValidateSearchAttributes(attr, namespace)
s.NoError(err)

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"StartTime": intPayload,
}
attr.IndexedFields = fields
err = validator.ValidateSearchAttributes(attr, namespace)
s.Equal("StartTime is read-only Temporal reservered attribute", err.Error())

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"CustomKeywordField": payload.EncodeString("123456"),
}
attr.IndexedFields = fields
err = validator.ValidateSearchAttributes(attr, namespace)
s.Equal("size limit exceed for key CustomKeywordField", err.Error())

fields = map[string]*commonpb.Payloads{
fields = map[string]*commonpb.Payload{
"CustomKeywordField": payload.EncodeString("123"),
"CustomStringField": payload.EncodeString("12"),
}
Expand Down
18 changes: 9 additions & 9 deletions common/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ import (
)

var (
dataConverter = encoded.GetDefaultDataConverter()
payloadConverter = encoded.GetDefaultPayloadConverter()
)

func EncodeString(str string) *commonpb.Payloads {
func EncodeString(str string) *commonpb.Payload {
// Error can be safely ignored here becase string always can be converted to JSON
payload, _ := dataConverter.ToData(str)
payload, _ := payloadConverter.ToData(str)
return payload
}

func EncodeBytes(bytes []byte) *commonpb.Payloads {
func EncodeBytes(bytes []byte) *commonpb.Payload {
// Error can be safely ignored here becase []byte always can be raw encoded
payload, _ := dataConverter.ToData(bytes)
payload, _ := payloadConverter.ToData(bytes)
return payload
}

func Encode(valuePtr ...interface{}) (*commonpb.Payloads, error) {
return dataConverter.ToData(valuePtr...)
func Encode(valuePtr interface{}) (*commonpb.Payload, error) {
return payloadConverter.ToData(valuePtr)
}

func Decode(payload *commonpb.Payloads, valuePtr ...interface{}) error {
return dataConverter.FromData(payload, valuePtr...)
func Decode(payload *commonpb.Payload, valuePtr interface{}) error {
return payloadConverter.FromData(payload, valuePtr)
}
54 changes: 54 additions & 0 deletions common/payloads/payloads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package payloads

import (
commonpb "go.temporal.io/temporal-proto/common"
"go.temporal.io/temporal/encoded"
)

var (
dataConverter = encoded.GetDefaultDataConverter()
)

func EncodeString(str string) *commonpb.Payloads {
// Error can be safely ignored here becase string always can be converted to JSON
payload, _ := dataConverter.ToData(str)
return payload
}

func EncodeBytes(bytes []byte) *commonpb.Payloads {
// Error can be safely ignored here becase []byte always can be raw encoded
payload, _ := dataConverter.ToData(bytes)
return payload
}

func Encode(valuePtr ...interface{}) (*commonpb.Payloads, error) {
return dataConverter.ToData(valuePtr...)
}

func Decode(payload *commonpb.Payloads, valuePtr ...interface{}) error {
return dataConverter.FromData(payload, valuePtr...)
}
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ type (
ClientFeatureVersion string
ClientImpl string
AutoResetPoints *executionpb.ResetPoints
Memo map[string]*commonpb.Payloads
SearchAttributes map[string]*commonpb.Payloads
Memo map[string]*commonpb.Payload
SearchAttributes map[string]*commonpb.Payload
// for retry
Attempt int32
HasRetryPolicy bool
Expand Down
24 changes: 10 additions & 14 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (v *esVisibilityStore) convertSearchResultToVisibilityRecord(hit *elastic.S

func getVisibilityMessage(namespaceID string, wid, rid string, workflowTypeName string, taskList string,
startTimeUnixNano, executionTimeUnixNano int64, taskID int64, memo []byte, encoding common.EncodingType,
searchAttributes map[string]*commonpb.Payloads) *indexergenpb.Message {
searchAttributes map[string]*commonpb.Payload) *indexergenpb.Message {

msgType := indexergenpb.MessageType_Index
fields := map[string]*indexergenpb.Field{
Expand All @@ -909,12 +909,10 @@ func getVisibilityMessage(namespaceID string, wid, rid string, workflowTypeName
fields[es.Encoding] = &indexergenpb.Field{Type: es.FieldTypeString, StringData: string(encoding)}
}
for k, v := range searchAttributes {
// TODO: current implementation assumes that there is always one single PayloadItem in payload and it's content is JSON.
// This neds to be saved in generic way (as commonpb.Payloads) and then deserialized on consumer side.
if len(v.GetPayloads()) > 0 { // There must be always one single item
data := v.GetPayloads()[0].GetData() // content must always be JSON
fields[k] = &indexergenpb.Field{Type: es.FieldTypeBinary, BinaryData: data}
}
// TODO: current implementation assumes that payload is JSON.
// This needs to be saved in generic way (as commonpb.Payload) and then deserialized on consumer side.
data := v.GetData() // content must always be JSON
fields[k] = &indexergenpb.Field{Type: es.FieldTypeBinary, BinaryData: data}
}

msg := &indexergenpb.Message{
Expand All @@ -931,7 +929,7 @@ func getVisibilityMessage(namespaceID string, wid, rid string, workflowTypeName
func getVisibilityMessageForCloseExecution(namespaceID string, wid, rid string, workflowTypeName string,
startTimeUnixNano int64, executionTimeUnixNano int64, endTimeUnixNano int64, status executionpb.WorkflowExecutionStatus,
historyLength int64, taskID int64, memo []byte, taskList string, encoding common.EncodingType,
searchAttributes map[string]*commonpb.Payloads) *indexergenpb.Message {
searchAttributes map[string]*commonpb.Payload) *indexergenpb.Message {

msgType := indexergenpb.MessageType_Index
fields := map[string]*indexergenpb.Field{
Expand All @@ -948,12 +946,10 @@ func getVisibilityMessageForCloseExecution(namespaceID string, wid, rid string,
fields[es.Encoding] = &indexergenpb.Field{Type: es.FieldTypeString, StringData: string(encoding)}
}
for k, v := range searchAttributes {
// TODO: current implementation assumes that there is always one single PayloadItem in payload and it's content is JSON.
// This neds to be saved in generic way (as commonpb.Payloads) and then deserialized on consumer side.
if len(v.GetPayloads()) > 0 { // There must be always one single item
data := v.GetPayloads()[0].GetData() // content must always be JSON
fields[k] = &indexergenpb.Field{Type: es.FieldTypeBinary, BinaryData: data}
}
// TODO: current implementation assumes that payload is JSON.
// This needs to be saved in generic way (as commonpb.Payload) and then deserialized on consumer side.
data := v.GetData() // content must always be JSON
fields[k] = &indexergenpb.Field{Type: es.FieldTypeBinary, BinaryData: data}
}

msg := &indexergenpb.Message{
Expand Down
25 changes: 13 additions & 12 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/temporalio/temporal/common/checksum"
"github.com/temporalio/temporal/common/cluster"
"github.com/temporalio/temporal/common/payload"
"github.com/temporalio/temporal/common/payloads"
p "github.com/temporalio/temporal/common/persistence"
"github.com/temporalio/temporal/common/primitives"
)
Expand Down Expand Up @@ -1211,13 +1212,13 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
}
testSearchAttrKey := "env"
testSearchAttrVal := payload.EncodeString("test")
testSearchAttr := map[string]*commonpb.Payloads{
testSearchAttr := map[string]*commonpb.Payload{
testSearchAttrKey: testSearchAttrVal,
}

testMemoKey := "memoKey"
testMemoVal := payload.EncodeString("memoVal")
testMemo := map[string]*commonpb.Payloads{
testMemo := map[string]*commonpb.Payload{
testMemoKey: testMemoVal,
}

Expand Down Expand Up @@ -1418,11 +1419,11 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
updatedInfo.NonRetriableErrors = []string{"accessDenied", "badRequest"}
searchAttrKey := "env"
searchAttrVal := payload.EncodeBytes([]byte("test"))
updatedInfo.SearchAttributes = map[string]*commonpb.Payloads{searchAttrKey: searchAttrVal}
updatedInfo.SearchAttributes = map[string]*commonpb.Payload{searchAttrKey: searchAttrVal}

memoKey := "memoKey"
memoVal := payload.EncodeBytes([]byte("memoVal"))
updatedInfo.Memo = map[string]*commonpb.Payloads{memoKey: memoVal}
updatedInfo.Memo = map[string]*commonpb.Payload{memoKey: memoVal}
updatedStats.HistorySize = math.MaxInt64

err2 := s.UpdateWorkflowExecution(updatedInfo, updatedStats, nil, []int64{int64(4)}, nil, int64(3), nil, nil, nil, nil, nil)
Expand Down Expand Up @@ -2593,7 +2594,7 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateActivities() {
ScheduledTime: currentTime,
ActivityID: uuid.New(),
RequestID: uuid.New(),
Details: payload.EncodeString(uuid.New()),
Details: payloads.EncodeString(uuid.New()),
StartedID: 2,
StartedEvent: &eventpb.HistoryEvent{EventId: 2},
StartedTime: currentTime,
Expand All @@ -2618,7 +2619,7 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateActivities() {
NonRetriableErrors: []string{"accessDenied", "badRequest"},
LastFailureReason: "some random error",
LastWorkerIdentity: uuid.New(),
LastFailureDetails: payload.EncodeString(uuid.New()),
LastFailureDetails: payloads.EncodeString(uuid.New()),
}}
err2 := s.UpdateWorkflowExecution(updatedInfo, updatedStats, nil, []int64{int64(4)}, nil, int64(3), nil, activityInfos, nil, nil, nil)
s.NoError(err2)
Expand Down Expand Up @@ -2871,7 +2872,7 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateSignalInfo() {
InitiatedEventBatchId: 1,
RequestId: uuid.New(),
Name: "my signal",
Input: payload.EncodeString("test signal input"),
Input: payloads.EncodeString("test signal input"),
Control: uuid.New(),
}
err2 := s.UpsertSignalInfoState(updatedInfo, updatedStats, nil, int64(3), []*persistenceblobs.SignalInfo{signalInfo})
Expand Down Expand Up @@ -3706,7 +3707,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsSel
InitiatedEventBatchId: 38,
RequestId: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
Name: "signalA",
Input: payload.EncodeString("signal_input_A"),
Input: payloads.EncodeString("signal_input_A"),
Control: "signal_control_A",
},
},
Expand Down Expand Up @@ -3915,15 +3916,15 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsSel
InitiatedId: 39,
RequestId: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
Name: "signalB",
Input: payload.EncodeString("signal_input_b"),
Input: payloads.EncodeString("signal_input_b"),
Control: "signal_control_b",
},
{
Version: 3336,
InitiatedId: 42,
RequestId: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
Name: "signalC",
Input: payload.EncodeString("signal_input_c"),
Input: payloads.EncodeString("signal_input_c"),
Control: "signal_control_c",
}}

Expand Down Expand Up @@ -4011,7 +4012,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsSel
s.Equal(int64(39), si.GetInitiatedId())
s.Equal("signalB", si.Name)
var signal string
err := payload.Decode(si.GetInput(), &signal)
err := payloads.Decode(si.GetInput(), &signal)
s.NoError(err)
s.Equal("signal_input_b", signal)
s.Equal("signal_control_b", si.Control)
Expand All @@ -4022,7 +4023,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsSel
s.Equal(int64(3336), si.Version)
s.Equal(int64(42), si.GetInitiatedId())
s.Equal("signalC", si.Name)
err = payload.Decode(si.GetInput(), &signal)
err = payloads.Decode(si.GetInput(), &signal)
s.NoError(err)
s.Equal("signal_input_c", signal)
s.Equal("signal_control_c", si.Control)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
WorkflowTimeout: 0,
TaskID: 0,
Memo: nil,
SearchAttributes: map[string]*commonpb.Payloads{
SearchAttributes: map[string]*commonpb.Payload{
definition.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")),
},
},
Expand Down
Loading

0 comments on commit c0ce239

Please sign in to comment.