-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide a way of specifying a partition when producing a task #182
Conversation
* | ||
* @return a {@link CompletableFuture} which represents the result of task put. | ||
*/ | ||
CompletableFuture<PutTaskResult> put(String key, T task, int partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a concern about this API.
If users use the following API with int
as the argument for timestamp
, I think the behavior will be different after this change.
CompletableFuture<PutTaskResult> put(String key, T task, long timestamp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this could be an error prone for users.
Also, if we want to specify timestamp or other task metadata with partition, we have to add other overloads and it's a mess.
So, how about adding one "universal" method instead? i.e.
put(String key, T task, TaskMetadata overrideTaskMetadata, Integer partition)
With allowing setting overrideTaskMetadata or partition to be null for users who don't need to set them explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems good
* | ||
* @return a {@link CompletableFuture} which represents the result of task put. | ||
*/ | ||
CompletableFuture<PutTaskResult> put(String key, T task, int partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this could be an error prone for users.
Also, if we want to specify timestamp or other task metadata with partition, we have to add other overloads and it's a mess.
So, how about adding one "universal" method instead? i.e.
put(String key, T task, TaskMetadata overrideTaskMetadata, Integer partition)
With allowing setting overrideTaskMetadata or partition to be null for users who don't need to set them explicitly.
/** | ||
* Put a task onto a specified kafka partition. | ||
* Note that the specified partition here is prioritised over the result of | ||
* {@link Partitioner#partition(String, Object, byte[], Object, byte[], Cluster)}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do not need to mention about precedence here explicitly, because this is KafkaProducer's behavior rather than DecatonClient's interface.
* Put a task onto a specified kafka partition with specifying some fields of task metadata. | ||
* @param key the criteria to shuffle and order tasks. null can be specified if it doesn't matters. | ||
* @param task an instance of task. Should never be null. | ||
* @param partition the id of the partition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's mention also when partition, overrideTaskMetadata can be null
@@ -81,6 +82,19 @@ public CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata ove | |||
return put(key, task, convertToTaskMetadataProto(overrideTaskMetadata)); | |||
} | |||
|
|||
@Override | |||
public CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadata overrideTaskMetadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method supersedes below put(String key, T task, TaskMetadataProto taskMetadataProto)
so let's change it to just calling this method?
return sendRequest(record); | ||
} | ||
|
||
public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request, int partition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DecatonTaskProducer is in internal
package so not considered to be public API, so we don't need to keep backward compatibility. Let's just change original sendRequest
to receive Integer partition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left last 2 comments, but almost lgtm
@@ -118,6 +127,9 @@ private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMe | |||
|
|||
final Long timestamp = overrideTaskMetadata.getTimestamp(); | |||
final Long scheduledTime = overrideTaskMetadata.getScheduledTime(); | |||
if (overrideTaskMetadata == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This null check should be above overrideTaskMetadata.getTimestamp()
.
Otherwise it causes NPE
public void testSpecifyingPartitionWithoutMetadata() { | ||
doReturn(1234L).when(timestampSupplier).get(); | ||
|
||
client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build(), 4); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this test supposed to pass null
as the taskmetadata argument? (rather than empty task metadata)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
Motivation:
To resolve #159
Modifications:
DecatonClient
andDecatonTaskProducer
Result: