Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #405 Fix SubscriptionType typo error #522

Merged
merged 19 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ public class SubscriptionItem {

private SubscriptionMode mode;

private SubcriptionType type;
private SubscriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
public SubscriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
public void setType(SubscriptionType type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.protocol;

public enum SubcriptionType {
public enum SubscriptionType {
/**
* SYNC
*/
Expand All @@ -29,7 +29,7 @@ public enum SubcriptionType {

private String type;

SubcriptionType(String type) {
SubscriptionType(String type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.http.demo.AsyncPublishInstance;
Expand All @@ -52,7 +52,7 @@ public class SubService implements InitializingBean {

final Properties properties = Utils.readPropertiesFile("application.properties");

final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC));
final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
final String localIp = IPUtil.getLocalAddress();
final String localPort = properties.getProperty("server.port");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -45,7 +45,7 @@ public static void main(String[] agrs) throws Exception {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerSubBusiHandler(handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.RandomStringUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand Down Expand Up @@ -85,7 +85,7 @@ public void tryHTTPRequest() {

String requestCode = "";

if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
Command cmd;
if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
cmd = Command.BROADCAST_MESSAGE_TO_CLIENT;
} else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
} else if (SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
cmd = Command.REQUEST_TO_CLIENT;
} else {
cmd = Command.ASYNC_MESSAGE_TO_CLIENT;
Expand Down Expand Up @@ -101,7 +100,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

//retry
long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
long delayTime = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
: session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
downStreamMsgContext.delay(delayTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.openmessaging.api.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
return;
}

int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes
: eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;

import org.apache.eventmesh.runtime.client.hook.ReceiveMsgHook;
Expand All @@ -39,9 +39,9 @@ public interface EventMeshClient {

Package listen() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.api;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -34,9 +34,9 @@ public interface SubClient {

void reconnect() throws Exception;

Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception;

Package listen() throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -63,10 +63,10 @@ public static Package subscribe() {
return msg;
}

public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand All @@ -76,10 +76,10 @@ public static Package unsubscribe() {
return msg;
}

public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType));
return msg;
}

Expand Down Expand Up @@ -169,18 +169,18 @@ public static UserAgent generateSubServer() {
public static Subscription generateSubscription() {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
subscription.setTopicList(subscriptionItems);
return subscription;
}

public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) {
Subscription subscription = new Subscription();
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
subscription.setTopicList(subscriptionItems);
return subscription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.runtime.client.impl;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down Expand Up @@ -84,14 +84,14 @@ public Package listen() throws Exception {

@Override
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justSubscribe(topic, subscriptionMode, subscriptionType);
}

@Override
public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType);
SubscriptionType subscriptionType) throws Exception {
return this.subClient.justUnsubscribe(topic, subscriptionMode, subscriptionType);
}

public void registerSubBusiHandler(ReceiveMsgHook handler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.demo.SyncPubClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,9 +118,9 @@ private void hello() throws Exception {
this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception {
subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
Package msg = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand All @@ -142,9 +141,9 @@ public Package listen() throws Exception {
// }

public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
SubscriptionType subscriptionType) throws Exception {
subscriptionItems.remove(topic);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType);
Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subscriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;

Expand All @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.netty.channel.ChannelHandlerContext;

import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
Expand Down
Loading