-
Notifications
You must be signed in to change notification settings - Fork 642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature #4793] Support MQTT protocol #4794
base: master
Are you sure you want to change the base?
Conversation
|
||
|
||
protected final transient Map<MqttMessageType, MqttProcessor> processorTable = | ||
new ConcurrentHashMap<>(64); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class is not serializable, is the transient
keyword redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted,please review again.
} catch (Exception ex) { | ||
log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex); | ||
} | ||
System.exit(-1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it appropriate to exit the process when the MQTT server fails to start?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a reference to other EM protocols.If you want to modify it after the discussion, I will ignore this startup failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently tcp protocol, http protocol server must be started successfully, grpc protocol did not so. Now added MQTT protocol server must be started successfullyt, please community to give advice, I can not decide.
目前tcp协议、http协议的server必须成功启动,grpc协议没有如此,现在新增MQTT协议的server是否必须启动成功,请社区给出意见,我权衡不好。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我理解这里的必须启动成功实际上是受是否开启MQTT协议的配置控制的吧?这里的退出我认为没有问题,如果MQTT协议加载有问题退出了,那其实可以在配置中移除MQTT协议,保证TCP、HTTP等协议正常启动服务就好。
this.cleanThread = new Thread(() -> { | ||
while (true && !Thread.currentThread().isInterrupted()) { | ||
try { | ||
ClientInfo clientInfo = delayQueue.take(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the role of delayQueue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The role of delayQueue is to clean up the time out connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it up to the client to decide and set the length of time that each client's connection survives when it sends a message?
每个客户端的连接的存活时长,是靠客户端发送消息时自己决定并设置好吗?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
客户端可以携带keep alive time参数,用做会话存活时间
|
||
@Override | ||
public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { | ||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted,please review again.
protected final Map<MqttMessageType, MqttProcessor> processorTable = | ||
new ConcurrentHashMap<>(64); | ||
|
||
private final transient AtomicBoolean started = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same problem.
I'm not familiar with MQTT, so the rest of review work needs the community to complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same problem.
Waiting for your response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the problem here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted,please review again.
@karsonto 我没有理解对于publish processor的实现,没有看到将mqtt协议的消息进行转换后发送到eventmesh storage。 |
@xwm1992 |
This PR is the framework design of the MQTT protocol , and will continue to optimize persist messages in another PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is eventmesh-protocol-plugin
implementation the only work left for this protocol in Runtime?
I think there is unnecessry to implement this module for this time. If it needs to be implemented in the future, I will submit a PR to supplement it. |
It has been 60 days since the last activity on this pull request. I am reaching out here to gently remind you that the Apache EventMesh community values every pull request, and please feel free to get in touch with the reviewers at any time. They are available to assist you in advancing the progress of your pull request and offering the latest feedback. If you encounter any challenges during development, seeking support within the community is encouraged. We sincerely appreciate your contributions to Apache EventMesh. |
Fixes #4793
Motivation
Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.
Modifications
Describe the modifications you've done.
Documentation