Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client)!: Return a friendly type from handle.describe() #532

Merged
merged 3 commits into from
Mar 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,19 @@ export type DescribeWorkflowExecutionResponse = temporal.api.workflowservice.v1.
export type TerminateWorkflowExecutionResponse = temporal.api.workflowservice.v1.ITerminateWorkflowExecutionResponse;
export type RequestCancelWorkflowExecutionResponse =
temporal.api.workflowservice.v1.IRequestCancelWorkflowExecutionResponse;

export interface WorkflowExecutionDescription {
type: string;
workflowId: string;
runId: string;
taskQueue: string;
status: temporal.api.enums.v1.WorkflowExecutionStatus;
historyLength: Long;
startTime: Date;
executionTime?: Date;
closeTime?: Date;
memo?: Record<string, unknown>;
searchAttributes?: Record<string, unknown>;
parentExecution?: Required<temporal.api.common.v1.IWorkflowExecution>;
raw: DescribeWorkflowExecutionResponse;
}
35 changes: 33 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import {
decodeArrayFromPayloads,
decodeFromPayloadsAtIndex,
decodeMapFromPayloads,
decodeOptionalFailureToOptionalError,
encodeMapToPayloads,
encodeToPayloads,
Expand All @@ -21,8 +22,10 @@ import {
BaseWorkflowHandle,
compileRetryPolicy,
composeInterceptors,
optionalTsToDate,
QueryDefinition,
SignalDefinition,
tsToDate,
WithWorkflowArgs,
Workflow,
WorkflowNotFoundError,
Expand Down Expand Up @@ -57,6 +60,7 @@ import {
StartWorkflowExecutionRequest,
TerminateWorkflowExecutionResponse,
WorkflowExecution,
WorkflowExecutionDescription,
} from './types';
import { compileWorkflowOptions, WorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options';

Expand Down Expand Up @@ -113,7 +117,7 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
/**
* Describe the current workflow execution
*/
describe(): Promise<DescribeWorkflowExecutionResponse>;
describe(): Promise<WorkflowExecutionDescription>;

/**
* Readonly accessor to the underlying WorkflowClient
Expand Down Expand Up @@ -765,9 +769,36 @@ export class WorkflowClient {
async describe() {
const next = this.client._describeWorkflowHandler.bind(this.client);
const fn = interceptors.length ? composeInterceptors(interceptors, 'describe', next) : next;
return await fn({
const raw = await fn({
workflowExecution: { workflowId, runId },
});
return {
/* eslint-disable @typescript-eslint/no-non-null-assertion */
type: raw.workflowExecutionInfo!.type!.name!,
workflowId: raw.workflowExecutionInfo!.execution!.workflowId!,
runId: raw.workflowExecutionInfo!.execution!.runId!,
taskQueue: raw.workflowExecutionInfo!.taskQueue!,
status: raw.workflowExecutionInfo!.status!,
historyLength: raw.workflowExecutionInfo!.historyLength!,
startTime: tsToDate(raw.workflowExecutionInfo!.startTime!),
executionTime: optionalTsToDate(raw.workflowExecutionInfo!.executionTime),
closeTime: optionalTsToDate(raw.workflowExecutionInfo!.closeTime),
memo: await decodeMapFromPayloads(
this.client.options.loadedDataConverter,
raw.workflowExecutionInfo!.memo?.fields
),
searchAttributes: await decodeMapFromPayloads(
defaultDataConverter,
raw.workflowExecutionInfo!.searchAttributes?.indexedFields
),
parentExecution: raw.workflowExecutionInfo!.parentExecution
? {
workflowId: raw.workflowExecutionInfo!.parentExecution!.workflowId!,
runId: raw.workflowExecutionInfo!.parentExecution!.runId!,
}
: undefined,
raw,
};
},
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
const next = this.client._signalWorkflowHandler.bind(this.client);
Expand Down
53 changes: 41 additions & 12 deletions packages/internal-non-workflow-common/src/codec-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
toPayload,
toPayloads,
} from '@temporalio/common';

import { DecodedPayload, DecodedProtoFailure, EncodedPayload, EncodedProtoFailure } from './codec-types';

