diff --git a/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java b/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java index b7a70999..9af75e69 100644 --- a/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java +++ b/client/src/main/java/com/linecorp/decaton/client/DecatonClient.java @@ -38,7 +38,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(byte[] key, T task); + CompletableFuture put(String key, T task); /** * Put a task onto associated decaton queue with specifying arbitrary timestamp. @@ -47,7 +47,7 @@ public interface DecatonClient extends AutoCloseable { * @param timestamp milliseconds precision timestamp which is to be used to set timestamp of task metadata. * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(byte[] key, T task, long timestamp); + CompletableFuture put(String key, T task, long timestamp); /** * Put a task onto associated decaton queue with specifying some fields of task metadata. @@ -56,7 +56,7 @@ public interface DecatonClient extends AutoCloseable { * @param overrideTaskMetadata taskMetaData which can be set by users and used for event publish. * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(byte[] key, T task, TaskMetadata overrideTaskMetadata); + CompletableFuture put(String key, T task, TaskMetadata overrideTaskMetadata); /** * Put a task onto associated decaton queue. @@ -77,7 +77,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - CompletableFuture put(byte[] key, T task, Consumer errorCallback); + CompletableFuture put(String key, T task, Consumer errorCallback); /** * Put a task onto associated decaton queue with specifying arbitrary timestamp. @@ -100,7 +100,7 @@ public interface DecatonClient extends AutoCloseable { * * @return a {@link CompletableFuture} which represents the result of task put. */ - default CompletableFuture put(byte[] key, T task, long timestamp, + default CompletableFuture put(String key, T task, long timestamp, Consumer errorCallback) { CompletableFuture result = put(key, task, timestamp); result.exceptionally(e -> { diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java index f19b4205..72c54783 100644 --- a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java +++ b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java @@ -26,11 +26,14 @@ import com.linecorp.decaton.client.DecatonClient; import com.linecorp.decaton.client.KafkaProducerSupplier; import com.linecorp.decaton.client.PutTaskResult; +import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; import com.linecorp.decaton.common.Serializer; import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; public class DecatonClientImpl implements DecatonClient { + private static final org.apache.kafka.common.serialization.Serializer keySerializer = new PrintableAsciiStringSerializer(); + private final String topic; private final Serializer serializer; private final DecatonTaskProducer producer; private final String applicationId; @@ -44,6 +47,7 @@ public class DecatonClientImpl implements DecatonClient { Properties producerConfig, KafkaProducerSupplier producerSupplier, Supplier timestampSupplier) { + this.topic = topic; this.serializer = serializer; this.applicationId = applicationId; this.instanceId = instanceId; @@ -62,7 +66,7 @@ public DecatonClientImpl(String topic, } @Override - public CompletableFuture put(byte[] key, T task, long timestamp) { + public CompletableFuture put(String key, T task, long timestamp) { TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder() .setTimestampMillis(timestamp) .setSourceApplicationId(applicationId) @@ -73,17 +77,17 @@ public CompletableFuture put(byte[] key, T task, long timestamp) } @Override - public CompletableFuture put(byte[] key, T task, TaskMetadata overrideTaskMetadata) { + public CompletableFuture put(String key, T task, TaskMetadata overrideTaskMetadata) { return put(key, task, convertToTaskMetadataProto(overrideTaskMetadata)); } @Override - public CompletableFuture put(byte[] key, T task) { + public CompletableFuture put(String key, T task) { return put(key, task, timestampSupplier.get()); } @Override - public CompletableFuture put(byte[] key, T task, Consumer errorCallback) { + public CompletableFuture put(String key, T task, Consumer errorCallback) { return put(key, task, timestampSupplier.get(), errorCallback); } @@ -92,7 +96,8 @@ public void close() throws Exception { producer.close(); } - private CompletableFuture put(byte[] key, T task, TaskMetadataProto taskMetadataProto) { + private CompletableFuture put(String key, T task, TaskMetadataProto taskMetadataProto) { + byte[] serializedKey = keySerializer.serialize(topic, key); byte[] serializedTask = serializer.serialize(task); DecatonTaskRequest request = @@ -101,7 +106,7 @@ private CompletableFuture put(byte[] key, T task, TaskMetadataPro .setSerializedTask(ByteString.copyFrom(serializedTask)) .build(); - return producer.sendRequest(key, request); + return producer.sendRequest(serializedKey, request); } private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) { diff --git a/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java b/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java index f036d745..b0c1fe43 100644 --- a/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/DecatonClientTest.java @@ -45,22 +45,22 @@ public class DecatonClientTest { @Spy private final DecatonClient decaton = new DecatonClient() { @Override - public CompletableFuture put(byte[] key, HelloTask task) { + public CompletableFuture put(String key, HelloTask task) { return null; } @Override - public CompletableFuture put(byte[] key, HelloTask task, long timestamp) { + public CompletableFuture put(String key, HelloTask task, long timestamp) { return null; } @Override - public CompletableFuture put(byte[] key, HelloTask task, TaskMetadata overrideTaskMetadata) { + public CompletableFuture put(String key, HelloTask task, TaskMetadata overrideTaskMetadata) { return null; } @Override - public CompletableFuture put(byte[] key, HelloTask task, + public CompletableFuture put(String key, HelloTask task, Consumer errorCallback) { return null; } diff --git a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java index 81891f7f..1899a4d0 100644 --- a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.nio.charset.StandardCharsets; import java.util.Properties; import java.util.function.Supplier; @@ -76,7 +75,7 @@ APPLICATION_ID, INSTANCE_ID, new Properties(), public void testTimestampFieldSetInternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance()); + client.put("key", HelloTask.getDefaultInstance()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); ProducerRecord record = captor.getValue(); @@ -88,7 +87,7 @@ public void testTimestampFieldSetInternally() { public void testTimestampFieldSetInternallyWithCallback() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), ignored -> {}); + client.put("key", HelloTask.getDefaultInstance(), ignored -> {}); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); ProducerRecord record = captor.getValue(); @@ -100,7 +99,7 @@ public void testTimestampFieldSetInternallyWithCallback() { public void testTimestampFieldSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678); + client.put("key", HelloTask.getDefaultInstance(), 5678); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); ProducerRecord record = captor.getValue(); @@ -112,7 +111,7 @@ public void testTimestampFieldSetExternally() { public void testTimestampFieldSetExternallyWithCallback() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678, ignored -> {}); + client.put("key", HelloTask.getDefaultInstance(), 5678, ignored -> {}); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); ProducerRecord record = captor.getValue(); @@ -124,10 +123,10 @@ public void testTimestampFieldSetExternallyWithCallback() { public void testTaskMetaDataSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder() - .timestamp(5678L) - .scheduledTime(6912L) - .build()); + client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder() + .timestamp(5678L) + .scheduledTime(6912L) + .build()); verifyAndAssertTaskMetadata(5678L, 6912L); } @@ -136,9 +135,9 @@ public void testTaskMetaDataSetExternally() { public void testWithScheduledTimeSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder() - .scheduledTime(181234L) - .build()); + client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder() + .scheduledTime(181234L) + .build()); verifyAndAssertTaskMetadata(1234L, 181234L); } @@ -147,7 +146,7 @@ public void testWithScheduledTimeSetExternally() { public void testWithEmptyTaskMetaDataSetExternally() { doReturn(1234L).when(timestampSupplier).get(); - client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder().build()); + client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); ProducerRecord record = captor.getValue(); diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java index 7b5bde6f..ff3cf490 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RateLimiterTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -59,16 +58,16 @@ public void tearDown() { @Test(timeout = 30000) public void testPropertyDynamicSwitch() throws Exception { - Set keys = new HashSet<>(); + Set keys = new HashSet<>(); for (int i = 0; i < 10000; i++) { - keys.add(ByteBuffer.wrap(("key" + i).getBytes(StandardCharsets.UTF_8))); + keys.add("key" + i); } - Set processedKeys = Collections.synchronizedSet(new HashSet<>()); + Set processedKeys = Collections.synchronizedSet(new HashSet<>()); CountDownLatch processLatch = new CountDownLatch(keys.size()); DecatonProcessor processor = (context, task) -> { - processedKeys.add(context.key()); + processedKeys.add(ByteBuffer.wrap(context.key())); processLatch.countDown(); }; @@ -83,11 +82,11 @@ public void testPropertyDynamicSwitch() throws Exception { DecatonClient client = TestUtils.client(topicName, rule.bootstrapServers())) { int count = 0; - for (ByteBuffer key : keys) { + for (String key : keys) { if (++count % 1000 == 0) { rateProp.set((long) count / 10); } - client.put(key.array(), HelloTask.getDefaultInstance()); + client.put(key, HelloTask.getDefaultInstance()); } processLatch.await(); }