From 65ec6ae0c890e57e5a4f51cb32679c406e623689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 4 Feb 2025 10:17:44 +0100 Subject: [PATCH] refactor(core): Consolidate execution lifecycle hooks even more + additional tests (#12898) --- packages/@n8n/api-types/src/scaling.ts | 2 +- packages/cli/src/active-executions.ts | 6 +- .../execution-lifecycle-hooks.test.ts | 125 +++++++----- .../__tests__/save-execution-progress.test.ts | 182 +++++++++++------ .../execution-lifecycle-hooks.ts | 185 ++++++++++-------- .../save-execution-progress.ts | 77 +++----- .../execution-lifecycle/to-save-settings.ts | 9 +- .../executions/execution-recovery.service.ts | 2 +- packages/cli/src/scaling/job-processor.ts | 4 +- .../src/workflow-execute-additional-data.ts | 4 - packages/cli/src/workflow-runner.ts | 37 ++-- packages/workflow/src/Interfaces.ts | 7 +- packages/workflow/src/WorkflowHooks.ts | 18 +- 13 files changed, 356 insertions(+), 302 deletions(-) diff --git a/packages/@n8n/api-types/src/scaling.ts b/packages/@n8n/api-types/src/scaling.ts index f0c3627e84556..30db754a11269 100644 --- a/packages/@n8n/api-types/src/scaling.ts +++ b/packages/@n8n/api-types/src/scaling.ts @@ -6,7 +6,7 @@ export type RunningJobSummary = { workflowName: string; mode: WorkflowExecuteMode; startedAt: Date; - retryOf: string; + retryOf?: string; status: ExecutionStatus; }; diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index fb9cd0270638a..e00b693be61bf 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -61,9 +61,7 @@ export class ActiveExecutions { workflowId: executionData.workflowData.id, }; - if (executionData.retryOf !== undefined) { - fullExecutionData.retryOf = executionData.retryOf.toString(); - } + fullExecutionData.retryOf = executionData.retryOf ?? undefined; const workflowId = executionData.workflowData.id; if (workflowId !== undefined && isWorkflowIdValid(workflowId)) { @@ -183,7 +181,7 @@ export class ActiveExecutions { data = this.activeExecutions[id]; returnData.push({ id, - retryOf: data.executionData.retryOf, + retryOf: data.executionData.retryOf ?? undefined, startedAt: data.startedAt, mode: data.executionData.executionMode, workflowId: data.executionData.workflowData.id, diff --git a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts index 46f27d3541e56..38ea6de196ea2 100644 --- a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts @@ -159,35 +159,77 @@ describe('Execution Lifecycle Hooks', () => { }); }; - describe('getWorkflowHooksMain', () => { - beforeEach(() => { - hooks = getWorkflowHooksMain( - { + const externalHooksTests = () => { + describe('workflowExecuteBefore', () => { + it('should run workflow.preExecute hook', async () => { + await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ + workflow, executionMode, + ]); + }); + }); + + describe('workflowExecuteAfter', () => { + it('should run workflow.postExecute hook', async () => { + await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + + expect(externalHooks.run).toHaveBeenCalledWith('workflow.postExecute', [ + successfulRun, workflowData, - pushRef, - retryOf, - }, - executionId, - ); + executionId, + ]); + }); + }); + }; + + const statisticsTests = () => { + describe('statistics events', () => { + it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => { + await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { + workflowData, + fullRunData: successfulRun, + }); + }); + + it('nodeFetchedData should handle nodeFetchedData statistics event', async () => { + await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]); + + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', { + workflowId, + node, + }); + }); + }); + }; + + describe('getWorkflowHooksMain', () => { + const createHooks = () => + getWorkflowHooksMain({ executionMode, workflowData, pushRef, retryOf }, executionId); + + beforeEach(() => { + hooks = createHooks(); }); workflowEventTests(); nodeEventsTests(); + externalHooksTests(); + statisticsTests(); it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); expect(hooks.executionId).toBe(executionId); expect(hooks.workflowData).toEqual(workflowData); - expect(hooks.pushRef).toEqual('test-push-ref'); - expect(hooks.retryOf).toEqual('test-retry-of'); const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(3); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); expect(hookFunctions.workflowExecuteBefore).toHaveLength(3); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(5); expect(hookFunctions.nodeFetchedData).toHaveLength(1); expect(hookFunctions.sendResponse).toHaveLength(0); }); @@ -219,6 +261,9 @@ describe('Execution Lifecycle Hooks', () => { it('should save execution progress when enabled', async () => { workflowData.settings = { saveExecutionProgress: true }; + hooks = createHooks(); + + expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(3); await hooks.executeHookFunctions('nodeExecuteAfter', [ nodeName, @@ -234,6 +279,9 @@ describe('Execution Lifecycle Hooks', () => { it('should not save execution progress when disabled', async () => { workflowData.settings = { saveExecutionProgress: false }; + hooks = createHooks(); + + expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(2); await hooks.executeHookFunctions('nodeExecuteAfter', [ nodeName, @@ -365,6 +413,7 @@ describe('Execution Lifecycle Hooks', () => { it('should soft delete manual executions when manual saving is disabled', async () => { hooks.workflowData.settings = { saveManualExecutions: false }; + hooks = createHooks(); await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); @@ -373,6 +422,7 @@ describe('Execution Lifecycle Hooks', () => { it('should not soft delete manual executions with waitTill', async () => { hooks.workflowData.settings = { saveManualExecutions: false }; + hooks = createHooks(); await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]); @@ -458,32 +508,18 @@ describe('Execution Lifecycle Hooks', () => { }); }); - describe('statistics events', () => { - it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); - - expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { - workflowData, - fullRunData: successfulRun, - }); - }); - - it('nodeFetchedData should handle nodeFetchedData statistics event', async () => { - await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]); - - expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', { - workflowId, - node, - }); - }); - }); - describe("when pushRef isn't set", () => { beforeEach(() => { - hooks = getWorkflowHooksMain({ executionMode, workflowData }, executionId); + hooks = getWorkflowHooksMain({ executionMode, workflowData, retryOf }, executionId); }); - it('should not send any push events', async () => { + it('should not setup any push hooks', async () => { + const { hookFunctions } = hooks; + expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(1); + expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); + await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); await hooks.executeHookFunctions('nodeExecuteAfter', [ nodeName, @@ -507,20 +543,19 @@ describe('Execution Lifecycle Hooks', () => { }); workflowEventTests(); + externalHooksTests(); it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); expect(hooks.executionId).toBe(executionId); expect(hooks.workflowData).toEqual(workflowData); - expect(hooks.pushRef).toEqual('test-push-ref'); - expect(hooks.retryOf).toEqual('test-retry-of'); const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(0); expect(hookFunctions.nodeExecuteAfter).toHaveLength(0); expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(3); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); expect(hookFunctions.nodeFetchedData).toHaveLength(0); expect(hookFunctions.sendResponse).toHaveLength(0); }); @@ -584,18 +619,18 @@ describe('Execution Lifecycle Hooks', () => { }); nodeEventsTests(); + externalHooksTests(); + statisticsTests(); it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); expect(hooks.executionId).toBe(executionId); expect(hooks.workflowData).toEqual(workflowData); - expect(hooks.pushRef).toEqual('test-push-ref'); - expect(hooks.retryOf).toEqual('test-retry-of'); const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(3); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); expect(hookFunctions.nodeFetchedData).toHaveLength(1); @@ -680,20 +715,20 @@ describe('Execution Lifecycle Hooks', () => { workflowEventTests(); nodeEventsTests(); + externalHooksTests(); + statisticsTests(); it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); expect(hooks.executionId).toBe(executionId); expect(hooks.workflowData).toEqual(workflowData); - expect(hooks.pushRef).toBeUndefined(); - expect(hooks.retryOf).toBeUndefined(); const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(1); expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(3); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); expect(hookFunctions.nodeFetchedData).toHaveLength(1); expect(hookFunctions.sendResponse).toHaveLength(0); }); diff --git a/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts b/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts index 863006d9e7d01..8c90c865fd589 100644 --- a/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts @@ -1,100 +1,152 @@ +import { mock } from 'jest-mock-extended'; import { ErrorReporter } from 'n8n-core'; import { Logger } from 'n8n-core'; -import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow'; +import type { IRunExecutionData, ITaskData } from 'n8n-workflow'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/interfaces'; import { mockInstance } from '@test/mocking'; import { saveExecutionProgress } from '../save-execution-progress'; -import * as fnModule from '../to-save-settings'; -mockInstance(Logger); -const errorReporter = mockInstance(ErrorReporter); -const executionRepository = mockInstance(ExecutionRepository); +describe('saveExecutionProgress', () => { + mockInstance(Logger); + const errorReporter = mockInstance(ErrorReporter); + const executionRepository = mockInstance(ExecutionRepository); -afterEach(() => { - jest.clearAllMocks(); -}); + afterEach(() => { + jest.resetAllMocks(); + }); + + const workflowId = 'some-workflow-id'; + const executionId = 'some-execution-id'; + const nodeName = 'My Node'; + const taskData = mock(); + const runExecutionData = mock(); + + const commonArgs = [workflowId, executionId, nodeName, taskData, runExecutionData] as const; + + test('should not try to update non-existent executions', async () => { + executionRepository.findSingleExecution.mockResolvedValue(undefined); -const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData, string] = [ - {} as IWorkflowBase, - 'some-execution-id', - 'My Node', - {} as ITaskData, - {} as IRunExecutionData, - 'some-session-id', -]; - -const commonSettings = { error: true, success: true, manual: true }; - -test('should ignore if save settings say so', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: false, + await saveExecutionProgress(...commonArgs); + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); }); - await saveExecutionProgress(...commonArgs); + test('should handle DB errors on execution lookup', async () => { + const error = new Error('Something went wrong'); + executionRepository.findSingleExecution.mockImplementation(() => { + throw error; + }); - expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); -}); + await saveExecutionProgress(...commonArgs); -test('should ignore on leftover async call', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: true, + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); + expect(errorReporter.error).toHaveBeenCalledWith(error); }); - executionRepository.findSingleExecution.mockResolvedValue({ - finished: true, - } as IExecutionResponse); + test('should handle DB errors when updating the execution', async () => { + const error = new Error('Something went wrong'); + executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse); + executionRepository.updateExistingExecution.mockImplementation(() => { + throw error; + }); - await saveExecutionProgress(...commonArgs); + await saveExecutionProgress(...commonArgs); - expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); -}); + expect(executionRepository.findSingleExecution).toHaveBeenCalled(); + expect(executionRepository.updateExistingExecution).toHaveBeenCalled(); + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + + test('should not try to update finished executions', async () => { + executionRepository.findSingleExecution.mockResolvedValue( + mock({ + finished: true, + }), + ); + + await saveExecutionProgress(...commonArgs); -test('should update execution when saving progress is enabled', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: true, + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); }); - executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse); + test('should populate `.data` when it is missing', async () => { + const fullExecutionData = {} as IExecutionResponse; + executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData); - await saveExecutionProgress(...commonArgs); + await saveExecutionProgress(...commonArgs); - expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', { - data: { - executionData: undefined, - resultData: { - lastNodeExecuted: 'My Node', - runData: { - 'My Node': [{}], + expect(fullExecutionData).toEqual({ + data: { + executionData: runExecutionData.executionData, + resultData: { + lastNodeExecuted: nodeName, + runData: { + [nodeName]: [taskData], + }, }, + startData: {}, }, - startData: {}, - }, - status: 'running', - }); + status: 'running', + }); - expect(errorReporter.error).not.toHaveBeenCalled(); -}); + expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( + executionId, + fullExecutionData, + ); -test('should report error on failure', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: true, + expect(errorReporter.error).not.toHaveBeenCalled(); }); - const error = new Error('Something went wrong'); + test('should augment `.data` if it already exists', async () => { + const fullExecutionData = { + data: { + startData: {}, + resultData: { + runData: { + [nodeName]: [{}], + }, + }, + }, + } as unknown as IExecutionResponse; + executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData); + + await saveExecutionProgress(...commonArgs); + + expect(fullExecutionData).toEqual({ + data: { + executionData: runExecutionData.executionData, + resultData: { + lastNodeExecuted: nodeName, + runData: { + [nodeName]: [{}, taskData], + }, + }, + startData: {}, + }, + status: 'running', + }); - executionRepository.findSingleExecution.mockImplementation(() => { - throw error; + expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( + executionId, + fullExecutionData, + ); }); - await saveExecutionProgress(...commonArgs); + test('should set last executed node correctly', async () => { + const fullExecutionData = { + data: { + resultData: { + lastNodeExecuted: 'Another Node', + runData: {}, + }, + }, + } as unknown as IExecutionResponse; + executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData); - expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); - expect(errorReporter.error).toHaveBeenCalledWith(error); + await saveExecutionProgress(...commonArgs); + + expect(fullExecutionData.data.resultData.lastNodeExecuted).toEqual(nodeName); + }); }); diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index bb0542dba058e..d3f23797a5caf 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -10,7 +10,6 @@ import type { ITaskData, IWorkflowBase, IWorkflowExecuteHooks, - IWorkflowHooksOptionalParameters, WorkflowExecuteMode, IWorkflowExecutionDataProcess, Workflow, @@ -32,7 +31,13 @@ import { prepareExecutionDataForDbUpdate, updateExistingExecution, } from './shared/shared-hook-functions'; -import { toSaveSettings } from './to-save-settings'; +import { type ExecutionSaveSettings, toSaveSettings } from './to-save-settings'; + +type HooksSetupParameters = { + saveSettings: ExecutionSaveSettings; + pushRef?: string; + retryOf?: string; +}; function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks { const result: IWorkflowExecuteHooks = { @@ -91,19 +96,16 @@ function hookFunctionsNodeEvents(): IWorkflowExecuteHooks { /** * Returns hook functions to push data to Editor-UI */ -function hookFunctionsPush(): IWorkflowExecuteHooks { +function hookFunctionsPush({ pushRef, retryOf }: HooksSetupParameters): IWorkflowExecuteHooks { + if (!pushRef) return {}; const logger = Container.get(Logger); const pushInstance = Container.get(Push); return { nodeExecuteBefore: [ async function (this: WorkflowHooks, nodeName: string): Promise { - const { pushRef, executionId } = this; + const { executionId } = this; // Push data to session which started workflow before each // node which starts rendering - if (pushRef === undefined) { - return; - } - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId, pushRef, @@ -115,12 +117,8 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { ], nodeExecuteAfter: [ async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { - const { pushRef, executionId } = this; + const { executionId } = this; // Push data to session which started workflow after each rendered node - if (pushRef === undefined) { - return; - } - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId, pushRef, @@ -135,7 +133,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { ], workflowExecuteBefore: [ async function (this: WorkflowHooks, _workflow, data): Promise { - const { pushRef, executionId } = this; + const { executionId } = this; const { id: workflowId, name: workflowName } = this.workflowData; logger.debug('Executing hook (hookFunctionsPush)', { executionId, @@ -143,9 +141,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId, }); // Push data to session which started the workflow - if (pushRef === undefined) { - return; - } pushInstance.send( { type: 'executionStarted', @@ -153,7 +148,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { executionId, mode: this.mode, startedAt: new Date(), - retryOf: this.retryOf, + retryOf, workflowId, workflowName, flattedRunData: data?.resultData.runData @@ -167,9 +162,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { ], workflowExecuteAfter: [ async function (this: WorkflowHooks, fullRunData: IRun): Promise { - const { pushRef, executionId } = this; - if (pushRef === undefined) return; - + const { executionId } = this; const { id: workflowId } = this.workflowData; logger.debug('Executing hook (hookFunctionsPush)', { executionId, @@ -192,7 +185,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { }; } -function hookFunctionsPreExecute(): IWorkflowExecuteHooks { +function hookFunctionsExternalHooks(): IWorkflowExecuteHooks { const externalHooks = Container.get(ExternalHooks); return { workflowExecuteBefore: [ @@ -200,6 +193,21 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks { await externalHooks.run('workflow.preExecute', [workflow, this.mode]); }, ], + workflowExecuteAfter: [ + async function (this: WorkflowHooks, fullRunData: IRun) { + await externalHooks.run('workflow.postExecute', [ + fullRunData, + this.workflowData, + this.executionId, + ]); + }, + ], + }; +} + +function hookFunctionsSaveProgress({ saveSettings }: HooksSetupParameters): IWorkflowExecuteHooks { + if (!saveSettings.progress) return {}; + return { nodeExecuteAfter: [ async function ( this: WorkflowHooks, @@ -208,12 +216,11 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks { executionData: IRunExecutionData, ): Promise { await saveExecutionProgress( - this.workflowData, + this.workflowData.id, this.executionId, nodeName, data, executionData, - this.pushRef, ); }, ], @@ -231,11 +238,29 @@ function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks { }; } +function hookFunctionsStatistics(): IWorkflowExecuteHooks { + const workflowStatisticsService = Container.get(WorkflowStatisticsService); + return { + nodeFetchedData: [ + async (workflowId: string, node: INode) => { + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); + }, + ], + }; +} + /** * Returns hook functions to save workflow execution and call error workflow */ -function hookFunctionsSave(): IWorkflowExecuteHooks { +function hookFunctionsSave({ + pushRef, + retryOf, + saveSettings, +}: HooksSetupParameters): IWorkflowExecuteHooks { const logger = Container.get(Logger); + const errorReporter = Container.get(ErrorReporter); + const executionRepository = Container.get(ExecutionRepository); + const workflowStaticDataService = Container.get(WorkflowStaticDataService); const workflowStatisticsService = Container.get(WorkflowStatisticsService); return { workflowExecuteAfter: [ @@ -257,12 +282,12 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( + await workflowStaticDataService.saveStaticDataById( this.workflowData.id, newStaticData, ); } catch (e) { - Container.get(ErrorReporter).error(e); + errorReporter.error(e); logger.error( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, @@ -271,8 +296,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { } } - const saveSettings = toSaveSettings(this.workflowData.settings); - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { /** * When manual executions are not being saved, we only soft-delete @@ -283,7 +306,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { * on the next pruning cycle after the grace period set by * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. */ - await Container.get(ExecutionRepository).softDelete(this.executionId); + await executionRepository.softDelete(this.executionId); return; } @@ -298,10 +321,10 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { fullRunData, this.mode, this.executionId, - this.retryOf, + retryOf, ); - await Container.get(ExecutionRepository).hardDelete({ + await executionRepository.hardDelete({ workflowId: this.workflowData.id, executionId: this.executionId, }); @@ -315,12 +338,12 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { runData: fullRunData, workflowData: this.workflowData, workflowStatusFinal: fullRunData.status, - retryOf: this.retryOf, + retryOf, }); // When going into the waiting state, store the pushRef in the execution-data if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = this.pushRef; + fullExecutionData.data.pushRef = pushRef; } await updateExistingExecution({ @@ -335,7 +358,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { fullRunData, this.mode, this.executionId, - this.retryOf, + retryOf, ); } } finally { @@ -346,11 +369,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { } }, ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], }; } @@ -359,8 +377,13 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { * for running with queues. Manual executions should never run on queues as * they are always executed in the main process. */ -function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { +function hookFunctionsSaveWorker({ + pushRef, + retryOf, +}: HooksSetupParameters): IWorkflowExecuteHooks { const logger = Container.get(Logger); + const errorReporter = Container.get(ErrorReporter); + const workflowStaticDataService = Container.get(WorkflowStaticDataService); const workflowStatisticsService = Container.get(WorkflowStatisticsService); return { workflowExecuteAfter: [ @@ -380,16 +403,16 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( + await workflowStaticDataService.saveStaticDataById( this.workflowData.id, newStaticData, ); } catch (e) { - Container.get(ErrorReporter).error(e); + errorReporter.error(e); logger.error( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, - { pushRef: this.pushRef, workflowId: this.workflowData.id }, + { workflowId: this.workflowData.id }, ); } } @@ -404,7 +427,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { fullRunData, this.mode, this.executionId, - this.retryOf, + retryOf, ); } @@ -414,12 +437,12 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { runData: fullRunData, workflowData: this.workflowData, workflowStatusFinal: fullRunData.status, - retryOf: this.retryOf, + retryOf, }); // When going into the waiting state, store the pushRef in the execution-data if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = this.pushRef; + fullExecutionData.data.pushRef = pushRef; } await updateExistingExecution({ @@ -434,21 +457,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { }); } }, - async function (this: WorkflowHooks, fullRunData: IRun) { - const externalHooks = Container.get(ExternalHooks); - try { - await externalHooks.run('workflow.postExecute', [ - fullRunData, - this.workflowData, - this.executionId, - ]); - } catch {} - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, ], }; } @@ -463,12 +471,15 @@ export function getWorkflowHooksIntegrated( workflowData: IWorkflowBase, userId?: string, ): WorkflowHooks { + const saveSettings = toSaveSettings(workflowData.settings); const hookFunctions = mergeHookFunctions( hookFunctionsWorkflowEvents(userId), hookFunctionsNodeEvents(), hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSave(), - hookFunctionsPreExecute(), + hookFunctionsSave({ saveSettings }), + hookFunctionsSaveProgress({ saveSettings }), + hookFunctionsStatistics(), + hookFunctionsExternalHooks(), ); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); } @@ -480,21 +491,25 @@ export function getWorkflowHooksWorkerExecuter( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, - optionalParameters: IWorkflowHooksOptionalParameters = {}, + { pushRef, retryOf }: Omit = {}, ): WorkflowHooks { + const saveSettings = toSaveSettings(workflowData.settings); + const optionalParameters = { pushRef, retryOf, saveSettings }; const toMerge = [ hookFunctionsNodeEvents(), hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSaveWorker(), - hookFunctionsPreExecute(), + hookFunctionsSaveWorker(optionalParameters), + hookFunctionsSaveProgress(optionalParameters), + hookFunctionsStatistics(), + hookFunctionsExternalHooks(), ]; if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { - toMerge.push(hookFunctionsPush()); + toMerge.push(hookFunctionsPush(optionalParameters)); } const hookFunctions = mergeHookFunctions(...toMerge); - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); } /** @@ -504,11 +519,15 @@ export function getWorkflowHooksWorkerMain( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, - optionalParameters: IWorkflowHooksOptionalParameters = {}, + { pushRef, retryOf }: Omit = {}, ): WorkflowHooks { + const saveSettings = toSaveSettings(workflowData.settings); + const optionalParameters = { pushRef, retryOf, saveSettings }; + const executionRepository = Container.get(ExecutionRepository); const hookFunctions = mergeHookFunctions( hookFunctionsWorkflowEvents(), - hookFunctionsPreExecute(), + hookFunctionsSaveProgress(optionalParameters), + hookFunctionsExternalHooks(), hookFunctionsFinalizeExecutionStatus(), { workflowExecuteAfter: [ @@ -516,8 +535,6 @@ export function getWorkflowHooksWorkerMain( // Don't delete executions before they are finished if (!fullRunData.finished) return; - const saveSettings = toSaveSettings(this.workflowData.settings); - const isManualMode = this.mode === 'manual'; if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { @@ -530,7 +547,7 @@ export function getWorkflowHooksWorkerMain( * on the next pruning cycle after the grace period set by * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. */ - await Container.get(ExecutionRepository).softDelete(this.executionId); + await executionRepository.softDelete(this.executionId); return; } @@ -540,7 +557,7 @@ export function getWorkflowHooksWorkerMain( (fullRunData.status !== 'success' && !saveSettings.error); if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { - await Container.get(ExecutionRepository).hardDelete({ + await executionRepository.hardDelete({ workflowId: this.workflowData.id, executionId: this.executionId, }); @@ -556,7 +573,7 @@ export function getWorkflowHooksWorkerMain( hookFunctions.nodeExecuteBefore = []; hookFunctions.nodeExecuteAfter = []; - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); } /** @@ -566,16 +583,18 @@ export function getWorkflowHooksMain( data: IWorkflowExecutionDataProcess, executionId: string, ): WorkflowHooks { + const { pushRef, retryOf } = data; + const saveSettings = toSaveSettings(data.workflowData.settings); + const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; const hookFunctions = mergeHookFunctions( hookFunctionsWorkflowEvents(), hookFunctionsNodeEvents(), hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSave(), - hookFunctionsPush(), - hookFunctionsPreExecute(), + hookFunctionsSave(optionalParameters), + hookFunctionsPush(optionalParameters), + hookFunctionsSaveProgress(optionalParameters), + hookFunctionsStatistics(), + hookFunctionsExternalHooks(), ); - return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { - pushRef: data.pushRef, - retryOf: data.retryOf as string, - }); + return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData); } diff --git a/packages/cli/src/execution-lifecycle/save-execution-progress.ts b/packages/cli/src/execution-lifecycle/save-execution-progress.ts index 8ca33c70959b3..78f61ff98fa4a 100644 --- a/packages/cli/src/execution-lifecycle/save-execution-progress.ts +++ b/packages/cli/src/execution-lifecycle/save-execution-progress.ts @@ -1,24 +1,19 @@ import { Container } from '@n8n/di'; import { ErrorReporter, Logger } from 'n8n-core'; -import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow'; +import type { IRunExecutionData, ITaskData } from 'n8n-workflow'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { toSaveSettings } from './to-save-settings'; - export async function saveExecutionProgress( - workflowData: IWorkflowBase, + workflowId: string, executionId: string, nodeName: string, data: ITaskData, executionData: IRunExecutionData, - pushRef?: string, ) { - const saveSettings = toSaveSettings(workflowData.settings); - - if (!saveSettings.progress) return; - const logger = Container.get(Logger); + const executionRepository = Container.get(ExecutionRepository); + const errorReporter = Container.get(ErrorReporter); try { logger.debug(`Save execution progress to database for execution ID ${executionId} `, { @@ -26,13 +21,10 @@ export async function saveExecutionProgress( nodeName, }); - const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: true, - unflattenData: true, - }, - ); + const fullExecutionData = await executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); if (!fullExecutionData) { // Something went badly wrong if this happens. @@ -47,29 +39,22 @@ export async function saveExecutionProgress( return; } - if (fullExecutionData.data === undefined) { - fullExecutionData.data = { - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - metadata: {}, - nodeExecutionStack: [], - waitingExecution: {}, - waitingExecutionSource: {}, - }, - }; - } + fullExecutionData.data ??= { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack: [], + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; - if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) { - // Append data if array exists - fullExecutionData.data.resultData.runData[nodeName].push(data); - } else { - // Initialize array and save data - fullExecutionData.data.resultData.runData[nodeName] = [data]; - } + const { runData } = fullExecutionData.data.resultData; + (runData[nodeName] ??= []).push(data); fullExecutionData.data.executionData = executionData.executionData; @@ -78,27 +63,19 @@ export async function saveExecutionProgress( fullExecutionData.status = 'running'; - await Container.get(ExecutionRepository).updateExistingExecution( - executionId, - fullExecutionData, - ); + await executionRepository.updateExistingExecution(executionId, fullExecutionData); } catch (e) { const error = e instanceof Error ? e : new Error(`${e}`); - Container.get(ErrorReporter).error(error); + errorReporter.error(error); // TODO: Improve in the future! // Errors here might happen because of database access // For busy machines, we may get "Database is locked" errors. // We do this to prevent crashes and executions ending in `unknown` state. logger.error( - `Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`, - { - ...error, - executionId, - pushRef, - workflowId: workflowData.id, - }, + `Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsSaveProgress, nodeExecuteAfter)`, + { error, executionId, workflowId }, ); } } diff --git a/packages/cli/src/execution-lifecycle/to-save-settings.ts b/packages/cli/src/execution-lifecycle/to-save-settings.ts index a7af8f3ddc233..b5cd24f7d9cfe 100644 --- a/packages/cli/src/execution-lifecycle/to-save-settings.ts +++ b/packages/cli/src/execution-lifecycle/to-save-settings.ts @@ -2,6 +2,13 @@ import type { IWorkflowSettings } from 'n8n-workflow'; import config from '@/config'; +export type ExecutionSaveSettings = { + error: boolean | 'all' | 'none'; + success: boolean | 'all' | 'none'; + manual: boolean; + progress: boolean; +}; + /** * Return whether a workflow execution is configured to be saved or not: * @@ -10,7 +17,7 @@ import config from '@/config'; * - `manual`: Whether to save successful or failed manual executions. * - `progress`: Whether to save execution progress, i.e. after each node's execution. */ -export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) { +export function toSaveSettings(workflowSettings: IWorkflowSettings = {}): ExecutionSaveSettings { const DEFAULTS = { ERROR: config.getEnv('executions.saveDataOnError'), SUCCESS: config.getEnv('executions.saveDataOnSuccess'), diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index f3cae9196794e..9ddf893a4fa3d 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -189,7 +189,7 @@ export class ExecutionRecoveryService { executionMode: execution.mode, executionData: execution.data, runData: execution.data.resultData.runData, - retryOf: execution.retryOf, + retryOf: execution.retryOf ?? undefined, }, execution.id, ); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 84657f52a0cf0..363d7dab157db 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -135,7 +135,7 @@ export class JobProcessor { execution.mode, job.data.executionId, execution.workflowData, - { retryOf: execution.retryOf as string, pushRef }, + { retryOf: execution.retryOf ?? undefined, pushRef }, ); if (pushRef) { @@ -229,7 +229,7 @@ export class JobProcessor { workflowName: execution.workflowData.name, mode: execution.mode, startedAt, - retryOf: execution.retryOf ?? '', + retryOf: execution.retryOf ?? undefined, status: execution.status, }; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 98a736916a211..85d1201ed02dd 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -38,7 +38,6 @@ import { WorkflowRepository } from '@/databases/repositories/workflow.repository import { EventService } from '@/events/event.service'; import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map'; import { getWorkflowHooksIntegrated } from '@/execution-lifecycle/execution-lifecycle-hooks'; -import { ExternalHooks } from '@/external-hooks'; import type { UpdateExecutionPayload } from '@/interfaces'; import { NodeTypes } from '@/node-types'; import { Push } from '@/push'; @@ -303,9 +302,6 @@ async function startExecution( ); } - const externalHooks = Container.get(ExternalHooks); - await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); - // subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here if (data.finished === true || data.status === 'waiting') { // Workflow did finish successfully diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index da48255faaf0f..e258284827015 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -26,7 +26,6 @@ import { getWorkflowHooksWorkerExecuter, getWorkflowHooksWorkerMain, } from '@/execution-lifecycle/execution-lifecycle-hooks'; -import { ExternalHooks } from '@/external-hooks'; import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import type { ScalingService } from '@/scaling/scaling.service'; @@ -49,7 +48,6 @@ export class WorkflowRunner { private readonly errorReporter: ErrorReporter, private readonly activeExecutions: ActiveExecutions, private readonly executionRepository: ExecutionRepository, - private readonly externalHooks: ExternalHooks, private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly nodeTypes: NodeTypes, private readonly permissionChecker: PermissionChecker, @@ -177,24 +175,17 @@ export class WorkflowRunner { data.executionMode === 'manual' ) { const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); - postExecutePromise - .then(async (executionData) => { - try { - await this.externalHooks.run('workflow.postExecute', [ - executionData, - data.workflowData, - executionId, - ]); - } catch {} - }) - .catch((error) => { - if (error instanceof ExecutionCancelledError) return; - this.errorReporter.error(error); - this.logger.error( - 'There was a problem running internal hook "onWorkflowPostExecute"', - error, - ); + postExecutePromise.catch((error) => { + if (error instanceof ExecutionCancelledError) return; + this.errorReporter.error(error, { + extra: { executionId, workflowId }, }); + this.logger.error('There was an error in the post-execution promise', { + error, + executionId, + workflowId, + }); + }); } return executionId; @@ -361,7 +352,7 @@ export class WorkflowRunner { job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); hooks = getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { - retryOf: data.retryOf ? data.retryOf.toString() : undefined, + retryOf: data.retryOf ?? undefined, }); // Normally also workflow should be supplied here but as it only used for sending @@ -374,7 +365,7 @@ export class WorkflowRunner { data.executionMode, executionId, data.workflowData, - { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, + { retryOf: data.retryOf ?? undefined }, ); await this.processError(error, new Date(), data.executionMode, executionId, hooks); throw error; @@ -392,7 +383,7 @@ export class WorkflowRunner { data.executionMode, executionId, data.workflowData, - { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, + { retryOf: data.retryOf ?? undefined }, ); const error = new ExecutionCancelledError(executionId); @@ -417,7 +408,7 @@ export class WorkflowRunner { data.executionMode, executionId, data.workflowData, - { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, + { retryOf: data.retryOf ?? undefined }, ); await this.processError(error, new Date(), data.executionMode, executionId, hooks); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index ce1797ce01959..1d64f3e8fcb5e 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -2284,7 +2284,7 @@ export interface IWorkflowExecutionDataProcess { executionData?: IRunExecutionData; runData?: IRunData; pinData?: IPinData; - retryOf?: string; + retryOf?: string | null; pushRef?: string; startNodes?: StartNodeData[]; workflowData: IWorkflowBase; @@ -2407,11 +2407,6 @@ export type WorkflowActivateMode = | 'manual' // unused | 'leadershipChange'; -export interface IWorkflowHooksOptionalParameters { - retryOf?: string; - pushRef?: string; -} - export namespace WorkflowSettings { export type CallerPolicy = 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner'; export type SaveDataExecution = 'DEFAULT' | 'all' | 'none'; diff --git a/packages/workflow/src/WorkflowHooks.ts b/packages/workflow/src/WorkflowHooks.ts index feb7a46e20839..d7e05b9a1a567 100644 --- a/packages/workflow/src/WorkflowHooks.ts +++ b/packages/workflow/src/WorkflowHooks.ts @@ -1,9 +1,4 @@ -import type { - IWorkflowBase, - IWorkflowExecuteHooks, - IWorkflowHooksOptionalParameters, - WorkflowExecuteMode, -} from './Interfaces'; +import type { IWorkflowBase, IWorkflowExecuteHooks, WorkflowExecuteMode } from './Interfaces'; export class WorkflowHooks { mode: WorkflowExecuteMode; @@ -12,10 +7,6 @@ export class WorkflowHooks { executionId: string; - pushRef?: string; - - retryOf?: string; - hookFunctions: IWorkflowExecuteHooks; constructor( @@ -23,18 +14,11 @@ export class WorkflowHooks { mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, ) { - // eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing - optionalParameters = optionalParameters || {}; - this.hookFunctions = hookFunctions; this.mode = mode; this.executionId = executionId; this.workflowData = workflowData; - this.pushRef = optionalParameters.pushRef; - // retryOf might be `null` from TypeORM - this.retryOf = optionalParameters.retryOf ?? undefined; } // eslint-disable-next-line @typescript-eslint/no-explicit-any