Skip to content

Commit

Permalink
Custom search attributes validation per store (#4655)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou authored and dnr committed Jul 21, 2023
1 parent 2830f38 commit 53517fd
Show file tree
Hide file tree
Showing 21 changed files with 275 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
GetStoreNames() []string
HasStoreName(stName string) bool
GetIndexName() string
ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/manager/visibility_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ var _ store.VisibilityStore = (*visibilityStore)(nil)
var (
errUnexpectedJSONFieldType = errors.New("unexpected JSON field type")

minTime = time.Unix(0, 0).UTC()
maxTime = time.Unix(0, math.MaxInt64).UTC()
maxStringLength = 32766

// Default sorter uses the sorting order defined in the index template.
// It is indirectly built so buildPaginationQuery can have access to
// the fields names to build the page query from the token.
Expand Down Expand Up @@ -164,6 +168,44 @@ func (s *visibilityStore) GetIndexName() string {
return s.index
}

func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
validatedSearchAttributes := make(map[string]any, len(searchAttributes))
var invalidValueErrs []error
for saName, saValue := range searchAttributes {
var err error
switch value := saValue.(type) {
case time.Time:
err = validateDatetime(value)
case []time.Time:
for _, item := range value {
if err = validateDatetime(item); err != nil {
break
}
}
case string:
err = validateString(value)
case []string:
for _, item := range value {
if err = validateString(item); err != nil {
break
}
}
}
if err != nil {
invalidValueErrs = append(invalidValueErrs, err)
continue
}
validatedSearchAttributes[saName] = saValue
}
var retError error
if len(invalidValueErrs) > 0 {
retError = store.NewVisibilityStoreInvalidValuesError(invalidValueErrs)
}
return validatedSearchAttributes, retError
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -941,6 +983,14 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest
s.metricsHandler.Counter(metrics.ElasticsearchDocumentGenerateFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to decode search attributes: %v", err))
}
// This is to prevent existing tasks to fail indefinitely.
// If it's only invalid values error, then silently continue without them.
searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok {
return nil, err
}
}
for saName, saValue := range searchAttributes {
if saValue == nil {
// If search attribute value is `nil`, it means that it shouldn't be added to the document.
Expand Down Expand Up @@ -1321,3 +1371,25 @@ func parsePageTokenValue(
))
}
}

func validateDatetime(value time.Time) error {
if value.Before(minTime) || value.After(maxTime) {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("Date not supported in Elasticsearch: %v", value),
)
}
return nil
}

func validateString(value string) error {
if len(value) > maxStringLength {
return serviceerror.NewInvalidArgument(
fmt.Sprintf(
"Strings with more than %d bytes are not supported in Elasticsearch (got %s)",
maxStringLength,
value,
),
)
}
return nil
}
23 changes: 23 additions & 0 deletions common/persistence/visibility/store/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,33 @@
package store

import (
"strings"

"go.temporal.io/api/serviceerror"
)

type (
VisibilityStoreInvalidValuesError struct {
errs []error
}
)

var (
// OperationNotSupportedErr is returned when visibility operation in not supported.
OperationNotSupportedErr = serviceerror.NewInvalidArgument("Operation not supported. Please use on Elasticsearch")
)

func (e *VisibilityStoreInvalidValuesError) Error() string {
var sb strings.Builder
sb.WriteString("Visibility store invalid values errors: ")
for _, err := range e.errs {
sb.WriteString("[")
sb.WriteString(err.Error())
sb.WriteString("]")
}
return sb.String()
}

func NewVisibilityStoreInvalidValuesError(errs []error) error {
return &VisibilityStoreInvalidValuesError{errs: errs}
}
14 changes: 14 additions & 0 deletions common/persistence/visibility/store/sql/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (s *VisibilityStore) GetIndexName() string {
return s.sqlStore.GetDbName()
}

func (s *VisibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *VisibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -514,6 +520,14 @@ func (s *VisibilityStore) prepareSearchAttributesForDb(
if err != nil {
return nil, err
}
// This is to prevent existing tasks to fail indefinitely.
// If it's only invalid values error, then silently continue without them.
searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok {
return nil, err
}
}

for name, value := range searchAttributes {
if value == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (v *visibilityStore) GetIndexName() string {
return ""
}

func (v *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

// Close releases the resources held by this object
func (v *visibilityStore) Close() {
v.session.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (s *visibilityStore) GetIndexName() string {
return ""
}

func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (s *standardStore) GetIndexName() string {
return ""
}

func (s *standardStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *standardStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/visibility/store/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type (
GetName() string
GetIndexName() string

// Validate search attributes based on the store constraints. It returns a new map containing
// only search attributes with valid values. If there are invalid values, an error of type
// VisibilityStoreInvalidValuesError wraps all invalid values errors.
ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/store/visibility_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions common/persistence/visibility/visibility_manager_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ func (v *visibilityManagerDual) GetIndexName() string {
return v.visibilityManager.GetIndexName()
}

func (v *visibilityManagerDual) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return nil, err
}
for _, m := range ms {
searchAttributes, err = m.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
return nil, err
}
}
return searchAttributes, nil
}

func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func (p *visibilityManagerImpl) GetIndexName() string {
return p.store.GetIndexName()
}

func (p *visibilityManagerImpl) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return p.store.ValidateCustomSearchAttributes(searchAttributes)
}

func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (m *visibilityManagerRateLimited) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerRateLimited) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes)
}

// Below are write APIs.

func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted(
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/visibility/visiblity_manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func (m *visibilityManagerMetrics) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerMetrics) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes)
}

func (m *visibilityManagerMetrics) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
19 changes: 13 additions & 6 deletions common/searchattribute/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence/visibility/manager"
)

type (
Expand All @@ -44,7 +45,7 @@ type (
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
indexName string
visibilityManager manager.VisibilityManager

// allowList allows list of values when it's not keyword list type.
allowList bool
Expand All @@ -58,7 +59,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
indexName string,
visibilityManager manager.VisibilityManager,
allowList bool,
) *Validator {
return &Validator{
Expand All @@ -67,7 +68,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit,
searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit,
searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit,
indexName: indexName,
visibilityManager: visibilityManager,
allowList: allowList,
}
}
Expand All @@ -90,13 +91,17 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(v.indexName, false)
saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(
v.visibilityManager.GetIndexName(),
false,
)
if err != nil {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("unable to get search attributes from cluster metadata: %v", err),
)
}

saMap := make(map[string]any, len(searchAttributes.GetIndexedFields()))
for saFieldName, saPayload := range searchAttributes.GetIndexedFields() {
// user search attribute cannot be a system search attribute
if _, err = saTypeMap.getType(saFieldName, systemCategory); err == nil {
Expand All @@ -121,7 +126,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

_, err = DecodeValue(saPayload, saType, v.allowList)
saValue, err := DecodeValue(saPayload, saType, v.allowList)
if err != nil {
var invalidValue interface{}
if err = payload.Decode(saPayload, &invalidValue); err != nil {
Expand All @@ -138,8 +143,10 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
namespace,
)
}
saMap[saFieldName] = saValue
}
return nil
_, err = v.visibilityManager.ValidateCustomSearchAttributes(saMap)
return err
}

// ValidateSize validate search attributes are valid for writing and not exceed limits.
Expand Down
Loading

0 comments on commit 53517fd

Please sign in to comment.