Skip to content

Commit

Permalink
[configdb] Add Ability to Query/Update Redis Using Pipelines
Browse files Browse the repository at this point in the history
Redis recommend using pipeline in order to obtain optimal speen when
handling large volume of data. Te pipeline API also given an ability
devide work in batches.

singed-off-by: Tamer Ahmed <[email protected]>
  • Loading branch information
tahmed-dev committed Jul 22, 2020
1 parent 2df4f40 commit fa9093c
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/swsssdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

try:
from .dbconnector import SonicDBConfig, SonicV2Connector
from .configdb import ConfigDBConnector
from .configdb import ConfigDBConnector, ConfigDBPipeConnector
from .sonic_db_dump_load import sonic_db_dump_load
except (KeyError, ValueError):
msg = "Failed to database connector objects -- incorrect database config schema."
Expand Down
141 changes: 133 additions & 8 deletions src/swsssdk/configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ def listen(self):
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if self.handlers.has_key(table):
client = self.get_redis_client(self.db_name)
data = self.__raw_to_typed(client.hgetall(key))
data = self.raw_to_typed(client.hgetall(key))
self.__fire(table, row, data)
except ValueError:
pass #Ignore non table-formated redis entries

def __raw_to_typed(self, raw_data):
def raw_to_typed(self, raw_data):
if raw_data == None:
return None
typed_data = {}
Expand Down Expand Up @@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data):
typed_data[key] = raw_data[raw_key]
return typed_data

def __typed_to_raw(self, typed_data):
def typed_to_raw(self, typed_data):
if typed_data == None:
return None
elif typed_data == {}:
Expand Down Expand Up @@ -187,7 +187,7 @@ def set_entry(self, table, key, data):
client.delete(_hash)
else:
original = self.get_entry(table, key)
client.hmset(_hash, self.__typed_to_raw(data))
client.hmset(_hash, self.typed_to_raw(data))
for k in [ k for k in original.keys() if k not in data.keys() ]:
if type(original[k]) == list:
k = k + '@'
Expand All @@ -208,7 +208,7 @@ def mod_entry(self, table, key, data):
if data == None:
client.delete(_hash)
else:
client.hmset(_hash, self.__typed_to_raw(data))
client.hmset(_hash, self.typed_to_raw(data))

def get_entry(self, table, key):
"""Read a table entry from config db.
Expand All @@ -222,7 +222,7 @@ def get_entry(self, table, key):
key = self.serialize_key(key)
client = self.get_redis_client(self.db_name)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
return self.__raw_to_typed(client.hgetall(_hash))
return self.raw_to_typed(client.hgetall(_hash))

def get_keys(self, table, split=True):
"""Read all keys of a table from config db.
Expand Down Expand Up @@ -266,7 +266,7 @@ def get_table(self, table):
data = {}
for key in keys:
try:
entry = self.__raw_to_typed(client.hgetall(key))
entry = self.raw_to_typed(client.hgetall(key))
if entry != None:
if PY3K:
key = key.decode('utf-8')
Expand Down Expand Up @@ -328,10 +328,135 @@ def get_config(self):
key = key.decode('utf-8')
try:
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
entry = self.__raw_to_typed(client.hgetall(key))
entry = self.raw_to_typed(client.hgetall(key))
if entry != None:
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
except ValueError:
pass #Ignore non table-formated redis entries
return data


class ConfigDBPipeConnector(ConfigDBConnector):
REDIS_SCAN_BATCH_SIZE = 30

def __init__(self, **kwargs):
super(ConfigDBPipeConnector, self).__init__(**kwargs)

def __delete_entries(self, client, pipe, pattern, cursor):
"""Helper method to delete table entries from config db using Redis pipeline
with batch size of REDIS_SCAN_BATCH_SIZE.
The caller should call pipeline execute once ready
Args:
client: Redis client
pipe: Redis DB pipe
pattern: key pattern
cursor: position to start scanning from
Returns:
cur: poition of next item to scan
"""
cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE)
for key in keys:
pipe.delete(key)

return cur

def __delete_table(self, client, pipe, table):
"""Helper method to delete table entries from config db using Redis pipeline.
The caller should call pipeline execute once ready
Args:
client: Redis client
pipe: Redis DB pipe
table: Table name.
"""
pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR)
cur = self.__delete_entries(client, pipe, pattern, 0)
while cur != 0:
cur = self.__delete_entries(client, pipe, pattern, cur)

def __mod_entry(self, pipe, table, key, data):
"""Modify a table entry to config db.
Args:
table: Table name.
pipe: Redis DB pipe
table: Table name.
key: Key of table entry, or a tuple of keys if it is a multi-key table.
data: Table row data in a form of dictionary {'column_key': 'value', ...}.
Pass {} as data will create an entry with no column if not already existed.
Pass None as data will delete the entry.
"""
key = self.serialize_key(key)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
if data == None:
pipe.delete(_hash)
else:
pipe.hmset(_hash, self.typed_to_raw(data))

def mod_config(self, data):
"""Write multiple tables into config db.
Extra entries/fields in the db which are not in the data are kept.
Args:
data: config data in a dictionary form
{
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
...
}
"""
client = self.get_redis_client(self.db_name)
pipe = client.pipeline()
for table_name in data:
table_data = data[table_name]
if table_data == None:
self.__delete_table(client, pipe, table_name)
continue
for key in table_data:
self.__mod_entry(pipe, table_name, key, table_data[key])
pipe.execute()
client.bgsave()

def _get_config(self, client, pipe, data, cursor):
"""Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
Args:
client: Redis client
pipe: Redis DB pipe
data: config dictionary
cursor: position to start scanning from
Returns:
cur: poition of next item to scan
"""
cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE)
keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR]
for key in keys:
pipe.hgetall(key)
records = pipe.execute()

for index, key in enumerate(keys):
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
entry = self.raw_to_typed(records[index])
if entry is not None:
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry

return cur

def get_config(self):
"""Read all config data.
Returns:
Config data in a dictionary form of
{
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
...
}
"""
client = self.get_redis_client(self.db_name)
pipe = client.pipeline()
data = {}

cur = self._get_config(client, pipe, data, 0)
while cur != 0:
cur = self._get_config(client, pipe, data, cur)

return data

0 comments on commit fa9093c

Please sign in to comment.