Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JDBC performance + spooling polishing #23954

Merged
merged 15 commits into from
Oct 29, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class AsyncResultIterator
extends AbstractIterator<List<Object>>
implements CancellableIterator<List<Object>>
{
private static final int BATCH_SIZE = 100;
private static final int MAX_QUEUED_ROWS = 50_000;
private static final ExecutorService executorService = newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("Trino JDBC worker-%s").setDaemon(true).build());
Expand All @@ -66,16 +67,23 @@ public class AsyncResultIterator
this.finished = false;
this.future = executorService.submit(() -> {
try {
int rowsProcessed = 0;
do {
QueryStatusInfo results = client.currentStatusInfo();
progressCallback.accept(QueryStats.create(results.getId(), results.getStats()));
warningsManager.addWarnings(results.getWarnings());
for (List<Object> row : client.currentRows()) {
rowQueue.put(row);
semaphore.release();
if (rowsProcessed++ % BATCH_SIZE == 0) {
wendigo marked this conversation as resolved.
Show resolved Hide resolved
semaphore.release(rowsProcessed);
rowsProcessed = 0;
}
}
}
while (!cancelled && client.advance());
if (rowsProcessed > 0) {
semaphore.release(rowsProcessed);
}

verify(client.isFinished());
QueryStatusInfo results = client.finalStatusInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
Expand Down Expand Up @@ -111,12 +112,13 @@ private List<Object> getRowValues()
{
// types are present if data is present
List<Object> row = new ArrayList<>(columns.size());
ConnectorSession connectorSession = session.toConnectorSession();
for (OutputColumn outputColumn : columns) {
Type type = outputColumn.type();

try {
Block block = currentPage.getBlock(outputColumn.sourcePageChannel());
Object value = type.getObjectValue(session.toConnectorSession(), block, inPageIndex);
Object value = type.getObjectValue(connectorSession, block, inPageIndex);
if (!supportsParametricDateTime) {
value = getLegacyValue(value, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.server.protocol.OutputColumn;
import io.trino.server.protocol.spooling.QueryDataEncoder;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
Expand Down Expand Up @@ -64,6 +65,7 @@ public DataAttributes encodeTo(OutputStream output, List<Page> pages)
throws IOException
{
JsonFactory jsonFactory = jsonFactory();
ConnectorSession connectorSession = session.toConnectorSession();
try (CountingOutputStream wrapper = new CountingOutputStream(output); JsonGenerator generator = jsonFactory.createGenerator(wrapper)) {
generator.writeStartArray();
for (Page page : pages) {
Expand All @@ -72,7 +74,7 @@ public DataAttributes encodeTo(OutputStream output, List<Page> pages)
for (OutputColumn column : columns) {
Object value = column
.type()
.getObjectValue(session.toConnectorSession(), page.getBlock(column.sourcePageChannel()), position);
.getObjectValue(connectorSession, page.getBlock(column.sourcePageChannel()), position);
writeValue(mapper, generator, value);
}
generator.writeEndArray();
Expand Down