Skip to content

Commit

Permalink
refactor: add an overload in MaterializedFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Mar 9, 2020
1 parent ae1de49 commit 795fc08
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public class KsMaterializationFunctionalTest {
private static final String USER_TABLE = "users_table";
private static final String USER_STREAM = "users_stream";

private static final String PAGEVIEWS_TOPIC = "pageviews_topic";
private static final String PAGEVIEWS_STREAM = "pageviews_stream";
private static final String PAGE_VIEWS_TOPIC = "page_views_topic";
private static final String PAGE_VIEWS_STREAM = "page_views_stream";

private static final Format VALUE_FORMAT = JSON;
private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider();
Expand Down Expand Up @@ -136,7 +136,7 @@ public class KsMaterializationFunctionalTest {

@BeforeClass
public static void classSetUp() {
TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGEVIEWS_TOPIC);
TEST_HARNESS.ensureTopics(USERS_TOPIC, PAGE_VIEWS_TOPIC);

TEST_HARNESS.produceRows(
USERS_TOPIC,
Expand All @@ -146,7 +146,7 @@ public static void classSetUp() {

for (final Instant windowTime : WINDOW_START_INSTANTS) {
TEST_HARNESS.produceRows(
PAGEVIEWS_TOPIC,
PAGE_VIEWS_TOPIC,
PAGE_VIEW_DATA_PROVIDER,
VALUE_FORMAT,
windowTime::toEpochMilli
Expand Down Expand Up @@ -425,7 +425,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
public void shouldFailQueryWithRetentionSmallerThanGracePeriod() {
// Given:
executeQuery("CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS)"
+ " GROUP BY PAGEID;"
Expand All @@ -437,7 +437,7 @@ public void shouldQueryTumblingWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW TUMBLING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " RETENTION " + (WINDOW_SEGMENT_DURATION.getSeconds() * 2) + " SECONDS,"
+ " GRACE PERIOD 0 SECONDS)"
Expand Down Expand Up @@ -466,7 +466,7 @@ public void shouldQueryHoppingWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW HOPPING (SIZE " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " ADVANCE BY " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS, "
+ " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
Expand Down Expand Up @@ -495,7 +495,7 @@ public void shouldQuerySessionWindowMaterializedTableWithRetention() {
// Given:
final PersistentQueryMetadata query = executeQuery(
"CREATE TABLE " + output + " AS"
+ " SELECT COUNT(*) AS COUNT FROM " + PAGEVIEWS_STREAM
+ " SELECT COUNT(*) AS COUNT FROM " + PAGE_VIEWS_STREAM
+ " WINDOW SESSION (" + WINDOW_SEGMENT_DURATION.getSeconds()/2 + " SECONDS,"
+ " RETENTION " + WINDOW_SEGMENT_DURATION.getSeconds() + " SECONDS,"
+ " GRACE PERIOD 0 SECONDS"
Expand Down Expand Up @@ -754,10 +754,10 @@ private static void initializeKsql(final TestKsqlContext ksqlContext) {
+ ");"
);

ksqlContext.sql("CREATE STREAM " + PAGEVIEWS_STREAM + " "
ksqlContext.sql("CREATE STREAM " + PAGE_VIEWS_STREAM + " "
+ " (" + PAGE_VIEW_DATA_PROVIDER.ksqlSchemaString() + ")"
+ " WITH ("
+ " kafka_topic='" + PAGEVIEWS_TOPIC + "', "
+ " kafka_topic='" + PAGE_VIEWS_TOPIC + "', "
+ " value_format='" + VALUE_FORMAT.name() + "', "
+ " key = '" + PAGE_VIEW_DATA_PROVIDER.key() + "'"
+ ");"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ public Node visitHoppingWindowExpression(

final String sizeUnit = windowUnits.get(0).getText();
final String advanceByUnit = windowUnits.get(1).getText();

return new HoppingWindowExpression(
getLocation(ctx),
Long.parseLong(sizeStr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -81,7 +80,7 @@ static Materialized<Struct, GenericRow, KeyValueStore<Bytes, byte[]>> buildMater
buildValueSerde(formats, queryBuilder, physicalAggregationSchema, queryContext);

return materializedFactory
.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext), Optional.empty());
.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext));
}

static MaterializationInfo.Builder materializationInfoBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@
import org.apache.kafka.streams.processor.StateStore;

public interface MaterializedFactory {
<K, S extends StateStore> Materialized<K, GenericRow, S> create(Serde<K> keySerde,
Serde<GenericRow> valSerde,
String name,
Optional<Duration> retention);
<K, S extends StateStore> Materialized<K, GenericRow, S> create(
Serde<K> keySerde,
Serde<GenericRow> valSerde,
String name,
Optional<Duration> retention
);

<K, S extends StateStore> Materialized<K, GenericRow, S> create(
Serde<K> keySerde,
Serde<GenericRow> valSerde,
String name
);

static MaterializedFactory create() {
return create(
Expand Down Expand Up @@ -59,6 +67,14 @@ public <K, S extends StateStore> Materialized<K, GenericRow, S> create(
.withKeySerde(keySerde)
.withValueSerde(valSerde);
}

@Override
public <K, S extends StateStore> Materialized<K, GenericRow, S> create(
final Serde<K> keySerde,
final Serde<GenericRow> valSerde,
final String name) {
return create(keySerde, valSerde, name, Optional.empty());
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ public static KTableHolder<Struct> buildTable(
materializedFactory.create(
keySerde,
valueSerde,
tableChangeLogOpName(source.getProperties()),
Optional.empty()
tableChangeLogOpName(source.getProperties())
);

final KTable<Struct, GenericRow> ktable = buildKTable(
Expand Down Expand Up @@ -230,8 +229,7 @@ static KTableHolder<Windowed<Struct>> buildWindowedTable(
materializedFactory.create(
keySerde,
valueSerde,
tableChangeLogOpName(source.getProperties()),
Optional.empty()
tableChangeLogOpName(source.getProperties())
);

final KTable<Windowed<Struct>, GenericRow> ktable = buildKTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void setup() {
when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger);
when(streamsFactories.getConsumedFactory()).thenReturn(consumedFactory);
when(streamsFactories.getMaterializedFactory()).thenReturn(materializationFactory);
when(materializationFactory.create(any(), any(), any(), any()))
when(materializationFactory.create(any(), any(), any()))
.thenReturn((Materialized) materialized);

planBuilder = new KSPlanBuilder(
Expand Down Expand Up @@ -656,7 +656,7 @@ public void shouldBuildTableWithCorrectStoreName() {
tableSource.build(planBuilder);

// Then:
verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce", Optional.empty());
verify(materializationFactory).create(keySerde, valueSerde, "base-Reduce");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void init() {

@SuppressWarnings("unchecked")
private void givenUnwindowedAggregate() {
when(materializedFactory.<Struct, KeyValueStore<Bytes, byte[]>>create(any(), any(), any(), any()))
when(materializedFactory.<Struct, KeyValueStore<Bytes, byte[]>>create(any(), any(), any()))
.thenReturn(materialized);
when(groupedStream.aggregate(any(), any(), any(Materialized.class))).thenReturn(aggregated);
when(aggregated.transformValues(any(), any(Named.class)))
Expand Down Expand Up @@ -361,7 +361,7 @@ public void shouldBuildMaterializedWithCorrectNameForUnwindowedAggregate() {
aggregate.build(planBuilder);

// Then:
verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any());
verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void init() {
when(aggregateParams.getAggregateSchema()).thenReturn(AGGREGATE_SCHEMA);
when(aggregateParams.getSchema()).thenReturn(AGGREGATE_SCHEMA);
when(aggregator.getResultMapper()).thenReturn(resultMapper);
when(materializedFactory.<Struct, KeyValueStore<Bytes, byte[]>>create(any(), any(), any(), any()))
when(materializedFactory.<Struct, KeyValueStore<Bytes, byte[]>>create(any(), any(), any()))
.thenReturn(materialized);
when(groupedTable.aggregate(any(), any(), any(), any(Materialized.class))).thenReturn(
aggregated);
Expand Down Expand Up @@ -218,7 +218,7 @@ public void shouldBuildMaterializedWithCorrectSerdesForAggregate() {
aggregate.build(planBuilder);

// Then:
verify(materializedFactory).create(same(keySerde), same(valueSerde), any(), any());
verify(materializedFactory).create(same(keySerde), same(valueSerde), any());
}

@Test
Expand All @@ -227,7 +227,7 @@ public void shouldBuildMaterializedWithCorrectNameForAggregate() {
aggregate.build(planBuilder);

// Then:
verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"), any());
verify(materializedFactory).create(any(), any(), eq("agg-regate-Materialize"));
}

@Test
Expand Down

0 comments on commit 795fc08

Please sign in to comment.