Skip to content

Commit

Permalink
Avoid volatile writes for empty responses in HttpPageBufferClient
Browse files Browse the repository at this point in the history
  • Loading branch information
pettyjamesm committed Feb 17, 2022
1 parent 192cf01 commit b8f44ea
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ public void onSuccess(PagesResponse result)
backoff.success();

List<Slice> pages;
boolean pagesAccepted;
try {
if (result.isTaskFailed()) {
throw new TrinoException(REMOTE_TASK_FAILED, format("Remote task failed: %s", remoteTaskId));
Expand Down Expand Up @@ -413,20 +414,28 @@ public Void handle(Request request, Response response)
// clientCallback can keep stats of requests and responses. For example, it may
// keep track of how often a client returns empty response and adjust request
// frequency or buffer size.
if (clientCallback.addPages(HttpPageBufferClient.this, pages)) {
pagesReceived.addAndGet(pages.size());
rowsReceived.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum());
}
else {
pagesRejected.addAndGet(pages.size());
rowsRejected.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum());
}
pagesAccepted = clientCallback.addPages(HttpPageBufferClient.this, pages);
}
catch (TrinoException e) {
handleFailure(e, resultFuture);
return;
}

// update client stats
if (!pages.isEmpty()) {
int pageCount = pages.size();
long rowCount = pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum();
if (pagesAccepted) {
pagesReceived.addAndGet(pageCount);
rowsReceived.addAndGet(rowCount);
}
else {
pagesRejected.addAndGet(pageCount);
rowsRejected.addAndGet(rowCount);
}
}
requestsCompleted.incrementAndGet();

synchronized (HttpPageBufferClient.this) {
// client is complete, acknowledge it by sending it a delete in the next request
if (result.isClientComplete()) {
Expand All @@ -437,7 +446,6 @@ public Void handle(Request request, Response response)
}
lastUpdate = DateTime.now();
}
requestsCompleted.incrementAndGet();
clientCallback.requestComplete(HttpPageBufferClient.this);
}

Expand Down

0 comments on commit b8f44ea

Please sign in to comment.