Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Websocket Both Client and Broker. #206

Open
Dustone-JavaWeb opened this issue Aug 17, 2021 · 3 comments
Open

Support Websocket Both Client and Broker. #206

Dustone-JavaWeb opened this issue Aug 17, 2021 · 3 comments

Comments

@Dustone-JavaWeb
Copy link

Describe the feature

The broker support the websocket in recent versions, I suggest to make Client support too.

Use cases

In most cases, The service will be deploy on an k8s instance or behind nginx with vitural host, To make a tcp port bind for a container is not allowed in most companies, At past we use an tcp proxy which handshake with websocket, It feels not good.

Contribution

@Dustone-JavaWeb
Copy link
Author

@vietj

I had implemented it by adding WebSocket Client handshake before the pipeline.
Here is the code, Forgive me that I don't know how to create a branch.

class file : io.vertx.mqtt.impl.MqttClientImpl

key code and total file

    private URI websocketURI;
if(options.isWebsocket()){
                        try {
                            websocketURI = new URI("ws"+(options.isSsl()?"s":"")+"://"+host+":"+port+"/mqtt");
                        } catch (URISyntaxException e) {
                            soi.close();
                            connectPromise.fail(e.getMessage());
                            disconnectPromise.complete();
                            return;
                        }
                    }
                    initChannel(soi).onSuccess(vvoid -> {
        if (this.options.isWebsocket()) {
            // The websocket mark
            HttpHeaders httpHeaders = new DefaultHttpHeaders();
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, "mqtt, mqttv3.1, mqttv3.1.1", true, httpHeaders);


            pipeline.addBefore("mqttEncoder", "httpCClientCodec", new HttpClientCodec());
            pipeline.addAfter("httpCClientCodec", "aggregator", new HttpObjectAggregator(65536));


            pipeline.addAfter("aggregator", "webSocketHandler", new WebSocketClientProtocolHandler(handshaker));
            pipeline.addAfter("webSocketHandler", "ws-success-checker", new WebScoketSucccessHandler(handshakePromise));

            pipeline.addAfter("webSocketHandler", "bytebuf2wsEncoder", new MqttServerImpl.WebSocketFrameToByteBufDecoder());
            pipeline.addAfter("bytebuf2wsEncoder", "ws2bytebufDecoder", new MqttServerImpl.ByteBufToWebSocketFrameEncoder());

            System.out.println(pipeline.toMap().keySet());
        } else {
            handshakePromise.complete();
        }
        return handshakePromise.future();
    }
    private class WebScoketSucccessHandler extends SimpleChannelInboundHandler<io.vertx.core.http.WebSocketFrame> {
        public static final String WS_CONNECT_SUCCESS_EVENT = "WS_CONNECT_SUCCESS_EVENT";
        PromiseInternal<Void> handshakePromise;

        public WebScoketSucccessHandler(PromiseInternal<Void> handshakePromise) {
            this.handshakePromise = handshakePromise;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, io.vertx.core.http.WebSocketFrame msg) throws Exception {
            super.channelRead(ctx, msg);
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                ctx.pipeline().fireUserEventTriggered(WS_CONNECT_SUCCESS_EVENT);
                handshakePromise.complete();
                ctx.pipeline().remove(this);
            } else if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
                handshakePromise.fail(new VertxException("Websocket handshake timeout !"));
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
/*
 * Copyright 2016 Red Hat Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.vertx.mqtt.MqttServerOptions.MQTT_SUBPROTOCOL_CSV_LIST;

/**
 * MQTT client implementation
 */
public class MqttClientImpl implements MqttClient {

    private enum Status {CLOSED, CONNECTING, CONNECTED, CLOSING}

    // patterns for topics validation
    private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
    private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");
    private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);

    private static final int MAX_MESSAGE_ID = 65535;
    private static final int MAX_TOPIC_LEN = 65535;
    private static final int MIN_TOPIC_LEN = 1;
    private static final String PROTOCOL_NAME = "MQTT";
    private static final int PROTOCOL_VERSION = 4;
    private static final int DEFAULT_IDLE_TIMEOUT = 0;

    private final VertxInternal vertx;
    private final MqttClientOptions options;
    private NetSocketInternal connection;
    private ContextInternal ctx;

    // handler to call when a publish is complete
    private Handler<Integer> publishCompletionHandler;
    // handler to call when a publish has expired
    private Handler<Integer> publishCompletionExpirationHandler;
    // handler to call when a PUBACK is received for an unknown packetId
    private Handler<Integer> publishCompletionPhantomHandler;
    // handler to call when a unsubscribe request is completed
    private Handler<Integer> unsubscribeCompletionHandler;
    // handler to call when a publish message comes in
    private Handler<MqttPublishMessage> publishHandler;
    // handler to call when a subscribe request is completed
    private Handler<MqttSubAckMessage> subscribeCompletionHandler;
    // handler to call when a connection request is completed
    private Promise<MqttConnAckMessage> connectPromise;
    // handler to call when a connection disconnects
    private Promise<Void> disconnectPromise;
    // handler to call when a pingresp is received
    private Handler<Void> pingrespHandler;
    // handler to call when a problem at protocol level happens
    private Handler<Throwable> exceptionHandler;
    //handler to call when the remote MQTT server closes the connection
    private Handler<Void> closeHandler;

    // storage of PUBLISH QoS=1 messages which was not responded with PUBACK
    private HashMap<Integer, ExpiringPacket> qos1outbound = new HashMap<>();

    // storage of PUBLISH QoS=2 messages which was not responded with PUBREC
    // and PUBREL messages which was not responded with PUBCOMP
    private HashMap<Integer, ExpiringPacket> qos2outbound = new HashMap<>();

    // storage of PUBLISH messages which was responded with PUBREC
    private HashMap<Integer, MqttMessage> qos2inbound = new HashMap<>();

    // counter for the message identifier
    private int messageIdCounter;

    // Keep alive management
    private final long keepAliveTimeout;
    private Deque<Ping> pings = new ArrayDeque<>();

    // total number of unacknowledged packets
    private int countInflightQueue;

    private NetClient client;
    private Status status = Status.CLOSED;
    private URI websocketURI;

    /**
     * Constructor
     *
     * @param vertx   Vert.x instance
     * @param options MQTT client options
     */
    public MqttClientImpl(Vertx vertx, MqttClientOptions options) {

        // copy given options
        NetClientOptions netClientOptions = new NetClientOptions(options);
        netClientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);

        this.vertx = (VertxInternal) vertx;
        this.options = new MqttClientOptions(options);
        this.keepAliveTimeout = ((options.getKeepAliveInterval() * 1000) * 3) / 2;
    }

    int getInFlightMessagesCount() {
        synchronized (this) {
            return countInflightQueue;
        }
    }

    @Override
    public Future<MqttConnAckMessage> connect(int port, String host) {

        return this.doConnect(port, host, null);
    }

    /**
     * See {@link MqttClient#connect(int, String, Handler)} for more details
     */
    @Override
    public MqttClient connect(int port, String host, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {

        Future<MqttConnAckMessage> fut = connect(port, host);
        if (connectHandler != null) {
            fut.onComplete(connectHandler);
        }
        return this;
    }

    @Override
    public Future<MqttConnAckMessage> connect(int port, String host, String serverName) {

        return this.doConnect(port, host, serverName);
    }

    /**
     * See {@link MqttClient#connect(int, String, String, Handler)} for more details
     */
    @Override
    public MqttClient connect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {

        Future<MqttConnAckMessage> fut = this.doConnect(port, host, serverName);
        if (connectHandler != null) {
            fut.onComplete(connectHandler);
        }
        return this;
    }

    private Future<MqttConnAckMessage> doConnect(int port, String host, String serverName) {

        ContextInternal ctx = vertx.getOrCreateContext();
        NetClient client = vertx.createNetClient(options, new CloseFuture());
        PromiseInternal<MqttConnAckMessage> connectPromise = ctx.promise();
        PromiseInternal<Void> disconnectPromise = ctx.promise();

        synchronized (this) {
            if (this.status != Status.CLOSED) {
                return ctx.failedFuture(new IllegalStateException("Client " + this.status.name().toLowerCase()));
            }
            this.status = Status.CONNECTING;
            this.ctx = ctx;
            this.connectPromise = connectPromise;
            this.disconnectPromise = disconnectPromise;
            this.client = client;
        }

        ctx.runOnContext(v -> {
            log.debug(String.format("Trying to connect with %s:%d", host, port));

            client.connect(port, host, serverName, done -> {

                // the TCP connection fails
                if (done.failed()) {
                    log.error(String.format("Can't connect to %s:%d", host, port), done.cause());
                    synchronized (this) {
                        this.status = Status.CLOSED;
                        this.connectPromise = null;
                        this.disconnectPromise = null;
                        this.ctx = null;
                        this.client = null;
                    }
                    client.close();
                    connectPromise.fail(done.cause());
                    disconnectPromise.complete();
                } else {
                    log.info(String.format("Connection with %s:%d established successfully", host, port));

                    boolean closing;
                    synchronized (MqttClientImpl.this) {
                        if (closing = (status == Status.CLOSING)) {
                            this.status = Status.CLOSED;
                            this.client = null;
                            this.connectPromise = null;
                            this.disconnectPromise = null;
                        }
                    }

                    NetSocketInternal soi = (NetSocketInternal) done.result();

                    if (closing) {
                        soi.close();
                        connectPromise.fail("Disconnected");
                        disconnectPromise.complete();
                        return;
                    }

                    if (options.isAutoGeneratedClientId() && (options.getClientId() == null || options.getClientId().isEmpty())) {
                        options.setClientId(generateRandomClientId());
                    }
                    if(options.isWebsocket()){
                        try {
                            websocketURI = new URI("ws"+(options.isSsl()?"s":"")+"://"+host+":"+port+"/mqtt");
                        } catch (URISyntaxException e) {
                            soi.close();
                            connectPromise.fail(e.getMessage());
                            disconnectPromise.complete();
                            return;
                        }
                    }
                    initChannel(soi).onSuccess(vvoid -> {
                        synchronized (MqttClientImpl.this) {
                            this.connection = soi;
                        }

                        soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg));
                        soi.closeHandler(v2 -> {
                            client.close();
                            synchronized (MqttClientImpl.this) {
                                this.connection = null;
                                this.status = Status.CLOSED;
                                this.connectPromise = null;
                                this.disconnectPromise = null;
                            }
                            connectPromise.fail("Closed");
                            disconnectPromise.complete();
                        });

                        // an exception at connection level
                        soi.exceptionHandler(this::handleException);

                        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
                                false,
                                AT_MOST_ONCE,
                                false,
                                0);

                        MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
                                PROTOCOL_NAME,
                                PROTOCOL_VERSION,
                                options.hasUsername(),
                                options.hasPassword(),
                                options.isWillRetain(),
                                options.getWillQoS(),
                                options.isWillFlag(),
                                options.isCleanSession(),
                                options.getKeepAliveInterval()
                        );

                        MqttConnectPayload payload = new MqttConnectPayload(
                                options.getClientId() == null ? "" : options.getClientId(),
                                options.getWillTopic(),
                                options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
                                options.hasUsername() ? options.getUsername() : null,
                                options.hasPassword() ? options.getPassword().getBytes() : null
                        );

                        io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);

                        this.write(connect);
                    }).onFailure(cause -> {
                        soi.close();
                        connectPromise.fail(cause.getMessage());
                        disconnectPromise.complete();
                    });
                }

            });
        });

        return connectPromise.future();
    }

    /**
     * See {@link MqttClient#disconnect()} for more details
     */
    @Override
    public Future<Void> disconnect() {

        NetSocketInternal connection;
        Status status;
        Future<Void> fut;
        synchronized (this) {
            status = this.status;
            switch (this.status) {
                case CLOSED:
                    return vertx.getOrCreateContext().succeededFuture();
                case CONNECTED:
                    this.status = Status.CLOSING;
                    connection = this.connection;
                    break;
                case CONNECTING:
                    this.status = Status.CLOSING;
                    connection = this.connection;
                    break;
                case CLOSING:
                    connection = null;
                    break;
                default:
                    throw new AssertionError();
            }
            fut = this.disconnectPromise.future();
        }

        if (connection != null) {
            if (status == Status.CONNECTED) {
                MqttFixedHeader fixedHeader = new MqttFixedHeader(
                        MqttMessageType.DISCONNECT,
                        false,
                        AT_MOST_ONCE,
                        false,
                        0
                );
                io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null);
                connection.writeMessage(disconnect);
            }
            connection.close();
        }

        return fut;
    }

    /**
     * See {@link MqttClient#disconnect(Handler)} for more details
     */
    @Override
    public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) {

        Future<Void> fut = disconnect();
        if (disconnectHandler != null) {
            fut.onComplete(disconnectHandler);
        }
        return this;
    }

    /**
     * See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean)} for more details
     */
    @Override
    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {

        if (MqttQoS.FAILURE == qosLevel) {
            throw new IllegalArgumentException("QoS level must be one of AT_MOST_ONCE, AT_LEAST_ONCE or EXACTLY_ONCE");
        }

        io.netty.handler.codec.mqtt.MqttMessage publish;
        MqttPublishVariableHeader variableHeader;
        synchronized (this) {
            if (countInflightQueue >= options.getMaxInflightQueue()) {
                String msg = String.format("Attempt to exceed the limit of %d inflight messages", options.getMaxInflightQueue());
                log.error(msg);
                MqttException exception = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, msg);
                return ctx.failedFuture(exception);
            }

            if (!isValidTopicName(topic)) {
                String msg = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", topic);
                log.error(msg);
                MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_NAME, msg);
                return ctx.failedFuture(exception);
            }

            MqttFixedHeader fixedHeader = new MqttFixedHeader(
                    MqttMessageType.PUBLISH,
                    isDup,
                    qosLevel,
                    isRetain,
                    0
            );
            ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes());
            variableHeader = new MqttPublishVariableHeader(topic, nextMessageId());
            publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf);
            switch (qosLevel) {
                case AT_LEAST_ONCE:
                    qos1outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubackTimeout, variableHeader.packetId()));
                    countInflightQueue++;
                    break;
                case EXACTLY_ONCE:
                    qos2outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubrecTimeout, variableHeader.packetId()));
                    countInflightQueue++;
                    break;
                default:
                    // nothing to do for AT_MOST_ONCE
                    break;
            }
        }

        return this.write(publish).map(variableHeader.packetId());
    }

    /**
     * See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean, Handler)} for more details
     */
    @Override
    public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) {

        Future<Integer> fut = publish(topic, payload, qosLevel, isDup, isRetain);
        if (publishSentHandler != null) {
            fut.onComplete(publishSentHandler);
        }
        return this;
    }

    /**
     * See {@link MqttClient#publishCompletionHandler(Handler)} for more details
     */
    @Override
    public MqttClient publishCompletionHandler(Handler<Integer> publishCompletionHandler) {

        this.publishCompletionHandler = publishCompletionHandler;
        return this;
    }

    private synchronized Handler<Integer> publishCompletionHandler() {
        return this.publishCompletionHandler;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public MqttClient publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) {

        this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
        return this;
    }

    private synchronized Handler<Integer> publishCompletionExpirationHandler() {
        return this.publishCompletionExpirationHandler;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public MqttClient publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionPhantomHandler) {

        this.publishCompletionPhantomHandler = publishCompletionPhantomHandler;
        return this;
    }

    private synchronized Handler<Integer> publishCompletionUnknownPacketIdHandler() {
        return this.publishCompletionPhantomHandler;
    }

    /**
     * See {@link MqttClient#publishHandler(Handler)} for more details
     */
    @Override
    public MqttClient publishHandler(Handler<MqttPublishMessage> publishHandler) {

        this.publishHandler = publishHandler;
        return this;
    }

    private synchronized Handler<MqttPublishMessage> publishHandler() {
        return this.publishHandler;
    }

    /**
     * See {@link MqttClient#subscribeCompletionHandler(Handler)} for more details
     */
    @Override
    public MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> subscribeCompletionHandler) {

        this.subscribeCompletionHandler = subscribeCompletionHandler;
        return this;
    }

    private synchronized Handler<MqttSubAckMessage> subscribeCompletionHandler() {
        return this.subscribeCompletionHandler;
    }

    /**
     * See {@link MqttClient#subscribe(String, int)} for more details
     */
    @Override
    public Future<Integer> subscribe(String topic, int qos) {
        return subscribe(Collections.singletonMap(topic, qos));
    }

    /**
     * See {@link MqttClient#subscribe(String, int, Handler)} for more details
     */
    @Override
    public MqttClient subscribe(String topic, int qos, Handler<AsyncResult<Integer>> subscribeSentHandler) {
        return subscribe(Collections.singletonMap(topic, qos), subscribeSentHandler);
    }

    /**
     * See {@link MqttClient#subscribe(Map)} for more details
     */
    @Override
    public Future<Integer> subscribe(Map<String, Integer> topics) {

        Map<String, Integer> invalidTopics = topics.entrySet()
                .stream()
                .filter(e -> !isValidTopicFilter(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        if (invalidTopics.size() > 0) {
            String msg = String.format("Invalid Topic Filters: %s", invalidTopics);
            log.error(msg);
            MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_FILTER, msg);
            return ctx.failedFuture(exception);
        }

        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.SUBSCRIBE,
                false,
                AT_LEAST_ONCE,
                false,
                0);

        MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);
        List<MqttTopicSubscription> subscriptions = topics.entrySet()
                .stream()
                .map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
                .collect(Collectors.toList());

        MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);

        io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);

        return this.write(subscribe).map(variableHeader.messageId());
    }

    /**
     * See {@link MqttClient#subscribe(Map, Handler)} for more details
     */
    @Override
    public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Integer>> subscribeSentHandler) {

        Future<Integer> fut = subscribe(topics);
        if (subscribeSentHandler != null) {
            fut.onComplete(subscribeSentHandler);
        }
        return this;
    }

    /**
     * See {@link MqttClient#unsubscribeCompletionHandler(Handler)} for more details
     */
    @Override
    public MqttClient unsubscribeCompletionHandler(Handler<Integer> unsubscribeCompletionHandler) {

        this.unsubscribeCompletionHandler = unsubscribeCompletionHandler;
        return this;
    }

    private synchronized Handler<Integer> unsubscribeCompletionHandler() {

        return this.unsubscribeCompletionHandler;
    }

    /**
     * See {@link MqttClient#unsubscribe(String, Handler)} )} for more details
     */
    @Override
    public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {

        Future<Integer> fut = unsubscribe(topic);
        if (unsubscribeSentHandler != null) {
            fut.onComplete(unsubscribeSentHandler);
        }
        return this;
    }

    /**
     * See {@link MqttClient#unsubscribe(String)} )} for more details
     */
    @Override
    public Future<Integer> unsubscribe(String topic) {

        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.UNSUBSCRIBE,
                false,
                AT_LEAST_ONCE,
                false,
                0);

        MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);

        MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));

        io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);

        this.write(unsubscribe);

        return ctx.succeededFuture(variableHeader.messageId());
    }

    /**
     * See {@link MqttClient#pingResponseHandler(Handler)} for more details
     */
    @Override
    public synchronized MqttClient pingResponseHandler(Handler<Void> pingResponseHandler) {
        this.pingrespHandler = pingResponseHandler;
        return this;
    }

    private synchronized Handler<Void> pingResponseHandler() {
        return this.pingrespHandler;
    }

    /**
     * See {@link MqttClient#exceptionHandler(Handler)} for more details
     */
    @Override
    public synchronized MqttClient exceptionHandler(Handler<Throwable> handler) {
        exceptionHandler = handler;
        return this;
    }

    private synchronized Handler<Throwable> exceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * See {@link MqttClient#closeHandler(Handler)} for more details
     */
    @Override
    public synchronized MqttClient closeHandler(Handler<Void> closeHandler) {
        this.closeHandler = closeHandler;
        return this;
    }

    private synchronized Handler<Void> closeHandler() {
        return this.closeHandler;
    }

    private class Ping {
        final long id;

        private Ping(long id) {
            this.id = id;
        }

        void ack() {
            vertx.cancelTimer(id);
        }

        void cancel() {
            vertx.cancelTimer(id);
        }
    }

    /**
     * See {@link MqttClient#ping()} for more details
     */
    @Override
    public MqttClient ping() {
        ctx.execute(() -> {
            MqttFixedHeader fixedHeader =
                    new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);

            io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(fixedHeader, null, null);

            long id = vertx.setTimer(keepAliveTimeout, _id -> {
                disconnect();
            });

            pings.add(new Ping(id));

            this.write(pingreq);
        });
        return this;
    }

    @Override
    public synchronized String clientId() {
        return this.options.getClientId();
    }

    @Override
    public synchronized boolean isConnected() {
        return this.status == Status.CONNECTED;
    }

    /**
     * Sends PUBACK packet to server
     *
     * @param publishMessageId identifier of the PUBLISH message to acknowledge
     */
    private void publishAcknowledge(int publishMessageId) {

        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);

        MqttMessageIdVariableHeader variableHeader =
                MqttMessageIdVariableHeader.from(publishMessageId);

        io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);

        this.write(puback);
    }

    /**
     * Sends PUBREC packet to server
     *
     * @param publishMessage a PUBLISH message to acknowledge
     */
    private void publishReceived(MqttPublishMessage publishMessage) {

        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);

        MqttMessageIdVariableHeader variableHeader =
                MqttMessageIdVariableHeader.from(publishMessage.messageId());

        io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);

        synchronized (this) {
            qos2inbound.put(publishMessage.messageId(), publishMessage);
        }
        this.write(pubrec);
    }

    /**
     * Sends PUBCOMP packet to server
     *
     * @param publishMessageId identifier of the PUBLISH message to acknowledge
     */
    private void publishComplete(int publishMessageId) {

        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);

        MqttMessageIdVariableHeader variableHeader =
                MqttMessageIdVariableHeader.from(publishMessageId);

        io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);

        this.write(pubcomp);
    }

    /**
     * Sends the PUBREL message to server
     *
     * @param publishMessageId identifier of the PUBLISH message to acknowledge
     */
    private void publishRelease(int publishMessageId) {

        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);

        MqttMessageIdVariableHeader variableHeader =
                MqttMessageIdVariableHeader.from(publishMessageId);

        io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);

        synchronized (this) {
            qos2outbound.put(publishMessageId, new ExpiringPacket(this::handlePubcompTimeout, publishMessageId));
        }
        this.write(pubrel);
    }

    private Future<Void> initChannel(NetSocketInternal sock) {
        PromiseInternal<Void> handshakePromise = ctx.promise();
        ChannelPipeline pipeline = sock.channelHandlerContext().pipeline();

        // add into pipeline netty's (en/de)coder
        pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

        if (this.options.getMaxMessageSize() > 0) {
            pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
        } else {
            // max message size not set, so the default from Netty MQTT codec is used
            pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
        }

        if (this.options.isAutoKeepAlive() &&
                this.options.getKeepAliveInterval() != 0) {

            int keepAliveInterval = this.options.getKeepAliveInterval();

            // handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle
            pipeline.addBefore("handler", "idle",
                    new IdleStateHandler(0, keepAliveInterval, 0) {
                        @Override
                        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
                            if (evt.state() == IdleState.WRITER_IDLE) {
                                // verify that server is still connected (e.g. when using QoS-0)
                                ping();
                            }
                        }
                    });
        }
        if (this.options.isWebsocket()) {
            // The websocket mark
            HttpHeaders httpHeaders = new DefaultHttpHeaders();
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, "mqtt, mqttv3.1, mqttv3.1.1", true, httpHeaders);


            pipeline.addBefore("mqttEncoder", "httpCClientCodec", new HttpClientCodec());
            pipeline.addAfter("httpCClientCodec", "aggregator", new HttpObjectAggregator(65536));


            pipeline.addAfter("aggregator", "webSocketHandler", new WebSocketClientProtocolHandler(handshaker));
            pipeline.addAfter("webSocketHandler", "ws-success-checker", new WebScoketSucccessHandler(handshakePromise));

            pipeline.addAfter("webSocketHandler", "bytebuf2wsEncoder", new MqttServerImpl.WebSocketFrameToByteBufDecoder());
            pipeline.addAfter("bytebuf2wsEncoder", "ws2bytebufDecoder", new MqttServerImpl.ByteBufToWebSocketFrameEncoder());

            System.out.println(pipeline.toMap().keySet());
        } else {
            handshakePromise.complete();
        }
        return handshakePromise.future();
    }

    /**
     * Update and return the next message identifier
     *
     * @return message identifier
     */
    private synchronized int nextMessageId() {

        // if 0 or MAX_MESSAGE_ID, it becomes 1 (first valid messageId)
        this.messageIdCounter = ((this.messageIdCounter % MAX_MESSAGE_ID) != 0) ? this.messageIdCounter + 1 : 1;
        return this.messageIdCounter;
    }

    private synchronized NetSocketInternal connection() {
        return connection;
    }

    private Future<Void> write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) {
        log.debug(String.format("Sending packet %s", mqttMessage));
        return this.connection().writeMessage(mqttMessage);
    }

    /**
     * Used for calling the close handler when the remote MQTT server closes the connection
     */
    private void handleClosed() {
        Promise<MqttConnAckMessage> connectPromise;
        Promise<Void> disconnectPromise;
        NetClient client;
        Deque<Ping> pings;
        synchronized (this) {
            client = this.client;
            connectPromise = this.connectPromise;
            disconnectPromise = this.disconnectPromise;
            pings = this.pings;
            this.disconnectPromise = null;
            this.status = Status.CLOSED;
            this.connection = null;
            this.ctx = null;
            this.client = null;
            this.pings = new ArrayDeque<>();
        }

        // Cleanup pending pings
        pings.forEach(ping -> {
            ping.cancel();
        });

        Handler<Void> handler = closeHandler();
        if (handler != null) {
            handler.handle(null);
        }
        disconnectPromise.complete();
        if (connectPromise != null) {
            connectPromise.fail("Closed");
        }
        client.close();
    }

    /**
     * Handle the MQTT message received from the remote MQTT server
     *
     * @param msg Incoming Packet
     */
    private void handleMessage(ChannelHandlerContext chctx, Object msg) {

        // handling directly native Netty MQTT messages, some of them are translated
        // to the related Vert.x ones for polyglotization
        if (msg instanceof io.netty.handler.codec.mqtt.MqttMessage) {

            io.netty.handler.codec.mqtt.MqttMessage mqttMessage = (io.netty.handler.codec.mqtt.MqttMessage) msg;

            DecoderResult result = mqttMessage.decoderResult();
            if (result.isFailure()) {
                chctx.pipeline().fireExceptionCaught(result.cause());
                return;
            }
            if (!result.isFinished()) {
                chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message"));
                return;
            }

            log.debug(String.format("Incoming packet %s", msg));
            switch (mqttMessage.fixedHeader().messageType()) {

                case CONNACK:

                    io.netty.handler.codec.mqtt.MqttConnAckMessage connack = (io.netty.handler.codec.mqtt.MqttConnAckMessage) mqttMessage;

                    MqttConnAckMessage mqttConnAckMessage = MqttConnAckMessage.create(
                            connack.variableHeader().connectReturnCode(),
                            connack.variableHeader().isSessionPresent());
                    handleConnack(mqttConnAckMessage);
                    break;

                case PUBLISH:

                    io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage;
                    ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload());

                    MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create(
                            publish.variableHeader().packetId(),
                            publish.fixedHeader().qosLevel(),
                            publish.fixedHeader().isDup(),
                            publish.fixedHeader().isRetain(),
                            publish.variableHeader().topicName(),
                            newBuf);
                    handlePublish(mqttPublishMessage);
                    break;

                case PUBACK:
                    handlePuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
                    break;

                case PUBREC:
                    handlePubrec(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
                    break;

                case PUBREL:
                    handlePubrel(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
                    break;

                case PUBCOMP:
                    handlePubcomp(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
                    break;

                case SUBACK:

                    io.netty.handler.codec.mqtt.MqttSubAckMessage unsuback = (io.netty.handler.codec.mqtt.MqttSubAckMessage) mqttMessage;

                    MqttSubAckMessage mqttSubAckMessage = MqttSubAckMessage.create(
                            unsuback.variableHeader().messageId(),
                            unsuback.payload().grantedQoSLevels());
                    handleSuback(mqttSubAckMessage);
                    break;

                case UNSUBACK:
                    handleUnsuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
                    break;

                case PINGRESP:
                    handlePingresp();
                    break;

                default:

                    chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName()));
                    break;
            }

        } else {

            chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type"));
        }
    }

    /**
     * Used for calling the pingresp handler when the server replies to the ping
     */
    private void handlePingresp() {

        Ping ping = pings.poll();
        if (ping != null) {
            ping.ack();
        }

        Handler<Void> handler = pingResponseHandler();
        if (handler != null) {
            handler.handle(null);
        }
    }

    /**
     * Used for calling the unsuback handler when the server acks an unsubscribe
     *
     * @param unsubackMessageId identifier of the subscribe acknowledged by the server
     */
    private void handleUnsuback(int unsubackMessageId) {

        Handler<Integer> handler = unsubscribeCompletionHandler();
        if (handler != null) {
            handler.handle(unsubackMessageId);
        }
    }

    /**
     * Used for calling the puback handler when the server acknowledge a QoS 1 message with puback
     *
     * @param pubackMessageId identifier of the message acknowledged by the server
     */
    private void handlePuback(int pubackMessageId) {

        synchronized (this) {

            ExpiringPacket removedPacket = qos1outbound.remove(pubackMessageId);

            if (removedPacket == null) {
                log.debug("Received PUBACK packet without having related PUBLISH packet in storage");
                // PUBACK has been received after timer has already fired
                Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
                if (handler != null) {
                    handler.handle(pubackMessageId);
                }
                return;
            }
            removedPacket.cancelTimer();
            countInflightQueue--;
        }
        Handler<Integer> handler = publishCompletionHandler();
        if (handler != null) {
            handler.handle(pubackMessageId);
        }
    }

    private void handlePubackTimeout(int packetId) {
        ExpiringPacket expiredMessage;
        synchronized (this) {
            expiredMessage = qos1outbound.remove(packetId);

            if (expiredMessage == null) {
                // the message has already been ACKed
                log.debug("PUBLISH expiration timer fired but QoS 1 message has already been PUBACKed by server");
                return;
            }
        }
        countInflightQueue--;
        Handler<Integer> handler = publishCompletionExpirationHandler();
        if (handler != null) {
            handler.handle(expiredMessage.packetId);
        }
    }

    /**
     * Used for calling the pubcomp handler when the server client acknowledge a QoS 2 message with pubcomp
     *
     * @param pubcompMessageId identifier of the message acknowledged by the server
     */
    private void handlePubcomp(int pubcompMessageId) {

        synchronized (this) {
            ExpiringPacket removedPacket = qos2outbound.remove(pubcompMessageId);

            if (removedPacket == null) {
                log.debug("Received PUBCOMP packet without having related PUBREL packet in storage");
                Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
                if (handler != null) {
                    handler.handle(pubcompMessageId);
                }
                return;
            }
            removedPacket.cancelTimer();
            countInflightQueue--;
        }
        Handler<Integer> handler = publishCompletionHandler();
        if (handler != null) {
            handler.handle(pubcompMessageId);
        }
    }

    private void handlePubcompTimeout(int packetId) {
        ExpiringPacket expiredMessage;
        synchronized (this) {
            expiredMessage = qos2outbound.remove(packetId);

            if (expiredMessage == null) {
                log.debug("PUBCOMP expiration timer fired but QoS 2 message has already been PUBCOMPed by server");
                return;
            }
        }
        countInflightQueue--;
        Handler<Integer> handler = publishCompletionExpirationHandler();
        if (handler != null) {
            handler.handle(expiredMessage.packetId);
        }
    }

    /**
     * Used for sending the pubrel when a pubrec is received from the server
     *
     * @param pubrecMessageId identifier of the message acknowledged by server
     */
    private void handlePubrec(int pubrecMessageId) {

        synchronized (this) {
            ExpiringPacket removedPacket = qos2outbound.remove(pubrecMessageId);

            if (removedPacket == null) {
                log.debug("Received PUBREC packet without having related PUBLISH packet in storage");
                Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
                if (handler != null) {
                    handler.handle(pubrecMessageId);
                }
                return;
            }
            removedPacket.cancelTimer();
        }
        this.publishRelease(pubrecMessageId);
    }

    private void handlePubrecTimeout(int packetId) {
        ExpiringPacket expiredMessage;
        synchronized (this) {
            expiredMessage = qos2outbound.remove(packetId);

            if (expiredMessage == null) {
                log.debug("PUBREC expiration timer fired but QoS 2 message has already been PUBRECed by server");
                return;
            }
        }
        countInflightQueue--;
        Handler<Integer> handler = publishCompletionExpirationHandler();
        if (handler != null) {
            handler.handle(expiredMessage.packetId);
        }
    }

    /**
     * Used for calling the suback handler when the server acknowledges subscribe to topics
     *
     * @param msg message with suback information
     */
    private void handleSuback(MqttSubAckMessage msg) {

        Handler<MqttSubAckMessage> handler = subscribeCompletionHandler();
        if (handler != null) {
            handler.handle(msg);
        }
    }

    /**
     * Used for calling the publish handler when the server publishes a message
     *
     * @param msg published message
     */
    private void handlePublish(MqttPublishMessage msg) {

        Handler<MqttPublishMessage> handler;
        switch (msg.qosLevel()) {

            case AT_MOST_ONCE:
                handler = this.publishHandler();
                if (handler != null) {
                    handler.handle(msg);
                }
                break;

            case AT_LEAST_ONCE:
                this.publishAcknowledge(msg.messageId());
                handler = this.publishHandler();
                if (handler != null) {
                    handler.handle(msg);
                }
                break;

            case EXACTLY_ONCE:
                this.publishReceived(msg);
                // we will handle the PUBLISH when a PUBREL comes
                break;
        }
    }

    /**
     * Used for calling the pubrel handler when the server acknowledge a QoS 2 message with pubrel
     *
     * @param pubrelMessageId identifier of the message acknowledged by the server
     */
    private void handlePubrel(int pubrelMessageId) {
        MqttMessage message;
        synchronized (this) {
            message = qos2inbound.remove(pubrelMessageId);

            if (message == null) {
                log.warn("Received PUBREL packet without having related PUBREC packet in storage");
                return;
            }
            this.publishComplete(pubrelMessageId);
        }
        Handler<MqttPublishMessage> handler = this.publishHandler();
        if (handler != null) {
            handler.handle((MqttPublishMessage) message);
        }
    }

    /**
     * Used for calling the connect handler when the server replies to the request
     *
     * @param msg connection response message
     */
    private void handleConnack(MqttConnAckMessage msg) {

        Status status = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED ? Status.CONNECTED : Status.CLOSING;


        if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
            NetSocketInternal connection;
            Promise<MqttConnAckMessage> connectPromise;
            synchronized (this) {
                connection = this.connection;
                connectPromise = this.connectPromise;
                this.connectPromise = null;
                this.status = Status.CONNECTED;
            }
            connection.closeHandler(v -> handleClosed());
            connectPromise.complete(msg);
        } else {
            Promise<MqttConnAckMessage> connectPromise;
            Promise<Void> disconnectPromise;
            NetSocketInternal connection;
            NetClient client;
            synchronized (this) {
                connectPromise = this.connectPromise;
                disconnectPromise = this.disconnectPromise;
                connection = this.connection;
                client = this.client;
                this.connectPromise = null;
                this.disconnectPromise = null;
                this.status = Status.CLOSED;
                this.connection = null;
                this.client = null;
            }
            connection.closeHandler(null);
            MqttConnectionException exception = new MqttConnectionException(msg.code());
            log.error(String.format("Connection refused by the server - code: %s", msg.code()));
            connectPromise.fail(exception);
            disconnectPromise.complete();
            client.close();
        }
    }

    /**
     * Used for calling the exception handler when an error at connection level
     *
     * @param t exception raised
     */
    private void handleException(Throwable t) {

        Handler<Throwable> handler = exceptionHandler();
        if (handler != null) {
            handler.handle(t);
        }
    }

    /**
     * @return Randomly-generated ClientId
     */
    private String generateRandomClientId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Check either given Topic Name valid of not
     *
     * @param topicName given Topic Name
     * @return true - valid, otherwise - false
     */
    private boolean isValidTopicName(String topicName) {
        if (!isValidStringSizeInUTF8(topicName)) {
            return false;
        }

        Matcher matcher = validTopicNamePattern.matcher(topicName);
        return matcher.find();
    }

    /**
     * Check either given Topic Filter valid of not
     *
     * @param topicFilter given Topic Filter
     * @return true - valid, otherwise - false
     */
    private boolean isValidTopicFilter(String topicFilter) {
        if (!isValidStringSizeInUTF8(topicFilter)) {
            return false;
        }

        Matcher matcher = validTopicFilterPattern.matcher(topicFilter);
        return matcher.find();
    }

    /**
     * Check either given string has size more then 65535 bytes in UTF-8 Encoding
     *
     * @param string given string
     * @return true - size is lower or equal than 65535, otherwise - false
     */
    private boolean isValidStringSizeInUTF8(String string) {
        try {
            int length = string.getBytes("UTF-8").length;
            return length >= MIN_TOPIC_LEN && length <= MAX_TOPIC_LEN;
        } catch (UnsupportedEncodingException e) {
            log.error("UTF-8 charset is not supported", e);
        }

        return false;
    }

    /**
     * A wrapper around a packet ID for which the client will wait a limited time
     * for the server's ACK to arrive.
     */
    private class ExpiringPacket {
        private final int packetId;
        private final long timerId;

        /**
         * Creates a new expiring packet.
         *
         * @param timeoutHandler The handler to invoke once the client stops waiting for the server's ACK.
         * @param packetId       The packet ID.
         */
        ExpiringPacket(Handler<Integer> timeoutHandler, final int packetId) {
            this.packetId = packetId;
            if (options.getAckTimeout() > -1) {
                this.timerId = vertx.setTimer(options.getAckTimeout() * 1000L, tid -> timeoutHandler.handle(packetId));
            } else {
                // default MQTT client behavior,
                // don't start a timer for expiring the publish
                this.timerId = -1;
            }
        }

        /**
         * Cancels the timer created for expiring the ACK.
         * <p>
         * This method should be invoked once the server's ACK for the packet ID has arrived
         * in order to prevent the client from timing out while waiting for an ACK.
         *
         * @return {@code true} if the timer has been canceled.
         */
        boolean cancelTimer() {
            return vertx.cancelTimer(timerId);
        }
    }

    private class WebScoketSucccessHandler extends SimpleChannelInboundHandler<io.vertx.core.http.WebSocketFrame> {
        public static final String WS_CONNECT_SUCCESS_EVENT = "WS_CONNECT_SUCCESS_EVENT";
        PromiseInternal<Void> handshakePromise;

        public WebScoketSucccessHandler(PromiseInternal<Void> handshakePromise) {
            this.handshakePromise = handshakePromise;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, io.vertx.core.http.WebSocketFrame msg) throws Exception {
            super.channelRead(ctx, msg);
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                ctx.pipeline().fireUserEventTriggered(WS_CONNECT_SUCCESS_EVENT);
                handshakePromise.complete();
                ctx.pipeline().remove(this);
            } else if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
                handshakePromise.fail(new VertxException("Websocket handshake timeout !"));
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

@vietj
Copy link
Contributor

vietj commented Aug 25, 2021

can you provide a pull request ?

@alamothe
Copy link

alamothe commented Apr 6, 2022

Currently, the broker can either work with native clients, or websocket clients, but not both. Can you please confirm?

In that case, I think it makes more sense to have the broker support both native and websocket clients at the same time. Most MQTT brokers support this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

No branches or pull requests

3 participants