From 8bc35599bb660428a15604f12cd5e70567111e58 Mon Sep 17 00:00:00 2001 From: dan norwood Date: Tue, 13 Mar 2018 11:03:23 -0700 Subject: [PATCH] handle streams kip-265 updates (#926) --- .../confluent/ksql/QueryTranslationTest.java | 4 +-- .../ksql/integration/JsonFormatTest.java | 33 ++++++++++++------- .../ksql/integration/WindowingIntTest.java | 25 +++++++------- .../confluent/ksql/serde/WindowedSerde.java | 8 ++--- 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/QueryTranslationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/QueryTranslationTest.java index 87ef54a683f0..3a5a4b844aba 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/QueryTranslationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/QueryTranslationTest.java @@ -26,9 +26,9 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; import org.apache.kafka.test.TestUtils; @@ -133,7 +133,7 @@ public Deserializer keyDeserializer() { if (window == null) { return Serdes.String().deserializer(); } - return new WindowedDeserializer<>(Serdes.String().deserializer(), window.size()); + return new TimeWindowedDeserializer(Serdes.String().deserializer(), window.size()); } @SuppressWarnings("unchecked") diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index 16292f98e098..9abc0ac2d281 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -16,13 +16,6 @@ package io.confluent.ksql.integration; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.KsqlEngine; -import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.testutils.EmbeddedSingleNodeKafkaCluster; -import io.confluent.ksql.util.*; - import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.RecordMetadata; @@ -30,9 +23,13 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +38,20 @@ import java.util.HashMap; import java.util.Map; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.KsqlEngine; +import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.testutils.EmbeddedSingleNodeKafkaCluster; +import io.confluent.ksql.util.KafkaTopicClient; +import io.confluent.ksql.util.KafkaTopicClientImpl; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.OrderDataProvider; +import io.confluent.ksql.util.PersistentQueryMetadata; +import io.confluent.ksql.util.SchemaUtil; +import io.confluent.ksql.util.TopicConsumer; +import io.confluent.ksql.util.TopicProducer; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -259,7 +270,7 @@ private Map, GenericRow> readWindowedResults( Schema resultSchema, int expectedNumMessages ) { - Deserializer> keyDeserializer = new WindowedDeserializer<>(new StringDeserializer()); + Deserializer> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer()); return topicConsumer.readResults(resultTopic, resultSchema, expectedNumMessages, keyDeserializer); } @@ -267,4 +278,4 @@ private void terminateQuery() { ksqlEngine.terminateQuery(queryId, true); } -} \ No newline at end of file +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java index d806eb52af4d..6a141279c14c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java @@ -1,19 +1,11 @@ package io.confluent.ksql.integration; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.KsqlContext; -import io.confluent.ksql.serde.DataSource; -import io.confluent.ksql.util.KafkaTopicClient; -import io.confluent.ksql.util.KafkaTopicClientImpl; -import io.confluent.ksql.util.OrderDataProvider; -import io.confluent.ksql.util.QueryMetadata; - import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -27,8 +19,15 @@ import java.util.Map; import java.util.Set; -import static org.hamcrest.MatcherAssert.assertThat; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.KsqlContext; +import io.confluent.ksql.util.KafkaTopicClient; +import io.confluent.ksql.util.KafkaTopicClientImpl; +import io.confluent.ksql.util.OrderDataProvider; +import io.confluent.ksql.util.QueryMetadata; + import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; @Category({IntegrationTest.class}) public class WindowingIntTest { @@ -137,7 +136,7 @@ public void shouldAggregateTumblingWindow() throws Exception { final Map results = new HashMap<>(); TestUtils.waitForCondition(() -> { - final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), MAX_POLL_PER_ITERATION); + final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new TimeWindowedDeserializer<>(new StringDeserializer()), MAX_POLL_PER_ITERATION); updateResults(results, windowedResults); final GenericRow actual = results.get("ITEM_1"); return expected.equals(actual); @@ -189,7 +188,7 @@ public void shouldAggregateHoppingWindow() throws Exception { final Map results = new HashMap<>(); TestUtils.waitForCondition(() -> { - final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), 1000); + final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new TimeWindowedDeserializer<>(new StringDeserializer()), 1000); updateResults(results, windowedResults); final GenericRow actual = results.get("ITEM_1"); return expected.equals(actual); @@ -236,7 +235,7 @@ public void shouldAggregateSessionWindow() throws Exception { final Map results = new HashMap<>(); TestUtils.waitForCondition(() -> { - final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, datasetOneMetaData.size(), new WindowedDeserializer<>(new StringDeserializer()), 1000); + final Map, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, datasetOneMetaData.size(), new TimeWindowedDeserializer<>(new StringDeserializer()), 1000); updateResults(results, windowedResults); final GenericRow actual = results.get("ORDER_6"); return expectedResults.equals(actual) && results.size() == 6; diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/WindowedSerde.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/WindowedSerde.java index 445d575aed85..54bba4c4778d 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/WindowedSerde.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/WindowedSerde.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; -import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import java.util.Map; @@ -33,8 +33,8 @@ public class WindowedSerde implements Serde> { private final Deserializer> deserializer; public WindowedSerde() { - serializer = new WindowedSerializer<>(new StringSerializer()); - deserializer = new WindowedDeserializer<>(new StringDeserializer()); + serializer = new TimeWindowedSerializer<>(new StringSerializer()); + deserializer = new TimeWindowedDeserializer<>(new StringDeserializer()); } @Override