From fa9093c7182d38f5ea687ebfbd04c2c89462f86d Mon Sep 17 00:00:00 2001 From: Tamer Ahmed Date: Wed, 22 Jul 2020 16:04:46 -0700 Subject: [PATCH] [configdb] Add Ability to Query/Update Redis Using Pipelines Redis recommend using pipeline in order to obtain optimal speen when handling large volume of data. Te pipeline API also given an ability devide work in batches. singed-off-by: Tamer Ahmed --- src/swsssdk/__init__.py | 2 +- src/swsssdk/configdb.py | 141 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 134 insertions(+), 9 deletions(-) diff --git a/src/swsssdk/__init__.py b/src/swsssdk/__init__.py index d7cf391f550c..982286020342 100644 --- a/src/swsssdk/__init__.py +++ b/src/swsssdk/__init__.py @@ -9,7 +9,7 @@ try: from .dbconnector import SonicDBConfig, SonicV2Connector - from .configdb import ConfigDBConnector + from .configdb import ConfigDBConnector, ConfigDBPipeConnector from .sonic_db_dump_load import sonic_db_dump_load except (KeyError, ValueError): msg = "Failed to database connector objects -- incorrect database config schema." diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index 4ccbb0dca32e..ad99564bd925 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -108,12 +108,12 @@ def listen(self): (table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) if self.handlers.has_key(table): client = self.get_redis_client(self.db_name) - data = self.__raw_to_typed(client.hgetall(key)) + data = self.raw_to_typed(client.hgetall(key)) self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries - def __raw_to_typed(self, raw_data): + def raw_to_typed(self, raw_data): if raw_data == None: return None typed_data = {} @@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data): typed_data[key] = raw_data[raw_key] return typed_data - def __typed_to_raw(self, typed_data): + def typed_to_raw(self, typed_data): if typed_data == None: return None elif typed_data == {}: @@ -187,7 +187,7 @@ def set_entry(self, table, key, data): client.delete(_hash) else: original = self.get_entry(table, key) - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) for k in [ k for k in original.keys() if k not in data.keys() ]: if type(original[k]) == list: k = k + '@' @@ -208,7 +208,7 @@ def mod_entry(self, table, key, data): if data == None: client.delete(_hash) else: - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) def get_entry(self, table, key): """Read a table entry from config db. @@ -222,7 +222,7 @@ def get_entry(self, table, key): key = self.serialize_key(key) client = self.get_redis_client(self.db_name) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - return self.__raw_to_typed(client.hgetall(_hash)) + return self.raw_to_typed(client.hgetall(_hash)) def get_keys(self, table, split=True): """Read all keys of a table from config db. @@ -266,7 +266,7 @@ def get_table(self, table): data = {} for key in keys: try: - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: if PY3K: key = key.decode('utf-8') @@ -328,10 +328,135 @@ def get_config(self): key = key.decode('utf-8') try: (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: data.setdefault(table_name, {})[self.deserialize_key(row)] = entry except ValueError: pass #Ignore non table-formated redis entries return data + +class ConfigDBPipeConnector(ConfigDBConnector): + REDIS_SCAN_BATCH_SIZE = 30 + + def __init__(self, **kwargs): + super(ConfigDBPipeConnector, self).__init__(**kwargs) + + def __delete_entries(self, client, pipe, pattern, cursor): + """Helper method to delete table entries from config db using Redis pipeline + with batch size of REDIS_SCAN_BATCH_SIZE. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + pattern: key pattern + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE) + for key in keys: + pipe.delete(key) + + return cur + + def __delete_table(self, client, pipe, table): + """Helper method to delete table entries from config db using Redis pipeline. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + table: Table name. + """ + pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR) + cur = self.__delete_entries(client, pipe, pattern, 0) + while cur != 0: + cur = self.__delete_entries(client, pipe, pattern, cur) + + def __mod_entry(self, pipe, table, key, data): + """Modify a table entry to config db. + Args: + table: Table name. + pipe: Redis DB pipe + table: Table name. + key: Key of table entry, or a tuple of keys if it is a multi-key table. + data: Table row data in a form of dictionary {'column_key': 'value', ...}. + Pass {} as data will create an entry with no column if not already existed. + Pass None as data will delete the entry. + """ + key = self.serialize_key(key) + _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) + if data == None: + pipe.delete(_hash) + else: + pipe.hmset(_hash, self.typed_to_raw(data)) + + def mod_config(self, data): + """Write multiple tables into config db. + Extra entries/fields in the db which are not in the data are kept. + Args: + data: config data in a dictionary form + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + for table_name in data: + table_data = data[table_name] + if table_data == None: + self.__delete_table(client, pipe, table_name) + continue + for key in table_data: + self.__mod_entry(pipe, table_name, key, table_data[key]) + pipe.execute() + client.bgsave() + + def _get_config(self, client, pipe, data, cursor): + """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines + Args: + client: Redis client + pipe: Redis DB pipe + data: config dictionary + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE) + keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR] + for key in keys: + pipe.hgetall(key) + records = pipe.execute() + + for index, key in enumerate(keys): + (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) + entry = self.raw_to_typed(records[index]) + if entry is not None: + data.setdefault(table_name, {})[self.deserialize_key(row)] = entry + + return cur + + def get_config(self): + """Read all config data. + Returns: + Config data in a dictionary form of + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + data = {} + + cur = self._get_config(client, pipe, data, 0) + while cur != 0: + cur = self._get_config(client, pipe, data, cur) + + return data +