Skip to content

Commit

Permalink
Clean up reconnect handler.
Browse files Browse the repository at this point in the history
Refs #6107
  • Loading branch information
gunnarbeutner committed May 8, 2014
1 parent f704468 commit bd610a7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 43 deletions.
40 changes: 27 additions & 13 deletions lib/remote/apiclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ INITIALIZE_ONCE(&ApiClient::StaticInitialize);
static Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);

ApiClient::ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role)
: m_Endpoint(endpoint), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
{ }
ApiClient::ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
{
m_Endpoint = Endpoint::GetByName(identity);
}

void ApiClient::StaticInitialize(void)
{
Expand All @@ -57,6 +59,11 @@ void ApiClient::Start(void)
thread.detach();
}

String ApiClient::GetIdentity(void) const
{
return m_Identity;
}

Endpoint::Ptr ApiClient::GetEndpoint(void) const
{
return m_Endpoint;
Expand All @@ -81,7 +88,7 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message)
m_Seen = Utility::GetTime();
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << m_Endpoint->GetName() << "': " << DiagnosticInformation(ex);
msgbuf << "Error while sending JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex);
Log(LogWarning, "remote", msgbuf.str());

Disconnect();
Expand All @@ -90,9 +97,11 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message)

void ApiClient::Disconnect(void)
{
Log(LogWarning, "remote", "API client disconnected for endpoint '" + m_Endpoint->GetName() + "'");
Log(LogWarning, "remote", "API client disconnected for identity '" + m_Identity + "'");
m_Stream->Close();
m_Endpoint->RemoveClient(GetSelf());

if (m_Endpoint)
m_Endpoint->RemoveClient(GetSelf());
}

bool ApiClient::ProcessMessage(void)
Expand All @@ -105,7 +114,7 @@ bool ApiClient::ProcessMessage(void)
if (message->Get("method") != "log::SetLogPosition")
m_Seen = Utility::GetTime();

if (message->Contains("ts")) {
if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");

/* ignore old messages */
Expand All @@ -118,14 +127,16 @@ bool ApiClient::ProcessMessage(void)
MessageOrigin origin;
origin.FromClient = GetSelf();

if (m_Endpoint->GetZone() != Zone::GetLocalZone())
origin.FromZone = m_Endpoint->GetZone();
else
origin.FromZone = Zone::GetByName(message->Get("originZone"));
if (m_Endpoint) {
if (m_Endpoint->GetZone() != Zone::GetLocalZone())
origin.FromZone = m_Endpoint->GetZone();
else
origin.FromZone = Zone::GetByName(message->Get("originZone"));
}

String method = message->Get("method");

Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Endpoint->GetName() + "'");
Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Identity + "'");

Dictionary::Ptr resultMessage = make_shared<Dictionary>();

Expand Down Expand Up @@ -159,7 +170,7 @@ void ApiClient::MessageThreadProc(void)

Disconnect();
} catch (const std::exception& ex) {
Log(LogWarning, "remote", "Error while reading JSON-RPC message for endpoint '" + m_Endpoint->GetName() + "': " + DiagnosticInformation(ex));
Log(LogWarning, "remote", "Error while reading JSON-RPC message for identity '" + m_Identity + "': " + DiagnosticInformation(ex));
}
}

Expand Down Expand Up @@ -193,6 +204,9 @@ Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr&
double log_position = params->Get("log_position");
Endpoint::Ptr endpoint = origin.FromClient->GetEndpoint();

if (!endpoint)
return Empty;

if (log_position > endpoint->GetLocalLogPosition())
endpoint->SetLocalLogPosition(log_position);

Expand Down
4 changes: 3 additions & 1 deletion lib/remote/apiclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ class I2_REMOTE_API ApiClient : public Object
public:
DECLARE_PTR_TYPEDEFS(ApiClient);

ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role);
ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role);

static void StaticInitialize(void);

void Start(void);

String GetIdentity(void) const;
Endpoint::Ptr GetEndpoint(void) const;
Stream::Ptr GetStream(void) const;
ConnectionRole GetRole(void) const;
Expand All @@ -61,6 +62,7 @@ class I2_REMOTE_API ApiClient : public Object
void SendMessage(const Dictionary::Ptr& request);

private:
String m_Identity;
Endpoint::Ptr m_Endpoint;
Stream::Ptr m_Stream;
ConnectionRole m_Role;
Expand Down
59 changes: 36 additions & 23 deletions lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,31 +201,28 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole rol
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert);

Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");

if (!endpoint) {
Log(LogInformation, "remote", "New client for unknown endpoint '" + identity + "'");
return;
}
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);

Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
if (endpoint) {
bool need_sync = !endpoint->IsConnected();

bool need_sync = !endpoint->IsConnected();
ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
aclient->Start();

ApiClient::Ptr aclient = make_shared<ApiClient>(endpoint, tlsStream, role);
aclient->Start();
if (need_sync) {
{
ObjectLock olock(endpoint);

if (need_sync) {
{
ObjectLock olock(endpoint);
endpoint->SetSyncing(true);
}

endpoint->SetSyncing(true);
ReplayLog(aclient);
}

ReplayLog(aclient);
endpoint->AddClient(aclient);
}

endpoint->AddClient(aclient);
}

void ApiListener::ApiTimerHandler(void)
Expand Down Expand Up @@ -262,19 +259,35 @@ void ApiListener::ApiTimerHandler(void)
if (IsMaster()) {
Zone::Ptr my_zone = Zone::GetLocalZone();

BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
/* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
continue;

if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
continue;
bool connected = false;

Zone::Ptr their_zone = endpoint->GetZone();
BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
if (endpoint->IsConnected()) {
connected = true;
break;
}
}

if (my_zone != their_zone && my_zone != their_zone->GetParent() && their_zone != my_zone->GetParent())
/* don't connect to an endpoint if we already have a connection to the zone */
if (connected)
continue;

AddConnection(endpoint->GetHost(), endpoint->GetPort());
BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
/* don't connect to ourselves */
if (endpoint->GetName() == GetIdentity())
continue;

/* don't try to connect to endpoints which don't have a host and port */
if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
continue;

AddConnection(endpoint->GetHost(), endpoint->GetPort());
}
}
}

Expand Down
5 changes: 0 additions & 5 deletions lib/remote/messageorigin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,3 @@ bool MessageOrigin::IsLocal(void) const
{
return !FromClient;
}

bool MessageOrigin::IsSameZone(void) const
{
return !FromZone;
}
1 change: 0 additions & 1 deletion lib/remote/messageorigin.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct I2_REMOTE_API MessageOrigin
ApiClient::Ptr FromClient;

bool IsLocal(void) const;
bool IsSameZone(void) const;
};

}
Expand Down

0 comments on commit bd610a7

Please sign in to comment.