Skip to content

Commit

Permalink
[orch] change Consumer class to support multiple values for the same …
Browse files Browse the repository at this point in the history
…key (#1184)

Description

The Consumer class is used by the orch objects to deal with the redis consumer tables' popped items. It has m_toSync map to save the tasks. During the operations (more items than the orch objects can handle), the tasks in the map is merged for optimization. However, since it is a map, we would only have one value for each key. This potentially eliminate the necessary actions from Redis, e,g, we have a DEL action and SET action coming out from Redis for some key, we would overwrite the DEL action today in the code and thus not able to delete some objects as we intended.

The PR changed the m_toSync from map to multi-map to get multiple values for one key. In this design, we keep maximun two values per key, DEL or SET or DEL+SET. We need strictly keep the order of DEL and SET. It is possible to use map of vectors to fulfill this, we chose multi-map because:
1, It will have less/no changes to different orch classes to iterate the m_toSync
2, The order can be guaranteed. The order of the key-value pairs whose keys compare equivalent is the order of insertion and does not change. (since C++11). See
https://en.cppreference.com/w/cpp/container/multimap

The PR also refactors the consumer class so vlanmgr.cpp and routeorch.cpp will leverage the Consumer functions instead of operating on the members. It also refactors the UT code (aclorch_ut.cpp) so it removes the redundant code and uses the same code.

Google UT tests were added for Consumer Class especially for different cases for addToSync() function.

What I did
Change the m_toSync in Consumer class to multimap so it could support both DEL and SET
Reload Consumer addToSync() and refactor vlanmgr/route-orch and ut code to use it
Add google ut for consumer class

Why I did it
See description.

How I verified it
Unit tests:
Running main() from gtest_main.cc

```
[==========] Running 19 tests from 5 test cases.
[----------] Global test environment set-up.
[----------] 1 test from AclTest
[ RUN      ] AclTest.Create_L3_Acl_Table
[       OK ] AclTest.Create_L3_Acl_Table (1 ms)
[----------] 1 test from AclTest (1 ms total)

[----------] 3 tests from AclOrchTest
[ RUN      ] AclOrchTest.ACL_Creation_and_Destorying
[       OK ] AclOrchTest.ACL_Creation_and_Destorying (1000 ms)
[ RUN      ] AclOrchTest.L3Acl_Matches_Actions
[       OK ] AclOrchTest.L3Acl_Matches_Actions (1001 ms)
[ RUN      ] AclOrchTest.L3V6Acl_Matches_Actions
[       OK ] AclOrchTest.L3V6Acl_Matches_Actions (1000 ms)
[----------] 3 tests from AclOrchTest (3003 ms total)

[----------] 2 tests from PortsOrchTest
[ RUN      ] PortsOrchTest.PortReadinessColdBoot
[       OK ] PortsOrchTest.PortReadinessColdBoot (21 ms)
[ RUN      ] PortsOrchTest.PortReadinessWarmBoot
[       OK ] PortsOrchTest.PortReadinessWarmBoot (13 ms)
[----------] 2 tests from PortsOrchTest (34 ms total)

[----------] 4 tests from SaiSpy
[ RUN      ] SaiSpy.CURD
[       OK ] SaiSpy.CURD (0 ms)
[ RUN      ] SaiSpy.Same_Function_Signature_In_Same_API_Table
[       OK ] SaiSpy.Same_Function_Signature_In_Same_API_Table (0 ms)
[ RUN      ] SaiSpy.Same_Function_Signature_In_Different_API_Table
[       OK ] SaiSpy.Same_Function_Signature_In_Different_API_Table (0 ms)
[ RUN      ] SaiSpy.create_switch_and_acl_table
[       OK ] SaiSpy.create_switch_and_acl_table (0 ms)
[----------] 4 tests from SaiSpy (0 ms total)

[----------] 9 tests from ConsumerTest
[ RUN      ] ConsumerTest.ConsumerAddToSync_Set
[       OK ] ConsumerTest.ConsumerAddToSync_Set (1 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Del
[       OK ] ConsumerTest.ConsumerAddToSync_Del (0 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Set_Del
[       OK ] ConsumerTest.ConsumerAddToSync_Set_Del (0 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Del_Set
[       OK ] ConsumerTest.ConsumerAddToSync_Del_Set (130 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi
[       OK ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi (204 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi_In_Q
[       OK ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi_In_Q (4 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew
[       OK ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew (0 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew1
[       OK ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew1 (0 ms)
[ RUN      ] ConsumerTest.ConsumerAddToSync_Ind_Set_Del
[       OK ] ConsumerTest.ConsumerAddToSync_Ind_Set_Del (0 ms)
[----------] 9 tests from ConsumerTest (340 ms total)

[----------] Global test environment tear-down
[==========] 19 tests from 5 test cases ran. (4344 ms total)
[  PASSED  ] 19 tests.
```

Signed-off-by: Zhenggen Xu <[email protected]>
  • Loading branch information
zhenggen-xu authored and lguohan committed Jan 30, 2020
1 parent 49ad38f commit dc695fb
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 86 deletions.
5 changes: 3 additions & 2 deletions cfgmgr/vlanmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,9 @@ void VlanMgr::processUntaggedVlanMembers(string vlan, const string &members)
vector<FieldValueTuple> fvVector;
FieldValueTuple t("tagging_mode", "untagged");
fvVector.push_back(t);
consumer.m_toSync[member_key] = make_tuple(member_key, SET_COMMAND, fvVector);
SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, consumer.m_toSync[member_key])).c_str());
KeyOpFieldsValuesTuple tuple = make_tuple(member_key, SET_COMMAND, fvVector);
consumer.addToSync(tuple);
SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, tuple)).c_str());
}
/*
* There is pending task from consumer pipe, in this case just skip it.
Expand Down
82 changes: 62 additions & 20 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,66 @@ vector<Selectable *> Orch::getSelectables()
return selectables;
}

size_t Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)
{
SWSS_LOG_ENTER();

/* Nothing popped */
if (entries.empty())

