Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Add a timeout option to file structure finder #34117

Merged
merged 3 commits into from
Sep 28, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
413 changes: 408 additions & 5 deletions docs/reference/ml/apis/find-file-structure.asciidoc

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -112,6 +113,7 @@ public boolean equals(Object other) {
public static class Request extends ActionRequest {

public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField CHARSET = FileStructure.CHARSET;
public static final ParseField FORMAT = FileStructure.FORMAT;
public static final ParseField COLUMN_NAMES = FileStructure.COLUMN_NAMES;
Expand All @@ -128,6 +130,7 @@ public static class Request extends ActionRequest {
"[%s] may only be specified if [" + FORMAT.getPreferredName() + "] is [%s]";

private Integer linesToSample;
private TimeValue timeout;
private String charset;
private FileStructure.Format format;
private List<String> columnNames;
Expand All @@ -151,6 +154,14 @@ public void setLinesToSample(Integer linesToSample) {
this.linesToSample = linesToSample;
}

/**
 * Returns the timeout held by this request.
 *
 * @return the current value of the <code>timeout</code> field. The field is written to and read from
 *         the stream as an optional value, so this may be <code>null</code>.
 */
public TimeValue getTimeout() {
    return this.timeout;
}

/**
 * Sets the timeout held by this request.
 *
 * @param timeout the new value for the <code>timeout</code> field; it is stored as-is, with no
 *                validation performed in this method.
 */
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

/**
 * Returns the character set name held by this request.
 *
 * @return the current value of the <code>charset</code> field. The field is written to and read from
 *         the stream as an optional string, so this may be <code>null</code>.
 */
public String getCharset() {
    return this.charset;
}
Expand Down Expand Up @@ -313,6 +324,7 @@ public ActionRequestValidationException validate() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
linesToSample = in.readOptionalVInt();
timeout = in.readOptionalTimeValue();
charset = in.readOptionalString();
format = in.readBoolean() ? in.readEnum(FileStructure.Format.class) : null;
columnNames = in.readBoolean() ? in.readList(StreamInput::readString) : null;
Expand All @@ -330,6 +342,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalVInt(linesToSample);
out.writeOptionalTimeValue(timeout);
out.writeOptionalString(charset);
if (format == null) {
out.writeBoolean(false);
Expand Down Expand Up @@ -365,7 +378,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public int hashCode() {
return Objects.hash(linesToSample, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
return Objects.hash(linesToSample, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
timestampField, sample);
}

Expand All @@ -382,6 +395,7 @@ public boolean equals(Object other) {

Request that = (Request) other;
return Objects.equals(this.linesToSample, that.linesToSample) &&
Objects.equals(this.timeout, that.timeout) &&
Objects.equals(this.charset, that.charset) &&
Objects.equals(this.format, that.format) &&
Objects.equals(this.columnNames, that.columnNames) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ protected void doExecute(Task task, FindFileStructureAction.Request request,

private FindFileStructureAction.Response buildFileStructureResponse(FindFileStructureAction.Request request) throws Exception {

FileStructureFinderManager structureFinderManager = new FileStructureFinderManager();
FileStructureFinderManager structureFinderManager = new FileStructureFinderManager(threadPool.scheduler());

FileStructureFinder fileStructureFinder = structureFinderManager.findFileStructure(request.getLinesToSample(),
request.getSample().streamInput(), new FileStructureOverrides(request));
request.getSample().streamInput(), new FileStructureOverrides(request), request.getTimeout());

return new FindFileStructureAction.Response(fileStructureFinder.getStructure());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {

static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String> explanation, String sample, String charsetName,
Boolean hasByteOrderMarker, CsvPreference csvPreference,
boolean trimFields, FileStructureOverrides overrides)
boolean trimFields, FileStructureOverrides overrides,
TimeoutChecker timeoutChecker)
throws IOException {

Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference);
Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference, timeoutChecker);
List<List<String>> rows = parsed.v1();
List<Integer> lineNumbers = parsed.v2();

Expand Down Expand Up @@ -106,7 +107,8 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
structureBuilder.setShouldTrimFields(true);
}

Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides);
Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides,
timeoutChecker);
if (timeField != null) {
String timeLineRegex = null;
StringBuilder builder = new StringBuilder("^");
Expand Down Expand Up @@ -148,7 +150,7 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords);
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
if (timeField != null) {
Expand Down Expand Up @@ -183,7 +185,8 @@ public FileStructure getStructure() {
return structure;
}

static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference) throws IOException {
static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference, TimeoutChecker timeoutChecker)
throws IOException {

int fieldsInFirstRow = -1;

Expand All @@ -204,6 +207,7 @@ static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPrefe
}
}
rows.add(row);
timeoutChecker.check("delimited record parsing");
lineNumbers.add(csvReader.getLineNumber());
}
} catch (SuperCsvException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public boolean canCreateFromSample(List<String> explanation, String sample) {

@Override
public FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws IOException {
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws IOException {
return DelimitedFileStructureFinder.makeDelimitedFileStructureFinder(explanation, sample, charsetName, hasByteOrderMarker,
csvPreference, trimFields, overrides);
csvPreference, trimFields, overrides, timeoutChecker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public interface FileStructureFinderFactory {
* @param hasByteOrderMarker Did the sample have a byte order marker? <code>null</code> means "not relevant".
* @param overrides Stores structure decisions that have been made by the end user, and should
* take precedence over anything the {@link FileStructureFinder} may decide.
* @param timeoutChecker Will abort the operation if its timeout is exceeded.
* @return A {@link FileStructureFinder} object suitable for determining the structure of the supplied sample.
* @throws Exception if something goes wrong during creation.
*/
FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws Exception;
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import com.ibm.icu.text.CharsetDetector;
import com.ibm.icu.text.CharsetMatch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand All @@ -23,15 +25,17 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

/**
* Runs the high-level steps needed to create ingest configs for the specified file. In order:
* 1. Determine the most likely character set (UTF-8, UTF-16LE, ISO-8859-2, etc.)
* 2. Load a sample of the file, consisting of the first 1000 lines of the file
* 3. Determine the most likely file structure - one of ND-JSON, XML, CSV, TSV or semi-structured text
* 3. Determine the most likely file structure - one of ND-JSON, XML, delimited or semi-structured text
* 4. Create an appropriate structure object and delegate writing configs to it
*/
public final class FileStructureFinderManager {
Expand Down Expand Up @@ -81,8 +85,18 @@ public final class FileStructureFinderManager {

private static final int BUFFER_SIZE = 8192;

private final ScheduledExecutorService scheduler;

/**
 * Create the file structure manager.
 * @param scheduler Used for checking timeouts. Must not be <code>null</code>.
 * @throws NullPointerException if <code>scheduler</code> is <code>null</code>.
 */
public FileStructureFinderManager(ScheduledExecutorService scheduler) {
    // Name the offending argument so that a null scheduler is easy to diagnose from the exception alone
    this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
}

public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile) throws Exception {
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
}

/**
Expand All @@ -95,42 +109,49 @@ public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Input
* @param overrides Aspects of the file structure that are known in advance. These take precedence over
* values determined by structure analysis. An exception will be thrown if the file structure
* is incompatible with an overridden value.
* @param timeout The maximum time the analysis is permitted to take. If it takes longer than this an
* {@link ElasticsearchTimeoutException} may be thrown (although not necessarily as soon as
* the timeout is exceeded).
* @return A {@link FileStructureFinder} object from which the structure and messages can be queried.
* @throws Exception A variety of problems could occur at various stages of the structure finding process.
*/
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides)
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides,
TimeValue timeout)
throws Exception {
return findFileStructure(new ArrayList<>(), (idealSampleLineCount == null) ? DEFAULT_IDEAL_SAMPLE_LINE_COUNT : idealSampleLineCount,
fromFile, overrides);
fromFile, overrides, timeout);
}

public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile)
throws Exception {
return findFileStructure(new ArrayList<>(), idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
return findFileStructure(explanation, idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
}

public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile,
FileStructureOverrides overrides) throws Exception {

String charsetName = overrides.getCharset();
Reader sampleReader;
if (charsetName != null) {
// Creating the reader will throw if the specified character set does not exist
sampleReader = new InputStreamReader(fromFile, charsetName);
explanation.add("Using specified character encoding [" + charsetName + "]");
} else {
CharsetMatch charsetMatch = findCharset(explanation, fromFile);
charsetName = charsetMatch.getName();
sampleReader = charsetMatch.getReader();
}
FileStructureOverrides overrides, TimeValue timeout) throws Exception {

try (TimeoutChecker timeoutChecker = new TimeoutChecker("structure analysis", timeout, scheduler)) {

String charsetName = overrides.getCharset();
Reader sampleReader;
if (charsetName != null) {
// Creating the reader will throw if the specified character set does not exist
sampleReader = new InputStreamReader(fromFile, charsetName);
explanation.add("Using specified character encoding [" + charsetName + "]");
} else {
CharsetMatch charsetMatch = findCharset(explanation, fromFile, timeoutChecker);
charsetName = charsetMatch.getName();
sampleReader = charsetMatch.getReader();
}

Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount));
Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount), timeoutChecker);

return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides);
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides, timeoutChecker);
}
}

