Skip to content

Commit

Permalink
[orchagent]: Remove global lock caused by notifications running in an…
Browse files Browse the repository at this point in the history
…other thread (sonic-net#478)

* Remove global mutex for the DB

* Don't set event notifications on the switch

* Adding notification handlers enable notification in syncd
  • Loading branch information
pavel-shirshov authored Apr 19, 2018
1 parent aef6a46 commit 39523aa
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ fpmsyncd/fpmsyncd
intfsyncd/intfsyncd
cfgmgr/intfmgrd
cfgmgr/vlanmgrd
cfgmgr/buffermanager
neighsyncd/neighsyncd
portsyncd/portsyncd
orchagent/orchagent
orchagent/routeresync
swssconfig/swssconfig
swssconfig/swssplayer
tests/tests

91 changes: 63 additions & 28 deletions orchagent/fdborch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "fdborch.h"
#include "crmorch.h"
#include "notifier.h"
#include "sai_serialize.h"

extern sai_fdb_api_t *sai_fdb_api;

Expand All @@ -24,9 +25,15 @@ FdbOrch::FdbOrch(DBConnector *db, string tableName, PortsOrch *port) :
m_table(Table(db, tableName))
{
m_portsOrch->attach(this);
auto consumer = new NotificationConsumer(db, "FLUSHFDBREQUEST");
auto fdbNotification = new Notifier(consumer, this);
Orch::addExecutor("", fdbNotification);
m_flushNotificationsConsumer = new NotificationConsumer(db, "FLUSHFDBREQUEST");
auto flushNotifier = new Notifier(m_flushNotificationsConsumer, this);
Orch::addExecutor("", flushNotifier);

/* Add FDB notifications support from ASIC */
DBConnector *notificationsDb = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
m_fdbNotificationConsumer = new swss::NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto fdbNotifier = new Notifier(m_fdbNotificationConsumer, this);
Orch::addExecutor("FDB_NOTIFICATIONS", fdbNotifier);
}

void FdbOrch::update(sai_fdb_event_t type, const sai_fdb_entry_t* entry, sai_object_id_t bridge_port_id)
Expand Down Expand Up @@ -290,36 +297,64 @@ void FdbOrch::doTask(NotificationConsumer& consumer)

consumer.pop(op, data, values);

if (op == "ALL")
if (&consumer == m_flushNotificationsConsumer)
{
/*
* So far only flushing all the FDB entries is supported;
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
if (op == "ALL")
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}
/*
* So far only flushing all the FDB entries is supported;
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}

return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
}
else
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
}
}
else
else if (&consumer == m_fdbNotificationConsumer && op == "fdb_event")
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
uint32_t count;
sai_fdb_event_notification_data_t *fdbevent = nullptr;

/* Decode the notification payload into an array of `count` FDB events */
sai_deserialize_fdb_event_ntf(data, count, &fdbevent);

for (uint32_t i = 0; i < count; ++i)
{
    sai_object_id_t oid = SAI_NULL_OBJECT_ID;

    /* Pick the bridge port id out of the event attributes;
     * oid stays SAI_NULL_OBJECT_ID when the attribute is absent */
    for (uint32_t j = 0; j < fdbevent[i].attr_count; ++j)
    {
        if (fdbevent[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
        {
            oid = fdbevent[i].attr[j].value.oid;
            break;
        }
    }

    this->update(fdbevent[i].event_type, &fdbevent[i].fdb_entry, oid);
}

/* Free the event array exactly once, after every event has been handled.
 * Freeing it inside the loop would make the following iterations read
 * released memory and free the same array again. */
sai_deserialize_free_fdb_event_ntf(count, fdbevent);
}
}

Expand Down
2 changes: 2 additions & 0 deletions orchagent/fdborch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class FdbOrch: public Orch, public Subject, public Observer
set<FdbEntry> m_entries;
fdb_entries_by_port_t saved_fdb_entries;
Table m_table;
NotificationConsumer* m_flushNotificationsConsumer;
NotificationConsumer* m_fdbNotificationConsumer;

void doTask(Consumer& consumer);
void doTask(NotificationConsumer& consumer);
Expand Down
4 changes: 0 additions & 4 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ extern "C" {
#include <iostream>
#include <unordered_map>
#include <map>
#include <mutex>
#include <thread>
#include <chrono>
#include <getopt.h>
Expand Down Expand Up @@ -47,9 +46,6 @@ bool gLogRotate = false;
ofstream gRecordOfs;
string gRecordFile;

/* Global database mutex */
mutex gDbMutex;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-b batch_size] [-m MAC]" << endl;
Expand Down
60 changes: 4 additions & 56 deletions orchagent/notifications.cpp
Original file line number Diff line number Diff line change
@@ -1,72 +1,20 @@
#include <unordered_map>
#include <mutex>
#include <assert.h>

#include "portsorch.h"
#include "fdborch.h"

extern "C" {
#include "sai.h"
}

#include "logger.h"
#include "notifications.h"

extern mutex gDbMutex;
extern PortsOrch *gPortsOrch;
extern FdbOrch *gFdbOrch;

void on_fdb_event(uint32_t count, sai_fdb_event_notification_data_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gFdbOrch)
{
SWSS_LOG_NOTICE("gFdbOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; ++i)
{
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

for (uint32_t j = 0; j < data[i].attr_count; ++j)
{
if (data[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
{
oid = data[i].attr[j].value.oid;
break;
}
}

gFdbOrch->update(data[i].event_type, &data[i].fdb_entry, oid);
}
// Do not use this event handler: libsairedis runs it in a separate thread,
// which causes concurrent access to the DB
}

void on_port_state_change(uint32_t count, sai_port_oper_status_notification_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gPortsOrch)
{
SWSS_LOG_NOTICE("gPortsOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t id = data[i].port_id;
sai_port_oper_status_t status = data[i].port_state;

SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

gPortsOrch->updateDbPortOperStatus(id, status);
gPortsOrch->setHostIntfsOperStatus(id, status == SAI_PORT_OPER_STATUS_UP);
}
// Do not use this event handler: libsairedis runs it in a separate thread,
// which causes concurrent access to the DB
}

void on_switch_shutdown_request()
Expand Down
6 changes: 0 additions & 6 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <fstream>
#include <iostream>
#include <mutex>
#include <sys/time.h>
#include "timestamp.h"
#include "orch.h"
Expand All @@ -15,8 +14,6 @@ using namespace swss;

extern int gBatchSize;

extern mutex gDbMutex;

extern bool gSwssRecord;
extern ofstream gRecordOfs;
extern bool gLogRotate;
Expand Down Expand Up @@ -73,9 +70,6 @@ void Consumer::execute()
{
SWSS_LOG_ENTER();

// TODO: remove DbMutex when there is only single thread
lock_guard<mutex> lock(gDbMutex);

std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);

Expand Down
50 changes: 50 additions & 0 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "sai_serialize.h"
#include "crmorch.h"
#include "countercheckorch.h"
#include "notifier.h"

extern sai_switch_api_t *sai_switch_api;
extern sai_bridge_api_t *sai_bridge_api;
Expand Down Expand Up @@ -235,6 +236,12 @@ PortsOrch::PortsOrch(DBConnector *db, vector<table_name_with_pri_t> &tableNames)

removeDefaultVlanMembers();
removeDefaultBridgePorts();

/* Add port oper status notification support */
DBConnector *notificationsDb = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
m_portStatusNotificationConsumer = new swss::NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto portStatusNotificatier = new Notifier(m_portStatusNotificationConsumer, this);
Orch::addExecutor("PORT_STATUS_NOTIFICATIONS", portStatusNotificatier);
}

void PortsOrch::removeDefaultVlanMembers()
Expand Down Expand Up @@ -2331,3 +2338,46 @@ bool PortsOrch::removeLagMember(Port &lag, Port &port)

return true;
}

/*
 * Handle one notification delivered through a NotificationConsumer.
 *
 * Nothing is popped until port initialization is done. After that a single
 * notification is popped from `consumer`; it is acted upon only when it came
 * from m_portStatusNotificationConsumer and its op is "port_state_change".
 * For every entry of such a notification the port oper status is written via
 * updateDbPortOperStatus() and the host interface oper state is set via
 * setHostIntfsOperStatus().
 */
void PortsOrch::doTask(NotificationConsumer &consumer)
{
    SWSS_LOG_ENTER();

    /* Ports are not ready yet: leave the notification untouched */
    if (!isInitDone())
    {
        return;
    }

    std::string op;
    std::string data;
    std::vector<swss::FieldValueTuple> values;

    consumer.pop(op, data, values);

    /* Ignore other consumers and any op other than a port state change */
    if (&consumer != m_portStatusNotificationConsumer || op != "port_state_change")
    {
        return;
    }

    uint32_t entryCount;
    sai_port_oper_status_notification_t *entries = nullptr;

    sai_deserialize_port_oper_status_ntf(data, entryCount, &entries);

    for (uint32_t idx = 0; idx < entryCount; idx++)
    {
        const sai_port_oper_status_notification_t &entry = entries[idx];
        sai_object_id_t id = entry.port_id;
        sai_port_oper_status_t status = entry.port_state;

        SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

        const bool isUp = (status == SAI_PORT_OPER_STATUS_UP);

        this->updateDbPortOperStatus(id, status);
        this->setHostIntfsOperStatus(id, isUp);
    }

    /* Release the deserialized array once all entries have been processed */
    sai_deserialize_free_port_oper_status_ntf(entryCount, entries);
}
4 changes: 4 additions & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ class PortsOrch : public Orch, public Subject
map<set<int>, tuple<string, uint32_t>> m_lanesAliasSpeedMap;
map<string, Port> m_portList;

NotificationConsumer* m_portStatusNotificationConsumer;

void doTask(Consumer &consumer);
void doPortTask(Consumer &consumer);
void doVlanTask(Consumer &consumer);
void doVlanMemberTask(Consumer &consumer);
void doLagTask(Consumer &consumer);
void doLagMemberTask(Consumer &consumer);

void doTask(NotificationConsumer &consumer);

void removeDefaultVlanMembers();
void removeDefaultBridgePorts();

Expand Down

0 comments on commit 39523aa

Please sign in to comment.