diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyToStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyToStatement.java index 0b9e81bd2..bbdce312a 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyToStatement.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyToStatement.java @@ -21,10 +21,12 @@ import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType; import com.google.cloud.spanner.pgadapter.ConnectionHandler; import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode; +import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat; import com.google.cloud.spanner.pgadapter.metadata.DescribePortalMetadata; import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata; import com.google.cloud.spanner.pgadapter.parsers.Parser; import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions; +import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions.Format; import com.google.cloud.spanner.pgadapter.wireoutput.CopyDataResponse; import com.google.cloud.spanner.pgadapter.wireoutput.CopyDoneResponse; import com.google.cloud.spanner.pgadapter.wireoutput.CopyOutResponse; @@ -43,6 +45,10 @@ */ @InternalApi public class CopyToStatement extends IntermediatePortalStatement { + public static final byte[] COPY_BINARY_HEADER = + new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'}; + + private final CopyOptions copyOptions; private final CSVFormat csvFormat; public CopyToStatement( @@ -56,30 +62,35 @@ public CopyToStatement( name, createParsedStatement(copyOptions), createSelectStatement(copyOptions)); - CSVFormat.Builder formatBuilder = - CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT) - .setNullString( - copyOptions.getNullString() == null - ? CSVFormat.POSTGRESQL_TEXT.getNullString() - : copyOptions.getNullString()) - .setRecordSeparator('\n') - .setDelimiter( - copyOptions.getDelimiter() == 0 - ? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0) - : copyOptions.getDelimiter()) - .setQuote( - copyOptions.getQuote() == 0 - ? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter() - : copyOptions.getQuote()) - .setEscape( - copyOptions.getEscape() == 0 - ? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter() - : copyOptions.getEscape()) - .setQuoteMode(QuoteMode.NONE); - if (copyOptions.hasHeader()) { - formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0])); + this.copyOptions = copyOptions; + if (copyOptions.getFormat() == Format.BINARY) { + this.csvFormat = null; + } else { + CSVFormat.Builder formatBuilder = + CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT) + .setNullString( + copyOptions.getNullString() == null + ? CSVFormat.POSTGRESQL_TEXT.getNullString() + : copyOptions.getNullString()) + .setRecordSeparator('\n') + .setDelimiter( + copyOptions.getDelimiter() == 0 + ? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0) + : copyOptions.getDelimiter()) + .setQuote( + copyOptions.getQuote() == 0 + ? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter() + : copyOptions.getQuote()) + .setEscape( + copyOptions.getEscape() == 0 + ? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter() + : copyOptions.getEscape()) + .setQuoteMode(QuoteMode.NONE); + if (copyOptions.hasHeader()) { + formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0])); + } + this.csvFormat = formatBuilder.build(); } - this.csvFormat = formatBuilder.build(); } static ParsedStatement createParsedStatement(CopyOptions copyOptions) { @@ -134,18 +145,36 @@ public IntermediatePortalStatement bind( } @Override - public CopyOutResponse createResultPrefix(ResultSet resultSet) { - return new CopyOutResponse(this.outputStream, resultSet.getColumnCount(), 0); + public WireOutput[] createResultPrefix(ResultSet resultSet) { + return this.copyOptions.getFormat() == Format.BINARY + ? new WireOutput[] { + new CopyOutResponse( + this.outputStream, + resultSet.getColumnCount(), + DataFormat.POSTGRESQL_BINARY.getCode()), + CopyDataResponse.createBinaryHeader(this.outputStream) + } + : new WireOutput[] { + new CopyOutResponse( + this.outputStream, resultSet.getColumnCount(), DataFormat.POSTGRESQL_TEXT.getCode()) + }; } @Override - public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) { - return createDataResponse(resultSet); + public CopyDataResponse createDataRowResponse(ResultSet resultSet, QueryMode mode) { + return copyOptions.getFormat() == Format.BINARY + ? createBinaryDataResponse(resultSet) + : createDataResponse(resultSet); } @Override - public CopyDoneResponse createResultSuffix() { - return new CopyDoneResponse(this.outputStream); + public WireOutput[] createResultSuffix() { + return this.copyOptions.getFormat() == Format.BINARY + ? new WireOutput[] { + CopyDataResponse.createBinaryTrailer(this.outputStream), + new CopyDoneResponse(this.outputStream) + } + : new WireOutput[] {new CopyDoneResponse(this.outputStream)}; } CopyDataResponse createDataResponse(ResultSet resultSet) { @@ -161,4 +190,21 @@ CopyDataResponse createDataResponse(ResultSet resultSet) { String row = csvFormat.format((Object[]) data); return new CopyDataResponse(this.outputStream, row, csvFormat.getRecordSeparator().charAt(0)); } + + CopyDataResponse createBinaryDataResponse(ResultSet resultSet) { + // Multiply number of columns by 4, as each column has as 4-byte length value. + // In addition, each row has a 2-byte number of columns value.s + int length = 2 + resultSet.getColumnCount() * 4; + byte[][] data = new byte[resultSet.getColumnCount()][]; + for (int col = 0; col < resultSet.getColumnCount(); col++) { + if (!resultSet.isNull(col)) { + Parser parser = Parser.create(resultSet, resultSet.getColumnType(col), col); + data[col] = parser.parse(DataFormat.POSTGRESQL_BINARY); + if (data[col] != null) { + length += data[col].length; + } + } + } + return new CopyDataResponse(this.outputStream, length, data); + } } diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java index 66734aa5b..9320403f8 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/IntermediateStatement.java @@ -38,7 +38,6 @@ import java.io.DataOutputStream; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import javax.annotation.Nullable; /** * Data type to store simple SQL statement with designated metadata. Allows manipulation of @@ -47,6 +46,8 @@ */ @InternalApi public class IntermediateStatement { + private static final WireOutput[] EMPTY_WIRE_OUTPUT_ARRAY = new WireOutput[0]; + /** * Indicates whether an attempt to get the result of a statement should block or fail if the * result is not yet available. Normal SQL commands that can be executed directly on Cloud Spanner @@ -315,17 +316,19 @@ public String getCommandTag() { return this.commandTag; } - public @Nullable WireOutput createResultPrefix(ResultSet resultSet) { + public WireOutput[] createResultPrefix(ResultSet resultSet) { // This is a no-op for a normal query. COPY uses this to send a CopyOutResponse. - return null; + // COPY table_name TO STDOUT BINARY also uses this to add the binary copy header. + return EMPTY_WIRE_OUTPUT_ARRAY; } public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) { return new DataRowResponse(this.outputStream, this, resultSet, this.options, mode); } - public @Nullable WireOutput createResultSuffix() { + public WireOutput[] createResultSuffix() { // This is a no-op for a normal query. COPY uses this to send a CopyDoneResponse. - return null; + // COPY table_name TO STDOUT BINARY also uses this to add the binary copy trailer. + return EMPTY_WIRE_OUTPUT_ARRAY; } } diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParser.java b/src/main/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParser.java index d8b3cfd39..123218b85 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParser.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParser.java @@ -14,6 +14,8 @@ package com.google.cloud.spanner.pgadapter.utils; +import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER; + import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Type; @@ -47,8 +49,6 @@ */ class BinaryCopyParser implements CopyInParser { private static final Logger logger = Logger.getLogger(BinaryCopyParser.class.getName()); - static final byte[] EXPECTED_HEADER = - new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'}; private final DataInputStream dataInputStream; private boolean containsOids; @@ -98,12 +98,12 @@ void verifyBinaryHeader() throws IOException { // PGCOPY\n\377\r\n\0 byte[] header = new byte[11]; this.dataInputStream.readFully(header); - if (!Arrays.equals(EXPECTED_HEADER, header)) { + if (!Arrays.equals(COPY_BINARY_HEADER, header)) { throw new IOException( String.format( "Invalid COPY header encountered.\nGot: %s\nWant: %s", new String(header, StandardCharsets.UTF_8), - new String(EXPECTED_HEADER, StandardCharsets.UTF_8))); + new String(COPY_BINARY_HEADER, StandardCharsets.UTF_8))); } } diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyInParser.java b/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyInParser.java index 81e2c4071..8a701a301 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyInParser.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyInParser.java @@ -14,6 +14,7 @@ package com.google.cloud.spanner.pgadapter.utils; +import com.google.api.core.InternalApi; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions; @@ -25,7 +26,8 @@ import org.postgresql.jdbc.TimestampUtils; /** Common interface for parsers that implement one or more of the PostgreSQL COPY formats. */ -interface CopyInParser { +@InternalApi +public interface CopyInParser { /** * Creates a {@link CopyInParser} for the given format. The csvFormat argument is only required * for non-binary formats. diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyRecord.java b/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyRecord.java index 18be9e863..e6c252aa4 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyRecord.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/utils/CopyRecord.java @@ -14,6 +14,7 @@ package com.google.cloud.spanner.pgadapter.utils; +import com.google.api.core.InternalApi; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; @@ -21,7 +22,8 @@ * {@link CopyRecord} is a common interface for COPY data records that are produced by a parser for * a specific COPY format. */ -interface CopyRecord { +@InternalApi +public interface CopyRecord { /** Returns the number of columns in the record. */ int numColumns(); diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/wireoutput/CopyDataResponse.java b/src/main/java/com/google/cloud/spanner/pgadapter/wireoutput/CopyDataResponse.java index 24a42a341..8b2568627 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/wireoutput/CopyDataResponse.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/wireoutput/CopyDataResponse.java @@ -14,24 +14,91 @@ package com.google.cloud.spanner.pgadapter.wireoutput; +import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER; + +import com.google.api.core.InternalApi; +import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat; import java.io.DataOutputStream; import java.nio.charset.StandardCharsets; +@InternalApi public class CopyDataResponse extends WireOutput { + @InternalApi + public enum ResponseType { + HEADER, + ROW, + TRAILER, + } + private final ResponseType responseType; + private final DataFormat format; + private final byte[][] binaryData; private final String stringData; private final char rowTerminator; + /** Creates a {@link CopyDataResponse} message containing the fixed binary COPY header. */ + @InternalApi + public static CopyDataResponse createBinaryHeader(DataOutputStream output) { + return new CopyDataResponse(output, COPY_BINARY_HEADER.length + 8, ResponseType.HEADER); + } + + /** Creates a {@link CopyDataResponse} message containing the fixed binary COPY trailer. */ + @InternalApi + public static CopyDataResponse createBinaryTrailer(DataOutputStream output) { + return new CopyDataResponse(output, 2, ResponseType.TRAILER); + } + + private CopyDataResponse(DataOutputStream output, int length, ResponseType responseType) { + super(output, length + 4); + this.responseType = responseType; + this.format = DataFormat.POSTGRESQL_BINARY; + this.binaryData = null; + this.stringData = null; + this.rowTerminator = 0; + } + public CopyDataResponse(DataOutputStream output, String data, char rowTerminator) { super(output, data.length() + 5); + this.responseType = ResponseType.ROW; + this.format = DataFormat.POSTGRESQL_TEXT; this.stringData = data; this.rowTerminator = rowTerminator; + this.binaryData = null; + } + + public CopyDataResponse(DataOutputStream output, int length, byte[][] data) { + super(output, length + 4); + this.responseType = ResponseType.ROW; + this.format = DataFormat.POSTGRESQL_BINARY; + this.stringData = null; + this.rowTerminator = 0; + this.binaryData = data; } @Override protected void sendPayload() throws Exception { - this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8)); - this.outputStream.write(this.rowTerminator); + if (this.format == DataFormat.POSTGRESQL_TEXT) { + this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8)); + this.outputStream.write(this.rowTerminator); + } else if (this.format == DataFormat.POSTGRESQL_BINARY) { + if (this.responseType == ResponseType.HEADER) { + this.outputStream.write(COPY_BINARY_HEADER); + this.outputStream.writeInt(0); // flags + this.outputStream.writeInt(0); // header extension area length + } else if (this.responseType == ResponseType.TRAILER) { + this.outputStream.writeShort(-1); + } else { + this.outputStream.writeShort(this.binaryData.length); + for (int col = 0; col < this.binaryData.length; col++) { + if (this.binaryData[col] == null) { + this.outputStream.writeInt(-1); + } else { + this.outputStream.writeInt(this.binaryData[col].length); + this.outputStream.write(this.binaryData[col]); + } + } + } + } } @Override diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java index 183917730..8ea9adaad 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java @@ -312,8 +312,7 @@ public SendResultSetState sendResultSet( hasData = runnable.hasData; } - WireOutput suffix = describedResult.createResultSuffix(); - if (suffix != null) { + for (WireOutput suffix : describedResult.createResultSuffix()) { suffix.send(false); } return new SendResultSetState(describedResult.getCommandTag(), rows, hasData); @@ -469,8 +468,7 @@ public Long call() throws Exception { } if (includePrefix) { try { - WireOutput prefix = describedResult.createResultPrefix(resultSet); - if (prefix != null) { + for (WireOutput prefix : describedResult.createResultPrefix(resultSet)) { prefix.send(false); } prefixSent.set(true); diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/CopyOutMockServerTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/CopyOutMockServerTest.java index 65bb736ba..44f524ebb 100644 --- a/src/test/java/com/google/cloud/spanner/pgadapter/CopyOutMockServerTest.java +++ b/src/test/java/com/google/cloud/spanner/pgadapter/CopyOutMockServerTest.java @@ -18,13 +18,22 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.MockSpannerServiceImpl; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Value; import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata; +import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions.Format; +import com.google.cloud.spanner.pgadapter.utils.CopyInParser; +import com.google.cloud.spanner.pgadapter.utils.CopyRecord; import com.google.protobuf.ByteString; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -38,6 +47,8 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PipedOutputStream; import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.sql.Connection; @@ -45,6 +56,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Scanner; import java.util.concurrent.CountDownLatch; @@ -232,6 +244,117 @@ public void testCopyOutNulls() throws SQLException, IOException { } } + private static boolean isPsqlAvailable() { + ProcessBuilder builder = new ProcessBuilder(); + String[] psqlCommand = new String[] {"psql", "--version"}; + builder.command(psqlCommand); + try { + Process process = builder.start(); + int res = process.waitFor(); + + return res == 0; + } catch (Exception ignored) { + return false; + } + } + + @Test + public void testCopyOutBinaryPsql() throws Exception { + assumeTrue("This test requires psql", isPsqlAvailable()); + mockSpanner.putStatementResult( + StatementResult.query(Statement.of("select * from all_types"), ALL_TYPES_RESULTSET)); + + ProcessBuilder builder = new ProcessBuilder(); + builder.command( + "psql", + "-h", + (useDomainSocket ? "/tmp" : "localhost"), + "-p", + String.valueOf(pgServer.getLocalPort()), + "-c", + "copy all_types to stdout binary"); + Process process = builder.start(); + StringBuilder errorBuilder = new StringBuilder(); + try (Scanner scanner = new Scanner(new InputStreamReader(process.getErrorStream()))) { + while (scanner.hasNextLine()) { + errorBuilder.append(scanner.nextLine()).append('\n'); + } + } + PipedOutputStream pipedOutputStream = new PipedOutputStream(); + CopyInParser copyParser = + CopyInParser.create(Format.BINARY, null, pipedOutputStream, 1 << 16, false); + int b; + while ((b = process.getInputStream().read()) != -1) { + pipedOutputStream.write(b); + } + int res = process.waitFor(); + assertEquals("", errorBuilder.toString()); + assertEquals(0, res); + + Iterator iterator = copyParser.iterator(); + assertTrue(iterator.hasNext()); + CopyRecord record = iterator.next(); + assertFalse(iterator.hasNext()); + + assertEquals(Value.int64(1L), record.getValue(Type.int64(), 0)); + assertEquals(Value.bool(true), record.getValue(Type.bool(), 1)); + assertEquals(Value.bytes(ByteArray.copyFrom("test")), record.getValue(Type.bytes(), 2)); + assertEquals(Value.float64(3.14), record.getValue(Type.float64(), 3)); + assertEquals(Value.int64(100L), record.getValue(Type.int64(), 4)); + assertEquals(Value.pgNumeric("6.626"), record.getValue(Type.pgNumeric(), 5)); + // Note: The binary format truncates timestamptz value to microsecond precision. + assertEquals( + Value.timestamp(Timestamp.parseTimestamp("2022-02-16T13:18:02.123456000Z")), + record.getValue(Type.timestamp(), 6)); + assertEquals(Value.date(Date.parseDate("2022-03-29")), record.getValue(Type.date(), 7)); + assertEquals(Value.string("test"), record.getValue(Type.string(), 8)); + } + + @Test + public void testCopyOutNullsBinaryPsql() throws Exception { + assumeTrue("This test requires psql", isPsqlAvailable()); + mockSpanner.putStatementResult( + StatementResult.query(Statement.of("select * from all_types"), ALL_TYPES_NULLS_RESULTSET)); + + ProcessBuilder builder = new ProcessBuilder(); + builder.command( + "psql", + "-h", + (useDomainSocket ? "/tmp" : "localhost"), + "-p", + String.valueOf(pgServer.getLocalPort()), + "-c", + "copy all_types to stdout binary"); + Process process = builder.start(); + StringBuilder errorBuilder = new StringBuilder(); + try (Scanner scanner = new Scanner(new InputStreamReader(process.getErrorStream()))) { + while (scanner.hasNextLine()) { + errorBuilder.append(scanner.nextLine()).append('\n'); + } + } + PipedOutputStream pipedOutputStream = new PipedOutputStream(); + CopyInParser copyParser = + CopyInParser.create(Format.BINARY, null, pipedOutputStream, 1 << 16, false); + int b; + while ((b = process.getInputStream().read()) != -1) { + pipedOutputStream.write(b); + } + int res = process.waitFor(); + assertEquals("", errorBuilder.toString()); + assertEquals(0, res); + + Iterator iterator = copyParser.iterator(); + assertTrue(iterator.hasNext()); + CopyRecord record = iterator.next(); + assertFalse(iterator.hasNext()); + + for (int col = 0; col < record.numColumns(); col++) { + // Note: Null values in a COPY BINARY stream are untyped, so it does not matter what type we + // specify when getting the value. + assertTrue(record.getValue(Type.string(), col).isNull()); + } + } + @Test public void testCopyOutPartitioned() throws SQLException, IOException { final int expectedRowCount = 100; diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/ITPsqlTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/ITPsqlTest.java index 0e40c6930..358dd0d19 100644 --- a/src/test/java/com/google/cloud/spanner/pgadapter/ITPsqlTest.java +++ b/src/test/java/com/google/cloud/spanner/pgadapter/ITPsqlTest.java @@ -149,7 +149,12 @@ private static void createDataModel() throws Exception { "-d", POSTGRES_DATABASE, "-c", - String.join(";", DEFAULT_DATA_MODEL) + ";\n" + // We need to change the `col_int int` column definition into `col_int bigint` to make the + // local PostgreSQL database match the actual data model of the Cloud Spanner database. + DEFAULT_DATA_MODEL.stream() + .map(s -> s.replace("col_int int", "col_int bigint")) + .collect(Collectors.joining(";")) + + ";\n" }; ProcessBuilder builder = new ProcessBuilder().command(createTablesCommand); setPgPassword(builder); @@ -337,7 +342,8 @@ public void testCopyFromPostgreSQLToCloudSpanner() throws Exception { "bash", "-c", "psql" - + " -c \"copy all_types to stdout\" " + + " -c \"copy all_types to stdout" + + (binary ? " binary \" " : "\" ") + " -h " + (POSTGRES_HOST.startsWith("/") ? "/tmp" : testEnv.getPGAdapterHost()) + " -p " @@ -353,7 +359,9 @@ public void testCopyFromPostgreSQLToCloudSpanner() throws Exception { + POSTGRES_USER + " -d " + POSTGRES_DATABASE - + " -c \"copy all_types from stdin;\"\n"); + + " -c \"copy all_types from stdin " + + (binary ? "binary" : "") + + ";\"\n"); setPgPassword(copyToPostgresBuilder); Process copyToPostgresProcess = copyToPostgresBuilder.start(); InputStream errorStream = copyToPostgresProcess.getErrorStream(); diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/parsers/DateParserTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/parsers/DateParserTest.java index 59dc29b31..cb4f998d8 100644 --- a/src/test/java/com/google/cloud/spanner/pgadapter/parsers/DateParserTest.java +++ b/src/test/java/com/google/cloud/spanner/pgadapter/parsers/DateParserTest.java @@ -40,7 +40,6 @@ public void testToDate() { @Test public void testStringParse() { - assertEquals("2022-07-08", new DateParser(Date.fromYearMonthDay(2022, 7, 8)).stringParse()); assertNull(new DateParser(null).stringParse()); } diff --git a/src/test/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParserTest.java b/src/test/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParserTest.java index 76e4afcfe..aa0c72a61 100644 --- a/src/test/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParserTest.java +++ b/src/test/java/com/google/cloud/spanner/pgadapter/utils/BinaryCopyParserTest.java @@ -14,7 +14,7 @@ package com.google.cloud.spanner.pgadapter.utils; -import static com.google.cloud.spanner.pgadapter.utils.BinaryCopyParser.EXPECTED_HEADER; +import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -46,7 +46,7 @@ public void testValidHeader() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); parser.verifyBinaryHeader(); } @@ -69,7 +69,7 @@ public void testGetIterator() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); // flags data.writeInt(0); // header extensions length @@ -82,7 +82,7 @@ public void testGetIterator_MissingData() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); pipedOutputStream.close(); assertThrows(SpannerException.class, parser::iterator); @@ -94,7 +94,7 @@ public void testIteratorHasNext_Trailer() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -111,7 +111,7 @@ public void testIteratorHasNext() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -136,7 +136,7 @@ public void testIteratorHasNext_InvalidFieldCount() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -156,7 +156,7 @@ public void testIteratorHasNext_DifferentFieldCounts() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -182,7 +182,7 @@ public void testIteratorHasNext_EndOfFile() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -200,7 +200,7 @@ public void testIteratorNext_WithNoMoreElements() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -218,7 +218,7 @@ public void testIteratorNext_NullField() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -242,7 +242,7 @@ public void testIteratorNext_GetValue() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -267,7 +267,7 @@ public void testIteratorNext_NegativeFieldLength() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -286,7 +286,7 @@ public void testIteratorNext_EndOfFile() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(0); data.writeInt(0); @@ -306,7 +306,7 @@ public void testIteratorNext_WithOid() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(1 << 16); data.writeInt(0); @@ -336,7 +336,7 @@ public void testIteratorNext_InvalidOidLength() throws IOException { BinaryCopyParser parser = new BinaryCopyParser(pipedOutputStream, 256); DataOutputStream data = new DataOutputStream(pipedOutputStream); - data.write(EXPECTED_HEADER); + data.write(COPY_BINARY_HEADER); data.writeInt(1 << 16); data.writeInt(0);