Skip to content

Commit

Permalink
TSDB: add index timestamp range check (#78291)
Browse files Browse the repository at this point in the history
Adds settings for the "start time" and "end time" of a tsdb index and
rejects any documents outside of that range.
  • Loading branch information
weizijun authored Nov 11, 2021
1 parent 4b81094 commit 8b2019a
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,65 @@ routing required:
mappings:
_routing:
required: true
---
set start_time and end_time:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
indices.create:
index: test_index
body:
settings:
index:
mode: time_series
routing_path: [metricset]
time_series:
start_time: 1632625782000
end_time: 1632625792000

- do:
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625793000

- do:
catch: /index.time_series.end_time must be larger than current value \[1632625793000\]/
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625792000

- do:
indices.delete:
index: test_index

---
set start_time and end_time without timeseries mode:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
catch: /\[index.time_series.start_time\] requires \[index.mode=time_series\]/
indices.create:
index: test_index
body:
settings:
index:
time_series:
start_time: 1632625782000

- do:
catch: /\[index.time_series.end_time\] requires \[index.mode=time_series\]/
indices.create:
index: test_index
body:
settings:
index:
time_series:
end_time: 1632625782000
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ private static Set<Setting<?>> builtInIndexSettings() {
Set<Setting<?>> result = new HashSet<>(ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS);
result.add(IndexSettings.MODE);
result.add(IndexMetadata.INDEX_ROUTING_PATH);
result.add(IndexSettings.TIME_SERIES_START_TIME);
result.add(IndexSettings.TIME_SERIES_END_TIME);
return Set.copyOf(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -1331,6 +1333,16 @@ public static Setting<Long> longSetting(String key, long defaultValue, long minV
);
}

public static Setting<Instant> dateSetting(String key, Instant defaultValue, Validator<Instant> validator, Property... properties) {
return new Setting<>(
key,
defaultValue.toString(),
(s) -> Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(s)),
validator,
properties
);
}

public static Setting<String> simpleString(String key, Property... properties) {
return new Setting<>(key, s -> "", Function.identity(), properties);
}
Expand Down
37 changes: 25 additions & 12 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@

/**
* "Mode" that controls which behaviors and settings an index supports.
* <p>
* For the most part this class concentrates on validating settings and
* mappings. Most different behavior is controlled by forcing settings
* to be set or not set and by enabling extra fields in the mapping.
*/
public enum IndexMode {
STANDARD {
@Override
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
if (false == Objects.equals(
IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY),
settings.get(IndexMetadata.INDEX_ROUTING_PATH)
)) {
throw new IllegalArgumentException(
"[" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME);
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (false == Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + setting.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]");
}
}

Expand All @@ -67,10 +72,13 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
throw new IllegalArgumentException(error(unsupported));
}
}
if (IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY).equals(settings.get(IndexMetadata.INDEX_ROUTING_PATH))) {
throw new IllegalArgumentException(
"[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
// TODO make start and stop time required
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + setting.getKey() + "]");
}
}

