Skip to content

Commit

Permalink
[8.15] Fixing incorrect bulk request took time (elastic#111863) (elas…
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Aug 14, 2024
1 parent 695ccd5 commit 51ea1d9
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 22 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/111863.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111863
summary: Fixing incorrect bulk request took time
area: Ingest Node
type: bug
issues:
- 111854
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,23 @@
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: "no write index is defined for alias [test_index]. The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index" }

---
"Took is not orders of magnitude off":
- requires:
cluster_features: ["gte_v8.15.1"]
reason: "Bug reporting wrong took time introduced in 8.15.0, fixed in 8.15.1"
- do:
bulk:
body:
- index:
_index: took_test
- f: 1
- index:
_index: took_test
- f: 2
- index:
_index: took_test
- f: 3
- match: { errors: false }
- gte: { took: 0 }
- lte: { took: 60000 } # Making sure we have a reasonable upper bound and that we're not for example returning nanoseconds
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
protected final SystemIndices systemIndices;
private final IngestService ingestService;
private final IngestActionForwarder ingestForwarder;
protected final LongSupplier relativeTimeProvider;
protected final LongSupplier relativeTimeNanosProvider;
protected final Executor writeExecutor;
protected final Executor systemWriteExecutor;
private final ActionType<BulkResponse> bulkAction;
Expand All @@ -71,7 +71,7 @@ public TransportAbstractBulkAction(
IngestService ingestService,
IndexingPressure indexingPressure,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeNanosProvider
) {
super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
Expand All @@ -83,7 +83,7 @@ public TransportAbstractBulkAction(
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.addStateApplier(this.ingestForwarder);
this.relativeTimeProvider = relativeTimeProvider;
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
this.bulkAction = action;
}

Expand Down Expand Up @@ -216,7 +216,7 @@ private void processBulkIndexIngestRequest(
Metadata metadata,
ActionListener<BulkResponse> listener
) {
final long ingestStartTimeInNanos = System.nanoTime();
final long ingestStartTimeInNanos = relativeTimeNanos();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
getIngestService(original).executeBulkRequest(
original.numberOfActions(),
Expand All @@ -230,7 +230,7 @@ private void processBulkIndexIngestRequest(
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
ingestTookInMillis,
Expand Down Expand Up @@ -307,12 +307,12 @@ protected IngestService getIngestService(BulkRequest request) {
return ingestService;
}

protected long relativeTime() {
return relativeTimeProvider.getAsLong();
protected long relativeTimeNanos() {
return relativeTimeNanosProvider.getAsLong();
}

protected long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
return TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - startTimeNanos);
}

private void applyPipelinesAndDoInternalExecute(
Expand All @@ -321,9 +321,9 @@ private void applyPipelinesAndDoInternalExecute(
Executor executor,
ActionListener<BulkResponse> listener
) {
final long relativeStartTime = threadPool.relativeTimeInMillis();
final long relativeStartTimeNanos = relativeTimeNanos();
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTime);
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
indexingPressure,
systemIndices,
System::nanoTime
threadPool::relativeTimeInNanos
);
}

Expand Down Expand Up @@ -197,7 +197,7 @@ protected void doInternalExecute(
BulkRequest bulkRequest,
Executor executor,
ActionListener<BulkResponse> listener,
long relativeStartTime
long relativeStartTimeNanos
) {
Map<String, CreateIndexRequest> indicesToAutoCreate = new HashMap<>();
Set<String> dataStreamsToBeRolledOver = new HashSet<>();
Expand All @@ -212,7 +212,7 @@ protected void doInternalExecute(
indicesToAutoCreate,
dataStreamsToBeRolledOver,
failureStoresToBeRolledOver,
relativeStartTime
relativeStartTimeNanos
);
}

Expand Down Expand Up @@ -309,19 +309,19 @@ protected void createMissingIndicesAndIndexData(
Map<String, CreateIndexRequest> indicesToAutoCreate,
Set<String> dataStreamsToBeRolledOver,
Set<String> failureStoresToBeRolledOver,
long startTime
long startTimeNanos
) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// Optimizing when there are no prerequisite actions
if (indicesToAutoCreate.isEmpty() && dataStreamsToBeRolledOver.isEmpty() && failureStoresToBeRolledOver.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executor, responses, Map.of());
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses, Map.of());
return;
}
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, executor, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses, indicesThatCannotBeCreated);
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
Expand Down Expand Up @@ -533,7 +533,7 @@ void executeBulk(
responses,
indicesThatCannotBeCreated,
indexNameExpressionResolver,
relativeTimeProvider,
relativeTimeNanosProvider,
startTimeNanos,
listener
).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TransportSimulateBulkAction(
ingestService,
indexingPressure,
systemIndices,
System::nanoTime
threadPool::relativeTimeInNanos
);
}

Expand All @@ -64,7 +64,7 @@ protected void doInternalExecute(
BulkRequest bulkRequest,
Executor executor,
ActionListener<BulkResponse> listener,
long relativeStartTime
long relativeStartTimeNanos
) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
for (int i = 0; i < bulkRequest.requests.size(); i++) {
Expand All @@ -90,7 +90,7 @@ protected void doInternalExecute(
);
}
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(relativeStartTime))
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(relativeStartTimeNanos))
);
}

Expand All @@ -105,7 +105,7 @@ protected IngestService getIngestService(BulkRequest request) {
}

@Override
protected boolean shouldStoreFailure(String indexName, Metadata metadata, long time) {
protected boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) {
// A simulate bulk request should not change any persistent state in the system, so we never write to the failure store
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public String toString() {

@Override
public long relativeTimeInNanos() {
throw new AssertionError("DeterministicTaskQueue does not support nanosecond-precision timestamps");
return TimeValue.timeValueMillis(currentTimeMillis).nanos();
}

@Override
Expand Down

0 comments on commit 51ea1d9

Please sign in to comment.