Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sflowmgrd] Infer sampling rate dynamically based on oper speed #2799

Merged
merged 12 commits into from
Jun 12, 2023
172 changes: 123 additions & 49 deletions cfgmgr/sflowmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,8 @@
using namespace std;
using namespace swss;

map<string,string> sflowSpeedRateInitMap =
{
{SFLOW_SAMPLE_RATE_KEY_400G, SFLOW_SAMPLE_RATE_VALUE_400G},
{SFLOW_SAMPLE_RATE_KEY_200G, SFLOW_SAMPLE_RATE_VALUE_200G},
{SFLOW_SAMPLE_RATE_KEY_100G, SFLOW_SAMPLE_RATE_VALUE_100G},
{SFLOW_SAMPLE_RATE_KEY_50G, SFLOW_SAMPLE_RATE_VALUE_50G},
{SFLOW_SAMPLE_RATE_KEY_40G, SFLOW_SAMPLE_RATE_VALUE_40G},
{SFLOW_SAMPLE_RATE_KEY_25G, SFLOW_SAMPLE_RATE_VALUE_25G},
{SFLOW_SAMPLE_RATE_KEY_10G, SFLOW_SAMPLE_RATE_VALUE_10G},
{SFLOW_SAMPLE_RATE_KEY_1G, SFLOW_SAMPLE_RATE_VALUE_1G}
};

SflowMgr::SflowMgr(DBConnector *cfgDb, DBConnector *appDb, const vector<string> &tableNames) :
Orch(cfgDb, tableNames),
m_cfgSflowTable(cfgDb, CFG_SFLOW_TABLE_NAME),
m_cfgSflowSessionTable(cfgDb, CFG_SFLOW_SESSION_TABLE_NAME),
SflowMgr::SflowMgr(DBConnector *appDb, const std::vector<TableConnector>& tableNames) :
Orch(tableNames),
m_appSflowTable(appDb, APP_SFLOW_TABLE_NAME),
m_appSflowSessionTable(appDb, APP_SFLOW_SESSION_TABLE_NAME)
{
Expand All @@ -35,6 +21,33 @@ SflowMgr::SflowMgr(DBConnector *cfgDb, DBConnector *appDb, const vector<string>
m_intfAllDir = "rx";
}

void SflowMgr::readPortConfig()
{
auto consumer_it = m_consumerMap.find(CFG_PORT_TABLE_NAME);
if (consumer_it != m_consumerMap.end())
{
consumer_it->second->drain();
SWSS_LOG_NOTICE("Port Configuration Read..");
}
else
{
SWSS_LOG_ERROR("Consumer object for PORT_TABLE not found");
}
}

bool SflowMgr::isPortEnabled(const std::string& alias)
{
/* Checks if the sflow is enabled on the port */
auto it = m_sflowPortConfMap.find(alias);
if (it == m_sflowPortConfMap.end())
{
return false;
}
bool local_admin = it->second.local_admin_cfg;
bool status = it->second.admin == "up" ? true : false;
return m_gEnable && (m_intfAllConf || (local_admin && status));
}

void SflowMgr::sflowHandleService(bool enable)
{
stringstream cmd;
Expand Down Expand Up @@ -71,7 +84,6 @@ void SflowMgr::sflowUpdatePortInfo(Consumer &consumer)
while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;

string key = kfvKey(t);
string op = kfvOp(t);
auto values = kfvFieldsValues(t);
Expand All @@ -87,16 +99,17 @@ void SflowMgr::sflowUpdatePortInfo(Consumer &consumer)
new_port = true;
port_info.local_rate_cfg = false;
port_info.local_admin_cfg = false;
port_info.speed = ERROR_SPEED;
port_info.oper_speed = NA_SPEED;
port_info.local_dir_cfg = false;
port_info.speed = SFLOW_ERROR_SPEED_STR;
port_info.rate = "";
port_info.admin = "";
port_info.dir = "";
m_sflowPortConfMap[key] = port_info;
}

bool speed_change = false;
string new_speed = SFLOW_ERROR_SPEED_STR;
bool rate_update = false;
string new_speed = ERROR_SPEED;
for (auto i : values)
{
if (fvField(i) == "speed")
Expand All @@ -107,7 +120,11 @@ void SflowMgr::sflowUpdatePortInfo(Consumer &consumer)
if (m_sflowPortConfMap[key].speed != new_speed)
{
m_sflowPortConfMap[key].speed = new_speed;
speed_change = true;
/* if oper_speed is set, no need to write to APP_DB */
if (m_sflowPortConfMap[key].oper_speed == NA_SPEED)
{
rate_update = true;
}
}

string def_dir = "rx";
Expand All @@ -116,13 +133,13 @@ void SflowMgr::sflowUpdatePortInfo(Consumer &consumer)
m_sflowPortConfMap[key].dir = def_dir;
}

if (m_gEnable && m_intfAllConf)
if (isPortEnabled(key))
{
// If the Local rate Conf is already present, dont't override it even though the speed is changed
if (new_port || (speed_change && !m_sflowPortConfMap[key].local_rate_cfg))
// If the Local rate conf is already present, dont't override it even though the speed is changed
if (new_port || (rate_update && !m_sflowPortConfMap[key].local_rate_cfg))
{
vector<FieldValueTuple> fvs;
sflowGetGlobalInfo(fvs, m_sflowPortConfMap[key].speed, m_sflowPortConfMap[key].dir);
sflowGetGlobalInfo(fvs, key, m_sflowPortConfMap[key].dir);
m_appSflowSessionTable.set(key, fvs);
}
}
Expand All @@ -147,6 +164,59 @@ void SflowMgr::sflowUpdatePortInfo(Consumer &consumer)
}
}

