Skip to content

Commit

Permalink
fix(orchestrator): improvements to backend services (janus-idp#1252)
Browse files Browse the repository at this point in the history
Improvements to backend services
  • Loading branch information
caponetto authored Feb 22, 2024
1 parent 55d6048 commit af8e072
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 21 deletions.
50 changes: 37 additions & 13 deletions plugins/orchestrator-backend/src/service/DataIndexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ export class DataIndexService {
this.logger.error(
`Error aborting workflow instance ${workflowId}: ${result.error}`,
);
} else {
this.logger.debug(`Successfully aborted workflow instance ${workflowId}`);
}

this.logger.debug(`Successfully aborted workflow instance ${workflowId}`);
return result;
}

public async getWorkflowDefinition(
definitionId: string,
): Promise<WorkflowInfo> {
): Promise<WorkflowInfo | undefined> {
const graphQlQuery = `{ ProcessDefinitions ( where: {id: {equal: "${definitionId}" } } ) { id, name, version, type, endpoint, serviceUrl, source } }`;

const result = await this.client.query(graphQlQuery, {});
Expand All @@ -70,7 +71,15 @@ export class DataIndexService {
this.logger.error(`Error fetching workflow definition ${result.error}`);
throw result.error;
}
return (result.data.ProcessDefinitions as WorkflowInfo[])[0];

const processDefinitions = result.data.ProcessDefinitions as WorkflowInfo[];

if (processDefinitions.length === 0) {
this.logger.info(`No workflow definition found for ${definitionId}`);
return undefined;
}

return processDefinitions[0];
}

public async getWorkflowDefinitions(): Promise<WorkflowInfo[]> {
Expand Down Expand Up @@ -123,9 +132,7 @@ export class DataIndexService {
}

private async getWorkflowDefinitionFromInstance(instance: ProcessInstance) {
const workflowItem: WorkflowInfo = await this.getWorkflowDefinition(
instance.processId,
);
const workflowItem = await this.getWorkflowDefinition(instance.processId);
if (!workflowItem?.source) {
throw new Error(
`Workflow defintion is required to fetch instance ${instance.id}`,
Expand Down Expand Up @@ -155,7 +162,15 @@ export class DataIndexService {
return undefined;
}

return response.data.ProcessDefinitions[0].source;
const processDefinitions = response.data
.ProcessDefinitions as WorkflowInfo[];

if (processDefinitions.length === 0) {
this.logger.info(`No workflow source found for ${workflowId}`);
return undefined;
}

return processDefinitions[0].source;
}

public async fetchWorkflowInstances(
Expand Down Expand Up @@ -206,9 +221,13 @@ export class DataIndexService {
throw result.error;
}

const variables = result.data.ProcessInstances.pop().variables;
const processInstances = result.data.ProcessInstances as ProcessInstance[];

return parseWorkflowVariables(variables);
if (processInstances.length === 0) {
return undefined;
}

return parseWorkflowVariables(processInstances[0].variables);
}

public async fetchProcessInstance(
Expand All @@ -225,10 +244,15 @@ export class DataIndexService {
throw result.error;
}

const instance = (result.data.ProcessInstances as ProcessInstance[])[0];
const workflowItem: WorkflowInfo = await this.getWorkflowDefinition(
instance.processId,
);
const processInstances = result.data.ProcessInstances as ProcessInstance[];

if (processInstances.length === 0) {
return undefined;
}

const instance = processInstances[0];

const workflowItem = await this.getWorkflowDefinition(instance.processId);
if (!workflowItem?.source) {
throw new Error(
`Workflow defintion is required to fetch instance ${instance.id}`,
Expand Down
18 changes: 18 additions & 0 deletions plugins/orchestrator-backend/src/service/Helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ import { Logger } from 'winston';

import os from 'os';

export async function retryAsyncFunction<T>(args: {
asyncFunc: () => Promise<T | undefined>;
retries: number;
delayMs: number;
}): Promise<T> {
let result: T | undefined;
for (let i = 0; i < args.retries; i++) {
result = await args.asyncFunc();
if (result !== undefined) {
return result;
}
await new Promise(resolve => setTimeout(resolve, args.delayMs));
}
throw new Error(
`Exceeded maximum number of retries for function ${args.asyncFunc.name}`,
);
}

export async function getWorkingDirectory(
config: Config,
logger: Logger,
Expand Down
9 changes: 5 additions & 4 deletions plugins/orchestrator-backend/src/service/SonataFlowService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ export class SonataFlowService {
workflowId: string,
): Promise<string | undefined> {
try {
const endpoint =
(await this.dataIndex.getWorkflowDefinition(workflowId)).serviceUrl ??
'';
const urlToFetch = `${endpoint}/management/processes/${workflowId}/sources`;
const definition = await this.dataIndex.getWorkflowDefinition(workflowId);
if (!definition?.serviceUrl) {
return undefined;
}
const urlToFetch = `${definition.serviceUrl}/management/processes/${workflowId}/sources`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
Expand Down
20 changes: 17 additions & 3 deletions plugins/orchestrator-backend/src/service/api/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ import {
} from '@janus-idp/backstage-plugin-orchestrator-common';

import { DataIndexService } from '../DataIndexService';
import { retryAsyncFunction } from '../Helper';
import { SonataFlowService } from '../SonataFlowService';
import { WorkflowService } from '../WorkflowService';

const FETCH_INSTANCE_MAX_RETRIES = 5;
const FETCH_INSTANCE_RETRY_DELAY_MS = 1000;

export namespace V1 {
export async function getWorkflowsOverview(
sonataFlowService: SonataFlowService,
Expand Down Expand Up @@ -146,21 +150,31 @@ export namespace V1 {
businessKey: string | undefined,
): Promise<WorkflowExecutionResponse> {
const definition = await dataIndexService.getWorkflowDefinition(workflowId);
const serviceUrl = definition.serviceUrl;
if (!serviceUrl) {
if (!definition) {
throw new Error(`Couldn't fetch workflow definition for ${workflowId}`);
}
if (!definition.serviceUrl) {
throw new Error(`ServiceURL is not defined for workflow ${workflowId}`);
}
const executionResponse = await sonataFlowService.executeWorkflow({
workflowId,
inputData: reqBody,
endpoint: serviceUrl,
endpoint: definition.serviceUrl,
businessKey,
});

if (!executionResponse) {
throw new Error(`Couldn't execute workflow ${workflowId}`);
}

// Making sure the instance data is available before returning
await retryAsyncFunction({
asyncFunc: () =>
dataIndexService.fetchProcessInstance(executionResponse.id),
retries: FETCH_INSTANCE_MAX_RETRIES,
delayMs: FETCH_INSTANCE_RETRY_DELAY_MS,
});

return executionResponse;
}

Expand Down
10 changes: 9 additions & 1 deletion plugins/orchestrator-backend/src/service/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,17 @@ function setupInternalRoutes(

const workflowDefinition =
await services.dataIndexService.getWorkflowDefinition(workflowId);

if (!workflowDefinition) {
res.status(500).send(`Couldn't fetch workflow definition ${workflowId}`);
return;
}
const serviceUrl = workflowDefinition.serviceUrl;
if (!serviceUrl) {
throw new Error(`ServiceUrl is not defined for workflow ${workflowId}`);
res
.status(500)
.send(`Service URL is not defined for workflow ${workflowId}`);
return;
}

// workflow source
Expand Down

0 comments on commit af8e072

Please sign in to comment.