Skip to content

Commit

Permalink
feat(client)!: Return a friendly type from handle.describe() (#532)
Browse files Browse the repository at this point in the history
* feat(client)!: Return a friendly type from handle.describe()

* Address comments

* Fix tests
  • Loading branch information
lorensr authored Mar 12, 2022
1 parent f4fad3a commit 95c5bd2
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 57 deletions.
16 changes: 16 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,19 @@ export type DescribeWorkflowExecutionResponse = temporal.api.workflowservice.v1.
export type TerminateWorkflowExecutionResponse = temporal.api.workflowservice.v1.ITerminateWorkflowExecutionResponse;
export type RequestCancelWorkflowExecutionResponse =
temporal.api.workflowservice.v1.IRequestCancelWorkflowExecutionResponse;

export interface WorkflowExecutionDescription {
type: string;
workflowId: string;
runId: string;
taskQueue: string;
status: temporal.api.enums.v1.WorkflowExecutionStatus;
historyLength: Long;
startTime: Date;
executionTime?: Date;
closeTime?: Date;
memo?: Record<string, unknown>;
searchAttributes?: Record<string, unknown>;
parentExecution?: Required<temporal.api.common.v1.IWorkflowExecution>;
raw: DescribeWorkflowExecutionResponse;
}
35 changes: 33 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import {
decodeArrayFromPayloads,
decodeFromPayloadsAtIndex,
decodeMapFromPayloads,
decodeOptionalFailureToOptionalError,
encodeMapToPayloads,
encodeToPayloads,
Expand All @@ -21,8 +22,10 @@ import {
BaseWorkflowHandle,
compileRetryPolicy,
composeInterceptors,
optionalTsToDate,
QueryDefinition,
SignalDefinition,
tsToDate,
WithWorkflowArgs,
Workflow,
WorkflowNotFoundError,
Expand Down Expand Up @@ -57,6 +60,7 @@ import {
StartWorkflowExecutionRequest,
TerminateWorkflowExecutionResponse,
WorkflowExecution,
WorkflowExecutionDescription,
} from './types';
import { compileWorkflowOptions, WorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options';

Expand Down Expand Up @@ -113,7 +117,7 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
/**
* Describe the current workflow execution
*/
describe(): Promise<DescribeWorkflowExecutionResponse>;
describe(): Promise<WorkflowExecutionDescription>;

/**
* Readonly accessor to the underlying WorkflowClient
Expand Down Expand Up @@ -765,9 +769,36 @@ export class WorkflowClient {
async describe() {
const next = this.client._describeWorkflowHandler.bind(this.client);
const fn = interceptors.length ? composeInterceptors(interceptors, 'describe', next) : next;
return await fn({
const raw = await fn({
workflowExecution: { workflowId, runId },
});
return {
/* eslint-disable @typescript-eslint/no-non-null-assertion */
type: raw.workflowExecutionInfo!.type!.name!,
workflowId: raw.workflowExecutionInfo!.execution!.workflowId!,
runId: raw.workflowExecutionInfo!.execution!.runId!,
taskQueue: raw.workflowExecutionInfo!.taskQueue!,
status: raw.workflowExecutionInfo!.status!,
historyLength: raw.workflowExecutionInfo!.historyLength!,
startTime: tsToDate(raw.workflowExecutionInfo!.startTime!),
executionTime: optionalTsToDate(raw.workflowExecutionInfo!.executionTime),
closeTime: optionalTsToDate(raw.workflowExecutionInfo!.closeTime),
memo: await decodeMapFromPayloads(
this.client.options.loadedDataConverter,
raw.workflowExecutionInfo!.memo?.fields
),
searchAttributes: await decodeMapFromPayloads(
defaultDataConverter,
raw.workflowExecutionInfo!.searchAttributes?.indexedFields
),
parentExecution: raw.workflowExecutionInfo!.parentExecution
? {
workflowId: raw.workflowExecutionInfo!.parentExecution!.workflowId!,
runId: raw.workflowExecutionInfo!.parentExecution!.runId!,
}
: undefined,
raw,
};
},
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
const next = this.client._signalWorkflowHandler.bind(this.client);
Expand Down
53 changes: 41 additions & 12 deletions packages/internal-non-workflow-common/src/codec-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
toPayload,
toPayloads,
} from '@temporalio/common';

import { DecodedPayload, DecodedProtoFailure, EncodedPayload, EncodedProtoFailure } from './codec-types';

export interface TypecheckedPayloadCodec {
Expand Down Expand Up @@ -63,7 +62,8 @@ export async function encodeOptional(
codec: PayloadCodec,
payloads: Payload[] | null | undefined
): Promise<EncodedPayload[] | null | undefined> {
if (!payloads) return payloads;
if (payloads === null) return null;
if (payloads === undefined) return undefined;
return (await codec.encode(payloads)) as EncodedPayload[];
}

Expand All @@ -72,7 +72,8 @@ export async function decodeOptional(
codec: PayloadCodec,
payloads: Payload[] | null | undefined
): Promise<DecodedPayload[] | null | undefined> {
if (!payloads) return payloads;
if (payloads === null) return null;
if (payloads === undefined) return undefined;
return (await codec.decode(payloads)) as DecodedPayload[];
}

Expand All @@ -86,7 +87,8 @@ export async function encodeOptionalSingle(
codec: PayloadCodec,
payload: Payload | null | undefined
): Promise<EncodedPayload | null | undefined> {
if (!payload) return payload;
if (payload === null) return null;
if (payload === undefined) return undefined;
return await encodeSingle(codec, payload);
}

Expand All @@ -100,7 +102,9 @@ export async function decodeOptionalSingle(
codec: PayloadCodec,
payload: Payload | null | undefined
): Promise<DecodedPayload | null | undefined> {
if (!payload) return payload;
if (payload === null) return null;
if (payload === undefined) return undefined;

return await decodeSingle(codec, payload);
}

Expand All @@ -127,12 +131,33 @@ export async function encodeToPayloads(
return payloads ? await payloadCodec.encode(payloads) : undefined;
}

/**
* Run {@link PayloadCodec.decode} and then {@link PayloadConverter.fromPayload} on values in `map`.
*/
export async function decodeMapFromPayloads<K extends string>(
converter: LoadedDataConverter,
map: Record<K, Payload> | null | undefined
): Promise<Record<K, unknown> | undefined> {
if (!map) return undefined;
const { payloadConverter, payloadCodec } = converter;
return Object.fromEntries(
await Promise.all(
Object.entries(map).map(async ([k, payload]): Promise<[K, unknown]> => {
const [decodedPayload] = await payloadCodec.decode([payload as Payload]);
const value = payloadConverter.fromPayload(decodedPayload);
return [k as K, value];
})
)
) as Record<K, unknown>;
}

/** Run {@link PayloadCodec.encode} on all values in `map` */
export async function encodeMap<K extends string>(
codec: PayloadCodec,
map: Record<K, Payload> | null | undefined
): Promise<Record<K, EncodedPayload> | null | undefined> {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return Object.fromEntries(
await Promise.all(
Object.entries(map).map(async ([k, payload]): Promise<[K, EncodedPayload]> => {
Expand All @@ -143,11 +168,11 @@ export async function encodeMap<K extends string>(
}

/**
* Run {@link PayloadConverter.toPayload} and {@link PayloadCodec.encode} on values in `map`.
* Run {@link PayloadConverter.toPayload} and then {@link PayloadCodec.encode} on values in `map`.
*/
export async function encodeMapToPayloads<K extends string>(
converter: LoadedDataConverter,
map: Record<K, any>
map: Record<K, unknown>
): Promise<Record<K, Payload>> {
const { payloadConverter, payloadCodec } = converter;
return Object.fromEntries(
Expand Down Expand Up @@ -228,7 +253,8 @@ export async function encodeOptionalFailure(
codec: PayloadCodec,
failure: ProtoFailure | null | undefined
): Promise<EncodedProtoFailure | null | undefined> {
if (!failure) return failure;
if (failure === null) return null;
if (failure === undefined) return undefined;
return await encodeFailure(codec, failure);
}

Expand All @@ -239,7 +265,8 @@ export async function decodeOptionalFailure(
codec: PayloadCodec,
failure: ProtoFailure | null | undefined
): Promise<DecodedProtoFailure | null | undefined> {
if (!failure) return failure;
if (failure === null) return null;
if (failure === undefined) return undefined;
return await decodeFailure(codec, failure);
}

Expand Down Expand Up @@ -301,7 +328,8 @@ export async function decodeFailure(_codec: PayloadCodec, failure: ProtoFailure)
export function noopEncodeMap<K extends string>(
map: Record<K, Payload> | null | undefined
): Record<K, EncodedPayload> | null | undefined {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return map as Record<K, EncodedPayload>;
}

Expand All @@ -312,6 +340,7 @@ export function noopEncodeMap<K extends string>(
export function noopDecodeMap<K extends string>(
map: Record<K, Payload> | null | undefined
): Record<K, DecodedPayload> | null | undefined {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return map as Record<K, DecodedPayload>;
}
11 changes: 11 additions & 0 deletions packages/internal-workflow-common/src/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,14 @@ export function msToNumber(val: string | number): number {
}
return ms(val);
}

export function tsToDate(ts: Timestamp): Date {
return new Date(tsToMs(ts));
}

export function optionalTsToDate(ts: Timestamp | null | undefined): Date | undefined {
if (ts === undefined || ts === null) {
return undefined;
}
return new Date(tsToMs(ts));
}
81 changes: 50 additions & 31 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,28 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
t.regex(event.workflowTaskCompletedEventAttributes!.binaryChecksum!, /@temporalio\/worker@\d+\.\d+\.\d+/);
});

test('WorkflowHandle.describe result is wrapped', async (t) => {
const { client } = t.context;
const workflow = await client.start(workflows.argsAndReturn, {
args: ['hey', undefined, Buffer.from('def')],
taskQueue: 'test',
workflowId: uuid4(),
searchAttributes: {
CustomKeywordField: 'test-value',
},
memo: {
note: 'foo',
},
});
await workflow.result();
const execution = await workflow.describe();
t.deepEqual(execution.type, 'argsAndReturn');
t.deepEqual(execution.memo, { note: 'foo' });
t.true(execution.startTime instanceof Date);
t.is(execution.searchAttributes!.CustomKeywordField, 'test-value');
t.regex((execution.searchAttributes!.BinaryChecksums as string[])[0], /@temporalio\/worker@/);
});

test('WorkflowOptions are passed correctly with defaults', async (t) => {
const { client } = t.context;
const workflow = await client.start(workflows.argsAndReturn, {
Expand All @@ -547,22 +569,23 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await workflow.result();
const execution = await workflow.describe();
t.deepEqual(
execution.workflowExecutionInfo?.type,
new iface.temporal.api.common.v1.WorkflowType({ name: 'argsAndReturn' })
);
t.deepEqual(execution.workflowExecutionInfo?.memo, new iface.temporal.api.common.v1.Memo({ fields: {} }));
t.deepEqual(Object.keys(execution.workflowExecutionInfo!.searchAttributes!.indexedFields!), ['BinaryChecksums']);
t.deepEqual(execution.type, 'argsAndReturn');
t.deepEqual(Object.keys(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!), [
'BinaryChecksums',
]);

const checksums = defaultPayloadConverter.fromPayload(
execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums!
execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums!
);
t.true(checksums instanceof Array && checksums.length === 1);
t.regex((checksums as string[])[0], /@temporalio\/worker@\d+\.\d+\.\d+/);
t.is(execution.executionConfig?.taskQueue?.name, 'test');
t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL);
t.is(execution.executionConfig?.workflowRunTimeout, null);
t.is(execution.executionConfig?.workflowExecutionTimeout, null);
t.is(execution.raw.executionConfig?.taskQueue?.name, 'test');
t.is(
execution.raw.executionConfig?.taskQueue?.kind,
iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL
);
t.is(execution.raw.executionConfig?.workflowRunTimeout, null);
t.is(execution.raw.executionConfig?.workflowExecutionTimeout, null);
});

test('WorkflowOptions are passed correctly', async (t) => {
Expand All @@ -584,22 +607,25 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
const execution = await workflow.describe();
t.deepEqual(
execution.workflowExecutionInfo?.type,
execution.raw.workflowExecutionInfo?.type,
new iface.temporal.api.common.v1.WorkflowType({ name: 'sleeper' })
);
t.deepEqual(await fromPayload(execution.workflowExecutionInfo!.memo!.fields!.a!), 'b');
t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b');
t.deepEqual(
await defaultPayloadConverter.fromPayload(
execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!
execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!
),
3
);
t.is(execution.executionConfig?.taskQueue?.name, 'test2');
t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL);
t.is(execution.raw.executionConfig?.taskQueue?.name, 'test2');
t.is(
execution.raw.executionConfig?.taskQueue?.kind,
iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL
);

t.is(tsToMs(execution.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout));
t.is(tsToMs(execution.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout));
t.is(tsToMs(execution.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout));
t.is(tsToMs(execution.raw.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout));
t.is(tsToMs(execution.raw.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout));
t.is(tsToMs(execution.raw.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout));
});

test('WorkflowHandle.result() throws if terminated', async (t) => {
Expand Down Expand Up @@ -667,7 +693,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await workflow.result();
const info = await workflow.describe();
t.is(info.workflowExecutionInfo?.type?.name, 'sleeper');
t.is(info.raw.workflowExecutionInfo?.type?.name, 'sleeper');
const { history } = await client.service.getWorkflowExecutionHistory({
namespace,
execution: { workflowId: workflow.workflowId, runId: err.newExecutionRunId },
Expand Down Expand Up @@ -753,14 +779,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
return;
}
t.is(failure.message, 'unhandled rejection');
t.true(
failure.stackTrace?.includes(
dedent`
Error: unhandled rejection
at eval (webpack-internal:///./lib/workflows/unhandled-rejection.js
`
)
);
t.true(failure.stackTrace?.includes(`Error: unhandled rejection`));
t.is(failure.cause?.message, 'root failure');
},
{ minTimeout: 300, factor: 1, retries: 100 }
Expand Down Expand Up @@ -815,8 +834,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await t.throwsAsync(handle.result());
const handleForSecondAttempt = client.getHandle(workflowId);
const { workflowExecutionInfo } = await handleForSecondAttempt.describe();
t.not(workflowExecutionInfo?.execution?.runId, handle.originalRunId);
const { raw } = await handleForSecondAttempt.describe();
t.not(raw.workflowExecutionInfo?.execution?.runId, handle.originalRunId);
});

test('Workflow RetryPolicy ignored with nonRetryable failure', async (t) => {
Expand All @@ -835,7 +854,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
await t.throwsAsync(handle.result());
const res = await handle.describe();
t.is(
res.workflowExecutionInfo?.status,
res.raw.workflowExecutionInfo?.status,
iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED
);
});
Expand Down
Loading

0 comments on commit 95c5bd2

Please sign in to comment.