Skip to content

Commit

Permalink
Remove onUnhandled in CustomIOHandler (#27372)
Browse files Browse the repository at this point in the history
* Remove onUnhandled in CustomIOHandler

* Renaming CustomIOHandler to TransportHandler. No longer extending from IOHandler.

* Adding TransportHandlerTest

* Fix typo.
  • Loading branch information
conniey authored Mar 9, 2022
1 parent c936a77 commit e519fff
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 21 deletions.
1 change: 1 addition & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Bugs Fixed

- Fixed issue where EndpointStates were not emitted serially. ([#24762](https://github.com/Azure/azure-sdk-for-java/issues/24762))
- Removing CustomIOHandler.onUnhandled which listens to every proton-j reactor event that could cause excessive logging. The underlying library could encounter `NullPointerException` if the selector is null.

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.implementation.handler.CustomIOHandler;
import com.azure.core.amqp.implementation.handler.ReactorHandler;
import com.azure.core.amqp.implementation.handler.TransportHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.reactor.Reactor;
Expand Down Expand Up @@ -39,7 +39,7 @@ public ReactorDispatcher getReactorDispatcher() {
* @throws IOException If the service could not create a Reactor instance.
*/
public Reactor createReactor(String connectionId, int maxFrameSize) throws IOException {
final CustomIOHandler globalHandler = new CustomIOHandler(connectionId);
final TransportHandler transportHandler = new TransportHandler(connectionId);
final ReactorHandler reactorHandler = new ReactorHandler(connectionId);

synchronized (lock) {
Expand All @@ -55,8 +55,8 @@ public Reactor createReactor(String connectionId, int maxFrameSize) throws IOExc
reactorOptions.setMaxFrameSize(maxFrameSize);
reactorOptions.setEnableSaslByDefault(true);

final Reactor reactor = Proton.reactor(reactorOptions, globalHandler, reactorHandler);
reactor.setGlobalHandler(globalHandler);
final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler);
reactor.getGlobalHandler().add(transportHandler);

final Pipe ioSignal = Pipe.open();
final ReactorDispatcher dispatcher = new ReactorDispatcher(connectionId, reactor, ioSignal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public int getMaxFrameSize() {
*/
protected void addTransportLayers(Event event, TransportInternal transport) {
// default connection idle timeout is 0.
// Giving it a idle timeout will enable the client side to know broken connection faster.
// Giving it an idle timeout will enable the client side to know broken connection faster.
// Refer to http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-doc-idle-time-out
transport.setIdleTimeout(CONNECTION_IDLE_TIMEOUT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
package com.azure.core.amqp.implementation.handler;

import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.impl.IOHandler;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;

public class CustomIOHandler extends IOHandler {
/**
* Handles transport related events.
*/
public class TransportHandler extends BaseHandler {
private final ClientLogger logger;

public CustomIOHandler(final String connectionId) {
this.logger = new ClientLogger(CustomIOHandler.class, createContextWithConnectionId(connectionId));
public TransportHandler(final String connectionId) {
this.logger = new ClientLogger(TransportHandler.class, createContextWithConnectionId(connectionId));
}

@Override
Expand All @@ -29,20 +32,10 @@ public void onTransportClosed(Event event) {
.addKeyValue(HOSTNAME_KEY, connection != null ? connection.getHostname() : NOT_APPLICABLE)
.log("onTransportClosed");

// connection.getTransport returns null if already unbound.
// We need to unbind the transport so that we do not leak memory.
if (transport != null && connection != null && connection.getTransport() != null) {
transport.unbind();
}
}

@Override
public void onUnhandled(Event event) {
// logger.verbose("Unhandled event: {}, {}", event.getEventType(), event);

// There appears to be some race condition where it's possible to get a null selector key in proton-j.
try {
super.onUnhandled(event);
} catch (NullPointerException e) {
logger.error("Exception occurred when handling event in super.", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests transport handler.
*/
public class TransportHandlerTest {
@Mock
private Transport transport;
@Mock
private Connection connection;
@Mock
private Event event;

private AutoCloseable autoCloseable;

@BeforeEach
public void beforeEach() {
autoCloseable = MockitoAnnotations.openMocks(this);
}

@AfterEach
public void afterEach() throws Exception {
Mockito.framework().clearInlineMock(this);

if (autoCloseable != null) {
autoCloseable.close();
}
}

/**
* Unbinds transport if there is one associated with the connection.
*/
@Test
public void unbindsTransport() {
// Arrange
when(event.getConnection()).thenReturn(connection);
when(event.getTransport()).thenReturn(transport);

when(connection.getTransport()).thenReturn(transport);

final TransportHandler handler = new TransportHandler("name");

// Act
handler.onTransportClosed(event);

// Assert
verify(transport).unbind();
}

@Test
public void noTransport() {
// Arrange
when(event.getTransport()).thenReturn(transport);
when(event.getConnection()).thenReturn(connection);

final TransportHandler handler = new TransportHandler("name");

// Act
handler.onTransportClosed(event);

// Assert
verify(transport, never()).unbind();
}
}

0 comments on commit e519fff

Please sign in to comment.