feat: Add stream pull queries to http2 #8124
Conversation
) {
  // If we are supposed to stop at the endOffsets, but we didn't get any, then we should
In what scenario are we not getting any endOffsets even though we are supposed to?
It's a bit weird, but if a partition is empty, then we just won't get an "end offset" for it at all. So, if all the partitions are empty, then the whole map will be empty. In this case, it just means we're supposed to stop at the "end" of an empty topic. Aka, we should return zero results.
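For illustration, the guard being described might look roughly like this (a minimal sketch, not the PR's exact code; signalComplete is a hypothetical name for whatever closes out the query):

  // If we were asked to stop at the end offsets but the admin client returned
  // none, every partition was empty, so the query is trivially complete with
  // zero results.
  if (endOffsets.isEmpty()) {
    signalComplete();
    return;
  }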
  queue.acceptRow(null, record.value());
}

checkCompletion();
For every record added to the queue, we check the endOffsets across all partitions in checkCompletion. I am wondering if we can make it cheaper (might not be needed if this is already very cheap) by just checking for completion in the partition the current record was read from, since the other partitions cannot have reached their endOffset.
That's a good observation, but it should actually be pretty cheap already, since this is only checking local variables. I'd be surprised if just checking the current record's partition is noticeably faster, but I'll go ahead and do it.
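As a sketch, the per-partition variant agreed on here might be invoked like this from process() (assuming the new Processor API; names are illustrative):

  // Only the partition the current record came from can newly reach its end
  // offset, so check just that one instead of scanning all partitions.
  context.recordMetadata().ifPresent(m ->
      checkCompletion(new TopicPartition(m.topic(), m.partition())));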
context.schedule(
    Duration.ofMillis(100),
    PunctuationType.WALL_CLOCK_TIME,
    timestamp -> checkCompletion()
);
We check for completion both periodically and when reading records. For my education, why is the periodic one needed as well?
You're right; I should add a comment :)
The process() call is only triggered when processing a record. If an upstream processor filters records out, then this processor (which is right at the end of the topology) might never get triggered and hence might never complete. I figured that out the hard way :/
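A minimal sketch of how the punctuator registration fits into the processor's init() (the Void generic types are an assumption about the sink processor's signature):

  @Override
  public void init(final ProcessorContext<Void, Void> context) {
    this.context = context;
    // process() only runs when a record actually reaches this processor, so an
    // upstream filter that drops every record would otherwise leave the query
    // hanging. The wall-clock punctuator re-checks completion regardless.
    context.schedule(
        Duration.ofMillis(100),
        PunctuationType.WALL_CLOCK_TIME,
        timestamp -> checkCompletion());
  }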
Hey @vpapavas and @AlanConfluent ,
I'm ready for a real review pass now. I've left some breadcrumbs to help your review.
@@ -279,9 +281,9 @@ public static void setUpClass() throws Exception {
   }

   private static void writeConnectConfigs(final String path, final Map<String, String> configs) throws Exception {
-    try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
+    try (final PrintWriter out = new PrintWriter(new OutputStreamWriter(
While trying to resolve all the checkstyle errors, I wound up accepting some IntelliJ suggestions. I went ahead and kept the super minor ones.
@@ -371,6 +373,74 @@ public void shouldStreamPushQuerySync() throws Exception {
     assertThat(streamedQueryResult.isComplete(), is(false));
   }

+  @Test
+  public void shouldStreamPullQueryOnStreamAsync() throws Exception {
In lieu of running all the functional test scenarios from RQTT, I'm just adding a couple of smoke tests as hand-written ITs for the HTTP/2 endpoint.
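For a sense of the shape of such a smoke test, here is a hedged sketch using the ksqlDB Java client (TEST_STREAM and EXPECTED_ROW_COUNT are placeholder names, and the exact assertions in the PR may differ):

  @Test
  public void shouldStreamPullQueryOnStreamSync() throws Exception {
    final StreamedQueryResult result =
        client.streamQuery("SELECT * FROM " + TEST_STREAM + ";").get();

    final List<Row> rows = new ArrayList<>();
    Row row;
    // poll() blocks for the next row and returns null once the query completes
    while ((row = result.poll()) != null) {
      rows.add(row);
    }

    verifyStreamRows(rows, EXPECTED_ROW_COUNT);
    assertThat(result.isComplete(), is(true));
  }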
@@ -675,119 +745,6 @@ public void shouldHandleErrorResponseFromInsertInto() {
     assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
   }

-  @Test
-  public void shouldStreamQueryWithProperties() throws Exception {
These tests actually mutate the shared state and therefore affect other tests. I've moved them to a new IT file called ClientMutationIntegrationTest.
SGTM.
   private static void verifyStreamRows(final List<Row> rows, final int numRows) {
-    assertThat(rows, hasSize(numRows));
-    for (int i = 0; i < numRows; i++) {
+    for (int i = 0; i < Math.min(numRows, rows.size()); i++) {
       verifyStreamRowWithIndex(rows.get(i), i);
     }
+    if (rows.size() < numRows) {
+      fail("Expected " + numRows + " but only got " + rows.size());
+    } else if (rows.size() > numRows) {
+      final List<Row> extra = rows.subList(numRows, rows.size());
+      fail("Expected " + numRows + " but got " + rows.size() + ". The extra rows were: " + extra);
+    }
+
+    // not strictly necessary after the other checks, but just to specify the invariant
+    assertThat(rows, hasSize(numRows));
   }
This change was part of my debugging that led to discovering that some of the tests were inserting new data into the shared stream (those tests are now ejected into a separate test file).
I figured I'd go ahead and keep this test improvement, since it is nice to get some extra information about why we didn't get the expected row count and whether the rows we did get were the expected ones.
import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
public class ClientMutationIntegrationTest {
Basically just copied from ClientIntegrationTest to isolate the tests that want to mutate the shared test data (like inserting new records). It seems like we didn't want to take the time to completely clean and re-create the test data in between each test, so the tests we write need to know whether we expect the data to change or not.
Separating the "mutation" tests from the "static" tests is advantageous because it makes it clear whether a particular test needs to worry about the underlying data changing or not.
callback = new LimitQueueCallback() {
  @Override
  public boolean shouldQueue() {
    return parent.shouldQueue();
  }

  @Override
  public void onQueued() {
    parent.onQueued();
    queuedCallback.run();
  }
This complexity is a big chunk of the reason why I inlined this callback. We were already piggy-backing the "queued" callback on the limit handler, and it was really hard to see what was actually going on in the code. By flattening it into this class, we can actually follow this logic now.
I agree. I always found this parent chaining to be a bit hard to understand.
+1
if (remaining != null && remaining.decrementAndGet() <= 0) {
  limitHandler.limitReached();
}
if (queuedCallback != null) {
  queuedCallback.run();
}
If you drilled down into that composite limit + queued handler callback, this is all it was actually doing.
this("STRUCTURED_TYPES"); | ||
} | ||
|
||
public StructuredTypesDataProvider(final String namePrefix) { |
Pulling out this constructor lets us give the topics and tables/streams a different name (which we use for the new ClientMutationIntegrationTest).
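For example, the new IT might construct an isolated copy of the data along these lines (the prefix value here is hypothetical):

  private static final StructuredTypesDataProvider TEST_DATA_PROVIDER =
      new StructuredTypesDataProvider("STRUCTURED_TYPES_MUTABLE");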
@@ -93,17 +93,13 @@
       }
     },
     {
-      "name": "correct results",
+      "name": "simple",
I know it's a bummer to change the tests and logic at the same time, but having all these cases in the same test was making it hard to debug.
     final String message = String.format(
         "%s - %s [%s] \"%s %s %s\" %d %d \"-\" \"%s\" %d",
-        routingContext.request().remoteAddress().host(),
+        socketAddress == null ? "null" : socketAddress.host(),
Not sure why, but I got an NPE on this line a couple of times. This should prevent it.
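A sketch of the guard (assuming socketAddress is extracted from the Vert.x request just above; the surrounding code is not shown in the diff):

  // remoteAddress() can evidently come back null here (root cause unclear), so
  // guard before formatting the access-log line.
  final SocketAddress socketAddress = routingContext.request().remoteAddress();
  final String remoteHost = socketAddress == null ? "null" : socketAddress.host();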
Force-pushed from 6ac5f60 to dfe4e0a
@@ -471,7 +422,7 @@ private TopicDescription getTopicDescription(final Admin admin, final String sou
     final ListOffsetsResult listOffsetsResult = admin.listOffsets(
         topicPartitions,
         new ListOffsetsOptions(
-            isolationLevel
+            IsolationLevel.READ_UNCOMMITTED
Why read uncommitted rather than the code from before?
I hard-coded the stream pull query app to be "at least once" (because it is pure side-effect, there is no point in EOS), which means we only need to get the end offsets in "read uncommitted" mode.
Ok, I added a comment to clarify this.
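The pairing being clarified is roughly this (a sketch; props and the comment wording are assumptions, though the config keys are the real Streams/Admin ones):

  // The stream pull query app is pure side-effect, so exactly-once buys
  // nothing; hard-code at-least-once processing...
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);

  // ...which in turn means READ_UNCOMMITTED is sufficient when fetching the
  // end offsets to stop at.
  final ListOffsetsOptions options =
      new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED);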
// not currently used in pull queries, although future refactoring might be able to
// take advantage of this mechanism.
I poked through the code and saw how we do this for pull queries at the moment: we use the LimitHandler and just fire it, even if there's nothing in the queue. It's effectively the same thing as this completion handler. Can we make a follow-up task to consolidate these two?
// We ignore null values in streams as invalid data and drop the record.
// If we're already done with the current partition, we just drop the record.
if (record.value() != null && !alreadyDone) {
Do push queries not handle null data values? For some reason I thought they did.
Yeah, this code was here before, so I guess not.
That is consistent with KS semantics: null values are considered invalid and always filtered out. In tables they are treated as tombstones, of course, and we more or less leave users responsible for handling the difference between the two when converting between them.
  }
}

private void checkCompletion(final TopicPartition topicPartition) {
I found it just a bit confusing having three different versions of checkCompletion. Maybe it makes sense to call this something like checkCompletionPartition and the next one checkCompletionAllPartitions?
If you had checkCompletion() call this method, you could inline the body of the third method here. Is the idea that you want a consistent snapshot of currentPositions? There is only one thread acting on everything, right?
Roger that. I kind of phoned it in on naming these methods. I'll fix it.
It's handy to keep the third method so that we don't have to separately call currentPositions before invoking the single-partition version. Maybe you can take another look after I push my current batch of updates and see if you think it's ok.
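A sketch of the resulting shape (method names follow the renaming discussion above; bodies are elided or assumed):

  // Punctuator entry point: snapshot the positions once, then check every partition.
  private void checkCompletion() {
    final Map<TopicPartition, Long> positions = currentPositions();
    positions.keySet().forEach(tp -> checkCompletion(positions, tp));
  }

  // Convenience overload used from process(): fetches the snapshot itself so
  // callers don't have to call currentPositions() separately.
  private void checkCompletion(final TopicPartition tp) {
    checkCompletion(currentPositions(), tp);
  }

  // The actual single-partition check against a snapshot of positions.
  private void checkCompletion(final Map<TopicPartition, Long> positions, final TopicPartition tp) {
    // compare positions.get(tp) against the end offset recorded for tp...
  }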
// Use reflection to pull the StreamTask out of the ProcessorContextImpl so we
// can ask it for its committable offsets (see the discussion below).
final Field streamTask = ProcessorContextImpl.class.getDeclaredField("streamTask");
streamTask.setAccessible(true);
final StreamTask task = (StreamTask) streamTask.get(context);
final Method committableOffsetsAndMetadata =
We should make an issue to expose this in streams so that this isn't required.
I wanted to sanity-check the overall algorithm before starting that process. Since it seems like you agree with the approach, I'll go ahead and propose it for AK now.
Here's a POC I put together for an AK feature, but I haven't taken the time to formally propose it: apache/kafka#11336
@@ -650,4 +671,131 @@ private static void updateListProperty(
     valueList.add(value);
     properties.put(key, valueList);
   }

+  private static class TransientQuerySinkProcessor implements
This is complex enough that we might want to pull it out into its own class.
Great idea.
@@ -364,15 +354,16 @@ public StreamPullQueryMetadata createStreamPullQuery(
         .putAll(statementOrig.getSessionConfig().getOverrides())
         .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
         .put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
Is this important for threadsafety in your processor? Might make sense to comment on that here if so.
No, I just think it'd be wasteful to spend more than one thread on this operation.
Turns out, I did have a comment explaining the full set of configs a few lines above, but I'd already forgotten to update it when I removed the commit interval, which isn't a good sign.
I moved the comments down to explain each config option independently, so this is hopefully clearer now.
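The per-option comments presumably read something like this (a sketch; the PR's exact wording may differ):

  // a pull query always reads the source topic from the beginning
  .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // one thread is enough for this one-shot scan; more would be wasteful
  .put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)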
Made a pass as well, I really like the cleanup you did alongside this PR!
One meta question I had is around why we need to split the phases of stream pull queries into create first and then wait, instead of a single execute as in the others. I think I just lack some critical context here :P Will sync with @vvcephei separately offline.
final Field streamTask = ProcessorContextImpl.class.getDeclaredField("streamTask");
streamTask.setAccessible(true);
final StreamTask task = (StreamTask) streamTask.get(context);
final Method committableOffsetsAndMetadata =
If the current position is referring to the source topic's position, could we get it via ProcessorContext#recordMetadata()#offset()? cc @AlanConfluent
I tried that first, only to sadly be reminded that the endOffsets returned by the admin client are not the "offsets of the last records in the partitions", but rather "the offset after the last record in the partition", aka "the offset that the next produced record will get".
The reason I cracked the safe to get at this method here is that it specifically computes that position for the purpose of committing offsets. I.e., we don't commit the offsets of the records we have processed; we commit the offsets of the next records we need to process. It might be tempting to just use recordMetadata#offset() + 1 for this, but the reality is that it's not so trivial, thanks to skips in the offset sequence due to compaction, "invisible" offsets for transaction markers, and stuff like that.
The method I'm exposing here specifically accounts for all of those effects:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L435-L455
In addition to the above, we have another problem: upstream processors might filter out arbitrary records before they even reach this processor. Specifically, if you write a stream pull query that should only match one record, and there are more records after it in the topic, the query would never terminate, since the process() method would never see any offsets past the one we're filtering for. That's why I also registered the punctuator, and the punctuator needs to be able to check the "current position" of the task as a whole, since it has no recordMetadata available to get the offset from.
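To make the mechanism concrete, here is a hedged sketch of the reflective call and the completion predicate it enables (the predicate itself is an assumption; the method name and return type come from the linked StreamTask code):

  // committableOffsetsAndMetadata() yields, per partition, the offset that
  // would be committed next (already corrected for compaction gaps and
  // transaction markers), so it is directly comparable to endOffsets.
  final Method committableOffsetsAndMetadata =
      StreamTask.class.getDeclaredMethod("committableOffsetsAndMetadata");
  committableOffsetsAndMetadata.setAccessible(true);
  @SuppressWarnings("unchecked")
  final Map<TopicPartition, OffsetAndMetadata> committable =
      (Map<TopicPartition, OffsetAndMetadata>) committableOffsetsAndMetadata.invoke(task);

  // Complete once every partition's committable position has reached its end offset.
  final boolean complete = endOffsets.entrySet().stream().allMatch(e -> {
    final OffsetAndMetadata position = committable.get(e.getKey());
    return position != null && position.offset() >= e.getValue();
  });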
Thanks John, that makes sense!
Thanks for taking a look, @guozhangwang ! I responded above to your question about the Streams state I'm peeking into. Please let me know if it still seems wrong to you. About the create/wait separation: that separation is actually gone now. Specifically, this PR removes the "wait" method in favor of using the same mechanism to complete the query that we're currently using to implement the "limit" clause for push queries. That would have been a better way to do it from the start, but I didn't have the inspiration until now. I considered renaming the "create" method to "execute" like the others, but I'm actually more inclined to rename them all to "create", for the simple reason that none of those methods actually execute anything. They all only create the query metadata, and then some other component later calls start().
Looking at the function names in KsqlEngine, I'm wondering if we could consider renaming some of them, since:
- executeTransientQuery only creates the query, and then start() is triggered in the publisher
- executeTablePullQuery may trigger start() if startImmediately is true (and in the new APIs it is indeed set to true); if not, it is triggered in the publisher
- executeScalablePushQuery is the same as executeTransientQuery

So maybe it's better to consider renaming all of them to createXXX, similar to createStreamPullQuery, except createAndMaybeExecuteTablePullQuery? WDYT @vvcephei @AlanConfluent @vpapavas
Looks good
Thanks for looking at that, @guozhangwang ! I agree with your proposal, but I'd like to tackle it in a later PR.
Thanks for the review @AlanConfluent !
Adapting the stream pull query execution to the HTTP/2 endpoint, which wound up being a significant refactor.