diff --git a/plugins/orchestrator-backend/src/service/DataIndexService.ts b/plugins/orchestrator-backend/src/service/DataIndexService.ts index 5b2357d7c3..6c90a9a65d 100644 --- a/plugins/orchestrator-backend/src/service/DataIndexService.ts +++ b/plugins/orchestrator-backend/src/service/DataIndexService.ts @@ -53,15 +53,16 @@ export class DataIndexService { this.logger.error( `Error aborting workflow instance ${workflowId}: ${result.error}`, ); + } else { + this.logger.debug(`Successfully aborted workflow instance ${workflowId}`); } - this.logger.debug(`Successfully aborted workflow instance ${workflowId}`); return result; } public async getWorkflowDefinition( definitionId: string, - ): Promise { + ): Promise { const graphQlQuery = `{ ProcessDefinitions ( where: {id: {equal: "${definitionId}" } } ) { id, name, version, type, endpoint, serviceUrl, source } }`; const result = await this.client.query(graphQlQuery, {}); @@ -70,7 +71,15 @@ export class DataIndexService { this.logger.error(`Error fetching workflow definition ${result.error}`); throw result.error; } - return (result.data.ProcessDefinitions as WorkflowInfo[])[0]; + + const processDefinitions = result.data.ProcessDefinitions as WorkflowInfo[]; + + if (processDefinitions.length === 0) { + this.logger.info(`No workflow definition found for ${definitionId}`); + return undefined; + } + + return processDefinitions[0]; } public async getWorkflowDefinitions(): Promise { @@ -123,9 +132,7 @@ export class DataIndexService { } private async getWorkflowDefinitionFromInstance(instance: ProcessInstance) { - const workflowItem: WorkflowInfo = await this.getWorkflowDefinition( - instance.processId, - ); + const workflowItem = await this.getWorkflowDefinition(instance.processId); if (!workflowItem?.source) { throw new Error( `Workflow defintion is required to fetch instance ${instance.id}`, @@ -155,7 +162,15 @@ export class DataIndexService { return undefined; } - return response.data.ProcessDefinitions[0].source; + const processDefinitions = response.data + .ProcessDefinitions as WorkflowInfo[]; + + if (processDefinitions.length === 0) { + this.logger.info(`No workflow source found for ${workflowId}`); + return undefined; + } + + return processDefinitions[0].source; } public async fetchWorkflowInstances( @@ -206,9 +221,13 @@ export class DataIndexService { throw result.error; } - const variables = result.data.ProcessInstances.pop().variables; + const processInstances = result.data.ProcessInstances as ProcessInstance[]; - return parseWorkflowVariables(variables); + if (processInstances.length === 0) { + return undefined; + } + + return parseWorkflowVariables(processInstances[0].variables); } public async fetchProcessInstance( @@ -225,10 +244,15 @@ export class DataIndexService { throw result.error; } - const instance = (result.data.ProcessInstances as ProcessInstance[])[0]; - const workflowItem: WorkflowInfo = await this.getWorkflowDefinition( - instance.processId, - ); + const processInstances = result.data.ProcessInstances as ProcessInstance[]; + + if (processInstances.length === 0) { + return undefined; + } + + const instance = processInstances[0]; + + const workflowItem = await this.getWorkflowDefinition(instance.processId); if (!workflowItem?.source) { throw new Error( `Workflow defintion is required to fetch instance ${instance.id}`, diff --git a/plugins/orchestrator-backend/src/service/Helper.ts b/plugins/orchestrator-backend/src/service/Helper.ts index 2134d87f77..c1db42a715 100644 --- a/plugins/orchestrator-backend/src/service/Helper.ts +++ b/plugins/orchestrator-backend/src/service/Helper.ts @@ -5,6 +5,24 @@ import { Logger } from 'winston'; import os from 'os'; +export async function retryAsyncFunction(args: { + asyncFunc: () => Promise; + retries: number; + delayMs: number; +}): Promise { + let result: T | undefined; + for (let i = 0; i < args.retries; i++) { + result = await args.asyncFunc(); + if (result !== undefined) { + return result; + } + await new Promise(resolve => setTimeout(resolve, args.delayMs)); + } + throw new Error( + `Exceeded maximum number of retries for function ${args.asyncFunc.name}`, + ); +} + export async function getWorkingDirectory( config: Config, logger: Logger, diff --git a/plugins/orchestrator-backend/src/service/SonataFlowService.ts b/plugins/orchestrator-backend/src/service/SonataFlowService.ts index e5332bed98..e93a5dd931 100644 --- a/plugins/orchestrator-backend/src/service/SonataFlowService.ts +++ b/plugins/orchestrator-backend/src/service/SonataFlowService.ts @@ -98,10 +98,11 @@ export class SonataFlowService { workflowId: string, ): Promise { try { - const endpoint = - (await this.dataIndex.getWorkflowDefinition(workflowId)).serviceUrl ?? - ''; - const urlToFetch = `${endpoint}/management/processes/${workflowId}/sources`; + const definition = await this.dataIndex.getWorkflowDefinition(workflowId); + if (!definition?.serviceUrl) { + return undefined; + } + const urlToFetch = `${definition.serviceUrl}/management/processes/${workflowId}/sources`; const response = await executeWithRetry(() => fetch(urlToFetch)); if (response.ok) { diff --git a/plugins/orchestrator-backend/src/service/api/v1.ts b/plugins/orchestrator-backend/src/service/api/v1.ts index 1f7562c13a..cbc1ce824f 100644 --- a/plugins/orchestrator-backend/src/service/api/v1.ts +++ b/plugins/orchestrator-backend/src/service/api/v1.ts @@ -16,9 +16,13 @@ import { } from '@janus-idp/backstage-plugin-orchestrator-common'; import { DataIndexService } from '../DataIndexService'; +import { retryAsyncFunction } from '../Helper'; import { SonataFlowService } from '../SonataFlowService'; import { WorkflowService } from '../WorkflowService'; +const FETCH_INSTANCE_MAX_RETRIES = 5; +const FETCH_INSTANCE_RETRY_DELAY_MS = 1000; + export namespace V1 { export async function getWorkflowsOverview( sonataFlowService: SonataFlowService, @@ -146,14 +150,16 @@ export namespace V1 { businessKey: string | undefined, ): Promise { const definition = await dataIndexService.getWorkflowDefinition(workflowId); - const serviceUrl = definition.serviceUrl; - if (!serviceUrl) { + if (!definition) { + throw new Error(`Couldn't fetch workflow definition for ${workflowId}`); + } + if (!definition.serviceUrl) { throw new Error(`ServiceURL is not defined for workflow ${workflowId}`); } const executionResponse = await sonataFlowService.executeWorkflow({ workflowId, inputData: reqBody, - endpoint: serviceUrl, + endpoint: definition.serviceUrl, businessKey, }); @@ -161,6 +167,14 @@ export namespace V1 { throw new Error(`Couldn't execute workflow ${workflowId}`); } + // Making sure the instance data is available before returning + await retryAsyncFunction({ + asyncFunc: () => + dataIndexService.fetchProcessInstance(executionResponse.id), + retries: FETCH_INSTANCE_MAX_RETRIES, + delayMs: FETCH_INSTANCE_RETRY_DELAY_MS, + }); + return executionResponse; } diff --git a/plugins/orchestrator-backend/src/service/router.ts b/plugins/orchestrator-backend/src/service/router.ts index a68e5d7553..d08a73b5a1 100644 --- a/plugins/orchestrator-backend/src/service/router.ts +++ b/plugins/orchestrator-backend/src/service/router.ts @@ -431,9 +431,17 @@ function setupInternalRoutes( const workflowDefinition = await services.dataIndexService.getWorkflowDefinition(workflowId); + + if (!workflowDefinition) { + res.status(500).send(`Couldn't fetch workflow definition ${workflowId}`); + return; + } const serviceUrl = workflowDefinition.serviceUrl; if (!serviceUrl) { - throw new Error(`ServiceUrl is not defined for workflow ${workflowId}`); + res + .status(500) + .send(`Service URL is not defined for workflow ${workflowId}`); + return; } // workflow source