Skip to content

Commit

Permalink
Make sairedis/syncd synchronous (sonic-net#476)
Browse files Browse the repository at this point in the history
* Make sairedis/syncd synchronous

* Add wait for response for bulk api

* Update aspell

* Address comments

* Add support for bulk route create in saiplayer

* Fix spelling

* Make synchronous mode optional and disabled by default
  • Loading branch information
kcudnik authored Jul 17, 2019
1 parent bd33da9 commit e193e9d
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 15 deletions.
4 changes: 4 additions & 0 deletions lib/inc/sai_redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ extern void recordLine(std::string s);
extern std::string joinFieldValues(
_In_ const std::vector<swss::FieldValueTuple> &values);

extern sai_status_t internal_api_wait_for_response(
_In_ sai_common_api_t api);

// other global declarations

extern volatile bool g_record;
extern volatile bool g_useTempView;
extern volatile bool g_asicInitViewMode;
extern volatile bool g_logrotate;
extern volatile bool g_syncMode;

extern sai_service_method_table_t g_services;
extern std::shared_ptr<swss::ProducerTable> g_asicState;
Expand Down
13 changes: 13 additions & 0 deletions lib/inc/sairedis.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ typedef enum _sai_redis_switch_attr_t
*/
SAI_REDIS_SWITCH_ATTR_PERFORM_LOG_ROTATE,

/**
* @brief Synchronous mode.
*
* Enable or disable synchronous mode. When enabled syncd also needs to be
* running in synchronous mode. Command pipeline will be disabled when this
* flag will be set to true.
*
* @type bool
* @flags CREATE_AND_SET
* @default false
*/
SAI_REDIS_SWITCH_ATTR_SYNC_MODE,

} sai_redis_switch_attr_t;

/*
Expand Down
6 changes: 2 additions & 4 deletions lib/src/sai_redis_generic_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,7 @@ sai_status_t internal_redis_generic_create(

g_asicState->set(key, entry, "create");

// we assume create will always succeed which may not be true
// we should make this synchronous call
return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_CREATE);
}

sai_status_t redis_generic_create(
Expand Down Expand Up @@ -389,7 +387,7 @@ sai_status_t internal_redis_bulk_generic_create(
g_asicState->set(key, entries, "bulkcreate");
}

return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_CREATE);
}

#define REDIS_ENTRY_CREATE(OT,ot) \
Expand Down
2 changes: 1 addition & 1 deletion lib/src/sai_redis_generic_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ sai_status_t internal_redis_generic_get(
const std::string &op = kfvOp(kco);
const std::string &opkey = kfvKey(kco);

SWSS_LOG_DEBUG("response: op = %s, key = %s", opkey.c_str(), op.c_str());
SWSS_LOG_INFO("response: op = %s, key = %s", opkey.c_str(), op.c_str());

if (op != "getresponse") // ignore non response messages
{
Expand Down
8 changes: 4 additions & 4 deletions lib/src/sai_redis_generic_remove.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ sai_status_t internal_redis_generic_remove(

g_asicState->del(key, "remove");

return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_REMOVE);
}

sai_status_t redis_generic_remove(
Expand Down Expand Up @@ -118,10 +118,10 @@ sai_status_t internal_redis_bulk_generic_remove(
}

/*
* Capital 'C' stands for bulk CREATE operation.
* Capital 'R' stands for bulk CREATE operation.
*/

recordLine("C|" + str_object_type + joined);
recordLine("R|" + str_object_type + joined);
}

// key: object_type:count
Expand All @@ -134,7 +134,7 @@ sai_status_t internal_redis_bulk_generic_remove(
g_asicState->set(key, entries, "bulkremove");
}

return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_CREATE);
}


Expand Down
81 changes: 79 additions & 2 deletions lib/src/sai_redis_generic_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,83 @@
#include "meta/sai_serialize.h"
#include "meta/saiattributelist.h"

sai_status_t internal_api_wait_for_response(
_In_ sai_common_api_t api)
{
SWSS_LOG_ENTER();

if (!g_syncMode)
{
/*
* By default sync mode is disabled and all create/set/remove are
* considered success operations.
*/

return SAI_STATUS_SUCCESS;
}

SWSS_LOG_INFO("waiting for response %d", api);

swss::Select s;

s.addSelectable(g_redisGetConsumer.get());

while (true)
{
SWSS_LOG_INFO("wait for %d api response", api);

swss::Selectable *sel;

// get timeout and selector is used for all quad api's
int result = s.select(&sel, GET_RESPONSE_TIMEOUT);

if (result == swss::Select::OBJECT)
{
swss::KeyOpFieldsValuesTuple kco;

g_redisGetConsumer->pop(kco);

const std::string &op = kfvOp(kco);
const std::string &opkey = kfvKey(kco);

SWSS_LOG_INFO("response: op = %s, key = %s", opkey.c_str(), op.c_str());

if (op != "getresponse") // ignore non response messages
{
continue;
}

sai_status_t status;
sai_deserialize_status(opkey, status);

if (g_record)
{
const std::string &str_status = kfvKey(kco);
const std::vector<swss::FieldValueTuple> &values = kfvFieldsValues(kco);

// first serialized is status
recordLine("G|" + str_status + "|" + joinFieldValues(values));
}

SWSS_LOG_DEBUG("generic %d api status: %d", api, status);

return status;
}

SWSS_LOG_ERROR("generic %d api failed due to SELECT operation result: %s", api, getSelectResultAsString(result).c_str());
break;
}

if (g_record)
{
recordLine("G|SAI_STATUS_FAILURE");
}

SWSS_LOG_ERROR("generic %d api failed to get response", api);

return SAI_STATUS_FAILURE;
}

