Remove redundant delaySubscription
from FunctionConfiguration
#2978
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Related to: spring-projects/spring-integration#9362
After the fix in Spring Integration: spring-projects/spring-integration@bdcb856 we ended up in a deadlock situation with a
beginPublishingTrigger
in theFunctionConfiguration
used for thedelaySubscription()
on an originalPublisher
. TheFluxMessageChannel
uses its owndelaySubscription()
until the channel has its subscribers. Apparently the logic before was with errors, so theFluxMessageChannel
was marked as active even if its subscriber is not ready yet, leading to famousDispatcher does not have subscribers
error. So, looks like thisbeginPublishingTrigger
was introduced back in days in Spring Cloud Stream to mitigate that situation until we really emit aBindingCreatedEvent
.The deadlock (and the flaw, respectively) is with the
setupBindingTrigger()
method implementation whereFluxMessageChannel
now "really" delays a subscription to the providedPublisher
, therefore not triggering thatMono.create()
fulfilment immediately. TheBindingCreatedEvent
arrives earlier, than we have a subscriber on the channel, buttriggerRef.get()
isnull
, so we don'tsuccess()
it and in the end don't subscribe to an originalPublisher
sincedelaySubscription()
on it is never completed.Since
FunctionConfiguration
fully relies onIntegrationFlow.from(Publisher)
, which ends up with the mentionedFluxMessageChannel.subscribeTo()
and its owndelaySubscription()
(which, in turn, apparently fixed now), we don't need our owndelaySubscription()
any more. Therefore the fix in this PR is to propose to removebeginPublishingTrigger
logic altogether.