Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriber cant receive topic message #192

Open
chenziyi2018 opened this issue Jan 16, 2021 · 7 comments
Open

subscriber cant receive topic message #192

chenziyi2018 opened this issue Jan 16, 2021 · 7 comments
Labels

Comments

@chenziyi2018
Copy link

chenziyi2018 commented Jan 16, 2021

Questions

  • subscriber cant receive topic message

Version

4.0.0

Context

  • client code

`
@slf4j
public class MqttClientVerticle extends AbstractVerticle {

private MqttClient mqttClient;

\@Override
public void start() throws Exception {
    log.info("good we are in now");
    MqttClientOptions options = new MqttClientOptions()
            .setKeepAliveInterval(5)
            .setClientId("client_1111")
            .setAutoKeepAlive(true);
    mqttClient = MqttClient.create(vertx, options);
    mqttClient.publishHandler(handle -> {
        log.info("receive topic:[{}],payload:[{}]", handle.topicName(), handle.payload());
    });

    mqttClient.connect(1883, "localhost", handler -> {
        mqttClient.subscribe("/topic", 0);
    });
}
@Override
public void stop() throws Exception {
    if (mqttClient != null) {
        log.info("disconnect");
        mqttClient.disconnect();
    }
}

}`

  • server code

`
@slf4j
public class MqtttServer extends AbstractVerticle {
private static final String TOPIC_LIGHTS = "lights";

@Override
public void start() throws Exception {
    MqttServerOptions options = new MqttServerOptions().setPort(1883).setHost("0.0.0.0");
    MqttServer mqttServer = MqttServer.create(vertx, options);
    mqttServer.endpointHandler(endpoint -> {
        System.out.println("connected client " + endpoint.clientIdentifier());
        endpoint.accept(false);
        endpoint.pingHandler(h -> {
            log.info("receive client [{}] ping message", endpoint.clientIdentifier());
        });
        handleSubscription(endpoint);
        handleUnsubscription(endpoint);
        publishHandler(endpoint);
        handleClientDisconnect(endpoint);
    })
            .listen(ar -> {
                if (ar.succeeded()) {
                    log.info("MQTT server is listening on port " + ar.result().actualPort());
                } else {
                    log.info("Error on starting the server");
                    ar.cause().printStackTrace();
                }
            });
}


private static void handleSubscription(MqttEndpoint endpoint) {
    endpoint.subscribeHandler(subscribe -> {

        List<MqttQoS> grantedQosLevels = new ArrayList<>();
        for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
            grantedQosLevels.add(s.qualityOfService());
        }
        String topicNames = subscribe.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.joining(","));
        log.info("{} subscribe :{}", endpoint.clientIdentifier(), topicNames);
        endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
        for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
            endpoint.publish(topicSubscription.topicName(), Buffer.buffer("hello"),MqttQoS.EXACTLY_ONCE,false,false);
        }
        endpoint.publishAcknowledgeHandler(messageId -> {

            System.out.println("Received ack for message = " + messageId);

        }).publishReceivedHandler(endpoint::publishRelease).publishCompletionHandler(messageId -> {

            System.out.println("Received ack for message = " + messageId);
        });
    });
}

private static void handleUnsubscription(MqttEndpoint endpoint) {
    endpoint.unsubscribeHandler(unsubscribe -> {
        String topics = String.join(",", unsubscribe.topics());
        log.info("{} unsubscribe :{}", endpoint.clientIdentifier(), topics);
        endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
    });
}

private static void publishHandler(MqttEndpoint endpoint) {
    endpoint.publishHandler(message -> {
        log.info("client [{}] publish message :{}, to topic:[{}]", endpoint.clientIdentifier(), message.payload(), message.topicName());
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            endpoint.publishAcknowledge(message.messageId());
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
            endpoint.publishRelease(message.messageId());
        }
    }).publishReleaseHandler(endpoint::publishComplete);
}

private static void handleQoS(MqttPublishMessage message, MqttEndpoint endpoint) {
    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        String topicName = message.topicName();

        if (TOPIC_LIGHTS.equals(topicName)) {
            LightsController.handler(message);
        }
        endpoint.publishAcknowledge(message.messageId());

    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
        endpoint.publishRelease(message.messageId());
    }
}

private static void handleClientDisconnect(MqttEndpoint endpoint) {
    endpoint.disconnectHandler(h -> {
        log.info("{} disconnect", endpoint.clientIdentifier());
    });
}

}`

  • controller code

`
@slf4j
public class VertxServer extends AbstractVerticle {

@Override
public void start() throws Exception {
    HttpServer server = vertx.createHttpServer();
    Router router = Router.router(vertx);
    router.route("/static/*").handler(StaticHandler.create("templates"));
    router.get("/hello").handler(ctx -> {

        MqttClientOptions options = new MqttClientOptions().setClientId("client_sender_111");
        MqttClient mqttClient = MqttClient.create(vertx, options);
        mqttClient.connect(1883, "localhost", h -> {
            if (h.succeeded()) {
                mqttClient.publish("/topic", Buffer.buffer("hello, how are you"), MqttQoS.AT_MOST_ONCE, false, false, s -> {
                    log.info("send success? {}", s.succeeded());
                    mqttClient.disconnect();
                });
            }
        });

        // This handler will be called for every request
        HttpServerResponse response = ctx.response();
        response.putHeader("content-type", "text/plain");
        // Write to the response and end it
        response.end("send success");
    });

    server.requestHandler(router).listen(9091, h -> {
        log.info("server start:{}", h.succeeded());
    });
}

}`

description

when the client init on project start, I can receive the msg publish by the mqtt server subscribeHandler "hello", but when I use the
web router "/hello" to publish msg to the topic "/topic" the subscriber client "client_1111" cant recieve the msg ...

help me please ~ T_T ~

@chenziyi2018
Copy link
Author

sorry its not bug, I used a wrong tag

@vietj
Copy link
Contributor

vietj commented Jan 18, 2021

is it a bug or a question ?

@chenziyi2018
Copy link
Author

is it a bug or a question ?

its a question. I write a demo like this, bug it not work : subscriber cant receive the published message.

i want to use the vertx in my project. but development documents are too less, and the samples are too simple. can you give me some
suggests?

@zhengchalei
Copy link

I have the same problem,

endpoint.publishHandler { message ->
       // this is ok;
        println("publishHandler topic: ${message.topicName()}  payload: ${message.payload()}")
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
          endpoint.publishAcknowledge(message.messageId())
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
          endpoint.publishReceived(message.messageId())
        }
      }.publishReleaseHandler { messageId -> endpoint.publishComplete(messageId) }

but other clients subscribe topic can't get the message;
do you need to manually forward the subscription to other clients

@sunqb
Copy link

sunqb commented May 19, 2021

how to get message from subscribe ? i can't find a method to get message from

@sunqb
Copy link

sunqb commented May 19, 2021

how to get message from subscribe ? i can't find a method to get message from

method name is not good,i think 。 this is not a bug,i use 'publishHandler' to get message from subscribe

@pigbayspy
Copy link
Contributor

yeah, vertx-mqtt is not a fully featured MQTT broker. You need to complete pub/sub message passage by yourself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

No branches or pull requests

5 participants