Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename persistence schema option to persistence options #170

Merged
merged 2 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iworkflow.core;

import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
Expand Down Expand Up @@ -198,9 +198,9 @@ private String doStartWorkflow(
}
}

final PersistenceSchemaOptions schemaOptions = registry.getPersistenceSchemaOptions(wfType);
if (schemaOptions.getCachingPersistenceByMemo()) {
unregisterWorkflowOptions.usingMemoForDataAttributes(schemaOptions.getCachingPersistenceByMemo());
final PersistenceOptions schemaOptions = registry.getPersistenceOptions(wfType);
if (schemaOptions.getEnableCaching()) {
unregisterWorkflowOptions.usingMemoForDataAttributes(schemaOptions.getEnableCaching());
}

return unregisteredClient.startWorkflow(wfType, startStateId, workflowId, workflowTimeoutSeconds, input, unregisterWorkflowOptions.build());
Expand Down Expand Up @@ -426,9 +426,9 @@ private Map<String, Object> doGetWorkflowDataObjects(
}
}

final PersistenceSchemaOptions schemaOptions = registry.getPersistenceSchemaOptions(wfType);
final PersistenceOptions schemaOptions = registry.getPersistenceOptions(wfType);

final WorkflowGetDataObjectsResponse response = unregisteredClient.getAnyWorkflowDataObjects(workflowId, workflowRunId, keys, schemaOptions.getCachingPersistenceByMemo());
final WorkflowGetDataObjectsResponse response = unregisteredClient.getAnyWorkflowDataObjects(workflowId, workflowRunId, keys, schemaOptions.getEnableCaching());

