diff --git a/.circleci/config.yml b/.circleci/config.yml index 7c8035820..2c36dc84f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,6 +25,7 @@ defaults_Dependencies: &defaults_Dependencies | apk --no-cache add openssh-client apk add --no-cache -t build-dependencies make gcc g++ python3 libtool autoconf automake jq apk add --no-cache -t openssl ncurses coreutils libgcc linux-headers grep util-linux binutils findutils + apk add --no-cache curl npm config set unsafe-perm true npm install -g node-gyp @@ -221,12 +222,56 @@ jobs: ## wait for services to be up and running npm run wait-4-docker - run: - name: Run the integration tests + name: Install dependencies to check health services + command: | + sudo apt install curl + - run: + name: Rebuild any dependencies command: | npm rebuild - npm run test:int + - run: + name: Run migration scripts + command: | + npm run migrate + - run: + name: Run the integration tests + command: | + ## Run Service in background + ## This is a temporary work-around until the following issue can be addressed: https://github.com/mojaloop/project/issues/3112 + echo "Starting Service in the background" + npm start > ./test/results/cl-service.log & + + ## Store PID for cleanup + echo $! > /tmp/int-test-service.pid + PID=$(cat /tmp/int-test-service.pid) + echo "Service started with Process ID=$PID" + + ## Check Service Health + echo "Waiting for Service to be healthy" + bash .circleci/curl-retry-cl-health.sh + + ## Lets wait a few seconds to ensure that Kafka handlers are rebalanced + echo "Waiting ${WAIT_FOR_REBALANCE}s for Kafka Re-balancing..." && sleep $WAIT_FOR_REBALANCE + + ## Start integration tests + echo "Running Integration Tests" + npm run test:int:skipMigrate + + ## Kill service + echo "Stopping Service with Process ID=$PID" + kill $(cat /tmp/int-test-service.pid) environment: ENDPOINT_URL: http://localhost:4545/notification + UV_THREADPOOL_SIZE: 12 + WAIT_FOR_REBALANCE: 20 + TEST_INT_RETRY_COUNT: 20 + TEST_INT_RETRY_DELAY: 2 + TEST_INT_REBALANCE_DELAY: 20000 + - run: + name: Cleanup Docker + command: | + ## Shutdown Docker containers + docker compose down -v - store_artifacts: path: ./test/results prefix: test diff --git a/.circleci/curl-retry-cl-health.sh b/.circleci/curl-retry-cl-health.sh new file mode 100644 index 000000000..23c75d5c5 --- /dev/null +++ b/.circleci/curl-retry-cl-health.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +## Script is to wait-retry with a number of retries to determine if the Central Ledger health end-point is healthy or not + +# Central Ledger Health end-point for integration tests +url="http://localhost:3001/health" + +# Number of retries +retries=10 + +# Sleep between retries +sleepwait=1 + +# Counter for retries +count=0 + +while [ $count -lt $retries ] +do + response=$(curl -s -o /dev/null -w "%{http_code}" $url) + if [ $response -eq 200 ]; then + echo "Successful response: $response" + break + else + echo "Response: $response. Retrying..." + ((count++)) + sleep $sleepwait + fi +done + +if [ $count -eq $retries ]; then + echo "Failed after $retries attempts." + exit 1 +fi diff --git a/.ncurc.yaml b/.ncurc.yaml index d4f43caf2..6f4aa8bc8 100644 --- a/.ncurc.yaml +++ b/.ncurc.yaml @@ -5,11 +5,5 @@ reject: [ # TODO: New versions from 2.2.0 onwards introduce a newer incompatible version of the ILP-Packet that is not compatible with the Mojaloop Specification "ilp-packet", # TODO: v6+ (ref: https://github.com/sindresorhus/get-port/releases/tag/v6.0.0) is an ESM library and thus not compatible with CommonJS. Future story needed to resolve. - "get-port", - # There seems to be an issue with seeding or migration with integration tests - # that lead to some tests failing down the line. - # Specifically test case "update transfer state to COMMITTED by FULFIL request" - # "savePayeeTransferResponse::failure Cannot read properties of undefined (reading 'settlementWindowId')" - # TODO: More investigation - "knex" + "get-port" ] diff --git a/README.md b/README.md index ed7796d74..b4a3a3169 100644 --- a/README.md +++ b/README.md @@ -99,12 +99,27 @@ If you want to run integration tests in a repetitive manner, you can startup the npm run wait-4-docker ``` + Start the Central-Ledger Service in the background, capturing the Process ID, so we can kill it when we are done. Alternatively you could also start the process in a separate terminal. This is a temporary work-around until the following issue can be addressed: https://github.com/mojaloop/project/issues/3112. + + ```bash + npm start > cl-service.log & + echo $! > /tmp/int-test-service.pid + ``` + + You can access the Central-Ledger Service log in another terminal with `tail -f cl-service.log`. + Run the Integration Tests ```bash npm run test:int ``` + Kill the background Central-Ledger Service + + ```bash + kill $(cat /tmp/int-test-service.pid) + ``` + - Running inside docker Start containers required for Integration Tests, including a `central-ledger` container which will be used as a proxy shell. diff --git a/audit-ci.jsonc b/audit-ci.jsonc index 4d1b77134..66f75c509 100644 --- a/audit-ci.jsonc +++ b/audit-ci.jsonc @@ -24,13 +24,12 @@ "GHSA-9c47-m6qq-7p4h", // https://github.com/advisories/GHSA-9c47-m6qq-7p4h // TODO: Upgrade jsonwebtoken in the central-services-shared lib --> https://github.com/mojaloop/project/issues/3097 "GHSA-qwph-4952-7xr6", // https://github.com/advisories/GHSA-qwph-4952-7xr6 + "GHSA-hjrf-2m68-5959", // https://github.com/advisories/GHSA-hjrf-2m68-5959 "GHSA-27h2-hvpr-p74q", // https://github.com/advisories/GHSA-27h2-hvpr-p74q - // Unable to upgrade Knex due to the following issue: - // # There seems to be an issue with seeding or migration with integration tests - // # that lead to some tests failing down the line. - // # Specifically test case "update transfer state to COMMITTED by FULFIL request" - // # "savePayeeTransferResponse::failure Cannot read properties of undefined (reading 'settlementWindowId')" - // TODO: More investigation --> https://github.com/mojaloop/project/issues/3096 - "GHSA-4jv9-3563-23j3" // https://github.com/advisories/GHSA-4jv9-3563-23j3 + // Knex dependency has been upgraded to v2.4x as advised by this advisory. Not sure why its still reporting it as an issue? + // TODO: Investigate as to why this is still being reported even though Knex was upgraded! :( + "GHSA-4jv9-3563-23j3", // https://github.com/advisories/GHSA-4jv9-3563-23j3 + // TODO: To be investigated + "GHSA-rc47-6667-2j5j", // https://github.com/advisories/GHSA-rc47-6667-2j5j ] } \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 85962f0c9..f1b7d7cc3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -183,6 +183,8 @@ services: kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-position --replication-factor 1 --partitions 1 kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-fulfil --replication-factor 1 --partitions 1 kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-notification-event --replication-factor 1 --partitions 1 + kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-get --replication-factor 1 --partitions 1 + kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-admin-transfer --replication-factor 1 --partitions 1 echo -e 'Successfully created the following topics:' kafka-topics.sh --bootstrap-server kafka:29092 --list " diff --git a/package-lock.json b/package-lock.json index 6867e1166..1e799d74c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "Apache-2.0", "dependencies": { "@hapi/good": "9.0.1", - "@hapi/hapi": "21.2.0", + "@hapi/hapi": "21.2.1", "@hapi/inert": "7.0.0", "@hapi/joi": "17.1.1", "@hapi/vision": "7.0.0", @@ -41,7 +41,7 @@ "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "15.0.0", "ilp-packet": "2.2.0", - "knex": "2.3.0", + "knex": "2.4.2", "lodash": "4.17.21", "moment": "2.29.4", "rc": "1.2.8", @@ -859,9 +859,9 @@ "integrity": "sha512-/c6rf4UJlmHlC9b5BaNvzAcFv7HZ2QHaV0D4/HNlBdvFnvQq8RI4kYdhyPCl7Xj+oWvTWQ8ujhqS53LIgAe6KQ==" }, "node_modules/@hapi/hapi": { - "version": "21.2.0", - "resolved": "https://registry.npmjs.org/@hapi/hapi/-/hapi-21.2.0.tgz", - "integrity": "sha512-lhidm5B2y+4cgmI9BL0xDNDJJDcHaCUUUJk8FOSuTf61JvK5HSq6zEqdAjTD+RVePpItCMLv8ZzRrdCan0Zoqw==", + "version": "21.2.1", + "resolved": "https://registry.npmjs.org/@hapi/hapi/-/hapi-21.2.1.tgz", + "integrity": "sha512-mDBhIi/zIYhWPaZF4Z8h7jUtWC3bz7xYuUyI5riXZV9DabFnNOKpQfOob6V5ZcDwEJfmnWHgJO37BVCn31BStQ==", "dependencies": { "@hapi/accept": "^6.0.0", "@hapi/ammo": "^6.0.0", @@ -9280,9 +9280,9 @@ } }, "node_modules/knex": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/knex/-/knex-2.3.0.tgz", - "integrity": "sha512-WMizPaq9wRMkfnwKXKXgBZeZFOSHGdtoSz5SaLAVNs3WRDfawt9O89T4XyH52PETxjV8/kRk0Yf+8WBEP/zbYw==", + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/knex/-/knex-2.4.2.tgz", + "integrity": "sha512-tMI1M7a+xwHhPxjbl/H9K1kHX+VncEYcvCx5K00M16bWvpYPKAZd6QrCu68PtHAdIZNQPWZn0GVhqVBEthGWCg==", "dependencies": { "colorette": "2.0.19", "commander": "^9.1.0", @@ -17591,9 +17591,9 @@ } }, "@hapi/hapi": { - "version": "21.2.0", - "resolved": "https://registry.npmjs.org/@hapi/hapi/-/hapi-21.2.0.tgz", - "integrity": "sha512-lhidm5B2y+4cgmI9BL0xDNDJJDcHaCUUUJk8FOSuTf61JvK5HSq6zEqdAjTD+RVePpItCMLv8ZzRrdCan0Zoqw==", + "version": "21.2.1", + "resolved": "https://registry.npmjs.org/@hapi/hapi/-/hapi-21.2.1.tgz", + "integrity": "sha512-mDBhIi/zIYhWPaZF4Z8h7jUtWC3bz7xYuUyI5riXZV9DabFnNOKpQfOob6V5ZcDwEJfmnWHgJO37BVCn31BStQ==", "requires": { "@hapi/accept": "^6.0.0", "@hapi/ammo": "^6.0.0", @@ -24191,9 +24191,9 @@ "dev": true }, "knex": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/knex/-/knex-2.3.0.tgz", - "integrity": "sha512-WMizPaq9wRMkfnwKXKXgBZeZFOSHGdtoSz5SaLAVNs3WRDfawt9O89T4XyH52PETxjV8/kRk0Yf+8WBEP/zbYw==", + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/knex/-/knex-2.4.2.tgz", + "integrity": "sha512-tMI1M7a+xwHhPxjbl/H9K1kHX+VncEYcvCx5K00M16bWvpYPKAZd6QrCu68PtHAdIZNQPWZn0GVhqVBEthGWCg==", "requires": { "colorette": "2.0.19", "commander": "^9.1.0", diff --git a/package.json b/package.json index fc4b57dc7..b352ceea5 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,8 @@ "test:xunit": "npx tape 'test/unit/**/**.test.js' | tap-xunit > ./test/results/xunit.xml", "test:coverage": "npx nyc --reporter=lcov --reporter=text-summary tapes -- 'test/unit/**/**.test.js'", "test:coverage-check": "npm run test:coverage && nyc check-coverage", - "test:int": "npm run migrate && TST_RETRY_COUNT=20 TST_RETRY_TIMEOUT=3 UV_THREADPOOL_SIZE=12 npx tape 'test/integration/**/*.test.js' | tap-spec", + "test:int": "npm run migrate && npx tape 'test/integration/**/*.test.js' | tap-spec", + "test:int:skipMigrate": "npx tape 'test/integration/**/*.test.js' | tap-spec", "migrate": "npm run migrate:latest && npm run seed:run", "migrate:latest": "npx knex $npm_package_config_knex migrate:latest", "migrate:create": "npx knex migrate:make $npm_package_config_knex", @@ -77,7 +78,7 @@ }, "dependencies": { "@hapi/good": "9.0.1", - "@hapi/hapi": "21.2.0", + "@hapi/hapi": "21.2.1", "@hapi/inert": "7.0.0", "@hapi/joi": "17.1.1", "@hapi/vision": "7.0.0", @@ -108,7 +109,7 @@ "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "15.0.0", "ilp-packet": "2.2.0", - "knex": "2.3.0", + "knex": "2.4.2", "lodash": "4.17.21", "moment": "2.29.4", "rc": "1.2.8", diff --git a/test/integration/handlers/transfers/handlers.test.js b/test/integration/handlers/transfers/handlers.test.js index afbd36f47..aa83e74e6 100644 --- a/test/integration/handlers/transfers/handlers.test.js +++ b/test/integration/handlers/transfers/handlers.test.js @@ -73,10 +73,10 @@ const TransferInternalState = Enum.Transfers.TransferInternalState const TransferEventType = Enum.Events.Event.Type const TransferEventAction = Enum.Events.Event.Action -const debug = false -const rebalanceDelay = 10000 -const retryDelay = 500 -const retryCount = 40 +const debug = process?.env?.TEST_INT_DEBUG || false +const rebalanceDelay = process?.env?.TEST_INT_REBALANCE_DELAY || 10000 +const retryDelay = process?.env?.TEST_INT_RETRY_DELAY || 2 +const retryCount = process?.env?.TEST_INT_RETRY_COUNT || 40 const retryOpts = { retries: retryCount, minTimeout: retryDelay, @@ -127,11 +127,14 @@ const testDataZAR = { const prepareTestData = async (dataObj) => { try { - // Lets make sure that all existing producers are connected - await KafkaHelper.producers.connect() - // Lets make sure that all existing Consumers are connected - await KafkaHelper.consumers.connect() + // TODO: START - Disabling these handlers to test running the CL as a separate service independently. + // The following issue https://github.com/mojaloop/project/issues/3112 was created to investigate as to why the Integration Tests are so unstable when then Event Handlers are executing in-line. For the time being the above PR clearly separates the process which resolves the stability issue for the time being. + // // Lets make sure that all existing producers are connected + // await KafkaHelper.producers.connect() + // // Lets make sure that all existing Consumers are connected + // await KafkaHelper.consumers.connect() // const topics = TestTopics.list + // TODO: END - Disabling these handlers to test running the CL as a separate service independently. // // lets make sure all our Producers are already connected if they have already been defined. // for (const topic of topics) { @@ -326,8 +329,8 @@ Test('Handlers test', async handlersTest => { await HubAccountsHelper.prepareData() const wrapWithRetriesConf = { - remainingRetries: process.env.TST_RETRY_COUNT || 10, // default 10 - timeout: process.env.TST_RETRY_TIMEOUT || 2 // default 2 + remainingRetries: retryOpts?.retries || 10, // default 10 + timeout: retryOpts?.maxTimeout || 2 // default 2 } // Start a testConsumer to monitor events that our handlers emit @@ -362,14 +365,18 @@ Test('Handlers test', async handlersTest => { await handlersTest.test('registerAllHandlers should', async registerAllHandlers => { await registerAllHandlers.test('setup handlers', async (test) => { - await Handlers.transfers.registerPrepareHandler() - await Handlers.positions.registerPositionHandler() - await Handlers.transfers.registerFulfilHandler() - await Handlers.timeouts.registerTimeoutHandler() + // TODO: START - Disabling these handlers to test running the CL as a separate service independently. + // The following issue https://github.com/mojaloop/project/issues/3112 was created to investigate as to why the Integration Tests are so unstable when then Event Handlers are executing in-line. For the time being the above PR clearly separates the process which resolves the stability issue for the time being. + // await Handlers.transfers.registerPrepareHandler() + // await Handlers.positions.registerPositionHandler() + // await Handlers.transfers.registerFulfilHandler() + // await Handlers.timeouts.registerTimeoutHandler() + // TODO: END - Disabling these handlers to test running the CL as a separate service independently. // Set up the testConsumer here await testConsumer.startListening() + // TODO: MIG - Disabling these handlers to test running the CL as a separate service independently. sleep(rebalanceDelay, debug, 'registerAllHandlers', 'awaiting registration of common handlers') test.pass('done') @@ -392,17 +399,24 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) - const transfer = await wrapWithRetries(async () => { - // lets fetch the transfer - const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - console.dir(transfer) - // lets check its status, and if its what we expect return the result - if (transfer.transferState === 'RESERVED') return transfer - // otherwise lets return nothing - return null - }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - - test.equal(transfer.transferState, 'RESERVED', 'Transfer is in reserved state') + + let transfer = {} + try { + transfer = await wrapWithRetries(async () => { + // lets fetch the transfer + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) + console.dir(transfer) + // lets check its status, and if its what we expect return the result + if (transfer?.transferState === 'RESERVED') return transfer + // otherwise lets return nothing + return null + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + test.equal(transfer?.transferState, 'RESERVED', 'Transfer is in reserved state') // 2. send an ABORTED request from Payee td.messageProtocolFulfil.metadata.event.action = TransferEventAction.RESERVE @@ -432,7 +446,7 @@ Test('Handlers test', async handlersTest => { console.log(err) test.ok('No payee abort notification sent') } - console.log(JSON.stringify(testConsumer.getAllEvents())) + if (debug) console.log(JSON.stringify(testConsumer.getAllEvents())) // TODO: I can't seem to find the payer abort notification in the log // is there something I'm missing here? Does it go to a different handler? @@ -465,15 +479,23 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) - const transfer = await wrapWithRetries(async () => { - // lets fetch the transfer - const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - // lets check its status, and if its what we expect return the result - if (transfer.transferState === 'RESERVED') return transfer - // otherwise lets return nothing - return null - }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - test.equal(transfer.transferState, 'RESERVED', 'Transfer is in reserved state') + + let transfer = {} + try { + transfer = await wrapWithRetries(async () => { + // lets fetch the transfer + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) + // lets check its status, and if its what we expect return the result + if (transfer?.transferState === 'RESERVED') return transfer + // otherwise lets return nothing + return null + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + test.equal(transfer?.transferState, 'RESERVED', 'Transfer is in reserved state') // 2. sleep so that the RESERVED transfer expires await sleepPromise(wrapWithRetriesConf.timeout) @@ -496,29 +518,46 @@ Test('Handlers test', async handlersTest => { // 4. Get the updated transfer since the completedTimestamp may have changed const updatedTransfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - const expectedAbortNotificationPayload = { - completedTimestamp: Time.getUTCString(new Date(updatedTransfer.completedTimestamp)), - transferState: 'ABORTED' + + let expectedAbortNotificationPayload = {} + if (updatedTransfer) { + expectedAbortNotificationPayload = { + completedTimestamp: Time.getUTCString(new Date(updatedTransfer.completedTimestamp)), + transferState: 'ABORTED' + } } // Assert // 5. Check that we sent 2 notifications to kafka - one for the Payee, one for the Payer - const payerAbortNotification = (await wrapWithRetries( - () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'commit' }), - wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - )[0] - const payeeAbortNotification = (await wrapWithRetries( - () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'reserved-aborted' }), - wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - )[0] + let payerAbortNotification + let payeeAbortNotification + try { + payerAbortNotification = (await wrapWithRetries( + () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'commit' }), + wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + )[0] + payeeAbortNotification = (await wrapWithRetries( + () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'reserved-aborted' }), + wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + )[0] + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + test.ok(payerAbortNotification, 'Payer Abort notification sent') test.ok(payeeAbortNotification, 'Payee Abort notification sent') - test.deepEqual( - getMessagePayloadOrThrow(payeeAbortNotification), - expectedAbortNotificationPayload, - 'Abort notification should be sent with the correct values' - ) + try { + test.deepEqual( + getMessagePayloadOrThrow(payeeAbortNotification), + expectedAbortNotificationPayload, + 'Abort notification should be sent with the correct values' + ) + } catch (err) { + test.notOk('Error should not be thrown - getMessagePayloadOrThrow(payeeAbortNotification) failed!') + console.error(err) + } // Cleanup testConsumer.clearEvents() @@ -537,15 +576,23 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) - const transfer = await wrapWithRetries(async () => { - // lets fetch the transfer - const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - // lets check its status, and if its what we expect return the result - if (transfer.transferState === 'RESERVED') return transfer - // otherwise lets return nothing - return null - }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - test.equal(transfer.transferState, 'RESERVED', 'Transfer is in reserved state') + + let transfer = {} + try { + transfer = await wrapWithRetries(async () => { + // lets fetch the transfer + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) + // lets check its status, and if its what we expect return the result + if (transfer?.transferState === 'RESERVED') return transfer + // otherwise lets return nothing + return null + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + test.equal(transfer?.transferState, 'RESERVED', 'Transfer is in reserved state') // 2. Modify the transfer in the DB await TransferService.saveTransferStateChange({ @@ -571,9 +618,13 @@ Test('Handlers test', async handlersTest => { // 4. Get the updated transfer since the completedTimestamp may have changed const updatedTransfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - const expectedAbortNotificationPayload = { - completedTimestamp: Time.getUTCString(new Date(updatedTransfer.completedTimestamp)), - transferState: 'ABORTED' + + let expectedAbortNotificationPayload = {} + if (updatedTransfer) { + expectedAbortNotificationPayload = { + completedTimestamp: Time.getUTCString(new Date(updatedTransfer.completedTimestamp)), + transferState: 'ABORTED' + } } // Assert @@ -619,15 +670,23 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) - const transfer = await wrapWithRetries(async () => { - // lets fetch the transfer - const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - // lets check its status, and if its what we expect return the result - if (transfer.transferState === 'RESERVED') return transfer - // otherwise lets return nothing - return null - }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - test.equal(transfer.transferState, 'RESERVED', 'Transfer is in reserved state') + + let transfer = {} + try { + transfer = await wrapWithRetries(async () => { + // lets fetch the transfer + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) + // lets check its status, and if its what we expect return the result + if (transfer?.transferState === 'RESERVED') return transfer + // otherwise lets return nothing + return null + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + test.equal(transfer?.transferState, 'RESERVED', 'Transfer is in reserved state') // 2. send a RESERVED request with an invalid validation(from Payee) td.messageProtocolFulfil.metadata.event.action = TransferEventAction.RESERVE @@ -645,27 +704,45 @@ Test('Handlers test', async handlersTest => { fulfilConfig.logger = Logger await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) - await wrapWithRetries(async () => { - const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - return transfer.transferState === 'ABORTED_ERROR' - }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + try { + await wrapWithRetries(async () => { + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) + return transfer?.transferState === 'ABORTED_ERROR' + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + const updatedTransfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) - test.equal(updatedTransfer.transferState, 'ABORTED_ERROR', 'Transfer is in ABORTED_ERROR state') - const expectedAbortNotificationPayload = { - completedTimestamp: (new Date(Date.parse(updatedTransfer.completedTimestamp))).toISOString(), - transferState: 'ABORTED' + test.equal(updatedTransfer?.transferState, 'ABORTED_ERROR', 'Transfer is in ABORTED_ERROR state') + + let expectedAbortNotificationPayload = {} + if (updatedTransfer) { + expectedAbortNotificationPayload = { + completedTimestamp: (new Date(Date.parse(updatedTransfer.completedTimestamp))).toISOString(), + transferState: 'ABORTED' + } + } + + let payerAbortNotificationEvent + let payeeAbortNotificationEvent + try { + // Assert + // 3. Check that we sent 2 notifications to kafka - one for the Payee, one for the Payer + payerAbortNotificationEvent = (await wrapWithRetries( + () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'abort-validation' }), + wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + )[0] + payeeAbortNotificationEvent = (await wrapWithRetries( + () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'reserved-aborted' }), + wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + )[0] + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) } - // Assert - // 3. Check that we sent 2 notifications to kafka - one for the Payee, one for the Payer - const payerAbortNotificationEvent = (await wrapWithRetries( - () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'abort-validation' }), - wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - )[0] - const payeeAbortNotificationEvent = (await wrapWithRetries( - () => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'reserved-aborted' }), - wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - )[0] test.ok(payerAbortNotificationEvent, 'Payer Abort notification sent') test.ok(payeeAbortNotificationEvent, 'Payee Abort notification sent') @@ -712,16 +789,16 @@ Test('Handlers test', async handlersTest => { const payerExpectedPosition = payerInitialPosition + td.transferPayload.amount.amount const payerPositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payerCurrentPosition.participantPositionId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) + test.equal(transfer?.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) test.equal(payerCurrentPosition.value, payerExpectedPosition, 'Payer position incremented by transfer amount and updated in participantPosition') test.equal(payerPositionChange.value, payerCurrentPosition.value, 'Payer position change value inserted and matches the updated participantPosition value') - test.equal(payerPositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') + test.equal(payerPositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#1 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -751,17 +828,17 @@ Test('Handlers test', async handlersTest => { const payeeExpectedPosition = payeeInitialPosition - td.transferPayload.amount.amount const payeePositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payeeCurrentPosition.participantPositionId) || {} test.equal(producerResponse, true, 'Producer for fulfil published message') - test.equal(transfer.transferState, TransferState.COMMITTED, `Transfer state changed to ${TransferState.COMMITTED}`) + test.equal(transfer?.transferState, TransferState.COMMITTED, `Transfer state changed to ${TransferState.COMMITTED}`) test.equal(transfer.fulfilment, td.fulfilPayload.fulfilment, 'Commit ilpFulfilment saved') test.equal(payeeCurrentPosition.value, payeeExpectedPosition, 'Payee position decremented by transfer amount and updated in participantPosition') test.equal(payeePositionChange.value, payeeCurrentPosition.value, 'Payee position change value inserted and matches the updated participantPosition value') - test.equal(payeePositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payee position change record is bound to the corresponding transfer state change') + test.equal(payeePositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payee position change record is bound to the corresponding transfer state change') } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.COMMITTED) { + if (transfer?.transferState !== TransferState.COMMITTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#2 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -796,16 +873,16 @@ Test('Handlers test', async handlersTest => { const payerExpectedPosition = payerInitialPosition + td.transferPayload.amount.amount const payerPositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payerCurrentPosition.participantPositionId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) + test.equal(transfer?.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) test.equal(payerCurrentPosition.value, payerExpectedPosition, 'Payer position incremented by transfer amount and updated in participantPosition') test.equal(payerPositionChange.value, payerCurrentPosition.value, 'Payer position change value inserted and matches the updated participantPosition value') - test.equal(payerPositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') + test.equal(payerPositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#1 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -833,17 +910,17 @@ Test('Handlers test', async handlersTest => { const payeeExpectedPosition = payeeInitialPosition - td.transferPayload.amount.amount const payeePositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payeeCurrentPosition.participantPositionId) || {} test.equal(producerResponse, true, 'Producer for fulfil published message') - test.equal(transfer.transferState, TransferState.COMMITTED, `Transfer state changed to ${TransferState.COMMITTED}`) + test.equal(transfer?.transferState, TransferState.COMMITTED, `Transfer state changed to ${TransferState.COMMITTED}`) test.equal(transfer.fulfilment, td.fulfilPayload.fulfilment, 'Commit ilpFulfilment saved') test.equal(payeeCurrentPosition.value, payeeExpectedPosition, 'Payee position decremented by transfer amount and updated in participantPosition') test.equal(payeePositionChange.value, payeeCurrentPosition.value, 'Payee position change value inserted and matches the updated participantPosition value') - test.equal(payeePositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payee position change record is bound to the corresponding transfer state change') + test.equal(payeePositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payee position change record is bound to the corresponding transfer state change') } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.COMMITTED) { + if (transfer?.transferState !== TransferState.COMMITTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#2 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -875,13 +952,13 @@ Test('Handlers test', async handlersTest => { const tests = async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) + test.equal(transfer?.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#3 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -912,13 +989,13 @@ Test('Handlers test', async handlersTest => { const tests = async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferInternalState.ABORTED_REJECTED, `Transfer state changed to ${TransferInternalState.ABORTED_REJECTED}`) + test.equal(transfer?.transferState, TransferInternalState.ABORTED_REJECTED, `Transfer state changed to ${TransferInternalState.ABORTED_REJECTED}`) } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferInternalState.ABORTED_REJECTED) { + if (transfer?.transferState !== TransferInternalState.ABORTED_REJECTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#4 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -951,13 +1028,13 @@ Test('Handlers test', async handlersTest => { const tests = async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) + test.equal(transfer?.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) } try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#5 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -989,14 +1066,14 @@ Test('Handlers test', async handlersTest => { const transferError = await TransferService.getTransferErrorByTransferId(transfer.transferId) const transferExtension = await TransferExtensionModel.getByTransferId(transfer.transferId, false, true) test.equal(producerResponse, true, 'Producer for fulfil published message') - test.equal(transfer.transferState, TransferInternalState.ABORTED_ERROR, `Transfer state changed to ${TransferInternalState.ABORTED_ERROR}`) + test.equal(transfer?.transferState, TransferInternalState.ABORTED_ERROR, `Transfer state changed to ${TransferInternalState.ABORTED_ERROR}`) test.equal(payerCurrentPosition.value, payerExpectedPosition, 'Payer position decremented by transfer amount and updated in participantPosition') test.equal(payerPositionChange.value, payerCurrentPosition.value, 'Payer position change value inserted and matches the updated participantPosition value') - test.equal(payerPositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') + test.equal(payerPositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') test.ok(transferError, 'A transfer error has been recorded') test.equal(transferError.errorCode, td.errorPayload.errorInformation.errorCode, 'Transfer error code matches') test.equal(transferError.errorDescription, expectedErrorDescription, 'Transfer error description matches') - test.notEqual(transferError.transferStateChangeId, transfer.transferStateChangeId, 'Transfer error record is bound to previous state of transfer') + test.notEqual(transferError.transferStateChangeId, transfer?.transferStateChangeId, 'Transfer error record is bound to previous state of transfer') test.ok(transferExtension, 'A transfer extension has been recorded') test.equal(transferExtension[0].transferId, transfer.transferId, 'Transfer extension recorded with transferErrorId key') } @@ -1004,7 +1081,7 @@ Test('Handlers test', async handlersTest => { try { await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferInternalState.ABORTED_ERROR) { + if (transfer?.transferState !== TransferInternalState.ABORTED_ERROR) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#6 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } @@ -1042,21 +1119,27 @@ Test('Handlers test', async handlersTest => { const payerExpectedPosition = payerInitialPosition + td.transferPayload.amount.amount const payerPositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payerCurrentPosition.participantPositionId) || {} test.equal(producerResponse, true, 'Producer for prepare published message') - test.equal(transfer.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) + test.equal(transfer?.transferState, TransferState.RESERVED, `Transfer state changed to ${TransferState.RESERVED}`) test.equal(payerCurrentPosition.value, payerExpectedPosition, 'Payer position incremented by transfer amount and updated in participantPosition') test.equal(payerPositionChange.value, payerCurrentPosition.value, 'Payer position change value inserted and matches the updated participantPosition value') - test.equal(payerPositionChange.transferStateChangeId, transfer.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') + test.equal(payerPositionChange.transferStateChangeId, transfer?.transferStateChangeId, 'Payer position change record is bound to the corresponding transfer state change') } try { + const retryTimeoutOpts = { + retries: Number(retryOpts.retries) * 2, + minTimeout: retryOpts.minTimeout, + maxTimeout: retryOpts.maxTimeout + } + await retry(async () => { // use bail(new Error('to break before max retries')) const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) throw new Error(`#7 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) } return tests() - }, retryOpts) + }, retryTimeoutOpts) } catch (err) { Logger.error(err) test.fail(err.message) @@ -1078,7 +1161,7 @@ Test('Handlers test', async handlersTest => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} // Check Transfer for correct state - if (transfer.transferState === Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) { + if (transfer?.transferState === Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) { // We have a Transfer with the correct state, lets check if we can get the TransferError record try { // Fetch the TransferError record @@ -1114,7 +1197,7 @@ Test('Handlers test', async handlersTest => { test.fail(`Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState failed to transition to ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`) test.end() } else { - test.equal(result.transfer && result.transfer.transferState, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState = ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`) + test.equal(result.transfer && result.transfer?.transferState, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState = ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`) test.equal(result.transferError && result.transferError.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorCode = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code}`) test.equal(result.transferError && result.transferError.errorDescription, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorDescription = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message}`) test.pass()