Add e2e acknowledgment and checkpointing to RDS source #4819

Merged: 8 commits, Aug 26, 2024
Changes from 7 commits
@@ -8,6 +8,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
@@ -50,6 +51,7 @@ public class RdsService {
private final EventFactory eventFactory;
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private ExecutorService executor;
private LeaderScheduler leaderScheduler;
private ExportScheduler exportScheduler;
@@ -60,11 +62,13 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final ClientFactory clientFactory,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.eventFactory = eventFactory;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;

rdsClient = clientFactory.buildRdsClient();
s3Client = clientFactory.buildS3Client();
@@ -94,7 +98,7 @@ public void start(Buffer<Record<Event>> buffer) {
exportScheduler = new ExportScheduler(
sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
dataFileScheduler = new DataFileScheduler(
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(exportScheduler);
runnableList.add(dataFileScheduler);
}
@@ -106,7 +110,8 @@ public void start(Buffer<Record<Event>> buffer) {
} else {
binaryLogClient.setSSLMode(SSLMode.DISABLED);
}
streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics);
streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(streamScheduler);
}

@@ -7,6 +7,7 @@

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -35,17 +36,20 @@ public class RdsSource implements Source<Record<Event>>, UsesEnhancedSourceCoord
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final EventFactory eventFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private EnhancedSourceCoordinator sourceCoordinator;
private RdsService rdsService;

@DataPrepperPluginConstructor
public RdsSource(final PluginMetrics pluginMetrics,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
}
@@ -56,7 +60,7 @@ public void start(Buffer<Record<Event>> buffer) {
Objects.requireNonNull(sourceCoordinator);
sourceCoordinator.createPartition(new LeaderPartition());

rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics);
rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics, acknowledgementSetManager);

LOG.info("Start RDS service");
rdsService.start(buffer);
@@ -80,4 +84,9 @@ public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordin
public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
return new PartitionFactory();
}

@Override
public boolean areAcknowledgementsEnabled() {
return sourceConfig.isAcknowledgmentsEnabled();
}
}
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;

import java.time.Duration;
import java.util.List;

/**
@@ -56,6 +57,12 @@ public class RdsSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30);

@JsonProperty("stream_acknowledgment_timeout")
private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10);

@JsonProperty("s3_bucket")
private String s3Bucket;

@@ -106,6 +113,14 @@ public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

public Duration getDataFileAcknowledgmentTimeout() {
return dataFileAcknowledgmentTimeout;
}

public Duration getStreamAcknowledgmentTimeout() {
return streamAcknowledgmentTimeout;
}

public String getS3Bucket() {
return s3Bucket;
}
@@ -10,23 +10,12 @@

public class StreamProgressState {

@JsonProperty("startPosition")
private BinlogCoordinate startPosition;

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

public BinlogCoordinate getStartPosition() {
return startPosition;
}

public void setStartPosition(BinlogCoordinate startPosition) {
this.startPosition = startPosition;
}

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}
@@ -9,9 +9,11 @@
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
@@ -41,6 +43,9 @@ public class DataFileLoader implements Runnable {
private final InputCodec codec;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final ExportRecordConverter recordConverter;
private final EnhancedSourceCoordinator sourceCoordinator;
private final AcknowledgementSet acknowledgementSet;
private final Duration acknowledgmentTimeout;
private final Counter exportRecordsTotalCounter;
private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;
@@ -52,14 +57,20 @@ private DataFileLoader(final DataFilePartition dataFilePartition,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
this.dataFilePartition = dataFilePartition;
bucket = dataFilePartition.getBucket();
objectKey = dataFilePartition.getKey();
this.objectReader = objectReader;
this.codec = codec;
this.bufferAccumulator = bufferAccumulator;
this.recordConverter = recordConverter;
this.sourceCoordinator = sourceCoordinator;
this.acknowledgementSet = acknowledgementSet;
this.acknowledgmentTimeout = acknowledgmentTimeout;

exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
@@ -73,8 +84,12 @@ public static DataFileLoader create(final DataFilePartition dataFilePartition,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final S3ObjectReader objectReader,
final ExportRecordConverter recordConverter,
final PluginMetrics pluginMetrics) {
return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter,
pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout);
}

@Override
@@ -98,15 +113,19 @@ public void run() {

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
Record<Event> transformedRecord = new Record<>(
recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber));
bufferAccumulator.add(transformedRecord);
final Event transformedEvent = recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
primaryKeys,
snapshotTime,
eventVersionNumber);

if (acknowledgementSet != null) {
acknowledgementSet.add(transformedEvent);
}

bufferAccumulator.add(new Record<>(transformedEvent));
Contributor:

BufferAccumulator is not thread safe. I would recommend instantiating this within the run method itself instead of passing it to this thread. Or avoid using the accumulator if it is not really required.

Collaborator (Author):
Good point! I moved the instantiation into the run method.
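
For reference, a minimal sketch of what that per-thread instantiation could look like; the class name, constant values, and field names below are illustrative assumptions rather than the exact commit:

```java
// Sketch only: build the BufferAccumulator inside run() so each loader thread
// owns its own instance. DEFAULT_BUFFER_BATCH_SIZE and BUFFER_TIMEOUT are
// assumed values for illustration.
import java.time.Duration;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

class DataFileLoaderSketch implements Runnable {
    private static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
    private static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);

    private final Buffer<Record<Event>> buffer;

    DataFileLoaderSketch(final Buffer<Record<Event>> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        // Each loader thread creates its own accumulator, so no instance is shared.
        final BufferAccumulator<Record<Event>> bufferAccumulator =
                BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

        // ... read the S3 data file and convert records, then for each event:
        // bufferAccumulator.add(new Record<>(transformedEvent));
        // and once the file has been fully read:
        // bufferAccumulator.flush();
    }
}
```

Because each call to run() builds its own accumulator on its own worker thread, no BufferAccumulator instance is shared between loader threads.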

eventCount.getAndIncrement();
bytesProcessedSummary.record(bytes);
} catch (Exception e) {
@@ -123,6 +142,10 @@ public void run() {

try {
bufferAccumulator.flush();
if (acknowledgementSet != null) {
sourceCoordinator.saveProgressStateForPartition(dataFilePartition, acknowledgmentTimeout);
acknowledgementSet.complete();
}
exportRecordSuccessCounter.increment(eventCount.get());
} catch (Exception e) {
LOG.error("Failed to write events to buffer", e);
@@ -8,6 +8,8 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
@@ -32,6 +34,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import static org.opensearch.dataprepper.plugins.source.rds.RdsService.DATA_LOADER_MAX_JOB_COUNT;

@@ -63,6 +66,7 @@ public class DataFileScheduler implements Runnable {
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final ExportRecordConverter recordConverter;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;

private final Counter exportFileSuccessCounter;
private final Counter exportFileErrorCounter;
@@ -75,7 +79,8 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
final S3Client s3Client,
final EventFactory eventFactory,
final Buffer<Record<Event>> buffer,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
codec = new ParquetInputCodec(eventFactory);
Expand All @@ -84,6 +89,7 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
recordConverter = new ExportRecordConverter();
executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT);
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;

this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT);
@@ -133,23 +139,39 @@ public void shutdown() {
}

private void processDataFilePartition(DataFilePartition dataFilePartition) {
// Create AcknowledgmentSet
final boolean isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled();
AcknowledgementSet acknowledgementSet = null;
if (sourceConfig.isAcknowledgmentsEnabled()) {
acknowledgementSet = acknowledgementSetManager.create((result) -> {
if (result) {
completeDataLoader(dataFilePartition).accept(null, null);
LOG.info("Received acknowledgment of completion from sink for data file {}", dataFilePartition.getKey());
} else {
exportFileErrorCounter.increment();
LOG.warn("Negative acknowledgment received for data file {}, retrying", dataFilePartition.getKey());
sourceCoordinator.giveUpPartition(dataFilePartition);
Contributor (@san81, Aug 26, 2024):

It's just me trying to understand the code here, not a real comment about the code.

I would assume the retry on this partition, and also keeping track of retryCount, is handled by the sourceCoordinator?

Collaborator (Author):
Yes. Once the data file partition is given up, it becomes available to be acquired again, and the file load will be retried by whichever node picks up the partition.
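
To illustrate that retry path: a given-up partition simply becomes acquirable again, so a scheduler's polling loop picks it up on a later pass. A rough sketch, assuming the enhanced source coordinator's acquireAvailablePartition API and a DataFilePartition.PARTITION_TYPE constant (names here are assumptions, not the exact plugin code):

```java
// Sketch only: how a given-up partition becomes eligible for retry.
import java.util.Optional;

import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;

class DataFileRetryLoopSketch {
    private final EnhancedSourceCoordinator sourceCoordinator;
    private volatile boolean shutdownRequested = false;

    DataFileRetryLoopSketch(final EnhancedSourceCoordinator sourceCoordinator) {
        this.sourceCoordinator = sourceCoordinator;
    }

    void pollForDataFilePartitions() throws InterruptedException {
        while (!shutdownRequested) {
            // Assumed partition-type constant; the real plugin defines its own.
            final Optional<EnhancedSourcePartition> sourcePartition =
                    sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE);

            if (sourcePartition.isPresent()) {
                // A partition released via giveUpPartition() shows up here again,
                // so whichever node acquires it retries the data file load.
                process((DataFilePartition) sourcePartition.get());
            } else {
                Thread.sleep(60_000);  // back off when nothing is available
            }
        }
    }

    private void process(final DataFilePartition dataFilePartition) {
        // ... submit a DataFileLoader for this partition to the executor.
    }
}
```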

}
}, sourceConfig.getDataFileAcknowledgmentTimeout());
}

Runnable loader = DataFileLoader.create(
dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics,
sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout());
CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);

runLoader.whenComplete((v, ex) -> {
if (ex == null) {
exportFileSuccessCounter.increment();
// Update global state so we know if all s3 files have been loaded
updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT);
sourceCoordinator.completePartition(dataFilePartition);
} else {
exportFileErrorCounter.increment();
LOG.error("There was an exception while processing an S3 data file", ex);
sourceCoordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
});
if (isAcknowledgmentsEnabled) {
runLoader.whenComplete((v, ex) -> {
if (ex != null) {
exportFileErrorCounter.increment();
LOG.error("There was an exception while processing an S3 data file: {}", ex);
sourceCoordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
});
} else {
runLoader.whenComplete(completeDataLoader(dataFilePartition));
}
numOfWorkers.incrementAndGet();
}

@@ -183,4 +205,20 @@ private void updateLoadStatus(String exportTaskId, Duration timeout) {
}
}
}

private BiConsumer<Void, Throwable> completeDataLoader(DataFilePartition dataFilePartition) {
return (v, ex) -> {
if (ex == null) {
exportFileSuccessCounter.increment();
// Update global state, so we know if all s3 files have been loaded
updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT);
sourceCoordinator.completePartition(dataFilePartition);
} else {
exportFileErrorCounter.increment();
LOG.error("There was an exception while processing an S3 data file", ex);
sourceCoordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
};
}
}
@@ -148,12 +148,14 @@ private Map<String, List<String>> getPrimaryKeyMap() {
private void createStreamPartition(RdsSourceConfig sourceConfig) {
final StreamProgressState progressState = new StreamProgressState();
progressState.setWaitForExport(sourceConfig.isExportEnabled());
getCurrentBinlogPosition().ifPresent(progressState::setStartPosition);
getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
sourceCoordinator.createPartition(streamPartition);
}

private Optional<BinlogCoordinate> getCurrentBinlogPosition() {
return schemaManager.getCurrentBinaryLogPosition();
Optional<BinlogCoordinate> binlogCoordinate = schemaManager.getCurrentBinaryLogPosition();
LOG.debug("Current binlog position: {}", binlogCoordinate.orElse(null));
return binlogCoordinate;
}
}