From e193e9d9d35f27e37aab2c2631c4a81d3148f3b0 Mon Sep 17 00:00:00 2001 From: Kamil Cudnik Date: Wed, 17 Jul 2019 10:26:18 +0200 Subject: [PATCH] Make sairedis/syncd synchronous (#476) * Make sairedis/syncd synchronous * Add wait for response for bulk api * Update aspell * Address comments * Add support for bulk route create in saiplayer * Fix spelling * Make synchronous mode optional and disabled by default --- lib/inc/sai_redis.h | 4 ++ lib/inc/sairedis.h | 13 +++++ lib/src/sai_redis_generic_create.cpp | 6 +-- lib/src/sai_redis_generic_get.cpp | 2 +- lib/src/sai_redis_generic_remove.cpp | 8 +-- lib/src/sai_redis_generic_set.cpp | 81 +++++++++++++++++++++++++++- lib/src/sai_redis_interfacequery.cpp | 2 + lib/src/sai_redis_switch.cpp | 20 +++++++ saiplayer/saiplayer.cpp | 54 ++++++++++++++++++- syncd/syncd.cpp | 59 ++++++++++++++++++-- tests/aspell.en.pws | 2 + 11 files changed, 236 insertions(+), 15 deletions(-) diff --git a/lib/inc/sai_redis.h b/lib/inc/sai_redis.h index bf29a8dead12..e0fd780f63bc 100644 --- a/lib/inc/sai_redis.h +++ b/lib/inc/sai_redis.h @@ -54,12 +54,16 @@ extern void recordLine(std::string s); extern std::string joinFieldValues( _In_ const std::vector &values); +extern sai_status_t internal_api_wait_for_response( + _In_ sai_common_api_t api); + // other global declarations extern volatile bool g_record; extern volatile bool g_useTempView; extern volatile bool g_asicInitViewMode; extern volatile bool g_logrotate; +extern volatile bool g_syncMode; extern sai_service_method_table_t g_services; extern std::shared_ptr g_asicState; diff --git a/lib/inc/sairedis.h b/lib/inc/sairedis.h index e70a03ad759d..610e3952e5f7 100644 --- a/lib/inc/sairedis.h +++ b/lib/inc/sairedis.h @@ -104,6 +104,19 @@ typedef enum _sai_redis_switch_attr_t */ SAI_REDIS_SWITCH_ATTR_PERFORM_LOG_ROTATE, + /** + * @brief Synchronous mode. + * + * Enable or disable synchronous mode. When enabled syncd also needs to be + * running in synchronous mode. Command pipeline will be disabled when this + * flag will be set to true. + * + * @type bool + * @flags CREATE_AND_SET + * @default false + */ + SAI_REDIS_SWITCH_ATTR_SYNC_MODE, + } sai_redis_switch_attr_t; /* diff --git a/lib/src/sai_redis_generic_create.cpp b/lib/src/sai_redis_generic_create.cpp index 49361c5a66c6..e9dbfd3d189a 100644 --- a/lib/src/sai_redis_generic_create.cpp +++ b/lib/src/sai_redis_generic_create.cpp @@ -234,9 +234,7 @@ sai_status_t internal_redis_generic_create( g_asicState->set(key, entry, "create"); - // we assume create will always succeed which may not be true - // we should make this synchronous call - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_CREATE); } sai_status_t redis_generic_create( @@ -389,7 +387,7 @@ sai_status_t internal_redis_bulk_generic_create( g_asicState->set(key, entries, "bulkcreate"); } - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_CREATE); } #define REDIS_ENTRY_CREATE(OT,ot) \ diff --git a/lib/src/sai_redis_generic_get.cpp b/lib/src/sai_redis_generic_get.cpp index 0c79293dc51d..8eb35221927b 100644 --- a/lib/src/sai_redis_generic_get.cpp +++ b/lib/src/sai_redis_generic_get.cpp @@ -205,7 +205,7 @@ sai_status_t internal_redis_generic_get( const std::string &op = kfvOp(kco); const std::string &opkey = kfvKey(kco); - SWSS_LOG_DEBUG("response: op = %s, key = %s", opkey.c_str(), op.c_str()); + SWSS_LOG_INFO("response: op = %s, key = %s", opkey.c_str(), op.c_str()); if (op != "getresponse") // ignore non response messages { diff --git a/lib/src/sai_redis_generic_remove.cpp b/lib/src/sai_redis_generic_remove.cpp index 372aa2f47f97..d6afc1c69b9b 100644 --- a/lib/src/sai_redis_generic_remove.cpp +++ b/lib/src/sai_redis_generic_remove.cpp @@ -21,7 +21,7 @@ sai_status_t internal_redis_generic_remove( g_asicState->del(key, "remove"); - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_REMOVE); } sai_status_t redis_generic_remove( @@ -118,10 +118,10 @@ sai_status_t internal_redis_bulk_generic_remove( } /* - * Capital 'C' stands for bulk CREATE operation. + * Capital 'R' stands for bulk CREATE operation. */ - recordLine("C|" + str_object_type + joined); + recordLine("R|" + str_object_type + joined); } // key: object_type:count @@ -134,7 +134,7 @@ sai_status_t internal_redis_bulk_generic_remove( g_asicState->set(key, entries, "bulkremove"); } - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_CREATE); } diff --git a/lib/src/sai_redis_generic_set.cpp b/lib/src/sai_redis_generic_set.cpp index 1bd7ad80eb56..e084d9440209 100644 --- a/lib/src/sai_redis_generic_set.cpp +++ b/lib/src/sai_redis_generic_set.cpp @@ -2,6 +2,83 @@ #include "meta/sai_serialize.h" #include "meta/saiattributelist.h" +sai_status_t internal_api_wait_for_response( + _In_ sai_common_api_t api) +{ + SWSS_LOG_ENTER(); + + if (!g_syncMode) + { + /* + * By default sync mode is disabled and all create/set/remove are + * considered success operations. + */ + + return SAI_STATUS_SUCCESS; + } + + SWSS_LOG_INFO("waiting for response %d", api); + + swss::Select s; + + s.addSelectable(g_redisGetConsumer.get()); + + while (true) + { + SWSS_LOG_INFO("wait for %d api response", api); + + swss::Selectable *sel; + + // get timeout and selector is used for all quad api's + int result = s.select(&sel, GET_RESPONSE_TIMEOUT); + + if (result == swss::Select::OBJECT) + { + swss::KeyOpFieldsValuesTuple kco; + + g_redisGetConsumer->pop(kco); + + const std::string &op = kfvOp(kco); + const std::string &opkey = kfvKey(kco); + + SWSS_LOG_INFO("response: op = %s, key = %s", opkey.c_str(), op.c_str()); + + if (op != "getresponse") // ignore non response messages + { + continue; + } + + sai_status_t status; + sai_deserialize_status(opkey, status); + + if (g_record) + { + const std::string &str_status = kfvKey(kco); + const std::vector &values = kfvFieldsValues(kco); + + // first serialized is status + recordLine("G|" + str_status + "|" + joinFieldValues(values)); + } + + SWSS_LOG_DEBUG("generic %d api status: %d", api, status); + + return status; + } + + SWSS_LOG_ERROR("generic %d api failed due to SELECT operation result: %s", api, getSelectResultAsString(result).c_str()); + break; + } + + if (g_record) + { + recordLine("G|SAI_STATUS_FAILURE"); + } + + SWSS_LOG_ERROR("generic %d api failed to get response", api); + + return SAI_STATUS_FAILURE; +} + sai_status_t internal_redis_generic_set( _In_ sai_object_type_t object_type, _In_ const std::string &serialized_object_id, @@ -28,7 +105,7 @@ sai_status_t internal_redis_generic_set( g_asicState->set(key, entry, "set"); - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_SET); } sai_status_t internal_redis_bulk_generic_set( @@ -110,7 +187,7 @@ sai_status_t internal_redis_bulk_generic_set( g_asicState->set(key, entries, "bulkset"); } - return SAI_STATUS_SUCCESS; + return internal_api_wait_for_response(SAI_COMMON_API_CREATE); } diff --git a/lib/src/sai_redis_interfacequery.cpp b/lib/src/sai_redis_interfacequery.cpp index 16fb96accc30..8a9761f069d1 100644 --- a/lib/src/sai_redis_interfacequery.cpp +++ b/lib/src/sai_redis_interfacequery.cpp @@ -124,6 +124,8 @@ sai_status_t sai_api_initialize( g_redisNotifications = std::make_shared(g_dbNtf.get(), "NOTIFICATIONS"); g_redisClient = std::make_shared(g_db.get()); + g_asicState->setBuffered(false); // in sync mode, always false + clear_local_state(); g_asicInitViewMode = false; diff --git a/lib/src/sai_redis_switch.cpp b/lib/src/sai_redis_switch.cpp index f664232d250a..36248ccc7918 100644 --- a/lib/src/sai_redis_switch.cpp +++ b/lib/src/sai_redis_switch.cpp @@ -7,6 +7,7 @@ volatile bool g_asicInitViewMode = false; // default mode is apply mode volatile bool g_useTempView = false; +volatile bool g_syncMode = false; sai_status_t sai_redis_internal_notify_syncd( _In_ const std::string& key) @@ -266,7 +267,26 @@ sai_status_t redis_set_switch_attribute( g_useTempView = attr->value.booldata; return SAI_STATUS_SUCCESS; + case SAI_REDIS_SWITCH_ATTR_SYNC_MODE: + + g_syncMode = attr->value.booldata; + + if (g_syncMode) + { + SWSS_LOG_NOTICE("disabling buffered pipeline in sync mode"); + g_asicState->setBuffered(false); + } + + return SAI_STATUS_SUCCESS; + case SAI_REDIS_SWITCH_ATTR_USE_PIPELINE: + + if (g_syncMode) + { + SWSS_LOG_WARN("use pipeline is not supported in sync mode"); + return SAI_STATUS_NOT_SUPPORTED; + } + g_asicState->setBuffered(attr->value.booldata); return SAI_STATUS_SUCCESS; diff --git a/saiplayer/saiplayer.cpp b/saiplayer/saiplayer.cpp index a15bab185c02..0eb1f6235b8a 100644 --- a/saiplayer/saiplayer.cpp +++ b/saiplayer/saiplayer.cpp @@ -964,6 +964,54 @@ sai_status_t handle_bulk_route( return status; } + else if (api == (sai_common_api_t)SAI_COMMON_API_BULK_CREATE) + { + std::vector attr_count; + + std::vector attr_list; + + // route can have multiple attributes, so we need to handle them all + for (const auto &alist: attributes) + { + attr_list.push_back(alist->get_attr_list()); + attr_count.push_back(alist->get_attr_count()); + } + + SWSS_LOG_NOTICE("executing BULK route create with %zu routes", attr_count.size()); + + sai_status_t status = sai_bulk_create_route_entry( + (uint32_t)routes.size(), + routes.data(), + attr_count.data(), + attr_list.data(), + SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, // TODO we need to get that from recording + statuses.data()); + + if (status != SAI_STATUS_SUCCESS) + { + // Entire API fails, so no need to compare statuses. + return status; + } + + for (size_t i = 0; i < statuses.size(); ++i) + { + if (statuses[i] != recorded_statuses[i]) + { + /* + * If recorded statuses are different than received, throw + * exception since data don't match. + */ + + SWSS_LOG_THROW("recorded status is %s but returned is %s on %s", + sai_serialize_status(recorded_statuses[i]).c_str(), + sai_serialize_status(statuses[i]).c_str(), + object_ids[i].c_str()); + } + } + + return status; + + } else { SWSS_LOG_THROW("api %d is not supported in bulk route", api); @@ -981,7 +1029,8 @@ void processBulk( return; } - if (api != (sai_common_api_t)SAI_COMMON_API_BULK_SET) + if (api != (sai_common_api_t)SAI_COMMON_API_BULK_SET && + api != (sai_common_api_t)SAI_COMMON_API_BULK_CREATE) { SWSS_LOG_THROW("bulk common api %d is not supported yet, FIXME", api); } @@ -1154,6 +1203,9 @@ int replay(int argc, char **argv) case 'S': processBulk((sai_common_api_t)SAI_COMMON_API_BULK_SET, line); continue; + case 'C': + processBulk((sai_common_api_t)SAI_COMMON_API_BULK_CREATE, line); + continue; case 'g': api = SAI_COMMON_API_GET; break; diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index a24f79bf5e0c..353e1ae85ddc 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -103,6 +103,7 @@ struct cmdOptions bool run_rpc_server; std::string portMapFile; #endif // SAITHRIFT + bool syncMode; ~cmdOptions() {} }; @@ -1005,6 +1006,33 @@ void internal_syncd_get_send( SWSS_LOG_INFO("response for GET api was send"); } +void internal_syncd_api_send_response( + _In_ sai_common_api_t api, + _In_ sai_status_t status) +{ + SWSS_LOG_ENTER(); + + /* + * By default synchronous mode is disabled and can be enabled by command + * line on syncd start. This will also require to enable synchronous mode + * in OA/sairedis because same GET RESPONSE channel is used to generate + * response for sairedis quad API. + */ + + if (!options.syncMode) + return; + + std::vector entry; + + std::string str_status = sai_serialize_status(status); + + SWSS_LOG_INFO("sending response for %d api with status: %s", api, str_status.c_str()); + + getResponse->set(str_status, entry, "getresponse"); + + SWSS_LOG_INFO("response for %d api was send", api); +} + const char* profile_get_value( _In_ sai_switch_profile_id_t profile_id, _In_ const char* variable) @@ -1467,7 +1495,7 @@ void sendNotifyResponse( std::vector entry; - SWSS_LOG_NOTICE("sending response: %s", str_status.c_str()); + SWSS_LOG_INFO("sending response: %s", str_status.c_str()); getResponse->set(str_status, entry, "notify"); } @@ -2240,6 +2268,8 @@ sai_status_t processEventInInitViewMode( } } + internal_syncd_api_send_response(api, SAI_STATUS_SUCCESS); + return SAI_STATUS_SUCCESS; case SAI_COMMON_API_REMOVE: @@ -2277,6 +2307,8 @@ sai_status_t processEventInInitViewMode( initViewRemovedVidSet.insert(object_vid); } + internal_syncd_api_send_response(api, SAI_STATUS_SUCCESS); + return SAI_STATUS_SUCCESS; case SAI_COMMON_API_SET: @@ -2285,6 +2317,8 @@ sai_status_t processEventInInitViewMode( * We support SET api on all objects in init view mode. */ + internal_syncd_api_send_response(api, SAI_STATUS_SUCCESS); + return SAI_STATUS_SUCCESS; case SAI_COMMON_API_GET: @@ -2479,6 +2513,7 @@ sai_status_t handle_bulk_generic( if (status != SAI_STATUS_SUCCESS) { + internal_syncd_api_send_response(api, status); return status; } } @@ -2587,6 +2622,8 @@ sai_status_t processBulkEvent( sai_serialize_status(status).c_str()); } + internal_syncd_api_send_response(api, status); + return status; } @@ -2850,6 +2887,8 @@ sai_status_t processEvent( } else if (status != SAI_STATUS_SUCCESS) { + internal_syncd_api_send_response(api, status); + if (!info->isnonobjectid && api == SAI_COMMON_API_SET) { sai_object_id_t vid; @@ -2872,6 +2911,11 @@ sai_status_t processEvent( key.c_str(), sai_serialize_status(status).c_str()); } + else // non GET api, status is SUCCESS + { + internal_syncd_api_send_response(api, status); + } + } while (!consumer.empty()); return status; @@ -3060,7 +3104,7 @@ bool processFlexCounterEvent( if (!try_translate_vid_to_rid(vid, rid)) { - SWSS_LOG_WARN("port VID %s, was not found (probably port was removed/splitted) and will remove from counters now", + SWSS_LOG_WARN("port VID %s, was not found (probably port was removed/splitted) and will remove from counters now", sai_serialize_object_id(vid).c_str()); return false; @@ -3216,7 +3260,7 @@ void printUsage() { SWSS_LOG_ENTER(); - std::cout << "Usage: syncd [-N] [-U] [-d] [-p profile] [-i interval] [-t [cold|warm|fast|fastfast]] [-h] [-u] [-S]" << std::endl; + std::cout << "Usage: syncd [-N] [-U] [-d] [-p profile] [-i interval] [-t [cold|warm|fast|fastfast]] [-h] [-u] [-S] [-s]" << std::endl; std::cout << " -N --nocounters" << std::endl; std::cout << " Disable counter thread" << std::endl; std::cout << " -d --diag" << std::endl; @@ -3241,6 +3285,8 @@ void printUsage() std::cout << " -m --portmap" << std::endl; std::cout << " Specify port map file" << std::endl; #endif // SAITHRIFT + std::cout << " -s --syncMode" << std::endl; + std::cout << " Enable synchronous mode" << std::endl; std::cout << " -h --help" << std::endl; std::cout << " Print out this message" << std::endl; } @@ -3251,6 +3297,7 @@ void handleCmdLine(int argc, char **argv) options.disableExitSleep = false; options.enableUnittests = false; + options.syncMode = false; #ifdef SAITHRIFT options.run_rpc_server = false; @@ -3275,6 +3322,7 @@ void handleCmdLine(int argc, char **argv) { "rpcserver", no_argument, 0, 'r' }, { "portmap", required_argument, 0, 'm' }, #endif // SAITHRIFT + { "syncMode", no_argument, 0, 's' }, { 0, 0, 0, 0 } }; @@ -3355,6 +3403,11 @@ void handleCmdLine(int argc, char **argv) break; #endif // SAITHRIFT + case 's': + SWSS_LOG_NOTICE("enable synchronous mode"); + options.syncMode = true; + break; + case 'h': printUsage(); exit(EXIT_SUCCESS); diff --git a/tests/aspell.en.pws b/tests/aspell.en.pws index 3430c32b3e8a..21b18b89ed76 100644 --- a/tests/aspell.en.pws +++ b/tests/aspell.en.pws @@ -244,3 +244,5 @@ VXLAN workaroung xoff xon +booldata +setBuffered