Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort time series indices by time range in GetDataStreams API #107967

Merged
merged 4 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/107967.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 107967
summary: Sort time series indices by time range in `GetDataStreams` API
area: TSDB
type: bug
issues:
- 102088
Original file line number Diff line number Diff line change
Expand Up @@ -151,46 +151,61 @@ static GetDataStreamAction.Response innerOperation(

GetDataStreamAction.Response.TimeSeries timeSeries = null;
if (dataStream.getIndexMode() == IndexMode.TIME_SERIES) {
List<Tuple<Instant, Instant>> ranges = new ArrayList<>();
Tuple<Instant, Instant> current = null;
String previousIndexName = null;
for (Index index : dataStream.getIndices()) {
IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata.getIndexMode() != IndexMode.TIME_SERIES) {
continue;
record IndexInfo(String name, Instant timeSeriesStart, Instant timeSeriesEnd) implements Comparable<IndexInfo> {
@Override
public int compareTo(IndexInfo o) {
return Comparator.comparing(IndexInfo::timeSeriesStart).thenComparing(IndexInfo::timeSeriesEnd).compare(this, o);
}
Instant start = indexMetadata.getTimeSeriesStart();
Instant end = indexMetadata.getTimeSeriesEnd();
if (current == null) {
current = new Tuple<>(start, end);
} else if (current.v2().compareTo(start) == 0) {
current = new Tuple<>(current.v1(), end);
} else if (current.v2().compareTo(start) < 0) {
ranges.add(current);
current = new Tuple<>(start, end);
}

List<Tuple<Instant, Instant>> mergedRanges = new ArrayList<>();
Tuple<Instant, Instant> currentMergedRange = null;
IndexInfo previous = null;

// We need indices to be sorted by time series range
// to produce temporal ranges.
// But it is not enforced in API, so we explicitly sort here.
var sortedRanges = dataStream.getIndices()
.stream()
.map(metadata::index)
.filter(m -> m.getIndexMode() == IndexMode.TIME_SERIES)
.map(m -> new IndexInfo(m.getIndex().getName(), m.getTimeSeriesStart(), m.getTimeSeriesEnd()))
.sorted()
.toList();

for (var info : sortedRanges) {
Instant start = info.timeSeriesStart();
Instant end = info.timeSeriesEnd();

if (currentMergedRange == null) {
currentMergedRange = new Tuple<>(start, end);
} else if (currentMergedRange.v2().compareTo(start) == 0) {
currentMergedRange = new Tuple<>(currentMergedRange.v1(), end);
} else if (currentMergedRange.v2().compareTo(start) < 0) {
mergedRanges.add(currentMergedRange);
currentMergedRange = new Tuple<>(start, end);
} else {
String message = "previous backing index ["
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the indices getting sorted, I think don't we will ever log this warning? Maybe we should remove it and replace this with an assert?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a "should never happen" thing.

+ previousIndexName
+ previous.name()
+ "] range ["
+ current.v1()
+ previous.timeSeriesStart()
+ "/"
+ current.v2()
+ previous.timeSeriesEnd()
+ "] range is colliding with current backing ["
+ index.getName()
+ info.name()
+ "] index range ["
+ start
+ "/"
+ end
+ "]";
assert current.v2().compareTo(start) < 0 : message;
LOGGER.warn(message);
assert currentMergedRange.v2().compareTo(start) < 0 : message;
}
previousIndexName = index.getName();
previous = info;
}
if (current != null) {
ranges.add(current);
if (currentMergedRange != null) {
mergedRanges.add(currentMergedRange);
}
timeSeries = new GetDataStreamAction.Response.TimeSeries(ranges);
timeSeries = new GetDataStreamAction.Response.TimeSeries(mergedRanges);
}

dataStreamInfos.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,49 @@ public void testGetTimeSeriesDataStream() {
);
}

public void testGetTimeSeriesDataStreamWithOutOfOrderIndices() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
String dataStream = "ds-1";
Instant sixHoursAgo = now.minus(6, ChronoUnit.HOURS);
Instant fourHoursAgo = now.minus(4, ChronoUnit.HOURS);
Instant twoHoursAgo = now.minus(2, ChronoUnit.HOURS);
Instant twoHoursAhead = now.plus(2, ChronoUnit.HOURS);

ClusterState state;
{
var mBuilder = new Metadata.Builder();
DataStreamTestHelper.getClusterStateWithDataStream(
mBuilder,
dataStream,
List.of(
new Tuple<>(fourHoursAgo, twoHoursAgo),
new Tuple<>(sixHoursAgo, fourHoursAgo),
new Tuple<>(twoHoursAgo, twoHoursAhead)
)
);
state = ClusterState.builder(new ClusterName("_name")).metadata(mBuilder).build();
}

var req = new GetDataStreamAction.Request(new String[] {});
var response = GetDataStreamsTransportAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionResolver
);
assertThat(
response.getDataStreams(),
contains(
allOf(
transformedMatch(d -> d.getDataStream().getName(), equalTo(dataStream)),
transformedMatch(d -> d.getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead)))
)
)
);
}

public void testGetTimeSeriesMixedDataStream() {
Instant instant = Instant.parse("2023-06-06T14:00:00.000Z").truncatedTo(ChronoUnit.SECONDS);
String dataStream1 = "ds-1";
Expand Down