Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: COPY my_table TO STDOUT BINARY #271

Merged
merged 54 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
71b4465
feat: support COPY table_name TO STDOUT
olavloite Jul 4, 2022
a123827
test: try to force more parallelism for testing
olavloite Jul 4, 2022
9c37ba7
fix: apply longer timeout for batch reads
olavloite Jul 4, 2022
4d7c222
feat: support binary copy in format
olavloite Jul 4, 2022
c80c6b3
feat: support binary copy in format
olavloite Jul 4, 2022
9393044
feat: copy in binary
olavloite Jul 5, 2022
f649cf3
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 5, 2022
331e2e5
test: add integration tests using real PostgreSQL
olavloite Jul 5, 2022
4e79d8a
fix: remove invalid '-'
olavloite Jul 5, 2022
6501ba4
fix: use sudo to install
olavloite Jul 5, 2022
e94c54d
fix: try to connect with unix domain socket
olavloite Jul 5, 2022
e4a35a2
fix: swap source and host volume
olavloite Jul 5, 2022
1234d58
test: add timeout to fail test if stuck
olavloite Jul 5, 2022
cc91bca
fix: use custom folder to prevent interference
olavloite Jul 5, 2022
7f5b7bc
test: add testing using real PG
olavloite Jul 7, 2022
0ee7905
Merge branch 'copy-in-binary' of github.com:GoogleCloudPlatform/pgada…
olavloite Jul 7, 2022
c752ae9
test: compare data in real PG and Spangres after COPY
olavloite Jul 7, 2022
f8e40b1
fix: modify test case to match new behavior
olavloite Jul 7, 2022
e97a510
test: skip wrong dialect test
olavloite Jul 7, 2022
194cd41
test: fix integration tests
olavloite Jul 7, 2022
444ba6e
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 7, 2022
1975de0
Merge branch 'postgresql-dialect' into integration-tests-with-postgresql
olavloite Jul 7, 2022
e956295
test: add logging to find where it gets stuck
olavloite Jul 7, 2022
d86c321
test: use print
olavloite Jul 7, 2022
6ece54c
test: more logging
olavloite Jul 7, 2022
c6b2e8d
test: more logging
olavloite Jul 7, 2022
42f47a2
fix: add env vars to build script
olavloite Jul 7, 2022
e91b0fd
Merge branch 'postgresql-dialect' into integration-tests-with-postgresql
olavloite Jul 7, 2022
28d82f9
fix: only set pg_password if defined
olavloite Jul 7, 2022
f59cd61
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 7, 2022
56ca2a9
fix: close statement directly after receiving CopyDone
olavloite Jul 7, 2022
f7aed53
test: more logging
olavloite Jul 7, 2022
4f7fede
test: add error logging
olavloite Jul 7, 2022
0625a2e
test: use psql for testing
olavloite Jul 8, 2022
ed0ff14
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 8, 2022
1707aeb
fix: add error handling to parsers
olavloite Jul 8, 2022
a32268f
Merge branch 'postgresql-dialect' into integration-tests-with-postgresql
olavloite Jul 8, 2022
4cdcdeb
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 8, 2022
1fb8f26
Merge branch 'integration-tests-with-postgresql' into copy-in-binary
olavloite Jul 8, 2022
c2295b0
test: add integration test for binary copy
olavloite Jul 8, 2022
85c51c6
Merge branch 'postgresql-dialect' into copy-out
olavloite Jul 8, 2022
f88e3cb
fix: setup mocks for tests
olavloite Jul 8, 2022
96a31a0
Merge branch 'postgresql-dialect' into copy-out
olavloite Jul 8, 2022
b596f0c
test: add copy back-and-forth test
olavloite Jul 8, 2022
55da3be
test: copy back and forth between PG and CS
olavloite Jul 9, 2022
d28b1ba
test: add more tests
olavloite Jul 9, 2022
348d0f5
fix: ignore change as it is internal
olavloite Jul 9, 2022
011e9ef
Merge branch 'copy-out' of github.com:GoogleCloudPlatform/pgadapter i…
olavloite Jul 9, 2022
c4a0731
Merge branch 'postgresql-dialect' into copy-in-binary
olavloite Jul 9, 2022
50cb891
Merge branch 'copy-in-binary' into copy-out-binary
olavloite Jul 9, 2022
9b80b05
Merge branch 'copy-out' into copy-out-binary
olavloite Jul 9, 2022
f664275
feat: COPY my_table TO STDOUT BINARY
olavloite Jul 9, 2022
2936788
Merge branch 'postgresql-dialect' into copy-out-binary
olavloite Jul 11, 2022
08c606f
chore: add @InternalApi annotations
olavloite Jul 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat;
import com.google.cloud.spanner.pgadapter.metadata.DescribePortalMetadata;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.cloud.spanner.pgadapter.parsers.Parser;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions.Format;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyDataResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyDoneResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyOutResponse;
Expand All @@ -43,6 +45,10 @@
*/
@InternalApi
public class CopyToStatement extends IntermediatePortalStatement {
public static final byte[] COPY_BINARY_HEADER =
new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'};

private final CopyOptions copyOptions;
private final CSVFormat csvFormat;

public CopyToStatement(
Expand All @@ -56,30 +62,35 @@ public CopyToStatement(
name,
createParsedStatement(copyOptions),
createSelectStatement(copyOptions));
CSVFormat.Builder formatBuilder =
CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT)
.setNullString(
copyOptions.getNullString() == null
? CSVFormat.POSTGRESQL_TEXT.getNullString()
: copyOptions.getNullString())
.setRecordSeparator('\n')
.setDelimiter(
copyOptions.getDelimiter() == 0
? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0)
: copyOptions.getDelimiter())
.setQuote(
copyOptions.getQuote() == 0
? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter()
: copyOptions.getQuote())
.setEscape(
copyOptions.getEscape() == 0
? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter()
: copyOptions.getEscape())
.setQuoteMode(QuoteMode.NONE);
if (copyOptions.hasHeader()) {
formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0]));
this.copyOptions = copyOptions;
if (copyOptions.getFormat() == Format.BINARY) {
this.csvFormat = null;
} else {
CSVFormat.Builder formatBuilder =
CSVFormat.Builder.create(CSVFormat.POSTGRESQL_TEXT)
.setNullString(
copyOptions.getNullString() == null
? CSVFormat.POSTGRESQL_TEXT.getNullString()
: copyOptions.getNullString())
.setRecordSeparator('\n')
.setDelimiter(
copyOptions.getDelimiter() == 0
? CSVFormat.POSTGRESQL_TEXT.getDelimiterString().charAt(0)
: copyOptions.getDelimiter())
.setQuote(
copyOptions.getQuote() == 0
? CSVFormat.POSTGRESQL_TEXT.getQuoteCharacter()
: copyOptions.getQuote())
.setEscape(
copyOptions.getEscape() == 0
? CSVFormat.POSTGRESQL_TEXT.getEscapeCharacter()
: copyOptions.getEscape())
.setQuoteMode(QuoteMode.NONE);
if (copyOptions.hasHeader()) {
formatBuilder.setHeader(copyOptions.getColumnNames().toArray(new String[0]));
}
this.csvFormat = formatBuilder.build();
}
this.csvFormat = formatBuilder.build();
}

