Narrow down the scope of waiting for pending tasks to per partition #191
Conversation
Thanks for submitting the PR.
This PR seems to try to observe the pending task count per partition. However, that is not enough: as long as reloadRequested is true, Decaton pauses all processing, so no partition can make progress anyway. (Line 215 in c4bbbc1:
return processingRateProp.value() == RateLimiter.PAUSED || reloadRequested.get();
)
So I think we need to manage the reload-requested flag per partition.
Thanks for your review.
@ocadaruma
public void reloading(boolean reloading) {
    this.reloading = reloading;
    if (reloading) {
        pause();
Pausing here from PartitionContext itself doesn't actually pause fetching. Decaton's fetch-pause works like this:
- (1) ConsumeManager retrieves the list of partitions that need to be paused: https://github.com/line/decaton/blob/v6.2.0/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManager.java#L182
- (2) It calls Consumer#pause to stop fetching further records. From then on, Consumer#poll doesn't return records for these partitions.
So, if we pause here from PartitionContext itself, the partition will not be included in (1) (because it's already "paused").
The correct approach here may be to hold a per-partition reloading flag in PartitionContexts, and include those partitions in partitionsNeedsPause.
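The two-step pause flow above can be sketched as follows. This is a simplified, hypothetical model (class and field names are illustrative, not Decaton's actual code): the point is that a reloading partition must surface through partitionsNeedsPause so that step (2), Consumer#pause, still happens for it. If the context marked itself as paused, step (1) would skip it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the reviewer's point (names are illustrative, not
// Decaton's actual classes): the per-partition reloading flag must make a
// partition show up in partitionsNeedsPause(), so ConsumeManager still calls
// Consumer#pause for it. A context that pre-marked itself "paused" would be
// skipped by the !paused filter and never actually paused at the consumer.
public class PauseFlow {
    public static class Context {
        public boolean paused;      // already paused at the consumer level
        public boolean reloading;   // per-partition reload flag (assumption)
        public int pendingTasksCount;
    }

    // Step (1): partitions that ConsumeManager should pass to Consumer#pause.
    public static List<String> partitionsNeedsPause(Map<String, Context> contexts,
                                                    int maxPendingRecords) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Context> e : contexts.entrySet()) {
            Context c = e.getValue();
            if (!c.paused && (c.reloading || c.pendingTasksCount >= maxPendingRecords)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Context> contexts = new LinkedHashMap<>();
        contexts.put("topic-0", new Context());   // healthy partition
        Context reloading = new Context();
        reloading.reloading = true;
        contexts.put("topic-1", reloading);       // reload requested
        // Only topic-1 is reported; topic-0 keeps making progress.
        System.out.println(partitionsNeedsPause(contexts, 10)); // prints "[topic-1]"
    }
}
```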
@ocadaruma
@@ -53,6 +57,8 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
     private final Map<TopicPartition, PartitionContext> contexts;

+    private final AtomicBoolean reloadRequested;
Do we still need this reloadRequested flag, since we manage the reloading states in the reloadStates hash map?
reloadRequested is useful for quickly determining whether contexts should be reloaded, without iteration.
Hm, IMO having duplicated info (alongside the per-partition reload flags) could be error-prone when we maintain this code in the future, while iterating over partitions is unlikely to become a bottleneck (we already iterate over partitions to update high watermarks and to check which partitions need pause/resume), so the benefit of keeping reloadRequested separately is small.
List<TopicPartition> reloadableTopicPartitions = reloadStates.entrySet()
        .stream()
        .filter(entry -> entry.getValue()
                && contexts.get(entry.getKey()).pendingTasksCount() == 0)
This could cause an NPE, because currently a reloadStates entry is never removed, even when the partition is removed.
To fix that, we could remove the reloadStates entry at the same time we modify contexts. However, then we would have to maintain two separate maps holding per-partition state, which is bothersome and error-prone.
So I think we should hold the reloadRequested flag in PartitionContext itself. WDYT?
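A minimal sketch of that suggestion (with hypothetical, simplified names): once the flag lives on the context itself, the reloadable set is derived from the single contexts map, so removing a partition removes its reload state with it and the stale-entry NPE scenario disappears.

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;

// Hypothetical sketch: reloadRequested lives on the context itself, so there
// is no second reloadStates map that could go stale when a partition is
// removed from contexts.
public class ReloadableSketch {
    public static class Context {
        public boolean reloadRequested;
        public int pendingTasksCount;
    }

    // A partition is reloadable once its reload was requested and its
    // pending tasks have drained to zero.
    public static List<String> reloadableTopicPartitions(Map<String, Context> contexts) {
        return contexts.entrySet().stream()
                .filter(e -> e.getValue().reloadRequested
                        && e.getValue().pendingTasksCount == 0)
                .map(Map.Entry::getKey)
                .collect(toList());
    }
}
```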
I got the point. Having a reloadRequested flag in PartitionContext seems good.
@@ -214,7 +233,9 @@ public List<TopicPartition> partitionsNeedsPause() {
     return contexts.values().stream()
                    .filter(c -> !c.revoking())
                    .filter(c -> !c.paused())
-                   .filter(c -> pausingAll || shouldPartitionPaused(c.pendingTasksCount()))
+                   .filter(c -> pausingAll
+                           || reloadStates.get(c.topicPartition())
Additionally, we also need to modify partitionsNeedsResume so it does not return partitions for which a reload is requested. Otherwise those partitions will be resumed immediately, new tasks will keep being fed, and pendingTasks never becomes 0.
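The resume-side fix can be sketched like this (again with hypothetical, simplified names): a resumed partition keeps receiving records, so a reload-requested partition must stay paused until its pending tasks drain to zero.

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;

// Hedged sketch (illustrative names, not Decaton's actual code): the extra
// !reloadRequested filter keeps a reload-requested partition paused so its
// pendingTasksCount can actually reach 0.
public class ResumeSketch {
    public static class Context {
        public boolean paused;
        public boolean reloadRequested;
        public int pendingTasksCount;
    }

    public static List<String> partitionsNeedsResume(Map<String, Context> contexts,
                                                     int maxPendingRecords) {
        return contexts.entrySet().stream()
                .filter(e -> e.getValue().paused)
                .filter(e -> e.getValue().pendingTasksCount < maxPendingRecords)
                // the fix discussed above: don't resume while a reload is pending
                .filter(e -> !e.getValue().reloadRequested)
                .map(Map.Entry::getKey)
                .collect(toList());
    }
}
```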
Left a few comments, but it almost looks good.
Also, would you mind adding an integration test to check that changing the concurrency in the middle of processing doesn't cause problems? (Referring to RateLimiterTest might be helpful.)
 */
@Getter
@Setter
private boolean reloadState;
- reloadRequested sounds clearer?
- Should be marked as volatile to ensure the latest value (set by the property-listener thread) is visible to the subscription thread
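The visibility concern can be illustrated with a minimal sketch (names are illustrative): one thread writes the flag, another reads it. Declaring the field volatile guarantees the write is visible to the reading thread; without it, the reader may keep seeing the stale value.

```java
// Illustrative sketch of the volatile suggestion: the property-listener
// thread sets the flag, the subscription thread reads it. volatile
// establishes the cross-thread visibility guarantee the reviewer asks for.
public class VolatileFlag {
    private volatile boolean reloadRequested;

    // called on the property-listener thread
    public void request() {
        reloadRequested = true;
    }

    // called on the subscription thread
    public boolean isRequested() {
        return reloadRequested;
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileFlag flag = new VolatileFlag();
        Thread listener = new Thread(flag::request);
        listener.start();
        listener.join(); // wait for the write to happen
        System.out.println(flag.isRequested()); // prints "true"
    }
}
```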
@@ -52,7 +56,7 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
     private final int maxPendingRecords;
     private final Map<TopicPartition, PartitionContext> contexts;

-    private final AtomicBoolean reloadRequested;
+    private final ReentrantLock lock;
Let's rename it to something like propertyReloadLock for understandability.
if (!reloadRequested.getAndSet(true)) {
    lock.lock();
    try {
        contexts.values().forEach(context -> context.reloadState(true));
To prevent contexts from being modified during the iteration on this property-listener thread, we need to surround all places that maintain contexts entries with the lock. (Currently only maybeHandlePropertyReload is surrounded, but there are several places that modify contexts.)
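The locking rule being asked for can be sketched as follows (method names are hypothetical, and the map is simplified to track only the reload flag): every code path that mutates contexts takes the same lock the property-listener thread holds while iterating, so the forEach never races with put/remove.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the locking discipline (hypothetical names): all mutations of
// `contexts` and the property-listener's iteration share one lock, so the
// forEach over contexts cannot observe a concurrent structural modification.
public class ContextsGuard {
    // topic-partition -> reloadRequested (simplified stand-in for PartitionContext)
    private final Map<String, Boolean> contexts = new HashMap<>();
    private final ReentrantLock propertyReloadLock = new ReentrantLock();

    // property-listener thread: mark every partition for reload
    public void requestReloadAll() {
        propertyReloadLock.lock();
        try {
            contexts.replaceAll((tp, reload) -> true);
        } finally {
            propertyReloadLock.unlock();
        }
    }

    // subscription thread: assignment changes must take the same lock
    public void addPartition(String topicPartition) {
        propertyReloadLock.lock();
        try {
            contexts.put(topicPartition, false);
        } finally {
            propertyReloadLock.unlock();
        }
    }

    public boolean reloadRequested(String topicPartition) {
        propertyReloadLock.lock();
        try {
            return contexts.getOrDefault(topicPartition, false);
        } finally {
            propertyReloadLock.unlock();
        }
    }
}
```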
Do you mean having another ReentrantLock for updating the contexts?
No, use propertyReloadLock.
Left a few last nit comments!
Also, could you fix this comment?
decaton/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java (Line 77 in 5659ec7):
* Reloading this property will pause all assigned partitions until current pending tasks have done.
.filter(entry -> entry.getValue().reloadRequested()
        && entry.getValue().pendingTasksCount() == 0)
.map(Entry::getKey)
.collect(Collectors.toList());
[nits] toList is already static-imported, so let's omit Collectors.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
[nits] There are several unused imports
-        logger.info("Start dropping partition contexts");
+    private void reloadContexts(Collection<TopicPartition> topicPartitions) {
+        logger.info("Start dropping partition context({})", topicPartitions);
         removePartition(topicPartitions);
         logger.info("Finished dropping partition contexts. Start recreating partition contexts");
[nits] This log and this line (Line 316 in 5659ec7: logger.info("Completed reloading property");) should also include topicPartitions.
LGTM. Nice!
Motivation:
Described in #172
Modifications:
Result: