Skip to content

Commit

Permalink
[ML] Add a timeout option to file structure finder (#34117)
Browse files Browse the repository at this point in the history
This can be used to restrict the amount of CPU a single
structure finder request can use.

The timeout is not implemented precisely, so requests
may run for slightly longer than the timeout before
aborting.

The default is 25 seconds, which is a little below
Kibana's default timeout of 30 seconds for calls to
Elasticsearch APIs.
  • Loading branch information
droberts195 authored and kcm committed Oct 30, 2018
1 parent adb1642 commit b5cd33a
Show file tree
Hide file tree
Showing 28 changed files with 900 additions and 170 deletions.
413 changes: 408 additions & 5 deletions docs/reference/ml/apis/find-file-structure.asciidoc

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -112,6 +113,7 @@ public boolean equals(Object other) {
public static class Request extends ActionRequest {

public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField CHARSET = FileStructure.CHARSET;
public static final ParseField FORMAT = FileStructure.FORMAT;
public static final ParseField COLUMN_NAMES = FileStructure.COLUMN_NAMES;
Expand All @@ -128,6 +130,7 @@ public static class Request extends ActionRequest {
"[%s] may only be specified if [" + FORMAT.getPreferredName() + "] is [%s]";

private Integer linesToSample;
private TimeValue timeout;
private String charset;
private FileStructure.Format format;
private List<String> columnNames;
Expand All @@ -151,6 +154,14 @@ public void setLinesToSample(Integer linesToSample) {
this.linesToSample = linesToSample;
}

public TimeValue getTimeout() {
return timeout;
}

public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

public String getCharset() {
return charset;
}
Expand Down Expand Up @@ -313,6 +324,7 @@ public ActionRequestValidationException validate() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
linesToSample = in.readOptionalVInt();
timeout = in.readOptionalTimeValue();
charset = in.readOptionalString();
format = in.readBoolean() ? in.readEnum(FileStructure.Format.class) : null;
columnNames = in.readBoolean() ? in.readList(StreamInput::readString) : null;
Expand All @@ -330,6 +342,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalVInt(linesToSample);
out.writeOptionalTimeValue(timeout);
out.writeOptionalString(charset);
if (format == null) {
out.writeBoolean(false);
Expand Down Expand Up @@ -365,7 +378,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public int hashCode() {
return Objects.hash(linesToSample, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
return Objects.hash(linesToSample, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
timestampField, sample);
}

Expand All @@ -382,6 +395,7 @@ public boolean equals(Object other) {

Request that = (Request) other;
return Objects.equals(this.linesToSample, that.linesToSample) &&
Objects.equals(this.timeout, that.timeout) &&
Objects.equals(this.charset, that.charset) &&
Objects.equals(this.format, that.format) &&
Objects.equals(this.columnNames, that.columnNames) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ protected void doExecute(Task task, FindFileStructureAction.Request request,

private FindFileStructureAction.Response buildFileStructureResponse(FindFileStructureAction.Request request) throws Exception {

FileStructureFinderManager structureFinderManager = new FileStructureFinderManager();
FileStructureFinderManager structureFinderManager = new FileStructureFinderManager(threadPool.scheduler());

FileStructureFinder fileStructureFinder = structureFinderManager.findFileStructure(request.getLinesToSample(),
request.getSample().streamInput(), new FileStructureOverrides(request));
request.getSample().streamInput(), new FileStructureOverrides(request), request.getTimeout());

return new FindFileStructureAction.Response(fileStructureFinder.getStructure());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {

static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String> explanation, String sample, String charsetName,
Boolean hasByteOrderMarker, CsvPreference csvPreference,
boolean trimFields, FileStructureOverrides overrides)
boolean trimFields, FileStructureOverrides overrides,
TimeoutChecker timeoutChecker)
throws IOException {

Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference);
Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference, timeoutChecker);
List<List<String>> rows = parsed.v1();
List<Integer> lineNumbers = parsed.v2();

Expand Down Expand Up @@ -106,7 +107,8 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
structureBuilder.setShouldTrimFields(true);
}

Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides);
Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides,
timeoutChecker);
if (timeField != null) {
String timeLineRegex = null;
StringBuilder builder = new StringBuilder("^");
Expand Down Expand Up @@ -148,7 +150,7 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords);
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
if (timeField != null) {
Expand Down Expand Up @@ -183,7 +185,8 @@ public FileStructure getStructure() {
return structure;
}

static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference) throws IOException {
static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference, TimeoutChecker timeoutChecker)
throws IOException {

int fieldsInFirstRow = -1;

Expand All @@ -204,6 +207,7 @@ static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPrefe
}
}
rows.add(row);
timeoutChecker.check("delimited record parsing");
lineNumbers.add(csvReader.getLineNumber());
}
} catch (SuperCsvException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public boolean canCreateFromSample(List<String> explanation, String sample) {

@Override
public FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws IOException {
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws IOException {
return DelimitedFileStructureFinder.makeDelimitedFileStructureFinder(explanation, sample, charsetName, hasByteOrderMarker,
csvPreference, trimFields, overrides);
csvPreference, trimFields, overrides, timeoutChecker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public interface FileStructureFinderFactory {
* @param hasByteOrderMarker Did the sample have a byte order marker? <code>null</code> means "not relevant".
* @param overrides Stores structure decisions that have been made by the end user, and should
* take precedence over anything the {@link FileStructureFinder} may decide.
* @param timeoutChecker Will abort the operation if its timeout is exceeded.
* @return A {@link FileStructureFinder} object suitable for determining the structure of the supplied sample.
* @throws Exception if something goes wrong during creation.
*/
FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws Exception;
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import com.ibm.icu.text.CharsetDetector;
import com.ibm.icu.text.CharsetMatch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand All @@ -23,15 +25,17 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

/**
* Runs the high-level steps needed to create ingest configs for the specified file. In order:
* 1. Determine the most likely character set (UTF-8, UTF-16LE, ISO-8859-2, etc.)
* 2. Load a sample of the file, consisting of the first 1000 lines of the file
* 3. Determine the most likely file structure - one of ND-JSON, XML, CSV, TSV or semi-structured text
* 3. Determine the most likely file structure - one of ND-JSON, XML, delimited or semi-structured text
* 4. Create an appropriate structure object and delegate writing configs to it
*/
public final class FileStructureFinderManager {
Expand Down Expand Up @@ -81,8 +85,18 @@ public final class FileStructureFinderManager {

private static final int BUFFER_SIZE = 8192;

private final ScheduledExecutorService scheduler;

/**
* Create the file structure manager.
* @param scheduler Used for checking timeouts.
*/
public FileStructureFinderManager(ScheduledExecutorService scheduler) {
this.scheduler = Objects.requireNonNull(scheduler);
}

public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile) throws Exception {
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
}

/**
Expand All @@ -95,42 +109,49 @@ public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Input
* @param overrides Aspects of the file structure that are known in advance. These take precedence over
* values determined by structure analysis. An exception will be thrown if the file structure
* is incompatible with an overridden value.
* @param timeout The maximum time the analysis is permitted to take. If it takes longer than this an
* {@link ElasticsearchTimeoutException} may be thrown (although not necessarily immediately
* the timeout is exceeded).
* @return A {@link FileStructureFinder} object from which the structure and messages can be queried.
* @throws Exception A variety of problems could occur at various stages of the structure finding process.
*/
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides)
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides,
TimeValue timeout)
throws Exception {
return findFileStructure(new ArrayList<>(), (idealSampleLineCount == null) ? DEFAULT_IDEAL_SAMPLE_LINE_COUNT : idealSampleLineCount,
fromFile, overrides);
fromFile, overrides, timeout);
}

public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile)
throws Exception {
return findFileStructure(new ArrayList<>(), idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
return findFileStructure(explanation, idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
}

public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile,
FileStructureOverrides overrides) throws Exception {

String charsetName = overrides.getCharset();
Reader sampleReader;
if (charsetName != null) {
// Creating the reader will throw if the specified character set does not exist
sampleReader = new InputStreamReader(fromFile, charsetName);
explanation.add("Using specified character encoding [" + charsetName + "]");
} else {
CharsetMatch charsetMatch = findCharset(explanation, fromFile);
charsetName = charsetMatch.getName();
sampleReader = charsetMatch.getReader();
}
FileStructureOverrides overrides, TimeValue timeout) throws Exception {

try (TimeoutChecker timeoutChecker = new TimeoutChecker("structure analysis", timeout, scheduler)) {

String charsetName = overrides.getCharset();
Reader sampleReader;
if (charsetName != null) {
// Creating the reader will throw if the specified character set does not exist
sampleReader = new InputStreamReader(fromFile, charsetName);
explanation.add("Using specified character encoding [" + charsetName + "]");
} else {
CharsetMatch charsetMatch = findCharset(explanation, fromFile, timeoutChecker);
charsetName = charsetMatch.getName();
sampleReader = charsetMatch.getReader();
}

Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount));
Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount), timeoutChecker);

return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides);
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides, timeoutChecker);
}
}

CharsetMatch findCharset(List<String> explanation, InputStream inputStream) throws Exception {
CharsetMatch findCharset(List<String> explanation, InputStream inputStream, TimeoutChecker timeoutChecker) throws Exception {

// We need an input stream that supports mark and reset, so wrap the argument
// in a BufferedInputStream if it doesn't already support this feature
Expand All @@ -141,6 +162,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
// This is from ICU4J
CharsetDetector charsetDetector = new CharsetDetector().setText(inputStream);
CharsetMatch[] charsetMatches = charsetDetector.detectAll();
timeoutChecker.check("character set detection");

// Determine some extra characteristics of the input to compensate for some deficiencies of ICU4J
boolean pureAscii = true;
Expand All @@ -164,6 +186,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
remainingLength -= bytesRead;
} while (containsZeroBytes == false && remainingLength > 0);
inputStream.reset();
timeoutChecker.check("character set detection");

if (pureAscii) {
// If the input is pure ASCII then many single byte character sets will match. We want to favour
Expand Down Expand Up @@ -220,7 +243,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
}

FileStructureFinder makeBestStructureFinder(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws Exception {
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception {

Character delimiter = overrides.getDelimiter();
Character quote = overrides.getQuote();
Expand Down Expand Up @@ -250,16 +273,18 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam
}

for (FileStructureFinderFactory factory : factories) {
timeoutChecker.check("high level format detection");
if (factory.canCreateFromSample(explanation, sample)) {
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides);
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides, timeoutChecker);
}
}

throw new IllegalArgumentException("Input did not match " +
((overrides.getFormat() == null) ? "any known formats" : "the specified format [" + overrides.getFormat() + "]"));
}

private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines) throws IOException {
private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines, TimeoutChecker timeoutChecker)
throws IOException {

int lineCount = 0;
BufferedReader bufferedReader = new BufferedReader(reader);
Expand All @@ -283,6 +308,7 @@ private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int
String line;
while ((line = bufferedReader.readLine()) != null && ++lineCount <= maxLines) {
sample.append(line).append('\n');
timeoutChecker.check("sample line splitting");
}

if (lineCount < minLines) {
Expand Down
Loading

0 comments on commit b5cd33a

Please sign in to comment.