string key = kfvKey(entry);
string op = kfvOp(entry);

/* Record incoming tasks */
if (gSwssRecord)
{
return 0;
Orch::recordTuple(*this, entry);
}

for (auto& entry: entries)
/*
* m_toSync is a multimap which will allow one key with multiple values,
* Also, the order of the key-value pairs whose keys compare equivalent
* is the order of insertion and does not change. (since C++11)
*/

/* If a new task comes we directly put it into getConsumerTable().m_toSync map */
if (m_toSync.find(key) == m_toSync.end())
{
string key = kfvKey(entry);
string op = kfvOp(entry);
m_toSync.emplace(key, entry);
}

/* Record incoming tasks */
if (gSwssRecord)
/* if a DEL task comes, we overwrite the old key */
else if (op == DEL_COMMAND)
{
m_toSync.erase(key);
m_toSync.emplace(key, entry);
}
else
{
/*
* Now we are trying to add the key-value with SET.
* We maintain maximun two values per key.
* In case there is one key-value, it should be DEL or SET
* In case there are two key-value pairs, it should be DEL then SET
* The code logic is following:
* We iterate the values with the key, we skip the value with DEL and then
* check if that was the only one (I,E, the iter pointer now points to end or next key),
* in such case, we insert the key-value with SET.
* If there was a SET already (I,E, the pointer still points to the same key), we combine the kfv.
*/
auto ret = m_toSync.equal_range(key);
auto iter = ret.first;
for (; iter != ret.second; ++iter)
{
Orch::recordTuple(*this, entry);
auto old_op = kfvOp(iter->second);
if (old_op == SET_COMMAND)
break;
}

/* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
if (iter == ret.second)
{
m_toSync[key] = entry;
m_toSync.emplace(key, entry);
}
/* If an old task is still there, we combine the old task with new task */
else
{
KeyOpFieldsValuesTuple existing_data = m_toSync[key];
KeyOpFieldsValuesTuple existing_data = iter->second;

auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);
Expand All @@ -118,9 +148,21 @@ size_t Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
}
existing_values.push_back(FieldValueTuple(field, value));
}
m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
iter->second = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}

}

