Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed SearchAttributes and memo lost from ChildWorkflowOptions #500

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ private Promise<Optional<Payloads>> executeChildWorkflow(
attributes.setRetryPolicy(toRetryPolicy(retryOptions));
}
attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
Map<String, Object> searchAttributes = options.getSearchAttributes();
if (searchAttributes != null) {
attributes.setSearchAttributes(InternalUtils.convertMapToSearchAttributes(searchAttributes));
}
Map<String, Object> memo = options.getMemo();
if (memo != null) {
attributes.setMemo(Memo.newBuilder().putAllFields(intoPayloadMapWithDefaultConverter(memo)));
}
io.temporal.api.common.v1.Header grpcHeader =
toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
attributes.setHeader(grpcHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,40 @@

package io.temporal.internal.sync;

import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.workflow.WorkflowInfo;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class WorkflowInfoImpl implements WorkflowInfo {

public enum SearchAttribute {
ExecutionStatus,
CloseTime,
CustomBoolField,
CustomDatetimeField,
CustomNamespace,
CustomDoubleField,
CustomIntField,
CustomKeywordField,
CustomStringField,
NamespaceId,
ExecutionTime,
HistoryLength,
RunId,
StartTime,
TaskQueue,
WorkflowId,
WorkflowType;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can hardcode the list of the search attributes as they can be added to a cluster at any time.


private final ReplayWorkflowContext context;

WorkflowInfoImpl(ReplayWorkflowContext context) {
Expand Down Expand Up @@ -80,10 +105,55 @@ public long getRunStartedTimestampMillis() {
}

@Override
@Deprecated
public SearchAttributes getSearchAttributes() {
return context.getSearchAttributes();
}

@Override
public Map<String, Object> getSearchAttributesMap() {
Map<String, Payload> serializedSearchAttributes =
context.getSearchAttributes().getIndexedFieldsMap();
Map<String, Object> searchAttributes = new HashMap<>();
DefaultDataConverter converter = DefaultDataConverter.newDefaultInstance();

for (String searchAttribute : serializedSearchAttributes.keySet()) {
SearchAttribute attribute = SearchAttribute.valueOf(searchAttribute);
Payload payload = serializedSearchAttributes.get(searchAttribute);
String stringValue = converter.fromPayload(payload, String.class, String.class);
switch (attribute) {
case CustomBoolField:
searchAttributes.put(searchAttribute, Boolean.parseBoolean(stringValue));
break;
case CustomDatetimeField:
searchAttributes.put(searchAttribute, LocalDateTime.parse(stringValue));
break;
case CustomDoubleField:
searchAttributes.put(searchAttribute, Double.parseDouble(stringValue));
break;
case CloseTime:
case CustomIntField:
case ExecutionStatus:
case ExecutionTime:
case HistoryLength:
case StartTime:
searchAttributes.put(searchAttribute, Integer.parseInt(stringValue));
break;
case CustomKeywordField:
case CustomNamespace:
case CustomStringField:
case NamespaceId:
case RunId:
case TaskQueue:
case WorkflowId:
case WorkflowType:
searchAttributes.put(searchAttribute, stringValue);
break;
}
}
return searchAttributes;
}

@Override
public Optional<String> getParentWorkflowId() {
WorkflowExecution parentWorkflowExecution = context.getParentWorkflowExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.temporal.api.common.v1.SearchAttributes;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

public interface WorkflowInfo {
Expand Down Expand Up @@ -49,6 +50,8 @@ public interface WorkflowInfo {

SearchAttributes getSearchAttributes();

Map<String, Object> getSearchAttributesMap();

Optional<String> getParentWorkflowId();

Optional<String> getParentRunId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
import com.uber.m3.tally.NoopScope;
Expand All @@ -35,48 +36,58 @@
import io.temporal.workflow.shared.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestMultiargdsWorkflowFunctions;
import io.temporal.workflow.shared.TestOptions;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class SearchAttributesTest {

private static Map<String, Object> searchAttributes = new HashMap<>();
private static String testKeyString = "CustomKeywordField";
private static String testValueString = "testKeyword";
private static String testKeyInteger = "CustomIntField";
private static Integer testValueInteger = 1;
private static String testKeyDateTime = "CustomDatetimeField";
private static LocalDateTime testValueDateTime = LocalDateTime.now();
private static String testKeyBool = "CustomBoolField";
private static Boolean testValueBool = true;
private static String testKeyDouble = "CustomDoubleField";
private static Double testValueDouble = 1.23;

@Before
public void setUp() {
// add more type to test
searchAttributes = new HashMap<>();
searchAttributes.put(testKeyString, testValueString);
searchAttributes.put(testKeyInteger, testValueInteger);
searchAttributes.put(testKeyDateTime, testValueDateTime);
searchAttributes.put(testKeyBool, testValueBool);
searchAttributes.put(testKeyDouble, testValueDouble);
}

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestMultiargdsWorkflowFunctions.TestMultiargsWorkflowsImpl.class)
.setWorkflowTypes(
TestMultiargdsWorkflowFunctions.TestMultiargsWorkflowsImpl.class,
TestParentWorkflow.class,
TestChild.class)
.build();

@Test
public void testSearchAttributes() {
if (SDKTestWorkflowRule.useExternalService) {
return;
}
String testKeyString = "CustomKeywordField";
String testValueString = "testKeyword";
String testKeyInteger = "CustomIntField";
Integer testValueInteger = 1;
String testKeyDateTime = "CustomDatetimeField";
LocalDateTime testValueDateTime = LocalDateTime.now();
String testKeyBool = "CustomBoolField";
Boolean testValueBool = true;
String testKeyDouble = "CustomDoubleField";
Double testValueDouble = 1.23;

// add more type to test
Map<String, Object> searchAttr = new HashMap<>();
searchAttr.put(testKeyString, testValueString);
searchAttr.put(testKeyInteger, testValueInteger);
searchAttr.put(testKeyDateTime, testValueDateTime);
searchAttr.put(testKeyBool, testValueBool);
searchAttr.put(testKeyDouble, testValueDouble);

WorkflowOptions workflowOptions =
TestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())
.toBuilder()
.setSearchAttributes(searchAttr)
.setSearchAttributes(searchAttributes)
.build();
TestMultiargdsWorkflowFunctions.TestMultiargsWorkflowsFunc stubF =
testWorkflowRule
Expand Down Expand Up @@ -119,4 +130,33 @@ public void testSearchAttributes() {
converter.fromPayload(searchAttrDoubleBytes, Double.class, Double.class);
assertEquals(testValueDouble, retrievedDouble);
}

@Test
public void testSearchAttributesPresentInChildWorkflow() {
TestWorkflows.TestWorkflow4 client =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow4.class);
Map<String, Object> result = client.execute();
result.put(testKeyDateTime, LocalDateTime.parse(result.get(testKeyDateTime).toString()));
assertTrue(result.equals(searchAttributes));
}

public static class TestParentWorkflow implements TestWorkflows.TestWorkflow4 {
@Override
public Map<String, Object> execute() {
ChildWorkflowOptions options =
ChildWorkflowOptions.newBuilder().setSearchAttributes(searchAttributes).build();
TestWorkflows.TestMapWorkflow child =
Workflow.newChildWorkflowStub(TestWorkflows.TestMapWorkflow.class, options);
Map<String, Object> result = child.execute();
result.put(testKeyDateTime, LocalDateTime.parse(result.get(testKeyDateTime).toString()));
return child.execute();
}
}

public static class TestChild implements TestWorkflows.TestMapWorkflow {
@Override
public Map<String, Object> execute() {
return Workflow.getInfo().getSearchAttributesMap();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public interface TestWorkflow3 {
Map<String, Map<String, Duration>> execute();
}

@WorkflowInterface
public interface TestWorkflow4 {

@WorkflowMethod
Map<String, Object> execute();
}

@WorkflowInterface
public interface TestMapWorkflow {

@WorkflowMethod
Map<String, Object> execute();
}

@WorkflowInterface
public interface TestChildWorkflow {
@WorkflowMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,9 @@ private static void initiateChildWorkflow(
if (d.hasHeader()) {
startChild.setHeader(d.getHeader());
}
if (d.hasSearchAttributes()) {
startChild.setSearchAttributes(d.getSearchAttributes());
}
if (d.hasMemo()) {
startChild.setMemo(d.getMemo());
}
Expand Down