Skip to content

Commit

Permalink
Merge pull request #39 from ptony82/features/simplified_connections
Browse files Browse the repository at this point in the history
Features/simplified connections
  • Loading branch information
kyuhojeong committed Mar 4, 2014
2 parents 0c2e2c8 + cac23d2 commit 8ebb5db
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 104 deletions.
46 changes: 25 additions & 21 deletions src/controlleraccess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ static const int kBufferSize = 1024;
static std::map<std::string, int> rpc_calls;

enum {
REGISTER_SVC,
CREATE_LINK,
SET_LOCAL_IP,
SET_REMOTE_IP,
TRIM_LINK,
SET_CB_ENDPOINT,
SEND_MSG,
GET_STATE,
SET_LOGGING,
REGISTER_SVC = 1,
CREATE_LINK = 2,
SET_LOCAL_IP = 3,
SET_REMOTE_IP = 4,
TRIM_LINK = 5,
SET_CB_ENDPOINT = 6,
GET_STATE = 7,
SET_LOGGING = 8,
SET_TRANSLATION = 9,
};

static void init_map() {
Expand All @@ -57,16 +57,18 @@ static void init_map() {
rpc_calls["set_remote_ip"] = SET_REMOTE_IP;
rpc_calls["trim_link"] = TRIM_LINK;
rpc_calls["set_cb_endpoint"] = SET_CB_ENDPOINT;
rpc_calls["send_msg"] = SEND_MSG;
rpc_calls["get_state"] = GET_STATE;
rpc_calls["set_logging"] = SET_LOGGING;
rpc_calls["set_translation"] = SET_TRANSLATION;
}

ControllerAccess::ControllerAccess(
TinCanConnectionManager& manager, XmppNetwork& network,
talk_base::BasicPacketSocketFactory* packet_factory)
talk_base::BasicPacketSocketFactory* packet_factory,
thread_opts_t* opts)
: manager_(manager),
network_(network) {
network_(network),
opts_(opts) {
socket_.reset(packet_factory->CreateUdpSocket(
talk_base::SocketAddress(kLocalHost, kUdpPort), 0, 0));
socket_->SignalReadPacket.connect(this, &ControllerAccess::HandlePacket);
Expand Down Expand Up @@ -106,7 +108,7 @@ void ControllerAccess::SendToPeer(int overlay_id, const std::string& uid,

void ControllerAccess::SendState(const std::string& uid, bool get_stats,
const talk_base::SocketAddress& addr) {
Json::Value state = manager_.GetState(uid, get_stats);
Json::Value state = manager_.GetState(network_.friends(), get_stats);
Json::Value local_state;
local_state["_uid"] = manager_.uid();
local_state["_ip4"] = manager_.ipv4();
Expand Down Expand Up @@ -137,12 +139,12 @@ void ControllerAccess::HandlePacket(talk_base::AsyncPacketSocket* socket,

// TODO - input sanitazation for security purposes
std::string method = root["m"].asString();

switch (rpc_calls[method]) {
case REGISTER_SVC: {
std::string user = root["username"].asString();
std::string pass = root["password"].asString();
std::string host = root["host"].asString();
network_.set_status(manager_.fingerprint());
bool res = network_.Login(user, pass, manager_.uid(), host);
}
break;
Expand Down Expand Up @@ -199,13 +201,6 @@ void ControllerAccess::HandlePacket(talk_base::AsyncPacketSocket* socket,
manager_.set_forward_addr(remote_addr_);
}
break;
case SEND_MSG: {
int overlay_id = root["overlay_id"].asInt();
std::string uid = root["uid"].asString();
std::string fpr = root["data"].asString();
network_.SendToPeer(overlay_id, uid, fpr, "send_msg");
}
break;
case GET_STATE: {
std::string uid = root["uid"].asString();
bool get_stats = root["stats"].asBool();
Expand All @@ -228,7 +223,16 @@ void ControllerAccess::HandlePacket(talk_base::AsyncPacketSocket* socket,
}
}
break;
case SET_TRANSLATION: {
int translate = root["translate"].asInt();
opts_->translate = translate;
}
break;
default: {
int overlay_id = root["overlay_id"].asInt();
std::string uid = root["uid"].asString();
std::string fpr = root["data"].asString();
network_.SendToPeer(overlay_id, uid, fpr, method);
}
break;
}
Expand Down
4 changes: 3 additions & 1 deletion src/controlleraccess.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class ControllerAccess : public PeerSignalSenderInterface,
public sigslot::has_slots<> {
public:
ControllerAccess(TinCanConnectionManager& manager, XmppNetwork& network,
talk_base::BasicPacketSocketFactory* packet_factory);
talk_base::BasicPacketSocketFactory* packet_factory,
thread_opts_t* opts);

// Inherited from PeerSignalSenderInterface
virtual void SendToPeer(int overlay_id, const std::string& uid,
Expand All @@ -62,6 +63,7 @@ class ControllerAccess : public PeerSignalSenderInterface,
void SendState(const std::string& uid, bool get_stats,
const talk_base::SocketAddress& addr);

thread_opts_t* opts_;
XmppNetwork& network_;
TinCanConnectionManager& manager_;
talk_base::SocketAddress remote_addr_;
Expand Down
12 changes: 2 additions & 10 deletions src/tincan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,6 @@ bool SSLVerificationCallback(void* cert) {
int main(int argc, char **argv) {
talk_base::InitializeSSL(SSLVerificationCallback);
peerlist_init();
int translate = 1;

for (int i = argc - 1; i > 0; i--) {
if (strncmp(argv[i], "-nt", 3) == 0) {
translate = 0;
}
}

thread_opts_t opts;
#if defined(LINUX) || defined(ANDROID)
opts.tap = tap_open(tincan::kTapName, opts.mac);
Expand All @@ -120,7 +112,7 @@ int main(int argc, char **argv) {
opts.win32_tap = open_tap(tincan::kTapName, opts.mac);
if (opts.win32_tap < 0) return -1;
#endif
opts.translate = translate;
opts.translate = 0;

talk_base::Thread packet_handling_thread, send_thread, recv_thread;
talk_base::AutoThread link_setup_thread;
Expand All @@ -133,7 +125,7 @@ int main(int argc, char **argv) {
xmpp.HandlePeer.connect(&manager,
&tincan::TinCanConnectionManager::HandlePeer);
talk_base::BasicPacketSocketFactory packet_factory;
tincan::ControllerAccess controller(manager, xmpp, &packet_factory);
tincan::ControllerAccess controller(manager, xmpp, &packet_factory, &opts);
signal_sender.add_service(0, &controller);
signal_sender.add_service(1, &xmpp);
opts.send_func = &tincan::TinCanConnectionManager::DoPacketSend;
Expand Down
39 changes: 15 additions & 24 deletions src/tincanconnectionmanager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "tincan_utils.h"
#include "tincanconnectionmanager.h"


namespace tincan {

static const char kIpv4[] = "172.31.0.100";
Expand Down Expand Up @@ -485,14 +484,7 @@ void TinCanConnectionManager::OnMessage(talk_base::Message* msg) {

void TinCanConnectionManager::HandlePeer(const std::string& uid,
const std::string& data, const std::string& type) {
// This is a callback message to the controller indicating a new
// connection request sent over XMPP
if (type.size() > 0) {
signal_sender_->SendToPeer(kLocalControllerId, uid, data, type);
}
else {
signal_sender_->SendToPeer(kLocalControllerId, uid, data, kConReq);
}
signal_sender_->SendToPeer(kLocalControllerId, uid, data, type);
LOG_TS(INFO) << "uid:" << uid << " data:" << data << " type:" << type;
}

Expand Down Expand Up @@ -525,18 +517,23 @@ void TinCanConnectionManager::HandleQueueSignal_w() {
}

Json::Value TinCanConnectionManager::StateToJson(const std::string& uid,
const PeerIPs& ips,
uint32 xmpp_time,
bool get_stats) {
Json::Value peer(Json::objectValue);
peer["uid"] = uid;
peer["ip4"] = ips.ip4;
peer["ip6"] = ips.ip6;
peer["status"] = "offline";

// time_diff gives the amount of time since last xmpp presense message
uint32 time_diff = talk_base::Time() - xmpp_time;
peer["xmpp_time"] = time_diff/1000;

if (uid_map_.find(uid) != uid_map_.end()) {
peer["ip4"] = ip_map_[uid].ip4;
peer["ip6"] = ip_map_[uid].ip6;
peer["fpr"] = uid_map_[uid]->fingerprint;

// time_diff gives the amount of time since connection was created
uint32 time_diff = talk_base::Time() - uid_map_[uid]->last_time;
time_diff = talk_base::Time() - uid_map_[uid]->last_time;
peer["last_time"] = time_diff/1000;

// if transport is readable and writable that means P2P connection
Expand Down Expand Up @@ -578,18 +575,12 @@ Json::Value TinCanConnectionManager::StateToJson(const std::string& uid,
return peer;
}

Json::Value TinCanConnectionManager::GetState(const std::string& uid,
bool get_stats) {
Json::Value TinCanConnectionManager::GetState(
const std::map<std::string, uint32>& friends, bool get_stats) {
Json::Value peers(Json::objectValue);
if (uid.size() == this->uid().size() &&
ip_map_.find(uid) != ip_map_.end()) {
peers[uid] = StateToJson(uid, ip_map_[uid], get_stats);
}
else if (uid.size() != this->uid().size() ) {
for (std::map<std::string, PeerIPs>::const_iterator it =
ip_map_.begin(); it != ip_map_.end(); ++it) {
peers[it->first] = StateToJson(it->first, it->second, get_stats);
}
for (std::map<std::string, uint32>::const_iterator it =
friends.begin(); it != friends.end(); ++it) {
peers[it->first] = StateToJson(it->first, it->second, get_stats);
}
return peers;
}
Expand Down
5 changes: 3 additions & 2 deletions src/tincanconnectionmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ class TinCanConnectionManager : public talk_base::MessageHandler,

virtual bool DestroyTransport(const std::string& uid);

virtual Json::Value GetState(const std::string& uid, bool get_stats);
virtual Json::Value GetState(const std::map<std::string, uint32>& friends,
bool get_stats);

static int DoPacketSend(const char* buf, size_t len);

Expand Down Expand Up @@ -189,7 +190,7 @@ class TinCanConnectionManager : public talk_base::MessageHandler,
void SetupTransport(PeerState* peer_state);
void HandleQueueSignal_w();
void HandleControllerSignal_w();
Json::Value StateToJson(const std::string& uid, const PeerIPs& ips,
Json::Value StateToJson(const std::string& uid, uint32 xmpp_time,
bool get_stats);
bool SetRelay(PeerState* peer_state, const std::string& turn_server,
const std::string& username, const std::string& password);
Expand Down
57 changes: 28 additions & 29 deletions src/xmppnetwork.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,21 @@ namespace tincan {
static const int kXmppPort = 5222;
static const int kInterval = 15000;

// kPresenceInterval is kInterval * 8 = 120 secs meaning that
// presence message is sent to XMPP server every 2 minutes
static const int kPresenceInterval = 8;

static const buzz::StaticQName QN_TINCAN = { "jabber:iq:tincan", "query" };
static const buzz::StaticQName QN_TINCAN_DATA = { "jabber:iq:tincan", "data" };
static const buzz::StaticQName QN_TINCAN_TYPE = { "jabber:iq:tincan", "type" };
static const char kTemplate[] = "<query xmlns=\"jabber:iq:tincan\" />";
static const char kErrorMsg[] = "error";

static const int kPingPeriod = 10000;
static const int kPingTimeout = 500;
static const int kPingTimeout = 1000;

// Predetermined size of fingerprint string from RFC 4572
static const int kFprSize = 59;
// TODO - we should not be storing in global map, need to move to a class
static std::map<std::string, std::string> g_uid_map;

static std::string get_key(const std::string& uid) {
size_t idx = uid.find('/') + sizeof(kXmppPrefix);
Expand All @@ -63,7 +67,6 @@ static std::string get_key(const std::string& uid) {
return uid;
}


TinCanTask::TinCanTask(buzz::XmppClient* client,
PeerHandlerInterface* handler)
: XmppTask(client, buzz::XmppEngine::HL_TYPE),
Expand All @@ -73,7 +76,8 @@ TinCanTask::TinCanTask(buzz::XmppClient* client,
void TinCanTask::SendToPeer(int overlay_id, const std::string &uid,
const std::string &data,
const std::string &type) {
const buzz::Jid to(get_xmpp_id(uid));
if (g_uid_map.find(uid) == g_uid_map.end()) return;
const buzz::Jid to(g_uid_map[uid]);
talk_base::scoped_ptr<buzz::XmlElement> get(
MakeIq(buzz::STR_GET, to, task_id()));
// TODO - Figure out how to build from QN_TINCAN instead of template
Expand All @@ -90,6 +94,8 @@ void TinCanTask::SendToPeer(int overlay_id, const std::string &uid,

get->AddElement(element);
SendStanza(get.get());
LOG_TS(INFO) << "XMPP SEND uid " << uid << " data " << data
<< " type " << type;
}

int TinCanTask::ProcessStart() {
Expand All @@ -103,8 +109,8 @@ int TinCanTask::ProcessStart() {
from != GetClient()->jid()) {
std::string uid = stanza->Attr(buzz::QN_FROM);
std::string uid_key = get_key(uid);
set_xmpp_id(uid_key, uid);
LOG_TS(INFO) << "uid_key:" << uid_key << " uid:" << uid;
// map each uid to a uid_key
g_uid_map[uid_key] = uid;

const buzz::XmlElement* msg = stanza->FirstNamed(QN_TINCAN);
if (msg != NULL) {
Expand Down Expand Up @@ -202,29 +208,21 @@ void XmppNetwork::OnSignOn() {

void XmppNetwork::OnStateChange(buzz::XmppEngine::State state) {
xmpp_state_ = state;
std::string str_state;
switch (state) {
case buzz::XmppEngine::STATE_START:
LOG_TS(INFO) << "START";
str_state = "START";
break;
case buzz::XmppEngine::STATE_OPENING:
LOG_TS(INFO) << "OPENING";
str_state = "OPENING";
break;
case buzz::XmppEngine::STATE_OPEN:
LOG_TS(INFO) << "OPEN";
str_state = "OPEN";
OnSignOn();
break;
case buzz::XmppEngine::STATE_CLOSED:
LOG_TS(INFO) << "CLOSED";
str_state = "CLOSED";
break;
}
buzz::Jid local_jid(xcs_.user(), xcs_.host(), xcs_.resource());
std::string uid_key = get_key(local_jid.Str());
HandlePeer(uid_key, "1000:xmpp_state", str_state);
}

void XmppNetwork::OnPresenceMessage(const buzz::PresenceStatus &status) {
Expand All @@ -234,12 +232,12 @@ void XmppNetwork::OnPresenceMessage(const buzz::PresenceStatus &status) {
kXmppPrefix) == 0 && status.jid() != pump_->client()->jid()) {
std::string uid = status.jid().Str();
std::string uid_key = get_key(uid);
std::string fpr = status.status();
tincan_task_->set_xmpp_id(uid_key, uid);
LOG_TS(INFO) << "uid_key:" << uid_key << " uid" << uid << " status:" << fpr;
// TODO - Decide what message type to assign to presence messages
if (fpr.size() == kFprSize)
HandlePeer(uid_key, fpr, "");

// Simply update the timestamp of this presence message by uid
presence_time_[uid_key] = talk_base::Time();
g_uid_map[uid_key] = uid;

LOG_TS(INFO) << "uid " << uid << " status " << status.status();
}
}

Expand All @@ -253,14 +251,6 @@ void XmppNetwork::OnTimeout() {

void XmppNetwork::OnMessage(talk_base::Message* msg) {
if (pump_.get()) {
// Resend presence every 3 min necessary for reconnections
if (on_msg_counter_++ % 12 == 0) {
presence_out_.release();
presence_out_.reset(new buzz::PresenceOutTask(pump_->client()));
presence_out_->Send(status_);
presence_out_->Start();
}

if (xmpp_state_ == buzz::XmppEngine::STATE_START ||
xmpp_state_ == buzz::XmppEngine::STATE_OPENING) {
pump_->DoDisconnect();
Expand All @@ -282,6 +272,15 @@ void XmppNetwork::OnMessage(talk_base::Message* msg) {
//pump_.release();
Connect();
}
else if (xmpp_state_ == buzz::XmppEngine::STATE_OPEN &&
on_msg_counter_++ % kPresenceInterval == 0) {
// Resend presence every 2 min necessary for reconnections
presence_out_.release();
presence_out_.reset(new buzz::PresenceOutTask(pump_->client()));
presence_out_->Send(status_);
presence_out_->Start();
}

}
main_thread_->PostDelayed(kInterval, this, 0, 0);
}
Expand Down
Loading

0 comments on commit 8ebb5db

Please sign in to comment.