[ML] Improving parsing of large uploaded files (#62970)
* [ML] Improving parsing of large uploaded files

* small clean up

* increasing max to 1GB

* adding comments

Co-authored-by: Elastic Machine <[email protected]>
2 people authored and wayneseymour committed Apr 15, 2020
1 parent 7501055 commit fe9abbc
Showing 9 changed files with 113 additions and 67 deletions.
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/common/constants/file_datavisualizer.ts
@@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

-export const MAX_BYTES = 104857600;
-export const ABSOLUTE_MAX_BYTES = MAX_BYTES * 5;
+export const MAX_BYTES = 104857600; // 100MB
+export const ABSOLUTE_MAX_BYTES = 1073741274; // 1GB
export const FILE_SIZE_DISPLAY_FORMAT = '0,0.[0] b';

// Value to use in the Elasticsearch index mapping meta data to identify the
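For context, a rough sketch of how these limits might be consumed. This is a hypothetical helper, not code from this commit; configuredBytes stands in for the server-side file-size setting:

const MAX_BYTES = 104857600; // 100MB default
const ABSOLUTE_MAX_BYTES = 1073741274; // hard ceiling used by this commit

// Clamp a user-configured limit so it can never exceed the absolute cap.
function getEffectiveMaxBytes(configuredBytes?: number): number {
  return Math.min(configuredBytes ?? MAX_BYTES, ABSOLUTE_MAX_BYTES);
}

// getEffectiveMaxBytes(Number.MAX_SAFE_INTEGER) === ABSOLUTE_MAX_BYTES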
4 changes: 3 additions & 1 deletion x-pack/plugins/ml/common/types/file_datavisualizer.ts
@@ -67,13 +67,15 @@ export interface ImportResponse {
export interface ImportFailure {
item: number;
reason: string;
-doc: Doc;
+doc: ImportDoc;
}

export interface Doc {
message: string;
}

+export type ImportDoc = Doc | string;

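The widened union lets the importers carry either wrapped log messages or raw NDJSON lines. Both shapes side by side, as an illustration (values invented):

interface Doc { message: string; }
type ImportDoc = Doc | string;

const logDoc: ImportDoc = { message: '[2020-04-15] server started' }; // message importer
const ndjsonDoc: ImportDoc = '{"event":"start"}'; // the NDJSON importer pushes raw lines
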
export interface Settings {
pipeline?: string;
index: string;
@@ -24,14 +24,11 @@ import {
readFile,
createUrlOverrides,
processResults,
-reduceData,
hasImportPermission,
} from '../utils';

import { MODE } from './constants';

-const UPLOAD_SIZE_MB = 5;

export class FileDataVisualizerView extends Component {
constructor(props) {
super(props);
@@ -40,6 +37,7 @@ export class FileDataVisualizerView extends Component {
files: {},
fileName: '',
fileContents: '',
+data: [],
fileSize: 0,
fileTooLarge: false,
fileCouldNotBeRead: false,
@@ -79,6 +77,7 @@
loaded: false,
fileName: '',
fileContents: '',
+data: [],
fileSize: 0,
fileTooLarge: false,
fileCouldNotBeRead: false,
@@ -97,15 +96,15 @@
async loadFile(file) {
if (file.size <= this.maxFileUploadBytes) {
try {
-const fileContents = await readFile(file);
-const data = fileContents.data;
+const { data, fileContents } = await readFile(file);
this.setState({
-fileContents: data,
+data,
+fileContents,
fileName: file.name,
fileSize: file.size,
});

-await this.loadSettings(data);
+await this.analyzeFile(fileContents);
} catch (error) {
this.setState({
loaded: false,
@@ -124,14 +123,9 @@
}
}
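loadFile now keeps two views of the upload: data, the raw bytes later handed to the importer, and fileContents, a decoded text sample for the analysis endpoint. The updated readFile helper itself is not shown in this diff; a plausible sketch, assuming a FileReader-based implementation and a made-up sample size:

// Sketch only; the real readFile lives in '../utils' and is not part of this diff.
const SAMPLE_BYTES = 5 * 1024 * 1024; // hypothetical size of the analysis sample

function readFile(file) {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(file);
    reader.onload = () => {
      const data = reader.result; // the full file as an ArrayBuffer
      // decode only the head of the file for the analyze endpoint
      const fileContents = new TextDecoder().decode(data.slice(0, SAMPLE_BYTES));
      resolve({ data, fileContents });
    };
    reader.onerror = () => reject(reader.error);
  });
}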

-async loadSettings(data, overrides, isRetry = false) {
+async analyzeFile(fileContents, overrides, isRetry = false) {
try {
-// reduce the amount of data being sent to the endpoint
-// 5MB should be enough to contain 1000 lines
-const lessData = reduceData(data, UPLOAD_SIZE_MB);
-console.log('overrides', overrides);
-const { analyzeFile } = ml.fileDatavisualizer;
-const resp = await analyzeFile(lessData, overrides);
+const resp = await ml.fileDatavisualizer.analyzeFile(fileContents, overrides);
const serverSettings = processResults(resp);
const serverOverrides = resp.overrides;

@@ -198,7 +192,7 @@
loading: true,
loaded: false,
});
-this.loadSettings(data, this.previousOverrides, true);
+this.analyzeFile(fileContents, this.previousOverrides, true);
}
}
}
@@ -240,7 +234,7 @@
},
() => {
const formattedOverrides = createUrlOverrides(overrides, this.originalSettings);
-this.loadSettings(this.state.fileContents, formattedOverrides);
+this.analyzeFile(this.state.fileContents, formattedOverrides);
}
);
};
@@ -261,6 +255,7 @@
results,
explanation,
fileContents,
+data,
fileName,
fileSize,
fileTooLarge,
@@ -339,6 +334,7 @@
results={results}
fileName={fileName}
fileContents={fileContents}
+data={data}
indexPatterns={this.props.indexPatterns}
kibanaConfig={this.props.kibanaConfig}
showBottomBar={this.showBottomBar}
@@ -94,7 +94,7 @@ export class ImportView extends Component {

// TODO - sort this function out. it's a mess
async import() {
-const { fileContents, results, indexPatterns, kibanaConfig, showBottomBar } = this.props;
+const { data, results, indexPatterns, kibanaConfig, showBottomBar } = this.props;

const { format } = results;
let { timeFieldName } = this.state;
@@ -217,7 +217,7 @@
if (success) {
const importer = importerFactory(format, results, indexCreationSettings);
if (importer !== undefined) {
-const readResp = importer.read(fileContents, this.setReadProgress);
+const readResp = importer.read(data, this.setReadProgress);
success = readResp.success;
this.setState({
readStatus: success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
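In outline, the import path now reads the raw ArrayBuffer into ImportDocs before any indexing starts. A condensed sketch of the sequence (wiring assumed from this file, error handling omitted):

// Condensed flow, not a verbatim excerpt.
async function runImport(format, results, indexCreationSettings, data, index) {
  const importer = importerFactory(format, results, indexCreationSettings);
  const readResp = importer.read(data); // chops the ArrayBuffer into ImportDocs
  if (readResp.success) {
    await importer.initializeImport(index); // index, mappings and ingest pipeline
    // a subsequent importer.import(...) call uploads the docs in batches
  }
}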
@@ -9,7 +9,7 @@ import moment from 'moment';
import { i18n } from '@kbn/i18n';
import { ml } from '../../../../../services/ml_api_service';
import {
-Doc,
+ImportDoc,
ImportFailure,
ImportResponse,
Mappings,
@@ -20,6 +20,7 @@ import {
const CHUNK_SIZE = 5000;
const MAX_CHUNK_CHAR_COUNT = 1000000;
const IMPORT_RETRIES = 5;
+const STRING_CHUNKS_MB = 100;

export interface ImportConfig {
settings: Settings;
@@ -34,20 +35,53 @@
error?: any;
}

-export class Importer {
+export interface CreateDocsResponse {
+success: boolean;
+remainder: number;
+docs: ImportDoc[];
+error?: any;
+}
+
+export abstract class Importer {
private _settings: Settings;
private _mappings: Mappings;
private _pipeline: IngestPipeline;

-protected _docArray: Doc[] = [];
+protected _docArray: ImportDoc[] = [];

constructor({ settings, mappings, pipeline }: ImportConfig) {
this._settings = settings;
this._mappings = mappings;
this._pipeline = pipeline;
}

-async initializeImport(index: string) {
+public read(data: ArrayBuffer) {
+const decoder = new TextDecoder();
+const size = STRING_CHUNKS_MB * Math.pow(2, 20);
+
+// chop the data up into 100MB chunks for processing.
+// if the chop produces a partial line at the end, a character "remainder" count
+// is returned which is used to roll the next chunk back that many chars so
+// it is included in the next chunk.
+const parts = Math.ceil(data.byteLength / size);
+let remainder = 0;
+for (let i = 0; i < parts; i++) {
+const byteArray = decoder.decode(data.slice(i * size - remainder, (i + 1) * size));
+const { success, docs, remainder: tempRemainder } = this._createDocs(byteArray);
+if (success) {
+this._docArray = this._docArray.concat(docs);
+remainder = tempRemainder;
+} else {
+return { success: false };
+}
+}
+
+return { success: true };
+}
+
+protected abstract _createDocs(t: string): CreateDocsResponse;
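The remainder hand-off is easiest to see with toy numbers: 5-byte slices instead of 100MB, and a trivial line splitter standing in for _createDocs.

// Toy demonstration of the roll-back logic in read() above; not production code.
const buffer = new TextEncoder().encode('aaa\nbbb\nccc\n').buffer;
const size = 5; // pretend slices are 5 bytes
const decoder = new TextDecoder();

const docs: string[] = [];
let remainder = 0;
for (let i = 0; i < Math.ceil(buffer.byteLength / size); i++) {
  // roll back by remainder so the partial line is decoded again in full
  const text = decoder.decode(buffer.slice(i * size - remainder, (i + 1) * size));
  const lines = text.split('\n');
  remainder = lines.pop()!.length; // trailing fragment, if any
  docs.push(...lines.filter((l) => l !== ''));
}
// docs is now ['aaa', 'bbb', 'ccc']; no line was lost at a slice boundary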

+public async initializeImport(index: string) {
const settings = this._settings;
const mappings = this._mappings;
const pipeline = this._pipeline;
@@ -75,7 +109,7 @@
return createIndexResp;
}

-async import(
+public async import(
id: string,
index: string,
pipelineId: string,
@@ -201,8 +235,8 @@ function updatePipelineTimezone(ingestPipeline: IngestPipeline) {
}
}

-function createDocumentChunks(docArray: Doc[]) {
-const chunks: Doc[][] = [];
+function createDocumentChunks(docArray: ImportDoc[]) {
+const chunks: ImportDoc[][] = [];
// chop docArray into 5000 doc chunks
const tempChunks = chunk(docArray, CHUNK_SIZE);

@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

-import { Importer, ImportConfig } from './importer';
+import { Importer, ImportConfig, CreateDocsResponse } from './importer';
import {
Doc,
FindFileStructureResponse,
@@ -33,54 +33,54 @@ export class MessageImporter extends Importer {
// multiline_start_pattern regex
// if it does, it is a legitimate end of line and can be pushed into the list,
// if not, it must be a newline char inside a field value, so keep looking.
-read(text: string) {
+protected _createDocs(text: string): CreateDocsResponse {
+let remainder = 0;
try {
-const data: Doc[] = [];
+const docs: Doc[] = [];

let message = '';
let line = '';
for (let i = 0; i < text.length; i++) {
const char = text[i];
if (char === '\n') {
-message = this.processLine(data, message, line);
+message = this._processLine(docs, message, line);
line = '';
} else {
line += char;
}
}

-// the last line may have been missing a newline ending
-if (line !== '') {
-message = this.processLine(data, message, line);
-}
+remainder = line.length;

-// add the last message to the list if not already done
+// // add the last message to the list if not already done
if (message !== '') {
-this.addMessage(data, message);
+this._addMessage(docs, message);
}

// remove first line if it is blank
-if (data[0] && data[0].message === '') {
-data.shift();
+if (docs[0] && docs[0].message === '') {
+docs.shift();
}

-this._docArray = data;

return {
success: true,
+docs,
+remainder,
};
} catch (error) {
return {
success: false,
+docs: [],
+remainder,
error,
};
}
}
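Concretely, with a multiline start pattern such as /^\[/ , continuation lines (stack traces, wrapped values) fold into the preceding message. A toy standalone version of the grouping rule; the pattern and helper are assumptions for illustration, not code from this diff:

// Simplified stand-in for _createDocs/_processLine, ignoring chunk remainders.
const multilineStart = /^\[/; // assumed multiline_start_pattern

function groupMessages(text: string): string[] {
  const docs: string[] = [];
  let message = '';
  for (const line of text.split('\n')) {
    if (multilineStart.test(line)) {
      if (message !== '') docs.push(message); // previous message is complete
      message = line;
    } else if (message !== '') {
      message += `\n${line}`; // a newline inside a field value
    } // anything before the first matching line is discarded
  }
  if (message !== '') docs.push(message);
  return docs;
}

// groupMessages('[error] boom\n  at main()\n[info] ok')
// gives ['[error] boom\n  at main()', '[info] ok']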

-processLine(data: Doc[], message: string, line: string) {
+private _processLine(data: Doc[], message: string, line: string) {
if (this._excludeLinesRegex === null || line.match(this._excludeLinesRegex) === null) {
if (this._multilineStartRegex === null || line.match(this._multilineStartRegex) !== null) {
-this.addMessage(data, message);
+this._addMessage(data, message);
message = '';
} else if (data.length === 0) {
// discard everything before the first line that is considered the first line of a message
@@ -95,7 +95,7 @@
return message;
}

-addMessage(data: Doc[], message: string) {
+private _addMessage(data: Doc[], message: string) {
// if the message ended \r\n (Windows line endings)
// then omit the \r as well as the \n for consistency
message = message.replace(/\r$/, '');
@@ -4,35 +4,50 @@
* you may not use this file except in compliance with the Elastic License.
*/

-import { Importer, ImportConfig } from './importer';
+import { Importer, ImportConfig, CreateDocsResponse } from './importer';
import { FindFileStructureResponse } from '../../../../../../../common/types/file_datavisualizer';

export class NdjsonImporter extends Importer {
constructor(results: FindFileStructureResponse, settings: ImportConfig) {
super(settings);
}

-read(json: string) {
+protected _createDocs(json: string): CreateDocsResponse {
+let remainder = 0;
try {
const splitJson = json.split(/}\s*\n/);
+const incompleteLastLine = json.match(/}\s*\n?$/) === null;

-const ndjson: any[] = [];
-for (let i = 0; i < splitJson.length; i++) {
-if (splitJson[i] !== '') {
-// note the extra } at the end of the line, adding back
-// the one that was eaten in the split
-ndjson.push(`${splitJson[i]}}`);
-}
-}
-
-this._docArray = ndjson;
+const docs: string[] = [];
+if (splitJson.length) {
+for (let i = 0; i < splitJson.length - 1; i++) {
+if (splitJson[i] !== '') {
+// note the extra } at the end of the line, adding back
+// the one that was eaten in the split
+docs.push(`${splitJson[i]}}`);
+}
+}
+
+const lastDoc = splitJson[splitJson.length - 1];
+if (lastDoc) {
+if (incompleteLastLine === true) {
+remainder = lastDoc.length;
+} else {
+docs.push(`${lastDoc}}`);
+}
+}
+}

return {
success: true,
+docs,
+remainder,
};
} catch (error) {
return {
success: false,
+docs: [],
+remainder,
error,
};
}
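A worked example of the splitting rules: when a slice ends mid-document, the fragment's length is reported as remainder so read() re-decodes it at the start of the next slice.

// Standalone walk-through of the logic above.
const slice = '{"a":1}\n{"b":2}\n{"c":';
const splitJson = slice.split(/}\s*\n/); // ['{"a":1', '{"b":2', '{"c":']
const incompleteLastLine = slice.match(/}\s*\n?$/) === null; // true, no closing }

// completed docs get back the '}' eaten by the split
const docs = splitJson.slice(0, -1).map((d) => `${d}}`); // ['{"a":1}', '{"b":2}']
const remainder = incompleteLastLine ? splitJson[splitJson.length - 1].length : 0; // 5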
@@ -9,7 +9,6 @@ export {
hasImportPermission,
processResults,
readFile,
-reduceData,
getMaxBytes,
getMaxBytesFormatted,
} from './utils';