Skip to content

Commit

Permalink
Introduce templating support to timezone/locale in DateProcessor (#27089
Browse files Browse the repository at this point in the history
)

Sometimes systems like Beats would want to extract the date's timezone and/or locale
from a value in a field of the document. This PR adds support for mustache templating
to extract these values.

Closes #24024.
  • Loading branch information
talevy authored Nov 9, 2017
1 parent e04e5ab commit d22fd4e
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 79 deletions.
24 changes: 24 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,30 @@ Here is an example that adds the parsed date to the `timestamp` field based on t
--------------------------------------------------
// NOTCONSOLE

The `timezone` and `locale` processor parameters are templated. This means that their values can be
extracted from fields within documents. The example below shows how to extract the locale/timezone
details from existing fields, `my_timezone` and `my_locale`, in the ingested document that contain
the timezone and locale values.

[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"date" : {
"field" : "initial_date",
"target_field" : "timestamp",
"formats" : ["ISO8601"],
"timezone" : "{{ my_timezone }}",
"locale" : "{{ my_locale }}"
}
}
]
}
--------------------------------------------------
// NOTCONSOLE

[[date-index-name-processor]]
=== Date Index Name Processor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.LocaleUtils;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
Expand All @@ -40,14 +43,15 @@ public final class DateProcessor extends AbstractProcessor {
public static final String TYPE = "date";
static final String DEFAULT_TARGET_FIELD = "@timestamp";

private final DateTimeZone timezone;
private final Locale locale;
private final TemplateScript.Factory timezone;
private final TemplateScript.Factory locale;
private final String field;
private final String targetField;
private final List<String> formats;
private final List<Function<String, DateTime>> dateParsers;
private final List<Function<Map<String, Object>, Function<String, DateTime>>> dateParsers;

DateProcessor(String tag, DateTimeZone timezone, Locale locale, String field, List<String> formats, String targetField) {
DateProcessor(String tag, @Nullable TemplateScript.Factory timezone, @Nullable TemplateScript.Factory locale,
String field, List<String> formats, String targetField) {
super(tag);
this.timezone = timezone;
this.locale = locale;
Expand All @@ -57,10 +61,18 @@ public final class DateProcessor extends AbstractProcessor {
this.dateParsers = new ArrayList<>(this.formats.size());
for (String format : formats) {
DateFormat dateFormat = DateFormat.fromString(format);
dateParsers.add(dateFormat.getFunction(format, timezone, locale));
dateParsers.add((params) -> dateFormat.getFunction(format, newDateTimeZone(params), newLocale(params)));
}
}

private DateTimeZone newDateTimeZone(Map<String, Object> params) {
return timezone == null ? DateTimeZone.UTC : DateTimeZone.forID(timezone.newInstance(params).execute());
}

private Locale newLocale(Map<String, Object> params) {
return (locale == null) ? Locale.ROOT : LocaleUtils.parse(locale.newInstance(params).execute());
}

@Override
public void execute(IngestDocument ingestDocument) {
Object obj = ingestDocument.getFieldValue(field, Object.class);
Expand All @@ -72,9 +84,9 @@ public void execute(IngestDocument ingestDocument) {

DateTime dateTime = null;
Exception lastException = null;
for (Function<String, DateTime> dateParser : dateParsers) {
for (Function<Map<String, Object>, Function<String, DateTime>> dateParser : dateParsers) {
try {
dateTime = dateParser.apply(value);
dateTime = dateParser.apply(ingestDocument.getSourceAndMetadata()).apply(value);
} catch (Exception e) {
//try the next parser and keep track of the exceptions
lastException = ExceptionsHelper.useOrSuppress(lastException, e);
Expand All @@ -93,11 +105,11 @@ public String getType() {
return TYPE;
}

DateTimeZone getTimezone() {
TemplateScript.Factory getTimezone() {
return timezone;
}

Locale getLocale() {
TemplateScript.Factory getLocale() {
return locale;
}

Expand All @@ -115,19 +127,30 @@ List<String> getFormats() {

public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;

public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

public DateProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
TemplateScript.Factory compiledTimezoneTemplate = null;
if (timezoneString != null) {
compiledTimezoneTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"timezone", timezoneString, scriptService);
}
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "locale");
Locale locale = Locale.ROOT;
TemplateScript.Factory compiledLocaleTemplate = null;
if (localeString != null) {
locale = LocaleUtils.parse(localeString);
compiledLocaleTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"locale", localeString, scriptService);
}
List<String> formats = ConfigurationUtils.readList(TYPE, processorTag, config, "formats");
return new DateProcessor(processorTag, timezone, locale, field, formats, targetField);
return new DateProcessor(processorTag, compiledTimezoneTemplate, compiledLocaleTemplate, field, formats, targetField);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public IngestCommonPlugin() throws IOException {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory());
processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService));
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService));
processors.put(RenameProcessor.TYPE, new RenameProcessor.Factory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -34,8 +36,14 @@

public class DateProcessorFactoryTests extends ESTestCase {

private DateProcessor.Factory factory;

@Before
public void init() {
factory = new DateProcessor.Factory(TestTemplateService.instance());
}

public void testBuildDefaults() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
Expand All @@ -46,12 +54,11 @@ public void testBuildDefaults() throws Exception {
assertThat(processor.getField(), equalTo(sourceField));
assertThat(processor.getTargetField(), equalTo(DateProcessor.DEFAULT_TARGET_FIELD));
assertThat(processor.getFormats(), equalTo(Collections.singletonList("dd/MM/yyyyy")));
assertThat(processor.getLocale(), equalTo(Locale.ROOT));
assertThat(processor.getTimezone(), equalTo(DateTimeZone.UTC));
assertNull(processor.getLocale());
assertNull(processor.getTimezone());
}

public void testMatchFieldIsMandatory() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String targetField = randomAlphaOfLengthBetween(1, 10);
config.put("target_field", targetField);
Expand All @@ -66,7 +73,6 @@ public void testMatchFieldIsMandatory() throws Exception {
}

public void testMatchFormatsIsMandatory() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
String targetField = randomAlphaOfLengthBetween(1, 10);
Expand All @@ -82,7 +88,6 @@ public void testMatchFormatsIsMandatory() throws Exception {
}

public void testParseLocale() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
Expand All @@ -91,39 +96,10 @@ public void testParseLocale() throws Exception {
config.put("locale", locale.toLanguageTag());

DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getLocale().toLanguageTag(), equalTo(locale.toLanguageTag()));
}

public void testParseInvalidLocale() throws Exception {
String[] locales = new String[] { "invalid_locale", "english", "xy", "xy-US" };
for (String locale : locales) {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
config.put("formats", Collections.singletonList("dd/MM/yyyyy"));
config.put("locale", locale);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("Unknown language: " + locale.split("[_-]")[0]));
}

locales = new String[] { "en-XY", "en-Canada" };
for (String locale : locales) {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
config.put("formats", Collections.singletonList("dd/MM/yyyyy"));
config.put("locale", locale);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("Unknown country: " + locale.split("[_-]")[1]));
}
assertThat(processor.getLocale().newInstance(Collections.emptyMap()).execute(), equalTo(locale.toLanguageTag()));
}

public void testParseTimezone() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
Expand All @@ -132,26 +108,10 @@ public void testParseTimezone() throws Exception {
DateTimeZone timezone = randomDateTimeZone();
config.put("timezone", timezone.getID());
DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getTimezone(), equalTo(timezone));
}

public void testParseInvalidTimezone() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
config.put("match_formats", Collections.singletonList("dd/MM/yyyyy"));
config.put("timezone", "invalid_timezone");
try {
factory.create(null, null, config);
fail("invalid timezone should fail");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("The datetime zone id 'invalid_timezone' is not recognised"));
}
assertThat(processor.getTimezone().newInstance(Collections.emptyMap()).execute(), equalTo(timezone.getID()));
}

public void testParseMatchFormats() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
Expand All @@ -162,7 +122,6 @@ public void testParseMatchFormats() throws Exception {
}

public void testParseMatchFormatsFailure() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
config.put("field", sourceField);
Expand All @@ -177,7 +136,6 @@ public void testParseMatchFormatsFailure() throws Exception {
}

public void testParseTargetField() throws Exception {
DateProcessor.Factory factory = new DateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String sourceField = randomAlphaOfLengthBetween(1, 10);
String targetField = randomAlphaOfLengthBetween(1, 10);
Expand Down
Loading

0 comments on commit d22fd4e

Please sign in to comment.