W-16941297: Scatter Gather timeout exception #14192

Open
wants to merge 20 commits into base: master

Conversation

@anandkarandikar (Contributor) commented Jan 31, 2025

Ticket

W-16941297

Cause

When Scatter Gather times out, the db connections opened by its routes stay open and the cleanup is never invoked. Events need to complete in order for their cleanup jobs to be called. In the customer scenario, a <db:select ... /> running SELECT SLEEP(10) never completes as an Event, so the database connection is never disposed of.

Other ideas

StreamingGhostBuster was considered as something that might already handle this, since its intention is to clean up unclosed streams and their related CursorStreamProvider objects. However, in this situation the reference is a strong reference, so StreamingGhostBuster wouldn't work.
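
To illustrate why a strong reference defeats this kind of GC-driven cleanup, here is a minimal generic Java sketch (not Mule code; it only assumes that StreamingGhostBuster reacts to references becoming unreachable, as described above):

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class GhostBusterSketch {
  public static void main(String[] args) throws InterruptedException {
    ReferenceQueue<Object> queue = new ReferenceQueue<>();
    Object stream = new Object();                          // stands in for the unclosed stream/cursor
    WeakReference<Object> tracked = new WeakReference<>(stream, queue);

    System.gc();
    // While 'stream' is strongly reachable, the reference is never enqueued,
    // so a queue-driven cleaner never gets a chance to run:
    System.out.println("cleanable while strongly held: " + (queue.poll() != null)); // false

    stream = null;                                         // drop the strong reference
    System.gc();                                           // GC is only a hint; usually enough here
    Thread.sleep(100);
    System.out.println("cleanable after release: " + (queue.poll() != null));       // typically true
  }
}

Because the timed-out route still holds a strong reference, a reachability-based cleaner never fires, which is why the fix completes the child contexts explicitly instead.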

Fix

  • For timeout handling, we work with the events that timed out and call .error(...) for those events.
  • Calling .error(...) without the Scheduler pool caused Scatter Gather to wait until the longest SLEEP(n) completed.
  • To alleviate this behavior, this change uses timeoutScheduler, making Scatter Gather time out as expected and present the composite routing exception messages to the user immediately while the SELECT SLEEP(n) queries continue to execute; once those complete, the .error(...) calls submitted to timeoutScheduler run.
  • The timeoutScheduler created as a cpuLightScheduler is incapable of handling nested Scatter Gathers or a large number of routes. This fix was tested with 70 routes, almost all of them timing out; changing to ioScheduler handled this scaling issue. (A condensed sketch of the resulting code follows this list.)
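
The sketch below condenses the change for readability. It reuses the names from the diffs in this PR (timeoutScheduler, schedulerService, pair, error); the surrounding context is assumed for illustration and is not the exact runtime code.

// Scheduler creation: an I/O pool instead of the CPU-light pool (see the diff below).
timeoutScheduler = schedulerService.ioScheduler(SchedulerConfig.config());

// Timeout handling: one error-completion task per timed-out child context, so the
// composite routing exception is returned to the caller right away while the slow
// routes finish and their resources (db connections, streams) are disposed.
EventContext context = pair.getFirst().getContext();
if (context instanceof AbstractEventContext) {
  ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
      .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
}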

Test Coverage

  • There are existing tests in org/mule/runtime/core/internal/routing/forkjoin that exercise timeout events being raised.
  • A timeout scenario with Scatter Gather is also covered using test-extensions (marvel-extension) to mimic the delayed scenario. This is a similar approach to W-16941297: SG timeout issue mule-integration-tests#2634, but without needing the actual database in the picture, because we need to ensure that the underlying streams are closed.

- timeoutScheduler = schedulerService.cpuLightScheduler(SchedulerConfig.config()
+ timeoutScheduler = schedulerService.ioScheduler(SchedulerConfig.config()

Contributor

I would need a good justification for this change.
The tasks being submitted to that scheduler are better described as "CPU light" rather than "I/O intensive". If this change is needed for something to work, we definitely need to understand why, because at first glance it doesn't make sense.

Contributor Author

Would having an underlying db connection count as I/O?
We noticed that cpuLightScheduler doesn't work when Scatter Gather contains a lot of routes.

Contributor

I don't think that matters. What matters is what you are doing in the tasks that you submit to the scheduler, for example whether the tasks sleep or block on I/O a lot.
In this case I think the problem is that you are submitting just too many tasks, beyond the estimated capacity for the pool type.

@anandkarandikar (Contributor Author) commented Feb 6, 2025

That's possible. I've been scaling the SG with nested SGs, which may not really be the actual customer scenario. Given that the bound depends on the number of cores, it's possible my laptop wasn't able to handle that many CPU-bound tasks.

@@ -146,7 +146,7 @@ private void handleTimeoutExceptionIfPresent(Scheduler timeoutScheduler,
   EventContext context = pair.getFirst().getContext();
   if (context instanceof AbstractEventContext) {
     ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
-        .submit(() -> ctx.error(error.get().getCause())));
+        .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));

@anandkarandikar (Contributor Author) commented Feb 3, 2025

Despite working fine, the following failures appeared in the log:

ERROR 2025-01-28 17:26:29,170 [pool-5-thread-2] [processor: ; event: ] org.mule.runtime.core.privileged.processor.MessageProcessors: Uncaught exception in childContextResponseHandler
java.lang.ClassCastException: class java.util.concurrent.TimeoutException cannot be cast to class org.mule.runtime.core.privileged.exception.MessagingException (java.util.concurrent.TimeoutException is in module java.base of loader 'bootstrap'; org.mule.runtime.core.privileged.exception.MessagingException is in module [email protected] of loader jdk.internal.loader.Loader @10cc327a)
	at [email protected]/org.mule.runtime.core.privileged.processor.MessageProcessors.lambda$childContextResponseHandler$14(MessageProcessors.java:582) ~[mule-core-4.9.0-20241025.jar:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.signalConsumerSilently(AbstractEventContext.java:310) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.receiveResponse(AbstractEventContext.java:210) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.error(AbstractEventContext.java:189) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.routing.forkjoin.AbstractForkJoinStrategyFactory.lambda$handleTimeoutExceptionIfPresent$6(AbstractForkJoinStrategyFactory.java:173) ~[?:?]

Therefore, a MessagingException instance is created to wrap the cause.
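
For illustration only, the failure and the fix boil down to the sketch below; the handler's expectation is inferred from the stack trace, and event is hypothetical shorthand for pair.getFirst() from the diff above.

// Before the fix the child context was completed with the bare cause:
//   ctx.error(error.get().getCause());   // a java.util.concurrent.TimeoutException
// childContextResponseHandler then casts the signalled Throwable to MessagingException,
// which fails for the raw TimeoutException, as shown in the log above.
Throwable timeout = new TimeoutException("Scatter Gather route timed out");
// With the fix, the cause is wrapped first, so the expected type is satisfied and the
// original timeout is preserved as the cause ('event' stands in for pair.getFirst()):
MessagingException wrapped = new MessagingException(event, timeout);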

@anandkarandikar (Contributor Author)

--validate

Comment on lines +146 to +151
  EventContext context = pair.getFirst().getContext();
  if (context instanceof AbstractEventContext) {
    ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
        .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
  }
}

Contributor

What about submitting just one task for all the child contexts?
Something like:

Suggested change:

  EventContext context = pair.getFirst().getContext();
  if (context instanceof AbstractEventContext) {
-   ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
-       .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
+   timeoutScheduler
+       .submit(() -> ((AbstractEventContext) context).forEachChild(ctx -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
  }
}

Contributor Author

I have tried this change at a larger Scatter Gather scale (~70 db:select routes) with the following schedulers:

  • cpuLightScheduler
  • cpuIntensiveScheduler
  • ioScheduler

With each scheduler I see a large number of connections (~34) left unclosed.

However, with

((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
    .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));

it handles those 70 connections.

Contributor

I would really like to understand why this is happening. How come scheduling 70 different tasks works but scheduling just one does not? What is it that we don't know?

Contributor Author

If we're purely considering the resources used, the nesting of SG routes that I've created may have caused recursive calls of error(...), which also deals with reentrant locks. Not sure if there's anything there, but just throwing ideas around.


Comment on lines +120 to +121
CountDownLatch latch = new CountDownLatch(1);
latch.await(delay, TimeUnit.MILLISECONDS);

Contributor

This latch is not useful because nobody is counting down on it... so it is basically the same as a sleep. I gave you some examples on Slack. I know that you are probably working on that right now, but I have to mark this in the review for completeness.
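
For reference, here is a minimal generic sketch of the difference (plain java.util.concurrent; someExecutor and doTheDelayedWork are placeholders, not marvel-extension code):

// As written: nothing ever calls countDown(), so await(delay, MILLISECONDS) simply
// blocks for the full delay, i.e. it behaves like Thread.sleep(delay).
CountDownLatch latch = new CountDownLatch(1);
latch.await(delay, TimeUnit.MILLISECONDS);

// A latch only adds value when another party releases it, for example:
CountDownLatch done = new CountDownLatch(1);
someExecutor.submit(() -> {
  doTheDelayedWork();   // placeholder for the delayed operation (the SELECT SLEEP(n) stand-in)
  done.countDown();     // releases the waiter as soon as the work finishes
});
boolean finishedInTime = done.await(delay, TimeUnit.MILLISECONDS);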
