Skip to content

Commit

Permalink
init unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Nov 6, 2023
1 parent 04a8ec2 commit 3f7d26f
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,4 @@ public void stop() {
}

}

private Record<Object> getEventRecordFromData(final String json) {
return new Record<>(JacksonEvent
.builder()
.withData(json)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ public void test_get_mongodb_connectors() throws IOException {
assertThat(actualConfig.get("mongodb.ssl.enabled"), is("false"));
}

@Test
public void test_get_mongodb_config_props() throws IOException {
MongoDBConfig testConfig = buildTestConfig("sample-mongodb-pipeline.yaml");
assertThat(testConfig, notNullValue());
assertThat(testConfig.getIngestionMode(), is("export_stream"));
assertThat(testConfig.getCredentialsConfig().getUsername(), is("debezium"));
assertThat(testConfig.getHostname(), is("localhost"));
assertThat(testConfig.getPort(), is("27017"));
assertThat(testConfig.getSSLEnabled(), is(false));
assertThat(testConfig.getSSLInvalidHostAllowed(), is(false));
assertThat(testConfig.getCollections().size(), is(1));
assertThat(testConfig.getExportConfig().getAcknowledgements(), is(false));
assertThat(testConfig.getExportConfig().getItemsPerPartition(), is(4000L));
assertThat(testConfig.getExportConfig().getReadPreference(), is("secondaryPreferred"));
}

private MongoDBConfig buildTestConfig(final String resourceFileName) throws IOException {
//Added to load Yaml file - Start
Yaml yaml = new Yaml();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,29 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
import org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB.MongoDBService;
import org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB.MongoDBSnapshotProgressState;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -37,14 +51,24 @@ public class MongoDBSourceTest {
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
private SourceCoordinator sourceCoordinator;

@Mock
private MongoDBService mongoDBService;

@Mock
private Buffer<Record<Object>> buffer;

@BeforeEach
void setup() {
mongoDBConfig = mock(MongoDBConfig.class);
when(mongoDBConfig.getIngestionMode()).thenReturn("export_stream");
sourceCoordinator = mock(SourceCoordinator.class);
}

@Test
void testConstructorValidations() {
when(mongoDBConfig.getIngestionMode()).thenReturn("export_stream");
assertThrows(IllegalArgumentException.class, () -> new MongoDBSource(
mongoDBConfig,
pluginMetrics,
Expand All @@ -54,4 +78,32 @@ void testConstructorValidations() {
null,
null));
}

@Test
void testExportConstructor() {
when(mongoDBConfig.getIngestionMode()).thenReturn("export");
doNothing().when(sourceCoordinator).giveUpPartitions();
MongoDBSource mongoDBSource = new MongoDBSource(
mongoDBConfig,
pluginMetrics,
pipelineDescription,
acknowledgementSetManager,
awsCredentialsSupplier,
null,
null);
mongoDBSource.setSourceCoordinator(sourceCoordinator);
assertThat(mongoDBSource.getPartitionProgressStateClass(), equalTo(MongoDBSnapshotProgressState.class));
assertThat(mongoDBSource.getDecoder(), instanceOf(ByteDecoder.class));
try (MockedStatic<MongoDBService> mockedStatic = mockStatic((MongoDBService.class))) {
mongoDBService = mock(MongoDBService.class);
doNothing().when(mongoDBService).start();
doNothing().when(mongoDBService).stop();
mockedStatic.when(() -> MongoDBService.create(any(), any(), any(), any(), any())).thenReturn(mongoDBService);
mongoDBSource.start(buffer);
verify(mongoDBService).start();
mongoDBSource.stop();
verify(mongoDBService).stop();
verify(sourceCoordinator).giveUpPartitions();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class MongoDBServiceTest {
@Mock
private MongoDBConfig mongoDBConfig;

@Mock
private Buffer<Record<Object>> buffer;

@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
private ScheduledExecutorService scheduledExecutorService;

@Mock
private SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator;

@Mock
private MongoDBPartitionCreationSupplier mongoDBPartitionCreationSupplier;

@Mock
private PluginMetrics pluginMetrics;

@Test
public void testConstructor() {
createObjectUnderTest();
verify(sourceCoordinator).initialize();
}

@Test
public void testStart() {
createObjectUnderTest().start();
verify(scheduledExecutorService).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
}

private MongoDBService createObjectUnderTest() {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedConstruction<MongoDBPartitionCreationSupplier> mockedConstruction = mockConstruction(MongoDBPartitionCreationSupplier.class, (mock, context) -> {
mongoDBPartitionCreationSupplier = mock;
})) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService);
return MongoDBService.create(mongoDBConfig, sourceCoordinator, buffer, acknowledgementSetManager, pluginMetrics);
}
}
}

0 comments on commit 3f7d26f

Please sign in to comment.