diff --git a/config/default.json b/config/default.json
index b29f25e69..45395885c 100644
--- a/config/default.json
+++ b/config/default.json
@@ -18,6 +18,7 @@
     "CREATE_RETRY_INTERVAL_MILLIS": 200,
     "DEBUG": false
   },
+  "MAX_FULFIL_TIMEOUT_DURATION_SECONDS": 300,
   "MIGRATIONS": {
     "DISABLED": false,
     "RUN_DATA_MIGRATIONS": true
diff --git a/package-lock.json b/package-lock.json
index 47aa43660..610422305 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -19,7 +19,7 @@
         "@mojaloop/central-services-health": "14.0.1",
         "@mojaloop/central-services-logger": "11.0.1",
         "@mojaloop/central-services-metrics": "12.0.5",
-        "@mojaloop/central-services-shared": "17.2.0",
+        "@mojaloop/central-services-shared": "17.3.0",
         "@mojaloop/central-services-stream": "11.0.0",
         "@mojaloop/event-sdk": "11.0.2",
         "@mojaloop/ml-number": "11.2.1",
@@ -32,7 +32,7 @@
         "catbox-memory": "4.0.1",
         "commander": "9.4.0",
         "cron": "2.1.0",
-        "decimal.js": "10.3.1",
+        "decimal.js": "10.4.0",
         "docdash": "1.2.0",
         "event-stream": "4.0.1",
         "five-bells-condition": "5.0.1",
@@ -1743,9 +1743,9 @@
       }
     },
     "node_modules/@mojaloop/central-services-shared": {
-      "version": "17.2.0",
-      "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-17.2.0.tgz",
-      "integrity": "sha512-90bnSwiJEcrSnIIR4tSmmIv09hsN3wBBidDXaHib/5eXRBeVJibsm7NbwlA1c754UXg+zSUvGSReojcrqPWZEA==",
+      "version": "17.3.0",
+      "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-17.3.0.tgz",
+      "integrity": "sha512-pKXVFDbpWQCp2E4jCDFUfW7jkZYeueP2pTMElWim+yHRS8XqUFakf8tiFxh9hIPw/yOexW/PepkhZ2SZyt8fow==",
       "dependencies": {
         "@hapi/catbox": "12.0.0",
         "@hapi/catbox-memory": "5.0.1",
@@ -4452,9 +4452,9 @@
       }
     },
     "node_modules/decimal.js": {
-      "version": "10.3.1",
-      "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.3.1.tgz",
-      "integrity": "sha512-V0pfhfr8suzyPGOx3nmq4aHqabehUZn6Ch9kyFpV79TGDTWFmHqUqXdabR7QHqxzrYolF4+tVmJhUG4OURg5dQ=="
+      "version": "10.4.0",
+      "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.4.0.tgz",
+      "integrity": "sha512-Nv6ENEzyPQ6AItkGwLE2PGKinZZ9g59vSh2BeH6NqPu0OTKZ5ruJsVqh/orbAnqXc9pBbgXAIrc2EyaCj8NpGg=="
     },
     "node_modules/decompress-response": {
       "version": "6.0.0",
@@ -4954,17 +4954,6 @@
         "safe-buffer": "^5.0.1"
       }
     },
-    "node_modules/ed25519": {
-      "version": "0.0.4",
-      "resolved": "https://registry.npmjs.org/ed25519/-/ed25519-0.0.4.tgz",
-      "integrity": "sha512-81yyGDHl4hhTD2YY779FRRMMAuKR3IQ2MmPFdwTvLnmZ+O02PgONzVgeyTWCjs/NCNAr35Ccg+hUd1y84Kdkbg==",
-      "hasInstallScript": true,
-      "optional": true,
-      "dependencies": {
-        "bindings": "^1.2.1",
-        "nan": "^2.0.9"
-      }
-    },
     "node_modules/ee-first": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz",
@@ -6589,19 +6578,6 @@
       "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz",
       "integrity": "sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw=="
     },
-    "node_modules/fsevents": {
-      "version": "2.3.2",
-      "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.2.tgz",
-      "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==",
-      "hasInstallScript": true,
-      "optional": true,
-      "os": [
-        "darwin"
-      ],
-      "engines": {
-        "node": "^8.16.0 || ^10.6.0 || >=11.0.0"
-      }
-    },
     "node_modules/function-bind": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz",
@@ -17932,9 +17908,9 @@
       }
     },
"@mojaloop/central-services-shared": { - "version": "17.2.0", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-17.2.0.tgz", - "integrity": "sha512-90bnSwiJEcrSnIIR4tSmmIv09hsN3wBBidDXaHib/5eXRBeVJibsm7NbwlA1c754UXg+zSUvGSReojcrqPWZEA==", + "version": "17.3.0", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-17.3.0.tgz", + "integrity": "sha512-pKXVFDbpWQCp2E4jCDFUfW7jkZYeueP2pTMElWim+yHRS8XqUFakf8tiFxh9hIPw/yOexW/PepkhZ2SZyt8fow==", "requires": { "@hapi/catbox": "12.0.0", "@hapi/catbox-memory": "5.0.1", @@ -20031,9 +20007,9 @@ } }, "decimal.js": { - "version": "10.3.1", - "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.3.1.tgz", - "integrity": "sha512-V0pfhfr8suzyPGOx3nmq4aHqabehUZn6Ch9kyFpV79TGDTWFmHqUqXdabR7QHqxzrYolF4+tVmJhUG4OURg5dQ==" + "version": "10.4.0", + "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.4.0.tgz", + "integrity": "sha512-Nv6ENEzyPQ6AItkGwLE2PGKinZZ9g59vSh2BeH6NqPu0OTKZ5ruJsVqh/orbAnqXc9pBbgXAIrc2EyaCj8NpGg==" }, "decompress-response": { "version": "6.0.0", @@ -20410,16 +20386,6 @@ "safe-buffer": "^5.0.1" } }, - "ed25519": { - "version": "0.0.4", - "resolved": "https://registry.npmjs.org/ed25519/-/ed25519-0.0.4.tgz", - "integrity": "sha512-81yyGDHl4hhTD2YY779FRRMMAuKR3IQ2MmPFdwTvLnmZ+O02PgONzVgeyTWCjs/NCNAr35Ccg+hUd1y84Kdkbg==", - "optional": true, - "requires": { - "bindings": "^1.2.1", - "nan": "^2.0.9" - } - }, "ee-first": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz", @@ -21650,12 +21616,6 @@ "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", "integrity": "sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==" }, - "fsevents": { - "version": "2.3.2", - "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.2.tgz", - "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==", - "optional": true - }, "function-bind": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", diff --git a/package.json b/package.json index 953c051a0..d6263dc1d 100644 --- a/package.json +++ b/package.json @@ -85,7 +85,7 @@ "@mojaloop/central-services-health": "14.0.1", "@mojaloop/central-services-logger": "11.0.1", "@mojaloop/central-services-metrics": "12.0.5", - "@mojaloop/central-services-shared": "17.2.0", + "@mojaloop/central-services-shared": "17.3.0", "@mojaloop/central-services-stream": "11.0.0", "@mojaloop/event-sdk": "11.0.2", "@mojaloop/ml-number": "11.2.1", @@ -98,7 +98,7 @@ "catbox-memory": "4.0.1", "commander": "9.4.0", "cron": "2.1.0", - "decimal.js": "10.3.1", + "decimal.js": "10.4.0", "docdash": "1.2.0", "event-stream": "4.0.1", "five-bells-condition": "5.0.1", diff --git a/seeds/bulkProcessingState.js b/seeds/bulkProcessingState.js index 6fec19fa4..293e4f01d 100644 --- a/seeds/bulkProcessingState.js +++ b/seeds/bulkProcessingState.js @@ -27,44 +27,59 @@ const bulkProcessingStates = [ { + bulkProcessingStateId: 1, name: 'RECEIVED', description: 'The switch has received the individual transfer ids part of the bulk transfer' }, { + bulkProcessingStateId: 2, name: 'RECEIVED_DUPLICATE', description: 'The switch has matched individual transfer as duplicate' }, { + bulkProcessingStateId: 3, name: 'RECEIVED_INVALID', description: 'The switch has matched individual transfer as invalid within Prepare or Position 
diff --git a/seeds/bulkTransferState.js b/seeds/bulkTransferState.js
index a3aea3c73..5b5ebe369 100644
--- a/seeds/bulkTransferState.js
+++ b/seeds/bulkTransferState.js
@@ -80,6 +80,11 @@ const bulkTransferStates = [
     bulkTransferStateId: 'INVALID',
     enumeration: 'REJECTED',
     description: 'Final state when the switch has completed processing of pending invalid bulk transfer'
+  },
+  {
+    bulkTransferStateId: 'ABORTING',
+    enumeration: 'PROCESSING',
+    description: 'The switch is attempting to abort all individual transfers'
   }
 ]
diff --git a/src/domain/bulkTransfer/index.js b/src/domain/bulkTransfer/index.js
index d1a17c641..b46c76f65 100644
--- a/src/domain/bulkTransfer/index.js
+++ b/src/domain/bulkTransfer/index.js
@@ -126,7 +126,8 @@ const getBulkTransferById = async (id) => {
       expiration: bulkTransfer.expirationDate,
       completedDate: bulkTransfer.completedTimestamp,
       payerBulkTransfer,
-      payeeBulkTransfer
+      payeeBulkTransfer,
+      bulkTransferStateEnumeration: bulkTransfer.bulkTransferStateEnumeration
     }
   } catch (err) {
     Logger.isErrorEnabled && Logger.error(err)
@@ -162,9 +163,19 @@
   }
 }
 
+const bulkFulfilTransitionToAborting = async (bulkFulfilPayload, stateReason = null) => {
+  try {
+    return await BulkTransferFacade.saveBulkTransferAborting(bulkFulfilPayload, stateReason)
+  } catch (err) {
+    Logger.isErrorEnabled && Logger.error(err)
+    throw err
+  }
+}
+
 const BulkTransferService = {
   getBulkTransferById,
   getBulkTransferExtensionListById,
+  bulkFulfilTransitionToAborting,
   getBulkTransferByTransferId: BulkTransferModel.getByTransferId,
   getParticipantsById: BulkTransferModel.getParticipantsById,
   bulkPrepare: BulkTransferFacade.saveBulkTransferReceived,
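
Note on src/domain/bulkTransfer/index.js: bulkFulfilTransitionToAborting returns the awaited facade result so the caller can read back the persisted state; without the return and the await, the fulfil handler's `state` would be undefined and facade rejections would bypass the catch block. A minimal usage sketch, with a hypothetical payload:

    // Usage sketch; the payload and id below are hypothetical.
    const BulkTransferService = require('./src/domain/bulkTransfer')

    const run = async () => {
      const bulkFulfilPayload = {
        bulkTransferId: 'b51ec534-ee48-4575-b6a9-ead2955b8069', // hypothetical
        completedTimestamp: new Date().toISOString()
      }
      // Persists the ABORTING state change plus fulfilment/extension rows
      // and resolves to Enum.Transfers.BulkTransferState.ABORTING.
      const state = await BulkTransferService.bulkFulfilTransitionToAborting(bulkFulfilPayload, 'validation failed')
      return state // 'ABORTING'
    }
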
diff --git a/src/handlers/bulk/fulfil/handler.js b/src/handlers/bulk/fulfil/handler.js
index 7d50bbbca..a6006b51a 100644
--- a/src/handlers/bulk/fulfil/handler.js
+++ b/src/handlers/bulk/fulfil/handler.js
@@ -126,11 +126,16 @@
       } else {
         state = await BulkTransferService.bulkFulfil(payload)
       }
-    } catch (err) { // TODO: handle insert errors
+    } catch (err) {
       Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInternal1--${actionLetter}5`))
-      Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, 'notImplemented'))
       Logger.isErrorEnabled && Logger.error(err)
-      return true
+
+      const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
+      const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
+      params.message.value.content.uriParams = { id: bulkTransferId }
+
+      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+      throw fspiopError
     }
     try {
       Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'individualTransferFulfils'))
@@ -139,14 +144,36 @@
         const bulkTransfers = await BulkTransferService.getBulkTransferById(payload.bulkTransferId)
         for (const individualTransferFulfil of bulkTransfers.payeeBulkTransfer.individualTransferResults) {
           individualTransferFulfil.errorInformation = payload.errorInformation
-          await sendIndividualTransfer(message, messageId, kafkaTopic, headers, payload, state, params, individualTransferFulfil, histTimerEnd)
+          await sendIndividualTransfer(
+            message,
+            messageId,
+            kafkaTopic,
+            headers,
+            payload,
+            state,
+            params,
+            individualTransferFulfil,
+            histTimerEnd,
+            Enum.Transfers.BulkProcessingState.PROCESSING
+          )
         }
       } else {
         const IndividualTransferFulfilModel = BulkTransferModels.getIndividualTransferFulfilModel()
         // enable async/await operations for the stream
         for await (const doc of IndividualTransferFulfilModel.find({ messageId }).cursor()) {
-          await sendIndividualTransfer(message, messageId, kafkaTopic, headers, payload, state, params, doc.payload, histTimerEnd)
+          await sendIndividualTransfer(
+            message,
+            messageId,
+            kafkaTopic,
+            headers,
+            payload,
+            state,
+            params,
+            doc.payload,
+            histTimerEnd,
+            Enum.Transfers.BulkProcessingState.PROCESSING
+          )
         }
       }
     } catch (err) { // TODO: handle individual transfers streaming error
@@ -155,9 +182,29 @@
       Logger.isErrorEnabled && Logger.error(err)
       return true
     }
-  } else { // TODO: handle validation failure
+  } else {
     Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
-    Logger.isErrorEnabled && Logger.error(`validationFailure Reasons - ${JSON.stringify(reasons)}`)
+
+    const validationFspiopError = reasons.shift()
+    if (reasons.length > 0) {
+      validationFspiopError.extensions = []
+      // If there are multiple validation errors attach them as extensions
+      // to the first error
+      reasons.forEach((reason, i) => {
+        validationFspiopError.extensions.push({
+          key: `additionalErrors${i}`,
+          value: reason.message
+        })
+      })
+    }
+    // Converting FSPIOPErrors to strings is verbose, so we reduce the errors
+    // to just their message.
+    const reasonsMessages = reasons.map(function (reason) {
+      return reason.message
+    })
+
+    Logger.isErrorEnabled && Logger.error(`validationFailure Reasons - ${JSON.stringify(reasonsMessages)}`)
+
     try {
       Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'saveInvalidRequest'))
       /**
@@ -166,16 +213,43 @@
        * reason is "FSPIOP-Source header should match Payee". In this case we should not
        * abort the bulk as we would have accepted non-legitimate source.
        */
-      await BulkTransferService.bulkFulfil(payload, reasons.toString(), false)
-    } catch (err) { // TODO: handle insert error
+      const state = await BulkTransferService.bulkFulfilTransitionToAborting(payload)
+      const bulkTransfers = await BulkTransferService.getBulkTransferById(payload.bulkTransferId)
+      for (const individualTransferFulfil of bulkTransfers.payeeBulkTransfer.individualTransferResults) {
+        individualTransferFulfil.errorInformation = validationFspiopError.toApiErrorObject().errorInformation
+        // Abort-Reject all individual transfers
+        // The bulk processing handler will handle informing the payer
+        await sendIndividualTransfer(
+          message,
+          messageId,
+          kafkaTopic,
+          headers,
+          payload,
+          state,
+          params,
+          individualTransferFulfil,
+          histTimerEnd,
+          Enum.Transfers.BulkProcessingState.ABORTING
+        )
+      }
+    } catch (err) {
       Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}7`))
-      Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, 'notImplemented'))
       Logger.isErrorEnabled && Logger.error(err)
-      return true
+
+      const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
+      const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
+      params.message.value.content.uriParams = { id: bulkTransferId }
+
+      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+      throw fspiopError
     }
     Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorGeneric--${actionLetter}8`))
-    Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, 'notImplemented'))
-    return true // TODO: store invalid bulk transfer to database and produce callback notification to payer
+
+    const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
+    params.message.value.content.uriParams = { id: bulkTransferId }
+
+    await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+    throw validationFspiopError
   }
 } catch (err) {
   Logger.isErrorEnabled && Logger.error(`${Util.breadcrumb(location)}::${err.message}--BP0`)
@@ -191,13 +265,13 @@
  * @async
  * @description sends individual transfers to the fulfil handler
  */
-const sendIndividualTransfer = async (message, messageId, kafkaTopic, headers, payload, state, params, individualTransferFulfil, histTimerEnd) => {
+const sendIndividualTransfer = async (message, messageId, kafkaTopic, headers, payload, state, params, individualTransferFulfil, histTimerEnd, bulkProcessingStateId) => {
   const transferId = individualTransferFulfil.transferId
   delete individualTransferFulfil.transferId
   const bulkTransferAssociationRecord = {
     transferId,
     bulkTransferId: payload.bulkTransferId,
-    bulkProcessingStateId: Enum.Transfers.BulkProcessingState.PROCESSING,
+    bulkProcessingStateId,
     errorCode: payload.errorInformation ? payload.errorInformation.errorCode : undefined,
     errorDescription: payload.errorInformation ? payload.errorInformation.errorDescription : undefined
   }
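
Note on the validation-failure branch above: the prepare and fulfil handlers now fold multiple validation failures into a single FSPIOP error, carrying the extra reasons as additionalErrors<n> extensions. A standalone sketch of that folding, with plain objects standing in for FSPIOPError instances (the shapes are illustrative only):

    // Standalone sketch of the aggregation above; plain objects stand in
    // for real FSPIOPError instances.
    function foldValidationErrors (reasons) {
      const first = reasons.shift()
      if (reasons.length > 0) {
        // Remaining reasons ride along as extensions on the first error.
        first.extensions = reasons.map((reason, i) => ({
          key: `additionalErrors${i}`,
          value: reason.message
        }))
      }
      return first
    }

    const folded = foldValidationErrors([
      { message: 'completedTimestamp fails because future timestamp was provided' },
      { message: 'FSPIOP-Source header should match Payee FSP' }
    ])
    // folded.message is the first reason; folded.extensions[0].key === 'additionalErrors0'
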
diff --git a/src/handlers/bulk/get/handler.js b/src/handlers/bulk/get/handler.js
index efae773a8..cac64d2d9 100644
--- a/src/handlers/bulk/get/handler.js
+++ b/src/handlers/bulk/get/handler.js
@@ -115,15 +115,11 @@
     const bulkTransferResult = await BulkTransferService.getBulkTransferById(bulkTransferId)
     const bulkTransfer = isPayeeRequest ? bulkTransferResult.payeeBulkTransfer : bulkTransferResult.payerBulkTransfer
     let payload = {
-      bulkTransferState: bulkTransfer.bulkTransferState
+      bulkTransferState: bulkTransferResult.bulkTransferStateEnumeration
     }
     let fspiopError
-    if (bulkTransfer.bulkTransferState === Enum.Transfers.BulkTransferState.REJECTED) {
-      payload = {
-        errorInformation: bulkTransfer.individualTransferResults[0].errorInformation
-      }
-      fspiopError = ErrorHandler.Factory.createFSPIOPErrorFromErrorInformation(payload.errorInformation)
-    } else if (bulkTransfer.bulkTransferState !== Enum.Transfers.BulkTransferState.PROCESSING) {
+
+    if (bulkTransfer.bulkTransferState !== Enum.Transfers.BulkTransferState.PROCESSING) {
       payload = {
         ...payload,
         completedTimestamp: bulkTransfer.completedTimestamp,
diff --git a/src/handlers/bulk/prepare/handler.js b/src/handlers/bulk/prepare/handler.js
index 080ddfa3c..0ebfa2f07 100644
--- a/src/handlers/bulk/prepare/handler.js
+++ b/src/handlers/bulk/prepare/handler.js
@@ -226,15 +226,28 @@
       }
     } else { // handle validation failure
       Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
-      Logger.isErrorEnabled && Logger.error(`validationFailure Reasons - ${JSON.stringify(reasons)}`)
+      const validationFspiopError = reasons.shift()
+      if (reasons.length > 0) {
+        validationFspiopError.extensions = []
+        // If there are multiple validation errors attach them as extensions
+        // to the first error
+        reasons.forEach((reason, i) => {
+          validationFspiopError.extensions.push({
+            key: `additionalErrors${i}`,
+            value: reason.message
+          })
+        })
+      }
+      // Converting FSPIOPErrors to strings is verbose, so we reduce the errors
+      // to just their message.
+      // `bulkTransferStateChange.reason` also has a 512 character limit.
+      const reasonsMessages = reasons.map(function (reason) {
+        return reason.message
+      })
+      Logger.isErrorEnabled && Logger.error(`validationFailure Reasons - ${JSON.stringify(reasonsMessages)}`)
       try { // save invalid request for auditing
         Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'saveInvalidRequest'))
-        // `bulkTransferStateChange.reason` has a 512 character limit, so we
-        // reduce the errors to just their message
-        const reasonsMessages = reasons.map(function (reason) {
-          return reason.message
-        })
         await BulkTransferService.bulkPrepare(payload, { payerParticipantId, payeeParticipantId }, reasonsMessages.toString(), false)
       } catch (err) { // handle insert error and produce error callback notification to payer
         Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
@@ -250,24 +263,11 @@
 
       // produce validation error callback notification to payer
       Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorGeneric--${actionLetter}7`))
-      const fspiopError = reasons.shift()
-      if (reasons.length > 0) {
-        fspiopError.extensions = []
-        // If there are multiple validation errors attach them as extensions
-        // to the first error
-        reasons.forEach((reason, i) => {
-          fspiopError.extensions.push({
-            key: `additionalErrors${i}`,
-            value: reason.message
-          })
-        })
-      }
-
       const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
       params.message.value.content.uriParams = { id: bulkTransferId }
-      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
-      throw fspiopError
+      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+      throw validationFspiopError
     }
   } catch (err) {
     Logger.isErrorEnabled && Logger.error(`${Util.breadcrumb(location)}::${err.message}--BP0`)
diff --git a/src/handlers/bulk/processing/handler.js b/src/handlers/bulk/processing/handler.js
index 495db7106..15ac00392 100644
--- a/src/handlers/bulk/processing/handler.js
+++ b/src/handlers/bulk/processing/handler.js
@@ -150,7 +150,7 @@
         errorCode = payload.errorInformation && payload.errorInformation.errorCode
         errorDescription = payload.errorInformation && payload.errorInformation.errorDescription
       } else {
-        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action for bulk in ${Enum.Transfers.BulkTransferState.RECEIVED} state`)
+        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action ${action} for bulk in ${Enum.Transfers.BulkTransferState.RECEIVED} state`)
         throw fspiopError
       }
     } else if ([Enum.Transfers.BulkTransferState.ACCEPTED].includes(bulkTransferInfo.bulkTransferStateId)) {
@@ -162,7 +162,7 @@
         errorCode = payload.errorInformation && payload.errorInformation.errorCode
         errorDescription = payload.errorInformation && payload.errorInformation.errorDescription
       } else {
-        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action for bulk in ${Enum.Transfers.BulkTransferState.ACCEPTED} state`)
+        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action ${action} for bulk in ${Enum.Transfers.BulkTransferState.ACCEPTED} state`)
         throw fspiopError
       }
     } else if ([Enum.Transfers.BulkTransferState.PROCESSING, Enum.Transfers.BulkTransferState.PENDING_FULFIL, Enum.Transfers.BulkTransferState.EXPIRING].includes(bulkTransferInfo.bulkTransferStateId)) {
@@ -190,7 +190,19 @@
         errorCode = payload.errorInformation && payload.errorInformation.errorCode
         errorDescription = payload.errorInformation && payload.errorInformation.errorDescription
       } else {
-        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action for bulk in ${Enum.Transfers.BulkTransferState.PROCESSING} state`)
+        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action ${action} for bulk in ${Enum.Transfers.BulkTransferState.PROCESSING} state`)
+        throw fspiopError
+      }
+    } else if ([Enum.Transfers.BulkTransferState.ABORTING].includes(bulkTransferInfo.bulkTransferStateId)) {
+      if (action === Enum.Events.Event.Action.BULK_ABORT) {
+        criteriaState = Enum.Transfers.BulkTransferState.ABORTING
+        processingStateId = Enum.Transfers.BulkProcessingState.FULFIL_INVALID
+        completedBulkState = Enum.Transfers.BulkTransferState.REJECTED
+        incompleteBulkState = Enum.Transfers.BulkTransferState.ABORTING
+        errorCode = payload.errorInformation && payload.errorInformation.errorCode
+        errorDescription = payload.errorInformation && payload.errorInformation.errorDescription
+      } else {
+        const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Invalid action ${action} for bulk in ${Enum.Transfers.BulkTransferState.ABORTING} state`)
         throw fspiopError
       }
     } else if (bulkTransferInfo.bulkTransferStateId === Enum.Transfers.BulkTransferState.COMPLETED && action === Enum.Events.Event.Action.FULFIL_DUPLICATE) {
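
Note on the new ABORTING branch in the processing handler: each individual transfer is driven to FULFIL_INVALID, and the bulk only finishes at REJECTED once every transfer has been processed; until then it stays in ABORTING. Condensed as data, with plain strings standing in for the Enum values:

    // Condensed view of the ABORTING branch above; plain strings stand in
    // for Enum.Transfers.* values.
    const abortingTransition = {
      criteriaState: 'ABORTING',           // only rows of a bulk still in ABORTING match
      processingStateId: 'FULFIL_INVALID', // each individual transfer lands here
      completedBulkState: 'REJECTED',      // final bulk state once all transfers are processed
      incompleteBulkState: 'ABORTING'      // bulk stays here while transfers remain
    }
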
diff --git a/src/handlers/bulk/shared/validator.js b/src/handlers/bulk/shared/validator.js
index 7b622ff20..3ef2060f6 100644
--- a/src/handlers/bulk/shared/validator.js
+++ b/src/handlers/bulk/shared/validator.js
@@ -36,6 +36,7 @@
 
 const Participant = require('../../../domain/participant')
 const BulkTransferService = require('../../../domain/bulkTransfer')
+const Config = require('../../../lib/config')
 const Enum = require('@mojaloop/central-services-shared').Enum
 const ErrorHandler = require('@mojaloop/central-services-error-handling')
 
@@ -203,9 +204,34 @@
     return { isValid, reasons }
   }
   isValid = isValid && await validateFspiopSourceAndDestination(payload, headers)
+  isValid = isValid && validateCompletedTimestamp(payload)
   return { isValid, reasons }
 }
 
+const validateCompletedTimestamp = (payload) => {
+  const maxLag = Config.MAX_FULFIL_TIMEOUT_DURATION_SECONDS * 1000
+  const completedTimestamp = new Date(payload.completedTimestamp)
+  const now = new Date()
+  if (completedTimestamp > now) {
+    reasons.push(
+      ErrorHandler.Factory.createFSPIOPError(
+        ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR,
+        'Bulk fulfil failed validation - completedTimestamp fails because future timestamp was provided'
+      )
+    )
+    return false
+  } else if (completedTimestamp < now - maxLag) {
+    reasons.push(
+      ErrorHandler.Factory.createFSPIOPError(
+        ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR,
+        'Bulk fulfil failed validation - completedTimestamp fails because provided timestamp exceeded the maximum timeout duration'
+      )
+    )
+    return false
+  }
+  return true
+}
+
 const validateParticipantBulkTransferId = async function (participantName, bulkTransferId) {
   const bulkTransferParticipant = await BulkTransferService.getBulkTransferParticipant(participantName, bulkTransferId)
   let validationPassed = false
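
Note on validateCompletedTimestamp: a completedTimestamp is accepted only inside the window from MAX_FULFIL_TIMEOUT_DURATION_SECONDS ago up to now; future timestamps and overly old ones both fail. The date-minus-number comparison above works because JavaScript coerces Dates to epoch milliseconds. A self-contained sketch of the same window check with worked values (300 s mirrors the new default):

    // Self-contained version of the window check above.
    const MAX_FULFIL_TIMEOUT_DURATION_SECONDS = 300

    function isCompletedTimestampValid (completedTimestamp, now = new Date()) {
      const maxLagMs = MAX_FULFIL_TIMEOUT_DURATION_SECONDS * 1000
      const completed = new Date(completedTimestamp).getTime()
      return completed <= now.getTime() && completed >= now.getTime() - maxLagMs
    }

    // Worked examples against a fixed "now":
    const now = new Date('2022-09-01T12:00:00.000Z')
    isCompletedTimestampValid('2022-09-01T11:56:00.000Z', now) // true  (240 s old)
    isCompletedTimestampValid('2022-09-01T12:00:05.000Z', now) // false (future)
    isCompletedTimestampValid('2022-09-01T11:54:59.000Z', now) // false (301 s old)
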
diff --git a/src/lib/config.js b/src/lib/config.js
index 1024c68dc..90cbc6881 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -3,6 +3,7 @@ const RC = require('rc')('CLEDG', require('../../config/default.json'))
 module.exports = {
   HOSTNAME: RC.HOSTNAME.replace(/\/$/, ''),
   PORT: RC.PORT,
+  MAX_FULFIL_TIMEOUT_DURATION_SECONDS: RC.MAX_FULFIL_TIMEOUT_DURATION_SECONDS || 300,
   MONGODB_URI: RC.MONGODB.URI,
   MONGODB_DEBUG: (RC.MONGODB.DEBUG === true || RC.MONGODB.DEBUG === 'true'),
   MONGODB_DISABLED: RC.MONGODB.DISABLED,
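
Note on src/lib/config.js: the option follows rc's usual precedence for this service, so a CLEDG_-prefixed environment variable overrides config/default.json, and the `|| 300` fallback covers an absent or falsy key. A small sketch (the npm start entrypoint is an assumption; the env-var name follows rc's documented prefix convention for the 'CLEDG' app name):

    // rc resolves CLEDG_-prefixed environment variables ahead of default.json,
    // so operators can widen the fulfil window without editing config files, e.g.:
    //   CLEDG_MAX_FULFIL_TIMEOUT_DURATION_SECONDS=600 npm start
    const Config = require('./src/lib/config')
    console.log(Config.MAX_FULFIL_TIMEOUT_DURATION_SECONDS) // 300 unless overridden
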
diff --git a/src/models/bulkTransfer/bulkTransfer.js b/src/models/bulkTransfer/bulkTransfer.js
index ac30a6463..7254d4077 100644
--- a/src/models/bulkTransfer/bulkTransfer.js
+++ b/src/models/bulkTransfer/bulkTransfer.js
@@ -34,11 +34,19 @@ const getById = async (id) => {
       .innerJoin('participant AS payee', 'payee.participantId', 'bulkTransfer.payeeParticipantId')
       .innerJoin('bulkTransferStateChange AS btsc', 'btsc.bulkTransferId', 'bulkTransfer.bulkTransferId')
       .leftJoin('bulkTransferFulfilment AS btf', 'btf.bulkTransferId', 'bulkTransfer.bulkTransferId')
+      .leftJoin('bulkTransferState AS bts', 'bts.bulkTransferStateId', 'btsc.bulkTransferStateId')
       .where({ 'bulkTransfer.bulkTransferId': id })
       .orderBy('btsc.bulkTransferStateChangeId', 'desc')
-      .select('bulkTransfer.bulkTransferId', 'btsc.bulkTransferStateId', 'btf.completedDate AS completedTimestamp',
-        'payer.name AS payerFsp', 'payee.name AS payeeFsp', 'bulkTransfer.bulkQuoteId', 'bulkTransfer.expirationDate')
-      .first()
+      .select(
+        'bulkTransfer.bulkTransferId',
+        'btsc.bulkTransferStateId',
+        'btf.completedDate AS completedTimestamp',
+        'bts.enumeration AS bulkTransferStateEnumeration',
+        'payer.name AS payerFsp',
+        'payee.name AS payeeFsp',
+        'bulkTransfer.bulkQuoteId',
+        'bulkTransfer.expirationDate'
+      ).first()
     return result
   })
@@ -55,13 +63,19 @@
       .innerJoin('participant AS payer', 'payer.participantId', 'bulkTransfer.payerParticipantId')
       .innerJoin('participant AS payee', 'payee.participantId', 'bulkTransfer.payeeParticipantId')
       .innerJoin('bulkTransferStateChange AS btsc', 'btsc.bulkTransferId', 'bulkTransfer.bulkTransferId')
+      .leftJoin('bulkTransferState AS bts', 'bts.bulkTransferStateId', 'btsc.bulkTransferStateId')
       .leftJoin('bulkTransferFulfilment AS btf', 'btf.bulkTransferId', 'bulkTransfer.bulkTransferId')
       .where({ 'bta.transferId': id })
       .orderBy('btsc.bulkTransferStateChangeId', 'desc')
-      .select('bulkTransfer.bulkTransferId', 'btsc.bulkTransferStateId', 'btf.completedDate AS completedTimestamp',
-        'payer.name AS payerFsp', 'payee.name AS payeeFsp', 'bulkTransfer.bulkQuoteId',
-        'bulkTransfer.expirationDate AS expiration')
-      .first()
+      .select(
+        'bulkTransfer.bulkTransferId',
+        'btsc.bulkTransferStateId',
+        'btf.completedDate AS completedTimestamp',
+        'bts.enumeration AS bulkTransferStateEnumeration',
+        'payer.name AS payerFsp',
+        'payee.name AS payeeFsp',
+        'bulkTransfer.bulkQuoteId'
+      ).first()
     return result
   })
diff --git a/src/models/bulkTransfer/facade.js b/src/models/bulkTransfer/facade.js
index fee2ea8aa..1dc71c90f 100644
--- a/src/models/bulkTransfer/facade.js
+++ b/src/models/bulkTransfer/facade.js
@@ -172,10 +172,54 @@ const saveBulkTransferErrorProcessing = async (payload, stateReason = null, isVa
   }
 }
 
+const saveBulkTransferAborting = async (payload, stateReason = null) => {
+  try {
+    const bulkTransferFulfilmentRecord = {
+      bulkTransferId: payload.bulkTransferId,
+      completedDate: Time.getUTCString(new Date(payload.completedTimestamp))
+    }
+
+    const state = Enum.Transfers.BulkTransferState.ABORTING
+    const bulkTransferStateChangeRecord = {
+      bulkTransferId: payload.bulkTransferId,
+      bulkTransferStateId: state,
+      reason: stateReason
+    }
+
+    const knex = await Db.getKnex()
+    return await knex.transaction(async (trx) => {
+      try {
+        await knex('bulkTransferFulfilment').transacting(trx).insert(bulkTransferFulfilmentRecord)
+        if (payload.extensionList && payload.extensionList.extension) {
+          const bulkTransferExtensionsRecordList = payload.extensionList.extension.map(ext => {
+            return {
+              bulkTransferId: payload.bulkTransferId,
+              isFulfilment: true,
+              key: ext.key,
+              value: ext.value
+            }
+          })
+          await knex.batchInsert('bulkTransferExtension', bulkTransferExtensionsRecordList).transacting(trx)
+        }
+        await knex('bulkTransferStateChange').transacting(trx).insert(bulkTransferStateChangeRecord)
+        await trx.commit()
+        return state
+      } catch (err) {
+        await trx.rollback()
+        throw err
+      }
+    })
+  } catch (err) {
+    Logger.isErrorEnabled && Logger.error(err)
+    throw err
+  }
+}
+
 const TransferFacade = {
   saveBulkTransferReceived,
   saveBulkTransferProcessing,
-  saveBulkTransferErrorProcessing
+  saveBulkTransferErrorProcessing,
+  saveBulkTransferAborting
 }
 
 module.exports = TransferFacade
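
Note on saveBulkTransferAborting: the three inserts share one knex transaction, so a failed extension or state-change insert also rolls back the bulkTransferFulfilment row. The commit and rollback must be invoked as functions, trx.commit() and trx.rollback(); awaiting the bare property is a no-op, after which the callback form would silently auto-commit even on the error path. A stripped-down sketch of the pattern with placeholder table names (not the real schema):

    // Stripped-down version of the pattern above; 'parentTable' and
    // 'childTable' are placeholders.
    const saveAtomically = async (knex, records) => {
      return knex.transaction(async (trx) => {
        try {
          await knex('parentTable').transacting(trx).insert(records.parent)
          await knex('childTable').transacting(trx).insert(records.children)
          await trx.commit() // explicit; resolving without error would also auto-commit
          return 'COMMITTED'
        } catch (err) {
          await trx.rollback() // undo the parent insert if the child insert failed
          throw err
        }
      })
    }
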
reasons + } = await Validator.validateBulkTransferFulfilment(null, headers) + test.equal(reasons[0].apiErrorCode.code, '3100') + test.equal(reasons[0].apiErrorCode.message, 'Generic validation error') + test.equal(isValid, false) + test.end() + }) + + validateBulkTransferFulfilmentTest.test('fail on invalid payer', async (test) => { + BulkTransfer.getParticipantsById.returns(Promise.resolve({ + payerFsp: 'dfsp1', + payeeFsp: 'dfsp2' + })) + headers = { + 'fspiop-source': 'dfsp2', + 'fspiop-destination': 'invalidPayer' + } + const { + isValid, reasons + } = await Validator.validateBulkTransferFulfilment(payload, headers) + test.equal(reasons[0].apiErrorCode.code, '3100') + test.equal(reasons[0].apiErrorCode.message, 'Generic validation error') + test.equal(reasons[0].message, 'FSPIOP-Destination header should match Payer FSP') + test.equal(isValid, false) + test.end() + }) + + validateBulkTransferFulfilmentTest.test('fail on invalid payee', async (test) => { + BulkTransfer.getParticipantsById.returns(Promise.resolve({ + payerFsp: 'dfsp1', + payeeFsp: 'dfsp2' + })) + headers = { + 'fspiop-source': 'invalidPayee', + 'fspiop-destination': 'dfsp1' + } + const { + isValid, reasons + } = await Validator.validateBulkTransferFulfilment(payload, headers) + test.equal(reasons[0].apiErrorCode.code, '3100') + test.equal(reasons[0].apiErrorCode.message, 'Generic validation error') + test.equal(reasons[0].message, 'FSPIOP-Source header should match Payee FSP') + test.equal(isValid, false) + test.end() + }) + + validateBulkTransferFulfilmentTest.test('fail on invalid completedTimestamp', async (test) => { + BulkTransfer.getParticipantsById.returns(Promise.resolve({ + payerFsp: 'dfsp1', + payeeFsp: 'dfsp2' + })) + + payload.completedTimestamp = new Date((new Date()).getTime() + (10 * 86400000)) + + const { + isValid, reasons + } = await Validator.validateBulkTransferFulfilment(payload, headers) + test.equal(reasons[0].apiErrorCode.code, '3100') + test.equal(reasons[0].apiErrorCode.message, 'Generic validation error') + test.equal(reasons[0].message, 'Bulk fulfil failed validation - completedTimestamp fails because future timestamp was provided') + test.equal(isValid, false) + test.end() + }) + + validateBulkTransferFulfilmentTest.test('fail on completedTimestamp exceeds timeout duration', async (test) => { + BulkTransfer.getParticipantsById.returns(Promise.resolve({ + payerFsp: 'dfsp1', + payeeFsp: 'dfsp2' + })) + + payload.completedTimestamp = new Date((new Date()).getTime() - ((Config.MAX_FULFIL_TIMEOUT_DURATION_SECONDS + 1) * 1000)) + + const { + isValid, reasons + } = await Validator.validateBulkTransferFulfilment(payload, headers) + test.equal(reasons[0].apiErrorCode.code, '3100') + test.equal(reasons[0].apiErrorCode.message, 'Generic validation error') + test.equal(reasons[0].message, 'Bulk fulfil failed validation - completedTimestamp fails because provided timestamp exceeded the maximum timeout duration') + test.equal(isValid, false) + test.end() + }) + + validateBulkTransferFulfilmentTest.end() + }) + validatorTest.end() +}) diff --git a/test/unit/lib/config.test.js b/test/unit/lib/config.test.js index 23c766c22..5fd3c685f 100644 --- a/test/unit/lib/config.test.js +++ b/test/unit/lib/config.test.js @@ -30,5 +30,17 @@ Test('Config should', configTest => { test.end() }) + configTest.test('MAX_FULFIL_TIMEOUT_DURATION_SECONDS has default value if config file value is falsy', async function (test) { + console.log(Defaults) + const DefaultsStub = { ...Defaults } + 
+    DefaultsStub.MAX_FULFIL_TIMEOUT_DURATION_SECONDS = null
+    const Config = Proxyquire('../../../src/lib/config', {
+      '../../config/default.json': DefaultsStub
+    })
+
+    test.ok(Config.MAX_FULFIL_TIMEOUT_DURATION_SECONDS === 300)
+    test.end()
+  })
+
   configTest.end()
 })