handle streams kip-265 updates (#926)
norwood authored Mar 13, 2018
1 parent 11bd83b commit 8bc3559
Showing 4 changed files with 40 additions and 30 deletions.
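
Context: KIP-265 ("Make Windowed Serde to public APIs") moved the windowed serializer and deserializer out of the internal org.apache.kafka.streams.kstream.internals package into the public org.apache.kafka.streams.kstream package, as TimeWindowedSerializer and TimeWindowedDeserializer. This commit switches KSQL over to the public classes. A minimal before/after sketch in Java (the class name and the 30-second window size are illustrative, not from this commit):

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

public class Kip265MigrationSketch {
  public static void main(String[] args) {
    // Before: new org.apache.kafka.streams.kstream.internals.WindowedDeserializer<>(new StringDeserializer())
    // After: the public replacement, with the same single-argument form:
    Deserializer<Windowed<String>> keyDeserializer =
        new TimeWindowedDeserializer<>(new StringDeserializer());

    // An explicit window size can also be supplied, as the keyDeserializer()
    // change in the first file below does with window.size():
    Deserializer<Windowed<String>> sizedDeserializer =
        new TimeWindowedDeserializer<>(new StringDeserializer(), 30_000L);

    System.out.println(keyDeserializer + " / " + sizedDeserializer);
  }
}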
@@ -26,9 +26,9 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.TestUtils;
@@ -133,7 +133,7 @@ public Deserializer keyDeserializer() {
     if (window == null) {
       return Serdes.String().deserializer();
     }
-    return new WindowedDeserializer<>(Serdes.String().deserializer(), window.size());
+    return new TimeWindowedDeserializer(Serdes.String().deserializer(), window.size());
   }
 
   @SuppressWarnings("unchecked")
@@ -16,23 +16,20 @@

 package io.confluent.ksql.integration;
 
-import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.KsqlEngine;
-import io.confluent.ksql.metastore.MetaStore;
-import io.confluent.ksql.query.QueryId;
-import io.confluent.ksql.testutils.EmbeddedSingleNodeKafkaCluster;
-import io.confluent.ksql.util.*;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -41,6 +38,20 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.KsqlEngine;
+import io.confluent.ksql.metastore.MetaStore;
+import io.confluent.ksql.query.QueryId;
+import io.confluent.ksql.testutils.EmbeddedSingleNodeKafkaCluster;
+import io.confluent.ksql.util.KafkaTopicClient;
+import io.confluent.ksql.util.KafkaTopicClientImpl;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.OrderDataProvider;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import io.confluent.ksql.util.SchemaUtil;
+import io.confluent.ksql.util.TopicConsumer;
+import io.confluent.ksql.util.TopicProducer;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;

@@ -259,12 +270,12 @@ private Map<Windowed<String>, GenericRow> readWindowedResults(
       Schema resultSchema,
       int expectedNumMessages
   ) {
-    Deserializer<Windowed<String>> keyDeserializer = new WindowedDeserializer<>(new StringDeserializer());
+    Deserializer<Windowed<String>> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer());
     return topicConsumer.readResults(resultTopic, resultSchema, expectedNumMessages, keyDeserializer);
   }
 
   private void terminateQuery() {
     ksqlEngine.terminateQuery(queryId, true);
   }
 
 }
 }
@@ -1,19 +1,11 @@
 package io.confluent.ksql.integration;
 
-import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.KsqlContext;
-import io.confluent.ksql.serde.DataSource;
-import io.confluent.ksql.util.KafkaTopicClient;
-import io.confluent.ksql.util.KafkaTopicClientImpl;
-import io.confluent.ksql.util.OrderDataProvider;
-import io.confluent.ksql.util.QueryMetadata;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -27,8 +19,15 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.hamcrest.MatcherAssert.assertThat;
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.KsqlContext;
+import io.confluent.ksql.util.KafkaTopicClient;
+import io.confluent.ksql.util.KafkaTopicClientImpl;
+import io.confluent.ksql.util.OrderDataProvider;
+import io.confluent.ksql.util.QueryMetadata;
+
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public class WindowingIntTest {
@@ -137,7 +136,7 @@ public void shouldAggregateTumblingWindow() throws Exception {

     final Map<String, GenericRow> results = new HashMap<>();
     TestUtils.waitForCondition(() -> {
-      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), MAX_POLL_PER_ITERATION);
+      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new TimeWindowedDeserializer<>(new StringDeserializer()), MAX_POLL_PER_ITERATION);
       updateResults(results, windowedResults);
       final GenericRow actual = results.get("ITEM_1");
       return expected.equals(actual);
@@ -189,7 +188,7 @@ public void shouldAggregateHoppingWindow() throws Exception {

     final Map<String, GenericRow> results = new HashMap<>();
     TestUtils.waitForCondition(() -> {
-      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), 1000);
+      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new TimeWindowedDeserializer<>(new StringDeserializer()), 1000);
       updateResults(results, windowedResults);
       final GenericRow actual = results.get("ITEM_1");
       return expected.equals(actual);
@@ -236,7 +235,7 @@ public void shouldAggregateSessionWindow() throws Exception {
     final Map<String, GenericRow> results = new HashMap<>();
 
     TestUtils.waitForCondition(() -> {
-      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, datasetOneMetaData.size(), new WindowedDeserializer<>(new StringDeserializer()), 1000);
+      final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, datasetOneMetaData.size(), new TimeWindowedDeserializer<>(new StringDeserializer()), 1000);
       updateResults(results, windowedResults);
       final GenericRow actual = results.get("ORDER_6");
       return expectedResults.equals(actual) && results.size() == 6;
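
A side note on the session-window hunk above: KIP-265 also introduced a public SessionWindowedDeserializer, but this commit keeps TimeWindowedDeserializer for the session test, mirroring its previous use of the internal WindowedDeserializer. For reference, a sketch of the session variant (the class name is illustrative):

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

public class SessionWindowSketch {
  public static void main(String[] args) {
    // Session-windowed keys carry an explicit end timestamp in their
    // serialized form, so no window size needs to be configured:
    Deserializer<Windowed<String>> sessionKeyDeserializer =
        new SessionWindowedDeserializer<>(new StringDeserializer());
    System.out.println(sessionKeyDeserializer);
  }
}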
@@ -21,9 +21,9 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
-import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 
 import java.util.Map;

@@ -33,8 +33,8 @@ public class WindowedSerde implements Serde<Windowed<String>> {
   private final Deserializer<Windowed<String>> deserializer;
 
   public WindowedSerde() {
-    serializer = new WindowedSerializer<>(new StringSerializer());
-    deserializer = new WindowedDeserializer<>(new StringDeserializer());
+    serializer = new TimeWindowedSerializer<>(new StringSerializer());
+    deserializer = new TimeWindowedDeserializer<>(new StringDeserializer());
   }
 
   @Override
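
Since WindowedSerde above simply delegates to the two public classes, a small round-trip exercise shows them working as a pair (the class name, topic name, and window bounds are illustrative; TimeWindow remains an internal class, imported the same way the first file in this commit imports it):

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public class WindowedSerdeRoundTripSketch {
  public static void main(String[] args) {
    TimeWindowedSerializer<String> serializer =
        new TimeWindowedSerializer<>(new StringSerializer());
    TimeWindowedDeserializer<String> deserializer =
        new TimeWindowedDeserializer<>(new StringDeserializer());

    // The serialized form is the inner key's bytes followed by the
    // 8-byte window start timestamp.
    Windowed<String> key = new Windowed<>("ITEM_1", new TimeWindow(0L, 30_000L));
    byte[] bytes = serializer.serialize("orders-topic", key);

    // The deserializer recovers the inner key and the window start; the end
    // is derived from the deserializer's configured window size.
    Windowed<String> roundTripped = deserializer.deserialize("orders-topic", bytes);
    System.out.println(roundTripped.key() + " @ " + roundTripped.window().start());
  }
}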
