Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds ScalablePushRegistry and peeking ability in a persistent query #7424

Merged
Next Next commit
feat: Adds ScalablePushRegistry and peeking ability in a persistent q…
…uery
AlanConfluent committed May 7, 2021
commit 28025e6c0c5bfbfe1bb11d494f527841fc18acf9
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.confluent.ksql.physical.scalable_push;

import io.confluent.ksql.execution.streams.materialization.TableRow;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/**
 * A bounded queue of rows for a single push-query subscriber. Rows are written
 * by the Streams processing thread via {@link #offer} and read by the query's
 * consumer via {@link #poll}. All state is guarded by the instance monitor, so
 * every method is {@code synchronized}.
 */
public class ProcessingQueue {

  private static final int DEFAULT_QUEUE_SIZE_LIMIT = 100;

  private final LinkedList<TableRow> rowQueue;
  private final int queueSizeLimit;
  // Set once by close(); afterwards poll() returns null and offer() rejects rows.
  private boolean closed = false;
  // Latches true on the first rejected row so the reader can detect data loss.
  private boolean droppedRows = false;
  // Invoked after each successful offer to wake the consumer; no-op by default.
  private Runnable newRowCallback = () -> { };

  public ProcessingQueue() {
    this(DEFAULT_QUEUE_SIZE_LIMIT);
  }

  public ProcessingQueue(final int queueSizeLimit) {
    this.queueSizeLimit = queueSizeLimit;
    this.rowQueue = new LinkedList<>();
  }

  /**
   * Attempts to enqueue a row.
   *
   * @param tableRow the row to enqueue
   * @return {@code true} if the row was accepted; {@code false} if the queue is
   *     closed, has previously dropped a row, or is at capacity. Once any row is
   *     dropped, all later rows are rejected so the consumer sees a single,
   *     unambiguous data-loss signal rather than silent gaps.
   */
  public synchronized boolean offer(final TableRow tableRow) {
    // Fix: previously rows were still accepted (and the callback fired) after
    // close(); a closed queue must not accept writes.
    if (closed || droppedRows) {
      return false;
    }
    if (rowQueue.size() < queueSizeLimit) {
      rowQueue.offer(tableRow);
      newRowCallback.run();
      return true;
    }
    droppedRows = true;
    return false;
  }

  /**
   * @return the next row, or {@code null} if the queue is empty or closed
   */
  public synchronized TableRow poll() {
    if (!closed) {
      return rowQueue.poll();
    }
    return null;
  }

  /** Closes the queue; subsequent offers are rejected and polls return null. */
  public synchronized void close() {
    closed = true;
  }

  public synchronized boolean isClosed() {
    return closed;
  }

  /** Sets the callback invoked (while holding this queue's lock) on each new row. */
  public synchronized void setNewRowCallback(final Runnable newRowCallback) {
    this.newRowCallback = newRowCallback;
  }

