Skip to content

Commit

Permalink
feat: Add support for multiple CopyData messages (#40)
Browse files Browse the repository at this point in the history
Adds support for multiple CopyData messages within a single COPY operation. CopyData payloads are concatenated together as they arrive. On CopyDone, the accumulated data is parsed into records, and the mutations are constructed and written to Spanner.
  • Loading branch information
Vizerai authored Feb 22, 2022
1 parent b680c8e commit 24eeedc
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public MutationWriter getMutationWriter() {
return this.mutationWriter;
}

/**
 * Reports the numeric format code for this COPY statement.
 *
 * @return 1 when the COPY options select the BINARY format; 0 otherwise (text/csv formatting)
 */
public int getFormatCode() {
  if (options.getFormat() == CopyTreeParser.CopyOptions.Format.BINARY) {
    return 1;
  }
  return 0;
}

private void verifyCopyColumns() throws SQLException {
if (options.getColumnNames().size() == 0) {
// Use all columns if none were specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.spanner.v1.TypeCode;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
Expand All @@ -42,18 +43,17 @@ public class MutationWriter {
private boolean hasHeader;
private boolean isHeaderParsed;
private int mutationCount;
private int batchSize;
private int rowCount;
private List<Mutation> mutations;
private String tableName;
private Map<String, TypeCode> tableColumns;
private CSVFormat format;
private FileWriter fileWriter;
private ByteArrayOutputStream payload = new ByteArrayOutputStream();

public MutationWriter(
String tableName, Map<String, TypeCode> tableColumns, CSVFormat format, boolean hasHeader) {
this.mutationCount = 0;
this.batchSize = 0;
this.hasHeader = hasHeader;
this.isHeaderParsed = false;
this.tableName = tableName;
Expand All @@ -72,25 +72,22 @@ public int getRowCount() {
return this.rowCount;
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
List<CSVRecord> records = parsePayloadData(payload);
if (!records.isEmpty()
&& !payloadFitsInCurrentBatch(records.size() * records.get(0).size(), payload.length)) {
rollback(connectionHandler, payload);
long mutationCount = this.mutationCount + records.size() * records.get(0).size();
long commitSize = this.batchSize + payload.length;
public void addCopyData(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
this.payload.write(payload, 0, payload.length);
if (!commitSizeIsWithinLimit()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: "
+ mutationCount
+ " or mutation commit size: "
+ commitSize
+ " has exceeded the limit.");
"Commit size: " + this.payload.size() + " has exceeded the limit: " + COMMIT_LIMIT);
}
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutationList(ConnectionHandler connectionHandler) throws Exception {
List<CSVRecord> records = parsePayloadData(this.payload.toByteArray());
for (CSVRecord record : records) {
// Check that the number of columns in a record matches the number of columns in the table
if (record.size() != this.tableColumns.keySet().size()) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid COPY data: Row length mismatched. Expected "
+ this.tableColumns.keySet().size()
Expand Down Expand Up @@ -125,7 +122,7 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
break;
}
} catch (NumberFormatException | DateTimeParseException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid input syntax for type "
+ columnType.toString()
Expand All @@ -134,27 +131,39 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
+ recordValue
+ "\"");
} catch (IllegalArgumentException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException("Invalid input syntax for column \"" + columnName + "\"");
} catch (Exception e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw e;
}
}
this.mutations.add(builder.build()); // Add write builder to mutation list
this.mutationCount += record.size(); // Increment the number of mutations being added
this.rowCount++; // Increment the number of COPY rows by one
}
this.batchSize += payload.length; // Increment the batch size based on payload length
if (!mutationCountIsWithinLimit()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: " + mutationCount + " has exceeded the limit: " + MUTATION_LIMIT);
}
}

/**
* @return True if adding payload to current batch will fit under mutation limit and batch size
* limit, false otherwise.
* @return True if current payload will fit within COMMIT_LIMIT. This is only an estimate and the
* actual commit size may still be rejected by Spanner.
*/
private boolean payloadFitsInCurrentBatch(int rowMutationCount, int payloadLength) {
return (this.mutationCount + rowMutationCount <= MUTATION_LIMIT
&& this.batchSize + payloadLength <= COMMIT_LIMIT);
private boolean commitSizeIsWithinLimit() {
return this.payload.size() <= COMMIT_LIMIT;
}

/**
 * Checks the running mutation count against MUTATION_LIMIT.
 *
 * <p>The count is only an estimate: the actual number of mutations may differ, so Spanner could
 * still reject the transaction even when this check passes.
 *
 * @return true if the current mutation count does not exceed MUTATION_LIMIT, false otherwise
 */
private boolean mutationCountIsWithinLimit() {
  final boolean limitExceeded = this.mutationCount > MUTATION_LIMIT;
  return !limitExceeded;
}

/** @return list of CSVRecord rows parsed with CSVParser from CopyData payload byte array */
Expand Down Expand Up @@ -186,44 +195,34 @@ public int writeToSpanner(ConnectionHandler connectionHandler) throws SQLExcepti
// Reset mutations, mutation counter, and batch size count for a new batch
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
return this.rowCount;
}

public void rollback(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
public void rollback(ConnectionHandler connectionHandler) throws Exception {
Connection connection = connectionHandler.getJdbcConnection();
connection.rollback();
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
createErrorFile(payload);
writeCopyDataToErrorFile();
this.payload.reset();
}

public void createErrorFile(byte[] payload) throws IOException {
private void createErrorFile() throws IOException {
File unsuccessfulCopy = new File(ERROR_FILE);
if (unsuccessfulCopy.createNewFile()) {
this.fileWriter = new FileWriter(ERROR_FILE);
writeToErrorFile(payload);
} else {
System.err.println("File " + unsuccessfulCopy.getName() + " already exists");
}
}

public void writeToErrorFile(byte[] payload) throws IOException {
if (this.fileWriter != null) {
this.fileWriter.write(new String(payload, StandardCharsets.UTF_8).trim() + "\n");
}
this.fileWriter = new FileWriter(unsuccessfulCopy, false);
}

public void writeMutationsToErrorFile() throws IOException {
File unsuccessfulCopy = new File(ERROR_FILE);
if (unsuccessfulCopy.createNewFile()) {
this.fileWriter = new FileWriter(ERROR_FILE);
}

for (Mutation mutation : this.mutations) {
this.fileWriter.write(mutation.toString());
/**
 * Writes the buffered copy data to the error file.
 *
 * <p>Copy data is written to the error file when size limits were exceeded, when an error was
 * encountered while generating the mutation list, or when Spanner returns an error upon
 * committing the mutations. The error file writer is created first if it has not been opened
 * yet. The payload is decoded as UTF-8, trimmed, and terminated with a newline.
 *
 * @throws IOException if the error file cannot be created or written
 */
public void writeCopyDataToErrorFile() throws IOException {
  if (this.fileWriter == null) {
    createErrorFile();
  }
  final byte[] bufferedBytes = this.payload.toByteArray();
  final String copyData = new String(bufferedBytes, StandardCharsets.UTF_8).trim();
  this.fileWriter.write(copyData + "\n");
}

public void closeErrorFile() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutation(this.connection, this.payload);
mw.addCopyData(this.connection, this.payload);
} catch (SQLException e) {
mw.writeToErrorFile(this.payload);
mw.writeCopyDataToErrorFile();
statement.handleExecutionException(e);
throw e;
}
} else {
mw.writeToErrorFile(this.payload);
mw.writeCopyDataToErrorFile();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,26 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutationList(this.connection);
int rowCount =
mw.writeToSpanner(this.connection); // Write any remaining mutations to Spanner
statement.addUpdateCount(rowCount); // Increase the row count of number of rows copied.
this.sendSpannerResult(this.statement, QueryMode.SIMPLE, 0L);
} catch (Exception e) {
// Spanner returned an error when trying to commit the batch of mutations.
mw.writeMutationsToErrorFile();
mw.writeCopyDataToErrorFile();
mw.closeErrorFile();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
throw e;
}
} else {
mw.closeErrorFile();
}
new ReadyResponse(this.outputStream, ReadyResponse.Status.IDLE).send();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
}

Expand Down
Loading

0 comments on commit 24eeedc

Please sign in to comment.