feat: Adds ScalablePushRegistry and peeking ability in a persistent query #7424

Merged
@@ -270,6 +270,13 @@ public class KsqlConfig extends AbstractConfig {
+ " much faster for short-lived queries.";
public static final boolean KSQL_QUERY_PULL_INTERPRETER_ENABLED_DEFAULT = true;

public static final String KSQL_QUERY_PUSH_SCALABLE_ENABLED
= "ksql.query.push.scalable.enabled";
public static final String KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC =
"Enables whether scalable push queries are enabled. Scalable push queries require no window "
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;
Contributor

thanks for the feature flag! sets a good example :)

Member Author

👍
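
For context, a minimal sketch of how this flag could be overridden and read back, assuming the usual AbstractConfig-style map constructor and getter (illustrative only, not code from this PR):

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlConfig;

final class ScalablePushFlagSketch {
  // Sketch: override the feature flag (defaults to false) and read it back.
  static boolean isScalablePushEnabled() {
    final KsqlConfig config = new KsqlConfig(
        ImmutableMap.of(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true));
    return config.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED);
  }
}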


public static final String KSQL_STRING_CASE_CONFIG_TOGGLE = "ksql.cast.strings.preserve.nulls";
public static final String KSQL_STRING_CASE_CONFIG_TOGGLE_DOC =
"When casting a SQLType to string, if false, use String.valueof(), else if true use"
@@ -864,6 +871,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_ENABLED,
Type.BOOLEAN,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT,
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC
)
.define(
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX,
Type.STRING,
@@ -0,0 +1,112 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.query.QueryId;
import java.util.ArrayDeque;
import java.util.Deque;

/**
* A queue for storing pre-processed rows for a given scalable push query request. This queue
 * starts dropping rows once it reaches capacity, and keeps track of that so it can be reported to the
Contributor

Regarding the error message for dropped records: since for now we are relying on the peek function, which is more of a push model than a pull model (if we fetched from the topics ourselves), we cannot control how fast we return the data. On the other hand, we can make sure that the push query is always at the tail of the stream, though it could be lossy. That means that with a large number of push queries, dropping rows may be the norm, and currently we would only report an error message saying "dropped rows". This could be confusing and worrisome to users.

I'm wondering if we should consider, from easy to hard: 1) report how many records were dropped, 2) report the dropped records' corresponding offset on the source topic, 3) fail the query immediately if we ever drop.

Member Author

Well, if they push the server to the limit, it's possible that the queues couldn't be drained fast enough to keep up, though hopefully in normal operation, that's not the case unless the requester is slow.

I think that 3) is definitely the route we want to go. I might not have been clear, but I meant to send an error message and fail the query. For the near term, if you're too slow in reading rows, your request will be dropped. Doing 2) would be easy though I'm not sure how useful it would be. 1) would be good in a situation where we allowed requests to catch up starting at offset X, but this is much more advanced for the moment.
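
A minimal sketch of what option 3) could look like on the consuming side, using only the ProcessingQueue methods defined in this file; the method name and exception are illustrative, not part of this PR:

import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.physical.scalablepush.ProcessingQueue;

final class DroppedRowsSketch {
  // Illustrative only: fail the request as soon as the queue reports dropped rows.
  static void drainOrFail(final ProcessingQueue queue) {
    if (queue.hasDroppedRows()) {
      queue.close();
      throw new IllegalStateException("Query " + queue.getQueryId()
          + " terminated: rows were dropped because the client could not keep up");
    }
    TableRow row;
    while ((row = queue.poll()) != null) {
      // hand the row off to the response writer / publisher
    }
  }
}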

* request.
*
* <p>The class is threadsafe since it's assumed that different threads are producing and consuming
* the data.
*/
public class ProcessingQueue {
Contributor

this sounds a lot like a circular buffer, any reason not to use one explicitly? (might be better from a memory allocation standpoint)

Member Author

That's a good point. I changed it to an ArrayDeque. Using the poll and offer methods, which are constant time, it's effectively a circular buffer.

Contributor

Why not use a doubly linked list implementation of a Deque instead of an ArrayDeque? We can avoid rows getting dropped when we hit capacity using that as it can grow 'infinitely'. We can achieve constant time poll and offer methods using a DLL implementation as well

Member Author

I think we want to have a capacity so that we don't store unbounded data for a slow client, so being able to grow isn't a feature here.

Often allocating/deallocating in larger chunks can lead to less memory fragmentation. This is less of an issue in Java due to how most GC implementations work, but I took Almog's suggestion. We'd have to profile it to really know what was going on.
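
To make the trade-off concrete, here is a standalone sketch of the bounded-deque behaviour being discussed (names illustrative; the real implementation is the offer/poll pair below):

import java.util.ArrayDeque;
import java.util.Deque;

// ArrayDeque is array-backed, so offer/poll are amortized O(1) with no per-row node
// allocation; the explicit size check turns "grow forever" into "drop when full".
final class BoundedBufferSketch<T> {
  private final Deque<T> rows = new ArrayDeque<>();
  private final int limit;

  BoundedBufferSketch(final int limit) {
    this.limit = limit;
  }

  boolean offer(final T row) {
    if (rows.size() >= limit) {
      return false;   // at capacity: drop rather than grow without bound
    }
    return rows.offer(row);
  }

  T poll() {
    return rows.poll();
  }
}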


static final int BLOCKING_QUEUE_CAPACITY = 100;

private final Deque<TableRow> rowQueue;
private final QueryId queryId;
private final int queueSizeLimit;
private boolean closed = false;
private boolean droppedRows = false;
private Runnable newRowCallback = () -> { };

public ProcessingQueue(final QueryId queryId) {
this(queryId, BLOCKING_QUEUE_CAPACITY);
}

public ProcessingQueue(final QueryId queryId, final int queueSizeLimit) {
this.queryId = queryId;
this.queueSizeLimit = queueSizeLimit;
this.rowQueue = new ArrayDeque<>();
}

/**
* Adds a {@link TableRow} to the queue. This is expected to be called from the processor streams
* thread when a new row arrives.
* @param tableRow The row to add
* @return true if the row has been successfully added to the queue, or false if it's been
* dropped due to the queue being at its size limit.
*/
public synchronized boolean offer(final TableRow tableRow) {
Contributor

what's the concurrency involved here, and is a full mutex acceptable? also can we javadoc the method to explain what happens when it's full?

Member Author

The concurrency is that the consumer is a Vertx context while the producer is the stream thread which runs the Processor. When you say "is a full mutex acceptable", do you mean "is it required"? I'm intending to protect the deque. Other state, such as the callback, is I believe set by the request thread. For this reason, I wanted to protect all state, since it can get very confusing if some public methods are valid on some threads and others on other threads.

Will add lots of javadoc.

Contributor

I was weighing between a mutex and some concurrent data structure - it seems like the contention would be pretty high (per event?). Probably better to be safe now and optimize later, but I would be interested to know the overhead spent on synchronization when we push the benchmark to its limits

(c.f. https://lmax-exchange.github.io/disruptor/)
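
For reference, the concurrent-data-structure alternative being weighed would look roughly like the sketch below; it is not what the PR does, and it gives up the droppedRows/closed bookkeeping that the synchronized ProcessingQueue keeps:

import java.util.concurrent.ArrayBlockingQueue;
import io.confluent.ksql.execution.streams.materialization.TableRow;

// ArrayBlockingQueue gives a bounded buffer with non-blocking offer/poll out of the box:
// offer returns false when the capacity is hit (the "drop" signal) and poll returns null
// when empty. Note it still serializes on a single internal lock, so per-event contention
// is comparable; the Disruptor linked above sits at the lock-free end of this spectrum.
final class ConcurrentRowBufferSketch {
  private final ArrayBlockingQueue<TableRow> rows = new ArrayBlockingQueue<>(100);

  boolean offer(final TableRow row) {
    return rows.offer(row);
  }

  TableRow poll() {
    return rows.poll();
  }
}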

if (closed) {
return false;
} else if (rowQueue.size() < queueSizeLimit && !droppedRows) {
rowQueue.offer(tableRow);
newRowCallback.run();
return true;
}
droppedRows = true;
Contributor

not for this PR, but we might want to add a JIRA to track metrics on this

Member Author

This should result in the query failing with some kind of exception, so hopefully we'll have metrics for that for sure. We should also keep track of this particular type of error too. Will create a ticket for adding metrics.

Contributor

sounds good, I didn't have that context that the query will fail elsewhere

return false;
}

/**
* Reads a row from the queue. This is expected to be called from the plan's physical operator
* which is called from the Vertx context.
* @return The next row or null if either the queue is closed or there's no data to return.
*/
public synchronized TableRow poll() {
if (!closed) {
return rowQueue.poll();
}
return null;
}

/**
* Closes the queue which causes rows to stop being returned.
*/
public synchronized void close() {
closed = true;
}

public synchronized boolean isClosed() {
return closed;
}

/**
* Sets a callback which is invoked every time a new row has been enqueued.
* @param newRowCallback The callback to invoke
*/
public synchronized void setNewRowCallback(final Runnable newRowCallback) {
Contributor

From the other PR, it seems the setNewRowCallback call is only for triggering the poll the first time; after that we rely on the timer, context.owner().setTimer(100, timerId -> context.runOnContext(v -> maybeNext(publisher))), whether or not there's a new row. Is that intentional by design? If yes, I'm wondering if we can just always rely on setNewRowCallback for all triggering events, i.e.: 1) whenever the set is called, we check immediately if there's any data available, and if yes execute the callback; 2) whenever the callback is triggered, we reset it to null so that the callback would be triggered just once after it's set; 3) from the caller PushPhysicalPlan, we call setNewRowCallback with the timer in makeNext.

Member Author

Yeah, I think you're right that technically the timer isn't required if we're guaranteed that this will always call back.

I don't want to rely solely on the timer because it will mean that there will be a certain waiting latency. We can try to ensure that we schedule just one call to maybeNext after a given call to the callback here. Let's take that discussion to the followup PR.

Contributor

Sounds good! Thanks.
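
A reduced sketch of the one-shot callback scheme proposed above, as a tiny standalone class so the control flow is easy to see (illustrative only; this PR keeps the always-set callback plus timer):

import java.util.ArrayDeque;
import java.util.Deque;

final class OneShotQueueSketch<T> {
  private final Deque<T> rows = new ArrayDeque<>();
  private Runnable pendingCallback = null;

  synchronized void setNewRowCallback(final Runnable callback) {
    if (!rows.isEmpty()) {
      callback.run();               // data already buffered: notify immediately
    } else {
      pendingCallback = callback;   // otherwise arm it for the next offer()
    }
  }

  synchronized void offer(final T row) {
    rows.offer(row);
    if (pendingCallback != null) {
      final Runnable callback = pendingCallback;
      pendingCallback = null;       // reset so the callback fires just once per set
      callback.run();
    }
  }

  synchronized T poll() {
    return rows.poll();
  }
}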

this.newRowCallback = newRowCallback;
}

/**
* Whether rows have been dropped due to hitting the queue limit.
*/
public synchronized boolean hasDroppedRows() {
return droppedRows;
}

public QueryId getQueryId() {
return queryId;
}
}
@@ -0,0 +1,183 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.scalablepush.locator.AllHostsLocator;
import io.confluent.ksql.physical.scalablepush.locator.PushLocator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This registry is kept with every persistent query, peeking at the stream which is the output
* of the topology. These rows are then fed to any registered ProcessingQueues where they are
* eventually passed on to scalable push queries.
*/
public class ScalablePushRegistry implements ProcessorSupplier<Object, GenericRow> {

private static final Logger LOG = LoggerFactory.getLogger(ScalablePushRegistry.class);

private final PushLocator pushLocator;
private final LogicalSchema logicalSchema;
private final boolean windowed;
// All mutable field accesses are protected with synchronized. The exception is when
// processingQueues is accessed to process rows, in which case we want a weakly consistent
// view of the map, so we just iterate over the ConcurrentHashMap directly.
private final ConcurrentHashMap<QueryId, ProcessingQueue> processingQueues
= new ConcurrentHashMap<>();
private boolean closed = false;

public ScalablePushRegistry(
final PushLocator pushLocator,
final LogicalSchema logicalSchema,
final boolean windowed
) {
this.pushLocator = pushLocator;
this.logicalSchema = logicalSchema;
this.windowed = windowed;
}

public synchronized void close() {
for (ProcessingQueue queue : processingQueues.values()) {
queue.close();
}
processingQueues.clear();
closed = true;
}

public synchronized void register(final ProcessingQueue processingQueue) {
if (closed) {
Contributor

I think there's a race condition here - if someone calls close and another calls register, then close can be in the middle of iterating when we add another one to the set. I can't tell whether this is meant to be concurrent code or not, but the fact that closed is volatile makes me think it is. It seems to me like we'd be safer off just synchronizing everything

Member Author

The code that calls this effectively shouldn't ever register anything after calling close, so it's a bit funny to reason about what it means to be consistent, but I agree that it's confusing this way. I added synchronized to protect the use of closed and changing the size of processingQueues.

I still use a ConcurrentHashMap for processingQueues so that I don't need to get a lock when processing rows. It gives a weakly consistent view of the map while iterating, which should be performant.
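
To illustrate the locking pattern just described, a reduced sketch (not code from this PR):

import java.util.concurrent.ConcurrentHashMap;

// Mutations of the map and of closed take the monitor; the per-row hot path iterates
// the ConcurrentHashMap without it and tolerates a weakly consistent view of the entries.
final class RegistryLockingSketch {
  private final ConcurrentHashMap<String, Runnable> queues = new ConcurrentHashMap<>();
  private boolean closed = false;

  synchronized void register(final String id, final Runnable queue) {
    if (closed) {
      throw new IllegalStateException("Shouldn't register after closing");
    }
    queues.put(id, queue);
  }

  synchronized void close() {
    queues.clear();
    closed = true;
  }

  // Streams thread, once per row: deliberately unsynchronized.
  void handleRow() {
    for (final Runnable queue : queues.values()) {
      queue.run();   // an entry added/removed concurrently may or may not be seen
    }
  }
}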

throw new IllegalStateException("Shouldn't register after closing");
}
processingQueues.put(processingQueue.getQueryId(), processingQueue);
}

public synchronized void unregister(final ProcessingQueue processingQueue) {
if (closed) {
throw new IllegalStateException("Shouldn't unregister after closing");
}
processingQueues.remove(processingQueue.getQueryId());
}

public PushLocator getLocator() {
return pushLocator;
}

@VisibleForTesting
int numRegistered() {
return processingQueues.size();
}

@SuppressWarnings("unchecked")
private void handleRow(
final Object key, final GenericRow value, final long timestamp) {
for (ProcessingQueue queue : processingQueues.values()) {
try {
// The physical operators may modify the keys and values, so we make a copy to ensure
// that there's no cross-query interference.
final TableRow row;
if (!windowed) {
final GenericKey keyCopy = GenericKey.fromList(((GenericKey) key).values());
final GenericRow valueCopy = GenericRow.fromList(value.values());
row = Row.of(logicalSchema, keyCopy, valueCopy, timestamp);
} else {
final Windowed<GenericKey> windowedKey = (Windowed<GenericKey>) key;
final Windowed<GenericKey> keyCopy =
new Windowed<>(GenericKey.fromList(windowedKey.key().values()),
windowedKey.window());
final GenericRow valueCopy = GenericRow.fromList(value.values());
row = WindowedRow.of(logicalSchema, keyCopy, valueCopy, timestamp);
}
queue.offer(row);
} catch (final Throwable t) {
LOG.error("Error while offering row", t);
}
}
}

@Override
public Processor<Object, GenericRow> get() {
return new PeekProcessor();
}

private final class PeekProcessor implements Processor<Object, GenericRow> {

private ProcessorContext context;

private PeekProcessor() {
}

public void init(final ProcessorContext context) {
this.context = context;
}

public void process(final Object key, final GenericRow value) {
handleRow(key, value, this.context.timestamp());
this.context.forward(key, value);
}

@Override
public void close() {
}
}

public static Optional<ScalablePushRegistry> create(
final LogicalSchema logicalSchema,
final Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
final boolean windowed,
final Map<String, Object> streamsProperties
) {
final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (appServer == null) {
return Optional.empty();
}

if (!(appServer instanceof String)) {
throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " not String");
}

final URL localhost;
try {
localhost = new URL((String) appServer);
} catch (final MalformedURLException e) {
throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " malformed: "
+ "'" + appServer + "'");
}

final PushLocator pushLocator = new AllHostsLocator(allPersistentQueries, localhost);
return Optional.of(new ScalablePushRegistry(pushLocator, logicalSchema, windowed));
}
}
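
A minimal usage sketch of the create factory above; the schema, query supplier, and listener URL are placeholders:

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsConfig;

final class RegistryCreationSketch {
  // Sketch: a registry is only created when the Streams app has an application server
  // configured; an empty Optional means scalable push queries can't be routed here.
  static Optional<ScalablePushRegistry> createForQuery(final LogicalSchema schema) {
    final Supplier<List<PersistentQueryMetadata>> allQueries = Collections::emptyList;
    final Map<String, Object> streamsProps = ImmutableMap.of(
        StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:8088");
    return ScalablePushRegistry.create(schema, allQueries, false /* windowed */, streamsProps);
  }
}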