Skip to content

Commit

Permalink
Merge pull request #505 from aionnetwork/api_shutdown
Browse files Browse the repository at this point in the history
properly shutdown zmq socket
  • Loading branch information
AionJayT authored Jun 13, 2018
2 parents 96f57a4 + 54270e0 commit 5e3c07b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private List<AionTransaction> castRawTx(List<byte[]> broadCastTx) {

@Override
public void shutDown() {
log.info("BroadcastTxHandler shutdown!");
log.info("BroadcastTxHandler shutting down!");
if (ex != null) {
ex.shutdown();
}
Expand Down
101 changes: 52 additions & 49 deletions modApiServer/src/org/aion/api/server/zmq/ProtocolProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ProtocolProcessor implements Runnable {

private static final long zmqHWM = 100_000;
private static final int SOCKETID_LEN = 5;
private static final int SOCKET_RECV_TIMEOUT = 3000;

public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {
this.handler = _handler;
Expand All @@ -73,6 +74,8 @@ public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {

public void shutdown() throws InterruptedException {
handler.shutDown();
shutDown.set(true);
Thread.sleep(SOCKET_RECV_TIMEOUT);
Proxy.shutdown();
}

Expand Down Expand Up @@ -117,9 +120,8 @@ public void run() {
}

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down Sockets...");
LOG.info("Shutting down Zmq sockets...");
}
shutDown.set(true);
// Shutdown HdlrZmq
((HdlrZmq) handler).shutdown();
// Shutdown ZmqSocket
Expand All @@ -133,7 +135,7 @@ public void run() {

ctx.close();
if (LOG.isInfoEnabled()) {
LOG.info("Shutdown Sockets... Done!");
LOG.info("Shutdown Zmq sockets... Done!");
}

} catch (Exception e) {
Expand Down Expand Up @@ -172,7 +174,7 @@ private void eventRun(Context ctx) {
try {
byte[] socketId = ByteBuffer.allocate(5).put(ByteUtil.longToBytes(i), 3, 5).array();
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
Expand All @@ -187,8 +189,8 @@ private void eventRun(Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close eventRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close eventRun sockets...");
}
}

Expand Down Expand Up @@ -228,22 +230,23 @@ private void callbackRun(Context ctx) {
}
try {
sock.send(tps.getSocketId(), ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close callbackRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close callbackRun sockets...");
}
}

private void workerRun(ZMQ.Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_WK_TH);
sock.setReceiveTimeOut(SOCKET_RECV_TIMEOUT);

while (!shutDown.get()) {
try {
Expand All @@ -253,27 +256,27 @@ private void workerRun(ZMQ.Context ctx) {
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ((HdlrZmq) this.handler).process(req, socketId);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun rspMsg: [{}]", Hex.toHexString(rsp));
}
if (req != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun reqMsg: [{}]",
Hex.toHexString(req));
}
byte[] rsp = ((HdlrZmq) this.handler).process(req, socketId);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun rspMsg: [{}]",
Hex.toHexString(rsp));
}

try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun sock.send exception: " + e.getMessage());
try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun sock.send exception: " + e
.getMessage());
}
}
}
} else {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun incorrect socketID [{}]",
socketId == null ? "null" : Hex.toHexString(socketId));
}
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
Expand All @@ -282,14 +285,15 @@ private void workerRun(ZMQ.Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close workerRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close workerRun sockets...");
}
}

private void hbRun(ZMQ.Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_HB_TH);
sock.setReceiveTimeOut(SOCKET_RECV_TIMEOUT);

while (!shutDown.get()) {
try {
Expand All @@ -299,27 +303,26 @@ private void hbRun(ZMQ.Context ctx) {
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ApiUtil.toReturnHeader(JAVAAPI_VAR, Message.Retcode.r_heartbeatReturn_VALUE);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun rspMsg: [{}]", Hex.toHexString(rsp));
}
if (req != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ApiUtil
.toReturnHeader(JAVAAPI_VAR, Message.Retcode.r_heartbeatReturn_VALUE);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun rspMsg: [{}]", Hex.toHexString(rsp));
}

try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun sock.send exception: " + e.getMessage());
try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun sock.send exception: " + e
.getMessage());
}
}
}
} else {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun incorrect socketID [{}]",
socketId == null ? "null" : Hex.toHexString(socketId));
}
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
Expand All @@ -328,8 +331,8 @@ private void hbRun(ZMQ.Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close hbRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close hbRun sockets...");
}
}
}
6 changes: 3 additions & 3 deletions modApiServer/src/org/aion/api/server/zmq/Proxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static void proxy(Socket frontend, Socket backend, Socket callback, Socket event
}
}

LOG.info("zmq-proxy thread was interrupted.");
LOG.debug("zmq-proxy thread was interrupted.");
} catch (Exception e) {
LOG.error("aion.api.server.zmq.Proxy exception" + e.getMessage());
}
Expand Down Expand Up @@ -173,10 +173,10 @@ private static boolean msgProcessSend(Socket receiver, Socket sender) {
}

public static void shutdown() throws InterruptedException {
LOG.info("zmq-proxy thread shuting down...");
LOG.debug("zmq-proxy thread shutting down...");
shutDown.set(true);

LOG.info("waiting zmq-proxy thread shutdown");
LOG.debug("waiting zmq-proxy thread shutdown");
Thread.sleep(3000);
}
}

0 comments on commit 5e3c07b

Please sign in to comment.