diff --git a/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java b/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java index 9af75e69..b7a70999 100644 --- a/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java +++ b/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java @@ -38,7 +38,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(String key, T task); + CompletableFuture put(byte[] key, T task); /** * Put a task onto associated decaton queue with specifying arbitrary timestamp. @@ -47,7 +47,7 @@ public interface DecatonClient extends AutoCloseable { * @param timestamp milliseconds precision timestamp which is to be used to set timestamp of task metadata. * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(String key, T task, long timestamp); + CompletableFuture put(byte[] key, T task, long timestamp); /** * Put a task onto associated decaton queue with specifying some fields of task metadata. @@ -56,7 +56,7 @@ public interface DecatonClient extends AutoCloseable { * @param overrideTaskMetadata taskMetaData which can be set by users and used for event publish. * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(String key, T task, TaskMetadata overrideTaskMetadata); + CompletableFuture put(byte[] key, T task, TaskMetadata overrideTaskMetadata); /** * Put a task onto associated decaton queue. @@ -77,7 +77,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(String key, T task, Consumer errorCallback); + CompletableFuture put(byte[] key, T task, Consumer errorCallback); /** * Put a task onto associated decaton queue with specifying arbitrary timestamp. @@ -100,7 +100,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - default CompletableFuture put(String key, T task, long timestamp, + default CompletableFuture put(byte[] key, T task, long timestamp, Consumer errorCallback) { CompletableFuture result = put(key, task, timestamp); result.exceptionally(e -> { diff --git a/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java b/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java index ef512e12..699d58d4 100644 --- a/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java +++ b/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java @@ -24,9 +24,9 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import com.linecorp.decaton.client.internal.DecatonClientImpl; -import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer; import com.linecorp.decaton.common.Serializer; import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; @@ -54,9 +54,9 @@ public class DecatonClientBuilder { public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier { @Override - public Producer getProducer(Properties config) { + public Producer getProducer(Properties config) { return new KafkaProducer<>(config, - new PrintableAsciiStringSerializer(), + new ByteArraySerializer(), new ProtocolBuffersKafkaSerializer<>()); } } diff --git a/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java b/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java index bd9cb5b3..8d48efd2 100644 --- a/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java +++ b/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java @@ -39,5 +39,5 @@ public interface KafkaProducerSupplier { * @return an Kafka producer instance which implements {@link Producer}. The returned instance will be * closed along with {@link DecatonClient#close} being called. */ - Producer getProducer(Properties config); + Producer getProducer(Properties config); } diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java index 27736a2b..f19b4205 100644 --- a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java +++ b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java @@ -62,7 +62,7 @@ public DecatonClientImpl(String topic, } @Override - public CompletableFuture put(String key, T task, long timestamp) { + public CompletableFuture put(byte[] key, T task, long timestamp) { TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder() .setTimestampMillis(timestamp) .setSourceApplicationId(applicationId) @@ -73,17 +73,17 @@ public CompletableFuture put(String key, T task, long timestamp) } @Override - public CompletableFuture put(String key, T task, TaskMetadata overrideTaskMetadata) { + public CompletableFuture put(byte[] key, T task, TaskMetadata overrideTaskMetadata) { return put(key, task, convertToTaskMetadataProto(overrideTaskMetadata)); } @Override - public CompletableFuture put(String key, T task) { + public CompletableFuture put(byte[] key, T task) { return put(key, task, timestampSupplier.get()); } @Override - public CompletableFuture put(String key, T task, Consumer errorCallback) { + public CompletableFuture put(byte[] key, T task, Consumer errorCallback) { return put(key, task, timestampSupplier.get(), errorCallback); } @@ -92,7 +92,7 @@ public void close() throws Exception { producer.close(); } - private CompletableFuture put(String key, T task, TaskMetadataProto taskMetadataProto) { + private CompletableFuture put(byte[] key, T task, TaskMetadataProto taskMetadataProto) { byte[] serializedTask = serializer.serialize(task); DecatonTaskRequest request = diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java index 1f95d31b..809b9916 100644 --- a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java +++ b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java @@ -44,7 +44,7 @@ public class DecatonTaskProducer implements AutoCloseable { presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); } - private final Producer producer; + private final Producer producer; private final String topic; private static Properties completeProducerConfig(Properties producerConfig) { @@ -61,8 +61,8 @@ public DecatonTaskProducer(String topic, Properties producerConfig, this.topic = topic; } - public CompletableFuture sendRequest(String key, DecatonTaskRequest request) { - ProducerRecord record = new ProducerRecord<>(topic, key, request); + public CompletableFuture sendRequest(byte[] key, DecatonTaskRequest request) { + ProducerRecord record = new ProducerRecord<>(topic, key, request); CompletableFuture result = new CompletableFuture<>(); producer.send(record, (metadata, exception) -> { diff --git a/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java b/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java index 08488b8c..dea6490f 100644 --- a/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java @@ -44,12 +44,12 @@ public class DecatonClientBuilderTest { public MockitoRule rule = MockitoJUnit.rule(); @Mock - private Producer producer; + private Producer producer; @Captor - private ArgumentCaptor> recordCaptor; + private ArgumentCaptor> recordCaptor; - private ProducerRecord doProduce(DecatonClient dclient) { + private ProducerRecord doProduce(DecatonClient dclient) { dclient.put(null, HelloTask.getDefaultInstance()); verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class)); return recordCaptor.getValue(); @@ -69,7 +69,7 @@ public void testBuild() { .producerSupplier(config -> producer) .build(); - ProducerRecord record = doProduce(dclient); + ProducerRecord record = doProduce(dclient); assertEquals(topic, record.topic()); TaskMetadataProto metadata = record.value().getMetadata(); diff --git a/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java b/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java index b0c1fe43..f036d745 100644 --- a/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java @@ -45,22 +45,22 @@ public class DecatonClientTest { @Spy private final DecatonClient decaton = new DecatonClient() { @Override - public CompletableFuture put(String key, HelloTask task) { + public CompletableFuture put(byte[] key, HelloTask task) { return null; } @Override - public CompletableFuture put(String key, HelloTask task, long timestamp) { + public CompletableFuture put(byte[] key, HelloTask task, long timestamp) { return null; } @Override - public CompletableFuture put(String key, HelloTask task, TaskMetadata overrideTaskMetadata) { + public CompletableFuture put(byte[] key, HelloTask task, TaskMetadata overrideTaskMetadata) { return null; } @Override - public CompletableFuture put(String key, HelloTask task, + public CompletableFuture put(byte[] key, HelloTask task, Consumer errorCallback) { return null; } diff --git a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java index f55d003e..81891f7f 100644 --- a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.util.Properties; import java.util.function.Supplier; @@ -54,7 +55,7 @@ public class DecatonClientImplTest { public MockitoRule rule = MockitoJUnit.rule(); @Mock - private Producer producer; + private Producer producer; @Mock private Supplier timestampSupplier; @@ -62,7 +63,7 @@ public class DecatonClientImplTest { private DecatonClientImpl client; @Captor - private ArgumentCaptor> captor; + private ArgumentCaptor> captor; @Before public void setUp() { @@ -75,10 +76,10 @@ APPLICATION_ID, INSTANCE_ID, new Properties(), public void testTimestampFieldSetInternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance()); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); assertEquals(1234, record.value().getMetadata().getTimestampMillis()); } @@ -87,10 +88,10 @@ public void testTimestampFieldSetInternally() { public void testTimestampFieldSetInternallyWithCallback() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), ignored -> {}); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), ignored -> {}); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); assertEquals(1234, record.value().getMetadata().getTimestampMillis()); } @@ -99,10 +100,10 @@ public void testTimestampFieldSetInternallyWithCallback() { public void testTimestampFieldSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), 5678); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); assertEquals(5678, record.value().getMetadata().getTimestampMillis()); } @@ -111,10 +112,10 @@ public void testTimestampFieldSetExternally() { public void testTimestampFieldSetExternallyWithCallback() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), 5678, ignored -> {}); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678, ignored -> {}); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); assertEquals(5678, record.value().getMetadata().getTimestampMillis()); } @@ -123,10 +124,10 @@ public void testTimestampFieldSetExternallyWithCallback() { public void testTaskMetaDataSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder() - .timestamp(5678L) - .scheduledTime(6912L) - .build()); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder() + .timestamp(5678L) + .scheduledTime(6912L) + .build()); verifyAndAssertTaskMetadata(5678L, 6912L); } @@ -135,9 +136,9 @@ public void testTaskMetaDataSetExternally() { public void testWithScheduledTimeSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder() - .scheduledTime(181234L) - .build()); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder() + .scheduledTime(181234L) + .build()); verifyAndAssertTaskMetadata(1234L, 181234L); } @@ -146,10 +147,10 @@ public void testWithScheduledTimeSetExternally() { public void testWithEmptyTaskMetaDataSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build()); + client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder().build()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertTrue(record.value().getMetadata().getTimestampMillis() > 0); assertNotNull(record.value().getMetadata().getSourceApplicationId()); assertNotNull(record.value().getMetadata().getSourceInstanceId()); @@ -157,7 +158,7 @@ public void testWithEmptyTaskMetaDataSetExternally() { private void verifyAndAssertTaskMetadata(long timestamp, long scheduledTime) { verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); assertEquals(timestamp, record.value().getMetadata().getTimestampMillis()); assertEquals(scheduledTime, record.value().getMetadata().getScheduledTimeMillis()); diff --git a/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java b/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java index 23bab456..416bec65 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -153,17 +154,17 @@ public void testSingleThreadProcessing() throws Exception { // Note that this processing semantics is not be considered as Decaton specification which users can rely on. // Rather, this is just a expected behavior based on current implementation when we set concurrency to 1. ProcessingGuarantee noDuplicates = new ProcessingGuarantee() { - private final Map> produced = new HashMap<>(); - private final Map> processed = new HashMap<>(); + private final Map> produced = new HashMap<>(); + private final Map> processed = new HashMap<>(); @Override public synchronized void onProduce(ProducedRecord record) { - produced.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task()); + produced.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task()); } @Override public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) { - processed.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task()); + processed.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task()); } @Override diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java index 2ae3b496..7b5bde6f 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertEquals; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -57,12 +59,12 @@ public void tearDown() { @Test(timeout = 30000) public void testPropertyDynamicSwitch() throws Exception { - Set keys = new HashSet<>(); + Set keys = new HashSet<>(); for (int i = 0; i < 10000; i++) { - keys.add("key" + i); + keys.add(ByteBuffer.wrap(("key" + i).getBytes(StandardCharsets.UTF_8))); } - Set processedKeys = Collections.synchronizedSet(new HashSet<>()); + Set processedKeys = Collections.synchronizedSet(new HashSet<>()); CountDownLatch processLatch = new CountDownLatch(keys.size()); DecatonProcessor processor = (context, task) -> { @@ -81,11 +83,11 @@ public void testPropertyDynamicSwitch() throws Exception { DecatonClient client = TestUtils.client(topicName, rule.bootstrapServers())) { int count = 0; - for (String key : keys) { + for (ByteBuffer key : keys) { if (++count % 1000 == 0) { rateProp.set((long) count / 10); } - client.put(key, HelloTask.getDefaultInstance()); + client.put(key.array(), HelloTask.getDefaultInstance()); } processLatch.await(); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/ProcessingContext.java b/processor/src/main/java/com/linecorp/decaton/processor/ProcessingContext.java index 7434c5de..8eeeb4ce 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/ProcessingContext.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/ProcessingContext.java @@ -40,7 +40,7 @@ public interface ProcessingContext { * @return the key associated to the task now being processed. can be null if key isn't supplied for the * task. */ - String key(); + byte[] key(); /** * Returns the {@link Headers} which is associated to the source {@link ConsumerRecord} of the task. diff --git a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java index 51ec0e60..e8e09a8f 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java @@ -104,7 +104,7 @@ public void close() { public class SubscriptionMetrics extends AbstractMetrics { volatile KafkaClientMetrics kafkaClientMetrics; - public void bindClientMetrics(Consumer consumer) { + public void bindClientMetrics(Consumer consumer) { kafkaClientMetrics = new KafkaClientMetrics(consumer, availableTags.subscriptionScope()); kafkaClientMetrics.bindTo(registry); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/CompactionProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/CompactionProcessor.java index 18699ae3..eaa872b8 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/CompactionProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/CompactionProcessor.java @@ -16,6 +16,7 @@ package com.linecorp.decaton.processor.processors; +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -84,7 +85,7 @@ public enum CompactChoice { } private final ScheduledExecutorService executor; - private final ConcurrentMap> windowedTasks = new ConcurrentHashMap<>(); + private final ConcurrentMap> windowedTasks = new ConcurrentHashMap<>(); private final BiFunction, CompactingTask, CompactChoice> compactor; private final long lingerMillis; @@ -145,8 +146,8 @@ public CompactionProcessor( this(lingerMillis, compactor, null); } - private void flush(String key) throws InterruptedException { - CompactingTask task = windowedTasks.remove(key); + private void flush(byte[] key) throws InterruptedException { + CompactingTask task = windowedTasks.remove(ByteBuffer.wrap(key)); if (task == null) { return; } @@ -155,7 +156,7 @@ private void flush(String key) throws InterruptedException { // visible for testing Runnable flushTask(ProcessingContext context) { - String key = context.key(); + byte[] key = context.key(); return () -> { try { flush(key); @@ -182,15 +183,15 @@ private void scheduleFlush(ProcessingContext context) { @Override public void process(ProcessingContext context, T task) throws InterruptedException { - String key = context.key(); + byte[] key = context.key(); CompactingTask newTask = new CompactingTask<>(context.deferCompletion(), context, task); // Even though we do this read and following updates in separate operation, race condition can't be // happened because tasks are guaranteed to be serialized by it's key, so simultaneous processing // of tasks sharing the same key won't be happen. - CompactingTask prevTask = windowedTasks.get(key); + CompactingTask prevTask = windowedTasks.get(ByteBuffer.wrap(key)); if (prevTask == null) { - windowedTasks.put(key, newTask); + windowedTasks.put(ByteBuffer.wrap(key), newTask); scheduleFlush(context); return; } @@ -205,7 +206,7 @@ public void process(ProcessingContext context, T task) throws InterruptedExce case PICK_RIGHT: case PICK_EITHER: // Newer task has larger offset. We want to forward consumed offset. // Update the last task with new one. - Object oldEntry = windowedTasks.put(key, newTask); + Object oldEntry = windowedTasks.put(ByteBuffer.wrap(key), newTask); if (oldEntry == null) { // By race condition, there is a chance that the scheduled flush for preceding task just // got fired right after this method checked the key's existence at the beginning of this diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index 274a580b..ad368666 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -93,7 +93,7 @@ public void updateAssignment(Collection newAssignment) { } @Override - public void receive(ConsumerRecord record) { + public void receive(ConsumerRecord record) { TopicPartition tp = new TopicPartition(record.topic(), record.partition()); PartitionContext context = contexts.get(tp); @@ -130,7 +130,7 @@ public void receive(ConsumerRecord record) { } ProcessorSubscription(SubscriptionScope scope, - Supplier> consumerSupplier, + Supplier> consumerSupplier, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener, @@ -141,7 +141,7 @@ public void receive(ConsumerRecord record) { this.contexts = contexts; metrics = Metrics.withTags("subscription", scope.subscriptionId()).new SubscriptionMetrics(); - Consumer consumer = consumerSupplier.get(); + Consumer consumer = consumerSupplier.get(); if (props.get(CONFIG_BIND_CLIENT_METRICS).value()) { metrics.bindClientMetrics(consumer); } @@ -159,7 +159,7 @@ public void receive(ConsumerRecord record) { } public ProcessorSubscription(SubscriptionScope scope, - Supplier> consumerSupplier, + Supplier> consumerSupplier, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener) { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/BlacklistedKeysFilter.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/BlacklistedKeysFilter.java index 9e27e1d1..c2ac193c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/BlacklistedKeysFilter.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/BlacklistedKeysFilter.java @@ -18,6 +18,7 @@ import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_IGNORE_KEYS; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Set; @@ -37,11 +38,18 @@ public BlacklistedKeysFilter(ProcessorProperties props) { .listen((oldValue, newValue) -> ignoreKeys = new HashSet<>(newValue)); } - public boolean shouldTake(ConsumerRecord record) { + public boolean shouldTake(ConsumerRecord record) { + final byte[] key = record.key(); + if (key == null) { + return true; + } + + final String stringKey = new String(key, StandardCharsets.UTF_8); + // Preceding isEmpty() check is for reducing tiny overhead applied for each contains() by calling // Object#hashCode. Since ignoreKeys should be empty for most cases.. - if (!ignoreKeys.isEmpty() && ignoreKeys.contains(record.key())) { - logger.debug("Ignore task which has key configured to ignore: {}", record.key()); + if (!ignoreKeys.isEmpty() && ignoreKeys.contains(stringKey)) { + logger.debug("Ignore task which has key configured to ignore: {}", stringKey); return false; } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManager.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManager.java index 6775f8c4..a2b9c707 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManager.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManager.java @@ -93,16 +93,16 @@ public interface ConsumerHandler { * Process a {@link ConsumerRecord} that has been consumed from the topic. * @param record a record that has been fetched from the target topic. */ - void receive(ConsumerRecord record); + void receive(ConsumerRecord record); } - private final Consumer consumer; + private final Consumer consumer; private final PartitionStates states; private final ConsumerHandler handler; private final SubscriptionMetrics metrics; private final AtomicBoolean consumerClosing; - public ConsumeManager(Consumer consumer, + public ConsumeManager(Consumer consumer, PartitionStates states, ConsumerHandler handler, SubscriptionMetrics metrics) { @@ -162,7 +162,7 @@ public void onPartitionsAssigned(Collection partitions) { */ public void poll() { Timer timer = Utils.timer(); - ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MILLIS); + ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MILLIS); metrics.consumerPollTime.record(timer.duration()); timer = Utils.timer(); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumerSupplier.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumerSupplier.java index 39be5e60..65b171c2 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumerSupplier.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumerSupplier.java @@ -26,9 +26,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -public class ConsumerSupplier implements Supplier> { +public class ConsumerSupplier implements Supplier> { public static final int DEFAULT_MAX_POLL_RECORDS = 100; private static final Map configOverwrites = new HashMap() {{ @@ -42,8 +41,8 @@ public ConsumerSupplier(Properties config) { } @Override - public Consumer get() { - return new KafkaConsumer<>(mergedProps(), new StringDeserializer(), new ByteArrayDeserializer()); + public Consumer get() { + return new KafkaConsumer<>(mergedProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); } private Properties mergedProps() { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImpl.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImpl.java index b950ee2a..89cc8402 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImpl.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImpl.java @@ -68,7 +68,7 @@ public TaskMetadata metadata() { } @Override - public String key() { + public byte[] key() { return request.key(); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubPartitioner.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubPartitioner.java index 6298d273..2128a404 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubPartitioner.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/SubPartitioner.java @@ -16,6 +16,7 @@ package com.linecorp.decaton.processor.runtime.internal; +import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; class SubPartitioner { @@ -31,7 +32,7 @@ private static int toPositive(int number) { return number & 2147483647; } - public int partitionFor(String key) { + public int partitionFor(byte[] key) { if (key == null) { return toPositive((int) monotonicValueSupplier.getAndIncrement()) % bound; } else { @@ -40,8 +41,13 @@ public int partitionFor(String key) { // Here just by adding few bytes to the key we can "shift" hashing of the key and // can get back better distribution again in murmur2 result to evenly distribute keys // for subpartitions. - String shiftedKey = "s:" + key; - int hash = org.apache.kafka.common.utils.Utils.murmur2(shiftedKey.getBytes()); + // TODO: Eliminate array copy here + final ByteBuffer bb = ByteBuffer.allocate(key.length + 2); + bb.put((byte) 's'); + bb.put((byte) ':'); + bb.put(key); + + int hash = org.apache.kafka.common.utils.Utils.murmur2(bb.array()); return toPositive(hash) % bound; } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java index 7d88eba0..64a013e7 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java @@ -32,7 +32,7 @@ public class TaskRequest { private final TopicPartition topicPartition; private final long recordOffset; private final OffsetState offsetState; - private final String key; + private final byte[] key; @ToString.Exclude private final Headers headers; @ToString.Exclude @@ -43,7 +43,7 @@ public class TaskRequest { public TaskRequest(TopicPartition topicPartition, long recordOffset, OffsetState offsetState, - String key, + byte[] key, Headers headers, RecordTraceHandle trace, byte[] rawRequestBytes) { diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java index 254c42c5..44ee2134 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -44,12 +45,12 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import com.linecorp.decaton.processor.Completion; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactChoice; import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactingTask; -import com.linecorp.decaton.processor.Completion; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.internal.ProcessingContextImpl; @@ -101,7 +102,7 @@ private TaskInput put(DecatonProcessor processor, taskData, taskData.toByteArray()); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, name, null, NoopTrace.INSTANCE, null); + new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null); ProcessingContext context = spy(new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processor, downstream), null, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index e9036643..98b0b3fe 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -38,11 +39,11 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -61,7 +62,6 @@ import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.DeferredCompletion; -import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.SubscriptionStateListener.State; import com.linecorp.decaton.processor.runtime.internal.ConsumerSupplier; @@ -75,13 +75,13 @@ public class ProcessorSubscriptionTest { public MockitoRule rule = MockitoJUnit.rule(); @Mock - Consumer consumer; + Consumer consumer; /** * A mock consumer which exposes rebalance listener so that can be triggered manually * ({@link MockConsumer} doesn't simulate rebalance listener invocation. refs: KAFKA-6968). */ - private static class DecatonMockConsumer extends MockConsumer { + private static class DecatonMockConsumer extends MockConsumer { volatile ConsumerRebalanceListener rebalanceListener; private DecatonMockConsumer() { @@ -109,7 +109,7 @@ private static SubscriptionScope scope(String topic, long waitForProcessingOnClo ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS); } - private static ProcessorSubscription subscription(Consumer consumer, + private static ProcessorSubscription subscription(Consumer consumer, SubscriptionStateListener listener, TopicPartition tp, DecatonProcessor processor) { @@ -172,7 +172,8 @@ public void testOffsetRegression() throws Exception { feedOffsets.add(100L); feedOffsets.add(101L); CountDownLatch processLatch = new CountDownLatch(1); - ProcessorSubscription subscription = subscription(consumer, ignored -> {}, tp, (context, task) -> { + ProcessorSubscription subscription = subscription(consumer, ignored -> { + }, tp, (context, task) -> { if ("101".equals(task)) { processLatch.countDown(); } @@ -195,7 +196,7 @@ public void testOffsetRegression() throws Exception { if (offset != null) { return new ConsumerRecords<>(singletonMap(tp, Collections.singletonList( // Feed one record, then a subsequent record of the regressing offset. - new ConsumerRecord<>(tp.topic(), tp.partition(), offset, "abc", + new ConsumerRecord<>(tp.topic(), tp.partition(), offset, "abc".getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes())))); } else { Thread.sleep(invocation.getArgument(0)); @@ -217,7 +218,7 @@ public void testTerminateAsync() throws Exception { TopicPartition tp = new TopicPartition("topic", 0); DecatonMockConsumer consumer = new DecatonMockConsumer() { @Override - public synchronized ConsumerRecords poll(Duration timeout) { + public synchronized ConsumerRecords poll(Duration timeout) { rebalanceListener.onPartitionsAssigned(assignment()); return super.poll(timeout); } @@ -262,9 +263,9 @@ public synchronized ConsumerRecords poll(Duration timeout) { // First task finishes synchronous part of processing, starts async processing // Second task blocks during synchronous part of processing // Third task will be queued behind it - consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 10, "", NO_DATA)); - consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 11, "", NO_DATA)); - consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 12, "", NO_DATA)); + consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 10, new byte[0], NO_DATA)); + consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 11, new byte[0], NO_DATA)); + consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 12, new byte[0], NO_DATA)); asyncProcessingStarted.await(); subscription.initiateShutdown(); assertTrue(consumer.committed(singleton(tp)).isEmpty()); @@ -280,7 +281,8 @@ public synchronized ConsumerRecords poll(Duration timeout) { @Test(timeout = 5000) public void closeWithoutStart() throws Exception { TopicPartition tp = new TopicPartition("topic", 0); - ProcessorSubscription subscription = subscription(consumer, null, tp, (context, task) -> {}); + ProcessorSubscription subscription = subscription(consumer, null, tp, (context, task) -> { + }); // The main point is that the below close returns within timeout. subscription.close(); verify(consumer).close(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java index e6425a7c..61e6fe4d 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java @@ -62,10 +62,10 @@ public class ConsumeManagerTest { static final String TOPIC = "topic"; @Mock - Consumer consumer; + Consumer consumer; @Captor - ArgumentCaptor> recordsCaptor; + ArgumentCaptor> recordsCaptor; @Mock PartitionStates states; diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java index 4cb459c9..0feb46f6 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -76,13 +77,13 @@ public void setUp() { processor = new DecatonTaskRetryQueueingProcessor(scope, producer); doReturn(CompletableFuture.completedFuture(null)).when(producer).sendRequest(any(), any()); doReturn(new CompletionImpl()).when(context).deferCompletion(); - doReturn("key").when(context).key(); + doReturn("key".getBytes(StandardCharsets.UTF_8)).when(context).key(); doReturn(TaskMetadata.builder().build()).when(context).metadata(); } @Test public void testRetryRequest() throws InterruptedException { - String key = "key"; + byte[] key = "key".getBytes(StandardCharsets.UTF_8); TaskMetadata meta = TaskMetadata.builder() .sourceApplicationId("unit-test") diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java index c36a696d..07f67473 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.util.Collections; import java.util.Optional; @@ -92,7 +93,7 @@ public class ProcessPipelineTest { private static TaskRequest taskRequest() { return new TaskRequest( - new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST", null, NoopTrace.INSTANCE, REQUEST.toByteArray()); + new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, REQUEST.toByteArray()); } @Rule @@ -252,4 +253,3 @@ public void testScheduleThenProcess_Terminate() throws InterruptedException { assertFalse(request.offsetState().completion().isComplete()); } } - diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java index 7bb03707..d40f616a 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -110,7 +111,7 @@ private static void terminateExecutor(ExecutorService executor) throws Interrupt private static ProcessingContextImpl context(RecordTraceHandle traceHandle, DecatonProcessor... processors) { TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST", + new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, traceHandle, REQUEST.toByteArray()); DecatonTask task = new DecatonTask<>( TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray()); @@ -361,7 +362,7 @@ public void process(ProcessingContext context, byte[] task) } }); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST", null, null, REQUEST.toByteArray()); + new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, REQUEST.toByteArray()); DecatonTask task = new DecatonTask<>( TaskMetadata.fromProto(REQUEST.getMetadata()), TASK.toByteArray(), TASK.toByteArray()); @@ -405,7 +406,7 @@ public void process(ProcessingContext context, byte[] task) } }); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST", null, null, REQUEST.toByteArray()); + new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, REQUEST.toByteArray()); DecatonTask task = new DecatonTask<>( TaskMetadata.fromProto(REQUEST.getMetadata()), TASK.toByteArray(), TASK.toByteArray()); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/SubPartitionerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/SubPartitionerTest.java index 222904c8..25fc9a2e 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/SubPartitionerTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/SubPartitionerTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -35,12 +36,12 @@ public class SubPartitionerTest { static final int[] SUBPARTITION_COUNTS = { 10, 17, 32, 64 }; static final double TARGET_STDDEV_RATIO = 0.05; - static final String[] keys = new String[DIST_KEYS_COUNT]; + static final byte[][] keys = new byte[DIST_KEYS_COUNT][]; @Before public void setUp() { for (int i = 0; i < keys.length; i++) { - keys[i] = String.valueOf(i); + keys[i] = String.valueOf(i).getBytes(StandardCharsets.UTF_8); } } @@ -54,14 +55,14 @@ private static double stddev(int[] counts) { @Test public void testEvenlyDistributedSelection() { for (int partitionCount : PARTITION_COUNTS) { - List> partitions = new ArrayList<>(partitionCount); + List> partitions = new ArrayList<>(partitionCount); for (int i = 0; i < partitionCount; i++) { partitions.add(new ArrayList<>()); } - for (String key : keys) { + for (byte[] key : keys) { // This is the way used to determine partition in Kafka's DefaultPartitioner - int partition = (Utils.murmur2(key.getBytes()) & 2147483647) % partitionCount; + int partition = (Utils.murmur2(key) & 2147483647) % partitionCount; partitions.get(partition).add(key); } @@ -71,10 +72,10 @@ public void testEvenlyDistributedSelection() { partitionCount, partStddev, Arrays.toString(partCounts)); for (int subpartitionCount : SUBPARTITION_COUNTS) { - for (List partition : partitions) { + for (List partition : partitions) { int[] counts = new int[subpartitionCount]; SubPartitioner subPartitioner = new SubPartitioner(counts.length); - for (String key : partition) { + for (byte[] key : partition) { int subPartition = subPartitioner.partitionFor(key); counts[subPartition]++; } @@ -96,7 +97,7 @@ public void testEvenlyDistributedSelection() { public void testConsistentSelectionForSameKeys() { for (int subpartitionCount : SUBPARTITION_COUNTS) { SubPartitioner subPartitioner = new SubPartitioner(subpartitionCount); - for (String key : keys) { + for (byte[] key : keys) { int assign1 = subPartitioner.partitionFor(key); int assign2 = subPartitioner.partitionFor(key); assertEquals(String.format("[%d] assign of %s", subpartitionCount, key), diff --git a/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java b/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java index 16c6b659..cd066261 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java @@ -29,27 +29,24 @@ import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; import java.util.function.Consumer; -import java.util.function.Supplier; -import com.linecorp.decaton.processor.runtime.ProcessorProperties; -import com.linecorp.decaton.processor.runtime.Property; -import com.linecorp.decaton.processor.runtime.PropertySupplier; -import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; -import com.linecorp.decaton.processor.runtime.SubscriptionStateListener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; import com.google.protobuf.MessageLite; import com.linecorp.decaton.client.DecatonClient; -import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer; import com.linecorp.decaton.common.Serializer; -import com.linecorp.decaton.processor.runtime.SubscriptionStateListener.State; import com.linecorp.decaton.processor.runtime.ProcessorSubscription; +import com.linecorp.decaton.processor.runtime.Property; +import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; +import com.linecorp.decaton.processor.runtime.SubscriptionStateListener; +import com.linecorp.decaton.processor.runtime.SubscriptionStateListener.State; import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer; import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; @@ -112,9 +109,9 @@ public static DecatonClient client(String topic, * @param bootstrapServers bootstrap servers to connect * @return {@link Producer} instance with preset configurations */ - public static Producer producer(String bootstrapServers) { + public static Producer producer(String bootstrapServers) { return new KafkaProducer<>(defaultProducerProps(bootstrapServers), - new PrintableAsciiStringSerializer(), + new ByteArraySerializer(), new ProtocolBuffersKafkaSerializer<>()); } diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessOrdering.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessOrdering.java index a7607622..de5d15d7 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessOrdering.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessOrdering.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -33,26 +34,26 @@ public class ProcessOrdering implements ProcessingGuarantee { private final Map taskToOffset = new HashMap<>(); - private final Map> producedRecords = new HashMap<>(); - private final Map> processedRecords = new HashMap<>(); + private final Map> producedRecords = new HashMap<>(); + private final Map> processedRecords = new HashMap<>(); @Override public synchronized void onProduce(ProducedRecord record) { taskToOffset.put(record.task(), record.offset()); - producedRecords.computeIfAbsent(record.key(), + producedRecords.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task()); } @Override public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) { - processedRecords.computeIfAbsent(record.key(), + processedRecords.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task()); } @Override public void doAssert() { - for (Entry> entry : producedRecords.entrySet()) { - String key = entry.getKey(); + for (Entry> entry : producedRecords.entrySet()) { + final ByteBuffer key = entry.getKey(); List produced = entry.getValue(); List processed = processedRecords.get(key); diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessedRecord.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessedRecord.java index 6f15f831..8fad8907 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessedRecord.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessedRecord.java @@ -30,7 +30,7 @@ public class ProcessedRecord { /** * Key of the task */ - String key; + byte[] key; /** * Processed task */ diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java index 2bfc6663..78a48ffd 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java @@ -18,6 +18,7 @@ import static com.linecorp.decaton.testing.TestUtils.DEFINITELY_TOO_SLOW; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; @@ -95,7 +96,7 @@ public class ProcessorTestSuite { private final Set semantics; private final SubscriptionStatesListener statesListener; private final TracingProvider tracingProvider; - private final Function> producerSupplier; + private final Function> producerSupplier; private final TaskExtractor customTaskExtractor; private static final int DEFAULT_NUM_TASKS = 10000; @@ -153,7 +154,7 @@ public static class Builder { * Expected use case: * supply a producer which adds tracing id to each message to test tracing-functionality in e2e */ - private Function> producerSupplier = TestUtils::producer; + private Function> producerSupplier = TestUtils::producer; /** * Supply custom {@link TaskExtractor} to be used to extract a task. */ @@ -221,7 +222,7 @@ public void run() throws InterruptedException, ExecutionException, TimeoutExcept CountDownLatch rollingRestartLatch = new CountDownLatch(numTasks / 2); ProcessorSubscription[] subscriptions = new ProcessorSubscription[NUM_SUBSCRIPTION_INSTANCES]; - try (Producer producer = producerSupplier.apply(rule.bootstrapServers())) { + try (Producer producer = producerSupplier.apply(rule.bootstrapServers())) { ConcurrentMap retryOffsets = new ConcurrentHashMap<>(); for (int i = 0; i < subscriptions.length; i++) { subscriptions[i] = newSubscription(i, topic, Optional.of(rollingRestartLatch), retryOffsets); @@ -351,7 +352,7 @@ private void awaitAllOffsetsCommitted(Map producedOffsets) * @return A CompletableFuture of Map, which holds partition as the key and max offset as the value */ private CompletableFuture> produceTasks( - Producer producer, + Producer producer, String topic, Consumer onProduce) { @SuppressWarnings("unchecked") @@ -360,7 +361,7 @@ private CompletableFuture> produceTasks( TestTaskSerializer serializer = new TestTaskSerializer(); for (int i = 0; i < produceFutures.length; i++) { TestTask task = new TestTask(String.valueOf(i)); - String key = String.valueOf(i % NUM_KEYS); + byte[] key = String.valueOf(i % NUM_KEYS).getBytes(StandardCharsets.UTF_8); TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder() .setTimestampMillis(System.currentTimeMillis()) @@ -372,7 +373,7 @@ private CompletableFuture> produceTasks( .setMetadata(taskMetadata) .setSerializedTask(ByteString.copyFrom(serializer.serialize(task))) .build(); - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, request); CompletableFuture future = new CompletableFuture<>(); produceFutures[i] = future; @@ -403,17 +404,17 @@ private CompletableFuture> produceTasks( }); } - private static class InterceptingProducer extends ProducerAdaptor { + private static class InterceptingProducer extends ProducerAdaptor { private final Consumer interceptor; - InterceptingProducer(Producer delegate, + InterceptingProducer(Producer delegate, Consumer interceptor) { super(delegate); this.interceptor = interceptor; } @Override - public Future send(ProducerRecord record, + public Future send(ProducerRecord record, Callback callback) { return super.send(record, (meta, e) -> { if (meta != null) { diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducedRecord.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducedRecord.java index 3dec4a55..75ed832a 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducedRecord.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducedRecord.java @@ -31,7 +31,7 @@ public class ProducedRecord { /** * Key of the task */ - String key; + byte[] key; /** * Topic partition the record was sent to */ diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/SerialProcessing.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/SerialProcessing.java index ebd70131..149c72ec 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/SerialProcessing.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/SerialProcessing.java @@ -19,6 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.lessThan; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -29,7 +30,7 @@ import com.linecorp.decaton.processor.TaskMetadata; public class SerialProcessing implements ProcessingGuarantee { - private final Map> records = new HashMap<>(); + private final Map> records = new HashMap<>(); @Override public void onProduce(ProducedRecord record) { @@ -38,14 +39,14 @@ public void onProduce(ProducedRecord record) { @Override public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) { - records.computeIfAbsent(record.key(), + records.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record); } @Override public void doAssert() { // Checks there's no overlap between two consecutive records' processing time - for (Entry> entry : records.entrySet()) { + for (Entry> entry : records.entrySet()) { List perKeyRecords = entry.getValue(); perKeyRecords.sort(Comparator.comparingLong(ProcessedRecord::startTimeNanos)); diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java index 96736c51..7f9cfd8d 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java @@ -29,12 +29,12 @@ import com.linecorp.decaton.processor.tracing.TestTracingProvider; import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; -public class TestTracingProducer extends ProducerAdaptor { - public TestTracingProducer(Producer delegate) { +public class TestTracingProducer extends ProducerAdaptor { + public TestTracingProducer(Producer delegate) { super(delegate); } - private static void propagateCurrentTrace(ProducerRecord record) { + private static void propagateCurrentTrace(ProducerRecord record) { String traceId = TestTracingProvider.getCurrentTraceId(); if (null == traceId) { traceId = "trace-" + UUID.randomUUID(); @@ -45,13 +45,13 @@ private static void propagateCurrentTrace(ProducerRecord send(ProducerRecord record) { + public Future send(ProducerRecord record) { propagateCurrentTrace(record); return super.send(record); } @Override - public Future send(ProducerRecord record, Callback callback) { + public Future send(ProducerRecord record, Callback callback) { propagateCurrentTrace(record); return super.send(record, callback); }