From e5105f0701fb9c04f9269663195affdc36808a24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Masseix?= Date: Wed, 26 May 2021 03:29:07 +0200 Subject: [PATCH] [WIP] stream API results into a CSV to be able to free the memory as soon as possible and prevent any memory issue --- .../server/routes/utils/dataReportHelpers.ts | 29 +++++---------- .../routes/utils/savedSearchReportHelper.ts | 37 ++++++++++++------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/dashboards-reports/server/routes/utils/dataReportHelpers.ts b/dashboards-reports/server/routes/utils/dataReportHelpers.ts index 4379f8db..0ac1f83d 100644 --- a/dashboards-reports/server/routes/utils/dataReportHelpers.ts +++ b/dashboards-reports/server/routes/utils/dataReportHelpers.ts @@ -28,7 +28,6 @@ import { DATA_REPORT_CONFIG } from './constants'; import esb from 'elastic-builder'; import moment from 'moment'; -import converter from 'json-2-csv'; export var metaData = { saved_search_id: null, @@ -163,8 +162,8 @@ export const buildQuery = (report, is_count) => { return reqBody; }; -// Fetch the data from OpenSearch -export const getOpenSearchData = (arrayHits, report, params) => { +// Format each hit into an exportable CSV line +export const convertHitsToCSVLines = (arrayHits, report, params) => { let hits: any = []; for (let valueRes of arrayHits) { for (let data of valueRes.hits) { @@ -184,27 +183,19 @@ export const getOpenSearchData = (arrayHits, report, params) => { } else { hits.push(params.excel ? sanitize(data) : data); } - - // Truncate to expected limit size - if (hits.length >= params.limit) { - return hits; - } } } return hits; }; -//Convert the data to Csv format -export const convertToCSV = async (dataset) => { - let convertedData: any = []; - const options = { - delimiter: { field: ',', eol: '\n' }, - emptyFieldValue: ' ', - }; - await converter.json2csvAsync(dataset[0], options).then((csv) => { - convertedData = csv; - }); - return convertedData; +export const createCsv = (headers) => { + // @todo +}; + +export const pushToCsv = (csv, lines) => { + for (let line of lines) { + // @todo + } }; //Return only the selected fields diff --git a/dashboards-reports/server/routes/utils/savedSearchReportHelper.ts b/dashboards-reports/server/routes/utils/savedSearchReportHelper.ts index 6b839917..6162dca6 100644 --- a/dashboards-reports/server/routes/utils/savedSearchReportHelper.ts +++ b/dashboards-reports/server/routes/utils/savedSearchReportHelper.ts @@ -26,8 +26,9 @@ import { buildQuery, - convertToCSV, - getOpenSearchData, + createCsv, + pushToCsv, + convertHitsToCSVLines, getSelectedFields, metaData, } from './dataReportHelpers'; @@ -142,7 +143,6 @@ async function generateReportData( isScheduledTask: boolean ) { let opensearchData: any = {}; - const arrayHits: any = []; const report = { _source: metaData }; const indexPattern: string = report._source.paternName; const maxResultSize: number = await getMaxResultSize(); @@ -154,12 +154,21 @@ async function generateReportData( } const reqBody = buildRequestBody(buildQuery(report, 0)); + + const csv = createCsv([]); + if (total > maxResultSize) { - await getOpenSearchDataByScroll(); + for await (let items of getOpenSearchDataByScroll()) { + const dataset: any = convertHitsToCSVLines(items, report, params); + pushToCsv(csv, dataset); + } } else { - await getOpenSearchDataBySearch(); + let items: any = await getOpenSearchDataBySearch(); + const dataset: any = convertHitsToCSVLines(items, report, params); + pushToCsv(csv, dataset); } - return convertOpenSearchDataToCsv(); + + return finalizeCsv(csv); // Fetch OpenSearch query max size windows to decide search or scroll async function getMaxResultSize() { @@ -199,7 +208,7 @@ async function generateReportData( ); } - async function getOpenSearchDataByScroll() { + async function* getOpenSearchDataByScroll() { // Open scroll context by fetching first batch opensearchData = await callCluster( client, @@ -212,7 +221,8 @@ async function generateReportData( }, isScheduledTask ); - arrayHits.push(opensearchData.hits); + + yield opensearchData.hits; // Start scrolling till the end const nbScroll = Math.floor(total / maxResultSize); @@ -227,7 +237,7 @@ async function generateReportData( isScheduledTask ); if (Object.keys(resScroll.hits.hits).length > 0) { - arrayHits.push(resScroll.hits); + yield resScroll.hits; } } @@ -253,7 +263,8 @@ async function generateReportData( }, isScheduledTask ); - arrayHits.push(opensearchData.hits); + + return opensearchData.hits; } function buildRequestBody(query: any) { @@ -272,9 +283,7 @@ async function generateReportData( } // Parse OpenSearch data and convert to CSV - async function convertOpenSearchDataToCsv() { - const dataset: any = []; - dataset.push(getOpenSearchData(arrayHits, report, params)); - return await convertToCSV(dataset); + async function finalizeCsv(csv) { + // @todo } }