
feat: Adds ScalablePushRegistry and peeking ability in a persistent query #7424

Merged

Conversation

AlanConfluent
Member

Description

This is the first step toward scalable push queries. ScalablePushRegistry is held by a PersistentQueryMetadata and peeks at the rows passing through to the end of the topology.

It also introduces the ProcessingQueue which is the object registered for a given scalable push query with the ScalablePushRegistry, and where rows are offered.

Next steps will be to introduce the physical operator, which calls register and unregister, and to pipe it through to the physical plan.
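
For context, a rough sketch of how a scalable push query is expected to use the registry (hypothetical usage, not code from this PR; the operator that drives this arrives in the follow-up PR, and the ProcessingQueue constructor arguments are omitted):

final Optional<ScalablePushRegistry> registry =
    persistentQueryMetadata.getScalablePushRegistry();
registry.ifPresent(r -> {
  final ProcessingQueue queue = new ProcessingQueue();
  r.register(queue);
  try {
    // The physical operator from the next PR would drain rows from the queue here.
  } finally {
    r.unregister(queue);
  }
});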

Testing done

Ran unit tests.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@AlanConfluent AlanConfluent requested a review from a team as a code owner April 23, 2021 03:25
Contributor

@agavra agavra left a comment


Quick pass - didn't review tests yet. Overall LGTM, some data structure questions... I'm excited to have this work finally kick off! By the way, we might want to make the design doc public so that we have something to start pointing people to.

public static final String KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC =
"Enables whether scalable push queries are enabled. Scalable push queries require no window "
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;
Contributor

thanks for the feature flag! sets a good example :)

Member Author

👍

* starts dropping rows if they're past the capacity, and keeps track so it can be reported to the
* request.
*/
public class ProcessingQueue {
Contributor

this sounds a lot like a circular buffer, any reason not to use one explicitly? (might be better from a memory allocation standpoint)

Member Author

That's a good point. I changed it to an ArrayDeque. Using the poll and offer methods, which are constant time, it's effectively a circular buffer.

Contributor

Why not use a doubly linked list implementation of a Deque instead of an ArrayDeque? We can avoid rows getting dropped when we hit capacity using that as it can grow 'infinitely'. We can achieve constant time poll and offer methods using a DLL implementation as well

Member Author

I think we want to have a capacity so that we don't store unbounded data for a slow client, so being able to grow isn't a feature here.

Often allocating/deallocating in larger chunks can lead to less memory fragmentation. This is less of an issue in Java due to how most GC implementations work, but I took Almog's suggestion. We'd have to profile it to really know what was going on.
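
A minimal sketch of that shape, assuming a fixed capacity supplied by the caller and the PR's TableRow type (the ksqlDB import for TableRow is omitted; the real class tracks more state than this):

import java.util.ArrayDeque;
import java.util.Deque;

public class ProcessingQueueSketch {

  private final Deque<TableRow> rowQueue = new ArrayDeque<>();
  private final int queueSizeLimit;
  private boolean droppedRows = false;
  private Runnable newRowCallback = () -> { };

  public ProcessingQueueSketch(final int queueSizeLimit) {
    this.queueSizeLimit = queueSizeLimit;
  }

  // Called from the stream thread running the Processor. Once the queue is at capacity,
  // the row is dropped and the drop is remembered so it can be reported to the request.
  public synchronized boolean offer(final TableRow tableRow) {
    if (rowQueue.size() < queueSizeLimit) {
      rowQueue.offer(tableRow);
      newRowCallback.run();
      return true;
    }
    droppedRows = true;
    return false;
  }

  // Called from the request's Vert.x context to drain rows; constant time, like offer.
  public synchronized TableRow poll() {
    return rowQueue.poll();
  }

  public synchronized boolean hasDroppedRows() {
    return droppedRows;
  }

  public synchronized void setNewRowCallback(final Runnable newRowCallback) {
    this.newRowCallback = newRowCallback;
  }
}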

this.rowQueue = new LinkedList<>();
}

public synchronized boolean offer(final TableRow tableRow) {
Contributor

what's the concurrency involved here, and is a full mutex acceptable? also can we javadoc the method to explain what happens when it's full?

Member Author

The concurrency is that the consumer is a Vert.x context while the producer is the stream thread which runs the Processor. When you say "is a full mutex acceptable", do you mean "is it required"? I intend to protect the deque. Other state, such as the callback, is set by the request thread, I believe. For that reason, I wanted to protect all state, since it gets very confusing if some public methods are valid on some threads and others on other threads.

Will add lots of javadoc.

Contributor

I was weighing between a mutex and some concurrent data structure - it seems like the contention would be pretty high (per event?). Probably better to be safe now and optimize later, but I would be interested to know the overhead spent on synchronization when we push the benchmark to its limits.

(c.f. https://lmax-exchange.github.io/disruptor/)

newRowCallback.run();
return true;
}
droppedRows = true;
Contributor

not for this PR, but we might want to add a JIRA to track metrics on this

Member Author

This should result in the query failing with some kind of exception, so we'll have metrics for that. We should also keep track of this particular type of error. Will create a ticket for adding metrics.

Contributor

sounds good, I didn't have that context that the query will fail elsewhere

// queues.
@SuppressWarnings("unchecked")
private void registerPeek(final boolean windowed) {
final ProcessorSupplier<Object, GenericRow> peek = new Peek<>((key, value, timestamp) -> {
Contributor

it might be a little cleaner to have ScalablePushRegistry just implement ProcessorSupplier<K, V> and then call stream.process(aScalablePushRegistry) - it seems like, to me, passing the entire KStream into the constructor breaks some abstraction barriers

Member Author

I agree. Made the change. I still have to pass some metadata like schema and isWindowed to the registry, which are tied to the stream I hope to process, but I think it's still easy enough to understand.
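
Roughly the shape after that change (a sketch against the older Kafka Streams Processor API that the PR's Peek uses; handleRow is an illustrative name for whatever fans the row out to the registered queues, and the ksqlDB import for GenericRow is omitted):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ScalablePushRegistry implements ProcessorSupplier<Object, GenericRow> {

  @Override
  public Processor<Object, GenericRow> get() {
    return new Peek();
  }

  // Illustrative: offer the row to every registered ProcessingQueue.
  private void handleRow(final Object key, final GenericRow value, final long timestamp) {
  }

  private final class Peek implements Processor<Object, GenericRow> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
      this.context = context;
    }

    @Override
    public void process(final Object key, final GenericRow value) {
      // Peek at the row; the stream itself flows on untouched.
      handleRow(key, value, context.timestamp());
    }

    @Override
    public void close() {
    }
  }
}

The topology builder then just calls stream.process(scalablePushRegistry), which is what the snippet further down ends up doing.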

if (!windowed) {
final GenericKey keyCopy = GenericKey.fromList(((GenericKey) key).values());
final GenericRow valueCopy = GenericRow.fromList(value.values());
row = Row.of(logicalSchema, keyCopy, valueCopy, timestamp);
Contributor

can we do this outside of the for loop or am I missing something?

Member Author

Unfortunately, we seem to modify the values in some of our operators (adding key and system columns to the value columns), which could cause cross-query breakage. (Don't ask me about how I tracked this down!). I could change that code to make a copy instead, but I figured if I was going to copy, doing it here was the safest place to do it. It might be nice to make a change in the future that avoids a copy entirely, but I didn't want to tackle that now and it doesn't seem to have a huge effect on performance so far as I could tell.

I'll add a comment here.
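
Something like the snippet above with the comment Alan mentions added (same code, annotated):

if (!windowed) {
  // Copy the key and value, since some downstream operators mutate rows in place
  // (e.g. appending key and system columns to the value columns). Without a copy,
  // one query's processing could corrupt the row that another query sees.
  final GenericKey keyCopy = GenericKey.fromList(((GenericKey) key).values());
  final GenericRow valueCopy = GenericRow.fromList(value.values());
  row = Row.of(logicalSchema, keyCopy, valueCopy, timestamp);
}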

Contributor

thanks for the detailed explanation - totally agree we shouldn't change the original code, it might have some real performance implications elsewhere.

.filter(streamsMetadata -> streamsMetadata != StreamsMetadata.NOT_AVAILABLE)
.map(StreamsMetadata::hostInfo)
.map(hi -> new Node(isLocalhost(hi), buildLocation(hi)))
.collect(Collectors.toCollection(LinkedHashSet::new));
Contributor

out of curiosity, why LinkedHashSet? do we care about ordering? (if so we should probably comment about why)

Member Author

I did that solely so that I could write nice tests... I think that it was originally a Set to avoid any redundant hosts. I'll change it to a list with a call to distinct.
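
For example (a sketch of the suggested change; allMetadata is a stand-in for whatever collection the snippet above streams over):

final List<Node> nodes = allMetadata.stream()
    .filter(streamsMetadata -> streamsMetadata != StreamsMetadata.NOT_AVAILABLE)
    .map(StreamsMetadata::hostInfo)
    .distinct()
    .map(hi -> new Node(isLocalhost(hi), buildLocation(hi)))
    .collect(Collectors.toList());

Calling distinct() on the HostInfo values (which have value equality) rather than on the Node objects keeps the de-duplication without requiring Node to implement equals.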

Comment on lines 237 to 238
// Must be created before the topology is created since the ScalablePushRegistry adds a peek
// to the topology.
Contributor

this goes back to my other point about it being weird to register the peek call within the constructor

Member Author

I agree. Made the suggested change. I still added some comments here to make it clear what's going on, but I changed the method to applyScalablePushProcessor so it should be clearer.

@agavra agavra requested a review from a team April 23, 2021 22:24
Member Author

AlanConfluent commented Apr 26, 2021

By the way, we might want to make the design doc public so that we have something to start pointing people to.

Good idea. Let's chat on the best way to do that.

Contributor

@agavra agavra left a comment

lgtm :)

}

public void register(final ProcessingQueue processingQueue) {
if (closed) {
Contributor

I think there's a race condition here - if someone calls close and another thread calls register, then close can be in the middle of iterating when we add another queue to the set. I can't tell whether this is meant to be concurrent code or not, but the fact that closed is volatile makes me think it is. It seems to me we'd be better off just synchronizing everything.

Member Author

The code that calls this effectively shouldn't ever register anything after calling close, so it's a bit funny to reason about what it means to be consistent, but I agree that it's confusing this way. I added synchronized to protect the use of closed and changing the size of processingQueues.

I still use a ConcurrentHashMap for processingQueues so that I don't need to get a lock when processing rows. It gives a weakly consistent view of the map while iterating, which should be performant.
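
A sketch of that arrangement (the set is a ConcurrentHashMap-backed key set; handleRow and the queue's close method are illustrative names, and imports of the PR's types are omitted):

private final Set<ProcessingQueue> processingQueues = ConcurrentHashMap.newKeySet();
private boolean closed = false;

// register and close synchronize on the registry, so a register racing with a close
// either completes before the queues are torn down or fails cleanly.
public synchronized void register(final ProcessingQueue processingQueue) {
  if (closed) {
    throw new IllegalStateException("Registry is closed");
  }
  processingQueues.add(processingQueue);
}

public synchronized void unregister(final ProcessingQueue processingQueue) {
  processingQueues.remove(processingQueue);
}

public synchronized void close() {
  closed = true;
  for (final ProcessingQueue queue : processingQueues) {
    queue.close();   // illustrative: signal the request that the query has ended
  }
  processingQueues.clear();
}

// The per-row path on the stream thread takes no lock; iterating the concurrent set is
// weakly consistent, which is fine for best-effort delivery to whatever queues exist.
private void handleRow(final TableRow row) {
  for (final ProcessingQueue queue : processingQueues) {
    queue.offer(row);
  }
}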

when(processorContext.timestamp()).thenReturn(TIMESTAMP);

// When:
// verify(stream).process(processorSupplier.capture());
Contributor

why are these commented out?

Member Author

Oops, that was from before I made the registry a supplier itself. Removing.

Contributor

@cprasad1 cprasad1 left a comment

LGTM. Just had one question about the choice of a data structure

* starts dropping rows if they're past the capacity, and keeps track so it can be reported to the
* request.
*/
public class ProcessingQueue {
Contributor

Why not use a doubly linked list implementation of a Deque instead of an ArrayDeque? We can avoid rows getting dropped when we hit capacity using that as it can grow 'infinitely'. We can achieve constant time poll and offer methods using a DLL implementation as well


/**
* A queue for storing pre-processed rows for a given scalable push query request. This queue
* starts dropping rows if they're past the capacity, and keeps track so it can be reported to the
Contributor

Regarding the error message for dropped records: for now we rely on the peek function, which is more of a push model than a pull model (where we would fetch from the topics), so we cannot control how fast we return the data. On the other hand, we can make sure the push query is always at the tail of the stream, even though it may be lossy. That means that with a large number of push queries, dropping rows may become the norm, and currently we would only report an error message like "dropped rows". That could be confusing and worrisome to users.

I'm wondering if we should consider, from easy to hard: 1) report how many records were dropped, 2) report the dropped records' corresponding offset on the source topic, 3) fail the query immediately if we ever drop.

Member Author

Well, if they push the server to the limit, it's possible that the queues couldn't be drained fast enough to keep up, though hopefully in normal operation, that's not the case unless the requester is slow.

I think that 3) is definitely the route we want to go. I might not have been clear, but I meant to send an error message and fail the query. For the near term, if you're too slow in reading rows, your request will be dropped. Doing 2) would be easy though I'm not sure how useful it would be. 1) would be good in a situation where we allowed requests to catch up starting at offset X, but this is much more advanced for the moment.

* Sets a callback which is invoked every time a new row has been enqueued.
* @param newRowCallback The callback to invoke
*/
public synchronized void setNewRowCallback(final Runnable newRowCallback) {
Contributor

From the other PR, it seems the setNewRowCallback call is only used to trigger the first poll, and after that we rely on the timer (context.owner().setTimer(100, timerId -> context.runOnContext(v -> maybeNext(publisher)))) regardless of whether there is a new row. Is that intentional by design? If yes, I'm wondering if we can instead rely on setNewRowCallback for all triggering events: i.e. 1) whenever the callback is set, check immediately if there's any data available and, if so, execute the callback; 2) whenever the callback is triggered, reset it to null so that it fires just once after it's set; 3) from the caller PushPhysicalPlan, call setNewRowCallback with the timer in makeNext.

Member Author

Yeah, I think you're right that technically the timer isn't required if we're guaranteed that this will always call back.

I don't want to rely solely on the timer because it would mean a certain waiting latency. We can try to ensure that we schedule just one call to maybeNext after a given call to the callback here. Let's take that discussion to the follow-up PR.
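
One way to get that, sketched with plain java.util.concurrent types rather than the Vert.x context the follow-up PR uses (requestContext, maybeNext and publisher are stand-ins from that discussion, and the AtomicBoolean import is assumed):

final AtomicBoolean drainScheduled = new AtomicBoolean(false);

// The callback schedules at most one drain at a time; clearing the flag before draining
// means a row enqueued mid-drain re-arms the schedule instead of being missed.
processingQueue.setNewRowCallback(() -> {
  if (drainScheduled.compareAndSet(false, true)) {
    requestContext.execute(() -> {
      drainScheduled.set(false);
      maybeNext(publisher);
    });
  }
});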

Contributor

Sounds good! Thanks.

/**
* Gets the scalable push query registry, if one was created for this persistent query.
*/
public Optional<ScalablePushRegistry> getScalablePushRegistry() {
Contributor

This is more or less about the other PR: since I did not see the caller that constructs this object, I guess it is in the other PRs. My question, though, is how do we find which PersistentQueryMetadata to pass in when we build the builder? It seems like the trace is:

PushPhysicalPlanBuilder#persistentQueryMetadata -> PeekStreamOperator#scalablePushRegistry -> scalablePushRegistry#register

And we would have one registry per source stream/table only, where all registered push queries would be given the record from the peek call. This means we must be able to get the corresponding persistentQueryMetadata to pass to the PushPhysicalPlanBuilder, but at that point how would we know which source stream/table it is for?

Member Author

It's actually in this PR. There's a factory method called ScalablePushRegistry.create.

If you look in QueryExecutor.applyScalablePushProcessor, you can see that when the PersistentQueryMetadata is being constructed, that's when this is created, so we know exactly how the two match up.

Contributor

Okay, I guess the confusion on my side is that I did not see where the constructor of PushPhysicalPlanBuilder is called in either PR, so it's hard to guess which persistentQueryMetadata parameter would be passed in. If that constructor is called somewhere, could you point me to it?

@AlanConfluent AlanConfluent force-pushed the scalable_push_queries_registry branch from 07bc7a6 to 72a0b16 on May 7, 2021 01:07
}
final Optional<ScalablePushRegistry> registry = ScalablePushRegistry.create(schema,
allPersistentQueries, windowed, streamsProperties);
registry.ifPresent(r -> stream.process(registry.get()));
Contributor

nit: could this just be registry.ifPresent(r -> stream.process(r));?

@AlanConfluent AlanConfluent merged commit 89c1588 into confluentinc:master May 10, 2021