Skip to content

Commit

Permalink
feat: COPY my_table TO STDOUT BINARY (#271)
Browse files Browse the repository at this point in the history
* feat: support COPY table_name TO STDOUT

* test: try to force more parallelism for testing

* fix: apply longer timeout for batch reads

* feat: support binary copy in format

* feat: support binary copy in format

* feat: copy in binary

Adds support for COPY my_table FROM STDIN BINARY. The binary format can
be more efficient and is also immune to problems with escaping null
values etc. The binary format is however only usable when copying
to/from tables with exactly the same columns.

* test: add integration tests using real PostgreSQL

* fix: remove invalid '-'

* fix: use sudo to install

* fix: try to connect with unix domain socket

* fix: swap source and host volume

* test: add timeout to fail test if stuck

* fix: use custom folder to prevent interference

* test: add testing using real PG

* test: compare data in real PG and Spangres after COPY

* fix: modify test case to match new behavior

* test: skip wrong dialect test

* test: fix integration tests

* test: add logging to find where it gets stuck

* test: use print

* test: more logging

* test: more logging

* fix: add env vars to build script

* fix: only set pg_password if defined

* fix: close statement directly after receiving CopyDone

* test: more logging

* test: add error logging

* test: use psql for testing

* fix: add error handling to parsers

* test: add integration test for binary copy

* fix: setup mocks for tests

* test: add copy back-and-forth test

* test: copy back and forth between PG and CS

* test: add more tests

* fix: ignore change as it is internal

* feat: COPY my_table TO STDOUT BINARY

Adds support for COPY my_table TO STDOUT BINARY. The binary copy format is
slightly more efficient than the text format, and has does not need any
escaping of special valuesa, as there is no field delimiter or line terminator
involved.

* chore: add @internalapi annotations
  • Loading branch information
olavloite authored Jul 11, 2022
1 parent 7155783 commit d8c4c77
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat;
import com.google.cloud.spanner.pgadapter.metadata.DescribePortalMetadata;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.cloud.spanner.pgadapter.parsers.Parser;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions.Format;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyDataResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyDoneResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyOutResponse;
Expand All @@ -43,6 +45,10 @@
*/
@InternalApi
public class CopyToStatement extends IntermediatePortalStatement {
public static final byte[] COPY_BINARY_HEADER =
new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'};

private final CopyOptions copyOptions;
private final CSVFormat csvFormat;

public CopyToStatement(
Expand All @@ -56,30 +62,35 @@ public CopyToStatement(
name,
createParsedStatement(copyOptions),
createSelectStatement(copyOptions));
CSVFormat.Builder formatBuilder =
CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT)
.setNullString(
copyOptions.getNullString() == null
? CSVFormat.POSTGRESQL_TEXT.getNullString()
: copyOptions.getNullString())
.setRecordSeparator('\n')
.setDelimiter(
copyOptions.getDelimiter() == 0
? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0)
: copyOptions.getDelimiter())
.setQuote(
copyOptions.getQuote() == 0
? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter()
: copyOptions.getQuote())
.setEscape(
copyOptions.getEscape() == 0
? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter()
: copyOptions.getEscape())
.setQuoteMode(QuoteMode.NONE);
if (copyOptions.hasHeader()) {
formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0]));
this.copyOptions = copyOptions;
if (copyOptions.getFormat() == Format.BINARY) {
this.csvFormat = null;
} else {
CSVFormat.Builder formatBuilder =
CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT)
.setNullString(
copyOptions.getNullString() == null
? CSVFormat.POSTGRESQL_TEXT.getNullString()
: copyOptions.getNullString())
.setRecordSeparator('\n')
.setDelimiter(
copyOptions.getDelimiter() == 0
? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0)
: copyOptions.getDelimiter())
.setQuote(
copyOptions.getQuote() == 0
? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter()
: copyOptions.getQuote())
.setEscape(
copyOptions.getEscape() == 0
? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter()
: copyOptions.getEscape())
.setQuoteMode(QuoteMode.NONE);
if (copyOptions.hasHeader()) {
formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0]));
}
this.csvFormat = formatBuilder.build();
}
this.csvFormat = formatBuilder.build();
}

