Skip to content

Commit

Permalink
Add e2e acknowledgment and checkpointing to RDS source (#4819)
Browse files Browse the repository at this point in the history
* Add acknowledgment and checkpoint for stream

Signed-off-by: Hai Yan <[email protected]>

* Add unit tests for stream checkpoint

Signed-off-by: Hai Yan <[email protected]>

* Add acknowledgment to export

Signed-off-by: Hai Yan <[email protected]>

* Simplify the stream checkpointing workflow

Signed-off-by: Hai Yan <[email protected]>

* A few fixes and cleanups

Signed-off-by: Hai Yan <[email protected]>

* Extend lease while waiting for ack

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

* Address more review comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Aug 26, 2024
1 parent 9244818 commit d6465ef
Show file tree
Hide file tree
Showing 23 changed files with 790 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class RdsService {
private final EventFactory eventFactory;
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private ExecutorService executor;
private LeaderScheduler leaderScheduler;
private ExportScheduler exportScheduler;
Expand All @@ -60,11 +62,13 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final ClientFactory clientFactory,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.eventFactory = eventFactory;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;

rdsClient = clientFactory.buildRdsClient();
s3Client = clientFactory.buildS3Client();
Expand Down Expand Up @@ -94,7 +98,7 @@ public void start(Buffer<Record<Event>> buffer) {
exportScheduler = new ExportScheduler(
sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
dataFileScheduler = new DataFileScheduler(
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(exportScheduler);
runnableList.add(dataFileScheduler);
}
Expand All @@ -106,7 +110,8 @@ public void start(Buffer<Record<Event>> buffer) {
} else {
binaryLogClient.setSSLMode(SSLMode.DISABLED);
}
streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics);
streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(streamScheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -35,17 +36,20 @@ public class RdsSource implements Source<Record<Event>>, UsesEnhancedSourceCoord
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final EventFactory eventFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private EnhancedSourceCoordinator sourceCoordinator;
private RdsService rdsService;

@DataPrepperPluginConstructor
public RdsSource(final PluginMetrics pluginMetrics,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
}
Expand All @@ -56,7 +60,7 @@ public void start(Buffer<Record<Event>> buffer) {
Objects.requireNonNull(sourceCoordinator);
sourceCoordinator.createPartition(new LeaderPartition());

rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics);
rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics, acknowledgementSetManager);

LOG.info("Start RDS service");
rdsService.start(buffer);
Expand All @@ -80,4 +84,9 @@ public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordin
public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
return new PartitionFactory();
}

@Override
public boolean areAcknowledgementsEnabled() {
return sourceConfig.isAcknowledgmentsEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;

import java.time.Duration;
import java.util.List;

/**
Expand Down Expand Up @@ -56,6 +57,12 @@ public class RdsSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30);

@JsonProperty("stream_acknowledgment_timeout")
private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10);

@JsonProperty("s3_bucket")
private String s3Bucket;

Expand Down Expand Up @@ -106,6 +113,14 @@ public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

public Duration getDataFileAcknowledgmentTimeout() {
return dataFileAcknowledgmentTimeout;
}

public Duration getStreamAcknowledgmentTimeout() {
return streamAcknowledgmentTimeout;
}

public String getS3Bucket() {
return s3Bucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,12 @@

public class StreamProgressState {

@JsonProperty("startPosition")
private BinlogCoordinate startPosition;

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

public BinlogCoordinate getStartPosition() {
return startPosition;
}

public void setStartPosition(BinlogCoordinate startPosition) {
this.startPosition = startPosition;
}

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
Expand All @@ -30,6 +33,8 @@ public class DataFileLoader implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
Expand All @@ -41,8 +46,11 @@ public class DataFileLoader implements Runnable {
private final String objectKey;
private final S3ObjectReader objectReader;
private final InputCodec codec;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;
private final ExportRecordConverter recordConverter;
private final EnhancedSourceCoordinator sourceCoordinator;
private final AcknowledgementSet acknowledgementSet;
private final Duration acknowledgmentTimeout;
private final Counter exportRecordsTotalCounter;
private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;
Expand All @@ -51,17 +59,23 @@ public class DataFileLoader implements Runnable {

private DataFileLoader(final DataFilePartition dataFilePartition,
final InputCodec codec,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final Buffer<Record<Event>> buffer,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
this.dataFilePartition = dataFilePartition;
bucket = dataFilePartition.getBucket();
objectKey = dataFilePartition.getKey();
this.objectReader = objectReader;
this.codec = codec;
this.bufferAccumulator = bufferAccumulator;
this.buffer = buffer;
this.recordConverter = recordConverter;
this.sourceCoordinator = sourceCoordinator;
this.acknowledgementSet = acknowledgementSet;
this.acknowledgmentTimeout = acknowledgmentTimeout;

exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
Expand All @@ -72,17 +86,23 @@ private DataFileLoader(final DataFilePartition dataFilePartition,

public static DataFileLoader create(final DataFilePartition dataFilePartition,
final InputCodec codec,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final Buffer<Record<Event>> buffer,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter,
pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout);
}

@Override
public void run() {
LOG.info(SENSITIVE, "Start loading s3://{}/{}", bucket, objectKey);

final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

AtomicLong eventCount = new AtomicLong();
try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) {
codec.parse(inputStream, record -> {
Expand All @@ -100,15 +120,19 @@ public void run() {

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
Record<Event> transformedRecord = new Record<>(
recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber));
bufferAccumulator.add(transformedRecord);
final Event transformedEvent = recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber);

if (acknowledgementSet != null) {
acknowledgementSet.add(transformedEvent);
}

bufferAccumulator.add(new Record<>(transformedEvent));
eventCount.getAndIncrement();
bytesProcessedSummary.record(bytes);
} catch (Exception e) {
Expand All @@ -125,6 +149,10 @@ public void run() {

try {
bufferAccumulator.flush();
if (acknowledgementSet != null) {
sourceCoordinator.saveProgressStateForPartition(dataFilePartition, acknowledgmentTimeout);
acknowledgementSet.complete();
}
exportRecordSuccessCounter.increment(eventCount.get());
} catch (Exception e) {
LOG.error("Failed to write events to buffer", e);
Expand Down
Loading

0 comments on commit d6465ef

Please sign in to comment.