diff --git a/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java b/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java index 3626f827..8e98e855 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java @@ -16,23 +16,24 @@ package com.linecorp.decaton.processor; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.ProcessorSubscription; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; @@ -108,13 +109,13 @@ private void testRetryWithKeyValue( } @Test(timeout = 30000) - public void testPrintableAsciiStringKeyValue() throws Exception { + public void testBytesKeyValue() throws Exception { testRetryWithKeyValue( - new PrintableAsciiStringSerializer(), - "abc", - new PrintableAsciiStringSerializer(), - new StringDeserializer(), - "value" + new ByteArraySerializer(), + "key".getBytes(StandardCharsets.UTF_8), + new ByteArraySerializer(), + new ByteArrayDeserializer(), + "value".getBytes(StandardCharsets.UTF_8) ); }