W-16941297: Scatter Gather timeout exception #14192

Open
wants to merge 21 commits into
base: master
Changes from 19 commits
Commits
5c2b698
handling timeout exception
anandkarandikar Jan 22, 2025
d1b7b3a
Attempt with ExecutorService to process connection closing in the bac…
anandkarandikar Jan 27, 2025
5806098
Remove changes from AbstractEventContext
anandkarandikar Jan 28, 2025
ec561bd
Make ChildEventContext accessible and handle timeout exception in Abs…
anandkarandikar Jan 28, 2025
89d854e
Switch to ioScheduler
anandkarandikar Jan 31, 2025
59dbd95
Change visibility to private for ChildEventContext
anandkarandikar Jan 31, 2025
fce0083
Make AbstractEventContext public
anandkarandikar Jan 31, 2025
7540dc8
Use AbstractEventContext instead of ChildEventContext
anandkarandikar Jan 31, 2025
f3d2532
Remove unused import
anandkarandikar Jan 31, 2025
b1b1390
Handling ClassCastException failure
anandkarandikar Feb 3, 2025
3a90f7d
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 6, 2025
e66cf94
Add delay to sayMagicWords
anandkarandikar Feb 6, 2025
abadb33
Add tests for Scatter Gather timeout and non timeout scenario
anandkarandikar Feb 7, 2025
835e844
Apply formatter
anandkarandikar Feb 7, 2025
eceee29
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 7, 2025
8c3e66f
Add assertPayloadIsIteratorProvider check
anandkarandikar Feb 7, 2025
7c2045a
Replace Thread.sleep with CountDownLatch
anandkarandikar Feb 7, 2025
b24c317
Fixing model and schema file for marvel extension
anandkarandikar Feb 7, 2025
82e2d38
Revert the version to @mule.runtime.version@
anandkarandikar Feb 7, 2025
80b3412
Using assertThrows
anandkarandikar Feb 7, 2025
069b02f
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 8, 2025
@@ -52,7 +52,7 @@
*
* @since 4.0
*/
- abstract class AbstractEventContext implements SpanContextAware, BaseEventContext {
+ public abstract class AbstractEventContext implements SpanContextAware, BaseEventContext {

private static final int STATE_READY = 0;
private static final int STATE_RESPONSE_RECEIVED = 1;

@@ -119,7 +119,7 @@ public void initialise() throws InitialisationException {
if (targetValue != null) {
targetValueExpression = compile(targetValue, expressionManager);
}
- timeoutScheduler = schedulerService.cpuLightScheduler(SchedulerConfig.config()
+ timeoutScheduler = schedulerService.ioScheduler(SchedulerConfig.config()
Contributor

I would need a good justification for this change.
The tasks submitted to that scheduler are better described as "CPU light" than "I/O intensive". If this change is needed for something to work, we definitely need to understand why, because at first glance it doesn't make sense.

Contributor Author

Would an underlying DB connection count as I/O?
We noticed that cpuLightScheduler doesn't work when the Scatter Gather contains a lot of routes.

Contributor

I don't think that matters. What matters is what the tasks you submit to the scheduler actually do, for example whether they sleep or block on I/O a lot.
In this case I think the problem is that you are submitting too many tasks, beyond the estimated capacity for the pool type.

Contributor Author

@anandkarandikar anandkarandikar Feb 6, 2025

That's possible. I've been scaling the SG with nested SGs, which may not really match the actual customer scenario. Given that the bound depends on the number of cores, it's possible my laptop wasn't able to handle that many CPU-bound tasks.
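
To make the capacity concern in this thread concrete, here is a minimal sketch using only plain `java.util.concurrent`, not Mule's `SchedulerService`. It shows how a bounded JDK pool behaves once more blocking tasks are submitted than its workers and queue can hold. The class name `BoundedPoolSketch` and the sizes are hypothetical, and whether Mule's `cpuLightScheduler` rejects, queues, or throttles at capacity is not stated in this thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

  /**
   * Submits {@code tasks} blocking tasks to a pool with {@code threads} workers and a queue of
   * {@code queueSize} slots, and returns how many submissions the pool rejected.
   */
  static int countRejected(int threads, int queueSize, int tasks) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                                                     new ArrayBlockingQueue<>(queueSize));
    CountDownLatch release = new CountDownLatch(1);
    int rejected = 0;
    try {
      for (int i = 0; i < tasks; i++) {
        try {
          // Every task blocks until released, so workers and queue slots stay occupied.
          pool.execute(() -> {
            try {
              release.await();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        } catch (RejectedExecutionException e) {
          // The default AbortPolicy throws once all workers are busy and the queue is full.
          rejected++;
        }
      }
    } finally {
      release.countDown();
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
    return rejected;
  }

  public static void main(String[] args) throws InterruptedException {
    // 2 workers + 2 queue slots accept 4 of the 10 tasks.
    System.out.println("rejected: " + countRejected(2, 2, 10)); // prints "rejected: 6"
  }
}
```

With a pool sized from the core count, the same submission pattern can succeed on one machine and saturate on another, which is consistent with the laptop observation above but does not prove it.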

.withName(this.getClass().getName() + ".timeoutScheduler - " + getLocation().getLocation()));
timeoutErrorType = errorTypeRepository.getErrorType(TIMEOUT).get();
maxConcurrency = maxConcurrency != null ? maxConcurrency : getDefaultMaxConcurrency();

@@ -22,6 +22,7 @@
import static reactor.core.publisher.Mono.error;
import static reactor.core.publisher.Mono.just;

import org.mule.runtime.api.event.EventContext;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.ItemSequenceInfo;
@@ -34,6 +35,7 @@
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.event.AbstractEventContext;
import org.mule.runtime.core.internal.event.DefaultEventBuilder;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
@@ -118,6 +120,7 @@ public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStra
})
.doOnNext(listBooleanPair -> {
if (listBooleanPair.getSecond()) {
handleTimeoutExceptionIfPresent(timeoutScheduler, listBooleanPair.getFirst());
throw propagate(createCompositeRoutingException(listBooleanPair.getFirst().stream()
.map(coreEventExceptionPair -> removeOriginalError(coreEventExceptionPair,
original.getError()))
@@ -130,6 +133,25 @@ public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStra
};
}

private void handleTimeoutExceptionIfPresent(Scheduler timeoutScheduler,
List<Pair<CoreEvent, EventProcessingException>> listBooleanPair) {
listBooleanPair
.forEach(
pair -> {
final Optional<Error> error = pair.getFirst().getError();
if (error.isPresent() &&
error.get().getCause() instanceof TimeoutException &&
error.get().getCause().getMessage()
.contains(TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX)) {
EventContext context = pair.getFirst().getContext();
if (context instanceof AbstractEventContext) {
((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
.submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
Contributor Author

@anandkarandikar anandkarandikar Feb 3, 2025

Although the change worked, the log showed failures:

ERROR 2025-01-28 17:26:29,170 [pool-5-thread-2] [processor: ; event: ] org.mule.runtime.core.privileged.processor.MessageProcessors: Uncaught exception in childContextResponseHandler
java.lang.ClassCastException: class java.util.concurrent.TimeoutException cannot be cast to class org.mule.runtime.core.privileged.exception.MessagingException (java.util.concurrent.TimeoutException is in module java.base of loader 'bootstrap'; org.mule.runtime.core.privileged.exception.MessagingException is in module [email protected] of loader jdk.internal.loader.Loader @10cc327a)
	at [email protected]/org.mule.runtime.core.privileged.processor.MessageProcessors.lambda$childContextResponseHandler$14(MessageProcessors.java:582) ~[mule-core-4.9.0-20241025.jar:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.signalConsumerSilently(AbstractEventContext.java:310) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.receiveResponse(AbstractEventContext.java:210) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.event.AbstractEventContext.error(AbstractEventContext.java:189) ~[?:?]
	at [email protected]/org.mule.runtime.core.internal.routing.forkjoin.AbstractForkJoinStrategyFactory.lambda$handleTimeoutExceptionIfPresent$6(AbstractForkJoinStrategyFactory.java:173) ~[?:?]

Therefore, the code now wraps the cause in a MessagingException instance.
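
The failure mode in the stack trace above can be reproduced in isolation. The sketch below is an illustration only: `WrappingException` and `HANDLER` are hypothetical stand-ins for `MessagingException` and the child-context response handler, and the actual Mule handler code is not shown in this thread. It assumes only what the log reports, namely that the handler casts the error it receives.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class CastingHandlerSketch {

  /** Hypothetical stand-in for MessagingException; this is not the Mule class. */
  static class WrappingException extends RuntimeException {

    WrappingException(Throwable cause) {
      super(cause);
    }
  }

  /** Hypothetical handler that assumes every error it receives is a WrappingException. */
  static final Consumer<Throwable> HANDLER = error -> {
    // Throws ClassCastException when handed a bare TimeoutException.
    WrappingException wrapped = (WrappingException) error;
    System.out.println("handled, cause: " + wrapped.getCause().getClass().getSimpleName());
  };

  /** Returns true when the handler accepts the error without a cast failure. */
  static boolean handledWithoutCastFailure(Throwable error) {
    try {
      HANDLER.accept(error);
      return true;
    } catch (ClassCastException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    TimeoutException timeout = new TimeoutException("route timed out");
    System.out.println(handledWithoutCastFailure(timeout));                        // false
    System.out.println(handledWithoutCastFailure(new WrappingException(timeout))); // true
  }
}
```

Wrapping the cause keeps the original `TimeoutException` reachable through `getCause()` while satisfying the type the handler expects.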

}
}
Comment on lines +146 to +151
Contributor

What about submitting just one task for all the child contexts?
Something like:

Suggested change
- EventContext context = pair.getFirst().getContext();
- if (context instanceof AbstractEventContext) {
- ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
- .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
- }
- }
+ EventContext context = pair.getFirst().getContext();
+ if (context instanceof AbstractEventContext) {
+ timeoutScheduler
+ .submit(() -> ((AbstractEventContext) context).forEachChild(ctx -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
+ }
+ }

Contributor Author

I tried this change at a larger scale (a Scatter Gather with ~70 DB select routes) with each of the following schedulers:

  • cpuLightScheduler
  • cpuIntensiveScheduler
  • ioScheduler

With each scheduler, a large number of connections (~34) are left unclosed.

However, with

((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
                           .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));

it can handle all 70 connections.
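
The two submission shapes compared in this thread differ structurally in how many tasks reach the scheduler. The sketch below uses a plain JDK `ExecutorService` rather than a Mule scheduler, with integers standing in for child contexts. `SubmissionShapesSketch`, the pool size of 4, and the `onError` callback are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SubmissionShapesSketch {

  /** Returns {tasksSubmitted, childrenSignalled} for the chosen submission shape. */
  static int[] run(boolean onePerChild, int childCount) throws InterruptedException {
    List<Integer> children = new ArrayList<>();
    for (int i = 0; i < childCount; i++) {
      children.add(i);
    }
    AtomicInteger signalled = new AtomicInteger();
    // Stand-in for ctx.error(...): only records that the child was signalled.
    Consumer<Integer> onError = child -> signalled.incrementAndGet();

    ExecutorService executor = Executors.newFixedThreadPool(4);
    int submitted = 0;
    if (onePerChild) {
      // Shape used in the PR: one task per child, free to run on different workers.
      for (Integer child : children) {
        executor.submit(() -> {
          onError.accept(child);
        });
        submitted++;
      }
    } else {
      // Shape from the review suggestion: one task that walks all children serially.
      executor.submit(() -> {
        children.forEach(onError);
      });
      submitted = 1;
    }
    executor.shutdown();
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("tasks did not finish in time");
    }
    return new int[] {submitted, signalled.get()};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] perChild = run(true, 70);
    int[] single = run(false, 70);
    System.out.println("per child: " + perChild[0] + " tasks, " + perChild[1] + " signalled");
    System.out.println("single:    " + single[0] + " tasks, " + single[1] + " signalled");
  }
}
```

In this plain-JDK setting both shapes signal every child; the difference is that the per-child shape can spread callbacks across workers while the single task runs them serially on one. Whether that difference accounts for the unclosed connections is not established here.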

Contributor

I would really like to understand why this is happening. How come scheduling 70 separate tasks works but scheduling just one does not? What is it that we don't know?

Contributor Author

Purely in terms of resources used, the nesting of SG routes that I created may have caused recursive calls to error(...), which also deals with reentrant locks. I'm not sure there is anything there; I'm just throwing ideas around.

});
}

private boolean isOriginalError(Error newError, Optional<Error> originalError) {
return originalError.map(error -> error.equals(newError)).orElse(false);
}

@@ -6,10 +6,12 @@
*/
package org.mule.test.module.extension.streaming;

import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mule.runtime.api.util.DataUnit.KB;
@@ -26,6 +28,7 @@
import org.mule.runtime.api.meta.model.parameter.ParameterModel;
import org.mule.runtime.api.meta.model.parameter.ParameterizedModel;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
@@ -348,6 +351,33 @@ public void operationCalledAndOutputConsumedMultipleTimes() throws Exception {
assertThat(IOUtils.toString((InputStream) value), is(data));
}

@Test
@Issue("W-16941297")
@Description("Call scatter gather containing route to invoke delayed-say-magic-words operation")
public void scatterGatherWithTimeout() throws Exception {
try {
flowRunner("scatterGatherWithTimeout")
.keepStreamsOpen()
.withPayload(singletonList(data))
.run();
} catch (Exception e) {
assertThat(streamingManager.getStreamingStatistics().getOpenCursorsCount(), is(0));
}
}

@Test
@Issue("W-16941297")
public void scatterGatherWithGreaterTimeout() throws Exception {
try {
flowRunner("scatterGatherWithGreaterTimeout")
.keepStreamsOpen()
.withPayload(singletonList(data))
.run();
assertThat(streamingManager.getStreamingStatistics().getOpenCursorsCount(), greaterThan(1));
} catch (Exception e) {
}
}

private ParameterModel getStreamingStrategyParameterModel(Supplier<ParameterizedModel> model) {
return model.get().getAllParameterModels().stream()
.filter(p -> p.getName().equals(STREAMING_STRATEGY_PARAMETER_NAME))
@@ -395,6 +425,15 @@ public CoreEvent process(CoreEvent event) throws MuleException {
}
}

public static class AssertPayloadIsIteratorProvider implements Processor {

@Override
public CoreEvent process(CoreEvent event) throws MuleException {
assertThat(event.getMessage().getPayload().getValue(), instanceOf(CursorIteratorProvider.class));
return event;
}
}

public static class AssertPayloadIsNotStreamProvider implements Processor {

@Override