From ca2b66dd9d08e35eac230a1fc1cbe8d6768e309f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C?= Date: Tue, 5 Dec 2023 10:45:29 +0800 Subject: [PATCH 1/6] rename the TemplateMessageResponse.errocode to TemplateMessageResponse.errcode --- .../wechat/sink/connector/TemplateMessageResponse.java | 2 +- .../connector/wechat/sink/connector/WeChatSinkConnector.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java index 3d51704e04..231b01c164 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java @@ -22,7 +22,7 @@ @Data public class TemplateMessageResponse { - private int errocode; + private int errcode; private String errmsg; diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java index 0457202b68..c275ea2491 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java @@ -166,9 +166,9 @@ private void sendMessage(ConnectRecord record) { throw new IOException("message response is null."); } - if (messageResponse.getErrocode() != 0) { + if (messageResponse.getErrcode() != 0) { throw new IllegalAccessException(String.format("Send message to weCom error! errorCode=%s, errorMessage=%s", - messageResponse.getErrocode(), messageResponse.getErrmsg())); + messageResponse.getErrcode(), messageResponse.getErrmsg())); } } From 7a8d2e8619156fb09632d143be39c6a25b120db6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C?= Date: Tue, 5 Dec 2023 13:40:23 +0800 Subject: [PATCH 2/6] add unit test for abnormal case --- .../connector/WeChatSinkConnectorTest.java | 66 +++++++++++++------ 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java index 2c4ad5b013..707d491a94 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java @@ -79,6 +79,20 @@ public void setUp() throws Exception { Mockito.doReturn(tokenCall).when(okHttpClient).newCall(Mockito.argThat(tokenMatcher)); Mockito.doReturn(tokenResponse).when(tokenCall).execute(); + weChatSinkConnector = new WeChatSinkConnector(); + WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig) ConfigUtil.parse(weChatSinkConnector.configClass()); + weChatSinkConnector.init(weChatSinkConfig); + Field clientField = ReflectionSupport.findFields(weChatSinkConnector.getClass(), + (f) -> f.getName().equals("okHttpClient"), + HierarchyTraversalMode.BOTTOM_UP).get(0); + clientField.setAccessible(true); + clientField.set(weChatSinkConnector, okHttpClient); + weChatSinkConnector.start(); + } + + @Test + public void testSendMessageToWeChat() throws Exception { + Request sendMessageRequest = new Request.Builder().url("https://api.weixin.qq.com/cgi-bin/message/template/send").build(); String sendMessageResponseJson = "{\"errcode\":0,\"errmsg\":\"ok\",\"msgid\":200228332}"; ResponseBody sendMessageBody = ResponseBody.create(MediaType.parse("application/json; charset=utf-8"), sendMessageResponseJson); @@ -95,31 +109,43 @@ public void setUp() throws Exception { Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher)); Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute(); - weChatSinkConnector = new WeChatSinkConnector(); - WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig) ConfigUtil.parse(weChatSinkConnector.configClass()); - weChatSinkConnector.init(weChatSinkConfig); - Field clientField = ReflectionSupport.findFields(weChatSinkConnector.getClass(), - (f) -> f.getName().equals("okHttpClient"), - HierarchyTraversalMode.BOTTOM_UP).get(0); - clientField.setAccessible(true); - clientField.set(weChatSinkConnector, okHttpClient); - weChatSinkConnector.start(); + List records = new ArrayList<>(); + RecordPartition partition = new RecordPartition(); + RecordOffset offset = new RecordOffset(); + ConnectRecord connectRecord = new ConnectRecord(partition, offset, + System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8)); + records.add(connectRecord); + + weChatSinkConnector.put(records); + verify(okHttpClient, times(2)).newCall(any(Request.class)); } @Test - public void testSendMessageToWeChat() throws Exception { - final int times = 1; - List records = new ArrayList<>(); - for (int i = 0; i < times; i++) { - RecordPartition partition = new RecordPartition(); - RecordOffset offset = new RecordOffset(); - ConnectRecord connectRecord = new ConnectRecord(partition, offset, - System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8)); - records.add(connectRecord); - } + public void testSendMessageToWeChatAbnormally() throws Exception { + Request sendMessageRequest = new Request.Builder().url("https://api.weixin.qq.com/cgi-bin/message/template/send").build(); + String sendMessageResponseJson = "{\"errcode\":42001,\"errmsg\":\"access_token expired rid: 656e8793-061949b5-738cb8f4\"}"; + ResponseBody sendMessageBody = ResponseBody.create(MediaType.parse("application/json; charset=utf-8"), sendMessageResponseJson); + Response sendMessageResponse = new Response.Builder() + .code(200) + .protocol(Protocol.HTTP_1_0) + .request(sendMessageRequest) + .body(sendMessageBody) + .message("ok") + .build(); + ArgumentMatcher sendMessageMatcher = (anyRequest) -> + sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath()); + Call sendMessageRequestCall = Mockito.mock(Call.class); + Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher)); + Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute(); + RecordPartition partition = new RecordPartition(); + RecordOffset offset = new RecordOffset(); + List records = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(partition, offset, + System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8)); + records.add(connectRecord); weChatSinkConnector.put(records); - verify(okHttpClient, times(times + 1)).newCall(any(Request.class)); + verify(okHttpClient, times(2)).newCall(any(Request.class)); } @AfterEach From 7cc713b44cb6408cc3e291fec95b8639e0438b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C?= Date: Tue, 5 Dec 2023 20:52:56 +0800 Subject: [PATCH 3/6] change the weCom to WeChat --- .../connector/wechat/sink/connector/WeChatSinkConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java index c275ea2491..1d1c3258cb 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java @@ -167,7 +167,7 @@ private void sendMessage(ConnectRecord record) { } if (messageResponse.getErrcode() != 0) { - throw new IllegalAccessException(String.format("Send message to weCom error! errorCode=%s, errorMessage=%s", + throw new IllegalAccessException(String.format("Send message to WeChat error! errorCode=%s, errorMessage=%s", messageResponse.getErrcode(), messageResponse.getErrmsg())); } } From 0ab75eb207ab9688bff1824eb6a0e425a7259f0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C=23A03088?= <1099297353@qq.com> Date: Tue, 5 Dec 2023 22:42:40 +0800 Subject: [PATCH 4/6] add WeChat sink connector module in eventmesh-connectors/README.md --- eventmesh-connectors/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md index 25abaef684..1fc836f3ae 100644 --- a/eventmesh-connectors/README.md +++ b/eventmesh-connectors/README.md @@ -66,4 +66,6 @@ Add a new connector by implementing the source/sink interface using : | [Spring](eventmesh-connector-spring) | Sink | ✅ | | WeCom | Source | ⬜ | | [WeCom](eventmesh-connector-wecom) | Sink | ✅ | +| WeChat | Source | ⬜ | +| [WeChat](eventmesh-connector-wechat) | Sink | ✅ | | More connectors will be added... | Source/Sink | N/A | \ No newline at end of file From 0bf57f092eea37444b8bda3650ce989373eb5382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C=23A03088?= <1099297353@qq.com> Date: Tue, 5 Dec 2023 23:19:18 +0800 Subject: [PATCH 5/6] use JUnit's assertThrows() to assert exceptions --- .../wechat/sink/connector/WeChatSinkConnectorTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java index 707d491a94..d426737fac 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java @@ -29,11 +29,14 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -140,12 +143,11 @@ public void testSendMessageToWeChatAbnormally() throws Exception { RecordPartition partition = new RecordPartition(); RecordOffset offset = new RecordOffset(); - List records = new ArrayList<>(); ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8)); - records.add(connectRecord); - weChatSinkConnector.put(records); - verify(okHttpClient, times(2)).newCall(any(Request.class)); + Method sendMessageMethod = WeChatSinkConnector.class.getDeclaredMethod("sendMessage", ConnectRecord.class); + sendMessageMethod.setAccessible(true); + Assertions.assertThrows(InvocationTargetException.class, () -> sendMessageMethod.invoke(weChatSinkConnector, connectRecord)); } @AfterEach From 133a64b3b37ca39df428a5fdd3e27871a3bcb111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=A0=E7=82=9C=23A03088?= <1099297353@qq.com> Date: Tue, 5 Dec 2023 23:41:22 +0800 Subject: [PATCH 6/6] invalid token,testSendMessageToWeChatAbnormally will get token from server again to fix org.mockito.exceptions.misusing.UnnecessaryStubbingException --- .../wechat/sink/connector/WeChatSinkConnectorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java index d426737fac..5920417ea7 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java @@ -121,6 +121,8 @@ public void testSendMessageToWeChat() throws Exception { weChatSinkConnector.put(records); verify(okHttpClient, times(2)).newCall(any(Request.class)); + + WeChatSinkConnector.ACCESS_TOKEN_CACHE.invalidate(WeChatSinkConnector.ACCESS_TOKEN_CACHE_KEY); } @Test