Skip to content

Commit

Permalink
Custom search attributes validation per store
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Jul 20, 2023
1 parent 2605c0b commit 9cd238d
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
GetStoreNames() []string
HasStoreName(stName string) bool
GetIndexName() string
ValidateCustomSearchAttributes(searchAttributes map[string]any, ignoreInvalidValues bool) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/manager/visibility_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ var _ store.VisibilityStore = (*visibilityStore)(nil)
var (
errUnexpectedJSONFieldType = errors.New("unexpected JSON field type")

minTime = time.Unix(0, 0).UTC()
maxTime = time.Unix(0, math.MaxInt64).UTC()
maxStringLength = 32766

// Default sorter uses the sorting order defined in the index template.
// It is indirectly built so buildPaginationQuery can have access to
// the fields names to build the page query from the token.
Expand Down Expand Up @@ -164,6 +168,45 @@ func (s *visibilityStore) GetIndexName() string {
return s.index
}

//nolint:revive // control flag
func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
validatedSearchAttributes := make(map[string]any, len(searchAttributes))
for saName, saValue := range searchAttributes {
if saValue != nil {
var err error
switch value := saValue.(type) {
case time.Time:
err = validateDatetime(value)
case []time.Time:
for _, item := range value {
if err = validateDatetime(item); err != nil {
break
}
}
case string:
err = validateString(value)
case []string:
for _, item := range value {
if err = validateString(item); err != nil {
break
}
}
}
if err != nil {
if ignoreInvalidValues {
continue
}
return nil, err
}
}
validatedSearchAttributes[saName] = saValue
}
return validatedSearchAttributes, nil
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -941,6 +984,9 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest
s.metricsHandler.Counter(metrics.ElasticsearchDocumentGenerateFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to decode search attributes: %v", err))
}
// This is to prevent existing tasks to fail indefinitely.
// Since it's just gonna ignore invalid values, it's safe to ignore the error.
searchAttributes, _ = s.ValidateCustomSearchAttributes(searchAttributes, true)
for saName, saValue := range searchAttributes {
if saValue == nil {
// If search attribute value is `nil`, it means that it shouldn't be added to the document.
Expand Down Expand Up @@ -1321,3 +1367,25 @@ func parsePageTokenValue(
))
}
}

func validateDatetime(value time.Time) error {
if value.Before(minTime) || value.After(maxTime) {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("Date not supported in Elasticsearch: %v", value),
)
}
return nil
}

func validateString(value string) error {
if len(value) > maxStringLength {
return serviceerror.NewInvalidArgument(
fmt.Sprintf(
"Strings with more than %d bytes are not supported in Elasticsearch (got %s)",
maxStringLength,
value,
),
)
}
return nil
}
10 changes: 10 additions & 0 deletions common/persistence/visibility/store/sql/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ func (s *VisibilityStore) GetIndexName() string {
return s.sqlStore.GetDbName()
}

func (s *VisibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *VisibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down Expand Up @@ -514,6 +521,9 @@ func (s *VisibilityStore) prepareSearchAttributesForDb(
if err != nil {
return nil, err
}
// This is to prevent existing tasks to fail indefinitely.
// Since it's just gonna ignore invalid values, it's safe to ignore the error.
searchAttributes, _ = s.ValidateCustomSearchAttributes(searchAttributes, true)

for name, value := range searchAttributes {
if value == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ func (v *visibilityStore) GetIndexName() string {
return ""
}

func (v *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return searchAttributes, nil
}

// Close releases the resources held by this object
func (v *visibilityStore) Close() {
v.session.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ func (s *visibilityStore) GetIndexName() string {
return ""
}

func (s *visibilityStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (s *standardStore) GetIndexName() string {
return ""
}

func (s *standardStore) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return searchAttributes, nil
}

func (s *standardStore) RecordWorkflowExecutionStarted(
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/visibility/store/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
persistence.Closeable
GetName() string
GetIndexName() string
ValidateCustomSearchAttributes(searchAttributes map[string]any, ignoreInvalidValues bool) (map[string]any, error)

// Write APIs.
RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibility/store/visibility_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions common/persistence/visibility/visibility_manager_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ func (v *visibilityManagerDual) GetIndexName() string {
return v.visibilityManager.GetIndexName()
}

func (v *visibilityManagerDual) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return nil, err
}
for _, m := range ms {
searchAttributes, err = m.ValidateCustomSearchAttributes(searchAttributes, ignoreInvalidValues)
if err != nil {
return nil, err
}
}
return searchAttributes, nil
}

func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (p *visibilityManagerImpl) GetIndexName() string {
return p.store.GetIndexName()
}

func (p *visibilityManagerImpl) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return p.store.ValidateCustomSearchAttributes(searchAttributes, ignoreInvalidValues)
}

func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (m *visibilityManagerRateLimited) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerRateLimited) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes, ignoreInvalidValues)
}

// Below are write APIs.

func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted(
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/visibility/visiblity_manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (m *visibilityManagerMetrics) GetIndexName() string {
return m.delegate.GetIndexName()
}

func (m *visibilityManagerMetrics) ValidateCustomSearchAttributes(
searchAttributes map[string]any,
ignoreInvalidValues bool,
) (map[string]any, error) {
return m.delegate.ValidateCustomSearchAttributes(searchAttributes, ignoreInvalidValues)
}

func (m *visibilityManagerMetrics) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
Expand Down
19 changes: 13 additions & 6 deletions common/searchattribute/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence/visibility/manager"
)

type (
Expand All @@ -44,7 +45,7 @@ type (
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
indexName string
visibilityManager manager.VisibilityManager

// allowList allows list of values when it's not keyword list type.
allowList bool
Expand All @@ -58,7 +59,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
indexName string,
visibilityManager manager.VisibilityManager,
allowList bool,
) *Validator {
return &Validator{
Expand All @@ -67,7 +68,7 @@ func NewValidator(
searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit,
searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit,
searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit,
indexName: indexName,
visibilityManager: visibilityManager,
allowList: allowList,
}
}
Expand All @@ -90,13 +91,17 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(v.indexName, false)
saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(
v.visibilityManager.GetIndexName(),
false,
)
if err != nil {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("unable to get search attributes from cluster metadata: %v", err),
)
}

saMap := make(map[string]any, len(searchAttributes.GetIndexedFields()))
for saFieldName, saPayload := range searchAttributes.GetIndexedFields() {
// user search attribute cannot be a system search attribute
if _, err = saTypeMap.getType(saFieldName, systemCategory); err == nil {
Expand All @@ -121,7 +126,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

_, err = DecodeValue(saPayload, saType, v.allowList)
saValue, err := DecodeValue(saPayload, saType, v.allowList)
if err != nil {
var invalidValue interface{}
if err = payload.Decode(saPayload, &invalidValue); err != nil {
Expand All @@ -138,8 +143,10 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
namespace,
)
}
saMap[saFieldName] = saValue
}
return nil
_, err = v.visibilityManager.ValidateCustomSearchAttributes(saMap, false)
return err
}

// ValidateSize validate search attributes are valid for writing and not exceed limits.
Expand Down
Loading

0 comments on commit 9cd238d

Please sign in to comment.