diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 6ff473b85..959e48164 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -33,19 +33,6 @@ jobs:
go-version: '^1.17.7'
- run: go version
- run: .ci/run-with-credentials.sh units
- integration:
- runs-on: ubuntu-latest
- steps:
- - uses: actions/checkout@v2
- - uses: actions/setup-java@v1
- with:
- java-version: 8
- - run: java -version
- - uses: actions/setup-go@v2
- with:
- go-version: '^1.17.7'
- - run: go version
- - run: .ci/run-with-credentials.sh integration
lint:
runs-on: ubuntu-latest
steps:
@@ -160,7 +147,7 @@ jobs:
runs-on: ubuntu-latest
# Only releases on the postgresql-dialect branch
if: github.head_ref == 'postgresql-dialect'
- needs: [ units, lint, clirr, integration, e2e-psql-v11, e2e-psql-v12, e2e-psql-v13 ]
+ needs: [ units, lint, clirr, e2e-psql-v11, e2e-psql-v12, e2e-psql-v13 ]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
@@ -180,7 +167,7 @@ jobs:
runs-on: ubuntu-latest
# Only releases on the postgresql-dialect branch
if: github.head_ref == 'postgresql-dialect'
- needs: [ units, lint, clirr, integration, e2e-psql-v11, e2e-psql-v12, e2e-psql-v13 ]
+ needs: [ units, lint, clirr, e2e-psql-v11, e2e-psql-v12, e2e-psql-v13 ]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml
new file mode 100644
index 000000000..47944875e
--- /dev/null
+++ b/.github/workflows/integration.yaml
@@ -0,0 +1,57 @@
+on:
+ pull_request:
+name: integration
+env:
+ GOOGLE_CLOUD_PROJECT: "span-cloud-testing"
+ GOOGLE_CLOUD_INSTANCE: "pgadapter-testing"
+ GOOGLE_CLOUD_DATABASE: "testdb_integration"
+ GOOGLE_CLOUD_ENDPOINT: "spanner.googleapis.com"
+jobs:
+ check-env:
+ outputs:
+ has-key: ${{ steps.project-id.outputs.defined }}
+ runs-on: ubuntu-latest
+ steps:
+ - id: project-id
+ env:
+ GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
+ if: "${{ env.GCP_PROJECT_ID != '' }}"
+ run: echo "::set-output name=defined::true"
+
+ test:
+ needs: [check-env]
+ if: needs.check-env.outputs.has-key == 'true'
+ timeout-minutes: 30
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup Java
+ uses: actions/setup-java@v3
+ with:
+ distribution: zulu
+ java-version: 8
+ - run: java -version
+ - name: Setup Go
+ uses: actions/setup-go@v2
+ with:
+ go-version: '^1.17.7'
+ - run: go version
+ - name: Run unit tests
+ run: mvn test -B -Ptest-all
+ - name: Setup GCloud
+ uses: google-github-actions/setup-gcloud@v0
+ with:
+ project_id: ${{ secrets.GCP_PROJECT_ID }}
+ service_account_key: ${{ secrets.JSON_SERVICE_ACCOUNT_CREDENTIALS }}
+ export_default_credentials: true
+ - name: Run integration tests
+ run: mvn verify -B -Dclirr.skip=true -DskipITs=false -DPG_ADAPTER_HOST="https://$GOOGLE_CLOUD_ENDPOINT" -DPG_ADAPTER_INSTANCE="$GOOGLE_CLOUD_INSTANCE" -DPG_ADAPTER_DATABASE="$GOOGLE_CLOUD_DATABASE"
+ - name: Upload coverage to Codecov
+ uses: codecov/codecov-action@v2
+ with:
+ directory: ./target/site/jacoco-merged-test-coverage-report
+ fail_ci_if_error: true
+ flags: all_tests
+ name: codecov-umbrella
+ path_to_write_report: ./coverage/codecov_report.txt
+ verbose: true
diff --git a/.github/workflows/units.yaml b/.github/workflows/units.yaml
index 32198e807..091df7dde 100644
--- a/.github/workflows/units.yaml
+++ b/.github/workflows/units.yaml
@@ -20,15 +20,6 @@ jobs:
go-version: '^1.17.7'
- run: go version
- run: mvn -B test -Ptest-all
- - name: Upload coverage to Codecov
- uses: codecov/codecov-action@v2
- with:
- directory: ./target/site/jacoco
- fail_ci_if_error: true
- flags: unittests
- name: codecov-umbrella
- path_to_write_report: ./coverage/codecov_report.txt
- verbose: true
windows:
runs-on: windows-latest
strategy:
diff --git a/pom.xml b/pom.xml
index 41c44ceeb..9bed578ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
com.google.cloud.spanner.pgadapter.IntegrationTest,com.google.cloud.spanner.pgadapter.golang.GolangTest
- 6.24.0
+ 6.25.0
4.0.0
@@ -71,6 +71,17 @@
postgresql
42.3.4
+
+ com.kohlschutter.junixsocket
+ junixsocket-core
+ 2.4.0
+ pom
+
+
+ com.kohlschutter.junixsocket
+ junixsocket-common
+ 2.4.0
+
com.googlecode.json-simple
json-simple
@@ -103,7 +114,7 @@
com.google.api
gax-grpc
- 2.16.0
+ 2.18.1
testlib
test
@@ -286,6 +297,7 @@
maven-surefire-plugin
3.0.0-M5
+ ${surefire.jacoco.args}
${excludedTests}
sponge_log
@@ -305,13 +317,73 @@
prepare-agent
+
+ ${project.build.directory}/jacoco-output/jacoco-unit-tests.exec
+ surefire.jacoco.args
+
- report
+ after-unit-test-execution
test
report
+
+ ${project.build.directory}/jacoco-output/jacoco-unit-tests.exec
+ ${project.reporting.outputDirectory}/jacoco-unit-test-coverage-report
+
+
+
+
+ before-integration-test-execution
+ pre-integration-test
+
+ prepare-agent
+
+
+ ${project.build.directory}/jacoco-output/jacoco-integration-tests.exec
+ failsafe.jacoco.args
+
+
+
+ after-integration-test-execution
+ post-integration-test
+
+ report
+
+
+ ${project.build.directory}/jacoco-output/jacoco-integration-tests.exec
+ ${project.reporting.outputDirectory}/jacoco-integration-test-coverage-report
+
+
+
+ merge-unit-and-integration
+ post-integration-test
+
+ merge
+
+
+
+
+ ${project.build.directory}/jacoco-output/
+
+ *.exec
+
+
+
+ ${project.build.directory}/jacoco-output/merged.exec
+
+
+
+ create-merged-report
+ post-integration-test
+
+ report
+
+
+ ${project.build.directory}/jacoco-output/merged.exec
+ ${project.reporting.outputDirectory}/jacoco-merged-test-coverage-report
+
@@ -333,6 +405,7 @@
+ ${failsafe.jacoco.args}
${skipITs}
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/ConnectionHandler.java b/src/main/java/com/google/cloud/spanner/pgadapter/ConnectionHandler.java
index 291495d4b..ea76e44b3 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/ConnectionHandler.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/ConnectionHandler.java
@@ -239,12 +239,21 @@ public void handleTerminate() {
* Terminates this connection at the request of the server. This is called if the server is
* shutting down while the connection is still active.
*/
- void terminate() throws IOException {
+ void terminate() {
if (this.status != ConnectionStatus.TERMINATED) {
handleTerminate();
- }
- if (!socket.isClosed()) {
- socket.close();
+ try {
+ if (!socket.isClosed()) {
+ socket.close();
+ }
+ } catch (IOException exception) {
+ logger.log(
+ Level.WARNING,
+ exception,
+ () ->
+ String.format(
+ "Failed to close connection handler with ID %s: %s", getName(), exception));
+ }
}
}
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/ProxyServer.java b/src/main/java/com/google/cloud/spanner/pgadapter/ProxyServer.java
index 94eb170f5..496f99394 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/ProxyServer.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/ProxyServer.java
@@ -15,7 +15,9 @@
package com.google.cloud.spanner.pgadapter;
import com.google.api.core.AbstractApiService;
+import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata.TextFormat;
@@ -23,8 +25,10 @@
import com.google.cloud.spanner.pgadapter.wireoutput.ErrorResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.ErrorResponse.Severity;
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
+import com.google.common.collect.ImmutableList;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
@@ -35,9 +39,13 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.newsclub.net.unix.AFUNIXServerSocket;
+import org.newsclub.net.unix.AFUNIXSocketAddress;
/**
* The proxy server listens for incoming client connections and starts a new {@link
@@ -50,7 +58,18 @@ public class ProxyServer extends AbstractApiService {
private final Properties properties;
private final List handlers = Collections.synchronizedList(new LinkedList<>());
- private ServerSocket serverSocket;
+ /**
+ * Latch that is closed when the TCP server has started. We need this to know the exact port that
+ * the TCP socket was assigned, so we can assign the same port number to the Unix domain socket.
+ */
+ private final CountDownLatch tcpStartedLatch = new CountDownLatch(1);
+ /**
+ * List of server sockets accepting connections. It currently only contains one TCP socket and
+ * optionally one Unix domain socket, but could in theory be expanded to contain multiple sockets
+ * of each type.
+ */
+ private final List serverSockets = Collections.synchronizedList(new LinkedList<>());
+
private int localPort;
/** The server will keep track of all messages it receives if it started in DEBUG mode. */
@@ -102,42 +121,73 @@ public void startServer() {
logger.log(Level.INFO, () -> String.format("Server started on port %d", getLocalPort()));
}
+ interface ServerRunnable {
+ void run(CountDownLatch startupLatch, CountDownLatch stoppedLatch)
+ throws IOException, InterruptedException;
+ }
+
@Override
protected void doStart() {
- Thread listenerThread =
- new Thread("spanner-postgres-adapter-proxy-listener") {
- @Override
- public void run() {
- try {
- runServer();
- } catch (IOException e) {
- logger.log(
- Level.WARNING,
- e,
- () ->
- String.format(
- "Server on port %s stopped by exception: %s", getLocalPort(), e));
+ ImmutableList serverRunnables;
+ if (options.isDomainSocketEnabled()) {
+ serverRunnables = ImmutableList.of(this::runTcpServer, this::runDomainSocketServer);
+ } else {
+ serverRunnables = ImmutableList.of(this::runTcpServer);
+ }
+ CountDownLatch startupLatch = new CountDownLatch(serverRunnables.size());
+ CountDownLatch stoppedLatch = new CountDownLatch(serverRunnables.size());
+ for (ServerRunnable server : serverRunnables) {
+ Thread listenerThread =
+ new Thread("spanner-postgres-adapter-proxy-listener") {
+ @Override
+ public void run() {
+ try {
+ server.run(startupLatch, stoppedLatch);
+ } catch (Exception exception) {
+ logger.log(
+ Level.WARNING,
+ exception,
+ () ->
+ String.format(
+ "Server on port %s stopped by exception: %s",
+ getLocalPort(), exception));
+ }
}
- }
- };
- listenerThread.start();
+ };
+ listenerThread.start();
+ }
+ try {
+ if (startupLatch.await(30L, TimeUnit.SECONDS)) {
+ notifyStarted();
+ } else {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.DEADLINE_EXCEEDED, "The server did not start in a timely fashion.");
+ }
+ } catch (InterruptedException interruptedException) {
+ throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
+ }
}
@Override
protected void doStop() {
- try {
- logger.log(Level.INFO, () -> String.format("Server on port %d is stopping", getLocalPort()));
- this.serverSocket.close();
- logger.log(
- Level.INFO, () -> String.format("Server socket on port %d closed", getLocalPort()));
- } catch (IOException exception) {
- logger.log(
- Level.WARNING,
- exception,
- () ->
- String.format(
- "Closing server socket on port %d failed: %s", getLocalPort(), exception));
+ for (ServerSocket serverSocket : this.serverSockets) {
+ try {
+ logger.log(
+ Level.INFO, () -> String.format("Server on socket %s is stopping", serverSocket));
+ serverSocket.close();
+ logger.log(
+ Level.INFO, () -> String.format("Server socket on socket %s closed", serverSocket));
+ } catch (IOException exception) {
+ logger.log(
+ Level.WARNING,
+ exception,
+ () -> String.format("Closing server socket %s failed: %s", serverSocket, exception));
+ }
+ }
+ for (ConnectionHandler handler : this.handlers) {
+ handler.terminate();
}
+ notifyStopped();
}
/** Safely stops the server (iff started), closing specific socket and cleaning up. */
@@ -147,14 +197,37 @@ public void stopServer() {
}
/**
- * Thread logic: opens the listening socket and instantiates the connection handler.
+ * Thread logic: opens the TCP listening socket and instantiates the connection handler.
*
* @throws IOException if ServerSocket cannot start.
*/
- void runServer() throws IOException {
- this.serverSocket = new ServerSocket(this.options.getProxyPort());
- this.localPort = serverSocket.getLocalPort();
- notifyStarted();
+ void runTcpServer(CountDownLatch startupLatch, CountDownLatch stoppedLatch) throws IOException {
+ ServerSocket tcpSocket = new ServerSocket(this.options.getProxyPort());
+ this.serverSockets.add(tcpSocket);
+ this.localPort = tcpSocket.getLocalPort();
+ tcpStartedLatch.countDown();
+ runServer(tcpSocket, startupLatch, stoppedLatch);
+ }
+
+ void runDomainSocketServer(CountDownLatch startupLatch, CountDownLatch stoppedLatch)
+ throws IOException, InterruptedException {
+ // Wait until the TCP server has started, so we can get the port number it is using.
+ if (!tcpStartedLatch.await(30L, TimeUnit.SECONDS)) {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.DEADLINE_EXCEEDED, "Timeout while waiting for TCP server to start");
+ }
+ File tempDir = new File(this.options.getSocketFile(getLocalPort()));
+ AFUNIXServerSocket domainSocket = AFUNIXServerSocket.newInstance();
+ domainSocket.bind(AFUNIXSocketAddress.of(tempDir));
+ this.serverSockets.add(domainSocket);
+ runServer(domainSocket, startupLatch, stoppedLatch);
+ }
+
+ void runServer(
+ ServerSocket serverSocket, CountDownLatch startupLatch, CountDownLatch stoppedLatch)
+ throws IOException {
+ startupLatch.countDown();
+ awaitRunning();
try {
while (isRunning()) {
Socket socket = serverSocket.accept();
@@ -170,23 +243,11 @@ void runServer() throws IOException {
Level.INFO,
() ->
String.format(
- "Socket exception on port %d: %s. This is normal when the server is stopped.",
- getLocalPort(), e));
+ "Socket exception on socket %s: %s. This is normal when the server is stopped.",
+ serverSocket, e));
} finally {
- for (ConnectionHandler handler : this.handlers) {
- try {
- handler.terminate();
- } catch (Exception exception) {
- logger.log(
- Level.WARNING,
- exception,
- () ->
- String.format(
- "Connection handler %s could not be terminated: %s", handler, exception));
- }
- }
- logger.log(Level.INFO, () -> String.format("Socket on port %d stopped", getLocalPort()));
- notifyStopped();
+ logger.log(Level.INFO, () -> String.format("Socket %s stopped", serverSocket));
+ stoppedLatch.countDown();
}
}
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/metadata/OptionsMetadata.java b/src/main/java/com/google/cloud/spanner/pgadapter/metadata/OptionsMetadata.java
index 962f97a49..211fc45f4 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/metadata/OptionsMetadata.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/metadata/OptionsMetadata.java
@@ -61,6 +61,7 @@ public enum DdlTransactionMode {
private static final String DEFAULT_USER_AGENT = "pg-adapter";
private static final String OPTION_SERVER_PORT = "s";
+ private static final String OPTION_SOCKET_FILE = "f";
private static final String OPTION_PROJECT_ID = "p";
private static final String OPTION_INSTANCE_ID = "i";
private static final String OPTION_DATABASE_NAME = "d";
@@ -77,16 +78,19 @@ public enum DdlTransactionMode {
private static final String OPTION_HELP = "h";
private static final String DEFAULT_PORT = "5432";
private static final int MIN_PORT = 0, MAX_PORT = 65535;
+ private static final String DEFAULT_SOCKET_FILE = "/tmp/.s.PGSQL.%d";
/*Note: this is a private preview feature, not meant for GA version. */
private static final String OPTION_SPANNER_ENDPOINT = "e";
private static final String OPTION_JDBC_PROPERTIES = "r";
private static final String OPTION_SERVER_VERSION = "v";
private static final String OPTION_DEBUG_MODE = "debug";
+ private final String osName;
private final CommandLine commandLine;
private final CommandMetadataParser commandMetadataParser;
private final String defaultConnectionUrl;
private final int proxyPort;
+ private final String socketFile;
private final TextFormat textFormat;
private final boolean binaryFormat;
private final boolean authenticate;
@@ -100,6 +104,11 @@ public enum DdlTransactionMode {
private final boolean debugMode;
public OptionsMetadata(String[] args) {
+ this(System.getProperty("os.name", ""), args);
+ }
+
+ OptionsMetadata(String osName, String[] args) {
+ this.osName = osName;
this.commandLine = buildOptions(args);
this.commandMetadataParser = new CommandMetadataParser();
if (this.commandLine.hasOption(OPTION_DATABASE_NAME)) {
@@ -109,6 +118,7 @@ public OptionsMetadata(String[] args) {
this.defaultConnectionUrl = null;
}
this.proxyPort = buildProxyPort(commandLine);
+ this.socketFile = buildSocketFile(commandLine);
this.textFormat = TextFormat.POSTGRESQL;
this.binaryFormat = commandLine.hasOption(OPTION_BINARY_FORMAT);
this.authenticate = commandLine.hasOption(OPTION_AUTHENTICATE);
@@ -134,10 +144,34 @@ public OptionsMetadata(
boolean requiresMatcher,
boolean replaceJdbcMetadataQueries,
JSONObject commandMetadata) {
+ this(
+ System.getProperty("os.name", ""),
+ defaultConnectionUrl,
+ proxyPort,
+ textFormat,
+ forceBinary,
+ authenticate,
+ requiresMatcher,
+ replaceJdbcMetadataQueries,
+ commandMetadata);
+ }
+
+ OptionsMetadata(
+ String osName,
+ String defaultConnectionUrl,
+ int proxyPort,
+ TextFormat textFormat,
+ boolean forceBinary,
+ boolean authenticate,
+ boolean requiresMatcher,
+ boolean replaceJdbcMetadataQueries,
+ JSONObject commandMetadata) {
+ this.osName = osName;
this.commandLine = null;
this.commandMetadataParser = new CommandMetadataParser();
this.defaultConnectionUrl = defaultConnectionUrl;
this.proxyPort = proxyPort;
+ this.socketFile = isWindows() ? "" : DEFAULT_SOCKET_FILE;
this.textFormat = textFormat;
this.binaryFormat = forceBinary;
this.authenticate = authenticate;
@@ -194,6 +228,13 @@ private int buildProxyPort(CommandLine commandLine) {
return port;
}
+ private String buildSocketFile(CommandLine commandLine) {
+ // Unix domain sockets are disabled by default on Windows.
+ return commandLine
+ .getOptionValue(OPTION_SOCKET_FILE, isWindows() ? "" : DEFAULT_SOCKET_FILE)
+ .trim();
+ }
+
/**
* Get credential file path from either command line or application default. If neither throw
* error.
@@ -306,6 +347,15 @@ private CommandLine buildOptions(String[] args) {
Options options = new Options();
options.addOption(
OPTION_SERVER_PORT, "server-port", true, "This proxy's port number (Default 5432).");
+ options.addOption(
+ OPTION_SOCKET_FILE,
+ "server-socket-file",
+ true,
+ String.format(
+ "This proxy's domain socket file (Default %s). "
+ + "Domain sockets are disabled by default on Windows. Set this property to a non-empty value on Windows to enable domain sockets. "
+ + "Set this property to the empty string on other operating systems to disable domain sockets.",
+ String.format(DEFAULT_SOCKET_FILE, 5432)));
options.addRequiredOption(
OPTION_PROJECT_ID,
"project",
@@ -493,6 +543,14 @@ public int getProxyPort() {
return this.proxyPort;
}
+ public boolean isDomainSocketEnabled() {
+ return !Strings.isNullOrEmpty(this.socketFile);
+ }
+
+ public String getSocketFile(int localPort) {
+ return String.format(this.socketFile, localPort);
+ }
+
public TextFormat getTextFormat() {
return this.textFormat;
}
@@ -521,6 +579,11 @@ public String getServerVersion() {
return serverVersion;
}
+ /** Returns true if the OS is Windows. */
+ public boolean isWindows() {
+ return osName.toLowerCase().contains("win");
+ }
+
/**
* The PostgreSQL wire protocol can send data in both binary and text format. When using text
* format, the {@link Server} will normally send output back to the client using a format
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java
index 308369379..288502c60 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java
@@ -294,9 +294,9 @@ public void handleExecutionException(SpannerException e) {
}
@Override
- public void handleExecutionException(int index, SpannerException e) {
+ public void handleExecutionException(int index, SpannerException exception) {
executor.shutdownNow();
- super.handleExecutionException(index, e);
+ super.handleExecutionException(index, exception);
}
@Override
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePortalStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePortalStatement.java
index e5dd694e8..1377c73f8 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePortalStatement.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePortalStatement.java
@@ -25,7 +25,6 @@
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import java.util.ArrayList;
import java.util.List;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -94,9 +93,9 @@ public DescribeMetadata describe() {
ResultSet statementResult = connection.executeQuery(this.statement);
setStatementResult(0, statementResult);
return new DescribePortalMetadata(statementResult);
- } catch (SpannerException e) {
- logger.log(Level.SEVERE, e, e::getMessage);
- throw e;
+ } catch (SpannerException exception) {
+ handleExecutionExceptionAndTransactionStatus(0, exception);
+ throw exception;
}
}
}
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePreparedStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePreparedStatement.java
index 5e10f5fbf..fb097a07d 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePreparedStatement.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediatePreparedStatement.java
@@ -15,9 +15,11 @@
package com.google.cloud.spanner.pgadapter.statements;
import com.google.api.core.InternalApi;
+import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
@@ -32,6 +34,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.logging.Logger;
import org.postgresql.core.Oid;
/**
@@ -39,7 +42,8 @@
*/
@InternalApi
public class IntermediatePreparedStatement extends IntermediateStatement {
-
+ private static final Logger logger =
+ Logger.getLogger(IntermediatePreparedStatement.class.getName());
protected int[] parameterDataTypes;
protected Statement statement;
@@ -77,6 +81,12 @@ public void setParameterDataTypes(int[] parameterDataTypes) {
@Override
public void execute() {
+ // TODO(230579451): Refactor to use ClientSideStatement information.
+ if (connectionHandler.getStatus() == ConnectionStatus.TRANSACTION_ABORTED) {
+ handleTransactionAborted();
+ return;
+ }
+
// If the portal has already been described, the statement has already been executed, and we
// don't need to do that once more.
if (getStatementResult(0) == null) {
@@ -94,12 +104,31 @@ public void execute() {
connectionHandler.setStatus(
connection.isInTransaction() ? ConnectionStatus.TRANSACTION : ConnectionStatus.IDLE);
}
- } catch (SpannerException e) {
- handleExecutionException(0, e);
+ } catch (SpannerException exception) {
+ handleExecutionExceptionAndTransactionStatus(0, exception);
}
}
}
+ private void handleTransactionAborted() {
+ // TODO(230579451): Refactor to use ClientSideStatement information.
+ String command = getCommand(0);
+ if ("COMMIT".equals(command) || "ROLLBACK".equals(command)) {
+ connectionHandler.setStatus(ConnectionStatus.IDLE);
+ // COMMIT rollbacks aborted transaction
+ commandTags.set(0, "ROLLBACK");
+ if (connection.isInTransaction()) {
+ connection.rollback();
+ }
+ } else {
+ handleExecutionException(
+ executedCount,
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT, TRANSACTION_ABORTED_ERROR));
+ executedCount++;
+ }
+ }
+
/**
* Bind this statement (that is to say, transform it into a portal by giving it the data items to
* complete the statement.
@@ -130,16 +159,21 @@ public IntermediatePortalStatement bind(
@Override
public DescribeMetadata describe() {
- if (this.parsedStatement.isQuery()) {
- Statement statement = Statement.of(this.parsedStatement.getSqlWithoutComments());
- try (ResultSet resultSet = connection.analyzeQuery(statement, QueryAnalyzeMode.PLAN)) {
- // TODO: Remove ResultSet.next() call once this is supported in the client library.
- // See https://github.com/googleapis/java-spanner/pull/1691
- resultSet.next();
- return new DescribeStatementMetadata(getParameterTypes(), resultSet);
+ try {
+ if (this.parsedStatement.isQuery()) {
+ Statement statement = Statement.of(this.parsedStatement.getSqlWithoutComments());
+ try (ResultSet resultSet = connection.analyzeQuery(statement, QueryAnalyzeMode.PLAN)) {
+ // TODO: Remove ResultSet.next() call once this is supported in the client library.
+ // See https://github.com/googleapis/java-spanner/pull/1691
+ resultSet.next();
+ return new DescribeStatementMetadata(getParameterTypes(), resultSet);
+ }
}
+ return new DescribeStatementMetadata(getParameterTypes(), null);
+ } catch (SpannerException exception) {
+ this.handleExecutionExceptionAndTransactionStatus(0, exception);
+ throw exception;
}
- return new DescribeStatementMetadata(getParameterTypes(), null);
}
/**
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java
index 52c640dde..8f588f0b5 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java
@@ -346,18 +346,19 @@ protected void updateBatchResultCount(int fromIndex, long[] updateCounts) {
/**
* Clean up and save metadata when an exception occurs.
*
- * @param e The exception to store.
+ * @param exception The exception to store.
*/
- protected void handleExecutionException(int index, SpannerException e) {
- this.exceptions[index] = e;
+ protected void handleExecutionException(int index, SpannerException exception) {
+ this.exceptions[index] = exception;
this.hasMoreData[index] = false;
}
- private void handleExecutionExceptionAndTransactionStatus(int index, SpannerException e) {
+ protected void handleExecutionExceptionAndTransactionStatus(
+ int index, SpannerException exception) {
if (executionStatus == ExecutionStatus.EXPLICIT_TRANSACTION) {
connectionHandler.setStatus(ConnectionStatus.TRANSACTION_ABORTED);
}
- handleExecutionException(index, e);
+ handleExecutionException(index, exception);
}
/** Execute the SQL statement, storing metadata. */
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/DescribeMessage.java b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/DescribeMessage.java
index ce5319e25..00e84ee6c 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/DescribeMessage.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/DescribeMessage.java
@@ -15,6 +15,7 @@
package com.google.cloud.spanner.pgadapter.wireprotocol;
import com.google.api.core.InternalApi;
+import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.metadata.DescribePortalMetadata;
@@ -100,14 +101,18 @@ public void handleDescribePortal() throws Exception {
new NoDataResponse(this.outputStream).send();
break;
case QUERY:
- new RowDescriptionResponse(
- this.outputStream,
- this.statement,
- ((DescribePortalMetadata) this.statement.describe()).getMetadata(),
- this.connection.getServer().getOptions(),
- QueryMode.EXTENDED)
- .send();
- break;
+ try {
+ new RowDescriptionResponse(
+ this.outputStream,
+ this.statement,
+ ((DescribePortalMetadata) this.statement.describe()).getMetadata(),
+ this.connection.getServer().getOptions(),
+ QueryMode.EXTENDED)
+ .send();
+ break;
+ } catch (SpannerException exception) {
+ this.handleError(exception);
+ }
}
}
}
@@ -118,18 +123,22 @@ public void handleDescribePortal() throws Exception {
* @throws Exception if sending the message back to the client causes an error.
*/
public void handleDescribeStatement() throws Exception {
- DescribeStatementMetadata metadata = (DescribeStatementMetadata) this.statement.describe();
- new ParameterDescriptionResponse(this.outputStream, metadata.getParameters()).send();
- if (metadata.getResultSet() != null) {
- new RowDescriptionResponse(
- this.outputStream,
- this.statement,
- metadata.getResultSet(),
- this.connection.getServer().getOptions(),
- QueryMode.EXTENDED)
- .send();
- } else {
- new NoDataResponse(this.outputStream).send();
+ try {
+ DescribeStatementMetadata metadata = (DescribeStatementMetadata) this.statement.describe();
+ new ParameterDescriptionResponse(this.outputStream, metadata.getParameters()).send();
+ if (metadata.getResultSet() != null) {
+ new RowDescriptionResponse(
+ this.outputStream,
+ this.statement,
+ metadata.getResultSet(),
+ this.connection.getServer().getOptions(),
+ QueryMode.EXTENDED)
+ .send();
+ } else {
+ new NoDataResponse(this.outputStream).send();
+ }
+ } catch (SpannerException exception) {
+ this.handleError(exception);
}
}
}
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ExecuteMessage.java b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ExecuteMessage.java
index 66b2154c5..833dfedbe 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ExecuteMessage.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ExecuteMessage.java
@@ -84,8 +84,8 @@ private void handleExecute() throws Exception {
try {
this.sendSpannerResult(0, this.statement, QueryMode.EXTENDED, this.maxRows);
this.outputStream.flush();
- } catch (Exception e) {
- handleError(e);
+ } catch (Exception exception) {
+ handleError(exception);
}
}
this.connection.cleanUp(this.statement);
diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/QueryMessage.java b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/QueryMessage.java
index f83931e43..dadbdc4f0 100644
--- a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/QueryMessage.java
+++ b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/QueryMessage.java
@@ -31,7 +31,6 @@
import com.google.cloud.spanner.pgadapter.wireoutput.ErrorResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.ErrorResponse.State;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse;
-import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse.Status;
import com.google.cloud.spanner.pgadapter.wireoutput.RowDescriptionResponse;
import java.text.MessageFormat;
@@ -134,20 +133,14 @@ public void handleQuery() throws Exception {
this.sendSpannerResult(index, this.statement, QueryMode.SIMPLE, 0L);
}
}
- boolean inTransaction = connection.getSpannerConnection().isInTransaction();
- Status transactionStatus = Status.IDLE;
- if (inTransaction) {
- if (connection.getStatus() == ConnectionStatus.TRANSACTION_ABORTED) {
- transactionStatus = Status.FAILED;
- // Actively rollback the aborted transaction but still block clients
- // Clear any statement tags, as these are not allowed for rollbacks.
- connection.getSpannerConnection().setStatementTag(null);
- connection.getSpannerConnection().rollback();
- } else {
- transactionStatus = Status.TRANSACTION;
- }
+ if (connection.getSpannerConnection().isInTransaction()
+ && connection.getStatus() == ConnectionStatus.TRANSACTION_ABORTED) {
+ // Actively rollback the aborted transaction but still block clients
+ // Clear any statement tags, as these are not allowed for rollbacks.
+ connection.getSpannerConnection().setStatementTag(null);
+ connection.getSpannerConnection().rollback();
}
- new ReadyResponse(this.outputStream, transactionStatus).send();
+ new ReadyResponse(this.outputStream, connection.getStatus().getReadyResponseStatus()).send();
this.connection.cleanUp(this.statement);
}
}
diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/AbstractMockServerTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/AbstractMockServerTest.java
index 84ee84709..c326ed616 100644
--- a/src/test/java/com/google/cloud/spanner/pgadapter/AbstractMockServerTest.java
+++ b/src/test/java/com/google/cloud/spanner/pgadapter/AbstractMockServerTest.java
@@ -333,7 +333,7 @@ public static void stopMockSpannerAndPgAdapterServers() throws Exception {
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
&& e.getMessage()
.contains(
- "There is/are 1 connection(s) still open. Close all connections before calling closeSpanner()")) {
+ "connection(s) still open. Close all connections before calling closeSpanner()")) {
// Ignore this exception for now. It is caused by the fact that the PgAdapter proxy server
// is not gracefully shutting down all connections when the proxy is stopped, and it also
// does not wait until any connections that have been requested to close, actually have
diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/ConnectionHandlerTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/ConnectionHandlerTest.java
new file mode 100644
index 000000000..0e5e65e09
--- /dev/null
+++ b/src/test/java/com/google/cloud/spanner/pgadapter/ConnectionHandlerTest.java
@@ -0,0 +1,77 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spanner.pgadapter;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ConnectionHandlerTest {
+
+ @Test
+ public void testTerminateClosesSocket() throws IOException {
+ ProxyServer server = mock(ProxyServer.class);
+ Socket socket = mock(Socket.class);
+ InetAddress address = mock(InetAddress.class);
+ when(socket.getInetAddress()).thenReturn(address);
+
+ ConnectionHandler connection = new ConnectionHandler(server, socket);
+
+ connection.terminate();
+ verify(socket).close();
+ }
+
+ @Test
+ public void testTerminateDoesNotCloseSocketTwice() throws IOException {
+ ProxyServer server = mock(ProxyServer.class);
+ Socket socket = mock(Socket.class);
+ when(socket.isClosed()).thenReturn(false, true);
+ InetAddress address = mock(InetAddress.class);
+ when(socket.getInetAddress()).thenReturn(address);
+
+ ConnectionHandler connection = new ConnectionHandler(server, socket);
+
+ connection.terminate();
+ // Calling terminate a second time should be a no-op.
+ connection.terminate();
+
+ // Verify that close was only called once.
+ verify(socket).close();
+ }
+
+ @Test
+ public void testTerminateHandlesCloseError() throws IOException {
+ ProxyServer server = mock(ProxyServer.class);
+ Socket socket = mock(Socket.class);
+ InetAddress address = mock(InetAddress.class);
+ when(socket.getInetAddress()).thenReturn(address);
+ // IOException should be handled internally in terminate().
+ doThrow(new IOException("test exception")).when(socket).close();
+
+ ConnectionHandler connection = new ConnectionHandler(server, socket);
+
+ connection.terminate();
+ verify(socket).close();
+ }
+}
diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/ErrorHandlingTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/ErrorHandlingTest.java
new file mode 100644
index 000000000..de5a98522
--- /dev/null
+++ b/src/test/java/com/google/cloud/spanner/pgadapter/ErrorHandlingTest.java
@@ -0,0 +1,125 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spanner.pgadapter;
+
+import static com.google.cloud.spanner.pgadapter.statements.IntermediateStatement.TRANSACTION_ABORTED_ERROR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
+import com.google.cloud.spanner.Statement;
+import com.google.spanner.v1.CommitRequest;
+import com.google.spanner.v1.RollbackRequest;
+import io.grpc.Status;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ErrorHandlingTest extends AbstractMockServerTest {
+ private static final String INVALID_SELECT = "SELECT * FROM unknown_table";
+
+ @Parameter public String preferQueryMode;
+
+ @Parameters(name = "preferQueryMode = {0}")
+ public static Object[] data() {
+ return new Object[] {"extended", "simple"};
+ }
+
+ @BeforeClass
+ public static void loadPgJdbcDriver() throws Exception {
+ // Make sure the PG JDBC driver is loaded.
+ Class.forName("org.postgresql.Driver");
+ }
+
+ @BeforeClass
+ public static void setupErrorResults() {
+ mockSpanner.putStatementResult(
+ StatementResult.exception(
+ Statement.of(INVALID_SELECT), Status.NOT_FOUND.asRuntimeException()));
+ }
+
+ private String createUrl() {
+ return String.format(
+ "jdbc:postgresql://localhost:%d/?preferQueryMode=%s",
+ pgServer.getLocalPort(), preferQueryMode);
+ }
+
+ @Test
+ public void testInvalidQueryNoTransaction() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(createUrl())) {
+ SQLException exception =
+ assertThrows(
+ SQLException.class, () -> connection.createStatement().executeQuery(INVALID_SELECT));
+ assertTrue(exception.getMessage(), exception.getMessage().contains("NOT_FOUND"));
+
+ // The connection should be usable, as there was no transaction.
+ assertTrue(connection.createStatement().execute("SELECT 1"));
+ }
+ }
+
+ @Test
+ public void testInvalidQueryInTransaction() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(createUrl())) {
+ connection.setAutoCommit(false);
+
+ SQLException exception =
+ assertThrows(
+ SQLException.class, () -> connection.createStatement().executeQuery(INVALID_SELECT));
+ assertTrue(exception.getMessage(), exception.getMessage().contains("NOT_FOUND"));
+
+ // The connection should be in the aborted state.
+ exception =
+ assertThrows(SQLException.class, () -> connection.createStatement().execute("SELECT 1"));
+ assertTrue(
+ exception.getMessage(), exception.getMessage().contains(TRANSACTION_ABORTED_ERROR));
+
+ // Rolling back the transaction should bring the connection back to a usable state.
+ connection.rollback();
+ assertTrue(connection.createStatement().execute("SELECT 1"));
+ }
+ }
+
+ @Test
+ public void testCommitAbortedTransaction() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(createUrl())) {
+ connection.setAutoCommit(false);
+
+ SQLException exception =
+ assertThrows(
+ SQLException.class, () -> connection.createStatement().executeQuery(INVALID_SELECT));
+ assertTrue(exception.getMessage(), exception.getMessage().contains("NOT_FOUND"));
+
+ // The connection should be in the aborted state.
+ exception =
+ assertThrows(SQLException.class, () -> connection.createStatement().execute("SELECT 1"));
+ assertTrue(
+ exception.getMessage(), exception.getMessage().contains(TRANSACTION_ABORTED_ERROR));
+
+ // Committing the transaction will actually execute a rollback.
+ connection.commit();
+ }
+ // Check that we only received a rollback and no commit.
+ assertEquals(1, mockSpanner.countRequestsOfType(RollbackRequest.class));
+ assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class));
+ }
+}
diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/ITJdbcTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/ITJdbcTest.java
index e46dc598c..bf3468a9c 100644
--- a/src/test/java/com/google/cloud/spanner/pgadapter/ITJdbcTest.java
+++ b/src/test/java/com/google/cloud/spanner/pgadapter/ITJdbcTest.java
@@ -27,6 +27,7 @@
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import java.io.FileInputStream;
import java.io.IOException;
import java.math.BigDecimal;
@@ -37,8 +38,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -59,9 +62,26 @@ public class ITJdbcTest implements IntegrationTest {
@Parameter public String preferQueryMode;
- @Parameters(name = "preferQueryMode = {0}")
- public static Object[] data() {
- return new Object[] {"extended", "simple"};
+ @Parameter(1)
+ public boolean useDomainSocket;
+
+ @Parameters(name = "preferQueryMode = {0}, useDomainSocket = {1}")
+ public static List