Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #366 ] remove custom-format topic concept #388

Merged
merged 8 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public class SubscriptionItem {

private String topic;

private SubscriptionMode mode;

public SubscriptionItem(String topic, SubscriptionMode mode) {
this.topic = topic;
this.mode = mode;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public SubscriptionMode getMode() {
return mode;
}

public void setMode(SubscriptionMode mode) {
this.mode = mode;
}

@Override
public String toString() {
return "SubscriptionItem{" +
"topic=" + topic +
", mode=" + mode +
'}';
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,31 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.consumergroup.event;
package org.apache.eventmesh.common.protocol;

public enum SubscriptionMode {

/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING");

private String mode;

SubscriptionMode(String mode) {
this.mode = mode;
}

public String getMode() {
return mode;
}

public void setMode(String mode) {
this.mode = mode;
}

public class ConsumerGroupInstanceChangeEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body {
public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";


private String clientType;

private List<HeartbeatEntity> heartbeatEntities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class RegRequestBody extends Body {
Expand All @@ -39,13 +40,13 @@ public class RegRequestBody extends Body {

private String endPoint;

private List<String> topics;
private List<SubscriptionItem> topics;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

Expand All @@ -69,7 +70,7 @@ public static RegRequestBody buildBody(Map<String, Object> bodyParam) {
RegRequestBody body = new RegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class SubscribeRequestBody extends Body {
Expand All @@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body {

public static final String URL = "url";

private List<String> topics;
private List<SubscriptionItem> topics;

private String url;

private String topic;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

private String url;

public String getUrl() {
return url;
}
Expand All @@ -58,7 +57,7 @@ public void setUrl(String url) {
public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@

package org.apache.eventmesh.common.protocol.tcp;

import org.apache.eventmesh.common.protocol.SubscriptionItem;

import java.util.LinkedList;
import java.util.List;

public class Subscription {

private List<String> topicList = new LinkedList<>();
private List<SubscriptionItem> topicList = new LinkedList<>();

public Subscription() {
}

public Subscription(List<String> topicList) {
public Subscription(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

public List<String> getTopicList() {
public List<SubscriptionItem> getTopicList() {
return topicList;
}

public void setTopicList(List<String> topicList) {
public void setTopicList(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

Expand All @@ -45,6 +47,4 @@ public String toString() {
"topicList=" + topicList +
'}';
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer {

private EventMeshHTTPConfiguration eventMeshHttpConfiguration;

public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();

public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();

public EventMeshHTTPServer(EventMeshServer eventMeshServer,
EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void init() throws Exception {

String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));
logger.info("eventstore : {}", eventstore);
// logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName());

serviceState = ServiceState.INITED;
logger.info("server state:{}", serviceState);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class EventMeshConstants {

public static final String BROADCAST_PREFIX = "broadcast-";

public static final String IS_SYNC_MESSAGE = "sync";

public final static String CONSUMER_GROUP_NAME_PREFIX = "ConsumerGroup-";

public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,11 @@ public class ConsumerGroupTopicConf {

private String topic;

/**
* @see org.apache.eventmesh.common.protocol.SubscriptionMode
*/
private SubscriptionMode subscriptionMode;

/**
* PUSH URL
*/
Expand All @@ -53,19 +59,21 @@ public boolean equals(Object o) {
ConsumerGroupTopicConf that = (ConsumerGroupTopicConf) o;
return consumerGroup.equals(that.consumerGroup) &&
Objects.equals(topic, that.topic) &&
Objects.equals(subscriptionMode, that.subscriptionMode) &&
Objects.equals(idcUrls, that.idcUrls);
}

@Override
public int hashCode() {
return Objects.hash(consumerGroup, topic, idcUrls);
return Objects.hash(consumerGroup, topic, subscriptionMode, idcUrls);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("consumeTopicConfig={consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
.append(",subscriptionMode=").append(subscriptionMode)
.append(",idcUrls=").append(idcUrls).append("}");
return sb.toString();
}
Expand All @@ -86,6 +94,14 @@ public void setTopic(String topic) {
this.topic = topic;
}

public SubscriptionMode getSubscriptionMode() {
return subscriptionMode;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}

public Map<String, List<String>> getIdcUrls() {
return idcUrls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.eventmesh.runtime.core.protocol.http.consumer;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;

public class ConsumerGroupManager {

Expand Down Expand Up @@ -52,8 +54,8 @@ public synchronized void start() throws Exception {
}

private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
for (String topic : consumerGroupConfig.getConsumerGroupTopicConf().keySet()) {
eventMeshConsumer.subscribe(topic);
for (Map.Entry<String, ConsumerGroupTopicConf> conf : consumerGroupConfig.getConsumerGroupTopicConf().entrySet()) {
eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionMode());
}
}

Expand Down
Loading