Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(csi-414): fixed TypeError - Cannot read properties of undefined; updated proxyCache lib; added request logging #344

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
},
"PROXY_CACHE": {
"enabled": false,
"type": "redis",
"type": "redis-cluster",
"proxyConfig": {
"cluster": [
{ "host": "localhost", "port": 6379 }
Expand Down
26 changes: 11 additions & 15 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ services:
<<: *healthcheckParams
test: [ "CMD", "mysqladmin" ,"ping", "-h", "mysql" ]

# To use with proxyCache.type === 'redis-cluster'
redis-node-0:
<<: *REDIS_NODE
environment:
Expand All @@ -132,22 +133,17 @@ services:
redis-node-5:
<<: *REDIS_NODE

# mockserver:
# image: jamesdbloom/mockserver
# container_name: qs_mockserver
# ports:
# - "1080:1080"
#
# temp_curl:
# image: byrnedo/alpine-curl
# container_name: qs_temp-curl
# volumes:
# - ./docker/wait-for-mockserver.sh:/opt/wait-for-mockserver.sh
# entrypoint: [ "sh", "-c" ]
# command:
# - /opt/wait-for-mockserver.sh
## To use with proxyCache.type === 'redis'
# redis:
# image: redis:6.2.4-alpine
# restart: "unless-stopped"
# environment:
# - MOCK_HOST=mockserver
# - ALLOW_EMPTY_PASSWORD=yes
# - REDIS_PORT=6379
# - REDIS_REPLICATION_MODE=master
# ports:
# - "${REDIS_STANDALONE_PORT}:6379"


kafka:
image: docker.io/bitnami/kafka:3.5
Expand Down
2 changes: 1 addition & 1 deletion docker/quoting-service/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
},
"PROXY_CACHE": {
"enabled": true,
"type": "redis",
"type": "redis-cluster",
"proxyConfig": {
"cluster": [
{ "host": "redis-node-0", "port": 6379 }
Expand Down
18 changes: 9 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@
"@hapi/vision": "7.0.3",
"@mojaloop/central-services-error-handling": "13.0.1",
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.4.5",
"@mojaloop/central-services-logger": "11.5.0",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.5.0-snapshot.2",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/event-sdk": "14.1.1",
"@mojaloop/inter-scheme-proxy-cache-lib": "2.0.0-snapshot.1",
"@mojaloop/inter-scheme-proxy-cache-lib": "2.2.0",
"@mojaloop/ml-number": "11.2.4",
"@mojaloop/sdk-standard-components": "18.1.0",
"ajv": "8.17.1",
Expand Down
5 changes: 5 additions & 0 deletions src/api/plugins/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const loggingPlugin = require('./loggingPlugin')

module.exports = {
loggingPlugin
}
43 changes: 43 additions & 0 deletions src/api/plugins/loggingPlugin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { asyncStorage } = require('@mojaloop/central-services-logger/src/contextLogger')
const { logger } = require('../../lib') // pass though options

const loggingPlugin = {
name: 'loggingPlugin',
version: '1.0.0',
once: true,
register: async (server, options) => {
// const { logger } = options;
server.ext({
type: 'onPreHandler',
method: (request, h) => {
const { path, method, headers, payload, query } = request
const { remoteAddress } = request.info
const requestId = request.info.id = `${request.info.id}__${headers.traceid}`
asyncStorage.enterWith({ requestId })

logger.isInfoEnabled && logger.info(`[==> req] ${method.toUpperCase()} ${path}`, { headers, payload, query, remoteAddress })
return h.continue
}
})

server.ext({
type: 'onPreResponse',
method: (request, h) => {
if (logger.isInfoEnabled) {
const { path, method, headers, payload, query, response } = request
const { received } = request.info

const statusCode = response instanceof Error
? response.output?.statusCode
: response.statusCode
const respTimeSec = ((Date.now() - received) / 1000).toFixed(3)

logger.info(`[<== ${statusCode}][${respTimeSec} s] ${method.toUpperCase()} ${path}`, { headers, payload, query })
}
return h.continue
}
})
}
}

module.exports = loggingPlugin
8 changes: 8 additions & 0 deletions src/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const { loggerFactory } = require('@mojaloop/central-services-logger/src/contextLogger')

const logger = loggerFactory('QS') // global logger

module.exports = {
logger,
loggerFactory
}
1 change: 1 addition & 0 deletions src/lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ function generateRequestHeaders (headers, protocolVersions, noAccept = false, ad
let ret = {
'Content-Type': contentTypeHeader,
Date: headers.date,
// todo: use Enum.Http.Headers.FSPIOP....
'FSPIOP-Source': headers['fspiop-source'],
'FSPIOP-Destination': headers['fspiop-destination'],
'FSPIOP-HTTP-Method': headers['fspiop-http-method'],
Expand Down
9 changes: 4 additions & 5 deletions src/model/bulkQuotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ delete axios.defaults.headers.common['Content-Type']
*/

class BulkQuotesModel {
constructor (config) {
this.config = config
this.db = config.db
this.requestId = config.requestId
this.proxyClient = config.proxyClient
constructor (deps) {
this.db = deps.db
this.requestId = deps.requestId
this.proxyClient = deps.proxyClient
}

/**
Expand Down
54 changes: 32 additions & 22 deletions src/model/fxQuotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ const Logger = require('@mojaloop/central-services-logger')
const JwsSigner = require('@mojaloop/sdk-standard-components').Jws.signer

const Config = require('../lib/config')
const { loggerFactory } = require('../lib')
const { httpRequest } = require('../lib/http')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, getParticipantEndpoint } = require('../lib/util')
const LOCAL_ENUM = require('../lib/enum')

delete axios.defaults.headers.common.Accept
delete axios.defaults.headers.common['Content-Type']
axios.defaults.headers.common = {}

class FxQuotesModel {
constructor (config) {
this.config = config
this.db = config.db
this.requestId = config.requestId
this.proxyClient = config.proxyClient
constructor (deps) {
this.db = deps.db
this.requestId = deps.requestId
this.proxyClient = deps.proxyClient
this.envConfig = deps.envConfig || new Config()
this.log = deps.log || loggerFactory(this.constructor.name)
}

/**
Expand All @@ -53,9 +54,10 @@ class FxQuotesModel {
// Ensure the proxy client is connected
if (this.proxyClient?.isConnected === false) await this.proxyClient.connect()
// if the payee dfsp has a proxy cache entry, we do not validate the dfsp here
if (!(await this.proxyClient?.lookupProxyByDfspId(fspiopDestination))) {
await Promise.all(currencies.map(async (currency) => {
await this.db.getParticipant(fspiopDestination, LOCAL_ENUM.COUNTERPARTY_FSP, currency, ENUM.Accounts.LedgerAccountType.POSITION)
const proxy = await this.proxyClient?.lookupProxyByDfspId(fspiopDestination)
if (!proxy) {
await Promise.all(currencies.map((currency) => {
return this.db.getParticipant(fspiopDestination, LOCAL_ENUM.COUNTERPARTY_FSP, currency, ENUM.Accounts.LedgerAccountType.POSITION)
}))
}
}
Expand Down Expand Up @@ -307,7 +309,7 @@ class FxQuotesModel {
await childSpan.audit({ headers, params: { conversionRequestId } }, EventSdk.AuditEventAction.start)
return await this.sendErrorCallback(fspiopSource, fspiopError, conversionRequestId, headers, childSpan, true)
} catch (err) {
this.writeLog(`Error occurred while handling error. Check service logs as this error may not have been propagated successfully to any other party: ${getStackOrInspect(err)}`)
this.log.error('error in handleException, stop request processing!', err)
} finally {
if (!childSpan.isFinished) {
await childSpan.finish()
Expand All @@ -323,22 +325,19 @@ class FxQuotesModel {
* @returns {promise}
*/
async sendErrorCallback (fspiopSource, fspiopError, conversionRequestId, headers, span, modifyHeaders = true) {
const envConfig = new Config()
const { envConfig, log } = this
const fspiopDest = headers[ENUM.Http.Headers.FSPIOP.DESTINATION]

try {
const endpoint = await this._getParticipantEndpoint(fspiopSource)

this.writeLog(`Resolved participant '${fspiopSource}' '${ENUM.EndPoints.FspEndpointTypes.FSPIOP_CALLBACK_URL_FX_QUOTES}' to: '${endpoint}'`)

if (!endpoint) {
throw ErrorHandler.CreateFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PARTY_NOT_FOUND, `No FSPIOP_CALLBACK_URL_FX_QUOTES endpoint found for FSP '${fspiopSource}', unable to make error callback`, null, fspiopSource)
}

const fspiopUri = `/fxQuotes/${conversionRequestId}/error`
const fullCallbackUrl = `${endpoint}${fspiopUri}`

// log the original error
this.writeLog(`Making error callback to participant '${fspiopSource}' for conversionRequestId '${conversionRequestId}' to ${fullCallbackUrl} for error: ${util.inspect(fspiopError.toFullErrorObject())}`)
log.info('Making error callback to participant...', { fspiopSource, conversionRequestId, fspiopError, fullCallbackUrl })

// make an error callback
let fromSwitchHeaders
Expand All @@ -352,7 +351,7 @@ class FxQuotesModel {
delete headers['fspiop-signature']
fromSwitchHeaders = Object.assign({}, headers, {
'fspiop-destination': fspiopSource,
'fspiop-source': ENUM.Http.Headers.FSPIOP.SWITCH.value,
'fspiop-source': envConfig.hubName,
'fspiop-http-method': ENUM.Http.RestMethods.PUT,
'fspiop-uri': fspiopUri
})
Expand Down Expand Up @@ -393,7 +392,7 @@ class FxQuotesModel {
opts.headers['fspiop-signature'] = jwsSigner.getSignature(opts)
}

res = await axios.request(opts)
res = await this.sendHttpRequest(opts)
} catch (err) {
throw ErrorHandler.CreateFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.DESTINATION_COMMUNICATION_ERROR, `network error in sendErrorCallback: ${err.message}`, {
error: err,
Expand All @@ -417,7 +416,7 @@ class FxQuotesModel {
}, fspiopSource)
}
} catch (err) {
this.writeLog(`Error in sendErrorCallback: ${getStackOrInspect(err)}`)
this.log.error('Error in sendErrorCallback', err)
const fspiopError = ErrorHandler.ReformatFSPIOPError(err)
const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message)
if (span) {
Expand All @@ -430,18 +429,29 @@ class FxQuotesModel {

// wrapping this dependency here to allow for easier use and testing
async _getParticipantEndpoint (fspId, endpointType = ENUM.EndPoints.FspEndpointTypes.FSPIOP_CALLBACK_URL_FX_QUOTES) {
return getParticipantEndpoint({ fspId, db: this.db, loggerFn: this.writeLog.bind(this), endpointType, proxyClient: this.proxyClient })
const { db, proxyClient, log } = this
const endpoint = await getParticipantEndpoint({ fspId, db, loggerFn: this.writeLog.bind(this), endpointType, proxyClient })
log.debug('Resolved participant endpoint:', { fspId, endpoint, endpointType })
return endpoint
}

/**
* Writes a formatted message to the console
*
* @returns {undefined}
*/
// eslint-disable-next-line no-unused-vars
writeLog (message) {
Logger.isDebugEnabled && Logger.debug(`${new Date().toISOString()}, (${this.requestId}) [fxQuotesModel]: ${message}`)
}

/**
* Writes a formatted message to the console
* @param {AxiosRequestConfig} options
* @returns {AxiosResponse}
*/
async sendHttpRequest (options) {
return axios.request(options)
}
}

module.exports = FxQuotesModel
9 changes: 4 additions & 5 deletions src/model/quotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ delete axios.defaults.headers.common['Content-Type']
* @returns {undefined}
*/
class QuotesModel {
constructor (config) {
this.config = config
this.db = config.db
this.requestId = config.requestId
this.proxyClient = config.proxyClient
constructor (deps) {
this.db = deps.db
this.requestId = deps.requestId
this.proxyClient = deps.proxyClient
}

async executeRules (headers, quoteRequest, payer, payee) {
Expand Down
5 changes: 5 additions & 0 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const { getStackOrInspect, failActionHandler } = require('../src/lib/util')
const Config = require('./lib/config.js')
const Handlers = require('./api')
const Routes = require('./api/routes')
const plugins = require('./api/plugins')
const dto = require('./lib/dto')

const OpenAPISpecPath = Path.resolve(__dirname, './interface/QuotingService-swagger.yaml')
Expand Down Expand Up @@ -134,6 +135,10 @@ const initServer = async function (config, topicNames) {

// add plugins to the server
await server.register([
{
plugin: plugins.loggingPlugin,
options: {}
},
{
plugin: Good,
options: {
Expand Down
18 changes: 17 additions & 1 deletion test/mocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,23 @@ const kafkaMessagePayloadPostDto = (params = {}) => kafkaMessagePayloadDto({
operationId: 'Quotes'
})

const proxyCacheConfigDto = ({
type = 'redis'
} = {}) => Object.freeze({
type,
proxyConfig: {
...(type === 'redis' && {
host: 'localhost', port: 6379
}),
...(type === 'redis-cluster' && {
cluster: [{ host: 'localhost', port: 6379 }]
})
},
timeout: 5000 // is it used anywhere?
})

module.exports = {
kafkaMessagePayloadDto,
kafkaMessagePayloadPostDto
kafkaMessagePayloadPostDto,
proxyCacheConfigDto
}
Loading