Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom search attributes validation per store #4655

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
GetStoreNames() []string
HasStoreName(stName string) bool
GetIndexName() string
ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/manager/visibility_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ var _ store.VisibilityStore = (*visibilityStore)(nil)
var (
errUnexpectedJSONFieldType = errors.New("unexpected JSON field type")

minTime = time.Unix(0, 0).UTC()
maxTime = time.Unix(0, math.MaxInt64).UTC()
maxStringLength = 32766
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think search attribute already has size limit of 2KB.

Copy link
Contributor Author

@rodrigozhou rodrigozhou Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, still better to have this already in place just in case in the future we (or an OSS user) change it limit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, storage specific validators should reflect all storage limitation. DC value can be changed.


// Default sorter uses the sorting order defined in the index template.
// It is indirectly built so buildPaginationQuery can have access to
// the fields names to build the page query from the token.
Expand Down Expand Up @@ -164,6 +168,44 @@ func (s *visibilityStore) GetIndexName() string {
return s.index
}

func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
validatedSearchAttributes := make(map[string]any, len(searchAttributes))
var invalidValueErrs []error
for saName, saValue := range searchAttributes {
var err error
switch value := saValue.(type) {
case time.Time:
err = validateDatetime(value)
case []time.Time:
for _, item := range value {
if err = validateDatetime(item); err != nil {
break
}
}
case string:
err = validateString(value)
case []string:
for _, item := range value {
if err = validateString(item); err != nil {
break
}
}
}
if err != nil {
invalidValueErrs = append(invalidValueErrs, err)
continue
}
validatedSearchAttributes[saName] = saValue
}
var retError error
if len(invalidValueErrs) > 0 {
retError = store.NewVisibilityStoreInvalidValuesError(invalidValueErrs)
}
return validatedSearchAttributes, retError
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -941,6 +983,14 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest
s.metricsHandler.Counter(metrics.ElasticsearchDocumentGenerateFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to decode search attributes: %v", err))
}
// This is to prevent existing tasks to fail indefinitely.
// If it's only invalid values error, then silently continue without them.
searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok {
return nil, err
}
}
for saName, saValue := range searchAttributes {
if saValue == nil {
// If search attribute value is `nil`, it means that it shouldn't be added to the document.
Expand Down Expand Up @@ -1321,3 +1371,25 @@ func parsePageTokenValue(
))
}
}

func validateDatetime(value time.Time) error {
if value.Before(minTime) || value.After(maxTime) {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("Date not supported in Elasticsearch: %v", value),
)
}
return nil
}

func validateString(value string) error {
if len(value) > maxStringLength {
return serviceerror.NewInvalidArgument(
fmt.Sprintf(
"Strings with more than %d bytes are not supported in Elasticsearch (got %s)",
maxStringLength,
value,
),
)
}
return nil
}
23 changes: 23 additions & 0 deletions common/persistence/visibility/store/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,33 @@
package store

import (
"strings"

"go.temporal.io/api/serviceerror"
)

type (
VisibilityStoreInvalidValuesError struct {
errs []error
}
)

var (
// OperationNotSupportedErr is returned when visibility operation in not supported.
OperationNotSupportedErr = serviceerror.NewInvalidArgument("Operation not supported. Please use on Elasticsearch")
)

func (e *VisibilityStoreInvalidValuesError) Error() string {
var sb strings.Builder
sb.WriteString("Visibility store invalid values errors: ")
for _, err := range e.errs {
sb.WriteString("[")
sb.WriteString(err.Error())
sb.WriteString("]")
}
return sb.String()
}

func NewVisibilityStoreInvalidValuesError(errs []error) error {
return &VisibilityStoreInvalidValuesError{errs: errs}
}
14 changes: 14 additions & 0 deletions common/persistence/visibility/store/sql/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (s *VisibilityStore) GetIndexName() string {
return s.sqlStore.GetDbName()
}

func (s *VisibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *VisibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -514,6 +520,14 @@ func (s *VisibilityStore) prepareSearchAttributesForDb(
if err != nil {
return nil, err
}
// This is to prevent existing tasks to fail indefinitely.
// If it's only invalid values error, then silently continue without them.
searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok {
return nil, err
}
}

for name, value := range searchAttributes {
if value == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (v *visibilityStore) GetIndexName() string {
return ""
}

func (v *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

// Close releases the resources held by this object
func (v *visibilityStore) Close() {
v.session.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (s *visibilityStore) GetIndexName() string {
return ""
}

func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (s *standardStore) GetIndexName() string {
return ""
}

func (s *standardStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *standardStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/visibility/store/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type (
GetName() string
GetIndexName() string

// Validate search attributes based on the store constraints. It returns a new map containing
// only search attributes with valid values. If there are invalid values, an error of type
// VisibilityStoreInvalidValuesError wraps all invalid values errors.
ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/store/visibility_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions common/persistence/visibility/visibility_manager_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ func (v *visibilityManagerDual) GetIndexName() string {
return v.visibilityManager.GetIndexName()
}

func (v *visibilityManagerDual) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return nil, err
}
for _, m := range ms {
searchAttributes, err = m.ValidateCustomSearchAttributes(searchAttributes)
if err != nil {
return nil, err
}
}
return searchAttributes, nil
}

func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func (p *visibilityManagerImpl) GetIndexName() string {
return p.store.GetIndexName()
}

func (p *visibilityManagerImpl) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return p.store.ValidateCustomSearchAttributes(searchAttributes)
}

func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func (m *visibilityManagerRateLimited) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerRateLimited) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes)
}

// Below are write APIs.

func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted(
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/visibility/visiblity_manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func (m *visibilityManagerMetrics) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerMetrics) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes)
}

func (m *visibilityManagerMetrics) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
19 changes: 13 additions & 6 deletions common/searchattribute/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence/visibility/manager"
)

type (
Expand All @@ -44,7 +45,7 @@ type (
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
indexName string
visibilityManager manager.VisibilityManager

// allowList allows list of values when it's not keyword list type.
allowList bool
Expand All @@ -58,7 +59,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
indexName string,
visibilityManager manager.VisibilityManager,
allowList bool,
) *Validator {
return &Validator{
Expand All @@ -67,7 +68,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit,
searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit,
searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit,
indexName: indexName,
visibilityManager: visibilityManager,
allowList: allowList,
}
}
Expand All @@ -90,13 +91,17 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(v.indexName, false)
saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(
v.visibilityManager.GetIndexName(),
false,
)
if err != nil {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("unable to get search attributes from cluster metadata: %v", err),
)
}

saMap := make(map[string]any, len(searchAttributes.GetIndexedFields()))
for saFieldName, saPayload := range searchAttributes.GetIndexedFields() {
// user search attribute cannot be a system search attribute
if _, err = saTypeMap.getType(saFieldName, systemCategory); err == nil {
Expand All @@ -121,7 +126,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

_, err = DecodeValue(saPayload, saType, v.allowList)
saValue, err := DecodeValue(saPayload, saType, v.allowList)
if err != nil {
var invalidValue interface{}
if err = payload.Decode(saPayload, &invalidValue); err != nil {
Expand All @@ -138,8 +143,10 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
namespace,
)
}
saMap[saFieldName] = saValue
}
return nil
_, err = v.visibilityManager.ValidateCustomSearchAttributes(saMap)
return err
}

// ValidateSize validate search attributes are valid for writing and not exceed limits.
Expand Down
Loading