Skip to content

Commit

Permalink
Use String for DecatonTask key in DecatonClient
Browse files Browse the repository at this point in the history
  • Loading branch information
ajalab committed Jun 13, 2022
1 parent b3048ae commit 33d026e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(byte[] key, T task);
CompletableFuture<PutTaskResult> put(String key, T task);

/**
* Put a task onto associated decaton queue with specifying arbitrary timestamp.
Expand All @@ -47,7 +47,7 @@ public interface DecatonClient<T> extends AutoCloseable {
* @param timestamp milliseconds precision timestamp which is to be used to set timestamp of task metadata.
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp);
CompletableFuture<PutTaskResult> put(String key, T task, long timestamp);

/**
* Put a task onto associated decaton queue with specifying some fields of task metadata.
Expand All @@ -56,7 +56,7 @@ public interface DecatonClient<T> extends AutoCloseable {
* @param overrideTaskMetadata taskMetaData which can be set by users and used for event publish.
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadata overrideTaskMetadata);
CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata overrideTaskMetadata);

/**
* Put a task onto associated decaton queue.
Expand All @@ -77,7 +77,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(byte[] key, T task, Consumer<Throwable> errorCallback);
CompletableFuture<PutTaskResult> put(String key, T task, Consumer<Throwable> errorCallback);

/**
* Put a task onto associated decaton queue with specifying arbitrary timestamp.
Expand All @@ -100,7 +100,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
default CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp,
default CompletableFuture<PutTaskResult> put(String key, T task, long timestamp,
Consumer<Throwable> errorCallback) {
CompletableFuture<PutTaskResult> result = put(key, task, timestamp);
result.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class DecatonClientImpl<T> implements DecatonClient<T> {
private static final org.apache.kafka.common.serialization.Serializer<String> keySerializer = new PrintableAsciiStringSerializer();
private final String topic;
private final Serializer<T> serializer;
private final DecatonTaskProducer producer;
private final String applicationId;
Expand All @@ -44,6 +47,7 @@ public class DecatonClientImpl<T> implements DecatonClient<T> {
Properties producerConfig,
KafkaProducerSupplier producerSupplier,
Supplier<Long> timestampSupplier) {
this.topic = topic;
this.serializer = serializer;
this.applicationId = applicationId;
this.instanceId = instanceId;
Expand All @@ -62,7 +66,7 @@ public DecatonClientImpl(String topic,
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp) {
public CompletableFuture<PutTaskResult> put(String key, T task, long timestamp) {
TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder()
.setTimestampMillis(timestamp)
.setSourceApplicationId(applicationId)
Expand All @@ -73,17 +77,17 @@ public CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp)
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadata overrideTaskMetadata) {
public CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata overrideTaskMetadata) {
return put(key, task, convertToTaskMetadataProto(overrideTaskMetadata));
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, T task) {
public CompletableFuture<PutTaskResult> put(String key, T task) {
return put(key, task, timestampSupplier.get());
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, T task, Consumer<Throwable> errorCallback) {
public CompletableFuture<PutTaskResult> put(String key, T task, Consumer<Throwable> errorCallback) {
return put(key, task, timestampSupplier.get(), errorCallback);
}

Expand All @@ -92,7 +96,8 @@ public void close() throws Exception {
producer.close();
}

private CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadataProto taskMetadataProto) {
private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataProto taskMetadataProto) {
byte[] serializedKey = keySerializer.serialize(topic, key);
byte[] serializedTask = serializer.serialize(task);

DecatonTaskRequest request =
Expand All @@ -101,7 +106,7 @@ private CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadataPro
.setSerializedTask(ByteString.copyFrom(serializedTask))
.build();

return producer.sendRequest(key, request);
return producer.sendRequest(serializedKey, request);
}

private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ public class DecatonClientTest {
@Spy
private final DecatonClient<HelloTask> decaton = new DecatonClient<HelloTask>() {
@Override
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task) {
public CompletableFuture<PutTaskResult> put(String key, HelloTask task) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task, long timestamp) {
public CompletableFuture<PutTaskResult> put(String key, HelloTask task, long timestamp) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task, TaskMetadata overrideTaskMetadata) {
public CompletableFuture<PutTaskResult> put(String key, HelloTask task, TaskMetadata overrideTaskMetadata) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task,
public CompletableFuture<PutTaskResult> put(String key, HelloTask task,
Consumer<Throwable> errorCallback) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Supplier;

Expand Down Expand Up @@ -76,7 +75,7 @@ APPLICATION_ID, INSTANCE_ID, new Properties(),
public void testTimestampFieldSetInternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance());
client.put("key", HelloTask.getDefaultInstance());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
Expand All @@ -88,7 +87,7 @@ public void testTimestampFieldSetInternally() {
public void testTimestampFieldSetInternallyWithCallback() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), ignored -> {});
client.put("key", HelloTask.getDefaultInstance(), ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
Expand All @@ -100,7 +99,7 @@ public void testTimestampFieldSetInternallyWithCallback() {
public void testTimestampFieldSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678);
client.put("key", HelloTask.getDefaultInstance(), 5678);

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
Expand All @@ -112,7 +111,7 @@ public void testTimestampFieldSetExternally() {
public void testTimestampFieldSetExternallyWithCallback() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678, ignored -> {});
client.put("key", HelloTask.getDefaultInstance(), 5678, ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
Expand All @@ -124,10 +123,10 @@ public void testTimestampFieldSetExternallyWithCallback() {
public void testTaskMetaDataSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder()
.timestamp(5678L)
.scheduledTime(6912L)
.build());
client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder()
.timestamp(5678L)
.scheduledTime(6912L)
.build());

verifyAndAssertTaskMetadata(5678L, 6912L);
}
Expand All @@ -136,9 +135,9 @@ public void testTaskMetaDataSetExternally() {
public void testWithScheduledTimeSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder()
.scheduledTime(181234L)
.build());
client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder()
.scheduledTime(181234L)
.build());

verifyAndAssertTaskMetadata(1234L, 181234L);
}
Expand All @@ -147,7 +146,7 @@ public void testWithScheduledTimeSetExternally() {
public void testWithEmptyTaskMetaDataSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder().build());
client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -59,16 +58,16 @@ public void tearDown() {

@Test(timeout = 30000)
public void testPropertyDynamicSwitch() throws Exception {
Set<ByteBuffer> keys = new HashSet<>();
Set<String> keys = new HashSet<>();

for (int i = 0; i < 10000; i++) {
keys.add(ByteBuffer.wrap(("key" + i).getBytes(StandardCharsets.UTF_8)));
keys.add("key" + i);
}
Set<byte[]> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<ByteBuffer> processedKeys = Collections.synchronizedSet(new HashSet<>());
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
processedKeys.add(context.key());
processedKeys.add(ByteBuffer.wrap(context.key()));
processLatch.countDown();
};

Expand All @@ -83,11 +82,11 @@ public void testPropertyDynamicSwitch() throws Exception {
DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

int count = 0;
for (ByteBuffer key : keys) {
for (String key : keys) {
if (++count % 1000 == 0) {
rateProp.set((long) count / 10);
}
client.put(key.array(), HelloTask.getDefaultInstance());
client.put(key, HelloTask.getDefaultInstance());
}
processLatch.await();
}
Expand Down

0 comments on commit 33d026e

Please sign in to comment.