Fix teardown issue of TCP interconnect
Previously, when an interconnect connection had no more data to send,
the sender sent a customized EOS packet to the receiver, disabled
further send operations with shutdown(SHUT_WR), and then immediately
closed the connection with close(), counting on the kernel and the TCP
stack to deliver the remaining data to the receiver. The problem is
that, on some platforms, TCP behavior is not well defined once one side
has closed the connection: packets may be lost and the receiver may
report an unexpected error.

The correct approach is for the sender to block on the connection until
the receiver gets the EOS packet and closes its end; only then can the
sender close the connection safely.
pengzhout committed Jun 7, 2018
1 parent 2c011ce commit 1c1f164
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions src/backend/cdb/motion/ic_tcp.c
@@ -2143,7 +2143,21 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates,
/* cleanup a Sending motion node. */
getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);

if (!forceEOS)
/*
* In a normal teardown, the sender has already sent an EOS packet and
* disabled further send operations in phase 1. The sender can't close
* the connection immediately, because the EOS packet or data packets
* still in the kernel send buffer may be lost on some platforms if the
* sender closes the connection completely.
*
* The correct way is for the sender to block on the connection until the
* receivers get the EOS packets and close their end; then it's safe for
* the sender to close the connection completely.
*
* If errors have occurred, senders can skip this step to avoid hangs;
* the QD will take care of the error handling.
*/
if (!hasError)
waitOnOutbound(pEntry);

for (i = 0; i < pEntry->numConns; i++)
@@ -2459,7 +2473,7 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)
if (conn_count == 0)
return;

if (InterruptPending)
if (InterruptPending || QueryFinishPending)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "waitOnOutbound(): interrupt pending fast-track");
@@ -2481,7 +2495,7 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)
{
saved_err = errno;

if (InterruptPending)
if (InterruptPending || QueryFinishPending)
return;

/*
@@ -2502,19 +2516,20 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)

/* ready to read. */
count = recv(conn->sockfd, &buf, sizeof(buf), 0);
if (count == 0) /* done ! */

if (count == 0 || count == 1) /* done ! */
{
/* got a stop message */
AssertImply(count == 1, buf == 'S');

MPP_FD_CLR(conn->sockfd, &waitset);
/* we may have finished */
conn_count--;
continue;
}
if (count > 0 || (count < 0 && errno == EAGAIN))
continue;
/* Some other kind of error happened */
if (errno == EINTR)
else if (count < 0 && (errno == EAGAIN || errno == EINTR))
continue;

/*
* Something unexpected, but probably not horrible: warn and
* return

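The recv() handling in the last hunk above can be summarized as a small classification function. This is a sketch only: `peer_status` and `classify_recv` are hypothetical names that do not appear in ic_tcp.c, and the logic simply mirrors the conditions visible in the diff (0 or 1 bytes means the peer is done, EAGAIN or EINTR means retry, anything else is unexpected).

```c
#include <errno.h>
#include <sys/types.h>

/* Hypothetical result type for one recv() attempt on an outbound connection. */
typedef enum
{
    PEER_DONE,   /* peer closed (0 bytes) or sent a one-byte stop message */
    PEER_RETRY,  /* transient condition, poll the connection again */
    PEER_ERROR   /* something unexpected, caller should warn and return */
} peer_status;

/*
 * Classify the return value of recv() and the errno captured right after it.
 * Assumes the receive buffer is a single byte, as in the diff, so a positive
 * count can only be 1.
 */
peer_status classify_recv(ssize_t count, int err)
{
    if (count == 0 || count == 1)
        return PEER_DONE;
    if (count < 0 && (err == EAGAIN || err == EINTR))
        return PEER_RETRY;
    return PEER_ERROR;
}
```

Passing errno in as a parameter keeps the function free of global state, which makes each branch easy to check on its own.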