From 7ffdddd0809a7415881db2d34db8eec626daad6b Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 17 Nov 2024 02:58:30 +0800 Subject: [PATCH] feat(stream): fail pending request if server close stream without response (#191) * feat(stream): fail pending request if server close stream without exception * fix spotless --- .../streamnative/oxia/client/batch/ReadBatch.java | 8 ++++++++ .../oxia/client/grpc/WriteStreamWrapper.java | 13 ++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java index 348e08bc..4a783c4f 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java @@ -23,6 +23,7 @@ import io.streamnative.oxia.proto.ReadResponse; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; import lombok.NonNull; final class ReadBatch extends BatchBase implements Batch, StreamObserver { @@ -83,6 +84,13 @@ public void onError(Throwable batchError) { @Override public void onCompleted() { + // complete pending request if the server close stream without any response + gets.forEach( + g -> { + if (!g.callback().isDone()) { + g.fail(new CancellationException()); + } + }); factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos); } diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java b/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java index a12367cd..de091bd1 100644 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java @@ -22,6 +22,7 @@ import io.streamnative.oxia.proto.WriteResponse; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; @@ -63,7 +64,17 @@ public void onError(Throwable t) { } @Override - public void onCompleted() {} + public void onCompleted() { + synchronized (WriteStreamWrapper.this) { + // complete pending request if the server close stream without any response + pendingWrites.forEach( + f -> { + if (!f.isDone()) { + f.completeExceptionally(new CancellationException()); + } + }); + } + } public CompletableFuture send(WriteRequest request) { synchronized (WriteStreamWrapper.this) {