diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts index c54a64b7c..723b841d4 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts @@ -247,7 +247,10 @@ export type ReadModelLedger = { IsPaused: boolean } -export type MethodNext = () => Promise +export type MethodNext = ( + timeout?: number, + notificationExtraPayload?: object +) => Promise export type MethodGetRemainingTime = () => number export type MethodGetEncryption = () => ( event: ReadModelEvent @@ -289,6 +292,7 @@ export type BuildInfo = { notificationId: string sendTime: number coldStart?: boolean + [key: string]: any } export type AdapterConnection< diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts index 1cfbf2e18..3e991d890 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts @@ -52,6 +52,13 @@ function statusDataToError( } } +const isHTTPServiceError = (name: string) => + name === 'FetchError' || name === 'AbortError' || name === 'ServiceError' + +const getBuildDelay = (iterationNumber: number) => { + return 30000 * 2 ** iterationNumber +} + const BATCH_PROCESSING_POLL_MS = 50 const build: ExternalMethods['build'] = async ( @@ -61,15 +68,30 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) => { const log = getLog('build') - await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis) + let iterationNumber = buildInfo.iterationNumber ?? 0 + + const delayNext = async (delay: number, error: any) => { + log.debug( + `Delaying next for ${delay}ms due to service error ${ + error ? error.name + ': ' + error.message : '' + }` + ) + await next(delay, { iterationNumber: iterationNumber + 1 }) + } const state = await basePool.getReplicationState(basePool) if (state.status === 'error') { - log.error('Refuse to start or continue replication with error state') + log.error( + `Refuse to start or continue replication with error state: ${state.statusData}` + ) + return + } else if (state.status === 'serviceError') { + await delayNext(getBuildDelay(iterationNumber), state.statusData) return } if (state.paused) { @@ -78,18 +100,21 @@ const build: ExternalMethods['build'] = async ( } const timeLeft = getVacantTimeInMillis() - const occupationResult = await basePool.occupyReplication(basePool, timeLeft) - if (!occupationResult.success) { - log.debug( - `Could not occupy replication process. ${occupationResult.message ?? ''}` - ) + try { + const result = await basePool.occupyReplication(basePool, timeLeft) + if (!result.success) { + log.error(`Could not occupy replication process: ${result.message}`) + return + } + } catch (error) { + await delayNext(getBuildDelay(iterationNumber), error) return } let iterator = state.iterator let localContinue = true - const sleepAfterServiceErrorMs = 3000 + await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis) let eventLoader: EventLoader const onExit = async () => { @@ -101,7 +126,7 @@ const build: ExternalMethods['build'] = async ( try { await basePool.releaseReplication(basePool) } catch (error) { - log.error(error) + if (!isHTTPServiceError(error.name)) log.error(error) } } @@ -119,18 +144,14 @@ const build: ExternalMethods['build'] = async ( ) } catch (error) { await onExit() - if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) { - log.debug( - `Got non-fatal error, continuing on the next step. ${error.message}` - ) + if (RequestTimeoutError.is(error)) { await next() - return - } else if (ConnectionError.is(error)) { - log.error(error) - return + } else if (ServiceBusyError.is(error)) { + await delayNext(getBuildDelay(iterationNumber), error) } else { - throw error + log.error(error) } + return } log.debug('Starting or continuing replication process') @@ -150,16 +171,15 @@ const build: ExternalMethods['build'] = async ( loadEventsResult.events ) } catch (error) { - if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) { - log.debug( - `Got non-fatal error, continuing on the next step. ${error.message}` - ) - await onExit() - return + await onExit() + if (RequestTimeoutError.is(error)) { + await next() + } else if (ServiceBusyError.is(error)) { + await delayNext(getBuildDelay(iterationNumber), error) } else { - await onExit() - throw error + log.error(error) } + return } const { cursor: nextCursor, events } = loadEventsResult const { existingSecrets, deletedSecrets } = gatheredSecrets @@ -194,6 +214,7 @@ const build: ExternalMethods['build'] = async ( ? (state.statusData.appliedEventsCount as number) : 0 wasPaused = state.paused + iterationNumber = 0 break } else if (state.status === 'serviceError') { lastError = statusDataToError(state.statusData, { @@ -217,23 +238,19 @@ const build: ExternalMethods['build'] = async ( lastError = error } - const isBuildSuccess = lastError == null && appliedEventsCount > 0 - - if (isBuildSuccess) { + if (appliedEventsCount > 0) { log.verbose(`Replicated batch of ${appliedEventsCount} events`) } + let delay = 0 + let shouldContinue = appliedEventsCount > 0 if (lastError) { - log.error(lastError) - if ( - lastError.name === 'ServiceError' || - lastError.name === 'AbortError' || - lastError.name === 'FetchError' - ) { - const vacantTime = getVacantTimeInMillis() - if (vacantTime > 0) { - await sleep(Math.min(vacantTime, sleepAfterServiceErrorMs)) - } + if (isHTTPServiceError(lastError.name)) { + delay = getBuildDelay(iterationNumber) + shouldContinue = true + localContinue = false + } else { + shouldContinue = false } } @@ -241,19 +258,24 @@ const build: ExternalMethods['build'] = async ( localContinue = false } - if (isBuildSuccess && wasPaused) { + if (wasPaused) { log.debug('Pausing replication as requested') await onExit() return } - if (isBuildSuccess && localContinue) { + if (shouldContinue && localContinue && delay === 0) { log.verbose('Continuing replication in the local build loop') } else { await onExit() - if (isBuildSuccess) { - log.debug('Calling next in build') - await next() + if (lastError) + log.error(`Exiting replication loop due to error ${lastError.message}`) + if (shouldContinue) { + if (delay > 0) { + await delayNext(delay, lastError) + } else { + await next() + } } return } diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts index 5db39a8a5..3e43e4a29 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts @@ -13,6 +13,13 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({ return checkResult } + const defaultValues = { + paused: false, + iterator: null, + successEvent: null, + locked: false, + } + try { const response = await fetch( `${targetApplicationUrl}${REPLICATION_STATE.endpoint}` @@ -25,10 +32,7 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({ name: response.statusText, message: (state as any).message ?? response.statusText, }, - paused: false, - iterator: null, - successEvent: null, - locked: false, + ...defaultValues, } } return state @@ -45,13 +49,18 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({ message: error.message as string, stack: error.stack ? (error.stack as string) : null, }, - paused: false, - iterator: null, - successEvent: null, - locked: false, + ...defaultValues, } } else { - throw error + return { + status: 'error', + statusData: { + name: error.name as string, + message: error.message as string, + stack: error.stack ? (error.stack as string) : null, + }, + ...defaultValues, + } } } } diff --git a/packages/runtime/runtimes/runtime-aws-serverless/src/event-subscriber-notifier-factory.ts b/packages/runtime/runtimes/runtime-aws-serverless/src/event-subscriber-notifier-factory.ts index 7b62c7385..1bbdb4e9c 100644 --- a/packages/runtime/runtimes/runtime-aws-serverless/src/event-subscriber-notifier-factory.ts +++ b/packages/runtime/runtimes/runtime-aws-serverless/src/event-subscriber-notifier-factory.ts @@ -1,3 +1,4 @@ +import STS from 'aws-sdk/clients/sts' import type { EventSubscriberNotifier } from '@resolve-js/runtime-base' import { createEventSubscriberNotification, @@ -26,6 +27,8 @@ type NotifierRuntime = { invokeLambdaAsync: Function } +export const LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS = 3000 + export const waitForSubscriber = async (isSaga = false) => await new Promise((resolve) => setTimeout(resolve, isSaga ? 10000 : 1000)) @@ -136,16 +139,45 @@ export const eventSubscriberNotifierFactory = async ( ? `arn:aws:sqs:${region}:${accountId}:${userId}-${eventSubscriberScope}-${eventSubscriber}` : functionArn - const invokeBuildAsync = async (parameters: { eventSubscriber: string }) => - useSqs - ? await sendSqsMessage( - `${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`, - parameters - ) - : await invokeLambdaAsync(functionName, { - resolveSource: 'BuildEventSubscriber', - ...parameters, - }) + const invokeBuildAsync = async ( + parameters: { eventSubscriber: string }, + timeout?: number + ) => { + if (useSqs) { + return await sendSqsMessage( + `${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`, + parameters + ) + } + const lambdaEvent = { + resolveSource: 'BuildEventSubscriber', + ...parameters, + } + if ( + timeout == null || + timeout < LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS + ) { + await new Promise((resolve) => setTimeout(resolve, timeout)) + await invokeLambdaAsync(functionName, lambdaEvent) + } else { + const { Arn } = await new STS().getCallerIdentity().promise() + await invokeFunction({ + Region: process.env.AWS_REGION as string, + FunctionName: process.env.RESOLVE_SCHEDULER_LAMBDA_ARN as string, + Payload: { + functionName: process.env.AWS_LAMBDA_FUNCTION_NAME, + event: lambdaEvent, + date: new Date(Date.now() + timeout).toISOString(), + validationRoleArn: Arn, + principial: { + accessKeyId: process.env.AWS_ACCESS_KEY_ID, + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY, + sessionToken: process.env.AWS_SESSION_TOKEN, + }, + }, + }) + } + } const ensureQueue = async (name?: string) => { if (!useSqs) { diff --git a/packages/runtime/runtimes/runtime-base/src/query/wrap-read-model.ts b/packages/runtime/runtimes/runtime-base/src/query/wrap-read-model.ts index 491a0a14d..23a139ca7 100644 --- a/packages/runtime/runtimes/runtime-base/src/query/wrap-read-model.ts +++ b/packages/runtime/runtimes/runtime-base/src/query/wrap-read-model.ts @@ -110,17 +110,33 @@ const serializeState = async ({ state }: { state: any }): Promise => { const next = async ( pool: ReadModelPool, eventSubscriber: string, + timeout?: number, + notificationExtraPayload?: object, ...args: any[] ) => { if (args.length > 0) { throw new TypeError('Next should be invoked with no arguments') } - await pool.invokeBuildAsync({ - eventSubscriber, - initiator: 'read-model-next', - notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`, - sendTime: Date.now(), - }) + if (timeout != null && (isNaN(+timeout) || +timeout < 0)) { + throw new TypeError('Timeout should be non-negative integer') + } + if ( + notificationExtraPayload != null && + notificationExtraPayload.constructor !== Object + ) { + throw new TypeError('Notification extra payload should be plain object') + } + + await pool.invokeBuildAsync( + { + eventSubscriber, + initiator: 'read-model-next', + notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`, + sendTime: Date.now(), + ...notificationExtraPayload, + }, + timeout != null ? Math.floor(+timeout) : timeout + ) } const updateCustomReadModel = async ( diff --git a/packages/runtime/runtimes/runtime-base/src/types.ts b/packages/runtime/runtimes/runtime-base/src/types.ts index 7c0bc240a..f1aeac79b 100644 --- a/packages/runtime/runtimes/runtime-base/src/types.ts +++ b/packages/runtime/runtimes/runtime-base/src/types.ts @@ -126,10 +126,12 @@ export type EventSubscriberNotification = { sendTime: number event?: Event cursor?: string + [key: string]: any } export type InvokeBuildAsync = ( - parameters: EventSubscriberNotification + parameters: EventSubscriberNotification, + timeout?: number ) => Promise export type { BuildTimeConstants } diff --git a/packages/runtime/runtimes/runtime-single-process/src/index.ts b/packages/runtime/runtimes/runtime-single-process/src/index.ts index 9f4134fb9..6871b953c 100644 --- a/packages/runtime/runtimes/runtime-single-process/src/index.ts +++ b/packages/runtime/runtimes/runtime-single-process/src/index.ts @@ -124,16 +124,25 @@ const entry = async ( getVacantTimeInMillis, eventSubscriberScope: constants.applicationName, notifyEventSubscriber, - invokeBuildAsync: backgroundJob( - async (parameters: EventSubscriberNotification) => { - const runtime = await createRuntime(factoryParameters) - try { - return await runtime.eventSubscriber.build(parameters) - } finally { - await runtime.dispose() - } + invokeBuildAsync: async ( + parameters: EventSubscriberNotification, + timeout?: number + ) => { + if (timeout != null && timeout > 0) { + await new Promise((resolve) => setTimeout(resolve, timeout)) } - ), + const job = backgroundJob( + async (parameters: EventSubscriberNotification) => { + const runtime = await createRuntime(factoryParameters) + try { + return await runtime.eventSubscriber.build(parameters) + } finally { + await runtime.dispose() + } + } + ) + return await job(parameters) + }, eventListeners: gatherEventListeners(domain, domainInterop), uploader: uploaderData?.uploader ?? null, sendReactiveEvent: websocketServerData.sendReactiveEvent,