Skip to content

Commit

Permalink
Revert changes made on streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Dec 8, 2021
1 parent e9a3bc7 commit e7b0e6a
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private void ensureOpen() throws IOException {
throw new IOException("Stream has been closed");
}

if (buffer == null || (buffer != EMPTY && buffer.limit() > 0 && !buffer.hasRemaining())) {
if (buffer == null || (buffer != EMPTY && !buffer.hasRemaining())) {
updateBuffer();
}
}
Expand All @@ -65,17 +65,9 @@ private int updateBuffer() throws IOException {

@Override
public int available() throws IOException {
if (closed || buffer == EMPTY) {
return 0;
}
ensureOpen();

int available = 0;
if (buffer == null || (buffer.limit() > 0 && !buffer.hasRemaining())) {
available = updateBuffer();
} else {
available = buffer.remaining();
}
return available;
return buffer.remaining();
}

@Override
Expand All @@ -95,7 +87,7 @@ public void close() throws IOException {
public byte readByte() throws IOException {
ensureOpen();

if (buffer == EMPTY || buffer.limit() == 0) {
if (buffer == EMPTY) {
close();
throw new EOFException();
}
Expand All @@ -107,7 +99,7 @@ public byte readByte() throws IOException {
public int read() throws IOException {
ensureOpen();

if (buffer == EMPTY || buffer.limit() == 0) {
if (buffer == EMPTY) {
return -1;
}

Expand All @@ -120,7 +112,7 @@ public int read(byte[] b, int off, int len) throws IOException {

int counter = 0;
while (len > 0) {
if (buffer == EMPTY || buffer.limit() == 0) {
if (buffer == EMPTY) {
return counter > 0 ? counter : -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public class ClickHouseLZ4InputStream extends ClickHouseInputStream {
private boolean closed;

private boolean checkNext() throws IOException {
if (currentBlock == null) {
if (currentBlock == null || !currentBlock.hasRemaining()) {
currentBlock = readNextBlock();
}
return currentBlock != null && currentBlock.hasRemaining();
return currentBlock != null;
}

// every block is:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,15 @@ private ClickHouseHttpResponse postString(HttpRequest.Builder reqBuilder, String
reqBuilder.POST(HttpRequest.BodyPublishers.ofString(sql));
HttpResponse<InputStream> r;
try {
// r = httpClient.send(reqBuilder.build(), responseInfo -> new
// ExtendedResponseInputStream());
r = httpClient.send(reqBuilder.build(),
CompletableFuture<HttpResponse<InputStream>> f = httpClient.sendAsync(reqBuilder.build(),
responseInfo -> new ClickHouseResponseHandler(config.getMaxBufferSize(),
config.getSocketTimeout()));
r = f.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Thread was interrupted when posting request or receiving response", e);
} catch (ExecutionException e) {
throw new IOException("Failed to post query", e);
}
return buildResponse(r);
}
Expand Down

0 comments on commit e7b0e6a

Please sign in to comment.