Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature #4793] Support MQTT protocol #4794

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ public class Constants {

public static final String GRPC = "GRPC";

public static final String MQTT = "MQTT";

public static final String OS_NAME_KEY = "os.name";

public static final String OS_WIN_PREFIX = "win";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.eventmesh.common.Constants.GRPC;
import static org.apache.eventmesh.common.Constants.HTTP;
import static org.apache.eventmesh.common.Constants.MQTT;
import static org.apache.eventmesh.common.Constants.TCP;

import org.apache.eventmesh.common.config.CommonConfiguration;
Expand All @@ -36,7 +37,7 @@ public class ConfigurationContextUtil {

private static final ConcurrentHashMap<String, CommonConfiguration> CONFIGURATION_MAP = new ConcurrentHashMap<>();

public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC);
public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC, MQTT);

/**
* Save http, tcp, grpc configuration at startup for global use.
Expand Down
8 changes: 7 additions & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@
########################## EventMesh Runtime Environment ##########################
eventMesh.server.idc=DEFAULT
eventMesh.server.env=PRD
eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.provide.protocols=HTTP,TCP,GRPC,MQTT
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=0000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
eventMesh.server.mqtt.port=10305
########################## EventMesh TCP Configuration ##########################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=10000
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
eventMesh.server.tcp.clientMaxNum=10000
########################## EventMesh Mtqq Configuration ##########################
eventMesh.server.mqtt.password=false


# client isolation time if the message send failure
eventMesh.server.tcp.pushFailIsolateTimeInMills=30000
# rebalance internal
Expand Down Expand Up @@ -144,3 +149,4 @@ eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
eventMesh.webHook.nacosMode.serverAddr=127.0.0.1:8848
# Webhook CloudEvent sending mode. This property is the same as the eventMesh.storage.plugin.type configuration.
eventMesh.webHook.producer.storage=standalone

Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,7 @@
return aclProperties;
}

public boolean checkValid(String username, String password) {
return aclService.doAclCheckUserNameAndPassword(username, password);

Check warning on line 240 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java#L240

Added line #L240 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;


import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration;
import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientConnectProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientDisConnectProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.HealthCheckProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.MqttProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.PublishProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.SubscrubeProcessor;
import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.UnSubscrubeProcessor;
import org.apache.eventmesh.runtime.meta.MetaStorage;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException;

import lombok.extern.slf4j.Slf4j;

@Slf4j

Check warning on line 67 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L67

Added line #L67 was not covered by tests
public class EventMeshMQTTServer extends AbstractRemotingServer {

private final EventMeshMQTTConfiguration eventMeshMQTTConfiguration;

private final EventMeshServer eventMeshServer;

private final MetaStorage metaStorage;

private final Acl acl;


protected final Map<MqttMessageType, MqttProcessor> processorTable =

Check warning on line 79 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L79

Added line #L79 was not covered by tests
new ConcurrentHashMap<>(64);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is not serializable, is the transient keyword redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted,please review again.


private final transient AtomicBoolean started = new AtomicBoolean(false);

Check warning on line 82 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L82

Added line #L82 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem.
I'm not familiar with MQTT, so the rest of review work needs the community to complete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem.

Waiting for your response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the problem here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted,please review again.



public EventMeshMQTTServer(final EventMeshServer eventMeshServer, final EventMeshMQTTConfiguration eventMeshMQTTConfiguration) {
this.eventMeshServer = eventMeshServer;
this.eventMeshMQTTConfiguration = eventMeshMQTTConfiguration;
this.metaStorage = eventMeshServer.getMetaStorage();
this.acl = eventMeshServer.getAcl();
}

Check warning on line 90 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L85-L90

Added lines #L85 - L90 were not covered by tests

@Override
public void init() throws Exception {
log.info("==================EventMeshMQTTServer Initialing==================");
super.init("eventMesh-mqtt");
registerMQTTProcessor();

Check warning on line 96 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L94-L96

Added lines #L94 - L96 were not covered by tests

}

Check warning on line 98 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L98

Added line #L98 was not covered by tests

private void registerMQTTProcessor() {
processorTable.putIfAbsent(MqttMessageType.CONNECT, new ClientConnectProcessor(this, getWorkerGroup()));
processorTable.putIfAbsent(MqttMessageType.DISCONNECT, new ClientDisConnectProcessor(this, getWorkerGroup()));
processorTable.putIfAbsent(MqttMessageType.PINGREQ, new HealthCheckProcessor(this, getWorkerGroup()));
processorTable.putIfAbsent(MqttMessageType.SUBSCRIBE, new SubscrubeProcessor(this, getWorkerGroup()));
processorTable.putIfAbsent(MqttMessageType.UNSUBSCRIBE, new UnSubscrubeProcessor(this, getWorkerGroup()));
processorTable.putIfAbsent(MqttMessageType.PUBLISH, new PublishProcessor(this, getWorkerGroup()));
}

Check warning on line 107 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L101-L107

Added lines #L101 - L107 were not covered by tests


@Override
public void start() throws Exception {
Thread thread = new Thread(() -> {
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(this.getBossGroup(), this.getIoGroup())

Check warning on line 114 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L112-L114

Added lines #L112 - L114 were not covered by tests
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);

Check warning on line 119 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L116-L119

Added lines #L116 - L119 were not covered by tests

bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new MQTTServerInitializer());

Check warning on line 124 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L121-L124

Added lines #L121 - L124 were not covered by tests

try {
int port = eventMeshMQTTConfiguration.getEventMeshTcpServerPort();
ChannelFuture f = bootstrap.bind(port).sync();
log.info("EventMeshMQTTServer[port={}] started.....", port);
f.channel().closeFuture().sync();
} catch (Exception e) {
log.error("EventMeshMQTTServer RemotingServer Start Err!", e);

Check warning on line 132 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L127-L132

Added lines #L127 - L132 were not covered by tests
try {
shutdown();
} catch (Exception ex) {
log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex);
}
System.exit(-1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it appropriate to exit the process when the MQTT server fails to start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a reference to other EM protocols.If you want to modify it after the discussion, I will ignore this startup failure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xwm1992

Currently tcp protocol, http protocol server must be started successfully, grpc protocol did not so. Now added MQTT protocol server must be started successfullyt, please community to give advice, I can not decide.

目前tcp协议、http协议的server必须成功启动,grpc协议没有如此,现在新增MQTT协议的server是否必须启动成功,请社区给出意见,我权衡不好。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我理解这里的必须启动成功实际上是受是否开启MQTT协议的配置控制的吧?这里的退出我认为没有问题,如果MQTT协议加载有问题退出了,那其实可以在配置中移除MQTT协议,保证TCP、HTTP等协议正常启动服务就好。

}
}, "eventMesh-mqtt-server");
thread.start();

Check warning on line 141 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L134-L141

Added lines #L134 - L141 were not covered by tests

started.compareAndSet(false, true);

Check warning on line 143 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L143

Added line #L143 was not covered by tests

}

Check warning on line 145 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L145

Added line #L145 was not covered by tests

@Override
public CommonConfiguration getConfiguration() {
return eventMeshMQTTConfiguration;

Check warning on line 149 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L149

Added line #L149 was not covered by tests
}

private class MQTTServerInitializer extends ChannelInitializer<SocketChannel> {

Check warning on line 152 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L152

Added line #L152 was not covered by tests


@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(getWorkerGroup(), MqttEncoder.INSTANCE);
channelPipeline.addLast(getWorkerGroup(), new MqttDecoder());
channelPipeline.addLast(getWorkerGroup(), new EventMeshMqttChannelInboundHandler());
}

Check warning on line 161 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L157-L161

Added lines #L157 - L161 were not covered by tests
}

@Sharable
private class EventMeshMqttChannelInboundHandler extends ChannelInboundHandlerAdapter {

Check warning on line 165 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L165

Added line #L165 was not covered by tests

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MqttMessage) {
MqttMessage mqttMessage = (MqttMessage) msg;

Check warning on line 170 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L170

Added line #L170 was not covered by tests
if (mqttMessage.decoderResult().isFailure()) {
Throwable cause = mqttMessage.decoderResult().cause();

Check warning on line 172 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L172

Added line #L172 was not covered by tests
if (cause instanceof MqttUnacceptableProtocolVersionException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(

Check warning on line 174 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L174

Added line #L174 was not covered by tests
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
null));
} else if (cause instanceof MqttIdentifierRejectedException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(

Check warning on line 179 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L179

Added line #L179 was not covered by tests
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
null));
}
ctx.close();
return;

Check warning on line 185 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L184-L185

Added lines #L184 - L185 were not covered by tests
}
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageType mqttMessageType = mqttFixedHeader.messageType();
MqttProcessor mqttProcessor = processorTable.get(mqttMessageType);

Check warning on line 189 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L187-L189

Added lines #L187 - L189 were not covered by tests
if (!Objects.isNull(mqttProcessor)) {
Executor executor = mqttProcessor.executor();

Check warning on line 191 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L191

Added line #L191 was not covered by tests
if (Objects.isNull(executor)) {
mqttProcessor.process(ctx, mqttMessage);

Check warning on line 193 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L193

Added line #L193 was not covered by tests
} else {
executor.execute(() -> {

Check warning on line 195 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L195

Added line #L195 was not covered by tests
try {
mqttProcessor.process(ctx, mqttMessage);
} catch (MqttException e) {
log.error("[mqtt Processor error]", e);
ctx.close();
}
});

Check warning on line 202 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L197-L202

Added lines #L197 - L202 were not covered by tests
}
}
}
}

Check warning on line 206 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L206

Added line #L206 was not covered by tests

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
ctx.close();

Check warning on line 211 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L211

Added line #L211 was not covered by tests
} else {
super.exceptionCaught(ctx, cause);

Check warning on line 213 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L213

Added line #L213 was not covered by tests
}
}

Check warning on line 215 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L215

Added line #L215 was not covered by tests
}

public Acl getAcl() {
return acl;

Check warning on line 219 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java#L219

Added line #L219 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import static org.apache.eventmesh.common.Constants.MQTT;

import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration;

public class EventMeshMqttBootstrap implements EventMeshBootstrap {

private EventMeshMQTTServer eventMeshMQTTServer;

private EventMeshServer eventMeshServer;

private EventMeshMQTTConfiguration eventMeshMQTTConfiguration;

public EventMeshMqttBootstrap(EventMeshServer eventMeshServer) {
this.eventMeshServer = eventMeshServer;
ConfigService configService = ConfigService.getInstance();
this.eventMeshMQTTConfiguration = configService.buildConfigInstance(EventMeshMQTTConfiguration.class);
ConfigurationContextUtil.putIfAbsent(MQTT, eventMeshMQTTConfiguration);
}

Check warning on line 39 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L34-L39

Added lines #L34 - L39 were not covered by tests

@Override
public void init() throws Exception {
if (eventMeshMQTTConfiguration != null) {
eventMeshMQTTServer = new EventMeshMQTTServer(eventMeshServer, eventMeshMQTTConfiguration);
eventMeshMQTTServer.init();

Check warning on line 45 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L44-L45

Added lines #L44 - L45 were not covered by tests
}

}

Check warning on line 48 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L48

Added line #L48 was not covered by tests

@Override
public void start() throws Exception {
if (eventMeshMQTTServer != null) {
eventMeshMQTTServer.start();

Check warning on line 53 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L53

Added line #L53 was not covered by tests
}
}

Check warning on line 55 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L55

Added line #L55 was not covered by tests

@Override
public void shutdown() throws Exception {
// server shutdown
if (eventMeshMQTTServer != null) {
eventMeshMQTTServer.shutdown();

Check warning on line 61 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L61

Added line #L61 was not covered by tests
}
}

Check warning on line 63 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L63

Added line #L63 was not covered by tests

public EventMeshMQTTServer getEventMeshMQTTServer() {
return eventMeshMQTTServer;

Check warning on line 66 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L66

Added line #L66 was not covered by tests
}

public EventMeshServer getEventMeshServer() {
return eventMeshServer;

Check warning on line 70 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L70

Added line #L70 was not covered by tests
}

public EventMeshMQTTConfiguration getEventMeshMQTTConfiguration() {
return eventMeshMQTTConfiguration;

Check warning on line 74 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMqttBootstrap.java#L74

Added line #L74 was not covered by tests
}

}
Loading
Loading