Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Barrage stream reading into Chunks #5692

Merged
merged 17 commits into from
Jul 18, 2024
Merged

Conversation

niloc132
Copy link
Member

Like #5552, applies some after-the-fact review of the design for reading Barrage/Flight stream, in anticipation of sharing this code with JavaScript clients.

  • Removes unused BarrageChunkAppendingMarshaller
  • Adds more error detail when stream contents don't match metadata
  • Removes an unused parameter when parsing Flight messages
  • Inlines width of various types into their readers
  • Introduces an interface for reading data into chunks, and a factory interface to allow JS clients to supply their own implementations.

Partial #188

@niloc132 niloc132 added this to the June 2024 milestone Jun 27, 2024
@niloc132 niloc132 requested a review from nbauernfeind June 27, 2024 19:46
@niloc132 niloc132 self-assigned this Jun 27, 2024
this.conversion = conversion;
}

public <T> ChunkReader transform(Function<Byte, T> transform) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to replace these Function<BoxedPrimitive, T> interfaces with some replicated version to avoid unnecessary boxing of values that can never be null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the interest of moving this patch along, I'm going to punt on this, the updated version is no more wrong than it previously was.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, we may want the transformers to "transform" null values to give better control to the future-feature of custom formatters. If this were the case then the boxing would be necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that these already have the DH null values as primitives, so a null input is impossible at this time. T could certainly be null though for an output, depending on what kind of chunk is going to be written to (not controlled by this code).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simply notice that we do not call the transformer if the value is the null value. As in, a custom transformer can't react to any null values. I see your point that we could use the deephaven null value and that using primitives will make it very clear that null is being represented in a non-boxed way. You're probably right that we should avoid boxing here.

this.conversion = conversion;
}

public <T> ChunkReader transform(Function<Byte, T> transform) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, we may want the transformers to "transform" null values to give better control to the future-feature of custom formatters. If this were the case then the boxing would be necessary.

ByteBuffer original = message.getByteBuffer();
ByteBuffer copy = ByteBuffer.allocate(original.remaining()).put(original).rewind();
Schema schema = new Schema();
Message.getRootAsMessage(copy).header(schema);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a detailed comment as to why we need to copy this. I suspect the reason is that the converted arrow schema references the new buffer? We may want to push the copying into BarrageUtil if that's the case because it's super common to assume that the byte buffer is temporarily immutable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't quite about immutability, but the fact that the ByteBuffer is owned by python, and if py frees the underlying buffer we'll be reading garbage when trying to handle a later RecordBatch.

ByteBuffer copy = ByteBuffer.allocate(original.remaining()).put(original).rewind();
Schema schema = new Schema();
Message.getRootAsMessage(copy).header(schema);
header.header(schema);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible I would like it more obvious why we need to copy here. (e.g. a comment related to what references get leaked)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% that we need to copy in this case - technically it appears no since line 87 does a ByteBuffer.wrap(). Instead this is an attempt to be defensive in case a future impl is reading from a slice/etc of the ByteBuffer that came in over the wire.

@niloc132 niloc132 requested a review from nbauernfeind July 15, 2024 20:53
@niloc132 niloc132 merged commit 90b9283 into deephaven:main Jul 18, 2024
16 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Jul 18, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants