From a73eb66546900aeea9dc451b6f5a509cfdd8d654 Mon Sep 17 00:00:00 2001 From: pavel-shirshov Date: Fri, 22 Nov 2019 11:07:36 -0800 Subject: [PATCH] [docker-fpm-frr]: Refactor bgpcfgd (#3789) --- dockers/docker-fpm-frr/bgpcfgd | 383 +++++++++++++++++++++++---------- 1 file changed, 266 insertions(+), 117 deletions(-) diff --git a/dockers/docker-fpm-frr/bgpcfgd b/dockers/docker-fpm-frr/bgpcfgd index b98a3f7ad45a..120e07fcdbe2 100755 --- a/dockers/docker-fpm-frr/bgpcfgd +++ b/dockers/docker-fpm-frr/bgpcfgd @@ -8,11 +8,11 @@ import syslog import signal import traceback import os -import shutil import tempfile import json from collections import defaultdict from pprint import pprint +from pprint import pformat import jinja2 import netaddr @@ -34,6 +34,7 @@ def run_command(command, shell=False): return p.returncode, stdout, stderr + class TemplateFabric(object): def __init__(self): j2_template_paths = ['/usr/share/sonic/templates'] @@ -76,130 +77,18 @@ class TemplateFabric(object): return addr.version == 6 -class BGPConfigManager(object): - def __init__(self, daemon): - self.bgp_asn = None - self.meta = None - self.neig_meta = {} - self.bgp_messages = [] - self.peers = self.load_peers() # we can have bgp monitors peers here. it could be fixed by adding support for it here - fabric = TemplateFabric() - self.bgp_peer_add_template = fabric.from_file('bgpd.peer.conf.j2') - self.bgp_peer_del_template = fabric.from_string('no neighbor {{ neighbor_addr }}') - self.bgp_peer_shutdown = fabric.from_string('neighbor {{ neighbor_addr }} shutdown') - self.bgp_peer_no_shutdown = fabric.from_string('no neighbor {{ neighbor_addr }} shutdown') - daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, self.__metadata_handler) - daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, self.__neighbor_metadata_handler) - daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, self.__bgp_handler) - - def load_peers(self): - peers = set() - command = ["vtysh", "-c", "show bgp neighbors json"] - rc, out, err = run_command(command) - if rc == 0: - js_bgp = json.loads(out) - peers = set(js_bgp.keys()) - return peers - - def __metadata_handler(self, key, op, data): - if key != "localhost" \ - or "bgp_asn" not in data \ - or self.bgp_asn == data["bgp_asn"]: - return - - # TODO add ASN update commands - - self.meta = { 'localhost': data } - self.bgp_asn = data["bgp_asn"] - self.__update_bgp() - - def __neighbor_metadata_handler(self, key, op, data): - if op == swsscommon.SET_COMMAND: - self.neig_meta[key] = data - elif op == swsscommon.DEL_COMMAND: - if key in self.neig_meta: - del self.neig_meta[key] - else: - syslog.syslog(syslog.LOG_ERR,"Can't remove key '%s' from neighbor metadata handler. The key doesn't exist" % key) - else: - syslog.syslog(syslog.LOG_ERR,"Wrong operation '%s' for neighbor metadata handler" % op) - self.__update_bgp() - - def __update_bgp(self): - cmds = [] - new_bgp_messages = [] - for key, op, data in self.bgp_messages: - if op == swsscommon.SET_COMMAND: - if key not in self.peers: - if 'name' in data and data['name'] not in self.neig_meta: - # DEVICE_NEIGHBOR_METADATA should be populated before the rendering - new_bgp_messages.append((key, op, data)) - continue - try: - txt = self.bgp_peer_add_template.render(DEVICE_METADATA=self.meta, DEVICE_NEIGHBOR_METADATA=self.neig_meta, neighbor_addr=key, bgp_session=data) - cmds.append(txt) - except: - syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data)) - else: - syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data)) - self.peers.add(key) - else: - # when the peer is already configured we support "shutdown/no shutdown" - # commands for the peers only - if "admin_status" in data: - if data['admin_status'] == 'up': - cmds.append(self.bgp_peer_no_shutdown.render(neighbor_addr=key)) - syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key)) - elif data['admin_status'] == 'down': - cmds.append(self.bgp_peer_shutdown.render(neighbor_addr=key)) - syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key)) - else: - syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status'])) - else: - syslog.syslog(syslog.LOG_INFO, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key)) - elif op == swsscommon.DEL_COMMAND: - if key in self.peers: - cmds.append(self.bgp_peer_del_template.render(neighbor_addr=key)) - syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key)) - self.peers.remove(key) - else: - syslog.syslog(syslog.LOG_WARNING, 'Peer {} is not found'.format(key)) - self.bgp_messages = new_bgp_messages - - if len(cmds) == 0: - return - - fd, tmp_filename = tempfile.mkstemp(dir='/tmp') - os.close(fd) - with open (tmp_filename, 'w') as fp: - fp.write('router bgp %s\n' % self.bgp_asn) - for cmd in cmds: - fp.write("%s\n" % cmd) - - command = ["vtysh", "-f", tmp_filename] - run_command(command) #FIXME - os.remove(tmp_filename) - - def __bgp_handler(self, key, op, data): - self.bgp_messages.append((key, op, data)) - # If ASN is not set, we just cache this message until the ASN is set. - if self.bgp_asn is not None: - self.__update_bgp() - - class Daemon(object): SELECT_TIMEOUT = 1000 - DATABASE_LIST = [ swsscommon.CONFIG_DB ] def __init__(self): - self.db_connectors = { db : swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) for db in Daemon.DATABASE_LIST } + self.db_connectors = {} self.selector = swsscommon.Select() self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> [] self.subscribers = set() def add_manager(self, db, table_name, callback): - if db not in Daemon.DATABASE_LIST: - raise ValueError("database {} isn't supported. Supported '{}' only.".format(db, ",".join(Daemon.DATABASE_LIST))) + if db not in self.db_connectors: + self.db_connectors[db] = swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) if table_name not in self.callbacks[db]: conn = self.db_connectors[db] @@ -226,6 +115,260 @@ class Daemon(object): callback(key, op, dict(fvs)) +class Directory(object): + def __init__(self): + self.data = defaultdict(dict) + self.notify = defaultdict(lambda: defaultdict(list)) + + def path_exist(self, slot, path): + if slot not in self.data: + return False + elif path == '': + return True + d = self.data[slot] + for p in path.split("/"): + if p not in d: + return False + d = d[p] + return True + + def get_path(self, slot, path): + if slot not in self.data: + return None + elif path == '': + return self.data[slot] + d = self.data[slot] + for p in path.split("/"): + if p not in d: + return None + d = d[p] + return d + + def put(self, slot, key, value): + self.data[slot][key] = value + if slot in self.notify: + for path in self.notify[slot].keys(): + if self.path_exist(slot, path): + for handler in self.notify[slot][path]: + handler() + + def get(self, slot, key): + return self.data[slot][key] + + def remove(self, slot, key): + if slot in self.data: + if key in self.data[slot]: + del self.data[slot][key] + else: + syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot)) + else: + syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot)) + + def remove_slot(self, slot, key): + if slot in self.data: + del self.data[slot] + else: + syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove slot '%s'. The slot doesn't exist" % slot) + + def get_slot(self, slot): + return self.data[slot] + + def available_slot(self, slot): + return slot in self.data + + def available_deps(self, deps): + res = True + for slot, path in deps: + res = res and self.path_exist(slot, path) + return res + + def subscribe(self, deps, handler): + for slot, path in deps: + self.notify[slot][path].append(handler) + + +class Manager(object): + def __init__(self, daemon, directory, deps, database, table_name): + self.directory = directory + self.deps = deps + self.set_queue = [] + daemon.add_manager(database, table_name, self.handler) + directory.subscribe(deps, self.on_deps_change) + + def handler(self, key, op, data): + if op == swsscommon.SET_COMMAND: + if self.directory.available_deps(self.deps): + res = self.set_handler(key, data) + if not res: + self.set_queue.append((key, data)) + else: + self.set_queue.append((key, data)) + elif op == swsscommon.DEL_COMMAND: + self.del_handler(key) + else: + syslog.syslog(syslog.LOG_ERR, 'Invalid operation "%s" for key "%s"' % (op, key)) + + def on_deps_change(self): + new_queue = [] + for key, data in self.set_queue: + res = self.set_handler(key, data) + if not res: + new_queue.append((key, data)) + self.set_queue = new_queue + + def set_handler(self, key, data): + syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__)) + + def del_handler(self, key): + syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__)) + + +class BGPDeviceMetaMgr(Manager): + def __init__(self, daemon, directory): + super(BGPDeviceMetaMgr, self).__init__( + daemon, + directory, + [], + swsscommon.CONFIG_DB, + swsscommon.CFG_DEVICE_METADATA_TABLE_NAME + ) + + def set_handler(self, key, data): + if key != "localhost" or "bgp_asn" not in data: + return + if self.directory.path_exist("meta", "localhost/bgp_asn"): + bgp_asn = self.directory.get_path("meta", "localhost/bgp_asn") + if bgp_asn == data["bgp_asn"]: + return + self.directory.put("meta", key, data) + + return True + + def del_handler(self, key): + self.directory.remove("meta", key) + + +class BGPNeighborMetaMgr(Manager): + def __init__(self, daemon, directory): + super(BGPNeighborMetaMgr, self).__init__( + daemon, + directory, + [], + swsscommon.CONFIG_DB, + swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME + ) + + def set_handler(self, key, data): + self.directory.put("neigmeta", key, data) + + return True + + def del_handler(self, key): + self.directory.remove("neigmeta", key) + + +class BGPPeerMgr(Manager): + def __init__(self, daemon, directory): + super(BGPPeerMgr, self).__init__( + daemon, + directory, + [ + ("meta", "localhost/bgp_asn"), + ("neigmeta", ""), + ], + swsscommon.CONFIG_DB, + swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME + ) + self.peers = self.load_peers() + fabric = TemplateFabric() + self.templates = { + "add": fabric.from_file('bgpd.peer.conf.j2'), + "delete": fabric.from_string('no neighbor {{ neighbor_addr }}'), + "shutdown": fabric.from_string('neighbor {{ neighbor_addr }} shutdown'), + "no shutdown": fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'), + } + + def set_handler(self, key, data): + if key not in self.peers: + cmd = None + neigmeta = self.directory.get_slot("neigmeta") + if 'name' in data and data["name"] not in neigmeta: + return False + try: + cmd = self.templates["add"].render( + DEVICE_METADATA=self.directory.get_slot("meta"), + DEVICE_NEIGHBOR_METADATA=neigmeta, + neighbor_addr=key, + bgp_session=data + ) + except: + syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data)) + return True + if cmd is not None: + rc = self.apply_op(cmd) + if rc: + self.peers.add(key) + syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data)) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {} wasn't added.".format(key)) + else: + # when the peer is already configured we support "shutdown/no shutdown" + # commands for the peers only + if "admin_status" in data: + if data['admin_status'] == 'up': + rc = self.apply_op(self.templates["no shutdown"].render(neighbor_addr=key)) + if rc: + syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key)) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'up'.".format(key)) + elif data['admin_status'] == 'down': + rc = self.apply_op(self.templates["shutdown"].render(neighbor_addr=key)) + if rc: + syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key)) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'down'.".format(key)) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status'])) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key)) + return True + + def del_handler(self, key): + if key not in self.peers: + syslog.syslog(syslog.LOG_WARNING, 'Peer {} has not been found'.format(key)) + return + cmd = self.templates["delete"].render(neighbor_addr=key) + rc = self.apply_op(cmd) + if rc: + syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key)) + self.peers.remove(key) + else: + syslog.syslog(syslog.LOG_ERR, "Peer {} hasn't been removed".format(key)) + + def apply_op(self, cmd): + bgp_asn = self.directory.get_slot("meta")["localhost"]["bgp_asn"] + fd, tmp_filename = tempfile.mkstemp(dir='/tmp') + os.close(fd) + with open(tmp_filename, 'w') as fp: + fp.write('router bgp %s\n' % bgp_asn) + fp.write("%s\n" % cmd) + + command = ["vtysh", "-f", tmp_filename] + rc, _, _ = run_command(command) + os.remove(tmp_filename) + return rc == 0 + + @staticmethod + def load_peers(): + peers = set() + command = ["vtysh", "-c", "show bgp neighbors json"] + rc, out, err = run_command(command) + if rc == 0: + js_bgp = json.loads(out) + peers = set(js_bgp.keys()) + return peers + + def wait_for_bgpd(): # wait for 20 seconds stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20) @@ -240,9 +383,15 @@ def wait_for_bgpd(): def main(): + managers = [ + BGPDeviceMetaMgr, + BGPNeighborMetaMgr, + BGPPeerMgr, + ] wait_for_bgpd() daemon = Daemon() - bgp_manager = BGPConfigManager(daemon) + directory = Directory() + manager_instanses = [ manager(daemon, directory) for manager in managers ] daemon.run()