Skip to content

Commit

Permalink
Use bytes for record keys instead of String
Browse files Browse the repository at this point in the history
  • Loading branch information
ajalab committed Jun 7, 2022
1 parent 39ef2af commit b3048ae
Show file tree
Hide file tree
Showing 34 changed files with 176 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(String key, T task);
CompletableFuture<PutTaskResult> put(byte[] key, T task);

/**
* Put a task onto associated decaton queue with specifying arbitrary timestamp.
Expand All @@ -47,7 +47,7 @@ public interface DecatonClient<T> extends AutoCloseable {
* @param timestamp milliseconds precision timestamp which is to be used to set timestamp of task metadata.
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(String key, T task, long timestamp);
CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp);

/**
* Put a task onto associated decaton queue with specifying some fields of task metadata.
Expand All @@ -56,7 +56,7 @@ public interface DecatonClient<T> extends AutoCloseable {
* @param overrideTaskMetadata taskMetaData which can be set by users and used for event publish.
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata overrideTaskMetadata);
CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadata overrideTaskMetadata);

/**
* Put a task onto associated decaton queue.
Expand All @@ -77,7 +77,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
CompletableFuture<PutTaskResult> put(String key, T task, Consumer<Throwable> errorCallback);
CompletableFuture<PutTaskResult> put(byte[] key, T task, Consumer<Throwable> errorCallback);

/**
* Put a task onto associated decaton queue with specifying arbitrary timestamp.
Expand All @@ -100,7 +100,7 @@ public interface DecatonClient<T> extends AutoCloseable {
*
* @return a {@link CompletableFuture} which represents the result of task put.
*/
default CompletableFuture<PutTaskResult> put(String key, T task, long timestamp,
default CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp,
Consumer<Throwable> errorCallback) {
CompletableFuture<PutTaskResult> result = put(key, task, timestamp);
result.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
Expand Down Expand Up @@ -54,9 +54,9 @@ public class DecatonClientBuilder<T> {

public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier {
@Override
public Producer<String, DecatonTaskRequest> getProducer(Properties config) {
public Producer<byte[], DecatonTaskRequest> getProducer(Properties config) {
return new KafkaProducer<>(config,
new PrintableAsciiStringSerializer(),
new ByteArraySerializer(),
new ProtocolBuffersKafkaSerializer<>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface KafkaProducerSupplier {
* @return an Kafka producer instance which implements {@link Producer}. The returned instance will be
* closed along with {@link DecatonClient#close} being called.
*/
Producer<String, DecatonTaskRequest> getProducer(Properties config);
Producer<byte[], DecatonTaskRequest> getProducer(Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DecatonClientImpl(String topic,
}

@Override
public CompletableFuture<PutTaskResult> put(String key, T task, long timestamp) {
public CompletableFuture<PutTaskResult> put(byte[] key, T task, long timestamp) {
TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder()
.setTimestampMillis(timestamp)
.setSourceApplicationId(applicationId)
Expand All @@ -73,17 +73,17 @@ public CompletableFuture<PutTaskResult> put(String key, T task, long timestamp)
}

@Override
public CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata overrideTaskMetadata) {
public CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadata overrideTaskMetadata) {
return put(key, task, convertToTaskMetadataProto(overrideTaskMetadata));
}

@Override
public CompletableFuture<PutTaskResult> put(String key, T task) {
public CompletableFuture<PutTaskResult> put(byte[] key, T task) {
return put(key, task, timestampSupplier.get());
}

@Override
public CompletableFuture<PutTaskResult> put(String key, T task, Consumer<Throwable> errorCallback) {
public CompletableFuture<PutTaskResult> put(byte[] key, T task, Consumer<Throwable> errorCallback) {
return put(key, task, timestampSupplier.get(), errorCallback);
}

Expand All @@ -92,7 +92,7 @@ public void close() throws Exception {
producer.close();
}

private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataProto taskMetadataProto) {
private CompletableFuture<PutTaskResult> put(byte[] key, T task, TaskMetadataProto taskMetadataProto) {
byte[] serializedTask = serializer.serialize(task);

DecatonTaskRequest request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DecatonTaskProducer implements AutoCloseable {
presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
}

private final Producer<String, DecatonTaskRequest> producer;
private final Producer<byte[], DecatonTaskRequest> producer;
private final String topic;

private static Properties completeProducerConfig(Properties producerConfig) {
Expand All @@ -61,8 +61,8 @@ public DecatonTaskProducer(String topic, Properties producerConfig,
this.topic = topic;
}

public CompletableFuture<PutTaskResult> sendRequest(String key, DecatonTaskRequest request) {
ProducerRecord<String, DecatonTaskRequest> record = new ProducerRecord<>(topic, key, request);
public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request) {
ProducerRecord<byte[], DecatonTaskRequest> record = new ProducerRecord<>(topic, key, request);

CompletableFuture<PutTaskResult> result = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public class DecatonClientBuilderTest {
public MockitoRule rule = MockitoJUnit.rule();

@Mock
private Producer<String, DecatonTaskRequest> producer;
private Producer<byte[], DecatonTaskRequest> producer;

@Captor
private ArgumentCaptor<ProducerRecord<String, DecatonTaskRequest>> recordCaptor;
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> recordCaptor;

private ProducerRecord<String, DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
private ProducerRecord<byte[], DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
dclient.put(null, HelloTask.getDefaultInstance());
verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class));
return recordCaptor.getValue();
Expand All @@ -69,7 +69,7 @@ public void testBuild() {
.producerSupplier(config -> producer)
.build();

ProducerRecord<String, DecatonTaskRequest> record = doProduce(dclient);
ProducerRecord<byte[], DecatonTaskRequest> record = doProduce(dclient);
assertEquals(topic, record.topic());

TaskMetadataProto metadata = record.value().getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ public class DecatonClientTest {
@Spy
private final DecatonClient<HelloTask> decaton = new DecatonClient<HelloTask>() {
@Override
public CompletableFuture<PutTaskResult> put(String key, HelloTask task) {
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(String key, HelloTask task, long timestamp) {
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task, long timestamp) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(String key, HelloTask task, TaskMetadata overrideTaskMetadata) {
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task, TaskMetadata overrideTaskMetadata) {
return null;
}

@Override
public CompletableFuture<PutTaskResult> put(String key, HelloTask task,
public CompletableFuture<PutTaskResult> put(byte[] key, HelloTask task,
Consumer<Throwable> errorCallback) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Supplier;

Expand Down Expand Up @@ -54,15 +55,15 @@ public class DecatonClientImplTest {
public MockitoRule rule = MockitoJUnit.rule();

@Mock
private Producer<String, DecatonTaskRequest> producer;
private Producer<byte[], DecatonTaskRequest> producer;

@Mock
private Supplier<Long> timestampSupplier;

private DecatonClientImpl<HelloTask> client;

@Captor
private ArgumentCaptor<ProducerRecord<String, DecatonTaskRequest>> captor;
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> captor;

@Before
public void setUp() {
Expand All @@ -75,10 +76,10 @@ APPLICATION_ID, INSTANCE_ID, new Properties(),
public void testTimestampFieldSetInternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance());
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(1234, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -87,10 +88,10 @@ public void testTimestampFieldSetInternally() {
public void testTimestampFieldSetInternallyWithCallback() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), ignored -> {});
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(1234, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -99,10 +100,10 @@ public void testTimestampFieldSetInternallyWithCallback() {
public void testTimestampFieldSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), 5678);
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678);

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(5678, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -111,10 +112,10 @@ public void testTimestampFieldSetExternally() {
public void testTimestampFieldSetExternallyWithCallback() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), 5678, ignored -> {});
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), 5678, ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(5678, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -123,10 +124,10 @@ public void testTimestampFieldSetExternallyWithCallback() {
public void testTaskMetaDataSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder()
.timestamp(5678L)
.scheduledTime(6912L)
.build());
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder()
.timestamp(5678L)
.scheduledTime(6912L)
.build());

verifyAndAssertTaskMetadata(5678L, 6912L);
}
Expand All @@ -135,9 +136,9 @@ public void testTaskMetaDataSetExternally() {
public void testWithScheduledTimeSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder()
.scheduledTime(181234L)
.build());
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder()
.scheduledTime(181234L)
.build());

verifyAndAssertTaskMetadata(1234L, 181234L);
}
Expand All @@ -146,18 +147,18 @@ public void testWithScheduledTimeSetExternally() {
public void testWithEmptyTaskMetaDataSetExternally() {
doReturn(1234L).when(timestampSupplier).get();

client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build());
client.put("key".getBytes(StandardCharsets.UTF_8), HelloTask.getDefaultInstance(), TaskMetadata.builder().build());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertTrue(record.value().getMetadata().getTimestampMillis() > 0);
assertNotNull(record.value().getMetadata().getSourceApplicationId());
assertNotNull(record.value().getMetadata().getSourceInstanceId());
}

private void verifyAndAssertTaskMetadata(long timestamp, long scheduledTime) {
verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(timestamp, record.value().getMetadata().getTimestampMillis());
assertEquals(scheduledTime, record.value().getMetadata().getScheduledTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.junit.Assert.assertTrue;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -153,17 +154,17 @@ public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not be considered as Decaton specification which users can rely on.
// Rather, this is just a expected behavior based on current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final Map<String, List<TestTask>> produced = new HashMap<>();
private final Map<String, List<TestTask>> processed = new HashMap<>();
private final Map<ByteBuffer, List<TestTask>> produced = new HashMap<>();
private final Map<ByteBuffer, List<TestTask>> processed = new HashMap<>();

@Override
public synchronized void onProduce(ProducedRecord record) {
produced.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task());
produced.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) {
processed.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task());
processed.computeIfAbsent(ByteBuffer.wrap(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -57,12 +59,12 @@ public void tearDown() {

@Test(timeout = 30000)
public void testPropertyDynamicSwitch() throws Exception {
Set<String> keys = new HashSet<>();
Set<ByteBuffer> keys = new HashSet<>();

for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
keys.add(ByteBuffer.wrap(("key" + i).getBytes(StandardCharsets.UTF_8)));
}
Set<String> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<byte[]> processedKeys = Collections.synchronizedSet(new HashSet<>());
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
Expand All @@ -81,11 +83,11 @@ public void testPropertyDynamicSwitch() throws Exception {
DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

int count = 0;
for (String key : keys) {
for (ByteBuffer key : keys) {
if (++count % 1000 == 0) {
rateProp.set((long) count / 10);
}
client.put(key, HelloTask.getDefaultInstance());
client.put(key.array(), HelloTask.getDefaultInstance());
}
processLatch.await();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface ProcessingContext<T> {
* @return the key associated to the task now being processed. can be null if key isn't supplied for the
* task.
*/
String key();
byte[] key();

/**
* Returns the {@link Headers} which is associated to the source {@link ConsumerRecord} of the task.
Expand Down
Loading

0 comments on commit b3048ae

Please sign in to comment.