Skip to content

Commit

Permalink
test: fix disconnect errors (#998)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalinkrustev authored Mar 28, 2024
1 parent 6bc8aeb commit 34f5418
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
wrapWithRetries
} = require('#test/util/helpers')
const TestConsumer = require('#test/integration/helpers/testConsumer')
const KafkaHelper = require('#test/integration/helpers/kafkaHelper')

const ParticipantCached = require('#src/models/participant/participantCached')
const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached')
Expand Down Expand Up @@ -722,9 +721,8 @@ Test('Handlers test', async handlersTest => {

test.pass('done')
test.end()
setupTests.end()
})

await setupTests.end()
})

await handlersTest.test('position batch handler should', async transferPositionPrepare => {
Expand Down Expand Up @@ -1226,10 +1224,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()
await testConsumer.destroy() // this disconnects the consumers

await KafkaHelper.producers.disconnect()
await KafkaHelper.consumers.disconnect()
await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -1241,8 +1238,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
40 changes: 7 additions & 33 deletions test/integration-override/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const Test = require('tape')
const { randomUUID } = require('crypto')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('#src/lib/config')
const Time = require('@mojaloop/central-services-shared').Util.Time
const sleep = Time.sleep
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
Expand Down Expand Up @@ -331,13 +329,12 @@ Test('Handlers test', async handlersTest => {
await testConsumer.startListening()
await KafkaHelper.producers.connect()
// TODO: MIG - Disabling these handlers to test running the CL as a separate service independently.
sleep(rebalanceDelay, debug, 'registerAllHandlers', 'awaiting registration of common handlers')
await new Promise(resolve => setTimeout(resolve, rebalanceDelay))

test.pass('done')
test.end()
registerAllHandlers.end()
})

await registerAllHandlers.end()
})

await handlersTest.test('transferPrepare should', async transferPrepare => {
Expand Down Expand Up @@ -425,32 +422,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()

// TODO: Story to investigate as to why the Producers failed reconnection on the ./transfers/handlers.test.js - https://github.com/mojaloop/project/issues/3067
// const topics = KafkaHelper.topics
// for (const topic of topics) {
// try {
// await Producer.getProducer(topic).disconnect()
// assert.pass(`producer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Producers are disconnected
await KafkaHelper.producers.disconnect()

// TODO: Clean this up once the above issue has been resolved.
// for (const topic of topics) {
// try {
// await Consumer.getConsumer(topic).disconnect()
// assert.pass(`consumer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Consumers are disconnected
await KafkaHelper.consumers.disconnect()
await testConsumer.destroy() // this disconnects the consumers

await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -462,8 +436,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
40 changes: 7 additions & 33 deletions test/integration/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const retry = require('async-retry')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('#src/lib/config')
const Time = require('@mojaloop/central-services-shared').Util.Time
const sleep = Time.sleep
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
Expand All @@ -54,7 +53,6 @@ const {
sleepPromise
} = require('#test/util/helpers')
const TestConsumer = require('#test/integration/helpers/testConsumer')
const KafkaHelper = require('#test/integration/helpers/kafkaHelper')

const ParticipantCached = require('#src/models/participant/participantCached')
const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached')
Expand Down Expand Up @@ -390,13 +388,12 @@ Test('Handlers test', async handlersTest => {
await testConsumer.startListening()

// TODO: MIG - Disabling these handlers to test running the CL as a separate service independently.
sleep(rebalanceDelay, debug, 'registerAllHandlers', 'awaiting registration of common handlers')
await new Promise(resolve => setTimeout(resolve, rebalanceDelay))

test.pass('done')
test.end()
registerAllHandlers.end()
})

await registerAllHandlers.end()
})

await handlersTest.test('transferPrepare should', async transferPrepare => {
Expand Down Expand Up @@ -1344,32 +1341,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()

// TODO: Story to investigate as to why the Producers failed reconnection on the ./transfers/handlers.test.js - https://github.com/mojaloop/project/issues/3067
// const topics = KafkaHelper.topics
// for (const topic of topics) {
// try {
// await Producer.getProducer(topic).disconnect()
// assert.pass(`producer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Producers are disconnected
await KafkaHelper.producers.disconnect()

// TODO: Clean this up once the above issue has been resolved.
// for (const topic of topics) {
// try {
// await Consumer.getConsumer(topic).disconnect()
// assert.pass(`consumer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Consumers are disconnected
await KafkaHelper.consumers.disconnect()
await testConsumer.destroy() // this disconnects the consumers

await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -1381,8 +1355,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
4 changes: 3 additions & 1 deletion test/integration/helpers/testConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ class TestConsumer {
*/
async destroy () {
Logger.warn(`TestConsumer.destroy(): destroying ${this.consumers.length} consumers`)
await Promise.all(this.consumers.map(async c => c.disconnect()))
await Promise.all(this.consumers.map(consumer => new Promise((resolve, reject) => {
consumer.disconnect((err) => err ? reject(err) : resolve())
})))
}

/**
Expand Down

0 comments on commit 34f5418

Please sign in to comment.