From 613bd2f3feb42cc4aedf637260a2bc6672f323d4 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Mon, 5 Jul 2021 15:53:59 +0530 Subject: [PATCH] Extract method HttpRemoteTask#triggerUpdate --- .../server/remotetask/HttpRemoteTask.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java index 14fbca669ce8..94842f146406 100644 --- a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java +++ b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java @@ -359,8 +359,7 @@ public synchronized void addSplits(Multimap splitsBySource) updateSplitQueueSpace(); if (needsUpdate) { - this.needsUpdate.set(true); - scheduleUpdate(); + triggerUpdate(); } } @@ -372,16 +371,14 @@ public synchronized void noMoreSplits(PlanNodeId sourceId) } noMoreSplits.put(sourceId, true); - needsUpdate.set(true); - scheduleUpdate(); + triggerUpdate(); } @Override public synchronized void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan) { if (pendingNoMoreSplitsForLifespan.put(sourceId, lifespan)) { - needsUpdate.set(true); - scheduleUpdate(); + triggerUpdate(); } } @@ -394,8 +391,7 @@ public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) if (newOutputBuffers.getVersion() > outputBuffers.get().getVersion()) { outputBuffers.set(newOutputBuffers); - needsUpdate.set(true); - scheduleUpdate(); + triggerUpdate(); } } @@ -511,6 +507,13 @@ private void scheduleUpdate() executor.execute(this::sendUpdate); } + private synchronized void triggerUpdate() + { + // synchronized so that needsUpdate is not cleared in sendUpdate before actual request is sent + needsUpdate.set(true); + sendUpdate(); + } + private synchronized void sendUpdate() { TaskStatus taskStatus = getTaskStatus();