Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for user meta data #2218

Merged
merged 8 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import com.google.common.base.Objects;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.common.CronSchedule;
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import io.temporal.common.SearchAttributes;
import io.temporal.common.*;
import io.temporal.common.context.ContextPropagator;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.worker.WorkerFactory;
Expand Down Expand Up @@ -79,6 +76,8 @@ public static WorkflowOptions merge(
.setDisableEagerExecution(o.isDisableEagerExecution())
.setStartDelay(o.getStartDelay())
.setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy())
.setStaticSummary(o.getStaticSummary())
.setStaticDetails(o.getStaticDetails())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -114,6 +113,10 @@ public static final class Builder {

private WorkflowIdConflictPolicy workflowIdConflictpolicy;

private String staticSummary;

private String staticDetails;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand All @@ -135,6 +138,8 @@ private Builder(WorkflowOptions options) {
this.disableEagerExecution = options.disableEagerExecution;
this.startDelay = options.startDelay;
this.workflowIdConflictpolicy = options.workflowIdConflictpolicy;
this.staticSummary = options.staticSummary;
this.staticDetails = options.staticDetails;
}

/**
Expand Down Expand Up @@ -382,6 +387,31 @@ public Builder setStartDelay(Duration startDelay) {
return this;
}

/**
* Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be
* in single-line Temporal Markdown format.
*
* <p>Default is none/empty.
*/
@Experimental
public Builder setStaticSummary(String staticSummary) {
this.staticSummary = staticSummary;
return this;
}

/**
* General fixed details for this workflow execution that will appear in UI/CLI. This can be in
* Temporal Markdown format and can span multiple lines. This is a fixed value on the workflow
* that cannot be updated.
*
* <p>Default is none/empty.
*/
@Experimental
public Builder setStaticDetails(String staticDetails) {
this.staticDetails = staticDetails;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -398,7 +428,9 @@ public WorkflowOptions build() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}

/**
Expand All @@ -420,7 +452,9 @@ public WorkflowOptions validateBuildWithDefaults() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}
}

Expand Down Expand Up @@ -454,6 +488,10 @@ public WorkflowOptions validateBuildWithDefaults() {

private final WorkflowIdConflictPolicy workflowIdConflictpolicy;

private final String staticSummary;

private final String staticDetails;

private WorkflowOptions(
String workflowId,
WorkflowIdReusePolicy workflowIdReusePolicy,
Expand All @@ -469,7 +507,9 @@ private WorkflowOptions(
List<ContextPropagator> contextPropagators,
boolean disableEagerExecution,
Duration startDelay,
WorkflowIdConflictPolicy workflowIdConflictpolicy) {
WorkflowIdConflictPolicy workflowIdConflictpolicy,
String staticSummary,
String staticDetails) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -485,6 +525,8 @@ private WorkflowOptions(
this.disableEagerExecution = disableEagerExecution;
this.startDelay = startDelay;
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
this.staticSummary = staticSummary;
this.staticDetails = staticDetails;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -556,6 +598,14 @@ public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() {
return workflowIdConflictpolicy;
}

public String getStaticSummary() {
return staticSummary;
}

public String getStaticDetails() {
return staticDetails;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand All @@ -579,7 +629,9 @@ public boolean equals(Object o) {
&& Objects.equal(contextPropagators, that.contextPropagators)
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
&& Objects.equal(startDelay, that.startDelay)
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy);
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy)
&& Objects.equal(staticSummary, that.staticSummary)
&& Objects.equal(staticDetails, that.staticDetails);
}

@Override
Expand All @@ -599,7 +651,9 @@ public int hashCode() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}

@Override
Expand Down Expand Up @@ -638,6 +692,10 @@ public String toString() {
+ startDelay
+ ", workflowIdConflictpolicy="
+ workflowIdConflictpolicy
+ ", staticSummary="
+ staticSummary
+ ", staticDetails="
+ staticDetails
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ public DynamicUpdateHandler getHandler() {

Promise<Void> newTimer(Duration duration);

Promise<Void> newTimer(Duration duration, TimerOptions options);

<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func);

<R> R mutableSideEffect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.common.SearchAttributeUpdate;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Promise;
import io.temporal.workflow.TimerOptions;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -90,6 +91,11 @@ public Promise<Void> newTimer(Duration duration) {
return next.newTimer(duration);
}

@Override
public Promise<Void> newTimer(Duration duration, TimerOptions options) {
return next.newTimer(duration, options);
}

@Override
public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
return next.sideEffect(resultClass, resultType, func);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.client;

import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import io.grpc.Deadline;
import io.grpc.Status;
Expand All @@ -29,6 +30,7 @@
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.update.v1.*;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.*;
Expand Down Expand Up @@ -86,14 +88,22 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
.build()
: null;

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
input.getOptions().getStaticSummary(),
input.getOptions().getStaticDetails(),
dataConverterWithWorkflowContext);

StartWorkflowExecutionRequest.Builder request =
requestsHelper.newStartWorkflowExecutionRequest(
input.getWorkflowId(),
input.getWorkflowType(),
input.getHeader(),
input.getOptions(),
inputArgs.orElse(null),
memo);
memo,
userMetadata);
try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
boolean requestEagerExecution = eagerDispatchHandle != null;
request.setRequestEagerExecution(requestEagerExecution);
Expand Down Expand Up @@ -173,14 +183,22 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
.build()
: null;

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
workflowStartInput.getOptions().getStaticSummary(),
workflowStartInput.getOptions().getStaticDetails(),
dataConverterWithWorkflowContext);

StartWorkflowExecutionRequestOrBuilder startRequest =
requestsHelper.newStartWorkflowExecutionRequest(
workflowStartInput.getWorkflowId(),
workflowStartInput.getWorkflowType(),
workflowStartInput.getHeader(),
workflowStartInput.getOptions(),
workflowInput.orElse(null),
memo);
memo,
userMetadata);

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand All @@ -33,6 +34,7 @@
import io.temporal.api.schedule.v1.ScheduleInfo;
import io.temporal.api.schedule.v1.ScheduleSpec;
import io.temporal.api.schedule.v1.ScheduleState;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflow.v1.NewWorkflowExecutionInfo;
import io.temporal.client.WorkflowOptions;
Expand Down Expand Up @@ -160,6 +162,16 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
SearchAttributesUtil.encodeTyped(wfOptions.getTypedSearchAttributes()));
}

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
wfOptions.getStaticSummary(),
wfOptions.getStaticDetails(),
dataConverterWithWorkflowContext);
if (userMetadata != null) {
workflowRequest.setUserMetadata(userMetadata);
}

Header grpcHeader =
toHeaderGrpc(
startWorkflowAction.getHeader(),
Expand Down Expand Up @@ -460,6 +472,15 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu
SearchAttributesUtil.decodeTyped(startWfAction.getSearchAttributes()));
}

if (startWfAction.hasUserMetadata()) {
wfOptionsBuilder.setStaticSummary(
dataConverterWithWorkflowContext.fromPayload(
startWfAction.getUserMetadata().getSummary(), String.class, String.class));
wfOptionsBuilder.setStaticDetails(
dataConverterWithWorkflowContext.fromPayload(
startWfAction.getUserMetadata().getDetails(), String.class, String.class));
}

builder.setOptions(wfOptionsBuilder.build());
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.protobuf.ByteString;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.HistoryEventFilterType;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest;
Expand Down Expand Up @@ -59,7 +60,8 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
@Nonnull io.temporal.common.interceptors.Header header,
@Nonnull WorkflowOptions options,
@Nullable Payloads inputArgs,
@Nullable Memo memo) {
@Nullable Memo memo,
@Nullable UserMetadata userMetadata) {
StartWorkflowExecutionRequest.Builder request =
StartWorkflowExecutionRequest.newBuilder()
.setNamespace(clientOptions.getNamespace())
Expand Down Expand Up @@ -108,6 +110,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
}

if (userMetadata != null) {
request.setUserMetadata(userMetadata);
}

if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
if (options.getTypedSearchAttributes() != null) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -183,6 +189,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay());
}

if (startParameters.hasUserMetadata()) {
request.setUserMetadata(startParameters.getUserMetadata());
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.history.v1.*;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.client.WorkflowFailedException;
import io.temporal.common.converter.DataConverter;
Expand Down Expand Up @@ -240,6 +241,21 @@ public static WorkflowExecutionStatus getCloseStatus(HistoryEvent event) {
}
}

public static UserMetadata makeUserMetaData(String summary, String details, DataConverter dc) {
if (summary == null && details == null) {
return null;
}

UserMetadata.Builder builder = UserMetadata.newBuilder();
if (summary != null) {
builder.setSummary(dc.toPayload(summary).get());
}
if (details != null) {
builder.setDetails(dc.toPayload(details).get());
}
return builder.build();
}

public static String prettyPrintCommands(Iterable<Command> commands) {
StringBuilder result = new StringBuilder();
for (Command command : commands) {
Expand Down
Loading
Loading