
Wait for async tasks to finish in ProcessPipeline#close #79

Merged: 12 commits, Feb 24, 2021

Conversation

m50d
Contributor

@m50d m50d commented Dec 29, 2020

Possible approach to #78

Keep track of in-flight async processing in ProcessPipeline via a Semaphore with a very large number of permits, and wait for the same number of permits on close.
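
For illustration, a minimal sketch of that semaphore idea (class and method names here are hypothetical, not the actual ProcessPipeline fields):

import java.util.concurrent.Semaphore;

// Hypothetical sketch, not the actual ProcessPipeline implementation.
class InFlightTracker {
    private static final int MAX_IN_FLIGHT = Integer.MAX_VALUE / 2; // the "very large number of permits"
    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);

    // Acquire one permit per task entering async processing.
    void taskStarted() {
        permits.acquireUninterruptibly();
    }

    // Released from the existing whenComplete callback, whether the task completed normally or exceptionally.
    void taskFinished() {
        permits.release();
    }

    // close() re-acquires every permit, which only succeeds once all in-flight tasks have released theirs.
    void awaitInFlightTasks() throws InterruptedException {
        permits.acquire(MAX_IN_FLIGHT);
        permits.release(MAX_IN_FLIGHT);
    }
}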

Advantages:

  • Keeps the shutdown logic quite self-contained, avoids crossing too many abstraction boundaries
  • Piggybacks on existing whenComplete logic, no need to add extra tracking to async processors
  • Compatible with doing more thorough async termination in the future (i.e. a more complete separation between initiateShutdown and awaitShutdown up the whole object graph).
  • Shouldn't block shutdown as long as async processing finishes somehow (exceptions are fine).

Disadvantages:

  • Feels like it's duplicating the existing rate-limiting code a little bit. But the ExecutionScheduler doesn't have any visibility into when tasks actually finish.
  • Can block close (until interrupted) if a badly-behaved processor calls deferCompletion and then never completes at all.
    • I would think a processor like this would cause problems anyway, unless it only does it during shutdown?
    • Not obvious what the migration path should be if we need to preserve the existing behaviour - I guess a configuration property that could live in the Scope?

WDYT?

@ocadaruma
Contributor

@m50d Thanks for the PR.
Here's my early feedback:

By the way, there are several cases in which processors may be closed.

  1. Subscription shutdown. In this case, processors of all scopes will be closed
  2. Partition revocation. In this case, partition-scoped and thread-scoped processors will be closed
  3. Dynamic partition concurrency change. In this case, thread-scoped processors will be closed

For 2, there's already logic to wait for pending tasks for up to decaton.group.rebalance.timeout.ms.
For 3, there's already logic to wait for pending tasks before recreating processor units.

So the only case we want to handle in this PR is case 1; just waiting for pending tasks before destroying processors in the shutdown sequence would be fine.

The remaining concern is whether we should have a timeout for case 1. I think we should, and decaton.group.rebalance.timeout.ms should be reused for consistency with case 2.

@kawamuray what do you think?

@kawamuray
Contributor

kawamuray commented Jan 5, 2021

Thanks for the PR @m50d :)

We already have logic to wait for pending tasks to complete by checking OOOCC's pending offsets (https://github.com/line/decaton/blob/master/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java#L74), so we should reuse the same logic rather than introducing a semaphore, I think

I agree with this first of all.

The remaining concern is whether we should have a timeout for case 1. I think we should, and decaton.group.rebalance.timeout.ms should be reused for consistency with case 2.

I agree that case 1 should have a configurable timeout, but disagree with reusing decaton.group.rebalance.timeout.ms because the name doesn't imply it will be used for awaiting termination at shutdown, and it would definitely confuse users.

I can imagine several possible interfaces to allow users to do this, but I'm not yet convinced which one would be the most suitable considering realistic use cases.

  • Another property like decaton.processors.shutdown.timeout.ms
    • maybe with deprecating and unifying decaton.group.rebalance.timeout.ms, maybe not
  • Expose custom AsyncCloseable that returns CompletableFuture that completes upon all processors finish
  • Expose a custom close method with a timeout, like close(long timeout, TimeUnit unit), similar to Kafka's Producer#close.

Expecting use from Spring, the property approach would be suitable because Spring calls AutoCloseable#close by convention, and users would have to manually override that convention to call into another shutdown method.

The CompletableFuture approach would be the most flexible, as users can choose what to do with it: block the current thread until completion with or without a timeout, or leave it for later processing, even with some async framework, I guess.

The close-with-timeout approach would be the simplest to implement, and it might be sufficient for the purpose without needing another interface or another property whose consistency with existing/future interfaces we would have to care about.

@m50d
Contributor Author

m50d commented Jan 7, 2021

We already have logic to wait for pending tasks to complete by checking OOOCC's pending offsets (https://github.com/line/decaton/blob/master/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java#L74), so we should reuse the same logic rather than introducing a semaphore, I think

Makes sense. The polling loop is not ideal (though I guess that's a separate issue).

By the way, there are several cases that processors may be closed.

I found the existing logic quite hard to follow, because it's scattered across a lot of classes. IMO each processor/scope should be responsible for itself - shutting down a subscription should delegate to its partitions to shut them down, and shutting down a partition should delegate to its threads to shut them down. If I pushed down the existing polling-loop logic to run at thread level (so we'd run the polling loop once for each processor rather than at top level as now) it would be somewhat inelegant, but maybe it's ok? WDYT?

So I think whether to wait for pending tasks or not should be controlled by a subscription-level flag, set to NO by default.

SGTM. Will do.

Expecting use from Spring, the property approach would be suitable because Spring calls AutoCloseable#close by convention, and users would have to manually override that convention to call into another shutdown method.

Yeah, that's very much the best fit for our use case IMO.

@kawamuray
Contributor

it's scattered across a lot of classes. IMO each processor/scope should be responsible for itself - shutting down a subscription should delegate to its partitions to shut them down, and shutting down a partition should delegate to its threads to shut them down.

I'm not sure which piece of code exactly you're talking about. Can you provide some pointers to the code that you think isn't done in the right place now?

@m50d
Contributor Author

m50d commented Jan 12, 2021

Sorry for the overly flippant previous comment.

The ProcessorSubscription$Handler#waitForRemainingTasksCompletion code loops through each of the contexts separately to call updateHighWatermark and totalPendingTasks. I think this is mixing levels of abstraction - the control flow descends through PartitionContexts to the details of each PartitionContext on each loop iteration (and note this means we call updateHighWatermark for every partition each time we poll, even if pendingTasksCount was actually 0 for that partition).

I also couldn't see why this isn't sharing implementation with maybeHandlePropertyReload (and why maybeHandlePropertyReload is called from where it is, which seems to ensure that it might never reload the properties if new records are always available? Am I misunderstanding?). Looking again I guess the logic is that on property change we should not force waiting/closing whereas on partition rebalance we should?

@kawamuray
Contributor

I think this is mixing levels of abstraction - the control flow descends through PartitionContexts to the details of each PartitionContext on each loop iteration (and note this means we call updateHighWatermark for every partition each time we poll, even if pendingTasksCount was actually 0 for that partition).

Does that mean you think an implementation like the following is better in terms of proper abstraction?

// at PartitionContext
private void waitRemainingTasks(long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        updateHighWatermark();
        if (pendingTasks() == 0) {
            return;
        }
    }
}

// at waitForRemainingTasksCompletion in ProcessorSubscription
public void waitForRemainingTasksCompletion(long timeout) {
    for (PartitionContext context : contexts) {
        // remainingTimeout = time left out of the overall timeout
        context.waitRemainingTasks(remainingTimeout);
    }
}

In that case, I think it doesn't work, because we must ensure we call updateHighWatermark for every partition before exiting from waitForRemainingTasksCompletion, even when it ends up timing out. If we iterate over the partitions one at a time, we'll end up not calling updateHighWatermark for the partitions later in the list, causing their offsets to not be committed properly and leading to more duplicates.
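
For contrast, a rough sketch of keeping the wait loop at the subscription level, so every partition's watermark is updated on each iteration even if the overall wait times out (method and field names follow the discussion above and may not match the actual classes):

void waitForRemainingTasksCompletion(long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
        int pending = 0;
        for (PartitionContext context : contexts) {
            context.updateHighWatermark();        // every partition advances its watermark on each pass
            pending += context.pendingTasksCount();
        }
        if (pending == 0 || System.currentTimeMillis() >= deadline) {
            return;                               // even on timeout, all watermarks are up to date
        }
        Thread.sleep(10);
    }
}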

I also couldn't see why this isn't sharing implementation with maybeHandlePropertyReload (and why maybeHandlePropertyReload is called from where it is, which seems to ensure that it might never reload the properties if new records are always available?

The reason is that on partition rebalance we don't need to (actually can't) call consumer.poll(), while we should in the case of property reload.
If we block for a period exceeding max.poll.interval.ms, that will cause a partition rebalance, which is not the behavior we want.

@m50d
Contributor Author

m50d commented Jan 13, 2021

In that case, I think it doesn't work, because we must ensure we call updateHighWatermark for every partition before exiting from waitForRemainingTasksCompletion, even when it ends up timing out. If we iterate over the partitions one at a time, we'll end up not calling updateHighWatermark for the partitions later in the list, causing their offsets to not be committed properly and leading to more duplicates.

Hmm. How about an interface akin to AsyncShutdownable where there's a request shutdown method and a wait for shutdown method? And then we could have a generic implementation of "shut down this thing and wait up to a given time limit". I think it's probably easier to show than describe - let me try implementing it and see what you think.
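
Roughly, the shape being described would be something like the sketch below (hedged; the interface actually added to the branch is named AsyncShutdownPollable and may differ in detail):

import java.time.Duration;

interface AsyncShutdownPollable {
    void initiateShutdown();      // request shutdown; must not block

    boolean pollShutdown();       // true once shutdown has completed

    // Generic "shut this down and wait up to a time limit" built on the two methods above.
    default boolean awaitShutdown(Duration limit) throws InterruptedException {
        initiateShutdown();
        long deadline = System.nanoTime() + limit.toNanos();
        while (!pollShutdown()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }
}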

@m50d m50d force-pushed the async-termination branch from a4b3fa9 to 5f2f275 on January 13, 2021 09:22
@m50d
Contributor Author

m50d commented Jan 13, 2021

Reworked this branch with the approach I was trying to describe - WDYT?

@@ -29,7 +29,7 @@
* Represents consumption processing progress of records consumed from a single partition.
* This class manages sequence of offsets and a flag which represents if each of them was completed or not.
*/
-public class OutOfOrderCommitControl {
+public class OutOfOrderCommitControl implements AsyncShutdownPollable {
Contributor

Hm, this doesn't feel right, because OOOCC is just a data structure that is intended to be controlled by PartitionContext.
Calling it "shutdownable" makes me feel like it represents a flow from start to some eventual shutdown, which doesn't really exist. The lifetime of this object depends entirely on its user class (i.e., this class keeps working and never shuts down as long as the user class keeps calling reportOffset, regardless of whether initiateShutdown has ever been called; so what this interface says is "pollShutdown() will eventually return true if you stop calling reportFetchedOffset", which doesn't sound like "shutdown"). This seems semantically incorrect and makes it unnecessarily abstract, whereas we knew exactly what was happening when we were calling updateHighWatermark + pendingOffsetsCount explicitly.

@@ -77,6 +78,8 @@
*/
private TracingProvider tracingProvider = NoopTracingProvider.INSTANCE;

private Optional<Long> waitForProcessingOnClose = Optional.empty();
Contributor

Let's make it one of ProcessorProperties?
What we want to configure here is just "the timeout that Decaton waits for already-processing tasks to complete before returning from ProcessorSubscription.close()". So having a property expressing that timeout benefits us by:

  • consistent handling with other configurables
  • users can configure it through existing PropertySupplier.

@kawamuray
Contributor

kawamuray commented Jan 18, 2021

Sorry for the delay in review and thanks for sharing your idea.

Besides the inline comments, let me summarize my feeling about the proposed approach.

  • I understand AsyncShutdownPollable as an abstraction of a loop such as: while (checkCondition()) { updateStatus() }, which is fine in itself.

  • I don't agree with using AsyncShutdownPollable as the abstraction between ProcessorSubscription, PartitionContexts, PartitionContext and OutOfOrderCommitControl, for the following reasons:

    • None of them is actually shutdownable. Except for ProcessorSubscription, all the other classes are just data structures or collections of accessors to partition state, and they never shut down. Instead, each either represents the progress of processing at time T (OOOCC) or bridges ProcessorSubscription to the DecatonProcessors running in ProcessorUnit and provides ProcessorSubscription with the information (the next offset to commit, how many records remain, etc.) needed to judge the progress of processing the records it consumed from Kafka.
    • The primary and only logic extracted out by AsyncShutdownPollable is pollShutdown, which is actually used in only two places in ProcessorSubscription (one is waitForRemainingTasksCompletion and the other is the method we will add to wait for completion on shutdown).
    • While I agree that the flow itself can be abstracted in the way you did, I disagree with its necessity, because I think ProcessorSubscription should stay aware of what's happening (in particular, that each PartitionContext is updating its watermark) while it is waiting for each PartitionContext to reach a certain point (pending tasks == 0). From the AsyncShutdownPollable interface, it doesn't read as if ProcessorSubscription is contracting with PartitionContext to update the watermark as much as it can at "shutdown", because there's no concept of "shutdown" in PartitionProcessor or OutOfOrderCommitControl in the first place.
    • To make such an abstraction, I think we need to make PartitionContext completely self-contained, even including the work of committing its offset. However, that isn't possible because the Consumer is an instance shared among all partitions, and the commit work needs to be delegated to ProcessorSubscription. Without that, an abstraction level like "we don't know what it's doing inside, but it eventually finishes cleanup" (roughly what AsyncShutdownPollable represents, AFAIU) is too high, given that ProcessorSubscription expects specific work from PartitionContext at many points.
  • I feel it doesn't make sense to make ProcessorSubscription an AsyncShutdownPollable, because since we already give the timeout as an input when configuring ProcessorSubscription, just calling .close() should work (and it's the preferred interface for shutting down ProcessorSubscription, given the Spring premise) for users who expect a way to "wait up to N ms on shutdown before completing termination". So there's no point in exposing AsyncShutdownPollable to users.

Sorry if my explanation isn't straightforward to understand. It's about high-level class design, and it was a bit hard to express my feeling exactly in words, TBH.

@m50d
Contributor Author

m50d commented Jan 22, 2021

Thanks.

None of them is actually shutdownable. Except for ProcessorSubscription, all the other classes are just data structures or collections of accessors to partition state, and they never shut down. Instead, each either represents the progress of processing at time T (OOOCC) or bridges ProcessorSubscription to the DecatonProcessors running in ProcessorUnit and provides ProcessorSubscription with the information (the next offset to commit, how many records remain, etc.) needed to judge the progress of processing the records it consumed from Kafka.

Agreed; I did wonder about making it something like "pollWorkInProgress", because it's a bit misleading to call it shutdown if it's not actually shutting down.

To make such an abstraction, I think we need to make PartitionContext completely self-contained, even including the work of committing its offset. However, that isn't possible because the Consumer is an instance shared among all partitions, and the commit work needs to be delegated to ProcessorSubscription.

I agree, but I think something along those lines has to be the right direction overall - figuring out how far processing has got should surely follow the same hierarchy of responsibility as processing itself, so that each object has a clear area of responsibility? I found it hard to follow how tasks were handed out down to processors but then the progress information came back by a separate path through OutOfOrderCommitControl that doesn't mirror the same hierarchy. But maybe that's just me and it makes sense to everyone else as-is?

I feel it doesn't make sense to make ProcessorSubscription an AsyncShutdownPollable, because since we already give the timeout as an input when configuring ProcessorSubscription, just calling .close() should work (and it's the preferred interface for shutting down ProcessorSubscription, given the Spring premise) for users who expect a way to "wait up to N ms on shutdown before completing termination". So there's no point in exposing AsyncShutdownPollable to users.

I think it makes sense for larger systems that might want to do the same thing themselves - e.g. a program might want to call initiateShutdown() on receiving SIGINT and then call awaitShutdown() in a loop, or interleave polling for the shutdown of several different components. We should definitely provide a good default for users who want to just call .close(), but the lower-level interface is valuable too.

@kawamuray
Contributor

kawamuray commented Jan 25, 2021

I found it hard to follow how tasks were handed out down to processors but then the progress information came back by a separate path through OutOfOrderCommitControl that doesn't mirror the same hierarchy. But maybe that's just me and it makes sense to everyone else as-is?

Hm, while I can partially understand that you had a hard time tracking the path of delivering tasks from the consumer to ProcessorUnit and getting offset information back from there, my take is that this isn't caused by insufficient abstraction or responsibility separation, but mainly by some limitations we must follow when implementing a processing (threading) model like Decaton's using a single consumer instance.
Those are:

  1. We have to keep running the poll() loop periodically, even when we want to wait on something, and then proceed to update some status in the middle of the loop.
  2. We can't let worker threads (an executor in ProcessorUnit) update the final state (= watermark) directly, which requires ProcessorSubscription to insert a "tick" operation (= updateHighWatermark). This is actually the result of an optimization against a performance bottleneck we've experienced in the past due to synchronization among threads.
  3. Since individual partitions should be considered isolated units of processing, we should not process them completely sequentially, but need to give each a fair chance to update and report its latest status (so we can't do something like loop { partition.waitCompletingAllTasks() }). Consequently, the processing flow needs to be a kind of traversal of a tree-ish hierarchy (ProcessorSubscription is the root, PartitionContext is a node, and there are leaves like OOOCC and PartitionProcessor).

Maybe this is because I'm the original author of those classes, but I see classes like PartitionContext as an interface abstraction of the minimal APIs required for ProcessorSubscription to interact with the detailed data structure (OOOCC) and subcomponents like PartitionProcessor and to traverse each partition's state, so I don't feel so much that they are mixing responsibilities.

I think it makes sense for larger systems that might want to do the same thing themselves - e.g. a program might want to call initiateShutdown() on receiving SIGINT and then call awaitShutdown() in a loop, or interleave polling for the shutdown of several different components. We should definitely provide a good default for users who want to just call .close(), but the lower-level interface is valuable too.

Okay, so it would be like the following, right?

  • initShutdown() and awaitShutdown(timeout) => ProcessorSubscription waits up to decaton.shutdown.timeout before the last commit attempt, and commits the watermark regardless of whether all tasks could complete. awaitShutdown() returns normally if decaton.shutdown.timeout is shorter than the given timeout (because the termination flag will be set), but throws an exception if the given timeout is shorter than decaton.shutdown.timeout, because the shutdown sequence won't complete until it expires.
  • close() => the awaitShutdown(Integer.MAX_VALUE) version of the above flow.

which is a bit confusing, isn't it?
Or, alternatively, we could throw an exception from awaitShutdown if some tasks remain incomplete. In that case, awaitShutdown() would end up throwing an exception after 5 seconds (assuming decaton.shutdown.timeout=5sec), despite the timeout given as the argument being 30 seconds, which also sounds confusing.

What'd be the right expected "meaning" of those timeouts?

@m50d
Contributor Author

m50d commented Jan 26, 2021

Maybe this is because I'm the original author of those classes, but I see classes like PartitionContext as an interface abstraction of the minimal APIs required for ProcessorSubscription to interact with the detailed data structure (OOOCC) and subcomponents like PartitionProcessor and to traverse each partition's state, so I don't feel so much that they are mixing responsibilities.

I agree with your 3 points; I think it's OOOCC that I found confusing: it owns the OffsetStates that go with specific records owned by specific threads, the watermarks that belong to the whole partition, and the logic for computing one from the other. It also kind of flips how DeferredCompletion is used compared to within the processing - in ProcessingContext (and DecatonProcessor) it's kind of a callback or a continuation that's passed in, but OOOCC instantiates one and returns it to ProcessorSubscription. (Though I've always struggled to understand continuation-passing style code, so this may just be me.)

I think it'd make more sense to me if we kept the DecatonTask and its corresponding OffsetAndMetadata closer together, or if there was more symmetry between how the tasks are passed down and how the completions come back up. Maybe something like each ProcessorUnit keeping its own sorted set of OffsetStates that are in progress, and having ProcessorSubscription compute the high watermark at commit time. Or conversely have OOOCC responsible for passing out tasks to PartitionProcessor. As a concrete incremental step, what about looking to combine the calls to context.registerOffset and context.addRequest in ProcessorSubscription#receive? That way at least OOOCC more clearly belongs to PartitionContext and we could see where the DeferredCompletion comes from without having to look at the processor code.

initShutdown() and awaitShutdown(timeout) => ProcessorSubscription waits up to decaton.shutdown.timeout before the last commit attempt, and commits the watermark regardless of whether all tasks could complete. awaitShutdown() returns normally if decaton.shutdown.timeout is shorter than the given timeout (because the termination flag will be set), but throws an exception if the given timeout is shorter than decaton.shutdown.timeout, because the shutdown sequence won't complete until it expires.

If we provide a version of awaitShutdown() that takes a timeout then I'd expect that to override decaton.shutdown.timeout i.e. decaton.shutdown.timeout is the default value if we don't pass a timeout in code, it's ignored if we do pass a timeout. Alternatively we could not accept any timeout argument and always use decaton.shutdown.timeout. I think that's still useful, e.g. a server that has 3 decaton subscriptions could call initShutdown() on each of them before calling awaitShutdown() on each in turn, and even though the worst-case wait time is the same (3 * decaton.shutdown.timeout), that would make for a quicker shutdown most of the time in practice.

@kawamuray
Contributor

kawamuray commented Jan 28, 2021

OK, now I see your point a bit more clearly.

if we kept the DecatonTask and its corresponding OffsetAndMetadata closer together

To tell you a bit of history, OOOCC has been the most important core of Decaton, and its implementation has evolved several times in the past to optimize performance because it has tended to be a bottleneck. So the class's responsibility is intentionally limited and isolated from other parts of the Decaton core, to keep it delivering optimal performance for its very specific role:

  • Given a sequence of offsets, store it and corresponding state (completed or not)
  • Offset reporting and watermark updates (traversal) are guaranteed to be done by one thread, serially.
  • There should be no synchronization among threads when changing the list of offsets or each offset's state.

You can see the previous implementations here, as we keep all of them to track performance comparisons when we update the logic.

Maybe something like each ProcessorUnit keeping its own sorted set of OffsetStates that are in progress, and having ProcessorSubscription compute the high watermark at commit time.

That's one possible way of doing it.
We could have another layer at ProcessorUnit that holds the state of the offsets it has received and is now processing, but that would decouple the data structure holding offset states from the logic to compute the watermark (ProcessorUnit maintains a per-subpartition watermark; ProcessorSubscription and its family compute the partition-level watermark from the subpartition watermarks), which makes it multi-dimensional and a bit more complex than as-is.
In addition, the list of offsets is a property belonging to a partition, while their states (completed or not) may belong to each subpartition (or processor), which makes me feel it is not super optimal but at least correct to hold OOOCC at the partition level.

what about looking to combine the calls to context.registerOffset and context.addRequest in ProcessorSubscription#receive?

That's possible, but would it really make things better?
#receive currently does more than just reporting the offset to OOOCC and enqueuing it to ProcessorUnit's executor. It takes care of offset regression, which is detected from OOOCC's report and requires manipulating a ProcessorSubscription-level property (contexts), and of filtering keys based on the blacklist, which also doesn't necessarily have to be done in any lower layer, while the record's offset still needs to be completed (that is, reported and marked completed without pushing it into the ProcessorUnit queue).

that takes a timeout then I'd expect that to override decaton.shutdown.timeout i.e. decaton.shutdown.timeout is the default value if we don't pass a timeout in code,

Hm, it might depend on the interface, but to me an interface like awaitShutdown(long timeout) doesn't feel like it should do anything more than observe the shutdown status for up to timeout milliseconds and then return, whereas here it actually interferes with how the ProcessorSubscription thread finishes its work.
In addition, it will introduce inconsistent behavior when awaitShutdown is called by 2 threads simultaneously (not likely to happen, but something we should not prohibit?) because they might call it with different timeouts while only the first one is actually regarded.

Alternatively we could not accept any timeout argument and always use decaton.shutdown.timeout. I think that's still useful, e.g. a server that has 3 decaton subscriptions could call initShutdown() on each of them before calling awaitShutdown() on each in turn, and even though the worst-case wait time is the same (3 * decaton.shutdown.timeout), that would make for a quicker shutdown most of the time in practice.

Yeah, that would make more sense. For that usage, what about adding an interface and implementing it in ProcessorSubscription like below?

public CompletableFuture<Void> initShutdown();
  • The user can do whatever they want with the CF, e.g., call get() on it with or without a timeout, or pass it to their own async processing framework.
  • It is clear that ProcessorSubscription returns a CF that completes when it completes shutdown. The timeout the user may give to get() is clearly not regarded by ProcessorSubscription, so it is clear that the CF completes at the time ProcessorSubscription finishes shutdown, rather than doing something with the timeout.
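
For illustration, a possible caller-side usage of such a CF-returning initShutdown (a hypothetical sketch; initShutdown() is only the proposal above, not an existing Decaton API):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ParallelShutdownExample {
    // Start every shutdown at once, then bound the total wait on the combined future.
    static void shutdownAll(List<ProcessorSubscription> subscriptions) throws Exception {
        CompletableFuture<Void> all = CompletableFuture.allOf(
                subscriptions.stream()
                             .map(ProcessorSubscription::initShutdown)
                             .toArray(CompletableFuture[]::new));
        try {
            all.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Some subscriptions are still shutting down; the caller decides what to do next.
        }
    }
}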

@m50d
Contributor Author

m50d commented Jan 29, 2021

That's possible, but would it really make things better?
#receive currently does more than just reporting the offset to OOOCC and enqueuing it to ProcessorUnit's executor. It takes care of offset regression, which is detected from OOOCC's report and requires manipulating a ProcessorSubscription-level property (contexts), and of filtering keys based on the blacklist, which also doesn't necessarily have to be done in any lower layer, while the record's offset still needs to be completed (that is, reported and marked completed without pushing it into the ProcessorUnit queue).

#receive definitely has some things that rightfully belong there, but AFAICS offsetCompletion shouldn't really appear in that method at all - it takes it from context and then passes it straight back to context? I don't know, it's out of scope for this task anyway, but keeping track of where the DeferredCompletion came from and went to was a part I found hard to understand, so if there's any way to improve that I think it'd be helpful. (Relatedly: would it be possible/desirable to use standard Futures in place of DeferredCompletion? AFAICS the functionality is very similar).

that takes a timeout then I'd expect that to override decaton.shutdown.timeout i.e. decaton.shutdown.timeout is the default value if we don't pass a timeout in code,

Hm, it might depend on the interface, but to me an interface like awaitShutdown(long timeout) doesn't feel like it should do anything more than observe the shutdown status for up to timeout milliseconds and then return

Yes, that's exactly the intention. All I mean is that close() calls initShutdown() followed by awaitShutdown(decaton.shutdown.timeout), and/or awaitShutdown() calls awaitShutdown(decaton.shutdown.timeout).

In addition, it will introduce inconsistent behavior when awaitShutdown is called by 2 threads simultaneously (not likely to happen, but something we should not prohibit?) because they might call it with different timeouts while only the first one is actually regarded.

That's not the intention - if two threads call awaitShutdown then each one's timeout is respected for that thread. Doesn't this impl work like that? (The two threads will both be calling updateHighWatermarks(), so it's not completely true that each one is "just observing", but that's an implementation detail; the semantics should be the same).

It is clear that ProcessorSubscription returns a CF that completes when it completes shutdown. The timeout the user may give to get() is clearly not regarded by ProcessorSubscription, so it is clear that the CF completes at the time ProcessorSubscription finishes shutdown, rather than doing something with the timeout.

I'm modelling the behaviour on ExecutorService: initShutdown should work like ExecutorService#shutdown and awaitShutdown should work like ExecutorService#awaitTermination i.e. it returns either when everything has finished shutting down or when the timeout is reached. I was under the impression that was a reasonably standard approach, though looking around it seems like the future-based approach is becoming more common as well. However I think in this case returning a Future would be more difficult, because it means we don't have a thread available to poll with and therefore there's nothing to call updateHighWatermarks()?
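
For reference, the standard JDK pattern being mirrored here (real ExecutorService API; the analogy to initShutdown/awaitShutdown is only illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorShutdownAnalogy {
    // shutdown() is analogous to initiateShutdown(): request termination without blocking.
    // awaitTermination() is analogous to awaitShutdown(limit): observe for a bounded time and report the outcome.
    static boolean shutdownWithin(ExecutorService executor, long timeoutSeconds) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }
}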

@kawamuray
Contributor

kawamuray commented Feb 2, 2021

it takes it from context and then passes it straight back to context?

Hm, that's a good point.
Since all offsets need to be managed by OOOCC, we can't have a separate processing path to mark an offset that is filtered out by the blacklist filter.
One possible way to address this is to move the current logic in receive down to the PartitionContext level, but that compromises by bringing the blacklist filter, which is not the business of the per-partition layer, into PartitionContext, and also makes it necessary to change the way PartitionContext is re-created from object swapping to an internal reset of its fields (or to add another layer), since it doesn't make sense for PartitionContext to touch Map<TopicPartition, PartitionContext>, which I dislike.

Relatedly: would it be possible/desirable to use standard Futures in place of DeferredCompletion? AFAICS the functionality is very similar

Yeah, originally I was thinking of adding more methods to DeferredCompletion to support various ways of finishing processing, like completion.fail(), completion.failWithReason(xxx), completion.abortSubscription(), which I now think isn't immediately required by most processors.
At the same time, I'm not so motivated to switch it to CF and break API compatibility, since the current interface just works.

Yes, that's exactly the intention. All I mean is that close() calls initShutdown() followed by awaitShutdown(decaton.shutdown.timeout), and/or awaitShutdown() calls awaitShutdown(decaton.shutdown.timeout).
That's not the intention - if two threads call awaitShutdown then each one's timeout is respected for that thread. Doesn't this impl work like that? (The two threads will both be calling updateHighWatermarks(), so it's not completely true that each one is "just observing", but that's an implementation detail; the semantics should be the same).

Okay, I didn't imagine that.
In that case, how does ProcessorSubscription#run behave, and who will take care of committing the last completed offset by calling consumer.commitSync()?

Besides, calling updateHighWatermark from an arbitrary thread is an idea that is very hard to accept. As you might have seen in the history, OOOCC is designed with a very concrete threading model: there is always just 1 thread calling reportFetchedOffset and updateHighWatermark. Currently they are marked as synchronized, but that doesn't actually mean they are fine to be called by multiple threads. The intention is just to mark them as a critical section, while we expect them to always be accessed by the consumer loop thread only (the cost of the synchronized modifier is trivial, AFAIK, when there is actually no race with other threads). With the current implementation it still works even if we let other threads call it, but I don't want to add that dependency from a public interface to OOOCC, whose implementation we might change in the future; it could become a blocker when changing the implementation in pursuit of higher performance.
So to implement a shutdown interface that observes completion of the processor subscription, I think we should implement it simply as an observer of the ProcessorSubscription thread, without letting it cause any side effects.
(Another important point might be that a KafkaConsumer isn't thread-safe and needs to be touched only by its owning thread.)

However I think in this case returning a Future would be more difficult, because it means we don't have a thread available to poll with and therefore there's nothing to call updateHighWatermarks()?

Is that so? I was imagining something like this:

class ProcessorSubscription {
    private volatile boolean shutdownFlag;
    private final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

    public void run() {
        try {
            // consume loop, runs until shutdownFlag is set ...
        } finally {
            shutdownFuture.complete(null);
        }
    }

    public CompletableFuture<Void> initShutdown() {
        shutdownFlag = true;
        return shutdownFuture;
    }
}

@m50d
Contributor Author

m50d commented Feb 3, 2021

Since all offsets need to be managed by OOOCC, we can't have a separate processing path to mark an offset that is filtered out by the blacklist filter.

At the same time, I'm not so motivated to switch it to CF and break API compatibility, since the current interface just works.

Makes sense. Thanks for explaining.

Besides, calling updateHighWatermark from an arbitrary thread is an idea that is very hard to accept. As you might have seen in the history, OOOCC is designed with a very concrete threading model: there is always just 1 thread calling reportFetchedOffset and updateHighWatermark. Currently they are marked as synchronized, but that doesn't actually mean they are fine to be called by multiple threads. The intention is just to mark them as a critical section, while we expect them to always be accessed by the consumer loop thread only (the cost of the synchronized modifier is trivial, AFAIK, when there is actually no race with other threads).

OK, I misunderstood this; I wanted to reuse the same "shutdown" code between what ProcessorSubscription does at the end of its thread and what happens when we call shutdown from outside, but if shutdown always needs to be done by the thread itself then that doesn't work. Let me rework it along the lines you're saying.

(Side point: I also found it a bit confusing that ProcessorSubscription is both a thread and an object with methods that might be called from other threads - again out of scope for this issue, but would it make sense to separate those?)

@m50d m50d force-pushed the async-termination branch from 5f2f275 to 54b1b46 on February 3, 2021 06:59
@m50d
Contributor Author

m50d commented Feb 3, 2021

With the idea that the updateHighWatermarks has to be called from the ProcessorSubscription thread only, it looks like there's an embarrassingly simple way to achieve what I originally wanted. WDYT? I haven't pursued the Future implementation we were talking about because ProcessorSubscription is already AsyncShutdownable which is good enough for my current use cases; this shouldn't make it any harder to offer that Future-based API in the future though.

@m50d m50d requested a review from kawamuray February 3, 2021 07:03
@kawamuray
Contributor

kawamuray commented Feb 4, 2021

(Side point: I also found it a bit confusing that ProcessorSubscription is both a thread and an object with methods that might be called from other threads - again out of scope for this issue, but would it make sense to separate those?)

Not sure. The original intention in making it a Thread was to enable running it independently without needing to get any "runtime" from users (e.g., an ExecutorService). That has been good so far, because most users use it by calling .buildAndStart() on the SubscriptionBuilder. Still, users who prefer to run it through their own thread executor can do that, which is just fine?

With the idea that the updateHighWatermarks has to be called from the ProcessorSubscription thread only, it looks like there's an embarrassingly simple way to achieve what I originally wanted. WDYT? I haven't pursued the Future implementation we were talking about because ProcessorSubscription is already AsyncShutdownable which is good enough for my current use cases; this shouldn't make it any harder to offer that Future-based API in the future though.

Hm, while I still think the CF approach would be the most convenient and understandable for users, I think the AsyncShutdownable interface is also fine, primarily because it turns out that while the AsyncShutdownable interface is private for now, initiateShutdown and awaitShutdown are both public API, as users can access them directly through ProcessorSubscription without needing a reference to AsyncShutdownable...

However, I think we should enhance it a bit by adding a method that takes a timeout, like awaitShutdown(Duration timeout); otherwise I don't feel it is a complete interface, because while the intention is to provide an asynchronous way to initiate and observe subscription shutdown, users still have to call an awaitShutdown that they have no idea how long may block.

So we should:

  1. Move AsyncShutdownable's package to make it a public API
  2. add awaitShutdown(Duration timeout)

I think?

@m50d
Contributor Author

m50d commented Feb 5, 2021

However, I think we should enhance it a bit by adding a method that takes a timeout, like awaitShutdown(Duration timeout); otherwise I don't feel it is a complete interface, because while the intention is to provide an asynchronous way to initiate and observe subscription shutdown, users still have to call an awaitShutdown that they have no idea how long may block.

So we should:

Move AsyncShutdownable's package to make it a public API
add awaitShutdown(Duration timeout)
I think?

I think that makes sense. Do you want to treat that as a blocker for this PR or push it out as a separate task?

@m50d m50d requested a review from kawamuray February 5, 2021 06:29
@m50d m50d force-pushed the async-termination branch from 54b1b46 to d08c30d on February 5, 2021 06:48
@kawamuray
Contributor

I think that makes sense. Do you want to treat that as a blocker for this PR or push it out as a separate task?

Might be okay to do in a separate PR, but I want it done before making a release.

Contributor

@kawamuray kawamuray left a comment

Left some comments.

-    public void awaitShutdown() throws InterruptedException {
-        join();
+    public boolean awaitShutdown(Duration limit) throws InterruptedException {
+        join(limit.toMillis());
         metrics.close();
Contributor

I think an aliveness check needs to be performed before this, because we should not close metrics while it's still running.

Contributor Author

Hmm. I think the right thing is for the thread to call metrics.close() when it's finished everything else, since initiateShutdown() should cause the shutdown to happen and awaitShutdown should observe that process.

metrics.close();
log.info("Subscription thread terminated: {}", getName());
if (isAlive()) {
log.warn("Subscription thread didn't terminate within time limit: {}", getName());
Contributor

@kawamuray kawamuray Feb 9, 2021

Hmm, I don't agree with writing such a log, because the caller of awaitShutdown(timeout) may supply a short timeout, and it may be totally normal to end up not completing the shutdown process. It's the caller's business, not ours.

Contributor Author

Happy to lower it to info, but I think we do want to know which case applied and it's not always clear from outside (we return a boolean but that doesn't necessarily tell us the details of which subpartition didn't terminate).

Contributor

which subpartition

which subscription you mean?

Can we check "absence" of the below log instead?

log.info("ProcessorSubscription {} terminated in {} ms", scope,

Anyway, my opinion on such logging is that it's allowed only when:

  • The interface intends to complete the whole shutdown process within the method call (either with or without a timeout). I think awaitShutdown doesn't intend that (it observes shutdown for the given amount of time and just returns the result), or
  • termination failed with an obvious error (e.g., an exception)

Otherwise, we're assuming that this is the single last invocation from the user and that they'll leak some resources if we don't complete everything here, which might not be the case, making it inconsistent with the interface design, I think.

Still fine to have such logging, but debug level is appropriate, I think? (if the above "absence" check works)

 for (ProcessorUnit unit : units) {
-    unit.awaitShutdown();
+    final Duration unitLimit = Duration.between(Instant.now(), absLimit);
+    clean &= unit.awaitShutdown(unitLimit.isNegative() ? Duration.ZERO : unitLimit);
 }
 Utils.runInParallel(
Contributor

Hm, here too I think that if !clean, we should not proceed to this step.

Contributor Author

Hmm. I think this can only be done in a blocking close (or an indefinite awaitTermination), because it has to happen after shutdown succeeds.

-    public void awaitShutdown() throws InterruptedException {
-        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    public boolean awaitShutdown(Duration limit) throws InterruptedException {
+        final boolean clean = executor.awaitTermination(limit.toMillis(), TimeUnit.MILLISECONDS);
         metrics.close();
Contributor

and this too.

asyncProcessingStarted.await();
subscription.initiateShutdown();
assertTrue(consumer.committed(singleton(tp)).isEmpty());
letTasksComplete.countDown();
Contributor

Hmm, is this working as a test case for this #79 (comment)?

I think in this implementation, the 2 tasks are both fed into the DecatonProcessor anyway (because they return immediately) and Decaton's internal queue quickly becomes empty, so it's not testing that "Decaton processes tasks that are fetched but still in the queue at the beginning of shutdown", I think?

Contributor Author

The first task can't complete until letTasksComplete.countDown(); is called, so the second task will still be queued. I'll add an assert that checks they're still pending at this point?

Contributor

@kawamuray kawamuray Feb 12, 2021

Decaton gives the 2nd task to DecatonProcessor#process even when the 1st task hasn't yet completed, though?
When a processor uses DeferredCompletion, it is the processor's responsibility to serialize execution of multiple tasks with the same key after #process returns.

Contributor Author

Ah indeed. I've added blocking in the synchronous part of processing as well so that we test that case.

@m50d m50d requested a review from kawamuray February 10, 2021 02:39
@kawamuray
Contributor

kawamuray commented Feb 12, 2021

I think the right thing is for the thread to call metrics.close() when it's finished everything else, since initiateShutdown() should cause the shutdown to happen and awaitShutdown should observe that process.
I think this can only be done in a blocking close (or an indefinite awaitTermination), because it has to happen after shutdown succeeds.

That's why I thought the CF style would work well for such an interface.

I've tried to mock up my idea.

public interface AsyncShutdownable extends AutoCloseable {
    CompletableFuture<Void> beginShutdown();

    /**
     * Shut down, blocking until shutdown is complete
     */
    @Override
    default void close() throws Exception {
        beginShutdown().get();
    }
}


public class ProcessorSubscription extends Thread implements AsyncShutdownable {
...

    public void run() {
...
            termFuture.complete(null);
            updateState(SubscriptionStateListener.State.TERMINATED);
        }
    }

    private final CompletableFuture<Void> termFuture = new CompletableFuture<>();
    private CompletableFuture<Void> shutdownFuture;
    @Override
    public synchronized CompletableFuture<Void> beginShutdown() {
        if (!terminated.get()) {
            shutdownFuture = termFuture.whenComplete((unused, throwable) -> {
                metrics.close();
            });
            terminated.set(true);
        }
        return shutdownFuture;
    }
}


public class PartitionProcessor implements AsyncShutdownable {
...
    private CompletableFuture<Void> shutdownFuture;
    @Override
    public synchronized CompletableFuture<Void> beginShutdown() {
        if (shutdownFuture == null) {
            // Collect each unit's shutdown future by index (the original sketch wrote all of them to cfs[0]).
            CompletableFuture[] cfs = new CompletableFuture[units.size()];
            for (int i = 0; i < units.size(); i++) {
                try {
                    cfs[i] = units.get(i).beginShutdown();
                } catch (RuntimeException e) {
                    logger.error("Processor unit threw exception on shutdown", e);
                }
            }
            try {
                rateLimiter.close();
            } catch (Exception e) {
                logger.error("Error thrown while closing rate limiter", e);
            }
            shutdownFuture = CompletableFuture.allOf(cfs).thenCompose(
                    unused -> Utils.runInParallel(
                            "DestroyThreadScopedProcessors",
                            IntStream.range(0, units.size())
                                     .mapToObj(this::destroyThreadProcessorTask)
                                     .collect(toList())));
        }
        return shutdownFuture;
    }
}

By using CF, we can manage a multi-step shutdown sequence within the single CF instance returned by beginShutdown, so users are only required to join on the returned CF's completion, while beginShutdown can be called an arbitrary number of times since it just returns that single CF.
A step that depends on the previous step completing, such as metrics.close() after ProcessorSubscription's shutdown, can be represented and is guaranteed to be executed upon ProcessorSubscription#run's end.

However, one issue turned up while implementing it for ProcessorUnit. To implement this model, we need someone to complete the CF upon finishing the work we're awaiting, and ExecutorService currently has no interface for that, despite actually being a representation of asynchronous work whose completion (termination) we may want to synchronize on.
One possible workaround is to give up calling awaitTermination and use the last executed task as the indicator of termination, which is actually valid because calling awaitTermination on an ExecutorService is optional, and the resources it holds (threads) are still guaranteed to be released at some point; it just doesn't feel 100% clean.

public class ProcessorUnit implements AsyncShutdownable {
...
    private CompletableFuture<Void> shutdownFuture;

    @Override
    public synchronized CompletableFuture<Void> beginShutdown() {
        if (shutdownFuture == null) {
            terminated = true;
            pipeline.close();

            CompletableFuture<Void> executorShutdown = new CompletableFuture<>();
            shutdownFuture = executorShutdown.whenComplete((unused, throwable) -> metrics.close());
            executor.execute(() -> executorShutdown.complete(null));
            executor.shutdown();
        }
        return shutdownFuture;
    }
}

I'm not sure if that way suits us best, because it certainly introduces some complexity, especially unpredictability about who's calling an object's shutdown method (e.g., who's calling metrics.close() on ProcessorSubscription? Well, the ProcessorSubscription thread does, but that's not directly readable from the code), but one thing I'm very sure of is that we should not assume awaitTermination(timeout) is always called with a sufficiently long timeout and only once.

Maybe a name like pollTermination() or tryCompleteShutdown(timeout) would be more correct for the responsibility of awaitTermination, but anyway, as long as we proceed with the current model, it cannot be just an "observer" implementation-wise (because we need some thread to give its time to proceed to subsequent stages after completing the previous stage), but it should still be seen as an observer method from the user's perspective by not letting the timeout argument interfere with its behavior (e.g., calling metrics.close() after 100 ms (= the timeout argument) despite knowing that the ProcessorSubscription is still running).

Contributor

@kawamuray kawamuray left a comment

Left some comments.

@m50d
Contributor Author

m50d commented Feb 12, 2021

However, one issue turned up while implementing it for ProcessorUnit. To implement this model, we need someone to complete the CF upon finishing the work we're awaiting, and ExecutorService currently has no interface for that, despite actually being a representation of asynchronous work whose completion (termination) we may want to synchronize on.
One possible workaround is to give up calling awaitTermination and use the last executed task as the indicator of termination, which is actually valid because calling awaitTermination on an ExecutorService is optional, and the resources it holds (threads) are still guaranteed to be released at some point; it just doesn't feel 100% clean.

Yeah. I tried to stick to mirroring what ExecutorService does, and I couldn't see a way to make that apply to the CF approach. I agree that the CF approach is potentially nicer but also more complex. I don't think doing it this way closes off the possibility of doing it that way in the future (because we'll always want to offer an awaitTermination-style interface; if we ultimately go the CF route we can just have awaitTermination(limit) delegate to terminationFuture().get(limit)).

one thing I'm very sure of is that we should not assume awaitTermination(timeout) is always called with a sufficiently long timeout and only once.

Maybe a name like pollTermination() or tryCompleteShutdown(timeout) would be more correct for the responsibility of awaitTermination, but anyway, as long as we proceed with the current model, it cannot be just an "observer" implementation-wise (because we need some thread to give its time to proceed to subsequent stages after completing the previous stage), but it should still be seen as an observer method from the user's perspective by not letting the timeout argument interfere with its behavior (e.g., calling metrics.close() after 100 ms (= the timeout argument) despite knowing that the ProcessorSubscription is still running).

Agreed, and that's how I've implemented it in the current version of this branch - PartitionProcessor#awaitTermination only observes, and there's some extra logic in PartitionProcessor#close that does the necessary cleanup after termination has been observed to finish. The price of that is that calling PartitionProcessor#initiateShutdown alone won't result in a clean shutdown, but PartitionProcessor is an internal class and I've put a warning comment on that method; I think that's the least-bad way forward for now (and I'll emphasise again that this doesn't close off moving to the CF you've outlined in the future).

@m50d m50d requested a review from kawamuray February 12, 2021 06:02
@ocadaruma
Contributor

ocadaruma commented Feb 12, 2021

Maybe bit late, but let me confirm about my understanding about the initial intention and current patch.

  • I read that the intentions of this patch are the two points below, right?
    • To await currently queued tasks up to ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS upon shutdown, to minimize duplicate processing and to avoid attempting to continue processing after the processors are destroyed (which could cause unwanted ERROR logs)
    • Expose initiateShutdown(), to make it possible to close multiple subscriptions in parallel

Then, I still don't get why we should have awaitShutdown(Duration limit), which blocks the caller thread up to the passed timeout but doesn't guarantee that the ProcessorSubscription is properly closed after it returns (so we may have to call close() anyway).

And I'm also wondering about the reason to add awaitShutdown(Duration limit) to AsyncShutdownable, since implementing it in classes other than ProcessorSubscription doesn't make sense for the purpose of this PR.

So, I think we should simply do like following:

  • Move AsyncShutdownable to a package where it becomes public API
    • So that users can use initiateShutdown()
  • Make ProcessorSubscription wait for pending tasks up to CONFIG_SHUTDOWN_TIMEOUT_MS

What do you think?
(If I just overlooked some discussion so far, please point me to the link and I'll read through it again.)

@m50d
Contributor Author

m50d commented Feb 15, 2021

Then, I still don't get why we should have awaitShutdown(Duration limit), which blocks the caller thread up to the passed timeout but doesn't guarantee that the ProcessorSubscription is properly closed after it returns (so we may have to call close() anyway).

With this change ProcessorSubscription may take a long time to close, and some callers might not be able to tolerate that (we're a library so we don't control all the use cases). awaitShutdown is for callers who need to finish within a given time limit even if that means not completing a graceful shutdown. It's the same API that ExecutorService offers.

As-is: close() will always return quickly.
To-be: close() will perform a graceful shutdown; callers that are time-bound can call awaitShutdown(Duration limit).

If we just change the behaviour of close(), there's no API that callers with a time requirement can use. (You could say those callers should just set decaton.shutdown.timeout to match their time requirement, but imagine a use case like: try to shut down these 10 decaton subscriptions gracefully if possible, but definitely return within 30 seconds whatever happens. It's very difficult to implement that without some API like this.)
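As a sketch of that use case, assuming the initiateShutdown()/awaitShutdown(Duration) pair discussed in this PR and the ProcessorSubscription package location at the time of writing:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import com.linecorp.decaton.processor.runtime.ProcessorSubscription;

// Sketch of "shut down N subscriptions gracefully, but return within a fixed budget".
class BoundedShutdownExample {
    static void shutdownAllWithin(List<ProcessorSubscription> subscriptions,
                                  Duration budget) throws Exception {
        Instant deadline = Instant.now().plus(budget);

        // Flag shutdown on every subscription first so they all drain in parallel.
        for (ProcessorSubscription subscription : subscriptions) {
            subscription.initiateShutdown();
        }

        // Then wait on each one, but only for whatever time remains until the deadline.
        for (ProcessorSubscription subscription : subscriptions) {
            Duration remaining = Duration.between(Instant.now(), deadline);
            if (remaining.isNegative()) {
                remaining = Duration.ZERO;
            }
            subscription.awaitShutdown(remaining); // returns once done or when the budget is spent
        }
    }
}
```

The point is that the per-subscription wait shrinks as the deadline approaches, which cannot be expressed with a close() that may block for an unbounded time.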

And, I'm also wondering the reason to add awaitShutdown(Duration limit) to AsyncShutdownable, which doesn't make a sense to implement in other classes than ProcessorSubscription to achieve the purpose of this PR.

To implement a "time-bounded await shutdown" for ProcessorSubscription we'll need to implement some kind of "time-bounded await" in PartitionProcessor, and to implement it there we need "time-bounded await" in ProcessorUnit. So I think it makes sense to include it in AsyncShutdownable.

@ocadaruma
Contributor

awaitShutdown is for callers who need to finish within a given time limit even if that means not completing a graceful shutdown

I see, thanks for the explanation.
Sounds reasonable, but I'm wondering if it's worth introducing that kind of bounded-await method given the complexity (not only in Decaton's internal implementation, but also in the API: awaitTermination(timeout) tries to await and close up to the specified timeout; returning false means the timeout was reached, so some resources might not be cleaned up; close() is shorthand for initiateShutdown() => awaitTermination(INFINITE)), because for such a use case users can implement their own simple timeout layer on top of an ExecutorService.

As the summary, my thoughts about this PR:

  • Wait for queued tasks up to CONFIG_SHUTDOWN_TIMEOUT_MS upon shutdown: this actually needs to be provided by Decaton to reduce duplicates on shutdown
  • Providing awaitTermination(timeout): not sure it's helpful enough to justify the complexity, as I described above.

we'll need to implement some kind of "time-bounded await" in PartitionProcessor

Hm, I'm not sure we need to implement a bounded version for PartitionProcessor and ProcessorUnit, because we can simply wait for ProcessorSubscription's termination with join(timeout) (as you implemented; by the way, PartitionProcessor's bounded awaitTermination is currently never used?).

And, for ProcessorUnit: if a task being processed at shutdown takes a long time (which is possible because users can implement arbitrary logic in a DecatonProcessor), it could block regardless of how we implement ProcessorUnit#awaitTermination(timeout), and in that case we would return without executing some "clean-up"s, so I'm not sure it's more meaningful than just waiting at the top level with ProcessorSubscription#join(timeout).

@m50d
Contributor Author

m50d commented Feb 16, 2021

Sounds reasonable, but I'm wondering if it's worth introducing that kind of bounded-await method given the complexity (not only in Decaton's internal implementation, but also in the API: awaitTermination(timeout) tries to await and close up to the specified timeout; returning false means the timeout was reached, so some resources might not be cleaned up; close() is shorthand for initiateShutdown() => awaitTermination(INFINITE)), because for such a use case users can implement their own simple timeout layer on top of an ExecutorService.

It's possible, but it's very rare for users to do this; it means you have to spawn a thread during shutdown to implement the timeout, and then interrupt your shutdown thread, which is complex to reason about. I'd say that in a Java library it's normal good practice that whenever you implement a method that could block indefinitely, you offer a variant that takes a time limit as well, like with ExecutorService#awaitTermination but also e.g. BlockingQueue#offer, or Thread#join as you mentioned.
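Roughly what that caller-side workaround looks like (a sketch only; the subscription is treated as a plain AutoCloseable to keep it generic):

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Without a bounded awaitShutdown, a time-constrained caller has to wrap close()
// in its own thread and interrupt it when the budget runs out.
class CallerSideTimeout {
    static void closeWithTimeout(AutoCloseable subscription, Duration budget) throws Exception {
        ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor();
        Future<?> closing = shutdownExecutor.submit(() -> {
            subscription.close(); // may block indefinitely
            return null;
        });
        try {
            closing.get(budget.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            closing.cancel(true); // interrupts the thread that is mid-close
        } finally {
            shutdownExecutor.shutdownNow();
        }
    }
}
```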

Like I said to @kawamuray in my earlier comment #79 (comment) I think it's a logically separate change, so I'd be happy to split it into a separate PR. But I do think it makes sense.

Hm, I'm not sure we need to implement a bounded version for PartitionProcessor and ProcessorUnit, because we can simply wait for ProcessorSubscription's termination with join(timeout) (as you implemented; by the way, PartitionProcessor's bounded awaitTermination is currently never used?).

Ah yeah, that's true, my mistake. Still, the current implementation works by having a polling loop in ProcessorSubscription, which is not the ideal design - if we ever want to reimplement that in a non-polling way (which I think is desirable) then we would need an API like that on PartitionProcessor.

And, for ProcessorUnit: if a task being processed at shutdown takes a long time (which is possible because users can implement arbitrary logic in a DecatonProcessor), it could block regardless of how we implement ProcessorUnit#awaitTermination(timeout), and in that case we would return without executing some "clean-up"s, so I'm not sure it's more meaningful than just waiting at the top level with ProcessorSubscription#join(timeout).

True in the current implementation - ideally I'd like to implement asynchronous processors by returning a Future that supports cancel() (and get with a timeout), and then we could call cancel() if we hit the timeout. I agree that ultimately the current implementation doesn't behave significantly better than just joining on the thread. But I think it's worth trying to follow best practices where we can, even if we can't get the benefits yet without more changes elsewhere in the system.

@kawamuray
Contributor

kawamuray commented Feb 16, 2021

H-m, this discussion is getting a bit hard to follow with all the different ideas and their intentions (I didn't expect the way to implement the async shutdown interface to become this controversial :p)

IMO, the interface that provides "initiate shutdown sequence and asynchronously observe the completion" must (a rough sketch follows the list below):

  • have an interface to flag the start of shutdown w/o blocking the caller thread
  • have an interface to observe the progress of shutdown for an arbitrary duration
    • the above interface can be called an arbitrary number of times until it reports the shutdown as completed
  • not require any subsequent operation to finalize the shutdown sequence after the above interface tells the user that the shutdown sequence has completed
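A rough shape satisfying those three points might look like this (illustrative names only, not an actual Decaton interface):

```java
import java.time.Duration;

// Sketch of the contract described above; names are illustrative only.
interface GracefullyShutdownable extends AutoCloseable {
    // 1. Flag the start of shutdown without blocking the caller thread.
    void initiateShutdown();

    // 2. Observe the progress of shutdown for an arbitrary duration.
    //    May be called any number of times; returns true once shutdown has completed,
    //    and once it has returned true no further call is needed to finalize anything.
    boolean awaitShutdown(Duration limit) throws InterruptedException;

    // Blocking shorthand: initiate and then wait without a practical bound.
    @Override
    default void close() throws InterruptedException {
        initiateShutdown();
        while (!awaitShutdown(Duration.ofMillis(Long.MAX_VALUE))) {
            // retry until the implementation reports completion
        }
    }
}
```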

Let me try to summarize my opinion on each proposed idea so that it's clearer what I'm thinking and preferring. If you don't mind, can you please follow the same style and summarize pros/cons for each idea from your point of view (I especially want to hear the cons you see for Ideas 1 and 2), so I might understand your intentions better (sorry if I missed any idea; please add it in that case).

Idea 1: Use CompletableFuture

The one I showed a POC for in this comment. Please also see #89 for the full POC.

Pros:

  • Provides the highest flexibility and (IMO) understandability of the interface for users. CompletableFuture has become the standard way to represent an asynchronously computed (determined) value in modern Java, and having access to it, users can wait on its completion with or without a timeout, set up hooks upon completion, and chain subsequent operations onto its completion as necessary. (W.r.t. this part, I'm having a hard time understanding why @m50d is trying to stick with the ExecutorService-style API, which I never felt was a good interface.)
  • We can use its CompletionStages to represent the multiple steps required in most shutdown sequences well and intuitively. Please see this part for an example.
  • We can use the "completer"'s (whoever completes the CF) thread time to run the subsequent cleanup operations without needing to manually prepare another execution context (e.g., an ExecutorService). Please see this example: if we don't take this approach, we need to use a separately prepared executor, just as it's currently implemented.

Cons:

  • Complex? (I thought it was going to be a bit complex, but after implementing the POC I'm no longer sure which part actually is. Can you point out which part becomes more complicated, referring to my POC, please?)
  • Each class needs to hold a mutable CompletableFuture field, which is a bit of boilerplate, but it often replaces the need for a terminated boolean field, so it's not that bad IMO.

Idea 2: Use AsyncShutdownable as-is, but with adding awaitShutdown(timeout)

Pros:

  • Relatively fewer changes
  • Can keep API compatibility

Cons:

  • awaitShutdown() methods will be doing the synchronous part of the subsequent shutdown process, making it not actually a pure "observer" but more like a "procedure to complete the shutdown sequence". So it should be named better, such as tryCompleteShutdown or similar, but doing so would break API compatibility. However, I don't think that is a required step, because defining the shutdown protocol as doing either "call close()" or "call initiateShutdown() and call awaitShutdown() until it returns true" is a reasonable and valid contract (otherwise, it's not guaranteed that it has completed shutdown - natural, huh?). It's just that while the name implies only observing, it actually does a bit of cleanup work internally using the caller's thread time, which is still not an issue as long as it returns within the given timeout. (In the CF approach the caller thread - calling CF.get() - stays idle, whereas in this approach the thread may do some work by itself, like metrics.close() - but who cares?)
  • Some awaitShutdown() implementations require state (progress) management. While most of the process, like calling metrics.close(), should be safe to call multiple times (that's the contract of AutoCloseable#close IIRC), there are some operations which shouldn't be repeated. Currently there aren't many such cases, but DestroyThreadScopedProcessors would be one example (see the sketch after this list).
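For that second point, the state management in question is essentially a one-shot guard around the non-repeatable step; a minimal sketch under that assumption (illustrative, not Decaton code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard for a step that must not run twice even if awaitShutdown()
// is called multiple times (e.g. destroying thread-scoped processors).
class OneShotStep {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable step;

    OneShotStep(Runnable step) {
        this.step = step;
    }

    void runOnce() {
        if (done.compareAndSet(false, true)) {
            step.run(); // executed on the first call only
        }
    }
}
```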

Idea 3: Use AsyncShutdownable as-is without adding awaitShutdown(timeout)

Pros:

  • No extra change required

Cons:

  • In general, I feel an interface that provides a way to initiate shutdown but doesn't provide a way to observe its completion asynchronously is a broken interface. (Yes, I think the current AsyncShutdownable in the master branch is a broken interface, and I'd like to fix it before making it a public API, though I can compromise while it stays private.)
  • It covers only a very limited set of use cases for asynchronously shutting down ProcessorSubscription. For example, say the user has subscriptions A and B: they could initiate shutdown of A and then B simultaneously, but they can't observe completion of B until they have observed it for A. If A makes a mess and blocks up to N seconds (the outer, absolute timeout), then B loses its chance to cleanly complete its shutdown sequence, which doesn't sound valid to me.

Idea 4: Use AsyncShutdownable as-is, but with adding awaitShutdown(timeout) which is just an "observer" (the current state of this branch)

Cons:

  • Cleanup logic needs to be placed wherever the object's execution context ends (e.g., the last part of the run() method in ProcessorSubscription), scattering the cleanup logic across different places, which is quite difficult to follow.
  • As an interface, AsyncShutdownable requires the caller to call 3 methods when they want asynchronous shutdown: 1. initiateShutdown(), 2. awaitShutdown and 3. close(), which is unnecessarily complex. AsyncShutdownable extending AutoCloseable obviously implies that calling initiateShutdown() + awaitShutdown() is equivalent to calling close().

Besides the above comments, as @ocadaruma briefly mentioned, I think introducing a separate interface from the internal-only AsyncShutdownable is also an option if it's difficult to reach agreement in a reasonable amount of time, since 1. the current AsyncShutdownable is sufficient for internal use and 2. we can implement awaitShutdown(timeout) for ProcessorSubscription relatively more easily than for other internal classes.

@ocadaruma
Contributor

@m50d Thanks for the explanation. I understood the intention, and now things look much more reasonable.
@kawamuray Thanks for summarizing the ideas and the PoC. That helped clean up my thoughts as well.

To provide "initiate shutdown" I/F, I vote for idea 1 for almost same reasons as you described.
Additional pros/cons:

Idea 1: Use CompletableFuture

Cons:

  • It will be a "breaking change", although I think almost no one is currently using ProcessorSubscription#initiateShutdown()/awaitShutdown() (since they're internal AsyncShutdownable methods)
    • This is not a big con though; we should avoid exposing internal interfaces' methods as public API (by implementing them in "public" classes) in the future.

Idea 2: Use AsyncShutdownable as-is, but with adding awaitShutdown(timeout)

Cons:

  • idempotency is NOT the contract of AutoCloseable#close, so it actually needs state management.
    • [screenshot: AutoCloseable#close Javadoc]

@m50d
Contributor Author

m50d commented Feb 17, 2021

I'm not actively against the CompletableFuture approach, I just a) am not familiar with it as a technique for close/shutdown management and b) assumed (perhaps wrongly) that since we ultimately need to shut down an ExecutorService at the lowest level, keeping the same API at every level would be the easiest to implement.

I don't really see this as a choice between alternatives so much as a series of steps: A) make close() wait, B) offer a non-blocking API for closing, C) offer a time-bounded version of B, D) offer an async version of B/C. I don't think the current version of this PR (stage C) represents a choice not to do D - this PR does expose the AsyncShutdownable interface, but we can implement that interface fine on top of CompletableFuture, so I don't think it's a real negative.

My big concern about a CF-style API was that it didn't play nicely with shutting down an ExecutorService - the "last task completes the future" trick feels fragile (it will break if the executor is ever not single-threaded). But the current version of this PR also relies on that trick, so I guess it comes out to the same thing. I'll try implementing something along those lines.

@kawamuray
Contributor

Thanks for the follow-up.

My big concern about a CF-style API was that it didn't play nice with shutting down ExecutorService - the "last task completes the future" trick feels fragile (it will break if executor is ever not single-threaded).

True, but that's not likely to happen in the case of ProcessorUnit, because ProcessorUnit itself represents a subpartition, which is a single-threaded sequential execution unit, so we're unlikely to ever configure it with multiple threads. In general, however, this trick might not always be applicable to executors.
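For reference, a minimal sketch of the "last task completes the future" trick being discussed; the class and names are assumptions for the example, and it only works because the executor is single-threaded, which is exactly the fragility mentioned above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a single-threaded unit whose termination future is completed by the
// last task submitted to its executor rather than by ExecutorService#awaitTermination.
class SingleThreadedUnit {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();

    void submit(Runnable task) {
        executor.execute(task);
    }

    void initiateShutdown() {
        // Everything already queued runs first; this sentinel runs last and
        // therefore marks the point at which all preceding work has finished.
        executor.execute(() -> terminated.complete(null));
        executor.shutdown(); // reject further submissions, let queued tasks drain
    }

    CompletableFuture<Void> terminationFuture() {
        return terminated;
    }
}
```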

Contributor

@kawamuray kawamuray left a comment

AsyncShutdownable is still a bit of a confusing interface due to the coexistence of the CF-style and existing-style APIs, plus the necessity of calling initiateShutdown even though we just want to kick off the shutdown and get the CF back. However, it might be a good compromise between compatibility and fulfilling the required functionality at this moment. We may get a chance to reorganize these interfaces and internals with some breaking changes (deprecation) when we release the next major version.

LGTM, thanks 👍

@kawamuray kawamuray added the new feature Add a new feature label Feb 18, 2021
Contributor

@ocadaruma ocadaruma left a comment

Thanks for reworking.
Added a comment.

@@ -83,13 +91,15 @@ private void processTask(TaskRequest request) {
@Override
public void initiateShutdown() {
terminated = true;
pipeline.close();
Contributor

I think we should close the pipeline in initiateShutdown, to avoid causing unnecessary delay on shutdown.
Current (and expected) behavior:

1. ProcessorUnit#initiateShutdown()
2. ProcessPipeline#close()
  - the pipeline's termination flag is on, so further tasks will be skipped (returning immediately without completing their DeferredCompletion)
  - the ExecutionScheduler is also closed, so sleep() returns immediately

Contributor Author

We discussed this earlier on this PR - if we skip further tasks then, in general (with out-of-order commit), some tasks will have completed but their offsets will not be committable. With this implementation we treat it similarly to a rebalance (so max.poll.records needs to be small enough to be processed within the time limit - but that's already the case when we rebalance).

Contributor

@ocadaruma ocadaruma Feb 19, 2021

I suppose the discussion is this, right?: #79 (comment)

My point is rather different. The current patch changes the order of the ProcessPipeline#close() call, like below:
AS-IS: ProcessorUnit#initiateShutdown() => ProcessPipeline#close() => ProcessorUnit#awaitShutdown()
TO-BE: ProcessorUnit#initiateShutdown() => submit ProcessPipeline#close() to the executor (i.e. processed after all tasks currently in the queue have finished) => ProcessorUnit#awaitShutdown()

As long as pending tasks are finished within shutdown.timeout.ms, that should be fine.
But things are different if waiting for the remaining tasks exceeds the timeout (e.g. due to long per-task duration, a large max.pending.records, or both).

By calling ProcessPipeline#close() first, queued tasks are skipped (i.e. not passed to the DecatonProcessor) without completing them, so ProcessorUnit#awaitShutdown() returns almost immediately (except for a task that is currently being processed).

If we call ProcessPipeline#close() last, all pending tasks are passed to the DecatonProcessor even AFTER shutdown.timeout.ms has passed, which could make the shutdown time unreasonably long.

Expected behavior should be like below:

1. ProcessorSubscription#initiateShutdown()
2. Wait for remaining tasks up to shutdown.timeout.ms
3. After shutdown.timeout.ms, if there are still remaining tasks, proceed with shutdown without waiting for them (so re-processing is likely to happen)
4. Complete shutdown sequence.

I think that's the reason why @kawamuray calls ProcessPipeline#close() at the time of initiating shutdown in his PoC (https://github.com/line/decaton/pull/89/files#diff-505ddd33c135748a163c4146ae63acb3d7c40a754a7f2bae4015afbb2e956c1aR89)
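A sketch of the ordering being suggested here (not the actual ProcessorUnit code; the sentinel runnable and field names are hypothetical):

```java
import java.util.concurrent.ExecutorService;

// Sketch only: closing the pipeline up front makes still-queued tasks no-ops,
// so the executor drains quickly instead of processing them past the timeout.
class ProcessorUnitSketch {
    private final ExecutorService executor;
    private final AutoCloseable pipeline;    // stands in for ProcessPipeline
    private final Runnable completeShutdown; // hypothetical sentinel marking "all prior tasks done"
    private volatile boolean terminated;

    ProcessorUnitSketch(ExecutorService executor, AutoCloseable pipeline, Runnable completeShutdown) {
        this.executor = executor;
        this.pipeline = pipeline;
        this.completeShutdown = completeShutdown;
    }

    public void initiateShutdown() throws Exception {
        terminated = true;
        pipeline.close();                   // 1. queued-but-unstarted tasks now become no-ops
        executor.execute(completeShutdown); // 2. runs only after everything already queued has finished
        executor.shutdown();                // 3. stop accepting new submissions
    }
}
```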

Contributor Author

Ah indeed, sorry for misunderstanding your point. I guess we need to implement something like a shutdownNow() path for forcibly terminating if we've reached the timeout. Will have a look.

Contributor

@ocadaruma ocadaruma Feb 19, 2021

Hmm, I don't think this is that complicated; just calling ProcessPipeline#close() at the beginning of ProcessorUnit#initiateShutdown should be fine?

Contributor Author

I've sketched out an implementation but I'm not sure if it's the right approach - PTAL.

Contributor

H-m, I still don't see the point of adding shutdownNow() to AsyncShutdownable and implementing it.

What we expect from ProcessorSubscription#close() is "to wait for remaining tasks up to shutdown.timeout.ms, then proceed with the shutdown sequence and close all processors and all other resources gracefully", rather than providing another path (shutdownNow()) to force the shutdown.

So the only problem with the patch (as I commented at #79 (comment)) was the timing of the call to ProcessPipeline#close() in ProcessorUnit#initiateShutdown(), which should be right after setting terminated = true.

Contributor Author

@m50d m50d Feb 22, 2021

Ah, sorry, yeah, I see - I had forgotten that this isn't implemented by delegation, so ProcessorUnit#initiateShutdown() doesn't get called until after the wait has finished. Will do it that way.

@m50d m50d requested a review from ocadaruma February 19, 2021 10:12
@m50d m50d force-pushed the async-termination branch from 68f5088 to cd84d5e Compare February 22, 2021 02:23
Contributor

@ocadaruma ocadaruma left a comment

Thanks for the fix! LGTM