  /** @return {@code true} if any row has ever been rejected due to capacity */
  public synchronized boolean hasDroppedRows() {
    return droppedRows;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package io.confluent.ksql.physical.scalable_push;

import static io.confluent.ksql.schema.ksql.SystemColumns.ROWTIME_NAME;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.scalable_push.locator.AllHostsLocator;
import io.confluent.ksql.physical.scalable_push.locator.PushLocator;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.vertx.core.impl.ConcurrentHashSet;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * Registry enabling scalable push queries on a persistent query's output.
 * A peek processor is attached to the query's output {@link KStream}; every row
 * flowing through is offered to each registered {@link ProcessingQueue}.
 */
public class ScalablePushRegistry {

  private final KStream<?, GenericRow> stream;
  private final LogicalSchema logicalSchema;
  // Schema with pseudo/key columns projected into the value; currently unused
  // directly but kept for callers that need the intermediate row layout.
  private final LogicalSchema intermediateSchema;
  private final PushLocator pushLocator;
  // Concurrent: mutated by register/unregister while the Streams thread iterates.
  private final Set<ProcessingQueue> processingQueues = new ConcurrentHashSet<>();

  public ScalablePushRegistry(
      final KStream<?, GenericRow> stream,
      final LogicalSchema logicalSchema,
      final PushLocator pushLocator,
      final boolean windowed
  ) {
    this.stream = stream;
    this.logicalSchema = logicalSchema;
    this.intermediateSchema = logicalSchema.withPseudoAndKeyColsInValue(windowed);
    this.pushLocator = pushLocator;
    registerPeek(windowed);
  }

  /**
   * Attaches a pass-through processor to the stream that fans each row out to
   * all registered queues. The unchecked cast of the key is safe because the
   * key type is dictated by {@code windowed}.
   */
  @SuppressWarnings("unchecked")
  private void registerPeek(final boolean windowed) {
    final ProcessorSupplier<Object, GenericRow> peek = new Peek<>((key, value, timestamp) -> {
      final TableRow row = windowed
          ? WindowedRow.of(logicalSchema, (Windowed<GenericKey>) key, value, timestamp)
          : Row.of(logicalSchema, (GenericKey) key, value, timestamp);
      for (final ProcessingQueue queue : processingQueues) {
        queue.offer(row);
      }
    });
    stream.process(peek);
  }

  /** Closes all registered queues, signalling their consumers to stop. */
  public void close() {
    for (final ProcessingQueue queue : processingQueues) {
      queue.close();
    }
  }

  /**
   * Pass-through processor supplier that invokes an action for each record
   * (with its timestamp) before forwarding it unchanged. Static: it does not
   * need a reference to the enclosing registry.
   */
  static class Peek<K, V> implements ProcessorSupplier<K, V> {
    private final ForeachAction<K, V> action;

    Peek(final ForeachAction<K, V> action) {
      this.action = action;
    }

    @Override
    public Processor<K, V> get() {
      return new PeekProcessor();
    }

    private class PeekProcessor implements Processor<K, V> {

      private ProcessorContext context;

      private PeekProcessor() {
      }

      @Override
      public void init(final ProcessorContext context) {
        this.context = context;
      }

      @Override
      public void process(final K key, final V value) {
        Peek.this.action.apply(key, value, this.context.timestamp());
        // Forward unchanged so downstream topology is unaffected.
        this.context.forward(key, value);
      }

      @Override
      public void close() {
      }
    }
  }

  /** Callback invoked for each record observed by {@link Peek}. */
  @FunctionalInterface
  public interface ForeachAction<K, V> {
    void apply(K key, V value, long timestamp);
  }

  /** Registers a queue to start receiving rows from this query's output. */
  public void register(final ProcessingQueue processingQueue) {
    processingQueues.add(processingQueue);
  }

  /** Unregisters a queue; it will receive no further rows. */
  public void unregister(final ProcessingQueue processingQueue) {
    processingQueues.remove(processingQueue);
  }

  public PushLocator getLocator() {
    return pushLocator;
  }

  /**
   * Creates a registry if the application server config is present.
   *
   * @return empty if {@code application.server} is unset
   * @throws IllegalArgumentException if the config is not a String or is a
   *     malformed URL
   */
  public static Optional<ScalablePushRegistry> create(
      final KStream<?, GenericRow> stream,
      final LogicalSchema logicalSchema,
      final Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
      final boolean windowed,
      final Map<String, Object> streamsProperties
  ) {
    final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
    if (appServer == null) {
      return Optional.empty();
    }

    if (!(appServer instanceof String)) {
      throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " not String");
    }

    final URL localhost;
    try {
      localhost = new URL((String) appServer);
    } catch (final MalformedURLException e) {
      // Preserve the cause so the original parse failure isn't lost.
      throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " malformed: "
          + "'" + appServer + "'", e);
    }

    final PushLocator pushLocator = new AllHostsLocator(allPersistentQueries, localhost);
    return Optional.of(new ScalablePushRegistry(stream, logicalSchema, pushLocator, windowed));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.confluent.ksql.physical.scalable_push.locator;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.streams.materialization.ks.KsLocator;
import io.confluent.ksql.execution.streams.materialization.ks.KsLocator.Node;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

/**
 * A {@link PushLocator} that considers every host running any persistent query
 * to be a candidate for routing a scalable push query.
 */
public class AllHostsLocator implements PushLocator {

  private final Supplier<List<PersistentQueryMetadata>> allPersistentQueries;
  private final URL localhost;

  /**
   * @param allPersistentQueries supplies the current set of persistent queries
   * @param localhost this node's advertised listener, used to mark local nodes
   */
  public AllHostsLocator(final Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
      final URL localhost) {
    this.allPersistentQueries = Objects.requireNonNull(allPersistentQueries, "allPersistentQueries");
    this.localhost = Objects.requireNonNull(localhost, "localhost");
  }

  /**
   * Returns the distinct set of hosts running any persistent query. Hosts whose
   * Streams metadata is not yet available are skipped.
   */
  public Set<KsqlNode> locate() {
    final List<PersistentQueryMetadata> currentQueries = allPersistentQueries.get();
    if (currentQueries.isEmpty()) {
      return Collections.emptySet();
    }

    return currentQueries.stream()
        .map(QueryMetadata::getAllMetadata)
        .filter(Objects::nonNull)
        .flatMap(Collection::stream)
        .filter(streamsMetadata -> streamsMetadata != StreamsMetadata.NOT_AVAILABLE)
        .map(StreamsMetadata::hostInfo)
        .map(hi -> new Node(isLocalhost(hi), buildLocation(hi)))
        .collect(Collectors.toSet());
  }

  // A host counts as local when the port matches and the hostname is either our
  // advertised host or the literal "localhost".
  private boolean isLocalhost(final HostInfo hostInfo) {
    if (hostInfo.port() != localhost.getPort()) {
      return false;
    }

    return hostInfo.host().equalsIgnoreCase(localhost.getHost())
        || hostInfo.host().equalsIgnoreCase("localhost");
  }

  // Builds the node's base URI using the local listener's protocol with the
  // remote host/port.
  private URI buildLocation(final HostInfo remoteInfo) {
    try {
      return new URL(
          localhost.getProtocol(),
          remoteInfo.host(),
          remoteInfo.port(),
          "/"
      ).toURI();
    } catch (final Exception e) {
      // Preserve the cause so the underlying URL/URI failure isn't lost.
      throw new IllegalStateException("Failed to convert remote host info to URL."
          + " remoteInfo: " + remoteInfo, e);
    }
  }

  /** Immutable value type describing one candidate node. */
  private static class Node implements KsqlNode {

    private final boolean isLocal;
    private final URI location;

    Node(final boolean isLocal, final URI location) {
      this.isLocal = isLocal;
      this.location = location;
    }

    @Override
    public boolean isLocal() {
      return isLocal;
    }

    @Override
    public URI location() {
      return location;
    }

    @Override
    public String toString() {
      return "Node{"
          + "isLocal = " + isLocal
          + ", location = " + location
          + "}";
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }

      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      final Node that = (Node) o;
      return isLocal == that.isLocal
          && location.equals(that.location);
    }

    @Override
    public int hashCode() {
      return Objects.hash(isLocal, location);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.confluent.ksql.physical.scalable_push.locator;

import java.net.URI;
import java.util.Set;
import org.apache.kafka.streams.state.HostInfo;

/**
 * Locates the set of KSQL nodes that a scalable push query should be routed to.
 */
public interface PushLocator {

  /**
   * @return the set of candidate nodes; how the set is determined is up to the
   *     implementation (e.g. all hosts running persistent queries).
   */
  Set<KsqlNode> locate();

  interface KsqlNode {

    /**
     * @return {@code true} if this is the local node, i.e. the KSQL instance handling the call.
     */
    boolean isLocal();

    /**
     * @return The base URI of the node, including protocol, host and port.
     */
    URI location();
  }
}
Loading