Skip to content

Commit

Permalink
Process ndjson from pathfinder responses
Browse files Browse the repository at this point in the history
  • Loading branch information
llunaCreixent committed May 15, 2023
1 parent 4fb4ba5 commit 0c6a3b2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 34 deletions.
9 changes: 7 additions & 2 deletions src/token.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { getVersion } from '~/safe';
* @param {Object} userOptions - search arguments
* @param {string} pathfinderType - pathfinder execution type
* @param {number} pathfinderMaxTransferSteps - default pathfinder server max transfer steps
* @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli'
*
* @return {Object[]} - transaction steps
*/
Expand All @@ -27,6 +28,7 @@ export async function findTransitiveTransfer(
userOptions,
pathfinderType,
pathfinderMaxTransferSteps,
returnIterativeFirstMatch = false,
) {
let result;
if (pathfinderType == 'cli') {
Expand All @@ -39,6 +41,7 @@ export async function findTransitiveTransfer(
utils,
userOptions,
pathfinderMaxTransferSteps,
returnIterativeFirstMatch,
);
}
return result;
Expand Down Expand Up @@ -109,6 +112,7 @@ async function findTransitiveTransferCli(web3, utils, userOptions) {
* @param {string} userOptions.to - receiver Safe address
* @param {BN} userOptions.value - value of Circles tokens
* @param {number} userOptions.maxTransfers - limit of steps returned by the pathfinder service
* @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli'
*
* @return {Object[]} - transaction steps
*/
Expand All @@ -117,6 +121,7 @@ async function findTransitiveTransferServer(
utils,
userOptions,
pathfinderMaxTransferSteps,
returnIterativeFirstMatch,
) {
const options = checkOptions(userOptions, {
from: {
Expand All @@ -136,7 +141,6 @@ async function findTransitiveTransferServer(

try {
const response = await utils.requestPathfinderAPI({
method: 'POST',
data: {
id: crypto.randomUUID(),
method: 'compute_transfer',
Expand All @@ -145,9 +149,9 @@ async function findTransitiveTransferServer(
to: options.to,
value: options.value.toString(),
max_transfers: options.maxTransfers,
iterative: returnIterativeFirstMatch,
},
},
isTrailingSlash: false,
});
return response.result;
} catch (error) {
Expand Down Expand Up @@ -586,6 +590,7 @@ export default function createTokenModule(
options,
pathfinderType,
pathfinderMaxTransferSteps,
true,
);
if (web3.utils.toBN(response.maxFlowValue).lt(options.value)) {
throw new TransferError(
Expand Down
108 changes: 76 additions & 32 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,79 @@ import { getTokenContract, getSafeContract } from '~/common/getContracts';
/** @access private */
const transactionQueue = new TransactionQueue();

async function request(endpoint, userOptions) {
async function processResponseJson(response) {
return new Promise((resolve, reject) => {
const getJson = (response) => {
return response.json().then((json) => {
if (response.status >= 400) {
throw new RequestError(response.url, json, response.status);
}
return json;
});
};
const contentType = response.headers.get('Content-Type');
if (contentType && contentType.includes('application/json')) {
getJson(response).then(resolve).catch(reject);
} else {
if (response.status >= 400) {
reject(new RequestError(response.url, response.body, response.status));
}
resolve(response.body);
}
})
}

async function processResponseNdjson(response, data) {
let buffer = ''
let jsons = []
let final
return new Promise((resolve, reject) => {
resolve(response.body)
})
.then(res => {
return new Promise((resolve, reject) => {
res.on('readable', () => {
console.log("readable...*");
let result
const decoder = new TextDecoder()
while (null !== (result = res.read())) {
buffer += decoder.decode(result)
let idx = buffer.indexOf("\n")
while(idx !== -1) {
const text = buffer.substring(0, idx)
try {
const jsonText = JSON.parse(text)
console.log(jsonText)
jsons.push(jsonText)
if (jsonText.result.maxFlowValue === data.params.value) {
final = jsonText
res.destroy()
}
} catch(error) {
console.warn(text)
}
buffer = buffer.substring(idx + 1)
idx = buffer.indexOf("\n")
}
}
})
res.on('end', () => { // If haven't received a matching result yet, then return the last result
console.log("END!");
console.log({final});
console.log({jsons});
resolve(jsons.pop())
});
res.on("close", function (err) {
console.log("Stream has been destroyed and file has been closed");
console.log({final});
console.log({jsons});
resolve(final ? final : jsons.pop())
})
})
})
}

async function request(endpoint, userOptions, processResponse = processResponseJson) {
const options = checkOptions(userOptions, {
path: {
type: 'array',
Expand Down Expand Up @@ -60,30 +132,7 @@ async function request(endpoint, userOptions) {
const url = `${endpoint}/${path.join('/')}${slash}${paramsStr}`;

try {
return fetch(url, request).then((response) => {
const contentType = response.headers.get('Content-Type');
const transferEncoding = response.headers.get('Transfer-Encoding');
if (contentType && contentType.includes('application/json')) {
return response.json().then((json) => {
if (response.status >= 400) {
throw new RequestError(url, json, response.status);
}
return json;
});
} else {
if (response.status >= 400) {
throw new RequestError(url, response.body, response.status);
}
if (transferEncoding && transferEncoding.includes('chunked')) {
console.log("chunked");
return response.text().then((text) => {
console.log(text.toString());
return JSON.parse(text.toString());
});
}
return response.body;
}
});
return fetch(url, request).then(response => processResponse(response, data));
} catch (err) {
throw new RequestError(url, err.message);
}
Expand Down Expand Up @@ -1125,28 +1174,23 @@ export default function createUtilsModule(web3, contracts, globalOptions) {
* @namespace core.utils.requestPathfinderAPI
*
* @param {Object} userOptions - Pathfinder API query options
* @param {string} userOptions.method - HTTP method
* @param {Object} userOptions.data - Request body (JSON)
*
* @return {Object} - API response
*/
requestPathfinderAPI: async (userOptions) => {
const options = checkOptions(userOptions, {
method: {
type: 'string',
default: 'GET',
},
data: {
type: 'object',
default: {},
},
});
return request(pathfinderServiceEndpoint, {
data: options.data,
method: options.method,
method: 'POST',
path: [],
isTrailingSlash: false,
});
}, processResponseNdjson);
},

/**
Expand Down

0 comments on commit 0c6a3b2

Please sign in to comment.