From b8f44ead362e822b0e783e350bb3505010d5a714 Mon Sep 17 00:00:00 2001 From: James Petty Date: Wed, 16 Feb 2022 10:49:04 -0500 Subject: [PATCH] Avoid volatile writes for empty responses in HttpPageBufferClient --- .../trino/operator/HttpPageBufferClient.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java b/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java index 9ab0e43b4cfd..6bbfb31020aa 100644 --- a/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java +++ b/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java @@ -354,6 +354,7 @@ public void onSuccess(PagesResponse result) backoff.success(); List pages; + boolean pagesAccepted; try { if (result.isTaskFailed()) { throw new TrinoException(REMOTE_TASK_FAILED, format("Remote task failed: %s", remoteTaskId)); @@ -413,20 +414,28 @@ public Void handle(Request request, Response response) // clientCallback can keep stats of requests and responses. For example, it may // keep track of how often a client returns empty response and adjust request // frequency or buffer size. - if (clientCallback.addPages(HttpPageBufferClient.this, pages)) { - pagesReceived.addAndGet(pages.size()); - rowsReceived.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum()); - } - else { - pagesRejected.addAndGet(pages.size()); - rowsRejected.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum()); - } + pagesAccepted = clientCallback.addPages(HttpPageBufferClient.this, pages); } catch (TrinoException e) { handleFailure(e, resultFuture); return; } + // update client stats + if (!pages.isEmpty()) { + int pageCount = pages.size(); + long rowCount = pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum(); + if (pagesAccepted) { + pagesReceived.addAndGet(pageCount); + rowsReceived.addAndGet(rowCount); + } + else { + pagesRejected.addAndGet(pageCount); + rowsRejected.addAndGet(rowCount); + } + } + requestsCompleted.incrementAndGet(); + synchronized (HttpPageBufferClient.this) { // client is complete, acknowledge it by sending it a delete in the next request if (result.isClientComplete()) { @@ -437,7 +446,6 @@ public Void handle(Request request, Response response) } lastUpdate = DateTime.now(); } - requestsCompleted.incrementAndGet(); clientCallback.requestComplete(HttpPageBufferClient.this); }