Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] stream API results into a CSV to be able to free the memory as … #1

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions dashboards-reports/server/routes/utils/dataReportHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import { DATA_REPORT_CONFIG } from './constants';

import esb from 'elastic-builder';
import moment from 'moment';
import converter from 'json-2-csv';

export var metaData = {
saved_search_id: <string>null,
Expand Down Expand Up @@ -163,8 +162,8 @@ export const buildQuery = (report, is_count) => {
return reqBody;
};

// Fetch the data from OpenSearch
export const getOpenSearchData = (arrayHits, report, params) => {
// Format each hit into an exportable CSV line
export const convertHitsToCSVLines = (arrayHits, report, params) => {
let hits: any = [];
for (let valueRes of arrayHits) {
for (let data of valueRes.hits) {
Expand All @@ -184,27 +183,19 @@ export const getOpenSearchData = (arrayHits, report, params) => {
} else {
hits.push(params.excel ? sanitize(data) : data);
}

// Truncate to expected limit size
if (hits.length >= params.limit) {
return hits;
}
}
}
return hits;
};

//Convert the data to Csv format
export const convertToCSV = async (dataset) => {
let convertedData: any = [];
const options = {
delimiter: { field: ',', eol: '\n' },
emptyFieldValue: ' ',
};
await converter.json2csvAsync(dataset[0], options).then((csv) => {
convertedData = csv;
});
return convertedData;
export const createCsv = (headers) => {
// @todo
};

export const pushToCsv = (csv, lines) => {
for (let line of lines) {
// @todo
}
};

//Return only the selected fields
Expand Down
37 changes: 23 additions & 14 deletions dashboards-reports/server/routes/utils/savedSearchReportHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@

import {
buildQuery,
convertToCSV,
getOpenSearchData,
createCsv,
pushToCsv,
convertHitsToCSVLines,
getSelectedFields,
metaData,
} from './dataReportHelpers';
Expand Down Expand Up @@ -142,7 +143,6 @@ async function generateReportData(
isScheduledTask: boolean
) {
let opensearchData: any = {};
const arrayHits: any = [];
const report = { _source: metaData };
const indexPattern: string = report._source.paternName;
const maxResultSize: number = await getMaxResultSize();
Expand All @@ -154,12 +154,21 @@ async function generateReportData(
}

const reqBody = buildRequestBody(buildQuery(report, 0));

const csv = createCsv([]);

if (total > maxResultSize) {
await getOpenSearchDataByScroll();
for await (let items of getOpenSearchDataByScroll()) {
const dataset: any = convertHitsToCSVLines(items, report, params);
pushToCsv(csv, dataset);
}
} else {
await getOpenSearchDataBySearch();
let items: any = await getOpenSearchDataBySearch();
const dataset: any = convertHitsToCSVLines(items, report, params);
pushToCsv(csv, dataset);
}
return convertOpenSearchDataToCsv();

return finalizeCsv(csv);

// Fetch OpenSearch query max size windows to decide search or scroll
async function getMaxResultSize() {
Expand Down Expand Up @@ -199,7 +208,7 @@ async function generateReportData(
);
}

async function getOpenSearchDataByScroll() {
async function* getOpenSearchDataByScroll() {
// Open scroll context by fetching first batch
opensearchData = await callCluster(
client,
Expand All @@ -212,7 +221,8 @@ async function generateReportData(
},
isScheduledTask
);
arrayHits.push(opensearchData.hits);

yield opensearchData.hits;

// Start scrolling till the end
const nbScroll = Math.floor(total / maxResultSize);
Expand All @@ -227,7 +237,7 @@ async function generateReportData(
isScheduledTask
);
if (Object.keys(resScroll.hits.hits).length > 0) {
arrayHits.push(resScroll.hits);
yield resScroll.hits;
}
}

Expand All @@ -253,7 +263,8 @@ async function generateReportData(
},
isScheduledTask
);
arrayHits.push(opensearchData.hits);

return opensearchData.hits;
}

function buildRequestBody(query: any) {
Expand All @@ -272,9 +283,7 @@ async function generateReportData(
}

// Parse OpenSearch data and convert to CSV
async function convertOpenSearchDataToCsv() {
const dataset: any = [];
dataset.push(getOpenSearchData(arrayHits, report, params));
return await convertToCSV(dataset);
async function finalizeCsv(csv) {
// @todo
}
}