Skip to content

Commit

Permalink
make export and stream works together.
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Nov 6, 2023
1 parent 2c39825 commit 04a8ec2
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@SuppressWarnings("deprecation")
public abstract class KafkaConnectSource implements Source<Record<Object>> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectSource.class);
private final ConnectorConfig connectorConfig;
public final ConnectorConfig connectorConfig;
private final String pipelineName;
private KafkaConnectConfig kafkaConnectConfig;
private KafkaConnect kafkaConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class MongoDBSource extends KafkaConnectSource implements UsesSourceCoord

private final PluginMetrics pluginMetrics;

private final MongoDBConfig mongoDBConfig;

private final AcknowledgementSetManager acknowledgementSetManager;

private MongoDBService mongoDBService;
Expand All @@ -60,7 +58,6 @@ public MongoDBSource(final MongoDBConfig mongoDBConfig,
final KafkaConnectConfigSupplier kafkaConnectConfigSupplier) {
super(mongoDBConfig, pluginMetrics, pipelineDescription, kafkaClusterConfigSupplier, kafkaConnectConfigSupplier);
this.pluginMetrics = pluginMetrics;
this.mongoDBConfig = mongoDBConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.byteDecoder = new JsonDecoder();
Expand All @@ -71,7 +68,7 @@ public void start(Buffer<Record<Object>> buffer) {
super.start(buffer);
if (shouldStartInitialLoad()) {
LOG.info("Starting initial load");
this.mongoDBService = MongoDBService.create(mongoDBConfig, sourceCoordinator, buffer, acknowledgementSetManager, pluginMetrics);
this.mongoDBService = MongoDBService.create((MongoDBConfig) this.connectorConfig, sourceCoordinator, buffer, acknowledgementSetManager, pluginMetrics);
this.mongoDBService.start();
}
}
Expand Down Expand Up @@ -103,11 +100,12 @@ public ByteDecoder getDecoder() {

@Override
public boolean shouldStartKafkaConnect() {
return false;
// return mongoDBConfig.getSnapshotMode().equals("export_stream") || mongoDBConfig.getSnapshotMode().equals("stream");
final MongoDBConfig mongoDBConfig = (MongoDBConfig) this.connectorConfig;
return mongoDBConfig.getIngestionMode().equals("export_stream") || mongoDBConfig.getIngestionMode().equals("stream");
}

private boolean shouldStartInitialLoad() {
final MongoDBConfig mongoDBConfig = (MongoDBConfig) this.connectorConfig;
return mongoDBConfig.getIngestionMode().equals("export_stream") || mongoDBConfig.getIngestionMode().equals("export");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.kafkaconnect.source;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -35,6 +37,11 @@ public class MongoDBSourceTest {
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@BeforeEach
void setup() {
mongoDBConfig = mock(MongoDBConfig.class);
when(mongoDBConfig.getIngestionMode()).thenReturn("export_stream");
}

@Test
void testConstructorValidations() {
Expand Down

0 comments on commit 04a8ec2

Please sign in to comment.