diff --git a/client.cc b/client.cc index aa712d7..d9af46e 100644 --- a/client.cc +++ b/client.cc @@ -113,19 +113,51 @@ void MumbleClient::ProcessTCPSendQueue(const boost::system::error_code& error, c if (send_queue_.empty()) return; - Message& msg = send_queue_.front(); - - std::vector bufs; - bufs.push_back(boost::asio::buffer(reinterpret_cast(&msg.header_), sizeof(msg.header_))); - bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size())); - - async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); - std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl; + SendFirstQueued(); } else { std::cerr << "Write error: " << error.message() << std::endl; } } +void MumbleClient::SendFirstQueued() { + Message& msg = send_queue_.front(); + + std::vector bufs; + bufs.push_back(boost::asio::buffer(reinterpret_cast(&msg.header_), sizeof(msg.header_))); + bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size())); + + async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl; +} + +void MumbleClient::ReadHandler(const boost::system::error_code& error) { + if (error) { + std::cerr << "read error: " << error.message() << std::endl; + return; + } + + // Receive message header + MessageHeader msg_header; + read(*tcp_socket_, boost::asio::buffer(reinterpret_cast(&msg_header), 6)); + + msg_header.type = ntohs(msg_header.type); + msg_header.length = ntohl(msg_header.length); + + if (msg_header.length >= 0x7FFFF) + exit(1); + + // Receive message body + char* buffer = static_cast(malloc(msg_header.length)); + read(*tcp_socket_, boost::asio::buffer(buffer, msg_header.length)); + + ParseMessage(msg_header, buffer); + free(buffer); + + // Requeue read + if (tcp_socket_) + tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadHandler, this, boost::asio::placeholders::error)); +} + /////////////////////////////////////////////////////////////////////////////// // MumbleClient, public: @@ -207,38 +239,25 @@ void MumbleClient::Connect(const Settings& s) { a.add_celt_versions(0x8000000b); SendMessage(PbMessageType::Authenticate, a, true); - tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadWriteHandler, this, boost::asio::placeholders::error)); + tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadHandler, this, boost::asio::placeholders::error)); } -void MumbleClient::ReadWriteHandler(const boost::system::error_code& error) { - if (error) { - std::cerr << "read error: " << error.message() << std::endl; - return; - } - - // TCP socket handling - read - while (true) { - // Receive message header - MessageHeader msg_header; - read(*tcp_socket_, boost::asio::buffer(reinterpret_cast(&msg_header), 6)); - - msg_header.type = ntohs(msg_header.type); - msg_header.length = ntohl(msg_header.length); +void MumbleClient::Disconnect() { + if (ping_timer_) + ping_timer_->cancel(); + send_queue_.clear(); - if (msg_header.length >= 0x7FFFF) - exit(1); + tcp_socket_->lowest_layer().cancel(); + tcp_socket_->lowest_layer().close(); + udp_socket_->close(); - // Receive message body - char* buffer = static_cast(malloc(msg_header.length)); - read(*tcp_socket_, boost::asio::buffer(buffer, msg_header.length)); + delete tcp_socket_; + delete udp_socket_; - ParseMessage(msg_header, buffer); - free(buffer); - break; - } + tcp_socket_ = NULL; + udp_socket_ = NULL; - // Requeue read - tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadWriteHandler, this, boost::asio::placeholders::error)); + state_ = kStateNew; } void MumbleClient::SendMessage(PbMessageType::MessageType type, const ::google::protobuf::Message& new_msg, bool print) { @@ -254,18 +273,24 @@ void MumbleClient::SendMessage(PbMessageType::MessageType type, const ::google:: msg_header.length = htonl(length); std::string pb_message = new_msg.SerializeAsString(); - - Message message(msg_header, pb_message); - send_queue_.push_back(message); + send_queue_.push_back(new Message(msg_header, pb_message)); if (state_ >= kStateHandshakeCompleted && !write_in_progress) { - Message& msg = send_queue_.front(); + SendFirstQueued(); + } +} - std::vector bufs; - bufs.push_back(boost::asio::buffer(reinterpret_cast(&msg.header_), sizeof(msg.header_))); - bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size())); - async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); - std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl; +void MumbleClient::SendRawUdpTunnel(const char* buffer, int32_t len) { + bool write_in_progress = !send_queue_.empty(); + MessageHeader msg_header; + msg_header.type = htons(static_cast(PbMessageType::UDPTunnel)); + msg_header.length = htonl(len); + + std::string m(buffer, len); + send_queue_.push_back(new Message(msg_header, m)); + + if (state_ >= kStateHandshakeCompleted && !write_in_progress) { + SendFirstQueued(); } } diff --git a/client.h b/client.h index ba21242..ca24981 100644 --- a/client.h +++ b/client.h @@ -5,6 +5,7 @@ #include #include +#include #include #include "messages.h" @@ -55,6 +56,7 @@ class MumbleClient { public: ~MumbleClient(); void Connect(const Settings& s); + void Disconnect(); void SendMessage(PbMessageType::MessageType type, const ::google::protobuf::Message& msg, bool print); void SetComment(const std::string& text); void SendRawUdpTunnel(const char* buffer, int32_t len); @@ -71,11 +73,10 @@ class MumbleClient { void DoPing(const boost::system::error_code& error); void ParseMessage(const mumble_message::MessageHeader& msg_header, void* buffer); void ProcessTCPSendQueue(const boost::system::error_code& error, const size_t bytes_transferred); - void ReadWriteHandler(const boost::system::error_code& error); + void SendFirstQueued(); + void ReadHandler(const boost::system::error_code& error); boost::asio::io_service* io_service_; - std::deque send_queue_; - State state_; #if SSL stream* tcp_socket_; #else @@ -83,6 +84,8 @@ class MumbleClient { #endif udp::socket* udp_socket_; CryptState* cs_; + boost::ptr_deque send_queue_; + State state_; boost::asio::deadline_timer* ping_timer_; int32_t session_;