Skip to content

Commit

Permalink
[FIXED] EventLoop: Socket now closed only after event loop done polling
Browse files Browse the repository at this point in the history
The socket was closed by the NATS library itself, which could cause
some issue when, specifically libuv, could still be polling it.
We now defer to the event loop adapter to make sure that the event
loop library is done polling before invoking a new function that
will take care of closing the socket.

Resolves #814

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Oct 22, 2024
1 parent 7b5d3d7 commit cb8a5db
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 29 deletions.
6 changes: 6 additions & 0 deletions src/adapters/libevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ typedef struct
struct event *read;
struct event *write;
struct event *keepActive;
natsSock socket;

} natsLibeventEvents;

Expand Down Expand Up @@ -142,6 +143,7 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so

if (s == NATS_OK)
{
nle->socket = socket;
nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
natsLibevent_ProcessEvent, (void*) nle);
natsLibevent_Read((void*) nle, true);
Expand Down Expand Up @@ -175,7 +177,11 @@ natsLibevent_Read(void *userData, bool add)
if (add)
res = event_add(nle->read, NULL);
else
{
res = event_del_noblock(nle->read);
if (res == 0)
natsConnection_ProcessCloseEvent(&(nle->socket));
}

return (res == 0 ? NATS_OK : NATS_ERR);
}
Expand Down
4 changes: 4 additions & 0 deletions src/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
if (nle->events)
res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
else
{
res = uv_poll_stop(nle->handle);
if (res == 0)
natsConnection_ProcessCloseEvent(&nle->socket);
}

if (res != 0)
return NATS_ERR;
Expand Down
65 changes: 47 additions & 18 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2146,9 +2146,21 @@ _evStopPolling(natsConnection *nc)

nc->sockCtx.useEventLoop = false;
nc->el.writeAdded = false;
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
// The "write" event is added and removed as we write, however, we always
// have the "read" event added to the event loop. Removing it signals that
// the connection is closed and so the event loop adapter can then invoke
// natsConnection_ProcessCloseEvent() when the event loop is done polling
// the event. So we will remove "write" first, then finish with "read".
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
{
// We can't close the socket here, but we will mark as invalid and
// clear SSL object if applicable.
nc->sockCtx.fd = NATS_SOCK_INVALID;
_clearSSL(nc);
}

return s;
}
Expand Down Expand Up @@ -2187,21 +2199,15 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
SET_WRITE_DEADLINE(nc);
natsConn_bufferFlush(nc);

natsSock_Shutdown(nc->sockCtx.fd);
if (!nc->el.attached)
natsSock_Shutdown(nc->sockCtx.fd);
nc->sockCtx.fdActive = false;
}

// If we use an external event loop, we need to stop polling
// on the socket since we are going to reconnect.
if (nc->el.attached)
{
ls = _evStopPolling(nc);
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}