void SflowMgr::sflowProcessOperSpeed(Consumer &consumer)
{
auto it = consumer.m_toSync.begin();

while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;
string alias = kfvKey(t);
string op = kfvOp(t);
auto values = kfvFieldsValues(t);
string oper_speed = "";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to default as NA_SPEED instead of empty check??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

orchagent may write NA_SPEED to oper_speed in STATE_DB, setting it as empty will let us know if oper_speed field is written to state-db

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks for the clarification

bool rate_update = false;

for (auto i : values)
{
if (fvField(i) == "speed")
{
oper_speed = fvValue(i);
}
}

if (m_sflowPortConfMap.find(alias) != m_sflowPortConfMap.end() && op == SET_COMMAND)
{
SWSS_LOG_DEBUG("STATE_DB update: iface: %s, oper_speed: %s, cfg_speed: %s, new_speed: %s",
alias.c_str(), m_sflowPortConfMap[alias].oper_speed.c_str(),
m_sflowPortConfMap[alias].speed.c_str(),
oper_speed.c_str());
/* oper_speed is updated by orchagent if the vendor supports and oper status is up */
if (m_sflowPortConfMap[alias].oper_speed != oper_speed && !oper_speed.empty())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check first oper_speed != NA_SPEED && m_sflowPortConfMap[alias].oper_speed != oper_speed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!oper_speed.empty() check is required to understand if something is written to STATE_DB. (otherwise we'll have duplicate writes to APPL_DB). If the oper_speed happen to be NA findSamplingRate() has logic to check if the speed is NA and if so, written the sampling rate based on configured speed

Copy link

@Gokulnath-Raja Gokulnath-Raja Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks for the clarification

{
rate_update = true;
if (oper_speed == m_sflowPortConfMap[alias].speed && m_sflowPortConfMap[alias].oper_speed == NA_SPEED)
{
/* if oper_speed is equal to cfg_speed, avoid the write to APP_DB
Can happen if auto-neg is not set */
rate_update = false;
}
m_sflowPortConfMap[alias].oper_speed = oper_speed;
}

if (isPortEnabled(alias) && rate_update && !m_sflowPortConfMap[alias].local_rate_cfg)
{
vector<FieldValueTuple> fvs;
sflowGetGlobalInfo(fvs, alias, m_sflowPortConfMap[alias].dir);
m_appSflowSessionTable.set(alias, fvs);
SWSS_LOG_NOTICE("Default sampling rate for %s updated to %s", alias.c_str(), findSamplingRate(alias).c_str());
}
}
/* Do nothing for DEL as the SflowPortConfMap will already be cleared by the DEL from CONFIG_DB */
it = consumer.m_toSync.erase(it);
}
}

void SflowMgr::sflowHandleSessionAll(bool enable, string direction)
{
for (auto it: m_sflowPortConfMap)
Expand All @@ -171,7 +241,7 @@ void SflowMgr::sflowHandleSessionAll(bool enable, string direction)
}
else
{
sflowGetGlobalInfo(fvs, it.second.speed, direction);
sflowGetGlobalInfo(fvs, it.first, direction);
}
m_appSflowSessionTable.set(it.first, fvs);
}
Expand Down Expand Up @@ -202,21 +272,12 @@ void SflowMgr::sflowHandleSessionLocal(bool enable)
}
}

