Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(3654): added fxQuote step to outbound model; added inbound fxQuotes handler #461

Merged
merged 23 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6b194c3
feat(3618): added support for POST /fxQuotes and /fxTransfers inbound…
geka-evk Nov 24, 2023
1bbbf07
feat(3618): added support for POST /fxQuotes and /fxTransfers inbound…
geka-evk Nov 24, 2023
23669e4
feat(3618): ignored some vulnerabilities
geka-evk Nov 24, 2023
313aa2b
feat(3618): added unit-test for postFxQuote model
geka-evk Nov 24, 2023
4f7e0cf
feat(3618): added unit-test for postFxQuote model
geka-evk Nov 24, 2023
7daf266
feat(3618): added unit-tests for postFxTransfers method
geka-evk Nov 26, 2023
15ed782
feat(3618): added FX endpoints to .env
geka-evk Nov 27, 2023
315e311
chore(snapshot): 20.7.0-snapshot.2
geka-evk Nov 27, 2023
d3f8bd6
feat(3618): updated @mojaloop/central-services-shared version to supp…
geka-evk Nov 27, 2023
0105390
chore(snapshot): 20.7.0-snapshot.3
geka-evk Nov 27, 2023
a1265dd
feat(3618): disabled @mojaloop/central-services-shared"downgrading" f…
geka-evk Nov 27, 2023
2828017
chore(snapshot): 20.7.0-snapshot.4
geka-evk Nov 27, 2023
e7e89ea
feat(3618): added ilp unit-test; updated Ilp version
geka-evk Nov 27, 2023
a3e238d
chore(snapshot): 20.7.0-snapshot.5
geka-evk Nov 27, 2023
3471ce4
feat(3618): updated ilp packet version (with amount)
geka-evk Nov 28, 2023
5355812
chore(snapshot): 20.7.0-snapshot.6
geka-evk Nov 28, 2023
5ef541f
feat(3654): added FX States and Transitions for OutboundTransfersMode…
geka-evk Nov 29, 2023
c8cadb0
feat(3654): added ErrorMessages; updated deps
geka-evk Nov 29, 2023
63e5e71
Merge branch 'feat/fx-impl' into feat/fx-impl-3654
geka-evk Nov 30, 2023
c33fbbc
feat(3654): added fxQuote step to outbound model
geka-evk Nov 30, 2023
33fe9eb
Merge branch 'feat/fx-impl' into feat/fx-impl-3654-fxQuote
geka-evk Dec 1, 2023
db4f4a2
feat(3654): added fxQuotes inbound handler
geka-evk Dec 1, 2023
4b2b06b
fix: fxquotes
vijayg10 Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions modules/api-svc/src/InboundServer/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
QuotesModel,
TransfersModel,
} = require('../lib/model');
const { CacheKeyPrefixes } = require('../lib/model/common');