CharsetMatch findCharset(List<String> explanation, InputStream inputStream) throws Exception {
CharsetMatch findCharset(List<String> explanation, InputStream inputStream, TimeoutChecker timeoutChecker) throws Exception {

// We need an input stream that supports mark and reset, so wrap the argument
// in a BufferedInputStream if it doesn't already support this feature
Expand All @@ -141,6 +162,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
// This is from ICU4J
CharsetDetector charsetDetector = new CharsetDetector().setText(inputStream);
CharsetMatch[] charsetMatches = charsetDetector.detectAll();
timeoutChecker.check("character set detection");

// Determine some extra characteristics of the input to compensate for some deficiencies of ICU4J
boolean pureAscii = true;
Expand All @@ -164,6 +186,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
remainingLength -= bytesRead;
} while (containsZeroBytes == false && remainingLength > 0);
inputStream.reset();
timeoutChecker.check("character set detection");

if (pureAscii) {
// If the input is pure ASCII then many single byte character sets will match. We want to favour
Expand Down Expand Up @@ -220,7 +243,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
}

FileStructureFinder makeBestStructureFinder(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
FileStructureOverrides overrides) throws Exception {
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception {

Character delimiter = overrides.getDelimiter();
Character quote = overrides.getQuote();
Expand Down Expand Up @@ -250,16 +273,18 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam
}

for (FileStructureFinderFactory factory : factories) {
timeoutChecker.check("high level format detection");
if (factory.canCreateFromSample(explanation, sample)) {
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides);
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides, timeoutChecker);
}
}

throw new IllegalArgumentException("Input did not match " +
((overrides.getFormat() == null) ? "any known formats" : "the specified format [" + overrides.getFormat() + "]"));
}

private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines) throws IOException {
private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines, TimeoutChecker timeoutChecker)
throws IOException {

int lineCount = 0;
BufferedReader bufferedReader = new BufferedReader(reader);
Expand All @@ -283,6 +308,7 @@ private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int
String line;
while ((line = bufferedReader.readLine()) != null && ++lineCount <= maxLines) {
sample.append(line).append('\n');
timeoutChecker.check("sample line splitting");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a thin loop to be doing the check every time. There is a similar problem in DelimitedFileStructureFinder. I don't want to get into premature optimisations, but it would be nice if the check wasn't run every iteration. Possibly the overhead of the logic to check that would be heavier than the fast call to timeoutChecker.check(...).

Copy link
Contributor Author

@droberts195 droberts195 Sep 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly the overhead of the logic to check that would be heavier

That's my assumption - see #34117 (comment)

I'll profile it to confirm.

}

if (lineCount < minLines) {
Expand Down
Loading