static ParsedStatement createParsedStatement(CopyOptions copyOptions) {
Expand Down Expand Up @@ -134,18 +145,36 @@ public IntermediatePortalStatement bind(
}

@Override
public CopyOutResponse createResultPrefix(ResultSet resultSet) {
return new CopyOutResponse(this.outputStream, resultSet.getColumnCount(), 0);
public WireOutput[] createResultPrefix(ResultSet resultSet) {
return this.copyOptions.getFormat() == Format.BINARY
? new WireOutput[] {
new CopyOutResponse(
this.outputStream,
resultSet.getColumnCount(),
DataFormat.POSTGRESQL_BINARY.getCode()),
CopyDataResponse.createBinaryHeader(this.outputStream)
}
: new WireOutput[] {
new CopyOutResponse(
this.outputStream, resultSet.getColumnCount(), DataFormat.POSTGRESQL_TEXT.getCode())
};
}

@Override
public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return createDataResponse(resultSet);
public CopyDataResponse createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return copyOptions.getFormat() == Format.BINARY
? createBinaryDataResponse(resultSet)
: createDataResponse(resultSet);
}

@Override
public CopyDoneResponse createResultSuffix() {
return new CopyDoneResponse(this.outputStream);
public WireOutput[] createResultSuffix() {
return this.copyOptions.getFormat() == Format.BINARY
? new WireOutput[] {
CopyDataResponse.createBinaryTrailer(this.outputStream),
new CopyDoneResponse(this.outputStream)
}
: new WireOutput[] {new CopyDoneResponse(this.outputStream)};
}

CopyDataResponse createDataResponse(ResultSet resultSet) {
Expand All @@ -161,4 +190,21 @@ CopyDataResponse createDataResponse(ResultSet resultSet) {
String row = csvFormat.format((Object[]) data);
return new CopyDataResponse(this.outputStream, row, csvFormat.getRecordSeparator().charAt(0));
}

CopyDataResponse createBinaryDataResponse(ResultSet resultSet) {
// Multiply number of columns by 4, as each column has as 4-byte length value.
// In addition, each row has a 2-byte number of columns value.s
int length = 2 + resultSet.getColumnCount() * 4;
byte[][] data = new byte[resultSet.getColumnCount()][];
for (int col = 0; col < resultSet.getColumnCount(); col++) {
if (!resultSet.isNull(col)) {
Parser<?> parser = Parser.create(resultSet, resultSet.getColumnType(col), col);
data[col] = parser.parse(DataFormat.POSTGRESQL_BINARY);
if (data[col] != null) {
length += data[col].length;
}
}
}
return new CopyDataResponse(this.outputStream, length, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.DataOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/**
* Data type to store simple SQL statement with designated metadata. Allows manipulation of
Expand All @@ -47,6 +46,8 @@
*/
@InternalApi
public class IntermediateStatement {
private static final WireOutput[] EMPTY_WIRE_OUTPUT_ARRAY = new WireOutput[0];

/**
* Indicates whether an attempt to get the result of a statement should block or fail if the
* result is not yet available. Normal SQL commands that can be executed directly on Cloud Spanner
Expand Down Expand Up @@ -315,17 +316,19 @@ public String getCommandTag() {
return this.commandTag;
}

public @Nullable WireOutput createResultPrefix(ResultSet resultSet) {
public WireOutput[] createResultPrefix(ResultSet resultSet) {
// This is a no-op for a normal query. COPY uses this to send a CopyOutResponse.
return null;
// COPY table_name TO STDOUT BINARY also uses this to add the binary copy header.
return EMPTY_WIRE_OUTPUT_ARRAY;
}

public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return new DataRowResponse(this.outputStream, this, resultSet, this.options, mode);
}

public @Nullable WireOutput createResultSuffix() {
public WireOutput[] createResultSuffix() {
// This is a no-op for a normal query. COPY uses this to send a CopyDoneResponse.
return null;
// COPY table_name TO STDOUT BINARY also uses this to add the binary copy trailer.
return EMPTY_WIRE_OUTPUT_ARRAY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.cloud.spanner.pgadapter.utils;

import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Type;
Expand Down Expand Up @@ -47,8 +49,6 @@
*/
class BinaryCopyParser implements CopyInParser {
private static final Logger logger = Logger.getLogger(BinaryCopyParser.class.getName());
static final byte[] EXPECTED_HEADER =
new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'};

private final DataInputStream dataInputStream;
private boolean containsOids;
Expand Down Expand Up @@ -98,12 +98,12 @@ void verifyBinaryHeader() throws IOException {
// PGCOPY\n\377\r\n\0
byte[] header = new byte[11];
this.dataInputStream.readFully(header);
if (!Arrays.equals(EXPECTED_HEADER, header)) {
if (!Arrays.equals(COPY_BINARY_HEADER, header)) {
throw new IOException(
String.format(
"Invalid COPY header encountered.\nGot: %s\nWant: %s",
new String(header, StandardCharsets.UTF_8),
new String(EXPECTED_HEADER, StandardCharsets.UTF_8)));
new String(COPY_BINARY_HEADER, StandardCharsets.UTF_8)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.cloud.spanner.pgadapter.utils;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions;
Expand All @@ -25,7 +26,8 @@
import org.postgresql.jdbc.TimestampUtils;

/** Common interface for parsers that implement one or more of the PostgreSQL COPY formats. */
interface CopyInParser {
@InternalApi
public interface CopyInParser {
/**
* Creates a {@link CopyInParser} for the given format. The csvFormat argument is only required
* for non-binary formats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@

package com.google.cloud.spanner.pgadapter.utils;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;

/**
* {@link CopyRecord} is a common interface for COPY data records that are produced by a parser for
* a specific COPY format.
*/
interface CopyRecord {
@InternalApi
public interface CopyRecord {

/** Returns the number of columns in the record. */
int numColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,91 @@

package com.google.cloud.spanner.pgadapter.wireoutput;

import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat;
import java.io.DataOutputStream;
import java.nio.charset.StandardCharsets;

@InternalApi
public class CopyDataResponse extends WireOutput {
@InternalApi
public enum ResponseType {
HEADER,
ROW,
TRAILER,
}

private final ResponseType responseType;
private final DataFormat format;
private final byte[][] binaryData;
private final String stringData;
private final char rowTerminator;

/** Creates a {@link CopyDataResponse} message containing the fixed binary COPY header. */
@InternalApi
public static CopyDataResponse createBinaryHeader(DataOutputStream output) {
return new CopyDataResponse(output, COPY_BINARY_HEADER.length + 8, ResponseType.HEADER);
}

/** Creates a {@link CopyDataResponse} message containing the fixed binary COPY trailer. */
@InternalApi
public static CopyDataResponse createBinaryTrailer(DataOutputStream output) {
return new CopyDataResponse(output, 2, ResponseType.TRAILER);
}

private CopyDataResponse(DataOutputStream output, int length, ResponseType responseType) {
super(output, length + 4);
this.responseType = responseType;
this.format = DataFormat.POSTGRESQL_BINARY;
this.binaryData = null;
this.stringData = null;
this.rowTerminator = 0;
}

public CopyDataResponse(DataOutputStream output, String data, char rowTerminator) {
super(output, data.length() + 5);
this.responseType = ResponseType.ROW;
this.format = DataFormat.POSTGRESQL_TEXT;
this.stringData = data;
this.rowTerminator = rowTerminator;
this.binaryData = null;
}

public CopyDataResponse(DataOutputStream output, int length, byte[][] data) {
super(output, length + 4);
this.responseType = ResponseType.ROW;
this.format = DataFormat.POSTGRESQL_BINARY;
this.stringData = null;
this.rowTerminator = 0;
this.binaryData = data;
}

@Override
protected void sendPayload() throws Exception {
this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8));
this.outputStream.write(this.rowTerminator);
if (this.format == DataFormat.POSTGRESQL_TEXT) {
this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8));
this.outputStream.write(this.rowTerminator);
} else if (this.format == DataFormat.POSTGRESQL_BINARY) {
if (this.responseType == ResponseType.HEADER) {
this.outputStream.write(COPY_BINARY_HEADER);
this.outputStream.writeInt(0); // flags
this.outputStream.writeInt(0); // header extension area length
} else if (this.responseType == ResponseType.TRAILER) {
this.outputStream.writeShort(-1);
} else {
this.outputStream.writeShort(this.binaryData.length);
for (int col = 0; col < this.binaryData.length; col++) {
if (this.binaryData[col] == null) {
this.outputStream.writeInt(-1);
} else {
this.outputStream.writeInt(this.binaryData[col].length);
this.outputStream.write(this.binaryData[col]);
}
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,7 @@ public SendResultSetState sendResultSet(
hasData = runnable.hasData;
}

WireOutput suffix = describedResult.createResultSuffix();
if (suffix != null) {
for (WireOutput suffix : describedResult.createResultSuffix()) {
suffix.send(false);
}
return new SendResultSetState(describedResult.getCommandTag(), rows, hasData);
Expand Down Expand Up @@ -469,8 +468,7 @@ public Long call() throws Exception {
}
if (includePrefix) {
try {
WireOutput prefix = describedResult.createResultPrefix(resultSet);
if (prefix != null) {
for (WireOutput prefix : describedResult.createResultPrefix(resultSet)) {
prefix.send(false);
}
prefixSent.set(true);
Expand Down
Loading

0 comments on commit d8c4c77

Please sign in to comment.