forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This change adds new ingest processor that breaks line from CSV file into separate fields. By default it conforms to RFC 4180 but can be tweaked. Closes elastic#49113
- Loading branch information
1 parent
642390c
commit 67fde34
Showing
4 changed files
with
472 additions
and
1 deletion.
There are no files selected for viewing
172 changes: 172 additions & 0 deletions
172
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.ingest.common; | ||
|
||
import org.elasticsearch.ingest.IngestDocument; | ||
|
||
final class CsvParser { | ||
|
||
private enum State { | ||
START, UNQUOTED, QUOTED, QUOTED_END | ||
} | ||
|
||
private final char quote; | ||
private final char separator; | ||
private final boolean trim; | ||
private final String[] headers; | ||
private final IngestDocument ingestDocument; | ||
private final StringBuilder builder = new StringBuilder(); | ||
private State state = State.START; | ||
private String line; | ||
private int currentHeader; | ||
private int startIndex; | ||
private int length; | ||
private int currentIndex; | ||
|
||
CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) { | ||
this.ingestDocument = ingestDocument; | ||
this.quote = quote; | ||
this.separator = separator; | ||
this.trim = trim; | ||
this.headers = headers; | ||
} | ||
|
||
void process(String line) { | ||
this.line = line; | ||
length = line.length(); | ||
for (currentIndex = 0; currentIndex < length; currentIndex++) { | ||
switch (state) { | ||
case START: | ||
if (processStart()) { | ||
return; | ||
} | ||
break; | ||
case UNQUOTED: | ||
if (processUnquoted()) { | ||
return; | ||
} | ||
break; | ||
case QUOTED: | ||
processQuoted(); | ||
break; | ||
case QUOTED_END: | ||
if (processQuotedEnd()) { | ||
return; | ||
} | ||
break; | ||
} | ||
} | ||
|
||
//we've reached end of string, we need to handle last field | ||
switch (state) { | ||
case UNQUOTED: | ||
setField(length); | ||
break; | ||
case QUOTED_END: | ||
setField(length - 1); | ||
break; | ||
case QUOTED: | ||
throw new IllegalArgumentException("Unmatched quote"); | ||
} | ||
} | ||
|
||
private boolean processStart() { | ||
for (; currentIndex < length; currentIndex++) { | ||
char c = line.charAt(currentIndex); | ||
if (c == quote) { | ||
state = State.QUOTED; | ||
builder.setLength(0); | ||
startIndex++; | ||
return false; | ||
} else if (c == separator) { | ||
startIndex++; | ||
if (nextHeader()) { | ||
return true; | ||
} | ||
} else if (trim && (c == ' ' || c == '\t')) { | ||
startIndex++; | ||
} else { | ||
state = State.UNQUOTED; | ||
builder.setLength(0); | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
private boolean processUnquoted() { | ||
for (; currentIndex < length; currentIndex++) { | ||
char c = line.charAt(currentIndex); | ||
if (c == '\n' || c == '\r' || c == quote) { | ||
throw new IllegalArgumentException("Illegal character inside unquoted field at " + currentIndex); | ||
} else if (c == separator) { | ||
state = State.START; | ||
if (setField(currentIndex)) { | ||
return true; | ||
} | ||
startIndex = currentIndex + 1; | ||
return false; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
private void processQuoted() { | ||
for (; currentIndex < length; currentIndex++) { | ||
if (line.charAt(currentIndex) == quote) { | ||
state = State.QUOTED_END; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
private boolean processQuotedEnd() { | ||
char c = line.charAt(currentIndex); | ||
if (c == quote) { | ||
builder.append(line, startIndex, currentIndex - 1).append(quote); | ||
startIndex = currentIndex + 1; | ||
state = State.QUOTED; | ||
} else if (c == separator) { | ||
if (setField(currentIndex - 1)) { | ||
return true; | ||
} | ||
startIndex = currentIndex + 1; | ||
state = State.START; | ||
} else { | ||
throw new IllegalArgumentException("Characters after quoted field at " + currentIndex); | ||
} | ||
return false; | ||
} | ||
|
||
private boolean setField(int endIndex) { | ||
if (builder.length() == 0) { | ||
ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex)); | ||
} else { | ||
builder.append(line, startIndex, endIndex); | ||
ingestDocument.setFieldValue(headers[currentHeader], builder.toString()); | ||
} | ||
return nextHeader(); | ||
} | ||
|
||
private boolean nextHeader() { | ||
currentHeader++; | ||
return currentHeader == headers.length; | ||
} | ||
} |
104 changes: 104 additions & 0 deletions
104
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.ingest.common; | ||
|
||
import org.elasticsearch.ingest.AbstractProcessor; | ||
import org.elasticsearch.ingest.ConfigurationUtils; | ||
import org.elasticsearch.ingest.IngestDocument; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* A processor that breaks line from CSV file into separate fields. | ||
* If there's more fields requested than there is in the CSV extra field will no be present in document after processing. | ||
* In the same way this processor will skip any field that is empty in CSV. | ||
* | ||
* | ||
* By default it uses rules according to <a href="https://tools.ietf.org/html/rfc4180">RCF 4180</a>, | ||
* but it can be tweaked with following parameters: | ||
* | ||
* quote: set custom quote character (defaults to ") | ||
* separator: set custom separator (defaults to ,) | ||
* trim: trim leading whitespaces in each field (allows also whitespaces before quoted fields, defaults to false) | ||
*/ | ||
public class CsvProcessor extends AbstractProcessor { | ||
|
||
public static final String TYPE = "csv"; | ||
|
||
private final String field; | ||
private final String[] headers; | ||
private final boolean trim; | ||
private final char quote; | ||
private final char separator; | ||
private final boolean ignoreMissing; | ||
|
||
public CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) { | ||
super(tag); | ||
this.field = field; | ||
this.headers = headers; | ||
this.trim = trim; | ||
this.quote = quote; | ||
this.separator = separator; | ||
this.ignoreMissing = ignoreMissing; | ||
} | ||
|
||
@Override | ||
public IngestDocument execute(IngestDocument ingestDocument) { | ||
if (headers.length == 0) { | ||
return ingestDocument; | ||
} | ||
|
||
String line = ingestDocument.getFieldValue(field, String.class, ignoreMissing); | ||
if (line == null && ignoreMissing == false) { | ||
return ingestDocument; | ||
} else if (line == null) { | ||
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); | ||
} | ||
new CsvParser(ingestDocument, quote, separator, trim, headers).process(line); | ||
return ingestDocument; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return TYPE; | ||
} | ||
|
||
public static final class Factory implements org.elasticsearch.ingest.Processor.Factory { | ||
@Override | ||
public CsvProcessor create(Map<String, org.elasticsearch.ingest.Processor.Factory> registry, String processorTag, | ||
Map<String, Object> config) { | ||
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); | ||
String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\""); | ||
if (quote.length() != 1) { | ||
throw new IllegalArgumentException("quote has to be single character like \" or '"); | ||
} | ||
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator", ","); | ||
if (separator.length() != 1) { | ||
throw new IllegalArgumentException("separator has to be single character like \" or '"); | ||
} | ||
boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false); | ||
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); | ||
List<String> targetFields = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "target_fields"); | ||
return new CsvProcessor(processorTag, field, targetFields == null ? new String[0] : targetFields.toArray(String[]::new), | ||
trim, separator.charAt(0), quote.charAt(0), ignoreMissing); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.