Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add e2e acknowledgment and checkpointing to RDS source #4819

Merged
merged 8 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class RdsService {
private final EventFactory eventFactory;
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private ExecutorService executor;
private LeaderScheduler leaderScheduler;
private ExportScheduler exportScheduler;
Expand All @@ -60,11 +62,13 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final ClientFactory clientFactory,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.eventFactory = eventFactory;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;

rdsClient = clientFactory.buildRdsClient();
s3Client = clientFactory.buildS3Client();
Expand Down Expand Up @@ -94,7 +98,7 @@ public void start(Buffer<Record<Event>> buffer) {
exportScheduler = new ExportScheduler(
sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
dataFileScheduler = new DataFileScheduler(
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(exportScheduler);
runnableList.add(dataFileScheduler);
}
Expand All @@ -106,7 +110,8 @@ public void start(Buffer<Record<Event>> buffer) {
} else {
binaryLogClient.setSSLMode(SSLMode.DISABLED);
}
streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics);
streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(streamScheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -35,17 +36,20 @@ public class RdsSource implements Source<Record<Event>>, UsesEnhancedSourceCoord
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final EventFactory eventFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private EnhancedSourceCoordinator sourceCoordinator;
private RdsService rdsService;

@DataPrepperPluginConstructor
public RdsSource(final PluginMetrics pluginMetrics,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
}
Expand All @@ -56,7 +60,7 @@ public void start(Buffer<Record<Event>> buffer) {
Objects.requireNonNull(sourceCoordinator);
sourceCoordinator.createPartition(new LeaderPartition());

rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics);
rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics, acknowledgementSetManager);

LOG.info("Start RDS service");
rdsService.start(buffer);
Expand All @@ -80,4 +84,9 @@ public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordin
public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
return new PartitionFactory();
}

/**
 * Reports whether acknowledgments are enabled for this source.
 *
 * @return the value of {@code sourceConfig.isAcknowledgmentsEnabled()}
 */
@Override
public boolean areAcknowledgementsEnabled() {
return sourceConfig.isAcknowledgmentsEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;

import java.time.Duration;
import java.util.List;

/**
Expand Down Expand Up @@ -56,6 +57,12 @@ public class RdsSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30);

@JsonProperty("stream_acknowledgment_timeout")
private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10);

@JsonProperty("s3_bucket")
private String s3Bucket;

Expand Down Expand Up @@ -106,6 +113,14 @@ public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

/**
 * Returns the acknowledgment timeout bound to the
 * {@code s3_data_file_acknowledgment_timeout} property.
 *
 * @return the configured duration; the backing field is initialized to 30 minutes
 */
public Duration getDataFileAcknowledgmentTimeout() {
return dataFileAcknowledgmentTimeout;
}

/**
 * Returns the acknowledgment timeout bound to the
 * {@code stream_acknowledgment_timeout} property.
 *
 * @return the configured duration; the backing field is initialized to 10 minutes
 */
public Duration getStreamAcknowledgmentTimeout() {
return streamAcknowledgmentTimeout;
}

/**
 * Returns the value bound to the {@code s3_bucket} property.
 *
 * @return the configured bucket string; the backing field has no initializer
 */
public String getS3Bucket() {
return s3Bucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,12 @@

public class StreamProgressState {

@JsonProperty("startPosition")
private BinlogCoordinate startPosition;

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

public BinlogCoordinate getStartPosition() {
return startPosition;
}

public void setStartPosition(BinlogCoordinate startPosition) {
this.startPosition = startPosition;
}

/**
 * Returns the binlog coordinate stored under the {@code currentPosition} JSON property.
 *
 * @return the current position; the backing field has no initializer
 */
public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
Expand All @@ -28,6 +31,8 @@ public class DataFileLoader implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
Expand All @@ -39,8 +44,11 @@ public class DataFileLoader implements Runnable {
private final String objectKey;
private final S3ObjectReader objectReader;
private final InputCodec codec;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;
private final ExportRecordConverter recordConverter;
private final EnhancedSourceCoordinator sourceCoordinator;
private final AcknowledgementSet acknowledgementSet;
private final Duration acknowledgmentTimeout;
private final Counter exportRecordsTotalCounter;
private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;
Expand All @@ -49,17 +57,23 @@ public class DataFileLoader implements Runnable {

private DataFileLoader(final DataFilePartition dataFilePartition,
final InputCodec codec,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final Buffer<Record<Event>> buffer,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
this.dataFilePartition = dataFilePartition;
bucket = dataFilePartition.getBucket();
objectKey = dataFilePartition.getKey();
this.objectReader = objectReader;
this.codec = codec;
this.bufferAccumulator = bufferAccumulator;
this.buffer = buffer;
this.recordConverter = recordConverter;
this.sourceCoordinator = sourceCoordinator;
this.acknowledgementSet = acknowledgementSet;
this.acknowledgmentTimeout = acknowledgmentTimeout;

exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
Expand All @@ -70,17 +84,23 @@ private DataFileLoader(final DataFilePartition dataFilePartition,

public static DataFileLoader create(final DataFilePartition dataFilePartition,
final InputCodec codec,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final Buffer<Record<Event>> buffer,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter,
pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout);
}

@Override
public void run() {
LOG.info("Start loading s3://{}/{}", bucket, objectKey);

final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

AtomicLong eventCount = new AtomicLong();
try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) {
codec.parse(inputStream, record -> {
Expand All @@ -98,15 +118,19 @@ public void run() {

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
Record<Event> transformedRecord = new Record<>(
recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber));
bufferAccumulator.add(transformedRecord);
final Event transformedEvent = recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber);

if (acknowledgementSet != null) {
acknowledgementSet.add(transformedEvent);
}

bufferAccumulator.add(new Record<>(transformedEvent));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferAccumulator is not thread-safe. I would recommend instantiating it within the run method itself instead of passing it in to this thread, or avoiding the accumulator altogether if it is not really required.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I moved the instantiation into the run method.

eventCount.getAndIncrement();
bytesProcessedSummary.record(bytes);
} catch (Exception e) {
Expand All @@ -123,6 +147,10 @@ public void run() {

try {
bufferAccumulator.flush();
if (acknowledgementSet != null) {
sourceCoordinator.saveProgressStateForPartition(dataFilePartition, acknowledgmentTimeout);
acknowledgementSet.complete();
}
exportRecordSuccessCounter.increment(eventCount.get());
} catch (Exception e) {
LOG.error("Failed to write events to buffer", e);
Expand Down
Loading
Loading