Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow authoring support #563

Merged
merged 18 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions examples/workflow/authoring/src/activity-sequence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowR

async function start() {
// Update the gRPC client and worker to use a local address and port
const clientHost = "localhost";
const clientPort = "50001"
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
clientHost,
clientPort,
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
clientHost,
clientPort,
daprHost,
daprPort,
shubham1172 marked this conversation as resolved.
Show resolved Hide resolved
});

const hello = async (_: WorkflowActivityContext, name: string) => {
Expand Down
12 changes: 6 additions & 6 deletions examples/workflow/authoring/src/fanout-fanin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import {
// Wrap the entire code in an immediately-invoked async function
async function start() {
// Update the gRPC client and worker to use a local address and port
const clientHost = "localhost";
const clientPort = "50001"
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
clientHost,
clientPort,
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
clientHost,
clientPort,
daprHost,
daprPort,
});

function getRandomInt(min: number, max: number): number {
shubham1172 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
16 changes: 8 additions & 8 deletions examples/workflow/authoring/src/human-interaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ async function start() {
}

// Update the gRPC client and worker to use a local address and port
const clientHost = "localhost";
const clientPort = "50001"
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
clientHost,
clientPort,
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
clientHost,
clientPort,
daprHost,
daprPort,
});

//Activity function that sends an approval request to the manager
// Activity function that sends an approval request to the manager
const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
// Simulate some work that takes an amount of time
await sleep(3000);
Expand Down Expand Up @@ -111,7 +111,7 @@ async function start() {
const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
console.log(`Orchestration scheduled with ID: ${id}`);

//prompt for approval asynchronously
// prompt for approval asynchronously
promptForApproval(approver, workflowClient, id);

// Wait for orchestration completion
Expand Down
8 changes: 4 additions & 4 deletions src/types/workflow/WorkflowClientOption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ export type WorkflowClientOptions = {
* Host location of the Dapr sidecar.
* Default is 127.0.0.1.
*/
clientHost: string;
daprHost: string;

/**
* Port of the Dapr sidecar.
* Default is 4001.
* Port of the Dapr sidecar running a gRPC server.
* Default is 50001.
*/
clientPort: string;
daprPort: string;

/**
* Options related to logging.
Expand Down
19 changes: 10 additions & 9 deletions src/workflow/client/DaprWorkflowClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import { getFunctionName } from "../internal";
import { Settings } from "../../utils/Settings.util";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";
import { GrpcEndpoint } from "../../network/GrpcEndpoint";

/**
* Class that defines client operations for managing workflow instances.
Expand All @@ -33,19 +34,19 @@
* @param {WorkflowClientOptions | undefined} options - Additional options for configuring DaprWorkflowClient.
*/
constructor(options: Partial<WorkflowClientOptions> = {}) {
const hostAddress = this.generateHostAddress(options);
options.daprApiToken = this.generateDaprApiToken(options);
this._innerClient = this.buildInnerClient(hostAddress, options);
const grpcEndpoint = this.generateEndpoint(options);
options.daprApiToken = this.getDaprApiToken(options);
this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
}

private generateHostAddress(options: Partial<WorkflowClientOptions>): string {
const host = options?.clientHost ?? Settings.getDefaultHost();
const port = options?.clientPort ?? Settings.getDefaultGrpcPort();
const hostAddress = `${host}:${port}`;
return hostAddress;
private generateEndpoint(options: Partial<WorkflowClientOptions>): GrpcEndpoint {
const host = options?.daprHost ?? Settings.getDefaultHost();
const port = options?.daprPort ?? Settings.getDefaultGrpcPort();
const uri = `${host}:${port}`;
return new GrpcEndpoint(uri);
}

private generateDaprApiToken(options: Partial<WorkflowClientOptions>): string | undefined {
private getDaprApiToken(options: Partial<WorkflowClientOptions>): string | undefined {
const daprApiToken = options?.daprApiToken ?? Settings.getDefaultApiToken();
return daprApiToken;
}
Expand All @@ -53,7 +54,7 @@
private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
let innerOptions = options?.grpcOptions;
if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
innerOptions = {

Check warning on line 57 in src/workflow/client/DaprWorkflowClient.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/client/DaprWorkflowClient.ts#L57

Added line #L57 was not covered by tests
...innerOptions,
interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
};
Expand All @@ -77,7 +78,7 @@
startAt?: Date,
): Promise<string> {
if (typeof workflow === "string") {
return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);

Check warning on line 81 in src/workflow/client/DaprWorkflowClient.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/client/DaprWorkflowClient.ts#L81

Added line #L81 was not covered by tests
}
return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
}
Expand Down Expand Up @@ -106,9 +107,9 @@
workflowInstanceId: string,
getInputsAndOutputs: boolean,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);

Check warning on line 110 in src/workflow/client/DaprWorkflowClient.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/client/DaprWorkflowClient.ts#L110

Added line #L110 was not covered by tests
if (state !== undefined) {
return new WorkflowState(state);

Check warning on line 112 in src/workflow/client/DaprWorkflowClient.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/client/DaprWorkflowClient.ts#L112

Added line #L112 was not covered by tests
}
}

Expand Down Expand Up @@ -202,7 +203,7 @@
if (purgeResult !== undefined) {
return purgeResult.deletedInstanceCount > 0;
}
return false;