const extractBodyHeadersSourceFspId = ctx => ({
sourceFspId: ctx.request.headers['fspiop-source'],
Expand Down Expand Up @@ -974,6 +975,36 @@ const postFxQuotes = async (ctx) => {
prepareResponse(ctx);
};

/**
* Create a handler for PUT /fxQuotes/{ID} and PUT /fxQuotes/{ID}/error routes
*
* @param success {boolean} - false is for handling error callback response
*/
const createPutFxQuoteHandler = (success) => async (ctx) => {
const { body, headers } = extractBodyHeadersSourceFspId(ctx);
const { ID } = ctx.state.path.params;

const channel = `${CacheKeyPrefixes.FX_QUOTE_CALLBACK_CHANNEL}_${ID}`;
await ctx.state.cache.publish(channel, {
success,
data: { body, headers },
type: `fxQuoteResponse${success ? '' : 'Error'}`
});

// todo: think, what does it mean in putQuote handler!
//
// duplicate publication until legacy code refactored
// await QuotesModel.triggerDeferredJob({
// cache: ctx.state.cache,
// message: data,
// args: {
// quoteId
// }
// });

ctx.response.status = ReturnCodes.OK.CODE;
};

const postFxTransfers = async (ctx) => {
const { body, headers, sourceFspId } = extractBodyHeadersSourceFspId(ctx);
const { logger } = ctx.state;
Expand Down Expand Up @@ -1081,6 +1112,12 @@ module.exports = {
'/fxQuotes': {
post: postFxQuotes
},
'/fxQuotes/{ID}': {
put: createPutFxQuoteHandler(true)
},
'/fxQuotes/{ID}/error': {
put: createPutFxQuoteHandler(false)
},
'/fxTransfers': {
post: postFxTransfers
}
Expand Down
35 changes: 34 additions & 1 deletion modules/api-svc/src/lib/dto.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/*eslint quote-props: ["error", "as-needed"]*/
const { randomUUID } = require('node:crypto');
const { Directions, SDKStateEnum} = require('./model/common');

const quoteRequestStateDto = (request) => ({
Expand All @@ -23,7 +25,38 @@ const fxQuoteRequestStateDto = (request) => ({
initiatedTimestamp: new Date().toISOString(),
});

/**
* @param data {object} - "state" of inbound transaction request
*/
const outboundPostFxQuotePayloadDto = (data) => Object.freeze({
conversionRequestId: randomUUID(),
conversionTerms: {
conversionId: randomUUID(), // should be the same as commitRequestId from fxTransfer
initiatingFsp: data.from.fspId,
counterPartyFsp: data.fxProviders[0], // todo: think if we have several FXPs
amountType: data.amountType,
sourceAmount: {
currency: data.currency,
amount: data.amount
},
targetAmount: {
currency: data.supportedCurrencies[0], // todo: think if we have several currencies
},
expiration: data.fxQuoteExpiration,
}
});

/**
* @param data {object} - "state" of inbound transaction request
*/
const outboundPostFxTransferPayloadDto = (data) => Object.freeze({
commitRequestId: data.transferId, // should be the same as conversionTerms.conversionId from fxQuote
// todo: add other fields
});

module.exports = {
quoteRequestStateDto,
fxQuoteRequestStateDto
fxQuoteRequestStateDto,
outboundPostFxQuotePayloadDto,
outboundPostFxTransferPayloadDto,
};
2 changes: 1 addition & 1 deletion modules/api-svc/src/lib/model/InboundTransfersModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ class InboundTransfersModel {
if (!conversionId) {
throw new Error('No conversionId for making cache key');
}
return `${CacheKeyPrefixes.FX_QUOTE}_${conversionId}`;
return `${CacheKeyPrefixes.FX_QUOTE_INBOUND}_${conversionId}`;
}
}

Expand Down
100 changes: 94 additions & 6 deletions modules/api-svc/src/lib/model/OutboundTransfersModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ const StateMachine = require('javascript-state-machine');
const { Enum } = require('@mojaloop/central-services-shared');
const { Ilp, MojaloopRequests } = require('@mojaloop/sdk-standard-components');

const dto = require('../dto');
const shared = require('./lib/shared');
const PartiesModel = require('./PartiesModel');
const {
AmountTypes,
BackendError,
CacheKeyPrefixes,
CurrencyConverters,
Directions,
ErrorMessages,
SDKStateEnum,
Expand Down Expand Up @@ -347,11 +350,15 @@ class OutboundTransfersModel {
}

if (Array.isArray(payee.supportedCurrencies)) {
if (!payee.supportedCurrencies.length) {
throw new Error(ErrorMessages.noSupportedCurrencies);
}
const needFx = !payee.supportedCurrencies.includes(this.data.currency);
if (needFx && this.data.amountType !== AmountTypes.SEND) {
throw new Error(ErrorMessages.unsupportedFxAmountType);
}
this.data.needFx = needFx;
this.data.supportedCurrencies = payee.supportedCurrencies;
}

return resolve(payee);
Expand Down Expand Up @@ -509,15 +516,81 @@ class OutboundTransfersModel {
async _requestServicesFxp() {
this.data.fxProviders = this.getServicesFxpResponse;
// todo: add impl. with real http-request
if (!this.data.fxProviders?.length) {
throw new Error(ErrorMessages.noFxProviderDetected);
}
return this.data.fxProviders;
}

async _requestFxQuote() {
// todo: add impl.
// 1. build fxQuote payload
// 2. subscribe to cache stream by conversionRequestId (to await fxQuotes callback)
// 2.a - handle error response as well
// 3. send POST fxQuote request to hub
let timer;
let channel;
let subId;

// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
try {
this.data.fxQuoteExpiration = this._getExpirationTimestamp();
const payload = dto.outboundPostFxQuotePayloadDto(this.data);

channel = `${CacheKeyPrefixes.FX_QUOTE_CALLBACK_CHANNEL}_${payload.conversionRequestId}`;

timer = setTimeout(() => {
this.unsubscribeCache(channel, subId);
const errMessage = `Timeout requesting fxQuote for transfer ${this.data.transferId}`;
const err = new BackendError(errMessage, 504);
this._logger.push({ err }).log(`fxQuote payload: ${JSON.stringify(payload)}`);
reject(err);
}, this._requestProcessingTimeoutSeconds * 1000);

subId = await this._cache.subscribe(channel, (cn, msg, subId) => {
try {
clearTimeout(timer);
this.unsubscribeCache(channel, subId);

const message = JSON.parse(msg);

const { body, headers } = message.data;
this._logger.push({ body }).log('fxQuote response received');

if (!message.success) {
const error = new BackendError(`Got an error response requesting fxQuote: ${util.inspect(body, { depth: Infinity })}`, 500);
error.mojaloopError = body;
throw error;
}

if (this._rejectExpiredQuoteResponses) {
const now = new Date().toISOString();
if (now > this.data.fxQuoteExpiration) {
const errMessage = `${ErrorMessages.responseMissedExpiryDeadline} (fxQuote)`;
this._logger.warn(`${errMessage}: system time=${now} > expiration time=${this.data.fxQuoteExpiration}`);
throw new BackendError(errMessage, 504);
}
}

this.data.fxQuoteResponse = {
body,
headers,
};
this.data.fxQuoteResponseSource = headers['fspiop-source']; // todo: check what for we need this

resolve(payload); // todo: think, what should we return at this point
} catch (err) {
this._logger.push({ err }).log(`error in fxQuote cache subscription processing: ${err?.message}`);
reject(err);
}
});

const res = await this._requests.postFxQuotes(payload, payload.conversionTerms.counterPartyFsp);
this.data.fxQuoteRequest = res.originalRequest;
this._logger.push({ res }).log('fxQuote request is sent to hub');
} catch (err) {
this._logger.push({ err }).log(`error in _requestFxQuote: ${err.message}`);
if (timer) clearTimeout(timer);
this.unsubscribeCache(channel, subId);
reject(err);
}
});
}

/**
Expand Down Expand Up @@ -675,12 +748,16 @@ class OutboundTransfersModel {
}

// add extensionList if provided
if(this.data.quoteRequestExtensions && this.data.quoteRequestExtensions.length > 0) {
if (this.data.quoteRequestExtensions?.length) {
quote.extensionList = {
extension: this.data.quoteRequestExtensions
};
}

if (this.data.needFx) {
quote.converter = CurrencyConverters.PAYER;
}

return quote;
}

Expand Down Expand Up @@ -1194,6 +1271,17 @@ class OutboundTransfersModel {
throw err;
}
}

async unsubscribeCache(channelKey, subId) {
if (channelKey && subId) {
return this._cache.unsubscribe(channelKey, subId)
.catch(err => {
this._logger.push({ err }).log(`Unsubscribing cache error [${channelKey} ${subId}]: ${err.stack}`);
});
}
}


}

module.exports = OutboundTransfersModel;
13 changes: 12 additions & 1 deletion modules/api-svc/src/lib/model/common/Enums.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ const Directions = Object.freeze({
});

const CacheKeyPrefixes = Object.freeze({
FX_QUOTE: 'fxQuote_in'
FX_QUOTE_INBOUND: 'fxQuote_in',
FX_QUOTE_CALLBACK_CHANNEL: 'fxQuote_callback',
FX_TRANSFER_CALLBACK_CHANNEL: 'fxQuote_callback',
});

const CurrencyConverters = Object.freeze({
PAYER: 'PAYER',
PAYEE: 'PAYEE',
});

const States = Object.freeze({
Expand Down Expand Up @@ -71,6 +78,9 @@ const Transitions = Object.freeze({
});

const ErrorMessages = Object.freeze({
noFxProviderDetected: 'No FX provider detected',
noSupportedCurrencies: 'No payee supportedCurrencies received',
responseMissedExpiryDeadline: 'Response missed expiry deadline',
unsupportedFxAmountType: 'Unsupported amountType when currency conversion is needed',
});

Expand All @@ -82,6 +92,7 @@ const AmountTypes = Object.freeze({
module.exports = {
AmountTypes,
CacheKeyPrefixes,
CurrencyConverters,
Directions,
ErrorMessages,
SDKStateEnum,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ const PartiesModel = require('~/lib/model').PartiesModel;
const { MojaloopRequests, Logger } = require('@mojaloop/sdk-standard-components');
const StateMachine = require('javascript-state-machine');

const mocks = require('./data/mocks');
const defaultConfig = require('./data/defaultConfig');
const transferRequest = require('./data/transferRequest');
const payeeParty = require('./data/payeeParty');
const quoteResponseTemplate = require('./data/quoteResponse');
const transferFulfil = require('./data/transferFulfil');

const { SDKStateEnum } = require('../../../../src/lib/model/common');
const { SDKStateEnum, CacheKeyPrefixes, States } = require('../../../../src/lib/model/common');
const FSPIOPTransferStateEnum = require('@mojaloop/central-services-shared').Enum.Transfers.TransferState;

const genPartyId = (party) => {
Expand Down Expand Up @@ -1576,4 +1577,55 @@ describe('outboundModel', () => {
expect(StateMachine.__instance.state).toBe('quoteReceived');
});

describe('FX flow Tests -->', () => {
let model;

beforeEach(() => {
model = new Model({
cache,
logger,
metricsClient,
...config,
});
});

afterEach(async () => {
jest.clearAllMocks();
});

test('should process callback for POST fxQuotes request', async () => {
model._requests.postFxQuotes = jest.fn(async (payload) => {
// eslint-disable-next-line no-unused-vars
const { conversionRequestId, ...restPayload} = payload;
const channel = `${CacheKeyPrefixes.FX_QUOTE_CALLBACK_CHANNEL}_${payload.conversionRequestId}`;
const cachedCallbackPayload = {
success: true,
data: {
body: {
...restPayload,
condition: 'fxCondition'
},
headers: {},
}
};
await cache.publish(channel, JSON.stringify(cachedCallbackPayload));
return mocks.mockMojaApiResponse();
});

await model.initialize({
...mocks.coreConnectorPostTransfersPayloadDto(),
currentState: States.PAYEE_RESOLVED,
needFx: true,
supportedCurrencies: ['USD']
});
expect(model.data.currentState).toBe(States.PAYEE_RESOLVED);

const result = await model.run();
expect(result).toBeTruthy();
expect(result.fxQuoteRequest).toBeTruthy();
expect(result.currentState).toBe(SDKStateEnum.WAITING_FOR_CONVERSION_ACCEPTANCE);
expect(model.data.currentState).toBe(States.FX_QUOTE_RECEIVED);
// todo: add more tests
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@
"logIndent": 2,
"metrics": {
"port": 4004
}
},
"getServicesFxpResponse": ["fxp_id"]
}
Loading