Skip to content

Commit

Permalink
Fixing test failures
Browse files Browse the repository at this point in the history
Signed-off-by: PritLadani <[email protected]>
  • Loading branch information
PritLadani committed Dec 20, 2022
1 parent 253fc61 commit 5817770
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ public SearchBackpressureStats nodeStats() {
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
taskResourceUsageTrackers.stream()
.collect(
Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.searchTaskStats(searchTasks))
Collectors.toUnmodifiableMap(
t -> TaskResourceUsageTrackerType.fromName(t.name()),
t -> t.searchTaskStats(searchTasks))
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.stats;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -37,7 +38,13 @@ public SearchBackpressureStats(
}

public SearchBackpressureStats(StreamInput in) throws IOException {
this(new SearchTaskStats(in), new SearchShardTaskStats(in), SearchBackpressureMode.fromName(in.readString()));
searchShardTaskStats = new SearchShardTaskStats(in);
mode = SearchBackpressureMode.fromName(in.readString());
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
searchTaskStats = new SearchTaskStats(in);
} else {
searchTaskStats = null;
}
}

@Override
Expand All @@ -51,9 +58,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void writeTo(StreamOutput out) throws IOException {
searchTaskStats.writeTo(out);
searchShardTaskStats.writeTo(out);
out.writeString(mode.getName());
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
searchTaskStats.writeTo(out);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,18 @@ public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) {
}

@Override
public TaskResourceUsageTracker.Stats searchTaskStats(List<? extends Task> activeTasks) {
public TaskResourceUsageTracker.Stats searchTaskStats(List<? extends Task> searchTasks) {
long now = timeNanosSupplier.getAsLong();
long currentMax = activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0);
long currentAvg = (long) activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0);
long currentMax = searchTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0);
long currentAvg = (long) searchTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0);
return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg);
}

@Override
public TaskResourceUsageTracker.Stats searchShardTaskStats(List<? extends Task> activeTasks) {
public TaskResourceUsageTracker.Stats searchShardTaskStats(List<? extends Task> searchShardTasks) {
long now = timeNanosSupplier.getAsLong();
long currentMax = activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0);
long currentAvg = (long) activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0);
long currentMax = searchShardTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0);
long currentAvg = (long) searchShardTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0);
return new Stats(getSearchShardTaskCancellationCount(), currentMax, currentAvg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task

double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
double averageUsage = movingAverage.getAverage();
double variance = (task instanceof SearchTask) ? getHeapVarianceThresholdForSearchQuery() : getHeapBytesThreshold();
double variance = (task instanceof SearchTask) ? getHeapVarianceThresholdForSearchQuery() : getHeapVarianceThreshold();
double allowedUsage = averageUsage * variance;
double threshold = (task instanceof SearchTask) ? getHeapBytesThresholdForSearchQuery() : getHeapBytesThreshold();

Expand Down Expand Up @@ -238,16 +238,16 @@ public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) {
}

@Override
public TaskResourceUsageTracker.Stats searchTaskStats(List<? extends Task> activeTasks) {
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0);
long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0);
return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg, (long) movingAverageReference.get().getAverage());
public TaskResourceUsageTracker.Stats searchTaskStats(List<? extends Task> searchTasks) {
long currentMax = searchTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0);
long currentAvg = (long) searchTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0);
return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg, (long) movingAverageReferenceForSearchQuery.get().getAverage());
}

@Override
public TaskResourceUsageTracker.Stats searchShardTaskStats(List<? extends Task> activeTasks) {
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0);
long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0);
public TaskResourceUsageTracker.Stats searchShardTaskStats(List<? extends Task> searchShardTasks) {
long currentMax = searchShardTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0);
long currentAvg = (long) searchShardTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0);
return new Stats(getSearchShardTaskCancellationCount(), currentMax, currentAvg, (long) movingAverageReference.get().getAverage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
}

@Override
public Stats searchTaskStats(List<? extends Task> activeTasks) {
public Stats searchTaskStats(List<? extends Task> searchTasks) {
return new MockStats(getSearchTaskCancellationCount());
}

@Override
public Stats searchShardTaskStats(List<? extends Task> activeTasks) {
public Stats searchShardTaskStats(List<? extends Task> searchShardTasks) {
return new MockStats(getSearchShardTaskCancellationCount());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
}

@Override
public Stats searchTaskStats(List<? extends Task> activeTasks) {
public Stats searchTaskStats(List<? extends Task> searchTasks) {
return null;
}

@Override
public Stats searchShardTaskStats(List<? extends Task> activeTasks) {
public Stats searchShardTaskStats(List<? extends Task> searchShardTasks) {
return null;
}
};
Expand Down

0 comments on commit 5817770

Please sign in to comment.