Check warning on line 206 in src/workflow/client/DaprWorkflowClient.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/client/DaprWorkflowClient.ts#L206

Added line #L206 was not covered by tests
}

/**
Expand Down
19 changes: 10 additions & 9 deletions src/workflow/runtime/WorkflowRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import { getFunctionName } from "../internal";
import { Settings } from "../../utils/Settings.util";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";
import { GrpcEndpoint } from "../../network/GrpcEndpoint";

/**
* Contains methods to register workflows and activities.
Expand All @@ -33,19 +34,19 @@
* @param {WorkflowClientOptions | undefined} options - Additional options for configuring WorkflowRuntime.
*/
constructor(options: Partial<WorkflowClientOptions> = {}) {
const hostAddress = this.generateHostAddress(options);
options.daprApiToken = this.generateDaprApiToken(options);
this.worker = this.buildInnerWorker(hostAddress, options);
const grpcEndpoint = this.generateEndpoint(options);
options.daprApiToken = this.getDaprApiToken(options);
this.worker = this.buildInnerWorker(grpcEndpoint.endpoint, options);
}

private generateHostAddress(options: Partial<WorkflowClientOptions>): string {
const host = options?.clientHost ?? Settings.getDefaultHost();
const port = options?.clientPort ?? Settings.getDefaultGrpcPort();
const hostAddress = `${host}:${port}`;
return hostAddress;
private generateEndpoint(options: Partial<WorkflowClientOptions>): GrpcEndpoint {
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
const host = options?.daprHost ?? Settings.getDefaultHost();
const port = options?.daprPort ?? Settings.getDefaultGrpcPort();
const uri = `${host}:${port}`;
return new GrpcEndpoint(uri);
}

private generateDaprApiToken(options: Partial<WorkflowClientOptions>): string | undefined {
private getDaprApiToken(options: Partial<WorkflowClientOptions>): string | undefined {
const daprApiToken = options?.daprApiToken ?? Settings.getDefaultApiToken();
return daprApiToken;
}
Expand All @@ -53,7 +54,7 @@
private buildInnerWorker(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcWorker {
let innerOptions = options?.grpcOptions;
if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
innerOptions = {

Check warning on line 57 in src/workflow/runtime/WorkflowRuntime.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/runtime/WorkflowRuntime.ts#L57

Added line #L57 was not covered by tests
...innerOptions,
interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
};
Expand Down Expand Up @@ -84,12 +85,12 @@
* @param {TWorkflow} workflow - The instance of the Workflow class being registered.
*/
public registerWorkflowWithName(name: string, workflow: TWorkflow): WorkflowRuntime {
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
const workflowContext = new WorkflowContext(ctx);
return workflow(workflowContext, input);

Check warning on line 90 in src/workflow/runtime/WorkflowRuntime.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/runtime/WorkflowRuntime.ts#L88-L90

Added lines #L88 - L90 were not covered by tests
};
this.worker.addNamedOrchestrator(name, workflowWrapper);
return this;

Check warning on line 93 in src/workflow/runtime/WorkflowRuntime.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/runtime/WorkflowRuntime.ts#L92-L93

Added lines #L92 - L93 were not covered by tests
}

/**
Expand Down Expand Up @@ -117,13 +118,13 @@
* @returns {WorkflowRuntime} The current instance of WorkflowRuntime.
*/
public registerActivityWithName(name: string, fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
const activityWrapper = (ctx: ActivityContext, intput: TInput): any => {
const wfActivityContext = new WorkflowActivityContext(ctx);
return fn(wfActivityContext, intput);

Check warning on line 123 in src/workflow/runtime/WorkflowRuntime.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/runtime/WorkflowRuntime.ts#L121-L123

Added lines #L121 - L123 were not covered by tests
};

this.worker.addNamedActivity(name, activityWrapper);
return this;

Check warning on line 127 in src/workflow/runtime/WorkflowRuntime.ts

View check run for this annotation

Codecov / codecov/patch

src/workflow/runtime/WorkflowRuntime.ts#L126-L127

Added lines #L126 - L127 were not covered by tests
}

/**
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/workflow/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
beforeEach(async () => {
// Start a worker, which will connect to the sidecar in a background thread
workflowClient = new DaprWorkflowClient({
clientHost,
clientPort,
daprHost: clientHost,
daprPort: clientPort,
});
workflowRuntime = new WorkflowRuntime({
clientHost,
clientPort,
daprHost: clientHost,
daprPort: clientPort,
});
});

Expand Down Expand Up @@ -209,7 +209,7 @@
expect(state?.runtimeStatus).toEqual(WorkflowRuntimeStatus.COMPLETED);
expect(state?.createdAt).toBeDefined();
expect(state?.lastUpdatedAt).toBeDefined();
expect(expectedCompletionSecond).toBeLessThanOrEqual(actualCompletionSecond!);

Check warning on line 212 in test/e2e/workflow/workflow.test.ts

View workflow job for this annotation

GitHub Actions / build (16.14.0)

Forbidden non-null assertion

Check warning on line 212 in test/e2e/workflow/workflow.test.ts

View workflow job for this annotation

GitHub Actions / test-e2e

Forbidden non-null assertion
}, 31000);

it("should wait for external events with a timeout - true", async () => {
Expand Down
Loading