Skip to content

Commit

Permalink
Added debug logging as per ticket 34388.
Browse files Browse the repository at this point in the history
Fixed issue with connection being stuck on awaitUninterruptibly() as per fizzed#1
Fixed blocking issue at channel write.
Related issue RestComm/smpp-extensions#28
  • Loading branch information
olenara committed Sep 14, 2017
1 parent 4b8702b commit 6be5875
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 45 deletions.
101 changes: 62 additions & 39 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,25 @@ public SmppSession bind(SmppSessionConfiguration config, SmppSessionHandler sess
DefaultSmppSession session = null;
try {
// connect to the remote system and create the session
logger.debug("Connecting to remote system " + config.getName() + " host " + config.getHost() + ":" + config.getPort());
session = doOpen(config, sessionHandler);

// try to bind to the remote system (may throw an exception)
logger.debug("Binding to remote system " + config.getName());
doBind(session, config, sessionHandler);

logger.debug("Successfully bound to " + config.getName());
} finally {
// close the session if we weren't able to bind correctly
if (session != null && !session.isBound()) {
// make sure that the resources are always cleaned up
try { session.close(); } catch (Exception e) { }
try {
logger.debug("Closing session - not able to bind to " + config.getName());
session.close();
}
catch (Exception e) {
logger.debug("Exception while trying to close connection to " + config.getName(), e);
}
}
}
return session;
Expand All @@ -212,7 +222,9 @@ protected void doBind(DefaultSmppSession session, SmppSessionConfiguration confi
try {
// attempt to bind to the SMSC
// session implementation handles error checking, version negotiation, and can be discarded

bindResp = session.bind(bindRequest, config.getBindTimeout());
logger.debug("Bound to esme systemId=" + config.getSystemId());
} catch (RecoverablePduException e) {
// if a bind fails, there really is no recovery...
throw new UnrecoverablePduException(e.getMessage(), e);
Expand All @@ -229,19 +241,20 @@ protected DefaultSmppSession doOpen(SmppSessionConfiguration config, SmppSession
protected DefaultSmppSession createSession(Channel channel, SmppSessionConfiguration config, SmppSessionHandler sessionHandler) throws SmppTimeoutException, SmppChannelException, InterruptedException {
DefaultSmppSession session = new DefaultSmppSession(SmppSession.Type.CLIENT, config, channel, sessionHandler, monitorExecutor);

// add SSL handler
logger.debug("Creating session with esme " + config.getSystemId());
// add SSL handler
if (config.isUseSsl()) {
SslConfiguration sslConfig = config.getSslConfiguration();
if (sslConfig == null) throw new IllegalStateException("sslConfiguration must be set");
try {
SslContextFactory factory = new SslContextFactory(sslConfig);
SSLEngine sslEngine = factory.newSslEngine();
sslEngine.setUseClientMode(true);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_SSL_NAME, new SslHandler(sslEngine));
} catch (Exception e) {
throw new SmppChannelConnectException("Unable to create SSL session]: " + e.getMessage(), e);
}
}
SslConfiguration sslConfig = config.getSslConfiguration();
if (sslConfig == null) throw new IllegalStateException("sslConfiguration must be set");
try {
SslContextFactory factory = new SslContextFactory(sslConfig);
SSLEngine sslEngine = factory.newSslEngine();
sslEngine.setUseClientMode(true);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_SSL_NAME, new SslHandler(sslEngine));
} catch (Exception e) {
throw new SmppChannelConnectException("Unable to create SSL session]: " + e.getMessage(), e);
}
}

// add the thread renamer portion to the pipeline
if (config.getName() != null) {
Expand All @@ -254,11 +267,11 @@ protected DefaultSmppSession createSession(Channel channel, SmppSessionConfigura
SmppSessionLogger loggingHandler = new SmppSessionLogger(DefaultSmppSession.class.getCanonicalName(), config.getLoggingOptions());
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_LOGGER_NAME, loggingHandler);

// add a writeTimeout handler after the logger
if (config.getWriteTimeout() > 0) {
WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutTimer, config.getWriteTimeout(), TimeUnit.MILLISECONDS);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_WRITE_TIMEOUT_NAME, writeTimeoutHandler);
}
// add a writeTimeout handler after the logger
if (config.getWriteTimeout() > 0) {
WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutTimer, config.getWriteTimeout(), TimeUnit.MILLISECONDS);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_WRITE_TIMEOUT_NAME, writeTimeoutHandler);
}