void SflowMgr::sflowGetGlobalInfo(vector<FieldValueTuple> &fvs, string speed, string dir)
void SflowMgr::sflowGetGlobalInfo(vector<FieldValueTuple> &fvs, const string& alias, const string& dir)
{
string rate;
FieldValueTuple fv1("admin_state", "up");
fvs.push_back(fv1);

if (speed != SFLOW_ERROR_SPEED_STR && sflowSpeedRateInitMap.find(speed) != sflowSpeedRateInitMap.end())
{
rate = sflowSpeedRateInitMap[speed];
}
else
{
rate = SFLOW_ERROR_SPEED_STR;
}
FieldValueTuple fv2("sample_rate",rate);
FieldValueTuple fv2("sample_rate", findSamplingRate(alias));
fvs.push_back(fv2);

FieldValueTuple fv3("sample_direction",dir);
Expand Down Expand Up @@ -289,17 +350,7 @@ void SflowMgr::sflowCheckAndFillValues(string alias, vector<FieldValueTuple> &va
if (m_sflowPortConfMap[alias].rate == "" ||
m_sflowPortConfMap[alias].local_rate_cfg)
{
string speed = m_sflowPortConfMap[alias].speed;

if (speed != SFLOW_ERROR_SPEED_STR && sflowSpeedRateInitMap.find(speed) != sflowSpeedRateInitMap.end())
{
rate = sflowSpeedRateInitMap[speed];
}
else
{
rate = SFLOW_ERROR_SPEED_STR;
}
m_sflowPortConfMap[alias].rate = rate;
m_sflowPortConfMap[alias].rate = findSamplingRate(alias);
}
m_sflowPortConfMap[alias].local_rate_cfg = false;
FieldValueTuple fv("sample_rate", m_sflowPortConfMap[alias].rate);
Expand Down Expand Up @@ -331,6 +382,24 @@ void SflowMgr::sflowCheckAndFillValues(string alias, vector<FieldValueTuple> &va
}
}

string SflowMgr::findSamplingRate(const string& alias)
{
/* Default sampling rate is equal to the oper_speed, if present
if oper_speed is not found, use the configured speed */
if (m_sflowPortConfMap.find(alias) == m_sflowPortConfMap.end())
{
SWSS_LOG_ERROR("%s not found in port configuration map", alias.c_str());
return ERROR_SPEED;
}
string oper_speed = m_sflowPortConfMap[alias].oper_speed;
string cfg_speed = m_sflowPortConfMap[alias].speed;
if (!oper_speed.empty() && oper_speed != NA_SPEED)
{
return oper_speed;
}
return cfg_speed;
}

void SflowMgr::doTask(Consumer &consumer)
{
SWSS_LOG_ENTER();
Expand All @@ -342,6 +411,11 @@ void SflowMgr::doTask(Consumer &consumer)
sflowUpdatePortInfo(consumer);
return;
}
else if (table == STATE_PORT_TABLE_NAME)
{
sflowProcessOperSpeed(consumer);
return;
}

auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
Expand Down Expand Up @@ -502,7 +576,7 @@ void SflowMgr::doTask(Consumer &consumer)
if (m_intfAllConf)
{
vector<FieldValueTuple> fvs;
sflowGetGlobalInfo(fvs, m_sflowPortConfMap[key].speed, m_intfAllDir);
sflowGetGlobalInfo(fvs, key, m_intfAllDir);
m_appSflowSessionTable.set(key,fvs);
}
}
Expand Down
34 changes: 10 additions & 24 deletions cfgmgr/sflowmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,16 @@

