From 5818f5992c3483b14cac97464ef2716c6b768569 Mon Sep 17 00:00:00 2001 From: yanrongzhen Date: Fri, 3 Nov 2023 16:23:20 +0800 Subject: [PATCH 1/2] Add unit test. --- .../eventmesh-connector-rocketmq/build.gradle | 3 + .../connector/RocketMQSinkConnectorTest.java | 110 ++++++++++++++++++ .../src/test/resources/sink-config.yml | 30 +++++ 3 files changed, 143 insertions(+) create mode 100644 eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java create mode 100644 eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle index ef6e907ebd..769e9c6cf8 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle @@ -33,7 +33,10 @@ List rocketmq = [ dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") + implementation project(":eventmesh-common") implementation rocketmq compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' + testImplementation "org.mockito:mockito-core" + testImplementation "org.mockito:mockito-junit-jupiter" } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java new file mode 100644 index 0000000000..bf2198a6e8 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.sink.connector; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.rocketmq.sink.config.RocketMQSinkConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.platform.commons.support.HierarchyTraversalMode; +import org.junit.platform.commons.support.ReflectionSupport; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.core.type.TypeReference; + +@ExtendWith(MockitoExtension.class) +public class RocketMQSinkConnectorTest { + + @InjectMocks + private RocketMQSinkConnector sinkConnector; + + @Mock + private DefaultMQProducer producer; + + private static final String EXPECTED_MESSAGE = "testMessage"; + + @BeforeEach + public void setUp() throws Exception { + Mockito.doNothing().when(producer).start(); + Mockito.doAnswer(invocationOnMock -> { + Message argument = invocationOnMock.getArgument(0); + String actualMessage = new String(argument.getBody()); + HashMap contentMap = JsonUtils.parseTypeReferenceObject(actualMessage, + new TypeReference>() { + }); + Assertions.assertNotNull(contentMap); + Assertions.assertEquals(EXPECTED_MESSAGE, contentMap.get("content")); + return null; + }).when(producer).send(Mockito.any(Message.class)); + Field field = ReflectionSupport.findFields(sinkConnector.getClass(), + (f) -> f.getName().equals("producer"), HierarchyTraversalMode.BOTTOM_UP).get(0); + field.setAccessible(true); + field.set(sinkConnector, producer); + producer.start(); + RocketMQSinkConfig sinkConfig = (RocketMQSinkConfig) ConfigUtil.parse(sinkConnector.configClass()); + sinkConnector.init(sinkConfig); + sinkConnector.start(); + } + + @Test + public void testRocketMQSinkConnector() throws Exception { + final int messageCount = 5; + sinkConnector.put(generateMockedRecords(messageCount)); + verify(producer, times(messageCount)).send(any(Message.class)); + } + + private List generateMockedRecords(final int messageCount) { + List records = new ArrayList<>(); + for (int i = 0; i < messageCount; i++) { + RecordPartition partition = new RecordPartition(); + RecordOffset offset = new RecordOffset(); + Map content = new HashMap<>(); + content.put("content", EXPECTED_MESSAGE); + ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(), + JsonUtils.toJSONString(content).getBytes(StandardCharsets.UTF_8)); + connectRecord.addExtension("id", String.valueOf(UUID.randomUUID())); + records.add(connectRecord); + } + return records; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml new file mode 100644 index 0000000000..210361dc28 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: rocketmqSink + appId: 5031 + userName: rocketmqSinkUser + passWord: rocketmqPassWord +connectorConfig: + connectorName: rocketmqSink + nameServer: 127.0.0.1:9876 + topic: TopicTest From eb921be0570e27a22d57e41550b3d9c5b01a7c3f Mon Sep 17 00:00:00 2001 From: yanrongzhen Date: Fri, 3 Nov 2023 18:15:04 +0800 Subject: [PATCH 2/2] Refactor --- .../connector/RocketMQSinkConnectorTest.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java index bf2198a6e8..afd13a3f2f 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.rocketmq.sink.config.RocketMQSinkConfig; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset; @@ -34,12 +33,9 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -50,8 +46,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import com.fasterxml.jackson.core.type.TypeReference; - @ExtendWith(MockitoExtension.class) public class RocketMQSinkConnectorTest { @@ -61,21 +55,12 @@ public class RocketMQSinkConnectorTest { @Mock private DefaultMQProducer producer; - private static final String EXPECTED_MESSAGE = "testMessage"; + private static final String EXPECTED_MESSAGE = "\"testMessage\""; @BeforeEach public void setUp() throws Exception { Mockito.doNothing().when(producer).start(); - Mockito.doAnswer(invocationOnMock -> { - Message argument = invocationOnMock.getArgument(0); - String actualMessage = new String(argument.getBody()); - HashMap contentMap = JsonUtils.parseTypeReferenceObject(actualMessage, - new TypeReference>() { - }); - Assertions.assertNotNull(contentMap); - Assertions.assertEquals(EXPECTED_MESSAGE, contentMap.get("content")); - return null; - }).when(producer).send(Mockito.any(Message.class)); + Mockito.doReturn(null).when(producer).send(Mockito.any(Message.class)); Field field = ReflectionSupport.findFields(sinkConnector.getClass(), (f) -> f.getName().equals("producer"), HierarchyTraversalMode.BOTTOM_UP).get(0); field.setAccessible(true); @@ -98,10 +83,8 @@ private List generateMockedRecords(final int messageCount) { for (int i = 0; i < messageCount; i++) { RecordPartition partition = new RecordPartition(); RecordOffset offset = new RecordOffset(); - Map content = new HashMap<>(); - content.put("content", EXPECTED_MESSAGE); ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(), - JsonUtils.toJSONString(content).getBytes(StandardCharsets.UTF_8)); + EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8)); connectRecord.addExtension("id", String.valueOf(UUID.randomUUID())); records.add(connectRecord); }