Skip to content

Commit

Permalink
add INITIAL_ONLY snapshot mode
Browse files Browse the repository at this point in the history
This change implements a quick-and-dirty `snapshot.mode=initial_only` by
canceling the `VStream` request once a `COPY_COMPLETED` `VEvent` has been
received.

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Oct 16, 2024
1 parent cbbdcdd commit 1ee7d50
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 4 deletions.
Empty file added history.dat
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess;

/**
 * Marker exception used to signal that Debezium should not continue consuming the streaming
 * event source after encountering a COPY_COMPLETED VEvent from the VStream while
 * {@code snapshot.mode=initial_only} is configured.
 *
 * <p>It carries no message or state of its own; it is attached as the cause when the VStream
 * request is cancelled, so that the cancellation can be told apart from a genuine failure.
 */
public class VStreamCopyCompletedEventException extends RuntimeException {

    // RuntimeException is Serializable; pin the serial form explicitly instead of relying on
    // the compiler-generated default.
    private static final long serialVersionUID = 1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL("initial"),

/**
* Perform an initial snapshot when starting, if it does not detect a value in its offsets topic, and stop once the VStream copy has completed.
*/
INITIAL_ONLY("initial_only"),

/**
* Never perform an initial snapshot and only receive new data changes.
*/
Expand Down Expand Up @@ -448,6 +453,7 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withDescription("The criteria for running a snapshot upon startup of the connector. "
+ "Options include: "
+ "'initial' (the default) to specify the connector should always perform an initial sync when required; "
+ "'initial_only' to specify the connector should always perform an initial sync when required, and stop after VStream copy completed; "
+ "'never' to specify the connector should never perform an initial sync ");

public static final Field BIGINT_UNSIGNED_HANDLING_MODE = Field.create("bigint.unsigned.handling.mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.grpc.Status;

/**
* Read events from source and dispatch each event using {@link EventDispatcher} to the {@link
Expand Down Expand Up @@ -76,7 +77,13 @@ public void execute(ChangeEventSourceContext context, VitessPartition partition,
}
}
catch (Throwable e) {
errorHandler.setProducerThrowable(e);
Status s = Status.fromThrowable(e);
if (s.getCode() == Status.Code.CANCELLED && s.getCause() instanceof VStreamCopyCompletedEventException) {
LOGGER.info("VStream stopped after COPY_COMPLETED event");
}
else {
errorHandler.setProducerThrowable(e);
}
}
finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.connector.vitess.VStreamCopyCompletedEventException;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnector;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessConnectorConfig.SnapshotMode;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.util.Strings;
import io.grpc.ChannelCredentials;
Expand All @@ -31,12 +33,15 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.vitess.client.Proto;
import io.vitess.client.grpc.StaticAuthCredentials;
import io.vitess.proto.Topodata;
import io.vitess.proto.Vtgate;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.grpc.VitessGrpc;

import binlogdata.Binlogdata;
Expand Down Expand Up @@ -100,13 +105,15 @@ public void startStreaming(
stub = MetadataUtils.attachHeaders(stub, metadata);
}

StreamObserver<Vtgate.VStreamResponse> responseObserver = new StreamObserver<Vtgate.VStreamResponse>() {
StreamObserver<Vtgate.VStreamResponse> responseObserver = new ClientResponseObserver<Vtgate.VStreamRequest, Vtgate.VStreamResponse>() {
private ClientCallStreamObserver<VStreamRequest> requestStream;
private List<VEvent> bufferedEvents = new ArrayList<>();
private Vgtid newVgtid;
private boolean beginEventSeen;
private boolean commitEventSeen;
private int numOfRowEvents;
private int numResponses;
private boolean copyCompletedEventSeen;

@Override
public void onNext(Vtgate.VStreamResponse response) {
Expand Down Expand Up @@ -174,6 +181,13 @@ public void onNext(Vtgate.VStreamResponse response) {
}
commitEventSeen = true;
break;
case COPY_COMPLETED:
    // After all shards are copied, Vitess will send a final COPY_COMPLETED event.
    // See: https://github.com/vitessio/vitess/blob/v19.0.0/go/vt/vtgate/vstream_manager.go#L791-L808
    // Only a COPY_COMPLETED event with both keyspace and shard empty is treated as that final event.
    // Compare string content with isEmpty(): `== ""` tests reference identity and only
    // matches when the accessor happens to return the interned empty-string literal.
    if (event.getKeyspace().isEmpty() && event.getShard().isEmpty()) {
        copyCompletedEventSeen = true;
    }
    // COPY_COMPLETED itself is never buffered; move on to the next event in the response.
    continue;
case DDL:
case OTHER:
// If receiving DDL and OTHER, process them immediately to rotate vgtid in offset.
Expand All @@ -190,14 +204,14 @@ public void onNext(Vtgate.VStreamResponse response) {

// We only proceed when we receive a complete transaction after seeing both BEGIN and COMMIT events,
// OR if sendNow flag is true (meaning we should process buffered events immediately).
if ((!beginEventSeen || !commitEventSeen) && !sendNow) {
if ((!beginEventSeen || !commitEventSeen) && !sendNow && !copyCompletedEventSeen) {
LOGGER.debug("Received partial transaction: number of responses so far is {}", numResponses);
return;
}
if (numResponses > 1) {
LOGGER.debug("Processing multi-response transaction: number of responses is {}", numResponses);
}
if (newVgtid == null) {
if (newVgtid == null && !copyCompletedEventSeen) {
LOGGER.warn("Skipping because no vgtid is found in buffered event types: {}",
bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", ")));
reset();
Expand Down Expand Up @@ -225,6 +239,11 @@ public void onNext(Vtgate.VStreamResponse response) {
finally {
reset();
}

if (copyCompletedEventSeen && config.getSnapshotMode() == SnapshotMode.INITIAL_ONLY) {
LOGGER.info("Cancel the copy operation after receiving COPY_COMPLETED event");
requestStream.cancel("Cancel the copy operation after receiving COPY_COMPLETED event", new VStreamCopyCompletedEventException());
}
}

@Override
Expand Down Expand Up @@ -260,6 +279,11 @@ private void setError(String msg) {
error.compareAndSet(null, new DebeziumException(msg));
reset();
}

/**
 * {@code ClientResponseObserver} callback that hands this observer the request-side stream of the call.
 * The reference is kept so that {@code onNext} can cancel the VStream request after a
 * COPY_COMPLETED event when the snapshot mode is {@code INITIAL_ONLY}.
 *
 * @param requestStream the request stream of the VStream call this observer is attached to
 */
@Override
public void beforeStart(ClientCallStreamObserver<VStreamRequest> requestStream) {
this.requestStream = requestStream;
}
};

Vtgate.VStreamFlags.Builder flagBuilder = Vtgate.VStreamFlags.newBuilder()
Expand Down
1 change: 1 addition & 0 deletions src/test/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ FROM vitess/lite:v12.0.3
USER root

RUN apt-key adv --no-tty --recv-keys --keyserver keyserver.ubuntu.com 467B942D3A79BD29
RUN apt-key adv --no-tty --recv-keys --keyserver keyserver.ubuntu.com B7B3B788A8D3785C
RUN apt-get update
RUN apt-get install -y sudo curl vim jq

Expand Down
53 changes: 53 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,59 @@ public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception {
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);
}

