Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: force repartition on joins with SR-enabled key formats #6635

Merged
merged 15 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,6 @@ private static SourceName getSourceName(final PlanNode node) {
return node.getLeftmostSourceNode().getAlias();
}

/**
 * Looks up the value format info of the leftmost source beneath the given plan node.
 *
 * <p>Walks from the node to its leftmost source node, then to that source's data source and
 * KSQL topic, and returns the {@code FormatInfo} of the topic's value format.
 *
 * @param sourceNode the plan node whose leftmost source is inspected
 * @return the format info of the value format of the leftmost source's topic
 */
private static FormatInfo getValueFormatForSource(final PlanNode sourceNode) {
return sourceNode.getLeftmostSourceNode()
.getDataSource()
.getKsqlTopic()
.getValueFormat()
.getFormatInfo();
}

private static class JoinerFactory {

private final Map<
Expand Down Expand Up @@ -406,26 +398,26 @@ public SchemaKStream<K> join() {
rightStream,
joinNode.getKeyColumnName(),
joinNode.withinExpression.get().joinWindow(),
getValueFormatForSource(joinNode.left),
getValueFormatForSource(joinNode.right),
JoiningNode.getValueFormatForSource(joinNode.left).getFormatInfo(),
JoiningNode.getValueFormatForSource(joinNode.right).getFormatInfo(),
contextStacker
);
case OUTER:
return leftStream.outerJoin(
rightStream,
joinNode.getKeyColumnName(),
joinNode.withinExpression.get().joinWindow(),
getValueFormatForSource(joinNode.left),
getValueFormatForSource(joinNode.right),
JoiningNode.getValueFormatForSource(joinNode.left).getFormatInfo(),
JoiningNode.getValueFormatForSource(joinNode.right).getFormatInfo(),
contextStacker
);
case INNER:
return leftStream.join(
rightStream,
joinNode.getKeyColumnName(),
joinNode.withinExpression.get().joinWindow(),
getValueFormatForSource(joinNode.left),
getValueFormatForSource(joinNode.right),
JoiningNode.getValueFormatForSource(joinNode.left).getFormatInfo(),
JoiningNode.getValueFormatForSource(joinNode.right).getFormatInfo(),
contextStacker
);
default:
Expand Down Expand Up @@ -460,15 +452,15 @@ public SchemaKStream<K> join() {
return leftStream.leftJoin(
rightTable,
joinNode.getKeyColumnName(),
getValueFormatForSource(joinNode.left),
JoiningNode.getValueFormatForSource(joinNode.left).getFormatInfo(),
contextStacker
);

case INNER:
return leftStream.join(
rightTable,
joinNode.getKeyColumnName(),
getValueFormatForSource(joinNode.left),
JoiningNode.getValueFormatForSource(joinNode.left).getFormatInfo(),
contextStacker
);
case OUTER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.util.GrammaticalJoiner;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
Expand Down Expand Up @@ -58,6 +59,13 @@ public interface JoiningNode {
*/
void setKeyFormat(FormatInfo format);

/**
 * Returns the value format of the leftmost source beneath the given plan node.
 *
 * <p>Only the leftmost source node is consulted: the result is the value format of the KSQL
 * topic backing that source's data source. Callers that need the {@code FormatInfo} call
 * {@code getFormatInfo()} on the result.
 *
 * @param sourceNode the plan node whose leftmost source is inspected
 * @return the value format of the leftmost source's topic
 */
static ValueFormat getValueFormatForSource(final PlanNode sourceNode) {
return sourceNode.getLeftmostSourceNode()
.getDataSource()
.getKsqlTopic()
.getValueFormat();
}

@Immutable
class RequiredFormat {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.Repartitioning;
import java.util.Optional;
Expand All @@ -39,8 +42,11 @@ public class PreJoinRepartitionNode extends SingleSourcePlanNode implements Join

private final Expression partitionBy;
private final LogicalSchema schema;
private final Optional<JoiningNode> joiningNode;
private final Optional<JoinNode> joiningNode;
private final ValueFormat valueFormat;
private Optional<FormatInfo> forcedInternalKeyFormat = Optional.empty();
private boolean forceRepartition = false;
private boolean keyFormatSet = false;

public PreJoinRepartitionNode(
final PlanNodeId id,
Expand All @@ -51,9 +57,18 @@ public PreJoinRepartitionNode(
super(id, source.getNodeOutputType(), source.getSourceName(), source);
this.schema = requireNonNull(schema, "schema");
this.partitionBy = requireNonNull(partitionBy, "partitionBy");
this.joiningNode = source instanceof JoiningNode
? Optional.of((JoiningNode) source)
: Optional.empty();

if (source instanceof JoiningNode) {
if (!(source instanceof JoinNode)) {
throw new IllegalStateException(
"PreJoinRepartitionNode preceded by non-JoinNode JoiningNode: " + source.getClass());
}
this.joiningNode = Optional.of((JoinNode) source);
} else {
this.joiningNode = Optional.empty();
}

this.valueFormat = JoiningNode.getValueFormatForSource(this);
}

@Override
Expand Down Expand Up @@ -92,35 +107,32 @@ public Optional<FormatInfo> getPreferredKeyFormat() {

@Override
public void setKeyFormat(final FormatInfo format) {
if (requiresRepartition()) {
// Node is repartitioning already:
forcedInternalKeyFormat = Optional.of(format);
return;
}
final Optional<FormatInfo> requiredParentJoinFormat = maybeForceInternalKeyFormat(format);

if (joiningNode.isPresent()) {
final Optional<FormatInfo> preferred = joiningNode.get().getPreferredKeyFormat();
if (!preferred.isPresent() || preferred.get().equals(format)) {
// Parent node can handle any key format change:
joiningNode.get().setKeyFormat(format);
if (requiredParentJoinFormat.isPresent()) {
joiningNode.get().setKeyFormat(requiredParentJoinFormat.get());
} else {
forcedInternalKeyFormat = Optional.of(format);
joiningNode.get().resolveKeyFormats();
}
return;
}

if (!format.equals(getSourceKeyFormat())) {
forcedInternalKeyFormat = Optional.of(format);
}
keyFormatSet = true;
}

@Override
public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
if (!keyFormatSet) {
throw new IllegalStateException("PreJoinRepartitionNode must set key format");
}

return getSource().buildStream(builder)
.selectKey(
valueFormat.getFormatInfo(),
partitionBy,
forcedInternalKeyFormat,
builder.buildNodeContext(getId().toString())
builder.buildNodeContext(getId().toString()),
forceRepartition
);
}

Expand All @@ -133,4 +145,42 @@ private FormatInfo getSourceKeyFormat() {
return Iterators.getOnlyElement(getSourceNodes().iterator())
.getDataSource().getKsqlTopic().getKeyFormat().getFormatInfo();
}

/**
 * Decides where the requested key format is applied: inside this node (by forcing its
 * internal key format) or on the parent join.
 *
 * <p>As a side effect this may set {@code forceRepartition} and
 * {@code forcedInternalKeyFormat} on this node.
 *
 * @param format key format being set on this node
 * @return if applicable, the format that must be set on this node's parent JoinNode;
 *     otherwise empty
 */
private Optional<FormatInfo> maybeForceInternalKeyFormat(final FormatInfo format) {
  // Key formats that support schema inference always repartition, to avoid join misses due
  // to key schema ID mismatch.
  // See https://github.com/confluentinc/ksql/issues/6332 for context, and
  // https://github.com/confluentinc/ksql/issues/6648 for a potential optimization
  final boolean infersSchema =
      FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
  if (infersSchema) {
    forceRepartition = true;
  }

  if (requiresRepartition() || forceRepartition) {
    // This node repartitions anyway, so it applies the key format itself.
    forcedInternalKeyFormat = Optional.of(format);
    return Optional.empty();
  }

  if (!joiningNode.isPresent()) {
    // No parent join: only override the key format when it differs from the source's.
    if (!format.equals(getSourceKeyFormat())) {
      forcedInternalKeyFormat = Optional.of(format);
    }
    return Optional.empty();
  }

  final Optional<FormatInfo> parentPreference = joiningNode.get().getPreferredKeyFormat();
  final boolean parentAccepts =
      parentPreference.map(wanted -> wanted.equals(format)).orElse(true);
  if (parentAccepts) {
    // Parent join has no preference, or already prefers this format: let it take the change.
    return Optional.of(format);
  }

  // Parent join prefers a different format, so the change is applied in this node instead.
  forcedInternalKeyFormat = Optional.of(format);
  return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.structured.SchemaKStream;
import java.util.Optional;
import java.util.stream.Stream;
Expand All @@ -37,6 +38,7 @@ public class UserRepartitionNode extends SingleSourcePlanNode {
private final Expression partitionBy;
private final LogicalSchema schema;
private final Expression originalPartitionBy;
private final ValueFormat valueFormat;

public UserRepartitionNode(
final PlanNodeId id,
Expand All @@ -49,6 +51,10 @@ public UserRepartitionNode(
this.schema = requireNonNull(schema, "schema");
this.partitionBy = requireNonNull(partitionBy, "partitionBy");
this.originalPartitionBy = requireNonNull(originalPartitionBy, "originalPartitionBy");
this.valueFormat = getLeftmostSourceNode()
.getDataSource()
.getKsqlTopic()
.getValueFormat();
}

@Override
Expand All @@ -60,9 +66,11 @@ public LogicalSchema getSchema() {
public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
return getSource().buildStream(builder)
.selectKey(
valueFormat.getFormatInfo(),
partitionBy,
Optional.empty(),
builder.buildNodeContext(getId().toString())
builder.buildNodeContext(getId().toString()),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,22 +304,44 @@ public SchemaKStream<K> outerJoin(
);
}

/**
* @param valueFormat value format used in constructing serdes. Unchanged by this step.
* @param keyExpression expression for the key being selected
* @param forceInternalKeyFormat new key format to be used, if present
* @param contextStacker context for this step
* @param forceRepartition if true, this step will repartition even if neither the key format
* nor the key changes. Used to ensure co-partitioning for
* joins on Schema-Registry-enabled key formats
* @return result stream: repartitioned if needed or forced, else this stream unchanged
*/
@SuppressWarnings("unchecked")
public SchemaKStream<Struct> selectKey(
final FormatInfo valueFormat,
final Expression keyExpression,
final Optional<FormatInfo> forceInternalKeyFormat,
final Stacker contextStacker
final Stacker contextStacker,
final boolean forceRepartition
agavra marked this conversation as resolved.
Show resolved Hide resolved
) {
final boolean keyFormatChange = forceInternalKeyFormat.isPresent()
&& !forceInternalKeyFormat.get().equals(keyFormat.getFormatInfo());

if (!keyFormatChange && repartitionNotNeeded(ImmutableList.of(keyExpression))) {
if (!keyFormatChange
&& !forceRepartition
&& repartitionNotNeeded(ImmutableList.of(keyExpression))
) {
return (SchemaKStream<Struct>) this;
}

if (keyFormat.isWindowed()) {
throw new KsqlException("Implicit repartitioning of windowed sources is not supported. "
+ "See https://github.com/confluentinc/ksql/issues/4385.");
final String errorMsg = "Implicit repartitioning of windowed sources is not supported. "
+ "See https://github.com/confluentinc/ksql/issues/4385.";
final String additionalMsg = forceRepartition
? " As a result, ksqlDB does not support joins on windowed sources with "
+ "Schema-Registry-enabled key formats (AVRO, JSON_SR, PROTOBUF) at this time. "
+ "Please repartition your sources to use a different key format before performing "
+ "the join."
: "";
throw new KsqlException(errorMsg + additionalMsg);
}

final ExecutionStep<KStreamHolder<Struct>> step = ExecutionStepFactory
Expand Down
Loading