size_t Consumer::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
{
SWSS_LOG_ENTER();

for (auto& entry: entries)
{
addToSync(entry);
}

return entries.size();
}

Expand Down Expand Up @@ -186,7 +228,7 @@ void Consumer::drain()
m_orch->doTask(*this);
}

string Consumer::dumpTuple(KeyOpFieldsValuesTuple &tuple)
string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
{
string s = getTableName() + getConsumerTable()->getTableNameSeparator() + kfvKey(tuple)
+ "|" + kfvOp(tuple);
Expand Down Expand Up @@ -412,7 +454,7 @@ void Orch::logfileReopen()
}
}

void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
void Orch::recordTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
{
string s = consumer.dumpTuple(tuple);

Expand All @@ -426,7 +468,7 @@ void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
}
}

string Orch::dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
string Orch::dumpTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
{
string s = consumer.dumpTuple(tuple);
return s;
Expand Down
17 changes: 11 additions & 6 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ typedef std::pair<std::string, sai_object_id_t> object_map_pair;

typedef std::map<std::string, object_map*> type_map;
typedef std::pair<std::string, object_map*> type_map_pair;
typedef std::map<std::string, swss::KeyOpFieldsValuesTuple> SyncMap;

// Use multimap to support multiple OpFieldsValues for the same key (e,g, DEL and SET)
// The order of the key-value pairs whose keys compare equivalent is the order of
// insertion and does not change. (since C++11)
typedef std::multimap<std::string, swss::KeyOpFieldsValuesTuple> SyncMap;

typedef std::pair<std::string, int> table_name_with_pri_t;

Expand Down Expand Up @@ -132,7 +136,7 @@ class Consumer : public Executor {
return getConsumerTable()->getDbId();
}

std::string dumpTuple(swss::KeyOpFieldsValuesTuple &tuple);
std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
void dumpPendingTasks(std::vector<std::string> &ts);

size_t refillToSync();
Expand All @@ -144,9 +148,10 @@ class Consumer : public Executor {
// TODO: hide?
SyncMap m_toSync;

protected:
void addToSync(const swss::KeyOpFieldsValuesTuple &entry);

// Returns: the number of entries added to m_toSync
size_t addToSync(std::deque<swss::KeyOpFieldsValuesTuple> &entries);
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
};

typedef std::map<std::string, std::shared_ptr<Executor>> ConsumerMap;
Expand Down Expand Up @@ -194,14 +199,14 @@ class Orch
virtual void doTask(swss::SelectableTimer &timer) { }

/* TODO: refactor recording */
static void recordTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple);
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);

void dumpPendingTasks(std::vector<std::string> &ts);
protected:
ConsumerMap m_consumerMap;