/**
 * Exercises {@code snapshot.mode=initial_only}.
 *
 * <p>Seeds {@code numeric_table} with 100 identical rows, starts the connector in
 * {@code INITIAL_ONLY} mode and checks that exactly those rows arrive as insert records.
 * It then writes the same batch again and awaits a consumer that expects zero records,
 * the intent being that rows written after the snapshot are not consumed.
 */
@Test
public void testInitialOnlySnapshot() throws Exception {
    TestHelper.executeDDL("vitess_create_tables.ddl");

    final int snapshotRowCount = 100;
    final String tableName = "numeric_table";
    final String tableInclude = TEST_UNSHARDED_KEYSPACE + "." + tableName;
    final String singleRow = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
    final String columnList = String.join(",",
            "tinyint_col",
            "tinyint_unsigned_col",
            "smallint_col",
            "smallint_unsigned_col",
            "mediumint_col",
            "mediumint_unsigned_col",
            "int_col",
            "int_unsigned_col",
            "bigint_col",
            "bigint_unsigned_col",
            "bigint_unsigned_overflow_col",
            "float_col",
            "double_col",
            "decimal_col",
            "boolean_col");

    // One multi-row INSERT carrying snapshotRowCount copies of the same row.
    StringBuilder sql = new StringBuilder();
    sql.append("INSERT INTO ").append(tableName).append(" (").append(columnList).append(")").append(" VALUES ");
    for (int row = 0; row < snapshotRowCount; row++) {
        if (row > 0) {
            sql.append(", ");
        }
        sql.append(singleRow);
    }
    final String bulkInsert = sql.toString();
    TestHelper.execute(bulkInsert);

    // Start the connector so that it takes the initial snapshot and nothing more.
    startConnector(Function.identity(), false, false, 1,
            -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL_ONLY, TestHelper.TEST_SHARD);

    // Every seeded row must show up as an insert record.
    consumer = testConsumer(snapshotRowCount, tableInclude);
    consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
    for (int seen = 0; seen < snapshotRowCount; seen++) {
        assertRecordInserted(tableInclude, TestHelper.PK_FIELD);
    }

    // Write the same batch again after the snapshot has completed.
    TestHelper.execute(bulkInsert);

    // Await a consumer that expects no further records.
    consumer = testConsumer(0, tableInclude);
    consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);

    stopConnector();
}

private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_UNSHARDED_KEYSPACE);

Expand Down

0 comments on commit 1ee7d50

Please sign in to comment.