Skip to content

Commit

Permalink
[ISSUE apache#366 ] remove custom-format topic concept (apache#388)
Browse files Browse the repository at this point in the history
* remove custom-format topic concept

* remove custom-format topic concept

* remove custom-format topic concept

* remove custom-format topic concept

* remove custom-format topic concept

* remove custom-format topic concept

* remove custom-format topic concept
  • Loading branch information
iNanos authored and jjz921024 committed Jul 25, 2021
1 parent 3905834 commit 51b9672
Show file tree
Hide file tree
Showing 53 changed files with 436 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class IPUtil {
public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
String dockerHostIp = System.getenv("webank_docker_host_ip");
String dockerHostIp = System.getenv("docker_host_ip");
if (dockerHostIp != null && !"".equals(dockerHostIp))
return dockerHostIp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,29 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.consumergroup.event;
package org.apache.eventmesh.common.protocol;

public class ConsumerGroupInstanceChangeEvent {
public enum SubcriptionType {
/**
* SYNC
*/
SYNC("SYNC"),
/**
* ASYNC
*/
ASYNC("ASYNC");

private String type;

SubcriptionType(String type) {
this.type = type;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public class SubscriptionItem {

private String topic;

private SubscriptionMode mode;

private SubcriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
this.type = type;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public SubscriptionMode getMode() {
return mode;
}

public void setMode(SubscriptionMode mode) {
this.mode = mode;
}

@Override
public String toString() {
return "SubscriptionItem{" +
"topic=" + topic +
", mode=" + mode +
", type=" + type +
'}';
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public enum SubscriptionMode {

/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING");

private String mode;

SubscriptionMode(String mode) {
this.mode = mode;
}

public String getMode() {
return mode;
}

public void setMode(String mode) {
this.mode = mode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body {
public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";


private String clientType;

private List<HeartbeatEntity> heartbeatEntities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class RegRequestBody extends Body {
Expand All @@ -39,13 +40,13 @@ public class RegRequestBody extends Body {

private String endPoint;

private List<String> topics;
private List<SubscriptionItem> topics;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

Expand All @@ -69,7 +70,7 @@ public static RegRequestBody buildBody(Map<String, Object> bodyParam) {
RegRequestBody body = new RegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class SubscribeRequestBody extends Body {
Expand All @@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body {

public static final String URL = "url";

private List<String> topics;
private List<SubscriptionItem> topics;

private String url;

private String topic;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

private String url;

public String getUrl() {
return url;
}
Expand All @@ -58,7 +57,7 @@ public void setUrl(String url) {
public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@

package org.apache.eventmesh.common.protocol.tcp;

import org.apache.eventmesh.common.protocol.SubscriptionItem;

import java.util.LinkedList;
import java.util.List;

public class Subscription {

private List<String> topicList = new LinkedList<>();
private List<SubscriptionItem> topicList = new LinkedList<>();

public Subscription() {
}

public Subscription(List<String> topicList) {
public Subscription(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

public List<String> getTopicList() {
public List<SubscriptionItem> getTopicList() {
return topicList;
}

public void setTopicList(List<String> topicList) {
public void setTopicList(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

Expand All @@ -45,6 +47,4 @@ public String toString() {
"topicList=" + topicList +
'}';
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer {

private EventMeshHTTPConfiguration eventMeshHttpConfiguration;

public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();

public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();

public EventMeshHTTPServer(EventMeshServer eventMeshServer,
EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void init() throws Exception {

String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));
logger.info("eventstore : {}", eventstore);
// logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName());

serviceState = ServiceState.INITED;
logger.info("server state:{}", serviceState);
Expand Down

This file was deleted.

Loading

0 comments on commit 51b9672

Please sign in to comment.