From 4df5cab5bdc59f1ea5c96346d2cfaf4f7698bc43 Mon Sep 17 00:00:00 2001 From: Stepan Blyshchak <38952541+stepanblyschak@users.noreply.github.com> Date: Thu, 9 Feb 2023 03:22:59 +0200 Subject: [PATCH] [ResponsePublisher] add pipeline support (#2511) * [ResponsePublisher] add pipeline support Why I did it I did it to improve performance when sending many responses. Responses are buffered in redis client before beeing sent out to redis server, while orchagent has no more pending tasks to do, responses are flushed to redis. --- .gitignore | 1 + orchagent/orch.cpp | 5 +++ orchagent/orch.h | 5 +++ orchagent/orchdaemon.cpp | 5 +++ orchagent/response_publisher.cpp | 37 +++++++++++-------- orchagent/response_publisher.h | 24 +++++++++--- tests/mock_tests/Makefile.am | 21 ++++++++++- tests/mock_tests/fake_response_publisher.cpp | 6 ++- .../response_publisher_ut.cpp | 37 +++++++++++++++++++ 9 files changed, 117 insertions(+), 24 deletions(-) create mode 100644 tests/mock_tests/response_publisher/response_publisher_ut.cpp diff --git a/.gitignore b/.gitignore index 13debec21ad4..04c3b514c769 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,7 @@ swssconfig/swssplayer tlm_teamd/tlm_teamd teamsyncd/teamsyncd tests/tests +tests/mock_tests/tests_response_publisher tests/mock_tests/tests_fpmsyncd diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index 26093354c1db..5690d85dd417 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -562,6 +562,11 @@ void Orch::dumpPendingTasks(vector &ts) } } +void Orch::flushResponses() +{ + m_publisher.flush(); +} + void Orch::logfileReopen() { gRecordOfs.close(); diff --git a/orchagent/orch.h b/orchagent/orch.h index 6c620a3ef4e4..efee98a73c29 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -223,6 +223,11 @@ class Orch static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple); void dumpPendingTasks(std::vector &ts); + + /** + * @brief Flush pending responses + */ + void flushResponses(); protected: ConsumerMap m_consumerMap; diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 18295a9f59a8..5f432502566d 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -677,6 +677,11 @@ void OrchDaemon::flush() SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status); handleSaiFailure(true); } + + for (auto* orch: m_orchList) + { + orch->flushResponses(); + } } /* Release the file handle so the log can be rotated */ diff --git a/orchagent/response_publisher.cpp b/orchagent/response_publisher.cpp index 5d0490167c57..169075faa495 100644 --- a/orchagent/response_publisher.cpp +++ b/orchagent/response_publisher.cpp @@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key, } // namespace -ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) +ResponsePublisher::ResponsePublisher(bool buffered) : + m_db(std::make_unique("APPL_STATE_DB", 0)), + m_pipe(std::make_unique(m_db.get())), + m_buffered(buffered) { } @@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key } std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL"; - if (m_notifiers.find(table) == m_notifiers.end()) - { - m_notifiers[table] = std::make_unique(&m_db, response_channel); - } + swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered}; auto intent_attrs_copy = intent_attrs; // Add error message as the first field-value-pair. swss::FieldValueTuple err_str("err_str", PrependedComponent(status) + status.message()); intent_attrs_copy.insert(intent_attrs_copy.begin(), err_str); // Sends the response to the notification channel. - m_notifiers[table]->send(status.codeStr(), key, intent_attrs_copy); + notificationProducer.send(status.codeStr(), key, intent_attrs_copy); RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr()); } @@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key void ResponsePublisher::writeToDB(const std::string &table, const std::string &key, const std::vector &values, const std::string &op, bool replace) { - if (m_tables.find(table) == m_tables.end()) - { - m_tables[table] = std::make_unique(&m_db, table); - } + swss::Table applStateTable{m_pipe.get(), table, m_buffered}; auto attrs = values; if (op == SET_COMMAND) { if (replace) { - m_tables[table]->del(key); + applStateTable.del(key); } if (!values.size()) { @@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k // Write to DB only if the key does not exist or non-NULL attributes are // being written to the entry. std::vector fv; - if (!m_tables[table]->get(key, fv)) + if (!applStateTable.get(key, fv)) { - m_tables[table]->set(key, attrs); + applStateTable.set(key, attrs); RecordDBWrite(table, key, attrs, op); return; } @@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k } if (attrs.size()) { - m_tables[table]->set(key, attrs); + applStateTable.set(key, attrs); RecordDBWrite(table, key, attrs, op); } } else if (op == DEL_COMMAND) { - m_tables[table]->del(key); + applStateTable.del(key); RecordDBWrite(table, key, {}, op); } } + +void ResponsePublisher::flush() +{ + m_pipe->flush(); +} + +void ResponsePublisher::setBuffered(bool buffered) +{ + m_buffered = buffered; +} diff --git a/orchagent/response_publisher.h b/orchagent/response_publisher.h index cd688112e860..db882d9c705a 100644 --- a/orchagent/response_publisher.h +++ b/orchagent/response_publisher.h @@ -16,7 +16,8 @@ class ResponsePublisher : public ResponsePublisherInterface { public: - explicit ResponsePublisher(); + explicit ResponsePublisher(bool buffered = false); + virtual ~ResponsePublisher() = default; // Intent attributes are the attributes sent in the notification into the @@ -42,10 +43,21 @@ class ResponsePublisher : public ResponsePublisherInterface void writeToDB(const std::string &table, const std::string &key, const std::vector &values, const std::string &op, bool replace = false) override; + /** + * @brief Flush pending responses + */ + void flush(); + + /** + * @brief Set buffering mode + * + * @param buffered Flag whether responses are buffered + */ + void setBuffered(bool buffered); + private: - swss::DBConnector m_db; - // Maps table names to tables. - std::unordered_map> m_tables; - // Maps table names to notifiers. - std::unordered_map> m_notifiers; + std::unique_ptr m_db; + std::unique_ptr m_pipe; + + bool m_buffered{false}; }; diff --git a/tests/mock_tests/Makefile.am b/tests/mock_tests/Makefile.am index 02bb54dd25ba..6b648d5f4547 100644 --- a/tests/mock_tests/Makefile.am +++ b/tests/mock_tests/Makefile.am @@ -4,9 +4,9 @@ P4_ORCH_DIR = $(top_srcdir)/orchagent/p4orch CFLAGS_SAI = -I /usr/include/sai -TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd +TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher -noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd +noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis @@ -183,3 +183,20 @@ tests_fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST tests_fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_fpmsyncd_INCLUDES) tests_fpmsyncd_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ -lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main + +## response publisher unit tests + +tests_response_publisher_SOURCES = response_publisher/response_publisher_ut.cpp \ + $(top_srcdir)/orchagent/response_publisher.cpp \ + mock_orchagent_main.cpp \ + mock_dbconnector.cpp \ + mock_table.cpp \ + mock_hiredis.cpp \ + mock_redisreply.cpp + +tests_response_publisher_INCLUDES = $(tests_INCLUDES) +tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) +tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES) +tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ + -lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread + diff --git a/tests/mock_tests/fake_response_publisher.cpp b/tests/mock_tests/fake_response_publisher.cpp index 94480913d50e..4c2c2b037098 100644 --- a/tests/mock_tests/fake_response_publisher.cpp +++ b/tests/mock_tests/fake_response_publisher.cpp @@ -3,7 +3,7 @@ #include "response_publisher.h" -ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) {} +ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique("APPL_STATE_DB", 0)), m_buffered(buffered) {} void ResponsePublisher::publish( const std::string& table, const std::string& key, @@ -20,3 +20,7 @@ void ResponsePublisher::writeToDB( const std::string& table, const std::string& key, const std::vector& values, const std::string& op, bool replace) {} + +void ResponsePublisher::flush() {} + +void ResponsePublisher::setBuffered(bool buffered) {} diff --git a/tests/mock_tests/response_publisher/response_publisher_ut.cpp b/tests/mock_tests/response_publisher/response_publisher_ut.cpp new file mode 100644 index 000000000000..3738ac6d8752 --- /dev/null +++ b/tests/mock_tests/response_publisher/response_publisher_ut.cpp @@ -0,0 +1,37 @@ +#include "response_publisher.h" + +#include + +bool gResponsePublisherRecord{false}; +bool gResponsePublisherLogRotate{false}; +std::ofstream gResponsePublisherRecordOfs; +std::string gResponsePublisherRecordFile; + +using namespace swss; + +TEST(ResponsePublisher, TestPublish) +{ + DBConnector conn{"APPL_STATE_DB", 0}; + Table stateTable{&conn, "SOME_TABLE"}; + std::string value; + ResponsePublisher publisher{}; + + publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS)); + ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value)); + ASSERT_EQ(value, "value"); +} + +TEST(ResponsePublisher, TestPublishBuffered) +{ + DBConnector conn{"APPL_STATE_DB", 0}; + Table stateTable{&conn, "SOME_TABLE"}; + std::string value; + ResponsePublisher publisher{}; + + publisher.setBuffered(true); + + publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS)); + publisher.flush(); + ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value)); + ASSERT_EQ(value, "value"); +}