Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify write methods in PinotSegmentPageSource #22234

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,13 @@ public Page getNextPage()
estimatedMemoryUsageInBytes += currentDataTable.estimatedSizeInBytes();
pageSizeBytes += currentDataTable.estimatedSizeInBytes();
pageBuilder.declarePositions(currentDataTable.dataTable().getNumberOfRows());
for (int columnHandleIdx = 0; columnHandleIdx < columnHandles.size(); columnHandleIdx++) {
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(columnHandleIdx);
Type columnType = columnTypes.get(columnHandleIdx);
// Write a block for each column in the original order.
writeBlock(blockBuilder, columnType, columnHandleIdx);
for (int rowIndex = 0; rowIndex < currentDataTable.dataTable().getNumberOfRows(); rowIndex++) {
for (int columnHandleIdx = 0; columnHandleIdx < columnHandles.size(); columnHandleIdx++) {
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(columnHandleIdx);
Type columnType = columnTypes.get(columnHandleIdx);
// Write a block for each column in the original order.
writeBlock(blockBuilder, columnType, rowIndex, columnHandleIdx);
}
}
}

Expand All @@ -162,33 +164,34 @@ public void close()
*
* @param blockBuilder blockBuilder for the current column
* @param columnType type of the column
* @param rowIdx row index
* @param columnIdx column index
*/

private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
private void writeBlock(BlockBuilder blockBuilder, Type columnType, int rowIdx, int columnIdx)
{
Class<?> javaType = columnType.getJavaType();
DataSchema.ColumnDataType pinotColumnType = currentDataTable.dataTable().getDataSchema().getColumnDataType(columnIdx);
if (javaType.equals(boolean.class)) {
writeBooleanBlock(blockBuilder, columnType, columnIdx);
writeBooleanBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
else if (javaType.equals(long.class)) {
if (columnType instanceof TimestampType) {
// Pinot TimestampType is always ShortTimestampType.
writeShortTimestampBlock(blockBuilder, columnType, columnIdx);
writeShortTimestampBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
else {
writeLongBlock(blockBuilder, columnType, columnIdx);
writeLongBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
}
else if (javaType.equals(double.class)) {
writeDoubleBlock(blockBuilder, columnType, columnIdx);
writeDoubleBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
else if (javaType.equals(Slice.class)) {
writeSliceBlock(blockBuilder, columnType, columnIdx);
writeSliceBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
else if (javaType.equals(Block.class)) {
writeArrayBlock(blockBuilder, columnType, columnIdx);
writeArrayBlock(blockBuilder, columnType, rowIdx, columnIdx);
}
else {
throw new TrinoException(
Expand All @@ -199,55 +202,43 @@ else if (javaType.equals(Block.class)) {
}
}

private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
columnType.writeBoolean(blockBuilder, getBoolean(i, columnIndex));
completedBytes++;
}
columnType.writeBoolean(blockBuilder, getBoolean(rowIndex, columnIndex));
completedBytes++;
}

private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
columnType.writeLong(blockBuilder, getLong(i, columnIndex));
completedBytes += Long.BYTES;
}
columnType.writeLong(blockBuilder, getLong(rowIndex, columnIndex));
completedBytes += Long.BYTES;
}

private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
columnType.writeDouble(blockBuilder, getDouble(i, columnIndex));
completedBytes += Double.BYTES;
}
columnType.writeDouble(blockBuilder, getDouble(rowIndex, columnIndex));
completedBytes += Double.BYTES;
}

private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
Slice slice = getSlice(i, columnIndex);
columnType.writeSlice(blockBuilder, slice, 0, slice.length());
completedBytes += slice.getBytes().length;
}
Slice slice = getSlice(rowIndex, columnIndex);
columnType.writeSlice(blockBuilder, slice, 0, slice.length());
completedBytes += slice.getBytes().length;
}

private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
Block block = getArrayBlock(i, columnIndex);
columnType.writeObject(blockBuilder, block);
completedBytes += block.getSizeInBytes();
}
Block block = getArrayBlock(rowIndex, columnIndex);
columnType.writeObject(blockBuilder, block);
completedBytes += block.getSizeInBytes();
}

private void writeShortTimestampBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
private void writeShortTimestampBlock(BlockBuilder blockBuilder, Type columnType, int rowIndex, int columnIndex)
{
for (int i = 0; i < currentDataTable.dataTable().getNumberOfRows(); i++) {
// Trino is using micros since epoch for ShortTimestampType, Pinot uses millis since epoch.
columnType.writeLong(blockBuilder, PinotTimestamps.toMicros(getLong(i, columnIndex)));
completedBytes += Long.BYTES;
}
// Trino is using micros since epoch for ShortTimestampType, Pinot uses millis since epoch.
columnType.writeLong(blockBuilder, PinotTimestamps.toMicros(getLong(rowIndex, columnIndex)));
completedBytes += Long.BYTES;
}

private Type getType(int columnIndex)
Expand Down