static void logfileReopen();
std::string dumpTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple);
std::string dumpTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
ref_resolve_status resolveFieldRefValue(type_map&, const std::string&, swss::KeyOpFieldsValuesTuple&, sai_object_id_t&);
bool parseIndexRange(const std::string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
bool parseReference(type_map &type_maps, std::string &ref, std::string &table_name, std::string &object_name);
Expand Down
4 changes: 2 additions & 2 deletions orchagent/routeorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ std::string RouteOrch::getLinkLocalEui64Addr(void)

uint8_t eui64_interface_id[EUI64_INTF_ID_LEN];
char ipv6_ll_addr[INET6_ADDRSTRLEN] = {0};

/* Link-local IPv6 address autogenerated by kernel with eui64 interface-id
* derived from the MAC address of the host interface.
*/
Expand Down Expand Up @@ -406,7 +406,7 @@ void RouteOrch::doTask(Consumer& consumer)
vector<FieldValueTuple> v;
key = vrf + i.first.to_string();
auto x = KeyOpFieldsValuesTuple(key, DEL_COMMAND, v);
consumer.m_toSync[key] = x;
consumer.addToSync(x);
}
}
m_resync = true;
Expand Down
1 change: 1 addition & 0 deletions tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ LDADD_GTEST = -L/usr/src/gtest
tests_SOURCES = aclorch_ut.cpp \
portsorch_ut.cpp \
saispy_ut.cpp \
consumer_ut.cpp \
ut_saihelper.cpp \
mock_orchagent_main.cpp \
mock_dbconnector.cpp \
Expand Down
61 changes: 5 additions & 56 deletions tests/mock_tests/aclorch_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,54 +23,6 @@ namespace aclorch_test
{
using namespace std;

size_t consumerAddToSync(Consumer *consumer, const deque<KeyOpFieldsValuesTuple> &entries)
{
/* Nothing popped */
if (entries.empty())
{
return 0;
}

for (auto &entry : entries)
{
string key = kfvKey(entry);
string op = kfvOp(entry);

/* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
if (consumer->m_toSync.find(key) == consumer->m_toSync.end() || op == DEL_COMMAND)
{
consumer->m_toSync[key] = entry;
}
/* If an old task is still there, we combine the old task with new task */
else
{
KeyOpFieldsValuesTuple existing_data = consumer->m_toSync[key];

auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);

for (auto it : new_values)
{
string field = fvField(it);
string value = fvValue(it);

auto iu = existing_values.begin();
while (iu != existing_values.end())
{
string ofield = fvField(*iu);
if (field == ofield)
iu = existing_values.erase(iu);
else
iu++;
}
existing_values.push_back(FieldValueTuple(field, value));
}
consumer->m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}
return entries.size();
}

struct AclTestBase : public ::testing::Test
{
vector<int32_t *> m_s32list_pool;
Expand Down Expand Up @@ -199,8 +151,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(config_db, CFG_ACL_TABLE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_TABLE_TABLE_NAME));

consumerAddToSync(consumer.get(), entries);

consumer->addToSync(entries);
static_cast<Orch *>(m_aclOrch)->doTask(*consumer);
}

Expand All @@ -209,8 +160,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(config_db, CFG_ACL_RULE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_RULE_TABLE_NAME));

consumerAddToSync(consumer.get(), entries);

consumer->addToSync(entries);
static_cast<Orch *>(m_aclOrch)->doTask(*consumer);
}

Expand Down Expand Up @@ -381,8 +331,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(m_app_db.get(), APP_PORT_TABLE_NAME, 1, 1), gPortsOrch, APP_PORT_TABLE_NAME));

consumerAddToSync(consumer.get(), { { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } });

consumer->addToSync({ { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } });
static_cast<Orch *>(gPortsOrch)->doTask(*consumer.get());
}

Expand Down Expand Up @@ -628,7 +577,7 @@ namespace aclorch_test
// consistency validation with CRM
bool validateResourceCountWithCrm(const AclOrch *aclOrch, CrmOrch *crmOrch)
{
// Verify ACL Tables
// Verify ACL Tables
auto const &resourceMap = Portal::CrmOrchInternal::getResourceMap(crmOrch);
uint32_t crm_acl_table_cnt = 0;
for (auto const &kv : resourceMap.at(CrmResourceType::CRM_ACL_TABLE).countersMap)
Expand All @@ -642,7 +591,7 @@ namespace aclorch_test
<< ") and AclOrch " << Portal::AclOrchInternal::getAclTables(aclOrch).size();
return false;
}


// Verify ACL Rules
//
Expand Down
Loading

0 comments on commit dc695fb

Please sign in to comment.