Skip to content

Commit

Permalink
Avoid doubled getRegion calls in page sinks
Browse files Browse the repository at this point in the history
Call `getRegion` once per chunk written. Previously `getRegion` was
called twice per chunk written.
  • Loading branch information
findepi committed May 10, 2023
1 parent 88a16c4 commit 60c0042
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 49 deletions.
21 changes: 7 additions & 14 deletions lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,26 +259,19 @@ public void write(Page page)
validationBuilder.addPage(page);
}

while (page != null) {
int writeOffset = 0;
while (writeOffset < page.getPositionCount()) {
// align page to row group boundaries
int chunkRows = min(page.getPositionCount(), min(rowGroupMaxRowCount - rowGroupRowCount, stripeMaxRowCount - stripeRowCount));
Page chunk = page.getRegion(0, chunkRows);
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, min(rowGroupMaxRowCount - rowGroupRowCount, stripeMaxRowCount - stripeRowCount)));

// avoid chunk with huge logical size
while (chunkRows > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
chunkRows /= 2;
chunk = chunk.getRegion(0, chunkRows);
}

if (chunkRows < page.getPositionCount()) {
page = page.getRegion(chunkRows, page.getPositionCount() - chunkRows);
}
else {
page = null;
while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
chunk = chunk.getRegion(writeOffset, chunk.getPositionCount() / 2);
}

writeOffset += chunk.getPositionCount();
writeChunk(chunk);
fileRowCount += chunkRows;
fileRowCount += chunk.getPositionCount();
}

long recordedSizeInBytes = getRetainedBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,27 +160,16 @@ public void write(Page page)
Page validationPage = page;
recordValidation(validation -> validation.addPage(validationPage));

while (page != null) {
int chunkRows = min(page.getPositionCount(), writerOption.getBatchSize());
Page chunk = page;
if (chunkRows < page.getPositionCount()) {
chunk = chunk.getRegion(0, chunkRows);
}
int writeOffset = 0;
while (writeOffset < page.getPositionCount()) {
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, writerOption.getBatchSize()));

// avoid chunk with huge logical size
while (chunkRows > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
chunkRows /= 2;
chunk = chunk.getRegion(0, chunkRows);
}

// Remove chunk from current page
if (chunkRows < page.getPositionCount()) {
page = page.getRegion(chunkRows, page.getPositionCount() - chunkRows);
}
else {
page = null;
while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
chunk = page.getRegion(writeOffset, chunk.getPositionCount() / 2);
}

writeOffset += chunk.getPositionCount();
writeChunk(chunk);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static io.trino.plugin.deltalake.DeltaLakeTypes.toParquetType;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.canonicalizeColumnName;
import static io.trino.plugin.hive.util.HiveUtil.escapePathName;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -263,17 +264,13 @@ public void abort()
@Override
public CompletableFuture<?> appendPage(Page page)
{
if (page.getPositionCount() == 0) {
return NOT_BLOCKED;
}

while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
int writeOffset = 0;
while (writeOffset < page.getPositionCount()) {
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, MAX_PAGE_POSITIONS));
writeOffset += chunk.getPositionCount();
writePage(chunk);
}

writePage(page);
return NOT_BLOCKED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.spi.type.IntegerType.INTEGER;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -289,13 +290,12 @@ public CompletableFuture<?> appendPage(Page page)

private void doAppend(Page page)
{
while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
int writeOffset = 0;
while (writeOffset < page.getPositionCount()) {
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, MAX_PAGE_POSITIONS));
writeOffset += chunk.getPositionCount();
writePage(chunk);
}

writePage(page);
}

private void writePage(Page page)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -252,13 +253,12 @@ public void abort()

private void doAppend(Page page)
{
while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
int writeOffset = 0;
while (writeOffset < page.getPositionCount()) {
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, MAX_PAGE_POSITIONS));
writeOffset += chunk.getPositionCount();
writePage(chunk);
}

writePage(page);
}

private void writePage(Page page)
Expand Down

0 comments on commit 60c0042

Please sign in to comment.