// add a new instance of a decoder (that takes care of handling frames)
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_PDU_DECODER_NAME, new SmppSessionPduDecoder(session.getTranscoder()));
Expand All @@ -275,29 +288,39 @@ protected Channel createConnectedChannel(String host, int port, long connectTime

// set the timeout
this.clientBootstrap.setOption("connectTimeoutMillis", connectTimeoutMillis);

// attempt to connect to the remote system
logger.debug("Attempting to connect to remote system " + host + ":" + port);
ChannelFuture connectFuture = this.clientBootstrap.connect(socketAddr);

// attempt to connect to the remote system
ChannelFuture connectFuture = this.clientBootstrap.connect(socketAddr);

// wait until the connection is made successfully
// boolean timeout = !connectFuture.await(connectTimeoutMillis);
// BAD: using .await(timeout)
// see http://netty.io/3.9/api/org/jboss/netty/channel/ChannelFuture.html
connectFuture.awaitUninterruptibly();
//assert connectFuture.isDone();

if (connectFuture.isCancelled()) {
throw new InterruptedException("connectFuture cancelled by user");
} else if (!connectFuture.isSuccess()) {
if (connectFuture.getCause() instanceof org.jboss.netty.channel.ConnectTimeoutException) {
throw new SmppChannelConnectTimeoutException("Unable to connect to host [" + host + "] and port [" + port + "] within " + connectTimeoutMillis + " ms", connectFuture.getCause());
} else {
throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + connectFuture.getCause().getMessage(), connectFuture.getCause());
}
}
/* Wait until the connection is made successfully.
* According to the netty documentation it is bad to use .await(timeout). Instead
* b.setOption("connectTimeoutMillis", 10000);
* should be used. See http://netty.io/3.9/api/org/jboss/netty/channel/ChannelFuture.html
* It turns out that under certain unknown circumstances the connect waits forever: https://github.com/twitter/cloudhopper-smpp/issues/117
* That's why the future is canceled 1 second after the specified timeout.
* This is a workaround and hopefully not needed after the switch to netty 4.
*/
if (!connectFuture.await(connectTimeoutMillis + 1000)) {
logger.error("connectFuture did not finish in expected time! Try to cancel the connectFuture");
boolean isCanceled = connectFuture.cancel();
logger.error("connectFuture: isCanceled {} isDone {} isSuccess {}", isCanceled, connectFuture.isDone(), connectFuture.isSuccess());
throw new SmppChannelConnectTimeoutException("Could not connect to the server within timeout");
}

// if we get here, then we were able to connect and get a channel
return connectFuture.getChannel();
if (connectFuture.isCancelled()) {
throw new InterruptedException("connectFuture cancelled by user");
} else if (!connectFuture.isSuccess()) {
if (connectFuture.getCause() instanceof org.jboss.netty.channel.ConnectTimeoutException) {
throw new SmppChannelConnectTimeoutException("Unable to connect to host [" + host + "] and port [" + port + "] within " + connectTimeoutMillis + " ms", connectFuture.getCause());
} else {
throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + connectFuture.getCause().getMessage(), connectFuture.getCause());
}
}

logger.debug("Successfully connected to remote system " + host + ":" + port);
// if we get here, then we were able to connect and get a channel
return connectFuture.getChannel();
}

}
21 changes: 15 additions & 6 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ protected BaseBindResp bind(BaseBind request, long timeoutInMillis) throws Recov
// bind failed for a specific reason
throw new SmppBindException(bindResponse);
}

// if we make it all the way here, we're good and bound
bound = true;

Expand Down Expand Up @@ -373,7 +372,12 @@ protected BaseBindResp bind(BaseBind request, long timeoutInMillis) throws Recov
setBound();
} else {
// the bind failed, we need to clean up resources
try { this.close(); } catch (Exception e) { }
try {
logger.debug("Bound failed for systemId=" + request.getSystemId());
this.close();
} catch (Exception e) {
logger.debug("Exception after bind failed for systemId=" + request.getSystemId());
}
}
}
}
Expand Down Expand Up @@ -527,8 +531,10 @@ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pd
}

// write the pdu out & wait timeout amount of time
ChannelFuture channelFuture = this.channel.write(buffer).await();

ChannelFuture channelFuture = this.channel.write(buffer);
if(!channelFuture.await(timeoutMillis))
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());

// check if the write was a success
if (!channelFuture.isSuccess()) {
// the write failed, make sure to throw an exception
Expand Down Expand Up @@ -573,8 +579,11 @@ public void sendResponsePdu(PduResponse pdu) throws RecoverablePduException, Unr
}

// write the pdu out & wait timeout amount of time
ChannelFuture channelFuture = this.channel.write(buffer).await();

ChannelFuture channelFuture = this.channel.write(buffer);
//REQUESTED BY SERGEI VETUNIEV FOR TESTING
if(!channelFuture.await(10000))
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());

// check if the write was a success
if (!channelFuture.isSuccess()) {
// the write failed, make sure to throw an exception
Expand Down

0 comments on commit 6be5875

Please sign in to comment.