feat: Make pull queries streamed asynchronously #6813
Conversation
Starting to lose steam, so I'll revisit in a bit. Note to myself: I've reviewed up until PullQueryStreamWriter and will continue later today!
ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java
ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java
ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java
    // If the queue has been closed, we stop adding rows and cleanup.
    break;
  }
  pullQueryQueue.acceptRow(rowFactory.apply(row, schema));
with this asynchronous model, how would we handle operators that require sorting? e.g. SELECT * FROM foo ORDER BY date
One option might be to block at the operator node itself, but I'm not sure what the canonical way of handling this is.
I think what you're describing is likely how this would be done. Obviously any operation that cannot be done on a per-row basis complicates streaming since it requires caching many rows from the lower layer before returning even a single one.
Probably the easiest thing to do would be to allow sorting up to N entries (or M bytes of memory) and, if that limit is hit, throw an error. If there were sufficient interest, we might consider doing something more complex: you can do disk-based sorts if memory is limited. Conventional DBs handle exactly this case; I believe they sort in memory up to a point before spilling to disk and doing a kind of merge sort from disk.
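A minimal Java sketch of that bounded approach (the class name, row type, and limit are illustrative, not from this PR):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: buffer child rows, sort in memory, and fail fast past a row limit.
final class BoundedSortSketch {
  private static final int MAX_SORT_ROWS = 10_000; // illustrative cap

  static List<List<?>> sortAll(
      final Iterable<List<?>> childRows,
      final Comparator<List<?>> comparator
  ) {
    final List<List<?>> buffered = new ArrayList<>();
    for (final List<?> row : childRows) {
      buffered.add(row);
      if (buffered.size() > MAX_SORT_ROWS) {
        // A sort result can't be streamed until all input is seen, so cap memory and error out.
        throw new IllegalStateException(
            "ORDER BY exceeded the in-memory sort limit of " + MAX_SORT_ROWS + " rows");
      }
    }
    buffered.sort(comparator);
    return buffered;
  }
}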
Just throwing out some more ideas here: if a query touches multiple partitions, we could let the sorting first happen on each partition itself, and then at the operator we could just merge-sort the returned streams of sorted rows, which would take only constant space.
Even more generally, if we allow aggregations in the future, such aggregation push-downs could be applied as well.
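As a rough illustration of that idea (not code from this PR), a k-way merge over already-sorted per-partition streams only ever buffers one row per partition:

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical sketch: merge already-sorted per-partition row streams with constant extra space,
// emitting a row downstream as soon as it is known to be the next one globally.
final class PartitionMergeSketch {

  static void mergeSorted(
      final List<Iterator<List<?>>> sortedPartitions,
      final Comparator<List<?>> comparator,
      final Consumer<List<?>> downstream
  ) {
    // Each heap entry holds the next row of one partition plus that partition's iterator.
    final PriorityQueue<Entry> heap =
        new PriorityQueue<>((a, b) -> comparator.compare(a.row, b.row));
    for (final Iterator<List<?>> it : sortedPartitions) {
      if (it.hasNext()) {
        heap.add(new Entry(it.next(), it));
      }
    }
    while (!heap.isEmpty()) {
      final Entry smallest = heap.poll();
      downstream.accept(smallest.row);
      if (smallest.source.hasNext()) {
        heap.add(new Entry(smallest.source.next(), smallest.source));
      }
    }
  }

  private static final class Entry {
    final List<?> row;
    final Iterator<List<?>> source;

    Entry(final List<?> row, final Iterator<List<?>> source) {
      this.row = row;
      this.source = source;
    }
  }
}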
ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java
ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java
@@ -1797,22 +1797,23 @@
      "statements": [
        "CREATE STREAM INPUT (ID DOUBLE KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
        "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;",
-       "SELECT * FROM AGGREGATE WHERE ID IN (10.1, 8.1);",
+       "SELECT * FROM AGGREGATE WHERE ID IN (10.5, 8.5);",
out of interest, why did you make this change?
Because in the CPU's floating-point (i.e. binary) representation, .1 cannot be represented exactly: https://www.exploringbinary.com/why-0-point-1-does-not-exist-in-floating-point
Doing a key lookup using equality with such a value sometimes produces weird, flaky results: https://floating-point-gui.de/errors/comparison/
I was banging my head against the wall with these tests not returning the rows I thought they should until I tried this. Look at the result and you'll see that the existing answer is wrong! 0.5 can be represented exactly in binary. With all this, it probably doesn't make sense to do double key lookups at all, but I guess we allow it.
User beware!
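A quick, self-contained Java illustration of the underlying issue (not part of the PR):

import java.math.BigDecimal;

// Why equality lookups on DOUBLE keys are fragile: 10.1 has no exact binary
// representation, while 10.5 does.
public final class DoubleKeyDemo {
  public static void main(final String[] args) {
    // BigDecimal(double) shows the exact value the double actually stores.
    System.out.println(new BigDecimal(10.1)); // a long decimal that is not exactly 10.1
    System.out.println(new BigDecimal(10.5)); // exactly 10.5

    // The classic symptom: values that are equal on paper fail an exact comparison.
    System.out.println(0.1 + 0.2 == 0.3); // false
  }
}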
🙀
// This allows us to hit the limit without having to queue one last row
if (queue.isEmpty()) {
  ctx.runOnContext(v -> sendComplete());
} else {
  ctx.runOnContext(v -> doSend());
}
I'm confused. Why is this necessary? Why would we otherwise have to enqueue another row?
With push queries, the only thing that would stop a stream was a limit being hit. The way the code was set up, a row was enqueued and then, if that made it hit the limit, the limit handler would fire; look below in doSend(). Effectively, the only way to trigger sendComplete was to push one more row into the queue. In the pull query case, when we push a row onto the queue we don't know whether it's the last one until we try to fetch another.
To fix this, I could either have changed pull query behavior to match push queries in this respect, or have made this publisher do a sendComplete without having to enqueue another row; the latter is simpler and makes a fair amount of sense.
PullQueryExecutionUtil.checkRateLimit(rateLimiter);

final PullQueryResult result = ksqlEngine.executePullQuery(
Previously we were calling this inside the Subscription#request method; now we're doing it before calling the subscriber. I guess this is intentional because of the new asynchronous nature of pull queries? Just wanted to confirm that I understand correctly (and to confirm that it is OK to do this here in the subscribe method instead of on the first poll call).
That's a good question. I think it was largely done that way so that we could have a big try/catch around it and ensure we called subscriber.onError(e); if we hit an exception. In this case, even though it's kicked off immediately, it still sets result.onException(this::setError); which should allow us to handle errors correctly.
I roughly patterned this off of push queries in PushQueryPublisher, and they also start the query immediately, so I don't think there should be a big difference so far as I can tell.
...p/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java
A few more comments, all that's left now are the tests :) I'll get to them tomorrow morning
writeRow(toProcess, head, sb);
if (sb.length() >= FLUSH_SIZE_BYTES || (clock.millis() - lastFlush) >= MAX_FLUSH_MS) {
  output.write(sb.toString().getBytes(StandardCharsets.UTF_8));
  output.flush();
I'm wondering how this works from a client perspective. It looks like we'd be flushing incomplete JSON, right? Is that what was happening previously for pull queries? Would clients know how to handle that properly?
Correct, we flush incomplete JSON. This is how push queries work at the moment, so I just copied that behavior (check out QueryStreamWriter). If you look in this PR at KsqlTarget, you'll see how we do this for partial results for forwarded requests. I assume that the CLI does something similar.
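For intuition only (this is not the actual KsqlTarget or CLI code), a client consuming such a stream can buffer chunk text, split on newlines, and strip the array framing and separating commas before parsing each complete row:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side parser for a newline-delimited JSON-array stream.
final class ChunkedRowParser {
  private final ObjectMapper mapper = new ObjectMapper();
  private final StringBuilder buffer = new StringBuilder();

  List<JsonNode> onChunk(final String chunk) {
    buffer.append(chunk);
    final List<JsonNode> rows = new ArrayList<>();
    int newline;
    while ((newline = buffer.indexOf("\n")) >= 0) {
      String line = buffer.substring(0, newline).trim();
      buffer.delete(0, newline + 1);
      // Strip array framing and the trailing comma that separates rows.
      if (line.endsWith(",")) {
        line = line.substring(0, line.length() - 1);
      }
      if (line.startsWith("[")) {
        line = line.substring(1);
      }
      if (line.endsWith("]")) {
        line = line.substring(0, line.length() - 1);
      }
      if (!line.isEmpty()) {
        try {
          rows.add(mapper.readTree(line));
        } catch (final Exception e) {
          throw new RuntimeException("Failed to parse row: " + line, e);
        }
      }
    }
    return rows;
  }
}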
For my own clarification: were you referring to the JSON format of the HTTP response, or the JSON format of a single record?
}

@Override
public void write(final OutputStream output) {
There are a lot of different levels going on, and wrapping my head around passing around different heads/string buffers and flushing in the middle (etc.) is a struggle. At a minimum, I think we could do with inline comments and javadoc (including on the private methods), though I urge you to take another stab and see if you can refactor this to make it cleaner and reduce the cyclomatic complexity (this is one of those times where I think checkstyle got it right: it needs improvement).
There's a lot of complexity in creating proper JSON (commas in the right places, the header row being first, etc.). I wonder if there's a library that we can use to make that simpler.
I added a ton of comments and simplified the methods. Hopefully you agree it's easier to understand.
I am using objectMapper.writeValueAsString to do the heavy lifting of JSON serialization. The comma handling is a bit of a pain that I think I have to handle myself, just because of the partial flushing: I can't rely on a JSON library to write partial JSON (or at least I don't know of any that do). This is exactly what we do for push queries as well. I'll take a look and see if any internet searching turns up any ideas.
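A simplified sketch of that pattern (illustrative only, not the actual PullQueryStreamWriter; for brevity it prepends the separating comma, whereas the PR appends it to the previous line, as discussed further down):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical sketch of streaming a JSON array row-by-row with manual comma handling.
final class JsonArrayStreamSketch {
  private final ObjectMapper mapper = new ObjectMapper();
  private final OutputStream output;
  private boolean wroteFirstElement = false;

  JsonArrayStreamSketch(final OutputStream output) throws IOException {
    this.output = output;
    output.write("[".getBytes(StandardCharsets.UTF_8)); // open the array immediately
  }

  void writeRow(final List<?> row) throws IOException {
    final StringBuilder sb = new StringBuilder();
    if (wroteFirstElement) {
      sb.append(",\n"); // separate from the previous element
    }
    sb.append(mapper.writeValueAsString(row)); // Jackson does the per-row serialization
    wroteFirstElement = true;
    output.write(sb.toString().getBytes(StandardCharsets.UTF_8));
    output.flush(); // OK to flush here: the array is incomplete but each row is whole
  }

  void close() throws IOException {
    output.write("]\n".getBytes(StandardCharsets.UTF_8)); // terminate the array
    output.flush();
  }
}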
💯 this is way cleaner and easier for me to reason about. Thanks for the heavy refactoring!
ksqldb-rest-app/src/test/java/io/confluent/ksql/api/ApiTest.java
Well... morning quickly turned to afternoon and then evening but I finally got around to the tests for this PR.
ksqldb-engine/src/test/java/io/confluent/ksql/query/PullQueryQueueTest.java
ksqldb-engine/src/test/java/io/confluent/ksql/query/PullQueryQueueTest.java
ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java
  }
  times[0]++;
  return null;
}).when(pullQueryQueue).drainTo(any());
I wonder if it makes sense to have a TestPullQueryQueue, because I see this pattern of mocking what gets returned on which calls come up in the tests quite often. It would make tests a little bit easier to write going forward.
I played a bit with trying to create one, and it's a little hard to do given how I'm using it in tests. In some cases, a real queue is fine, which I create in some tests. In others, I want to verify that the queue was used in a certain way, and a mock is required. In still others, I want to test surrounding logic and call various callbacks, which I often do from queue "answer" methods to simulate things happening while waiting for new rows. This could be done with a test queue and methods like completeAfterEmpty, which would implicitly know to call the completion method. I'm not too sure how reusable that would be, given that the completion scenario is different in different tests. What do you think?
I trust that if you played around with it and it wasn't clear then it would probably require more work to retrofit it than it's worth. I'm happy leaving it as is
...p/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java
...c/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriterTest.java
...c/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriterTest.java
Overall, the PR seems like a huge improvement. I have some questions inline (amongst a less-important small army of nits) that I'd like addressed before giving the green light. I'll mark those 3/4 comments with the 🚀 emoji so that we can focus on the important ones
@agavra Followed up on some of your comments. Still working through them.
@agavra I believe I've addressed your comments. PTAL
Thanks @AlanConfluent! Big step forward. I'm still having a little trouble wrapping my head around the limit handler/reached end code, but I don't want to block this PR on that as you've clearly thought that out and the longer it sits the more there's a chance for nasty merge conflicts. I'll dig into that offline 🙂
    final boolean hasAnotherRow
) {
  // Send for a comma after the header
  if (!sentAtLeastOneRow) {
I think this can be simplified by always prepending ,\n when we write a row that's not the header (as opposed to only in the case of the header). Then when we're done, just append \n] (so we don't need to check hasAnotherRow).
You're actually right, and that would honestly simplify my code a bit. The only issue is that the code parsing this (specifically partial responses, i.e. chunks) in both KsqlTarget and the CLI looks for "\n" and also assumes that if there's a "," it's at the end. I can potentially fix this, but I don't want to make a big change to the PR at this point. Also, this is consistent with push queries at the moment. I'd rather do this as a followup, if we want to go that route.
// The head becomes the next thing to process and the newly polled row becomes the head.
final PullQueryRow toProcess = head;
head = row;
return toProcess;
I guess your algorithm above handles this gracefully, but the first time you call pollNextRow, won't it always be null? Might just want to add a little comment there as well so that the next person doesn't scratch their head 😄
Yes, that's right. I'll add a comment.
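For context, a stripped-down sketch of that one-row lookahead (the names head and pollNextRow come from the snippet above; everything else is illustrative): the first call always returns null because there is no previously buffered head to hand back yet.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the one-row lookahead used to know whether a row is the last one.
final class LookaheadSketch<T> {
  private final BlockingQueue<T> queue;
  private T head; // the most recently polled, not-yet-processed row

  LookaheadSketch(final BlockingQueue<T> queue) {
    this.queue = queue;
  }

  // Returns the row to process now, or null if we can't tell yet.
  // Note: the first call always returns null, since there is no previous head to hand back.
  T pollNextRow() throws InterruptedException {
    final T row = queue.poll(100, TimeUnit.MILLISECONDS);
    if (row == null) {
      return null; // nothing new; keep the current head buffered
    }
    final T toProcess = head; // the previous head is now safe to write: another row follows it
    head = row;
    return toProcess;
  }

  // Whether the buffered head already has a successor waiting in the queue.
  boolean headHasAnotherRow() {
    return head != null && !queue.isEmpty();
  }
}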
Description
Before this PR, pull queries were gathered in memory as a TableRows object containing a List<List<?>> rows across all of the endpoints where pull queries are exposed.

This change adds the class PullQueryQueue, which is a queue of result rows meant to decouple the producers and consumers of row data, allowing for producer calls that are partially complete to be enqueued across both local and remote sources of data. This largely follows the methodology used by push queries. This allows:

Specifically, on each end of the queue:
- PullPhysicalPlan if the request is being handled locally, or HARouting if the request must be forwarded to another node.

For each of the endpoints:
- StreamedQueryResource: This uses chunked encoding responses using the class PullQueryStreamWriter, which is a StreamingOutput response type. This class periodically reads from the queue and writes a chunk to the response.
- QueryEndpoint: This uses a KsqlPullQueryHandle with the existing BlockingQueryPublisher to connect a publisher to the queued data.
- PullQueryPublisher: This uses PullQuerySubscription, a new PollingSubscription, rather than the former block of logic that fed raw rows to the subscriber.
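To make the decoupling concrete, here is a minimal sketch of the kind of queue described above (illustrative only; the real PullQueryQueue in the PR carries more responsibilities, such as completion and limit handling):

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a bounded row queue that decouples producers (local execution or
// forwarded requests) from consumers (the HTTP/streaming endpoints).
final class RowQueueSketch<R> {
  private final BlockingQueue<R> rows = new ArrayBlockingQueue<>(1000);
  private volatile boolean closed = false;

  // Producer side: blocks (with a timeout) when the consumer is slow, applying backpressure.
  boolean acceptRow(final R row) {
    try {
      while (!closed) {
        if (rows.offer(row, 100, TimeUnit.MILLISECONDS)) {
          return true;
        }
      }
      return false; // queue closed: the consumer went away, stop producing
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Consumer side: drain whatever is currently buffered so it can be written as one chunk.
  int drainTo(final Collection<? super R> sink) {
    return rows.drainTo(sink);
  }

  void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}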
Testing done
Ran unit and integration tests. Also manually experimented with batches of rows being streamed back to the user by introducing artificial delays.
Reviewer checklist