Skip to content

Commit

Permalink
fix: gracefully handle closed sockets and EOF (#2374)
Browse files Browse the repository at this point in the history
Connections that are terminated by just closing the socket could cause
log spamming, as these turn into `Socket closed` exceptions and/or
`EOFException`s in PGAdapter. This change handles those cases
gracefully by just terminating the connection in the regular way, as
if the connection received an `X` (Terminate) message.

Fixes #2106
  • Loading branch information
olavloite authored Sep 25, 2024
1 parent 9787ca4 commit 301ade7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.security.SecureRandom;
import java.text.MessageFormat;
import java.time.Duration;
Expand Down Expand Up @@ -504,22 +505,46 @@ public void handleMessages() throws Exception {
try {
message.nextHandler();
message.send();
} catch (IllegalArgumentException | IllegalStateException | EOFException fatalException) {
this.handleError(
PGException.newBuilder(fatalException)
.setSeverity(Severity.FATAL)
.setSQLState(SQLState.InternalError)
.build());
} catch (IllegalArgumentException | IllegalStateException fatalException) {
handleFatalException(fatalException);
// Only terminate the connection if we are not in COPY_IN mode. In COPY_IN mode the mode will
// switch to normal mode in these cases.
if (this.status != ConnectionStatus.COPY_IN) {
terminate();
}
} catch (EOFException eofException) {
// Handle an EOFException as a normal connection termination. The TCP connection can break
// without the server receiving a Terminate (X) message if for example the application does
// not try-catch all exceptions.
// We only terminate the connection if we are not in COPY_IN mode. In COPY_IN mode, this
// exception will cause the connection to leave COPY_IN mode, and return to normal operation.
if (this.status != ConnectionStatus.COPY_IN) {
terminate();
} else {
handleFatalException(eofException);
}
} catch (SocketException socketException) {
// Handle a SocketException when the socket has been closed as a normal connection
// termination. The TCP connection can break without the server receiving a Terminate (X)
// message if for example the application does not try-catch all exceptions.
if (socket.isClosed()) {
terminate();
} else {
this.handleError(PGExceptionFactory.toPGException(socketException));
}
} catch (Exception exception) {
this.handleError(PGExceptionFactory.toPGException(exception));
}
}

private void handleFatalException(Exception fatalException) throws Exception {
this.handleError(
PGException.newBuilder(fatalException)
.setSeverity(Severity.FATAL)
.setSQLState(SQLState.InternalError)
.build());
}

/** Called when a Terminate message is received. This closes this {@link ConnectionHandler}. */
public void handleTerminate() {
synchronized (this) {
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/com/google/cloud/spanner/pgadapter/ITQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.cloud.spanner.Database;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
import java.io.ByteArrayInputStream;
Expand All @@ -28,6 +30,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -51,6 +54,13 @@ public static void setup() {

@AfterClass
public static void teardown() {
// Wait a bit until all connections have been terminated (or fail if they fail to terminate).
Stopwatch stopwatch = Stopwatch.createStarted();
while (testEnv.getServer().getNumberOfConnections() > 0
&& stopwatch.elapsed(TimeUnit.MILLISECONDS) < 1000) {
Thread.yield();
}
assertTrue(testEnv.getServer().getConnectionHandlers().isEmpty());
testEnv.stopPGAdapterServer();
testEnv.cleanUp();
}
Expand Down Expand Up @@ -109,6 +119,8 @@ public void simplePgQuery() throws Exception {
// Value of the column: '42'
assertEquals('4', dataRowIn.readByte());
assertEquals('2', dataRowIn.readByte());

clientSocket.close();
}

@Test
Expand Down Expand Up @@ -183,5 +195,7 @@ public void basicSelectTest() throws Exception {
assertArrayEquals(dataRow2, dataRows[2].getPayload());
assertArrayEquals(commandCompleteData, commandComplete.getPayload());
assertArrayEquals(readyForQueryData, readyForQuery.getPayload());

clientSocket.close();
}
}

0 comments on commit 301ade7

Please sign in to comment.