Introduce kudo writer. #2559
Conversation
Signed-off-by: liurenjie1024 <[email protected]>
build
Signed-off-by: liurenjie1024 <[email protected]>
build
There should be some unit tests added (e.g.: serializing row-count only, serializing a simple table of a single int column, serializing a list, serializing a struct, etc.) that examine the bytes serialized to ensure it matches the expected format.
* <tr>
*   <td>Offset</td>
*   <td>4</td>
*   <td>Row offset in original table, in little endian format</td>
Is this actually little endian? I thought that DataOutputStream always writes data in network order (big endian)
https://docs.oracle.com/javase/8/docs/api/java/io/DataOutput.html#writeLong-long-
https://guava.dev/releases/17.0/api/docs/com/google/common/io/LittleEndianDataOutputStream.html
Not sure if this is bleedover from what I did with jcudf serialization or not. That did some things little-endian on the hope that the data could be read by native code without needing to remap it at all.
This comment goes for all of the rows here that mention little endian.
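The byte-order question is easy to verify directly: java.io.DataOutputStream always emits multi-byte values most-significant byte first (network order, i.e. big endian). A minimal, self-contained check:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Demonstrates that DataOutputStream writes big endian (network order):
// the most significant byte of the value comes out first.
public class EndianCheck {
  static byte[] intBytes(int v) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream dos = new DataOutputStream(bos)) {
      dos.writeInt(v);
    } catch (IOException e) {
      throw new RuntimeException(e); // cannot happen with an in-memory stream
    }
    return bos.toByteArray();
  }
}
```

Writing 0x12345678 yields the bytes 0x12 0x34 0x56 0x78, so any header field written through DataOutputStream is big endian unless something like Guava's LittleEndianDataOutputStream is substituted.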
Good catch! Fixed comments.
* The body consists of three parts:
* <ol>
* <li>Validity buffers for every column with validity in depth-first ordering of schema columns. Each buffer of
* each column is 4 bytes padded.
nit: Can we have a follow-on issue to see if the padding actually is a win or not? I can see that padding might make some things faster for memory access, but most of the time having less data might be even better. But this is super minor.
At first the 4-byte padding was just to make offsets easier to handle at read time, since we always need to getInt from the buffer directly. Now we have another justification for this padding; see #2532 (comment). Also, I think whether it's a win or not depends on the data distribution.
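For reference, rounding a buffer length up to a 4-byte boundary reduces to one line of bit arithmetic; the helper name below is illustrative and is not taken from this PR:

```java
// Illustrative sketch of 4-byte padding: round a length up to the next
// multiple of 4 so that int-sized reads from the buffer stay aligned.
public class Pad4 {
  static long padFor4ByteAlignment(long orig) {
    return (orig + 3) & ~3L; // add 3, then clear the low two bits
  }
}
```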
}

static int safeLongToInt(long value) {
  assert value >= Integer.MIN_VALUE : "Value is too small to fit in an int";
Do we ever want to write negative numbers for any of these values?
This method was used for more general purposes rather than in the table header only. I've fixed it to reject negative numbers, and will add it back when necessary.
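A sketch of what such a helper could look like after the change (the exact body in the PR is not shown here, so this is an assumption):

```java
// Hypothetical shape of the narrowed helper discussed above: reject both
// negative values and values that overflow an int, instead of asserting.
public class SafeCast {
  static int safeLongToNonNegativeInt(long value) {
    if (value < 0 || value > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Value out of non-negative int range: " + value);
    }
    return (int) value;
  }
}
```

Using an explicit exception rather than an assert keeps the check active even when assertions are disabled at runtime.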
Signed-off-by: liurenjie1024 <[email protected]>
I've added some simple unit tests; more sophisticated tests will come after we check in the reader part.
Signed-off-by: liurenjie1024 <[email protected]>
Signed-off-by: liurenjie1024 <[email protected]>
Signed-off-by: liurenjie1024 <[email protected]>
Signed-off-by: liurenjie1024 <[email protected]>
build
It looks good enough to ship for me. Especially because the API I have issues with is package private so we can change it later. But I want to hear from Jason on this too.
public KudoTableHeader(DataInputStream din) throws IOException {
  readFrom(din);
  return new KudoTableHeader(offset, numRows, validityBufferLen, offsetBufferLen, totalDataLen, numColumns,
      hasValidityBuffer);
}

KudoTableHeader(long offset, long numRows, long validityBufferLen, long offsetBufferLen,
Still confused why most of these are longs if we are going to make sure that they fit in the range of ints, especially since this is a package-private API that we are the consumers/producers of. That means that safeLongToNonNegativeInt would really just become something to ensure that the value is non-negative.
I get your point. The problem is that if we use int arguments, what we need to do is:
- Check values are non-negative in the constructor.
- Check values are in the range of ints in the caller.

Since this API is currently package private and the package is both consumer and producer, I'm just putting these checks in one place. Now I think splitting these checks makes the API semantics clearer and easier to read.
Fixed.
Signed-off-by: liurenjie1024 <[email protected]>
Signed-off-by: liurenjie1024 <[email protected]>
build
Signed-off-by: liurenjie1024 <[email protected]>
public long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) {
  HostColumnVector[] columns = null;
  try {
    columns = IntStream.range(0, table.getNumberOfColumns())
This looks like we are writing the whole Table (not sliced? but this is probably what I am missing) for every single partition of such table. Is that true? In the past we had 1 copy per column, not 200 copies per column. I am not seeing where this is getting called, so that could explain it.
This is currently used in tests only, not in spark-rapids. What's actually used is the one under this. I agree that it would be wasteful to copy for each partition. Would marking it as test-only be useful?
ah yes, that would be great to mark as "test only". It wasn't obvious to me.
  return paddedBytes;
}

static long padFor64byteAlignment(long orig) {
this looks to be unused.
It's used by a following PR.
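For context, a 64-byte alignment helper like the one quoted above typically reduces to simple round-up arithmetic. This sketch is an assumption about its shape, not the PR's actual body:

```java
// Probable shape of padFor64byteAlignment: round a length up to the
// next multiple of 64 bytes.
public class Pad64 {
  static long padFor64byteAlignment(long orig) {
    return ((orig + 63) / 64) * 64;
  }
}
```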
public class KudoSerializerTest {

  @Test
  public void testRowCountOnly() throws Exception {
this doesn't test a no-rows case, but it would be nice to add.
Sure, will fix in a following PR.
  return builder.build();
}

private static Table buildSimpleTable() {
it would be nice to add a deeply nested case as well. The integration tests in spark-rapids are likely to cover some of this, but as a serialization format, I could see a bit more permutations of columns added to the tests.
There are more tests in a following PR.
  new HostColumnVector.BasicType(true, DType.INT64)
);
return new Table.TestBuilder()
    .column(1, 2, 3, 4)
could we have a case where the number of bytes in each slice is Int.MaxValue? We have tests for a small number of things, and 0 rows, but not the limit.
Good point. Will add some tests in following pr.
This PR is part of #2532, which introduces the writer part of the kudo format.