namespace swss {

#define SFLOW_SAMPLE_RATE_KEY_400G "400000"
#define SFLOW_SAMPLE_RATE_KEY_200G "200000"
#define SFLOW_SAMPLE_RATE_KEY_100G "100000"
#define SFLOW_SAMPLE_RATE_KEY_50G "50000"
#define SFLOW_SAMPLE_RATE_KEY_40G "40000"
#define SFLOW_SAMPLE_RATE_KEY_25G "25000"
#define SFLOW_SAMPLE_RATE_KEY_10G "10000"
#define SFLOW_SAMPLE_RATE_KEY_1G "1000"

#define SFLOW_SAMPLE_RATE_VALUE_400G "400000"
#define SFLOW_SAMPLE_RATE_VALUE_200G "200000"
#define SFLOW_SAMPLE_RATE_VALUE_100G "100000"
#define SFLOW_SAMPLE_RATE_VALUE_50G "50000"
#define SFLOW_SAMPLE_RATE_VALUE_40G "40000"
#define SFLOW_SAMPLE_RATE_VALUE_25G "25000"
#define SFLOW_SAMPLE_RATE_VALUE_10G "10000"
#define SFLOW_SAMPLE_RATE_VALUE_1G "1000"

#define SFLOW_ERROR_SPEED_STR "error"
#define ERROR_SPEED "error"
#define NA_SPEED "N/A"

struct SflowPortInfo
{
bool local_rate_cfg;
bool local_admin_cfg;
bool local_dir_cfg;
std::string speed;
std::string oper_speed;
std::string rate;
std::string admin;
std::string dir;
Expand All @@ -47,15 +31,14 @@ typedef std::map<std::string, SflowPortInfo> SflowPortConfMap;
class SflowMgr : public Orch
{
public:
SflowMgr(DBConnector *cfgDb, DBConnector *appDb, const std::vector<std::string> &tableNames);
SflowMgr(DBConnector *appDb, const std::vector<TableConnector>& tableNames);
void readPortConfig();

using Orch::doTask;
private:
Table m_cfgSflowTable;
Table m_cfgSflowSessionTable;
ProducerStateTable m_appSflowTable;
ProducerStateTable m_appSflowSessionTable;
SflowPortConfMap m_sflowPortConfMap;
SflowPortConfMap m_sflowPortConfMap;
bool m_intfAllConf;
bool m_gEnable;
std::string m_intfAllDir;
Expand All @@ -64,11 +47,14 @@ class SflowMgr : public Orch
void doTask(Consumer &consumer);
void sflowHandleService(bool enable);
void sflowUpdatePortInfo(Consumer &consumer);
void sflowProcessOperSpeed(Consumer &consumer);
void sflowHandleSessionAll(bool enable, std::string direction);
void sflowHandleSessionLocal(bool enable);
void sflowCheckAndFillValues(std::string alias, std::vector<FieldValueTuple> &values, std::vector<FieldValueTuple> &fvs);
void sflowGetPortInfo(std::vector<FieldValueTuple> &fvs, SflowPortInfo &local_info);
void sflowGetGlobalInfo(std::vector<FieldValueTuple> &fvs, std::string speed, std::string direction);
void sflowGetGlobalInfo(std::vector<FieldValueTuple> &fvs, const std::string& alias, const std::string& direction);
bool isPortEnabled(const std::string& alias);
std::string findSamplingRate(const std::string& speed);
};

}
28 changes: 19 additions & 9 deletions cfgmgr/sflowmgrd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,31 @@ int main(int argc, char **argv)

try
{
vector<string> cfg_sflow_tables = {
CFG_SFLOW_TABLE_NAME,
CFG_SFLOW_SESSION_TABLE_NAME,
CFG_PORT_TABLE_NAME
};

DBConnector cfgDb("CONFIG_DB", 0);
DBConnector appDb("APPL_DB", 0);
DBConnector stateDb("STATE_DB", 0);

TableConnector conf_port_table(&cfgDb, CFG_PORT_TABLE_NAME);
TableConnector state_port_table(&stateDb, STATE_PORT_TABLE_NAME);
TableConnector conf_sflow_table(&cfgDb, CFG_SFLOW_TABLE_NAME);
TableConnector conf_sflow_session_table(&cfgDb, CFG_SFLOW_SESSION_TABLE_NAME);

vector<TableConnector> sflow_tables = {
conf_port_table,
state_port_table,
conf_sflow_table,
conf_sflow_session_table
};

SflowMgr sflowmgr(&cfgDb, &appDb, cfg_sflow_tables);
SflowMgr sflowmgr(&appDb, sflow_tables);
/* During process startup, the ordering of config_db followed by state_db notifications cannot be guaranteed
and so handle the config events manually */
sflowmgr.readPortConfig();

vector<Orch *> cfgOrchList = {&sflowmgr};
vector<Orch *> orchList = {&sflowmgr};

swss::Select s;
for (Orch *o : cfgOrchList)
for (Orch *o : orchList)
{
s.addSelectables(o->getSelectables());
}
Expand Down
Loading