Skip to content

Commit

Permalink
Issue #405 Fix SubscriptionType typo error (#522)
Browse files Browse the repository at this point in the history
* [Issue #337] Fix HttpSubscriber startup issue

* [Issue #337] test commit

* [Issue #337] revert test commit

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Address code review comment for Subscriber Demo App

* Fix subscriptionType typo

Co-authored-by: j00441484 <[email protected]>
  • Loading branch information
jinrongluo and j00441484 authored Sep 15, 2021
1 parent bd8d736 commit 84b2f4d
Show file tree
Hide file tree
Showing 27 changed files with 82 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ public class SubscriptionItem {

private SubscriptionMode mode;

private SubcriptionType type;
private SubscriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
public SubscriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
public void setType(SubscriptionType type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.protocol;

public enum SubcriptionType {
public enum SubscriptionType {
/**
* SYNC
*/
Expand All @@ -29,7 +29,7 @@ public enum SubcriptionType {

private String type;

SubcriptionType(String type) {
SubscriptionType(String type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.http.demo.AsyncPublishInstance;
Expand All @@ -52,7 +52,7 @@ public class SubService implements InitializingBean {

final Properties properties = Utils.readPropertiesFile("application.properties");

final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC));
final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
final String localIp = IPUtil.getLocalAddress();
final String localPort = properties.getProperty("server.port");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -45,7 +45,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerSubBusiHandler(handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.RandomStringUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand Down Expand Up @@ -85,7 +85,7 @@ public void tryHTTPRequest() {

String requestCode = "";

if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
Command cmd;
if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
cmd = Command.BROADCAST_MESSAGE_TO_CLIENT;
} else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
} else if (SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
cmd = Command.REQUEST_TO_CLIENT;
} else {
cmd = Command.ASYNC_MESSAGE_TO_CLIENT;
Expand Down Expand Up @@ -101,7 +100,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

//retry
long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
long delayTime = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
: session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
downStreamMsgContext.delay(delayTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.openmessaging.api.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
return;
}

int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes
: eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;

import org.apache.eventmesh.runtime.client.hook.ReceiveMsgHook;
Expand All @@ -39,9 +39,9 @@ public interface EventMeshClient {

Package listen() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -34,9 +34,9 @@ public interface SubClient {

void reconnect() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package listen() throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -63,10 +63,10 @@ public static Package subscribe() {
return msg;
}

public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand All @@ -76,10 +76,10 @@ public static Package unsubscribe() {
return msg;
}

public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand Down Expand Up @@ -169,18 +169,18 @@ public static UserAgent generateSubServer() {
public static Subscription generateSubscription() {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscription.setTopicList(subscriptionItems);
return subscription;
}

public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
subscription.setTopicList(subscriptionItems);
return subscription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.impl;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down Expand Up @@ -84,14 +84,14 @@ public Package listen() throws Exception {

@Override
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subscriptionType);
}

@Override
public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subscriptionType);
}

public void registerSubBusiHandler(ReceiveMsgHook handler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.demo.SyncPubClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,9 +118,9 @@ private void hello() throws Exception {
this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand All @@ -142,9 +141,9 @@ public Package listen() throws Exception {
// }

public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
SubscriptionType subscriptionType) throws Exception {
subscriptionItems.remove(topic);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;

Expand All @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Loading

0 comments on commit 84b2f4d

Please sign in to comment.