Skip to content

Commit

Permalink
Add log warnings for long running event handling (#39729)
Browse files Browse the repository at this point in the history
Recently we have had a number of test issues related to blocking
activity occurring on the io thread. This commit adds a log warning for
when handling an event takes >150 milliseconds. This is implemented
for the MockNioTransport, which is the transport used in
ESIntegTestCase.
  • Loading branch information
Tim-Brooks authored Mar 8, 2019
1 parent 160adf7 commit 11fe52a
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 111 deletions.
35 changes: 22 additions & 13 deletions libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ protected void writeException(SocketChannelContext context, Exception exception)
context.handleException(exception);
}

/**
* This method is called when a task or listener attached to a channel is available to run.
* The default implementation invokes {@link Runnable#run()} directly on the calling thread, so
* anything thrown by the task propagates to the caller.
*
* NOTE(review): presumably an extension point so subclasses can wrap or time task execution — confirm.
*
* @param task to handle
*/
protected void handleTask(Runnable task) {
task.run();
}

/**
* This method is called when a task or listener attached to a channel operation throws an exception.
*
Expand All @@ -165,7 +174,11 @@ protected void taskException(Exception exception) {
*/
protected void postHandling(SocketChannelContext context) {
if (context.selectorShouldClose()) {
handleClose(context);
try {
handleClose(context);
} catch (IOException e) {
closeException(context, e);
}
} else {
SelectionKey selectionKey = context.getSelectionKey();
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
Expand Down Expand Up @@ -203,34 +216,30 @@ protected void uncaughtException(Exception exception) {
*
* @param context that should be closed
*/
protected void handleClose(ChannelContext<?> context) {
try {
context.closeFromSelector();
} catch (IOException e) {
closeException(context, e);
}
protected void handleClose(ChannelContext<?> context) throws IOException {
context.closeFromSelector();
assert context.isOpen() == false : "Should always be done as we are on the selector thread";
}

/**
* This method is called when an attempt to close a channel throws an exception.
*
* @param channel that was being closed
* @param context that was being closed
* @param exception that occurred
*/
protected void closeException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void closeException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}

/**
* This method is called when handling an event from a channel fails due to an unexpected exception.
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param context that caused the exception
* @param exception that was thrown
*/
protected void genericChannelException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}
}
32 changes: 16 additions & 16 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,15 @@ void preSelect() {
private void handleScheduledTasks(long nanoTime) {
Runnable task;
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
try {
task.run();
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(task);
}
}

/**
* Runs the given task through {@code eventHandler.handleTask}. Any {@link Exception} thrown while
* doing so is passed to {@code eventHandler.taskException} instead of propagating to the caller.
*
* @param task to run
*/
private void handleTask(Runnable task) {
try {
eventHandler.handleTask(task);
} catch (Exception e) {
// Only Exception is caught here; anything else thrown (e.g. an Error) still propagates.
eventHandler.taskException(e);
}
}

Expand Down Expand Up @@ -353,11 +357,7 @@ public void writeToChannel(WriteOperation writeOperation) {
*/
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(value, null));
}

/**
Expand All @@ -369,11 +369,7 @@ public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
*/
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(null, exception));
}

private void cleanupPendingWrites() {
Expand Down Expand Up @@ -437,7 +433,11 @@ private void setUpNewChannels() {
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
eventHandler.handleClose(channelContext);
try {
eventHandler.handleClose(channelContext);
} catch (Exception e) {
eventHandler.closeException(channelContext, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.mockito.Matchers.same;
Expand Down Expand Up @@ -243,10 +244,16 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException {
assertEquals(SelectionKey.OP_READ, key.interestOps());
}

public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
RuntimeException listenerException = new RuntimeException();
handler.taskException(listenerException);
verify(genericExceptionHandler).accept(listenerException);
public void testHandleTaskWillRunTask() throws Exception {
AtomicBoolean isRun = new AtomicBoolean(false);
handler.handleTask(() -> isRun.set(true));
assertTrue(isRun.get());
}

public void testTaskExceptionWillCallExceptionHandler() throws Exception {
RuntimeException exception = new RuntimeException();
handler.taskException(exception);
verify(genericExceptionHandler).accept(exception);
}

private class DoNotRegisterSocketContext extends BytesChannelContext {
Expand Down
64 changes: 43 additions & 21 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,6 +87,10 @@ public void setUp() throws Exception {
when(serverChannelContext.isOpen()).thenReturn(true);
when(serverChannelContext.getSelector()).thenReturn(selector);
when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey);
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(eventHandler).handleTask(any());
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand All @@ -102,6 +107,23 @@ public void testQueueChannelForClosed() throws IOException {
verify(eventHandler).handleClose(context);
}

// Verifies that when eventHandler.handleClose throws an IOException for a channel queued for
// close, the selector loop hands that same exception to eventHandler.closeException.
@SuppressWarnings({"unchecked", "rawtypes"})
public void testCloseException() throws IOException {
IOException ioException = new IOException();
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);

// Queue the channel so that its context is closed on the next loop iteration.
selector.queueChannelClose(channel);

// Make the close attempt fail.
doThrow(ioException).when(eventHandler).handleClose(context);

selector.singleLoop();

// The failure must be reported for the same context with the same exception instance.
verify(eventHandler).closeException(context, ioException);
}

public void testNioDelayedTasksAreExecuted() throws IOException {
AtomicBoolean isRun = new AtomicBoolean(false);
long nanoTime = System.nanoTime() - 1;
Expand All @@ -113,9 +135,27 @@ public void testNioDelayedTasksAreExecuted() throws IOException {
assertTrue(isRun.get());
}

// Verifies that an exception thrown by a scheduled task during a selector loop is reported
// through eventHandler.taskException.
public void testTaskExceptionsAreHandled() {
RuntimeException taskException = new RuntimeException();
// A deadline just before "now"; presumably so the task is already due when the loop polls — confirm.
long nanoTime = System.nanoTime() - 1;
Runnable task = () -> {
throw taskException;
};
selector.getTaskScheduler().scheduleAtRelativeTime(task, nanoTime);

// Have the mocked event handler actually run this specific task so that it throws.
// NOTE(review): setUp already stubs handleTask(any()) to run its argument, so this narrower
// stub looks redundant — confirm before removing.
doAnswer((a) -> {
task.run();
return null;
}).when(eventHandler).handleTask(same(task));

selector.singleLoop();
verify(eventHandler).taskException(taskException);
}

public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);

selector.singleLoop();
verify(rawSelector).select(300);
Expand All @@ -127,7 +167,8 @@ public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception {
assertBusy(() -> {
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);
selector.singleLoop();
verify(rawSelector).select(captor.capture());
assertTrue(captor.getValue() > 0);
Expand Down Expand Up @@ -455,23 +496,4 @@ public void testCleanup() throws Exception {
verify(eventHandler).handleClose(channelContext);
verify(eventHandler).handleClose(unregisteredContext);
}

public void testExecuteListenerWillHandleException() throws Exception {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, null);

selector.executeListener(listener, null);

verify(eventHandler).taskException(exception);
}

public void testExecuteFailedListenerWillHandleException() throws Exception {
IOException ioException = new IOException();
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, ioException);

selector.executeFailedListener(listener, ioException);

verify(eventHandler).taskException(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void doStart() {
boolean success = false;
try {
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));

ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");
Expand Down
Loading

0 comments on commit 11fe52a

Please sign in to comment.