Expand Down Expand Up @@ -150,7 +158,12 @@ private void validateTimeStampField(Object timestampFieldValue) {

static final List<Setting<?>> VALIDATE_WITH_SETTINGS = List.copyOf(
Stream.concat(
Stream.of(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING, IndexMetadata.INDEX_ROUTING_PATH),
Stream.of(
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetadata.INDEX_ROUTING_PATH,
IndexSettings.TIME_SERIES_START_TIME,
IndexSettings.TIME_SERIES_END_TIME
),
TIME_SERIES_UNSUPPORTED.stream()
).collect(toSet())
);
Expand Down
72 changes: 71 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;

import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -474,6 +476,46 @@ public static boolean isTimeSeriesModeEnabled() {
return Build.CURRENT.isSnapshot() || (TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED != null && TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED);
}

/**
* in time series mode, the start time of the index, timestamp must larger than start_time
*/
public static final Setting<Instant> TIME_SERIES_START_TIME = Setting.dateSetting(
"index.time_series.start_time",
Instant.ofEpochMilli(0),
v -> {},
Property.IndexScope,
Property.Final
);

/**
* in time series mode, the end time of the index, timestamp must smaller than start_time
*/
public static final Setting<Instant> TIME_SERIES_END_TIME = Setting.dateSetting(
"index.time_series.end_time",
DateUtils.MAX_NANOSECOND_INSTANT,
new Setting.Validator<>() {
@Override
public void validate(Instant value) {}

@Override
public void validate(Instant value, Map<Setting<?>, Object> settings) {
@SuppressWarnings("unchecked")
Instant startTime = (Instant) settings.get(TIME_SERIES_START_TIME);
if (startTime.toEpochMilli() > value.toEpochMilli()) {
throw new IllegalArgumentException("index.time_series.end_time must be larger than index.time_series.start_time");
}
}

@Override
public Iterator<Setting<?>> settings() {
List<Setting<?>> settings = List.of(TIME_SERIES_START_TIME);
return settings.iterator();
}
},
Property.IndexScope,
Property.Dynamic
);

/**
* The {@link IndexMode "mode"} of the index.
*/
Expand Down Expand Up @@ -509,6 +551,14 @@ public Iterator<Setting<?>> settings() {
* The {@link IndexMode "mode"} of the index.
*/
private final IndexMode mode;
/**
* Start time of the time_series index.
*/
private final long timeSeriesStartTime;
/**
* End time of the time_series index.
*/
private volatile long timeSeriesEndTime;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -651,7 +701,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
mode = isTimeSeriesModeEnabled() ? scopedSettings.get(MODE) : IndexMode.STANDARD;

timeSeriesStartTime = TIME_SERIES_START_TIME.get(settings).toEpochMilli();
timeSeriesEndTime = TIME_SERIES_END_TIME.get(settings).toEpochMilli();
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -762,6 +813,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING, this::setMappingDimensionFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(TIME_SERIES_END_TIME, this::updateTimeSeriesEndTime);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1275,4 +1327,22 @@ public long getMappingDimensionFieldsLimit() {
private void setMappingDimensionFieldsLimit(long value) {
this.mappingDimensionFieldsLimit = value;
}

public long getTimeSeriesStartTime() {
return timeSeriesStartTime;
}

public long getTimeSeriesEndTime() {
return timeSeriesEndTime;
}

public void updateTimeSeriesEndTime(Instant endTimeInstant) {
long endTime = endTimeInstant.toEpochMilli();
if (this.timeSeriesEndTime > endTime) {
throw new IllegalArgumentException(
"index.time_series.end_time must be larger than current value [" + this.timeSeriesEndTime + "]"
);
}
this.timeSeriesEndTime = endTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -23,6 +24,7 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.core.TimeValue.NSEC_PER_MSEC;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

/**
Expand Down Expand Up @@ -204,6 +206,29 @@ public void postParse(DocumentParserContext context) throws IOException {
if (numberOfValues > 1) {
throw new IllegalArgumentException("data stream timestamp field [" + DEFAULT_PATH + "] encountered multiple values");
}

validateTimestamp(fields[0], context);
}

private void validateTimestamp(IndexableField field, DocumentParserContext context) {
if (context.indexSettings().getMode() == null || context.indexSettings().getMode() != IndexMode.TIME_SERIES) {
return;
}

long value = field.numericValue().longValue();
if (context.mappingLookup().getMapper(DEFAULT_PATH).typeName().equals(DateFieldMapper.DATE_NANOS_CONTENT_TYPE)) {
value /= NSEC_PER_MSEC;
}

long startTime = context.indexSettings().getTimeSeriesStartTime();
if (value < startTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be larger than " + startTime);
}

long endTime = context.indexSettings().getTimeSeriesEndTime();
if (value >= endTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be smaller than " + endTime);
}
}

@Override
Expand Down
Loading

0 comments on commit 8b2019a

Please sign in to comment.