Skip to content

Commit

Permalink
[Issue apache#417] support Grpc broadcast async publish
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo authored and xwm1992 committed Feb 16, 2022
1 parent a8bce21 commit adf637c
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
AsyncConsumeContext asyncConsumeContext = new AsyncConsumeContext() {
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
switch (action) {
Expand All @@ -229,7 +229,9 @@ public void commit(EventMeshAction action) {
}
};

listener.consume(cloudEvent, asyncConsumeContext);
eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub.eventmeshmessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.producer.EventMeshGrpcProducer;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.util.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class AsyncPublishBroadcast {

// This messageSize is also used in SubService.java (Subscriber)
public static int messageSize = 5;

public static void main(String[] args) throws Exception {

Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-BROADCAST";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.producerGroup("EventMeshTest-producerGroup")
.env("env").idc("idc")
.sys("1234").build();

EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);

eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(JsonUtils.serialize(content))
.topic(topic)
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
.build()
.addProp(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000));
eventMeshGrpcProducer.publish(eventMeshMessage);
Thread.sleep(1000);
}
Thread.sleep(30000);
try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) {
// ignore
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.eventmesh.grpc.sub;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.util.Utils;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

@Slf4j
public class EventmeshSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage> {

public static EventmeshSubscribeBroadcast handler = new EventmeshSubscribeBroadcast();

public static void main(String[] args) throws InterruptedException {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-BROADCAST";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.consumerGroup("EventMeshTest-consumerGroup")
.env("env").idc("idc")
.sys("1234").build();

SubscriptionItem subscriptionItem = new SubscriptionItem();
subscriptionItem.setTopic(topic);
subscriptionItem.setMode(SubscriptionMode.BROADCASTING);
subscriptionItem.setType(SubscriptionType.ASYNC);

EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

eventMeshGrpcConsumer.init();

eventMeshGrpcConsumer.registerListener(handler);

eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));

Thread.sleep(60000);
eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
}

@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
log.info("receive async broadcast msg====================={}", msg);
return Optional.empty();
}

@Override
public String getProtocolType() {
return EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -22,6 +24,8 @@ public class StreamPushRequest extends AbstractPushRequest {

private final List<EventEmitter<SimpleMessage>> totalEmitters;

private final SubscriptionMode subscriptionMode;

private final int startIdx;

public StreamPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<AbstractPushRequest>> waitingRequests) {
Expand All @@ -30,6 +34,7 @@ public StreamPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<Abst
StreamTopicConfig topicConfig = (StreamTopicConfig) handleMsgContext.getConsumeTopicConfig();
this.idcEmitters = topicConfig.getIdcEmitters();
this.totalEmitters = topicConfig.getTotalEmitters();
this.subscriptionMode = topicConfig.getSubscriptionMode();
this.startIdx = RandomUtils.nextInt(0, totalEmitters.size());
}

Expand All @@ -39,45 +44,60 @@ public void tryPushRequest() {
return;
}

EventEmitter<SimpleMessage> eventEmitter = selectEmitter();
List<EventEmitter<SimpleMessage>> eventEmitters = selectEmitter();

if (eventEmitter == null) {
return;
}
this.lastPushTime = System.currentTimeMillis();

simpleMessage = SimpleMessage.newBuilder(simpleMessage)
.putProperties(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(lastPushTime))
.build();
try {
long cost = System.currentTimeMillis() - lastPushTime;
// catch the error and retry, don't use eventEmitter.onNext() to hide the error
eventEmitter.getEmitter().onNext(simpleMessage);
messageLogger.info(
"message|eventMesh2client|emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost);
complete();
} catch (Throwable t) {
long cost = System.currentTimeMillis() - lastPushTime;
messageLogger.error(
"message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", t.getMessage(), simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost, t);

delayRetry();
for (EventEmitter<SimpleMessage> eventEmitter: eventEmitters) {
this.lastPushTime = System.currentTimeMillis();

simpleMessage = SimpleMessage.newBuilder(simpleMessage)
.putProperties(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(lastPushTime))
.build();
try {
// catch the error and retry, don't use eventEmitter.onNext() to hide the error
eventEmitter.getEmitter().onNext(simpleMessage);

long cost = System.currentTimeMillis() - lastPushTime;
messageLogger.info(
"message|eventMesh2client|emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost);
complete();
} catch (Throwable t) {
long cost = System.currentTimeMillis() - lastPushTime;
messageLogger.error(
"message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", t.getMessage(), simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost, t);

delayRetry();
}
}
}

private EventEmitter<SimpleMessage> selectEmitter() {
private List<EventEmitter<SimpleMessage>> selectEmitter() {
List<EventEmitter<SimpleMessage>> emitterList = MapUtils.getObject(idcEmitters,
eventMeshGrpcConfiguration.eventMeshIDC, null);
if (CollectionUtils.isNotEmpty(emitterList)) {
return emitterList.get((startIdx + retryTimes) % emitterList.size());
if (subscriptionMode.equals(SubscriptionMode.CLUSTERING)) {
return Collections.singletonList(emitterList.get((startIdx + retryTimes) % emitterList.size()));
} else if (subscriptionMode.equals(SubscriptionMode.BROADCASTING)) {
return emitterList;
} else {
messageLogger.error("Invalid Subscription Mode, no message returning back to subscriber.");
return Collections.emptyList();
}
}
if (CollectionUtils.isNotEmpty(totalEmitters)) {
return totalEmitters.get((startIdx + retryTimes) % totalEmitters.size());
if (subscriptionMode.equals(SubscriptionMode.CLUSTERING)) {
return Collections.singletonList(totalEmitters.get((startIdx + retryTimes) % totalEmitters.size()));
} else if(subscriptionMode.equals(SubscriptionMode.BROADCASTING)) {
return totalEmitters;
} else {
messageLogger.error("Invalid Subscription Mode, no message returning back to subscriber.");
return Collections.emptyList();
}
}
return null;
messageLogger.error("No event emitters from subscriber, no message returning.");
return Collections.emptyList();
}
}
Loading

0 comments on commit adf637c

Please sign in to comment.