sai_status_t internal_redis_generic_set(
_In_ sai_object_type_t object_type,
_In_ const std::string &serialized_object_id,
Expand All @@ -28,7 +105,7 @@ sai_status_t internal_redis_generic_set(

g_asicState->set(key, entry, "set");

return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_SET);
}

sai_status_t internal_redis_bulk_generic_set(
Expand Down Expand Up @@ -110,7 +187,7 @@ sai_status_t internal_redis_bulk_generic_set(
g_asicState->set(key, entries, "bulkset");
}

return SAI_STATUS_SUCCESS;
return internal_api_wait_for_response(SAI_COMMON_API_CREATE);
}


Expand Down
2 changes: 2 additions & 0 deletions lib/src/sai_redis_interfacequery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ sai_status_t sai_api_initialize(
g_redisNotifications = std::make_shared<swss::NotificationConsumer>(g_dbNtf.get(), "NOTIFICATIONS");
g_redisClient = std::make_shared<swss::RedisClient>(g_db.get());

g_asicState->setBuffered(false); // in sync mode, always false

clear_local_state();

g_asicInitViewMode = false;
Expand Down
20 changes: 20 additions & 0 deletions lib/src/sai_redis_switch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

volatile bool g_asicInitViewMode = false; // default mode is apply mode
volatile bool g_useTempView = false;
volatile bool g_syncMode = false;

sai_status_t sai_redis_internal_notify_syncd(
_In_ const std::string& key)
Expand Down Expand Up @@ -266,7 +267,26 @@ sai_status_t redis_set_switch_attribute(
g_useTempView = attr->value.booldata;
return SAI_STATUS_SUCCESS;

case SAI_REDIS_SWITCH_ATTR_SYNC_MODE:

g_syncMode = attr->value.booldata;

if (g_syncMode)
{
SWSS_LOG_NOTICE("disabling buffered pipeline in sync mode");
g_asicState->setBuffered(false);
}

return SAI_STATUS_SUCCESS;

case SAI_REDIS_SWITCH_ATTR_USE_PIPELINE:

if (g_syncMode)
{
SWSS_LOG_WARN("use pipeline is not supported in sync mode");
return SAI_STATUS_NOT_SUPPORTED;
}

g_asicState->setBuffered(attr->value.booldata);
return SAI_STATUS_SUCCESS;

Expand Down
54 changes: 53 additions & 1 deletion saiplayer/saiplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,54 @@ sai_status_t handle_bulk_route(

return status;
}
else if (api == (sai_common_api_t)SAI_COMMON_API_BULK_CREATE)
{
std::vector<uint32_t> attr_count;

std::vector<const sai_attribute_t*> attr_list;

// route can have multiple attributes, so we need to handle them all
for (const auto &alist: attributes)
{
attr_list.push_back(alist->get_attr_list());
attr_count.push_back(alist->get_attr_count());
}

SWSS_LOG_NOTICE("executing BULK route create with %zu routes", attr_count.size());

sai_status_t status = sai_bulk_create_route_entry(
(uint32_t)routes.size(),
routes.data(),
attr_count.data(),
attr_list.data(),
SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, // TODO we need to get that from recording
statuses.data());

if (status != SAI_STATUS_SUCCESS)
{
// Entire API fails, so no need to compare statuses.
return status;
}

for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i] != recorded_statuses[i])
{
/*
* If recorded statuses are different than received, throw
* exception since data don't match.
*/

SWSS_LOG_THROW("recorded status is %s but returned is %s on %s",
sai_serialize_status(recorded_statuses[i]).c_str(),
sai_serialize_status(statuses[i]).c_str(),
object_ids[i].c_str());
}
}

return status;

}
else
{
SWSS_LOG_THROW("api %d is not supported in bulk route", api);
Expand All @@ -981,7 +1029,8 @@ void processBulk(
return;
}

if (api != (sai_common_api_t)SAI_COMMON_API_BULK_SET)
if (api != (sai_common_api_t)SAI_COMMON_API_BULK_SET &&
api != (sai_common_api_t)SAI_COMMON_API_BULK_CREATE)
{
SWSS_LOG_THROW("bulk common api %d is not supported yet, FIXME", api);
}
Expand Down Expand Up @@ -1154,6 +1203,9 @@ int replay(int argc, char **argv)
case 'S':
processBulk((sai_common_api_t)SAI_COMMON_API_BULK_SET, line);
continue;
case 'C':
processBulk((sai_common_api_t)SAI_COMMON_API_BULK_CREATE, line);
continue;
case 'g':
api = SAI_COMMON_API_GET;
break;
Expand Down
Loading

0 comments on commit e193e9d

Please sign in to comment.