From 0c6a3b23dc1207a21efde1bccf1f35c5d1f37afc Mon Sep 17 00:00:00 2001 From: llunaCreixent Date: Mon, 15 May 2023 08:59:33 +0200 Subject: [PATCH] Process ndjson from pathfinder responses --- src/token.js | 9 ++++- src/utils.js | 108 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 83 insertions(+), 34 deletions(-) diff --git a/src/token.js b/src/token.js index 67de65de..cc5027c5 100644 --- a/src/token.js +++ b/src/token.js @@ -18,6 +18,7 @@ import { getVersion } from '~/safe'; * @param {Object} userOptions - search arguments * @param {string} pathfinderType - pathfinder execution type * @param {number} pathfinderMaxTransferSteps - default pathfinder server max transfer steps + * @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli' * * @return {Object[]} - transaction steps */ @@ -27,6 +28,7 @@ export async function findTransitiveTransfer( userOptions, pathfinderType, pathfinderMaxTransferSteps, + returnIterativeFirstMatch = false, ) { let result; if (pathfinderType == 'cli') { @@ -39,6 +41,7 @@ export async function findTransitiveTransfer( utils, userOptions, pathfinderMaxTransferSteps, + returnIterativeFirstMatch, ); } return result; @@ -109,6 +112,7 @@ async function findTransitiveTransferCli(web3, utils, userOptions) { * @param {string} userOptions.to - receiver Safe address * @param {BN} userOptions.value - value of Circles tokens * @param {number} userOptions.maxTransfers - limit of steps returned by the pathfinder service + * @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli' * * @return {Object[]} - transaction steps */ @@ -117,6 +121,7 @@ async function findTransitiveTransferServer( utils, userOptions, pathfinderMaxTransferSteps, + returnIterativeFirstMatch, ) { const options = checkOptions(userOptions, { from: { @@ -136,7 +141,6 @@ async function findTransitiveTransferServer( try { const response = await utils.requestPathfinderAPI({ - method: 'POST', data: { id: crypto.randomUUID(), method: 'compute_transfer', @@ -145,9 +149,9 @@ async function findTransitiveTransferServer( to: options.to, value: options.value.toString(), max_transfers: options.maxTransfers, + iterative: returnIterativeFirstMatch, }, }, - isTrailingSlash: false, }); return response.result; } catch (error) { @@ -586,6 +590,7 @@ export default function createTokenModule( options, pathfinderType, pathfinderMaxTransferSteps, + true, ); if (web3.utils.toBN(response.maxFlowValue).lt(options.value)) { throw new TransferError( diff --git a/src/utils.js b/src/utils.js index 1e0200b0..241b2190 100644 --- a/src/utils.js +++ b/src/utils.js @@ -17,7 +17,79 @@ import { getTokenContract, getSafeContract } from '~/common/getContracts'; /** @access private */ const transactionQueue = new TransactionQueue(); -async function request(endpoint, userOptions) { +async function processResponseJson(response) { + return new Promise((resolve, reject) => { + const getJson = (response) => { + return response.json().then((json) => { + if (response.status >= 400) { + throw new RequestError(response.url, json, response.status); + } + return json; + }); + }; + const contentType = response.headers.get('Content-Type'); + if (contentType && contentType.includes('application/json')) { + getJson(response).then(resolve).catch(reject); + } else { + if (response.status >= 400) { + reject(new RequestError(response.url, response.body, response.status)); + } + resolve(response.body); + } + }) +} + +async function processResponseNdjson(response, data) { + let buffer = '' + let jsons = [] + let final + return new Promise((resolve, reject) => { + resolve(response.body) + }) + .then(res => { + return new Promise((resolve, reject) => { + res.on('readable', () => { + console.log("readable...*"); + let result + const decoder = new TextDecoder() + while (null !== (result = res.read())) { + buffer += decoder.decode(result) + let idx = buffer.indexOf("\n") + while(idx !== -1) { + const text = buffer.substring(0, idx) + try { + const jsonText = JSON.parse(text) + console.log(jsonText) + jsons.push(jsonText) + if (jsonText.result.maxFlowValue === data.params.value) { + final = jsonText + res.destroy() + } + } catch(error) { + console.warn(text) + } + buffer = buffer.substring(idx + 1) + idx = buffer.indexOf("\n") + } + } + }) + res.on('end', () => { // If haven't received a matching result yet, then return the last result + console.log("END!"); + console.log({final}); + console.log({jsons}); + resolve(jsons.pop()) + }); + res.on("close", function (err) { + console.log("Stream has been destroyed and file has been closed"); + console.log({final}); + console.log({jsons}); + resolve(final ? final : jsons.pop()) + }) + }) + }) +} + +async function request(endpoint, userOptions, processResponse = processResponseJson) { const options = checkOptions(userOptions, { path: { type: 'array', @@ -60,30 +132,7 @@ async function request(endpoint, userOptions) { const url = `${endpoint}/${path.join('/')}${slash}${paramsStr}`; try { - return fetch(url, request).then((response) => { - const contentType = response.headers.get('Content-Type'); - const transferEncoding = response.headers.get('Transfer-Encoding'); - if (contentType && contentType.includes('application/json')) { - return response.json().then((json) => { - if (response.status >= 400) { - throw new RequestError(url, json, response.status); - } - return json; - }); - } else { - if (response.status >= 400) { - throw new RequestError(url, response.body, response.status); - } - if (transferEncoding && transferEncoding.includes('chunked')) { - console.log("chunked"); - return response.text().then((text) => { - console.log(text.toString()); - return JSON.parse(text.toString()); - }); - } - return response.body; - } - }); + return fetch(url, request).then(response => processResponse(response, data)); } catch (err) { throw new RequestError(url, err.message); } @@ -1125,17 +1174,12 @@ export default function createUtilsModule(web3, contracts, globalOptions) { * @namespace core.utils.requestPathfinderAPI * * @param {Object} userOptions - Pathfinder API query options - * @param {string} userOptions.method - HTTP method * @param {Object} userOptions.data - Request body (JSON) * * @return {Object} - API response */ requestPathfinderAPI: async (userOptions) => { const options = checkOptions(userOptions, { - method: { - type: 'string', - default: 'GET', - }, data: { type: 'object', default: {}, @@ -1143,10 +1187,10 @@ export default function createUtilsModule(web3, contracts, globalOptions) { }); return request(pathfinderServiceEndpoint, { data: options.data, - method: options.method, + method: 'POST', path: [], isTrailingSlash: false, - }); + }, processResponseNdjson); }, /**