export interface TypecheckedPayloadCodec {
Expand Down Expand Up @@ -63,7 +62,8 @@ export async function encodeOptional(
codec: PayloadCodec,
payloads: Payload[] | null | undefined
): Promise<EncodedPayload[] | null | undefined> {
if (!payloads) return payloads;
if (payloads === null) return null;
if (payloads === undefined) return undefined;
return (await codec.encode(payloads)) as EncodedPayload[];
}

Expand All @@ -72,7 +72,8 @@ export async function decodeOptional(
codec: PayloadCodec,
payloads: Payload[] | null | undefined
): Promise<DecodedPayload[] | null | undefined> {
if (!payloads) return payloads;
if (payloads === null) return null;
if (payloads === undefined) return undefined;
return (await codec.decode(payloads)) as DecodedPayload[];
}

Expand All @@ -86,7 +87,8 @@ export async function encodeOptionalSingle(
codec: PayloadCodec,
payload: Payload | null | undefined
): Promise<EncodedPayload | null | undefined> {
if (!payload) return payload;
if (payload === null) return null;
if (payload === undefined) return undefined;
return await encodeSingle(codec, payload);
}

Expand All @@ -100,7 +102,9 @@ export async function decodeOptionalSingle(
codec: PayloadCodec,
payload: Payload | null | undefined
): Promise<DecodedPayload | null | undefined> {
if (!payload) return payload;
if (payload === null) return null;
if (payload === undefined) return undefined;

return await decodeSingle(codec, payload);
}

Expand All @@ -127,12 +131,33 @@ export async function encodeToPayloads(
return payloads ? await payloadCodec.encode(payloads) : undefined;
}

/**
* Run {@link PayloadCodec.decode} and then {@link PayloadConverter.fromPayload} on values in `map`.
*/
export async function decodeMapFromPayloads<K extends string>(
converter: LoadedDataConverter,
map: Record<K, Payload> | null | undefined
): Promise<Record<K, unknown> | undefined> {
if (!map) return undefined;
const { payloadConverter, payloadCodec } = converter;
return Object.fromEntries(
await Promise.all(
Object.entries(map).map(async ([k, payload]): Promise<[K, unknown]> => {
const [decodedPayload] = await payloadCodec.decode([payload as Payload]);
const value = payloadConverter.fromPayload(decodedPayload);
return [k as K, value];
})
)
) as Record<K, unknown>;
}

/** Run {@link PayloadCodec.encode} on all values in `map` */
export async function encodeMap<K extends string>(
codec: PayloadCodec,
map: Record<K, Payload> | null | undefined
): Promise<Record<K, EncodedPayload> | null | undefined> {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return Object.fromEntries(
await Promise.all(
Object.entries(map).map(async ([k, payload]): Promise<[K, EncodedPayload]> => {
Expand All @@ -143,11 +168,11 @@ export async function encodeMap<K extends string>(
}

/**
* Run {@link PayloadConverter.toPayload} and {@link PayloadCodec.encode} on values in `map`.
* Run {@link PayloadConverter.toPayload} and then {@link PayloadCodec.encode} on values in `map`.
*/
export async function encodeMapToPayloads<K extends string>(
converter: LoadedDataConverter,
map: Record<K, any>
map: Record<K, unknown>
): Promise<Record<K, Payload>> {
const { payloadConverter, payloadCodec } = converter;
return Object.fromEntries(
Expand Down Expand Up @@ -228,7 +253,8 @@ export async function encodeOptionalFailure(
codec: PayloadCodec,
failure: ProtoFailure | null | undefined
): Promise<EncodedProtoFailure | null | undefined> {
if (!failure) return failure;
if (failure === null) return null;
if (failure === undefined) return undefined;
return await encodeFailure(codec, failure);
}

Expand All @@ -239,7 +265,8 @@ export async function decodeOptionalFailure(
codec: PayloadCodec,
failure: ProtoFailure | null | undefined
): Promise<DecodedProtoFailure | null | undefined> {
if (!failure) return failure;
if (failure === null) return null;
if (failure === undefined) return undefined;
return await decodeFailure(codec, failure);
}

Expand Down Expand Up @@ -301,7 +328,8 @@ export async function decodeFailure(_codec: PayloadCodec, failure: ProtoFailure)
export function noopEncodeMap<K extends string>(
map: Record<K, Payload> | null | undefined
): Record<K, EncodedPayload> | null | undefined {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return map as Record<K, EncodedPayload>;
}

Expand All @@ -312,6 +340,7 @@ export function noopEncodeMap<K extends string>(
export function noopDecodeMap<K extends string>(
map: Record<K, Payload> | null | undefined
): Record<K, DecodedPayload> | null | undefined {
if (!map) return map;
if (map === null) return null;
if (map === undefined) return undefined;
return map as Record<K, DecodedPayload>;
}
11 changes: 11 additions & 0 deletions packages/internal-workflow-common/src/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,14 @@ export function msToNumber(val: string | number): number {
}
return ms(val);
}

export function tsToDate(ts: Timestamp): Date {
return new Date(tsToMs(ts));
}

export function optionalTsToDate(ts: Timestamp | null | undefined): Date | undefined {
if (ts === undefined || ts === null) {
return undefined;
}
return new Date(tsToMs(ts));
}
81 changes: 50 additions & 31 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,28 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
t.regex(event.workflowTaskCompletedEventAttributes!.binaryChecksum!, /@temporalio\/worker@\d+\.\d+\.\d+/);
});

test('WorkflowHandle.describe result is wrapped', async (t) => {
const { client } = t.context;
const workflow = await client.start(workflows.argsAndReturn, {
args: ['hey', undefined, Buffer.from('def')],
taskQueue: 'test',
workflowId: uuid4(),
searchAttributes: {
CustomKeywordField: 'test-value',
},
memo: {
note: 'foo',
},
});
await workflow.result();
const execution = await workflow.describe();
t.deepEqual(execution.type, 'argsAndReturn');
t.deepEqual(execution.memo, { note: 'foo' });
t.true(execution.startTime instanceof Date);
t.is(execution.searchAttributes!.CustomKeywordField, 'test-value');
t.regex((execution.searchAttributes!.BinaryChecksums as string[])[0], /@temporalio\/worker@/);
});

test('WorkflowOptions are passed correctly with defaults', async (t) => {
const { client } = t.context;
const workflow = await client.start(workflows.argsAndReturn, {
Expand All @@ -547,22 +569,23 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await workflow.result();
const execution = await workflow.describe();
t.deepEqual(
execution.workflowExecutionInfo?.type,
new iface.temporal.api.common.v1.WorkflowType({ name: 'argsAndReturn' })
);
t.deepEqual(execution.workflowExecutionInfo?.memo, new iface.temporal.api.common.v1.Memo({ fields: {} }));
t.deepEqual(Object.keys(execution.workflowExecutionInfo!.searchAttributes!.indexedFields!), ['BinaryChecksums']);
t.deepEqual(execution.type, 'argsAndReturn');
t.deepEqual(Object.keys(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!), [
'BinaryChecksums',
]);

const checksums = defaultPayloadConverter.fromPayload(
execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums!
execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums!
);
t.true(checksums instanceof Array && checksums.length === 1);
t.regex((checksums as string[])[0], /@temporalio\/worker@\d+\.\d+\.\d+/);
t.is(execution.executionConfig?.taskQueue?.name, 'test');
t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL);
t.is(execution.executionConfig?.workflowRunTimeout, null);
t.is(execution.executionConfig?.workflowExecutionTimeout, null);
t.is(execution.raw.executionConfig?.taskQueue?.name, 'test');
t.is(
execution.raw.executionConfig?.taskQueue?.kind,
iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL
);
t.is(execution.raw.executionConfig?.workflowRunTimeout, null);
t.is(execution.raw.executionConfig?.workflowExecutionTimeout, null);
});

test('WorkflowOptions are passed correctly', async (t) => {
Expand All @@ -584,22 +607,25 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
const execution = await workflow.describe();
t.deepEqual(
execution.workflowExecutionInfo?.type,
execution.raw.workflowExecutionInfo?.type,
new iface.temporal.api.common.v1.WorkflowType({ name: 'sleeper' })
);
t.deepEqual(await fromPayload(execution.workflowExecutionInfo!.memo!.fields!.a!), 'b');
t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b');
t.deepEqual(
await defaultPayloadConverter.fromPayload(
execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!
execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!
),
3
);
t.is(execution.executionConfig?.taskQueue?.name, 'test2');
t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL);
t.is(execution.raw.executionConfig?.taskQueue?.name, 'test2');
t.is(
execution.raw.executionConfig?.taskQueue?.kind,
iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL
);

t.is(tsToMs(execution.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout));
t.is(tsToMs(execution.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout));
t.is(tsToMs(execution.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout));
t.is(tsToMs(execution.raw.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout));
t.is(tsToMs(execution.raw.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout));
t.is(tsToMs(execution.raw.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout));
});

test('WorkflowHandle.result() throws if terminated', async (t) => {
Expand Down Expand Up @@ -667,7 +693,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await workflow.result();
const info = await workflow.describe();
t.is(info.workflowExecutionInfo?.type?.name, 'sleeper');
t.is(info.raw.workflowExecutionInfo?.type?.name, 'sleeper');
const { history } = await client.service.getWorkflowExecutionHistory({
namespace,
execution: { workflowId: workflow.workflowId, runId: err.newExecutionRunId },
Expand Down Expand Up @@ -753,14 +779,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
return;
}
t.is(failure.message, 'unhandled rejection');
t.true(
failure.stackTrace?.includes(
dedent`
Error: unhandled rejection
at eval (webpack-internal:///./lib/workflows/unhandled-rejection.js
`
)
);
t.true(failure.stackTrace?.includes(`Error: unhandled rejection`));
t.is(failure.cause?.message, 'root failure');
},
{ minTimeout: 300, factor: 1, retries: 100 }
Expand Down Expand Up @@ -815,8 +834,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
});
await t.throwsAsync(handle.result());
const handleForSecondAttempt = client.getHandle(workflowId);
const { workflowExecutionInfo } = await handleForSecondAttempt.describe();
t.not(workflowExecutionInfo?.execution?.runId, handle.originalRunId);
const { raw } = await handleForSecondAttempt.describe();
t.not(raw.workflowExecutionInfo?.execution?.runId, handle.originalRunId);
});

test('Workflow RetryPolicy ignored with nonRetryable failure', async (t) => {
Expand All @@ -835,7 +854,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
await t.throwsAsync(handle.result());
const res = await handle.describe();
t.is(
res.workflowExecutionInfo?.status,
res.raw.workflowExecutionInfo?.status,
iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED
);
});
Expand Down
Loading