static ParsedStatement createParsedStatement(CopyOptions copyOptions) {
Expand Down Expand Up @@ -134,18 +145,36 @@ public IntermediatePortalStatement bind(
}

@Override
public CopyOutResponse createResultPrefix(ResultSet resultSet) {
return new CopyOutResponse(this.outputStream, resultSet.getColumnCount(), 0);
public WireOutput[] createResultPrefix(ResultSet resultSet) {
return this.copyOptions.getFormat() == Format.BINARY
? new WireOutput[] {
new CopyOutResponse(
this.outputStream,
resultSet.getColumnCount(),
DataFormat.POSTGRESQL_BINARY.getCode()),
CopyDataResponse.createBinaryHeader(this.outputStream)
}
: new WireOutput[] {
new CopyOutResponse(
this.outputStream, resultSet.getColumnCount(), DataFormat.POSTGRESQL_TEXT.getCode())
};
}

@Override
public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return createDataResponse(resultSet);
public CopyDataResponse createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return copyOptions.getFormat() == Format.BINARY
? createBinaryDataResponse(resultSet)
: createDataResponse(resultSet);
}

@Override
public CopyDoneResponse createResultSuffix() {
return new CopyDoneResponse(this.outputStream);
public WireOutput[] createResultSuffix() {
return this.copyOptions.getFormat() == Format.BINARY
? new WireOutput[] {
CopyDataResponse.createBinaryTrailer(this.outputStream),
new CopyDoneResponse(this.outputStream)
}
: new WireOutput[] {new CopyDoneResponse(this.outputStream)};
}

CopyDataResponse createDataResponse(ResultSet resultSet) {
Expand All @@ -161,4 +190,21 @@ CopyDataResponse createDataResponse(ResultSet resultSet) {
String row = csvFormat.format((Object[]) data);
return new CopyDataResponse(this.outputStream, row, csvFormat.getRecordSeparator().charAt(0));
}

CopyDataResponse createBinaryDataResponse(ResultSet resultSet) {
// Multiply number of columns by 4, as each column has as 4-byte length value.
// In addition, each row has a 2-byte number of columns value.s
int length = 2 + resultSet.getColumnCount() * 4;
byte[][] data = new byte[resultSet.getColumnCount()][];
for (int col = 0; col < resultSet.getColumnCount(); col++) {
if (!resultSet.isNull(col)) {
Parser<?> parser = Parser.create(resultSet, resultSet.getColumnType(col), col);
data[col] = parser.parse(DataFormat.POSTGRESQL_BINARY);
if (data[col] != null) {
length += data[col].length;
}
}
}
return new CopyDataResponse(this.outputStream, length, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.DataOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/**
* Data type to store simple SQL statement with designated metadata. Allows manipulation of
Expand All @@ -47,6 +46,8 @@
*/
@InternalApi
public class IntermediateStatement {
private static final WireOutput[] EMPTY_WIRE_OUTPUT_ARRAY = new WireOutput[0];

/**
* Indicates whether an attempt to get the result of a statement should block or fail if the
* result is not yet available. Normal SQL commands that can be executed directly on Cloud Spanner
Expand Down Expand Up @@ -315,17 +316,19 @@ public String getCommandTag() {
return this.commandTag;
}

public @Nullable WireOutput createResultPrefix(ResultSet resultSet) {
public WireOutput[] createResultPrefix(ResultSet resultSet) {
// This is a no-op for a normal query. COPY uses this to send a CopyOutResponse.
return null;
// COPY table_name TO STDOUT BINARY also uses this to add the binary copy header.
return EMPTY_WIRE_OUTPUT_ARRAY;
}

public WireOutput createDataRowResponse(ResultSet resultSet, QueryMode mode) {
return new DataRowResponse(this.outputStream, this, resultSet, this.options, mode);
}

public @Nullable WireOutput createResultSuffix() {
public WireOutput[] createResultSuffix() {
// This is a no-op for a normal query. COPY uses this to send a CopyDoneResponse.
return null;
// COPY table_name TO STDOUT BINARY also uses this to add the binary copy trailer.
return EMPTY_WIRE_OUTPUT_ARRAY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.cloud.spanner.pgadapter.utils;

import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Type;
Expand Down Expand Up @@ -47,8 +49,6 @@
*/
class BinaryCopyParser implements CopyInParser {
private static final Logger logger = Logger.getLogger(BinaryCopyParser.class.getName());
static final byte[] EXPECTED_HEADER =
new byte[] {'P', 'G', 'C', 'O', 'P', 'Y', '\n', -1, '\r', '\n', '\0'};

private final DataInputStream dataInputStream;
private boolean containsOids;
Expand Down Expand Up @@ -98,12 +98,12 @@ void verifyBinaryHeader() throws IOException {
// PGCOPY\n\377\r\n\0
byte[] header = new byte[11];
this.dataInputStream.readFully(header);
if (!Arrays.equals(EXPECTED_HEADER, header)) {
if (!Arrays.equals(COPY_BINARY_HEADER, header)) {
throw new IOException(
String.format(
"Invalid COPY header encountered.\nGot: %s\nWant: %s",
new String(header, StandardCharsets.UTF_8),
new String(EXPECTED_HEADER, StandardCharsets.UTF_8)));
new String(COPY_BINARY_HEADER, StandardCharsets.UTF_8)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.cloud.spanner.pgadapter.utils;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.pgadapter.parsers.copy.CopyTreeParser.CopyOptions;
Expand All @@ -25,7 +26,8 @@
import org.postgresql.jdbc.TimestampUtils;

/** Common interface for parsers that implement one or more of the PostgreSQL COPY formats. */
interface CopyInParser {
@InternalApi
public interface CopyInParser {
/**
* Creates a {@link CopyInParser} for the given format. The csvFormat argument is only required
* for non-binary formats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@

package com.google.cloud.spanner.pgadapter.utils;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;

/**
* {@link CopyRecord} is a common interface for COPY data records that are produced by a parser for
* a specific COPY format.
*/
interface CopyRecord {
@InternalApi
public interface CopyRecord {

/** Returns the number of columns in the record. */
int numColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,91 @@

package com.google.cloud.spanner.pgadapter.wireoutput;

import static com.google.cloud.spanner.pgadapter.statements.CopyToStatement.COPY_BINARY_HEADER;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.pgadapter.ProxyServer.DataFormat;
import java.io.DataOutputStream;
import java.nio.charset.StandardCharsets;

@InternalApi
public class CopyDataResponse extends WireOutput {
@InternalApi
public enum ResponseType {
HEADER,
ROW,
TRAILER,
}

private final ResponseType responseType;
private final DataFormat format;
private final byte[][] binaryData;
private final String stringData;
private final char rowTerminator;

/** Creates a {@link CopyDataResponse} message containing the fixed binary COPY header. */
@InternalApi
public static CopyDataResponse createBinaryHeader(DataOutputStream output) {
return new CopyDataResponse(output, COPY_BINARY_HEADER.length + 8, ResponseType.HEADER);
}

/** Creates a {@link CopyDataResponse} message containing the fixed binary COPY trailer. */
@InternalApi
public static CopyDataResponse createBinaryTrailer(DataOutputStream output) {
return new CopyDataResponse(output, 2, ResponseType.TRAILER);
}

private CopyDataResponse(DataOutputStream output, int length, ResponseType responseType) {
super(output, length + 4);
this.responseType = responseType;
this.format = DataFormat.POSTGRESQL_BINARY;
this.binaryData = null;
this.stringData = null;
this.rowTerminator = 0;
}

public CopyDataResponse(DataOutputStream output, String data, char rowTerminator) {
super(output, data.length() + 5);
this.responseType = ResponseType.ROW;
this.format = DataFormat.POSTGRESQL_TEXT;
this.stringData = data;
this.rowTerminator = rowTerminator;
this.binaryData = null;
}

public CopyDataResponse(DataOutputStream output, int length, byte[][] data) {
super(output, length + 4);
this.responseType = ResponseType.ROW;
this.format = DataFormat.POSTGRESQL_BINARY;
this.stringData = null;
this.rowTerminator = 0;
this.binaryData = data;
}

@Override
protected void sendPayload() throws Exception {
this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8));
this.outputStream.write(this.rowTerminator);
if (this.format == DataFormat.POSTGRESQL_TEXT) {
this.outputStream.write(this.stringData.getBytes(StandardCharsets.UTF_8));
this.outputStream.write(this.rowTerminator);
} else if (this.format == DataFormat.POSTGRESQL_BINARY) {
if (this.responseType == ResponseType.HEADER) {
this.outputStream.write(COPY_BINARY_HEADER);
this.outputStream.writeInt(0); // flags
this.outputStream.writeInt(0); // header extension area length
} else if (this.responseType == ResponseType.TRAILER) {
this.outputStream.writeShort(-1);
} else {
this.outputStream.writeShort(this.binaryData.length);
for (int col = 0; col < this.binaryData.length; col++) {
if (this.binaryData[col] == null) {
this.outputStream.writeInt(-1);
} else {
this.outputStream.writeInt(this.binaryData[col].length);
this.outputStream.write(this.binaryData[col]);
}
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,7 @@ public SendResultSetState sendResultSet(
hasData = runnable.hasData;
}

WireOutput suffix = describedResult.createResultSuffix();
if (suffix != null) {
for (WireOutput suffix : describedResult.createResultSuffix()) {
suffix.send(false);
}
return new SendResultSetState(describedResult.getCommandTag(), rows, hasData);
Expand Down Expand Up @@ -469,8 +468,7 @@ public Long call() throws Exception {
}
if (includePrefix) {
try {
WireOutput prefix = describedResult.createResultPrefix(resultSet);
if (prefix != null) {
for (WireOutput prefix : describedResult.createResultPrefix(resultSet)) {
prefix.send(false);
}
prefixSent.set(true);
Expand Down
Loading