Skip to content

Commit

Permalink
Issue apache#405 Fix SubscriptionType typo error (apache#522)
Browse files Browse the repository at this point in the history
* [Issue apache#337] Fix HttpSubscriber startup issue

* [Issue apache#337] test commit

* [Issue apache#337] revert test commit

* [Issue apache#337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue apache#337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue apache#337] Address code review comment for Subscriber Demo App

* Fix subscriptionType typo

Co-authored-by: j00441484 <[email protected]>
  • Loading branch information
2 people authored and xwm1992 committed Dec 27, 2021
1 parent ae3fab6 commit 4527f8d
Show file tree
Hide file tree
Showing 26 changed files with 80 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ public class SubscriptionItem {

private SubscriptionMode mode;

private SubcriptionType type;
private SubscriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
public SubscriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
public void setType(SubscriptionType type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.http.demo.AsyncPublishInstance;
Expand All @@ -52,7 +52,7 @@ public class SubService implements InitializingBean {

final Properties properties = Utils.readPropertiesFile("application.properties");

final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC));
final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
final String localIp = IPUtil.getLocalAddress();
final String localPort = properties.getProperty("server.port");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -45,7 +45,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerSubBusiHandler(handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.RandomStringUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand Down Expand Up @@ -85,7 +85,7 @@ public void tryHTTPRequest() {

String requestCode = "";

if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
Command cmd;
if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
cmd = Command.BROADCAST_MESSAGE_TO_CLIENT;
} else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
} else if (SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
cmd = Command.REQUEST_TO_CLIENT;
} else {
cmd = Command.ASYNC_MESSAGE_TO_CLIENT;
Expand Down Expand Up @@ -101,7 +100,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

//retry
long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
long delayTime = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
: session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
downStreamMsgContext.delay(delayTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.openmessaging.api.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
return;
}

int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes
: eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;

import org.apache.eventmesh.runtime.client.hook.ReceiveMsgHook;
Expand All @@ -39,9 +39,9 @@ public interface EventMeshClient {

Package listen() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -34,9 +34,9 @@ public interface SubClient {

void reconnect() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package listen() throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -63,10 +63,10 @@ public static Package subscribe() {
return msg;
}

public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand All @@ -76,10 +76,10 @@ public static Package unsubscribe() {
return msg;
}

public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand Down Expand Up @@ -169,18 +169,18 @@ public static UserAgent generateSubServer() {
public static Subscription generateSubscription() {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscription.setTopicList(subscriptionItems);
return subscription;
}

public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
subscription.setTopicList(subscriptionItems);
return subscription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.impl;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down Expand Up @@ -84,14 +84,14 @@ public Package listen() throws Exception {

@Override
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subscriptionType);
}

@Override
public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subscriptionType);
}

public void registerSubBusiHandler(ReceiveMsgHook handler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.demo.SyncPubClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,9 +118,9 @@ private void hello() throws Exception {
this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand All @@ -142,9 +141,9 @@ public Package listen() throws Exception {
// }

public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
SubscriptionType subscriptionType) throws Exception {
subscriptionItems.remove(topic);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;

Expand All @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Loading

0 comments on commit 4527f8d

Please sign in to comment.