Skip to content

Commit

Permalink
Fix UDPTunnel messages and add Disconnect method
Browse files Browse the repository at this point in the history
  • Loading branch information
pcgod committed Mar 7, 2010
1 parent 47d1991 commit 99bb247
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 46 deletions.
111 changes: 68 additions & 43 deletions client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,51 @@ void MumbleClient::ProcessTCPSendQueue(const boost::system::error_code& error, c
if (send_queue_.empty())
return;

Message& msg = send_queue_.front();

std::vector<boost::asio::const_buffer> bufs;
bufs.push_back(boost::asio::buffer(reinterpret_cast<char *>(&msg.header_), sizeof(msg.header_)));
bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size()));

async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl;
SendFirstQueued();
} else {
std::cerr << "Write error: " << error.message() << std::endl;
}
}

void MumbleClient::SendFirstQueued() {
Message& msg = send_queue_.front();

std::vector<boost::asio::const_buffer> bufs;
bufs.push_back(boost::asio::buffer(reinterpret_cast<char *>(&msg.header_), sizeof(msg.header_)));
bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size()));

async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl;
}

void MumbleClient::ReadHandler(const boost::system::error_code& error) {
if (error) {
std::cerr << "read error: " << error.message() << std::endl;
return;
}

// Receive message header
MessageHeader msg_header;
read(*tcp_socket_, boost::asio::buffer(reinterpret_cast<char *>(&msg_header), 6));

msg_header.type = ntohs(msg_header.type);
msg_header.length = ntohl(msg_header.length);

if (msg_header.length >= 0x7FFFF)
exit(1);

// Receive message body
char* buffer = static_cast<char *>(malloc(msg_header.length));
read(*tcp_socket_, boost::asio::buffer(buffer, msg_header.length));

ParseMessage(msg_header, buffer);
free(buffer);

// Requeue read
if (tcp_socket_)
tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadHandler, this, boost::asio::placeholders::error));
}

///////////////////////////////////////////////////////////////////////////////
// MumbleClient, public:

Expand Down Expand Up @@ -207,38 +239,25 @@ void MumbleClient::Connect(const Settings& s) {
a.add_celt_versions(0x8000000b);
SendMessage(PbMessageType::Authenticate, a, true);

tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadWriteHandler, this, boost::asio::placeholders::error));
tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadHandler, this, boost::asio::placeholders::error));
}

void MumbleClient::ReadWriteHandler(const boost::system::error_code& error) {
if (error) {
std::cerr << "read error: " << error.message() << std::endl;
return;
}

// TCP socket handling - read
while (true) {
// Receive message header
MessageHeader msg_header;
read(*tcp_socket_, boost::asio::buffer(reinterpret_cast<char *>(&msg_header), 6));

msg_header.type = ntohs(msg_header.type);
msg_header.length = ntohl(msg_header.length);
void MumbleClient::Disconnect() {
if (ping_timer_)
ping_timer_->cancel();
send_queue_.clear();

if (msg_header.length >= 0x7FFFF)
exit(1);
tcp_socket_->lowest_layer().cancel();
tcp_socket_->lowest_layer().close();
udp_socket_->close();

// Receive message body
char* buffer = static_cast<char *>(malloc(msg_header.length));
read(*tcp_socket_, boost::asio::buffer(buffer, msg_header.length));
delete tcp_socket_;
delete udp_socket_;

ParseMessage(msg_header, buffer);
free(buffer);
break;
}
tcp_socket_ = NULL;
udp_socket_ = NULL;

// Requeue read
tcp_socket_->async_read_some(boost::asio::null_buffers(), boost::bind(&MumbleClient::ReadWriteHandler, this, boost::asio::placeholders::error));
state_ = kStateNew;
}

void MumbleClient::SendMessage(PbMessageType::MessageType type, const ::google::protobuf::Message& new_msg, bool print) {
Expand All @@ -254,18 +273,24 @@ void MumbleClient::SendMessage(PbMessageType::MessageType type, const ::google::
msg_header.length = htonl(length);

std::string pb_message = new_msg.SerializeAsString();

Message message(msg_header, pb_message);
send_queue_.push_back(message);
send_queue_.push_back(new Message(msg_header, pb_message));

if (state_ >= kStateHandshakeCompleted && !write_in_progress) {
Message& msg = send_queue_.front();
SendFirstQueued();
}
}

std::vector<boost::asio::const_buffer> bufs;
bufs.push_back(boost::asio::buffer(reinterpret_cast<char *>(&msg.header_), sizeof(msg.header_)));
bufs.push_back(boost::asio::buffer(msg.msg_, msg.msg_.size()));
async_write(*tcp_socket_, bufs, boost::bind(&MumbleClient::ProcessTCPSendQueue, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
std::cout << "<< ASYNC Type: " << ntohs(msg.header_.type) << " Length: 6+" << msg.msg_.size() << std::endl;
void MumbleClient::SendRawUdpTunnel(const char* buffer, int32_t len) {
bool write_in_progress = !send_queue_.empty();
MessageHeader msg_header;
msg_header.type = htons(static_cast<int16_t>(PbMessageType::UDPTunnel));
msg_header.length = htonl(len);

std::string m(buffer, len);
send_queue_.push_back(new Message(msg_header, m));

if (state_ >= kStateHandshakeCompleted && !write_in_progress) {
SendFirstQueued();
}
}

Expand Down
9 changes: 6 additions & 3 deletions client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
#include <deque>

#include "messages.h"
Expand Down Expand Up @@ -55,6 +56,7 @@ class MumbleClient {
public:
~MumbleClient();
void Connect(const Settings& s);
void Disconnect();
void SendMessage(PbMessageType::MessageType type, const ::google::protobuf::Message& msg, bool print);
void SetComment(const std::string& text);
void SendRawUdpTunnel(const char* buffer, int32_t len);
Expand All @@ -71,18 +73,19 @@ class MumbleClient {
void DoPing(const boost::system::error_code& error);
void ParseMessage(const mumble_message::MessageHeader& msg_header, void* buffer);
void ProcessTCPSendQueue(const boost::system::error_code& error, const size_t bytes_transferred);
void ReadWriteHandler(const boost::system::error_code& error);
void SendFirstQueued();
void ReadHandler(const boost::system::error_code& error);

boost::asio::io_service* io_service_;
std::deque<mumble_message::Message> send_queue_;
State state_;
#if SSL
stream<tcp::socket>* tcp_socket_;
#else
tcp::socket* tcp_socket_;
#endif
udp::socket* udp_socket_;
CryptState* cs_;
boost::ptr_deque<mumble_message::Message> send_queue_;
State state_;
boost::asio::deadline_timer* ping_timer_;
int32_t session_;

Expand Down

0 comments on commit 99bb247

Please sign in to comment.