// Fail pending flush requests.
if (ls == NATS_OK)
Expand Down Expand Up @@ -2579,13 +2585,17 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
{
// If event loop attached, stop polling...
if (nc->el.attached)
{
_evStopPolling(nc);
}
else
{
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}
}
else
{
Expand Down Expand Up @@ -3398,6 +3408,8 @@ natsConnection_ConnectTo(natsConnection **newConn, const char *url)
natsStatus
natsConnection_Reconnect(natsConnection *nc)
{
bool el = false;

if (nc == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

Expand All @@ -3407,10 +3419,17 @@ natsConnection_Reconnect(natsConnection *nc)
natsConn_Unlock(nc);
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

natsSock_Shutdown(nc->sockCtx.fd);

el = nc->el.attached;
if (!el)
natsSock_Shutdown(nc->sockCtx.fd);
natsConn_Unlock(nc);
// For event loop, we need to invoke this so that the event loop
// adapter is the one closing the socket. We could of course invoke
// that in all cases, but decided to keep the socket shutdown when
// not using event loops.
if (el)
_processOpError(nc, NATS_CONNECTION_CLOSED, false);

return NATS_OK;
}

Expand Down Expand Up @@ -4163,6 +4182,16 @@ natsConnection_ProcessWriteEvent(natsConnection *nc)
(void) NATS_UPDATE_ERR_STACK(s);
}

void
natsConnection_ProcessCloseEvent(natsSock *socket)
{
if ((socket == NULL) || (*socket == NATS_SOCK_INVALID))
return;

natsSock_Close(*socket);
*socket = NATS_SOCK_INVALID;
}

natsStatus
natsConnection_GetClientID(natsConnection *nc, uint64_t *cid)
{
Expand Down
14 changes: 14 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,20 @@ natsConnection_Reconnect(natsConnection *nc);
NATS_EXTERN void
natsConnection_ProcessReadEvent(natsConnection *nc);

/** \brief Process a socket close event when using external event loop.
*
* When using an external event loop, and the library wants to close
* the connection, the event loop adapter will ensure that the event
* loop library stops polling, and then will invoke this function
* so that the socket can be safely closed.
*
* @param socket the pointer to the #natsSock object.
*
* \warning This API is reserved for external event loop adapters.
*/
NATS_EXTERN void
natsConnection_ProcessCloseEvent(natsSock *socket);

/** \brief Process a write event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
58 changes: 47 additions & 11 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20230,6 +20230,8 @@ _evLoopRead(void *userData, bool add)

natsMutex_Lock(arg->m);
arg->doRead = add;
if (!add)
natsConnection_ProcessCloseEvent(&(arg->sock));
natsCondition_Broadcast(arg->c);
natsMutex_Unlock(arg->m);

Expand Down Expand Up @@ -20266,7 +20268,6 @@ static void
_eventLoop(void *closure)
{
struct threadArg *arg = (struct threadArg *) closure;
natsSock sock = NATS_SOCK_INVALID;
natsConnection *nc = NULL;
bool read = false;
bool write= false;
Expand All @@ -20276,7 +20277,7 @@ _eventLoop(void *closure)
{
nats_Sleep(100);
natsMutex_Lock(arg->m);
while (!arg->evStop && ((sock = arg->sock) == NATS_SOCK_INVALID))
while (!arg->evStop && (arg->sock == NATS_SOCK_INVALID))
natsCondition_Wait(arg->c, arg->m);
stop = arg->evStop;
nc = arg->nc;
Expand Down Expand Up @@ -20355,6 +20356,28 @@ void test_EventLoop(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Publish: ");
s = natsConnection_PublishString(nc, "foo", "bar");
testCond(s == NATS_OK);

test("Check msg received: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond(s == NATS_OK);
natsMsg_Destroy(msg);

test("Explicit reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Wait for reconnect: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

Expand Down Expand Up @@ -20382,7 +20405,7 @@ void test_EventLoop(void)

test("Check ev loop: ");
natsMutex_Lock(arg.m);
if (arg.attached != 2 || !arg.detached)
if (arg.attached != 3 || !arg.detached || (arg.sock != NATS_SOCK_INVALID))
s = NATS_ERR;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);
Expand Down Expand Up @@ -20544,23 +20567,36 @@ void test_EventLoopTLS(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Shutdown evLoop: ");
test("Explicit reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Wait for reconnect: ");
natsMutex_Lock(arg.m);
arg.evStop = true;
natsCondition_Broadcast(arg.c);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
natsThread_Join(arg.t);
natsThread_Destroy(arg.t);
testCond(s == NATS_OK);

test("Close and wait for close cb: ");
natsConnection_Close(nc);
s = _waitForConnClosed(&arg);
testCond(s == NATS_OK);

test("Shutdown evLoop: ");
natsMutex_Lock(arg.m);
arg.evStop = true;
natsCondition_Broadcast(arg.c);
natsMutex_Unlock(arg.m);
natsThread_Join(arg.t);
natsThread_Destroy(arg.t);
testCond(s == NATS_OK);

natsConnection_Destroy(nc);
natsOptions_Destroy(opts);

Expand Down Expand Up @@ -21401,7 +21437,7 @@ void test_SSLServerNameIndication(void)
arg.control = 3;

s = _startMockupServer(&sock, "localhost", "4222");

// Start the thread that will try to connect to our server...
IFOK(s, natsThread_Create(&t, _connectToMockupServer, (void*) &arg));

Expand All @@ -21411,7 +21447,7 @@ void test_SSLServerNameIndication(void)
{
s = NATS_SYS_ERROR;
}

testCond((s == NATS_OK) && (ctx.fd > 0));

test("Read ClientHello from client: ");
Expand Down Expand Up @@ -34059,7 +34095,7 @@ void test_MicroQueueGroupForEndpoint(void)
#define _testQueueGroup(_expected, _actual) \
(_expected) == NULL ? (_actual) == NULL : strcmp((_expected), (_actual)) == 0

testCond((err == NULL) &&
testCond((err == NULL) &&
(info != NULL) && (info->EndpointsLen == 3) &&
(stats != NULL) && (stats->EndpointsLen == 3) &&
(_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[0].QueueGroup)) &&
Expand Down

0 comments on commit cb8a5db

Please sign in to comment.