if (response.getObjects() == null) {
throw new IllegalStateException("data attributes not returned");
Expand Down Expand Up @@ -478,7 +478,7 @@ public WorkflowSearchResponse searchWorkflow(final WorkflowSearchRequest request
public <T> T newRpcStub(Class<T> workflowClassForRpc, String workflowId, String workflowRunId) {

final String wfType = workflowClassForRpc.getSimpleName();
final PersistenceSchemaOptions schemaOptions = registry.getPersistenceSchemaOptions(wfType);
final PersistenceOptions schemaOptions = registry.getPersistenceOptions(wfType);
final Map<String, SearchAttributeValueType> searchAttributeKeyToTypeMap = registry.getSearchAttributeKeyToTypeMap(wfType);
List<SearchAttributeKeyAndType> keyAndTypes = new ArrayList<>();

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/iworkflow/core/ObjectWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.iworkflow.core.communication.CommunicationMethodDef;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -47,8 +47,8 @@ default List<PersistenceFieldDef> getPersistenceSchema() {
return Collections.emptyList();
}

default PersistenceSchemaOptions getPersistenceSchemaOptions() {
return PersistenceSchemaOptions.getDefault();
default PersistenceOptions getPersistenceOptions() {
return PersistenceOptions.getDefault();
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/iworkflow/core/RPC.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iworkflow.core;

import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.PersistenceLoadingType;

import java.lang.annotation.ElementType;
Expand Down Expand Up @@ -31,7 +31,7 @@
String[] searchAttributesPartialLoadingKeys() default {};

/**
* Only used when workflow has enabled {@link PersistenceSchemaOptions} CachingPersistenceByMemo
* Only used when workflow has enabled {@link PersistenceOptions} CachingPersistenceByMemo
* By default, it's false for high throughput support
* flip to true to bypass the caching for strong consistent reads
*/
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/iworkflow/core/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.iworkflow.core.communication.SignalChannelDef;
import io.iworkflow.core.persistence.DataAttributeDef;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.core.persistence.SearchAttributeDef;
import io.iworkflow.gen.models.SearchAttributeValueType;

Expand All @@ -30,7 +30,7 @@ public class Registry {

private final Map<String, Map<String, SearchAttributeValueType>> searchAttributeTypeStore = new HashMap<>();

private final Map<String, PersistenceSchemaOptions> persistenceSchemaOptionsMap = new HashMap<>();
private final Map<String, PersistenceOptions> persistenceOptionsMap = new HashMap<>();
private final Map<String, Map<String, Method>> rpcMethodStore = new HashMap<>();

private static final String DELIMITER = "_";
Expand All @@ -50,7 +50,7 @@ public void addWorkflow(final ObjectWorkflow wf) {
registerWorkflowInternalChannel(wf);
registerWorkflowDataAttributes(wf);
registerWorkflowSearchAttributes(wf);
registerPersistenceSchemaOptions(wf);
registerPersistenceOptions(wf);
registerWorkflowRPCs(wf);
}
public static String getWorkflowType(final ObjectWorkflow wf) {
Expand Down Expand Up @@ -174,9 +174,9 @@ private void registerWorkflowDataAttributes(final ObjectWorkflow wf) {
}
}

private void registerPersistenceSchemaOptions(final ObjectWorkflow wf) {
private void registerPersistenceOptions(final ObjectWorkflow wf) {
String workflowType = getWorkflowType(wf);
this.persistenceSchemaOptionsMap.put(workflowType, wf.getPersistenceSchemaOptions());
this.persistenceOptionsMap.put(workflowType, wf.getPersistenceOptions());
}

private List<DataAttributeDef> getDataAttributeFields(final ObjectWorkflow wf) {
Expand Down Expand Up @@ -266,8 +266,8 @@ public Map<String, SearchAttributeValueType> getSearchAttributeKeyToTypeMap(fina
return searchAttributeTypeStore.get(workflowType);
}

public PersistenceSchemaOptions getPersistenceSchemaOptions(final String workflowType) {
return persistenceSchemaOptionsMap.get(workflowType);
public PersistenceOptions getPersistenceOptions(final String workflowType) {
return persistenceOptionsMap.get(workflowType);
}

private String getStateDefKey(final String workflowType, final String stateId) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/iworkflow/core/RpcInvocationHandler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iworkflow.core;

import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
Expand All @@ -22,11 +22,11 @@ public class RpcInvocationHandler {

final UnregisteredClient unregisteredClient;

final PersistenceSchemaOptions schemaOptions;
final PersistenceOptions schemaOptions;

final List<SearchAttributeKeyAndType> searchAttributeKeyAndTypes;

public RpcInvocationHandler(final UnregisteredClient unregisteredClient, final String workflowId, final String workflowRunId, final PersistenceSchemaOptions schemaOptions, final List<SearchAttributeKeyAndType> searchAttributeKeyAndTypes) {
public RpcInvocationHandler(final UnregisteredClient unregisteredClient, final String workflowId, final String workflowRunId, final PersistenceOptions schemaOptions, final List<SearchAttributeKeyAndType> searchAttributeKeyAndTypes) {
this.unregisteredClient = unregisteredClient;
this.workflowId = workflowId;
this.workflowRunId = workflowRunId;
Expand All @@ -49,7 +49,7 @@ public Object intercept(@AllArguments Object[] allArguments,

final Class<?> outputType = method.getReturnType();

boolean useMemo = schemaOptions.getCachingPersistenceByMemo();
boolean useMemo = schemaOptions.getEnableCaching();
if (rpcAnno.bypassCachingForStrongConsistency()) {
useMemo = false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.iworkflow.core.persistence;

import org.immutables.value.Value;

@Value.Immutable
public abstract class PersistenceOptions {
// This option will enable caching persistence (data/search attributes) so that the readonly-RPC or GetDataAttributes API can
// support a much higher throughput on a single workflow execution.
// NOTES:
// 1. The read after write will become eventual consistent, unless set bypassCachingForStrongConsistency to true in RPC annotation
// 2. The caching is implemented by Temporal upsertMemo feature. Only iWF service with Temporal as backend is supporting this feature at the moment
// 3. It will extra cost as it will upsertMemo(WorkflowPropertiesModified event in the history) for write
// 4. Only useful for read-only RPC(no persistence.SetXXX API or communication API calls)
public abstract boolean getEnableCaching();

public static PersistenceOptions getDefault() {
return ImmutablePersistenceOptions.builder()
.enableCaching(false)
.build();
}

public static ImmutablePersistenceOptions.Builder builder() {
return ImmutablePersistenceOptions.builder();
}
}

This file was deleted.

8 changes: 4 additions & 4 deletions src/test/java/io/iworkflow/integ/rpc/RpcMemoWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.iworkflow.core.persistence.DataAttributeDef;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.core.persistence.PersistenceSchemaOptions;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.core.persistence.SearchAttributeDef;
import io.iworkflow.gen.models.SearchAttributeValueType;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -56,9 +56,9 @@ public List<PersistenceFieldDef> getPersistenceSchema() {
}

@Override
public PersistenceSchemaOptions getPersistenceSchemaOptions() {
return PersistenceSchemaOptions.builder()
.cachingPersistenceByMemo(true).build();
public PersistenceOptions getPersistenceOptions() {
return PersistenceOptions.builder()
.enableCaching(true).build();
}

@RPC
Expand Down