Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconcile typed search attributes with schedules #1848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public <T> Builder setWorkflowType(Class<T> workflowInterface) {
* Set the workflow options to use when starting a workflow action.
*
* @note ID and TaskQueue are required. Some options like ID reuse policy, cron schedule, and
* start signal cannot be set or an error will occur.
* start signal cannot be set or an error will occur. Schedules requires the use of typed
* search attributes and untyped search attributes will be ignored.
*/
public Builder setOptions(WorkflowOptions options) {
this.options = options;
Expand All @@ -99,7 +100,10 @@ public Builder setArguments(Object... arguments) {

public ScheduleActionStartWorkflow build() {
return new ScheduleActionStartWorkflow(
workflowType, options, header == null ? Header.empty() : header, arguments);
workflowType,
options,
header == null ? Header.empty() : header,
arguments == null ? new EncodedValues() : arguments);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.client.schedules;

import io.temporal.api.common.v1.Payload;
import io.temporal.common.SearchAttributes;
import io.temporal.common.converter.DataConverter;
import java.lang.reflect.Type;
import java.util.List;
Expand All @@ -35,6 +36,7 @@ public final class ScheduleDescription {
private final ScheduleInfo info;
private final Schedule schedule;
private final Map<String, List<?>> searchAttributes;
private final SearchAttributes typedSearchAttributes;
private final Map<String, Payload> memo;
private final DataConverter dataConverter;

Expand All @@ -43,12 +45,14 @@ public ScheduleDescription(
ScheduleInfo info,
Schedule schedule,
Map<String, List<?>> searchAttributes,
SearchAttributes typedSearchAttributes,
Map<String, Payload> memo,
DataConverter dataConverter) {
this.id = id;
this.info = info;
this.schedule = schedule;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
this.memo = memo;
this.dataConverter = dataConverter;
}
Expand Down Expand Up @@ -84,12 +88,23 @@ public ScheduleDescription(
* Gets the search attributes on the schedule.
*
* @return search attributes
* @deprecated use {@link ScheduleDescription#getTypedSearchAttributes} instead.
*/
@Nonnull
public Map<String, List<?>> getSearchAttributes() {
return searchAttributes;
}

/**
* Gets the search attributes on the schedule.
*
* @return search attributes
*/
@Nonnull
public SearchAttributes getTypedSearchAttributes() {
return typedSearchAttributes;
}

@Nullable
public <T> Object getMemo(String key, Class<T> valueClass) {
return getMemo(key, valueClass, valueClass);
Expand All @@ -113,12 +128,13 @@ public boolean equals(Object o) {
&& Objects.equals(info, that.info)
&& Objects.equals(schedule, that.schedule)
&& Objects.equals(searchAttributes, that.searchAttributes)
&& Objects.equals(typedSearchAttributes, that.typedSearchAttributes)
&& Objects.equals(memo, that.memo);
}

@Override
public int hashCode() {
return Objects.hash(id, info, schedule, searchAttributes, memo);
return Objects.hash(id, info, schedule, searchAttributes, typedSearchAttributes, memo);
}

@Override
Expand All @@ -133,6 +149,8 @@ public String toString() {
+ schedule
+ ", searchAttributes="
+ searchAttributes
+ ", typedSearchAttributes="
+ typedSearchAttributes
+ ", memo="
+ memo
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.client.schedules;

import io.temporal.common.SearchAttributes;
import java.util.List;
import java.util.Map;

Expand All @@ -38,6 +39,7 @@ public static final class Builder {
private List<ScheduleBackfill> backfills;
private Map<String, Object> memo;
private Map<String, ?> searchAttributes;
private SearchAttributes typedSearchAttributes;

private Builder() {}

Expand All @@ -49,6 +51,7 @@ private Builder(ScheduleOptions options) {
this.backfills = options.backfills;
this.memo = options.memo;
this.searchAttributes = options.searchAttributes;
this.typedSearchAttributes = options.typedSearchAttributes;
}

/** Set if the schedule will be triggered immediately upon creation. */
Expand All @@ -69,31 +72,45 @@ public Builder setMemo(Map<String, Object> memo) {
return this;
}

/** Set the search attributes for the schedule. */
/**
* Set the search attributes for the schedule.
*
* @deprecated use {@link ScheduleOptions.Builder#setTypedSearchAttributes} instead.
*/
public Builder setSearchAttributes(Map<String, ?> searchAttributes) {
this.searchAttributes = searchAttributes;
return this;
}

/** Set the search attributes for the schedule. */
public Builder setTypedSearchAttributes(SearchAttributes searchAttributes) {
this.typedSearchAttributes = searchAttributes;
return this;
}

public ScheduleOptions build() {
return new ScheduleOptions(triggerImmediately, backfills, memo, searchAttributes);
return new ScheduleOptions(
triggerImmediately, backfills, memo, searchAttributes, typedSearchAttributes);
}
}

private final boolean triggerImmediately;
private final List<ScheduleBackfill> backfills;
private final Map<String, Object> memo;
private final Map<String, ?> searchAttributes;
private final SearchAttributes typedSearchAttributes;

private ScheduleOptions(
boolean triggerImmediately,
List<ScheduleBackfill> backfills,
Map<String, Object> memo,
Map<String, ?> searchAttributes) {
Map<String, ?> searchAttributes,
SearchAttributes typedSearchAttributes) {
this.triggerImmediately = triggerImmediately;
this.backfills = backfills;
this.memo = memo;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
}

/**
Expand Down Expand Up @@ -127,8 +144,18 @@ public Map<String, Object> getMemo() {
* Get the search attributes for the schedule.
*
* @return search attributes for the schedule
* @deprecated use {@link ScheduleOptions#getTypedSearchAttributes()} instead.
*/
public Map<String, ?> getSearchAttributes() {
return searchAttributes;
}

/**
* Get the search attributes for the schedule.
*
* @return search attributes for the schedule
*/
public SearchAttributes getTypedSearchAttributes() {
return typedSearchAttributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common;

import com.google.common.reflect.TypeToken;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.IndexedValueType;
import java.lang.reflect.Type;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -73,6 +74,17 @@ public static SearchAttributeKey<List<String>> forKeywordList(String name) {
KEYWORD_LIST_REFLECT_TYPE);
}

/**
* Create a search attribute key for an untyped attribute type.
*
* <p>This should only be used when the server can return untyped search attributes, for example,
* when describing a schedule workflow action.
*/
public static SearchAttributeKey<Payload> forUntyped(String name) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also make calling valueSet and valueUnset an error since I am not aware of any valid use case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that would work, I think we should. We can hide this more in other langs. Arguably we could make the name of this method more unwieldy, but this is probably good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return new SearchAttributeKey<>(
name, IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED, Payload.class);
}

private final String name;
private final IndexedValueType valueType;
private final Class<? super T> valueClass;
Expand Down Expand Up @@ -115,11 +127,17 @@ public Type getValueReflectType() {

/** Create an update that sets a value for this key. */
public SearchAttributeUpdate<T> valueSet(@Nonnull T value) {
if (valueType == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
throw new IllegalStateException("untyped keys should not be used in workflows");
}
return SearchAttributeUpdate.valueSet(this, value);
}

/** Create an update that unsets a value for this key. */
public SearchAttributeUpdate<T> valueUnset() {
if (valueType == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
throw new IllegalStateException("untyped keys should not be used in workflows");
}
return SearchAttributeUpdate.valueUnset(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public RootScheduleClientInvoker(
}

@Override
@SuppressWarnings("deprecation")
public void createSchedule(CreateScheduleInput input) {

CreateScheduleRequest.Builder request =
Expand All @@ -75,8 +76,15 @@ public void createSchedule(CreateScheduleInput input) {

if (input.getOptions().getSearchAttributes() != null
&& !input.getOptions().getSearchAttributes().isEmpty()) {
if (input.getOptions().getTypedSearchAttributes() != null) {
throw new IllegalArgumentException(
"Cannot have search attributes and typed search attributes");
}
request.setSearchAttributes(
SearchAttributesUtil.encode(input.getOptions().getSearchAttributes()));
} else if (input.getOptions().getTypedSearchAttributes() != null) {
request.setSearchAttributes(
SearchAttributesUtil.encodeTyped(input.getOptions().getTypedSearchAttributes()));
}

if (input.getOptions().isTriggerImmediately()
Expand Down Expand Up @@ -194,6 +202,7 @@ public DescribeScheduleOutput describeSchedule(DescribeScheduleInput input) {
scheduleRequestHeader.protoToSchedule(response.getSchedule()),
Collections.unmodifiableMap(
SearchAttributesUtil.decode(response.getSearchAttributes())),
SearchAttributesUtil.decodeTyped(response.getSearchAttributes()),
response.getMemo().getFieldsMap(),
clientOptions.getDataConverter()));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ final class SearchAttributePayloadConverter {
new SearchAttributePayloadConverter();

public Payload encodeTyped(SearchAttributeKey<?> key, @Nullable Object value) {
if (key.getValueType() == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
// If we don't have the type we should just leave the payload as is
return (Payload) value;
}
// We can encode as-is because we know it's strictly typed to expected key value. We
// accept a null value because updates for unset can be null.
return DefaultDataConverter.STANDARD_INSTANCE.toPayload(value).get().toBuilder()
Expand All @@ -62,6 +66,13 @@ public void decodeTyped(SearchAttributes.Builder builder, String name, @Nonnull
// Get key type
SearchAttributeKey key;
IndexedValueType indexType = getIndexType(payload.getMetadataMap().get(METADATA_TYPE_KEY));
if (indexType == null) {
// If the server didn't write the type metadata we
// don't know how to decode this search attribute
key = SearchAttributeKey.forUntyped(name);
builder.set(key, payload);
return;
}
switch (indexType) {
case INDEXED_VALUE_TYPE_TEXT:
key = SearchAttributeKey.forText(name);
Expand Down
Loading