Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly shutdown zmq socket #505

Merged
merged 1 commit into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private List<AionTransaction> castRawTx(List<byte[]> broadCastTx) {

@Override
public void shutDown() {
log.info("BroadcastTxHandler shutdown!");
log.info("BroadcastTxHandler shutting down!");
if (ex != null) {
ex.shutdown();
}
Expand Down
101 changes: 52 additions & 49 deletions modApiServer/src/org/aion/api/server/zmq/ProtocolProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ProtocolProcessor implements Runnable {

private static final long zmqHWM = 100_000;
private static final int SOCKETID_LEN = 5;
private static final int SOCKET_RECV_TIMEOUT = 3000;

public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {
this.handler = _handler;
Expand All @@ -73,6 +74,8 @@ public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {

public void shutdown() throws InterruptedException {
handler.shutDown();
shutDown.set(true);
Thread.sleep(SOCKET_RECV_TIMEOUT);
Proxy.shutdown();
}

Expand Down Expand Up @@ -117,9 +120,8 @@ public void run() {
}

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down Sockets...");
LOG.info("Shutting down Zmq sockets...");
}
shutDown.set(true);
// Shutdown HdlrZmq
((HdlrZmq) handler).shutdown();
// Shutdown ZmqSocket
Expand All @@ -133,7 +135,7 @@ public void run() {

ctx.close();
if (LOG.isInfoEnabled()) {
LOG.info("Shutdown Sockets... Done!");
LOG.info("Shutdown Zmq sockets... Done!");
}

} catch (Exception e) {
Expand Down Expand Up @@ -172,7 +174,7 @@ private void eventRun(Context ctx) {
try {
byte[] socketId = ByteBuffer.allocate(5).put(ByteUtil.longToBytes(i), 3, 5).array();
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
Expand All @@ -187,8 +189,8 @@ private void eventRun(Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close eventRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close eventRun sockets...");
}
}

Expand Down Expand Up @@ -228,22 +230,23 @@ private void callbackRun(Context ctx) {
}
try {
sock.send(tps.getSocketId(), ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close callbackRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close callbackRun sockets...");
}
}

private void workerRun(ZMQ.Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_WK_TH);
sock.setReceiveTimeOut(SOCKET_RECV_TIMEOUT);

while (!shutDown.get()) {
try {
Expand All @@ -253,27 +256,27 @@ private void workerRun(ZMQ.Context ctx) {
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ((HdlrZmq) this.handler).process(req, socketId);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun rspMsg: [{}]", Hex.toHexString(rsp));
}
if (req != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun reqMsg: [{}]",
Hex.toHexString(req));
}
byte[] rsp = ((HdlrZmq) this.handler).process(req, socketId);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun rspMsg: [{}]",
Hex.toHexString(rsp));
}

try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun sock.send exception: " + e.getMessage());
try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun sock.send exception: " + e
.getMessage());
}
}
}
} else {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.workerRun incorrect socketID [{}]",
socketId == null ? "null" : Hex.toHexString(socketId));
}
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
Expand All @@ -282,14 +285,15 @@ private void workerRun(ZMQ.Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close workerRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close workerRun sockets...");
}
}

private void hbRun(ZMQ.Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_HB_TH);
sock.setReceiveTimeOut(SOCKET_RECV_TIMEOUT);

while (!shutDown.get()) {
try {
Expand All @@ -299,27 +303,26 @@ private void hbRun(ZMQ.Context ctx) {
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ApiUtil.toReturnHeader(JAVAAPI_VAR, Message.Retcode.r_heartbeatReturn_VALUE);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun rspMsg: [{}]", Hex.toHexString(rsp));
}
if (req != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun reqMsg: [{}]", Hex.toHexString(req));
}
byte[] rsp = ApiUtil
.toReturnHeader(JAVAAPI_VAR, Message.Retcode.r_heartbeatReturn_VALUE);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.hbRun rspMsg: [{}]", Hex.toHexString(rsp));
}

try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.PAIR);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun sock.send exception: " + e.getMessage());
try {
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun sock.send exception: " + e
.getMessage());
}
}
}
} else {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.hbRun incorrect socketID [{}]",
socketId == null ? "null" : Hex.toHexString(socketId));
}
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
Expand All @@ -328,8 +331,8 @@ private void hbRun(ZMQ.Context ctx) {
}
}
sock.close();
if (LOG.isInfoEnabled()) {
LOG.info("close hbRun Sockets...");
if (LOG.isDebugEnabled()) {
LOG.debug("close hbRun sockets...");
}
}
}
6 changes: 3 additions & 3 deletions modApiServer/src/org/aion/api/server/zmq/Proxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static void proxy(Socket frontend, Socket backend, Socket callback, Socket event
}
}

LOG.info("zmq-proxy thread was interrupted.");
LOG.debug("zmq-proxy thread was interrupted.");
} catch (Exception e) {
LOG.error("aion.api.server.zmq.Proxy exception" + e.getMessage());
}
Expand Down Expand Up @@ -173,10 +173,10 @@ private static boolean msgProcessSend(Socket receiver, Socket sender) {
}

public static void shutdown() throws InterruptedException {
LOG.info("zmq-proxy thread shuting down...");
LOG.debug("zmq-proxy thread shutting down...");
shutDown.set(true);

LOG.info("waiting zmq-proxy thread shutdown");
LOG.debug("waiting zmq-proxy thread shutdown");
Thread.sleep(3000);
}
}