ORC-1741: Respect decimal reader isRepeating flag #1960

Status: Closed (wants to merge 3 commits)
32 changes: 32 additions & 0 deletions java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1551,6 +1551,7 @@ private void nextVector(DecimalColumnVector result,
HiveDecimalWritable[] vector = result.vector;
HiveDecimalWritable decWritable;
if (result.noNulls) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
for (int r = 0; r < batchSize; ++r) {
decWritable = vector[r];
@@ -1562,7 +1563,11 @@
}
setIsRepeatingIfNeeded(result, r);
}
if (!preIsRepeating && result.isRepeating) {
Member:

why don't you move this logic inside of setIsRepeatingIfNeeded instead of scattering it all over the code?
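For what it's worth, a minimal, self-contained sketch of that suggestion (plain Java, no ORC classes; the RepeatingScan name and its begin/observe/end methods are hypothetical, not ORC API) showing how the preIsRepeating bookkeeping could live in one helper instead of being repeated at every call site:

```java
// Hypothetical helper: snapshot the caller's flag once, detect repetition
// during the scan, and never promote a non-repeating vector to repeating.
public class RepeatingScan {
  private boolean preIsRepeating;
  private boolean isRepeating;

  // Called once per batch before scanning values.
  public void begin(boolean callerIsRepeating) {
    preIsRepeating = callerIsRepeating;
    isRepeating = true; // optimistic, like result.isRepeating = true
  }

  // Called per row, mirroring setIsRepeatingIfNeeded.
  public void observe(boolean sameAsFirstValue) {
    if (!sameAsFirstValue) {
      isRepeating = false;
    }
  }

  // Called once after the scan; applies the "respect the flag" rule.
  public boolean end() {
    if (!preIsRepeating && isRepeating) {
      isRepeating = preIsRepeating; // do not promote false -> true
    }
    return isRepeating;
  }

  public static void main(String[] args) {
    RepeatingScan s = new RepeatingScan();
    s.begin(false);     // caller said the vector is not repeating
    s.observe(true);    // every scanned value matched the first
    System.out.println(s.end()); // prints false: the incoming flag wins
  }
}
```

With this shape, each nextVector() variant would call begin() before its loop, observe() inside it, and assign end() back to result.isRepeating, removing the duplicated if-blocks.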

Member @deniskuzZ commented on Jun 30, 2024:

also a question from the Hive Orc PR:

where do we handle repeated nulls? it seems that setIsRepeatingIfNeeded is only called when result.noNulls || !result.isNull[0]

Contributor (author):

This is just a draft PR, so it should not be considered a bug.

When every value in a batch is identical, collapsing the batch to isRepeating=true should be semantically equivalent to the fully expanded form.

I'm not sure why this behavior affects Hive's calculation results; I debugged it repeatedly in Hive, but I didn't find the root cause.

    // v1: collapsed form - one stored value with isRepeating=true
    DecimalColumnVector v1 = new DecimalColumnVector(1024, 10, 2);
    v1.isRepeating = true;
    v1.vector[0] = new HiveDecimalWritable("1.234");

    // v2: expanded form - the same value written into every slot
    DecimalColumnVector v2 = new DecimalColumnVector(1024, 10, 2);
    for (int i = 0; i < 1024; i++) {
      v2.vector[i] = new HiveDecimalWritable("1.234");
    }

    // Stringify both; matching output shows the two forms are equivalent
    StringBuilder sb1 = new StringBuilder();
    for (int i = 0; i < 1024; i++) {
      v1.stringifyValue(sb1, i);
    }
    StringBuilder sb2 = new StringBuilder();
    for (int i = 0; i < 1024; i++) {
      v2.stringifyValue(sb2, i);
    }
    System.out.println(sb1.toString().equals(sb2.toString())); // prints true
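The same equivalence can also be demonstrated without any Hive classes. This toy model (RepeatingEquivalence and valueAt are illustrative names, not real ORC code) mimics the rule that a repeating vector always reads slot 0:

```java
// Toy model of a column vector's repeating semantics: when isRepeating is
// set, every row resolves to the value stored in slot 0.
public class RepeatingEquivalence {
  public final boolean isRepeating;
  public final String[] vector;

  public RepeatingEquivalence(boolean isRepeating, String[] vector) {
    this.isRepeating = isRepeating;
    this.vector = vector;
  }

  // Mirrors how stringifyValue redirects repeating reads to row 0.
  public String valueAt(int row) {
    return vector[isRepeating ? 0 : row];
  }

  public static void main(String[] args) {
    // Collapsed form: a single stored value.
    RepeatingEquivalence v1 = new RepeatingEquivalence(true, new String[]{"1.234"});
    // Expanded form: the same value in all 1024 slots.
    String[] expanded = new String[1024];
    java.util.Arrays.fill(expanded, "1.234");
    RepeatingEquivalence v2 = new RepeatingEquivalence(false, expanded);

    boolean equal = true;
    for (int i = 0; i < 1024; i++) {
      equal &= v1.valueAt(i).equals(v2.valueAt(i));
    }
    System.out.println(equal); // prints true
  }
}
```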

Contributor (author):

DoubleTreeReader and FloatTreeReader behave similarly: if the entire batch repeats a single value, they set isRepeating=true, which is consistent with the behavior of ORC-1266.


DoubleTreeReader

} else {
// no nulls
boolean repeating = (batchSize > 1);
final double d1 = utils.readDouble(stream);
result.vector[0] = d1;
// conditions to ensure bounds checks skips
for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
final double d2 = utils.readDouble(stream);
repeating = repeating && (d1 == d2);
result.vector[i] = d2;
}
result.isRepeating = repeating;
}

FloatTreeReader

} else {
// no nulls & > 1 row (check repeating)
boolean repeating = (batchSize > 1);
final float f1 = utils.readFloat(stream);
result.vector[0] = f1;
// conditions to ensure bounds checks skips
for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
final float f2 = utils.readFloat(stream);
repeating = repeating && (f1 == f2);
result.vector[i] = f2;
}
result.isRepeating = repeating;
}
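The detection loop shared by those two readers can be modeled in isolation (a sketch only: it drops the stream reads and the bounds-check condition of the real code):

```java
// Standalone model of the Double/FloatTreeReader pattern: take the first
// value, then AND-accumulate equality across the rest of the batch.
public class RepeatDetect {
  public static boolean detectRepeating(double[] values, int batchSize) {
    boolean repeating = (batchSize > 1); // a 1-row batch is never marked repeating
    final double d1 = values[0];
    for (int i = 1; i < batchSize; i++) {
      repeating = repeating && (d1 == values[i]);
    }
    return repeating;
  }

  public static void main(String[] args) {
    System.out.println(detectRepeating(new double[]{1.5, 1.5, 1.5}, 3)); // true
    System.out.println(detectRepeating(new double[]{1.5, 2.5, 1.5}, 3)); // false
  }
}
```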


Some tests

  @Test
  public void testDoubleIsRepeatingFlag() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path testFilePath = new Path(workDir, "testDoubleIsRepeatingFlag.orc");
    fs.delete(testFilePath, true);

    Configuration doubleConf = new Configuration(conf);
    doubleConf.set(OrcConf.STRIPE_ROW_COUNT.getAttribute(), "1024");
    doubleConf.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "1");
    String typeStr = "double";
    TypeDescription schema = TypeDescription.fromString("struct<col1:" + typeStr + ">");
    Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(doubleConf).setSchema(schema));

    VectorizedRowBatch b = schema.createRowBatch();
    DoubleColumnVector f1 = (DoubleColumnVector) b.cols[0];
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = -119.4594594595D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = 9318.4351351351D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = -4298.1513513514D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    w.close();

    Reader.Options options = new Reader.Options();
    try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
         RecordReader rows = reader.rows(options)) {
      VectorizedRowBatch batch = schema.createRowBatch();

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);

    }
  }

  @Test
  public void testFloatIsRepeatingFlag() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path testFilePath = new Path(workDir, "testFloatIsRepeatingFlag.orc");
    fs.delete(testFilePath, true);

    Configuration floatConf = new Configuration(conf);
    floatConf.set(OrcConf.STRIPE_ROW_COUNT.getAttribute(), "1024");
    floatConf.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "1");
    String typeStr = "float";
    TypeDescription schema = TypeDescription.fromString("struct<col1:" + typeStr + ">");
    Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(floatConf).setSchema(schema));

    VectorizedRowBatch b = schema.createRowBatch();
    DoubleColumnVector f1 = (DoubleColumnVector) b.cols[0];
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = -119.4594594595D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = 9318.4351351351D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    for (int i = 0; i < 1024; i++) {
      f1.vector[i] = -4298.1513513514D;
    }
    b.size = 1024;
    w.addRowBatch(b);

    b.reset();
    w.close();

    Reader.Options options = new Reader.Options();
    try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
         RecordReader rows = reader.rows(options)) {
      VectorizedRowBatch batch = schema.createRowBatch();

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);

      rows.nextBatch(batch);
      assertEquals(1024, batch.size);
      assertTrue(batch.cols[0].isRepeating);
    }
  }

result.isRepeating = preIsRepeating;
}
} else if (!result.isRepeating || !result.isNull[0]) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
for (int r = 0; r < batchSize; ++r) {
if (!result.isNull[r]) {
@@ -1576,6 +1581,9 @@ private void nextVector(DecimalColumnVector result,
}
setIsRepeatingIfNeeded(result, r);
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
}
}

@@ -1595,6 +1603,7 @@ private void nextVector(DecimalColumnVector result,
HiveDecimalWritable[] vector = result.vector;
HiveDecimalWritable decWritable;
if (result.noNulls) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
int previousIdx = 0;
for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
@@ -1612,8 +1621,12 @@ private void nextVector(DecimalColumnVector result,
setIsRepeatingIfNeeded(result, idx);
previousIdx = idx + 1;
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
skipStreamRows(batchSize - previousIdx);
} else if (!result.isRepeating || !result.isNull[0]) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
int previousIdx = 0;
for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
@@ -1633,6 +1646,9 @@ private void nextVector(DecimalColumnVector result,
setIsRepeatingIfNeeded(result, idx);
previousIdx = idx + 1;
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
}
}
@@ -1651,13 +1667,18 @@ private void nextVector(Decimal64ColumnVector result,
// read the scales
scaleReader.nextVector(result, scratchScaleVector, batchSize);
if (result.noNulls) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
for (int r = 0; r < batchSize; ++r) {
final long scaleFactor = powerOfTenTable[scale - scratchScaleVector[r]];
result.vector[r] = SerializationUtils.readVslong(valueStream) * scaleFactor;
setIsRepeatingIfNeeded(result, r);
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
} else if (!result.isRepeating || !result.isNull[0]) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
for (int r = 0; r < batchSize; ++r) {
if (!result.isNull[r]) {
@@ -1666,6 +1687,9 @@ private void nextVector(Decimal64ColumnVector result,
}
setIsRepeatingIfNeeded(result, r);
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
}
result.precision = (short) precision;
result.scale = (short) scale;
@@ -1686,6 +1710,7 @@ private void nextVector(Decimal64ColumnVector result,
// Read all the scales
scaleReader.nextVector(result, scratchScaleVector, batchSize);
if (result.noNulls) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
int previousIdx = 0;
for (int r = 0; r != filterContext.getSelectedSize(); r++) {
@@ -1700,8 +1725,12 @@ private void nextVector(Decimal64ColumnVector result,
setIsRepeatingIfNeeded(result, idx);
previousIdx = idx + 1;
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
skipStreamRows(batchSize - previousIdx);
} else if (!result.isRepeating || !result.isNull[0]) {
boolean preIsRepeating = result.isRepeating;
result.isRepeating = true;
int previousIdx = 0;
for (int r = 0; r != filterContext.getSelectedSize(); r++) {
@@ -1718,6 +1747,9 @@ private void nextVector(Decimal64ColumnVector result,
setIsRepeatingIfNeeded(result, idx);
previousIdx = idx + 1;
}
if (!preIsRepeating && result.isRepeating) {
result.isRepeating = preIsRepeating;
}
skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
}
result.precision = (short) precision;
@@ -724,7 +724,7 @@ private void readDecimalInNullStripe(String typeString, Class<?> expectedColumnType,
assertEquals(expected, options.toString());
assertEquals(batch.cols.length, 1);
assertEquals(batch.cols[0].getClass(), expectedColumnType);
assertTrue(batch.cols[0].isRepeating);
assertFalse(batch.cols[0].isRepeating);
StringBuilder sb3 = new StringBuilder();
batch.cols[0].stringifyValue(sb3, 1023);
assertEquals(sb3.toString(), expectedResult[2]);
68 changes: 68 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -2732,4 +2733,71 @@ public void testHadoopVectoredIO() throws Exception {

verify(spyFSDataInputStream, atLeastOnce()).readVectored(any(), any());
}

@Test
public void testDecimalIsRepeatingFlag() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path testFilePath = new Path(workDir, "testDecimalIsRepeatingFlag.orc");
fs.delete(testFilePath, true);

Configuration decimalConf = new Configuration(conf);
decimalConf.set(OrcConf.STRIPE_ROW_COUNT.getAttribute(), "1024");
decimalConf.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "1");
String typeStr = "decimal(20,10)";
TypeDescription schema = TypeDescription.fromString("struct<col1:" + typeStr + ">");
Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(decimalConf).setSchema(schema));

VectorizedRowBatch b = schema.createRowBatch();
DecimalColumnVector f1 = (DecimalColumnVector) b.cols[0];
for (int i = 0; i < 1024; i++) {
f1.set(i, HiveDecimal.create("-119.4594594595"));
}
b.size = 1024;
w.addRowBatch(b);

b.reset();
for (int i = 0; i < 1024; i++) {
f1.set(i, HiveDecimal.create("9318.4351351351"));
}
b.size = 1024;
w.addRowBatch(b);

b.reset();
for (int i = 0; i < 1024; i++) {
f1.set(i, HiveDecimal.create("-4298.1513513514"));
}
b.size = 1024;
w.addRowBatch(b);

b.reset();
w.close();

Reader.Options options = new Reader.Options();
try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
RecordReader rows = reader.rows(options)) {
VectorizedRowBatch batch = schema.createRowBatch();

rows.nextBatch(batch);
assertEquals(1024, batch.size);
assertFalse(batch.cols[0].isRepeating);
for (HiveDecimalWritable hiveDecimalWritable : ((DecimalColumnVector) batch.cols[0]).vector) {
assertEquals(HiveDecimal.create("-119.4594594595"), hiveDecimalWritable.getHiveDecimal());
}

rows.nextBatch(batch);
assertEquals(1024, batch.size);
assertFalse(batch.cols[0].isRepeating);
for (HiveDecimalWritable hiveDecimalWritable : ((DecimalColumnVector) batch.cols[0]).vector) {
assertEquals(HiveDecimal.create("9318.4351351351"), hiveDecimalWritable.getHiveDecimal());
}

rows.nextBatch(batch);
assertEquals(1024, batch.size);
assertFalse(batch.cols[0].isRepeating);
for (HiveDecimalWritable hiveDecimalWritable : ((DecimalColumnVector) batch.cols[0]).vector) {
assertEquals(HiveDecimal.create("-4298.1513513514"), hiveDecimalWritable.getHiveDecimal());
}
}
}
}