-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Block writer thread if response output buffer is full #5386
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,14 @@ | |
|
||
package io.confluent.ksql.api.server; | ||
|
||
import io.confluent.ksql.util.KsqlException; | ||
import io.confluent.ksql.util.VertxUtils; | ||
import io.vertx.core.buffer.Buffer; | ||
import io.vertx.core.http.HttpServerResponse; | ||
import java.io.OutputStream; | ||
import java.util.Objects; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import org.jetbrains.annotations.NotNull; | ||
|
||
/* | ||
|
@@ -41,7 +45,7 @@ public void write(final int b) { | |
} | ||
|
||
@Override | ||
public void write(final @NotNull byte[] bytes, final int offset, final int length) { | ||
public synchronized void write(final @NotNull byte[] bytes, final int offset, final int length) { | ||
Objects.requireNonNull(bytes); | ||
if ((offset < 0) || (offset > bytes.length)) { | ||
throw new IndexOutOfBoundsException(); | ||
|
@@ -55,13 +59,29 @@ public void write(final @NotNull byte[] bytes, final int offset, final int lengt | |
final byte[] bytesToWrite = new byte[length]; | ||
System.arraycopy(bytes, offset, bytesToWrite, 0, length); | ||
final Buffer buffer = Buffer.buffer(bytesToWrite); | ||
blockIfWriteQueueFull(); | ||
response.write(buffer); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
response.end(); | ||
} | ||
|
||
private void blockIfWriteQueueFull() { | ||
VertxUtils.checkIsWorker(); | ||
if (response.writeQueueFull()) { | ||
final CompletableFuture<Void> cf = new CompletableFuture<>(); | ||
response.drainHandler(v -> cf.complete(null)); | ||
try { | ||
cf.get(60, TimeUnit.SECONDS); | ||
} catch (Exception e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we check explicitly for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could, but I don't think it adds much as we want to rethrow on any exception we receive. |
||
// Very slow consumers will result in a timeout, this will cause the push query to be closed | ||
throw new KsqlException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a more informative error message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The wrapped exception should contain the important error messages. I don't usually like adding new messages on rethrows unless you're doing something extra. |
||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ResponseOutputStream
is only written to by a single thread anyway, right? (Trying to check my understanding.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it's a sanity check.