diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 4ebfb8860dcc..283c71ae0eaf 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4959,7 +4959,10 @@ def test_check_identifier_presence_and_setup_channel_with_mock_not_none(self): @patch('proto_out.linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub', MagicMock(return_value=True)) def test_setup_grpc_channel_for_port(self): - rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") + with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: + + patched_util.get_asic_id_for_logical_port.return_value = 0 + rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") assert(rc == (None, None)) @@ -5306,4 +5309,25 @@ def test_get_mux_cable_static_info_without_presence(self): assert(rc['nic_lane1_postcursor1'] == 'N/A') assert(rc['nic_lane1_postcursor2'] == 'N/A') + def test_get_grpc_credentials(self): + + kvp = {} + type = None + + rc = get_grpc_credentials(type, kvp) + + assert(rc == None) + + + @patch('builtins.open') + def test_get_grpc_credentials_root(self, open): + + kvp = {"ca_crt": "file"} + type = "server" + + mock_file = MagicMock() + mock_file.read = MagicMock(return_value=bytes('abcdefgh', 'utf-8')) + open.return_value = mock_file + rc = get_grpc_credentials(type, kvp) + assert(rc != None) diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index eab6e84b5412..dd64a7359ad7 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -56,6 +56,13 @@ # port id 0 -> maps to T0 # port id 1 -> maps to LT0 +GRPC_CLIENT_OPTIONS = [ + ('grpc.keepalive_timeout_ms', 8000), + ('grpc.keepalive_time_ms', 4000), + ('grpc.keepalive_permit_without_calls', True), + ('grpc.http2.max_pings_without_data', 0) +] + SYSLOG_IDENTIFIER = "y_cable_helper" helper_logger = logger.Logger(SYSLOG_IDENTIFIER) @@ -360,13 +367,78 @@ def retry_setup_grpc_channel_for_port(port, asic_index): grpc_port_stubs[port] = stub return True + +def get_grpc_credentials(type, kvp): + + root_file = kvp.get("ca_crt", None) + if root_file is not None: + root_cert = open(root_file, 'rb').read() + else: + helper_logger.log_error("grpc credential channel setup no root file in config_db") + return None + + if type == "mutual": + cert_file = kvp.get("client_crt", None) + if cert_file is not None: + cert_chain = open(cert_file, 'rb').read() + else: + helper_logger.log_error("grpc credential channel setup no cert file for mutual authentication in config_db") + return None + + key_file = kvp.get("client_key", None) + if key_file is not None: + key = open(key_file, 'rb').read() + else: + helper_logger.log_error("grpc credential channel setup no key file for mutual authentication in config_db") + return None + + credential = grpc.ssl_channel_credentials( + root_certificates=root_cert, + private_key=key, + certificate_chain=cert_chain) + elif type == "server": + credential = grpc.ssl_channel_credentials( + root_certificates=root_cert) + else: + #should not happen + helper_logger.log_error("grpc credential channel setup no type specified for authentication in config_db") + return None + + return credential + +def create_channel(type,level, kvp, soc_ip): + + retries = 3 + for _ in range(retries): + + if type == "secure": + credential = get_grpc_credentials(level, kvp) + target_name = kvp.get("grpc_ssl_credential", None) + if credential is None or target_name is None: + return (None, None) + + GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) + + channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + else: + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + + channel_ready = grpc.channel_ready_future(channel) + + try: + channel_ready.result(timeout=2) + except grpc.FutureTimeoutError: + channel = None + stub = None + else: + break + + return channel, stub + def setup_grpc_channel_for_port(port, soc_ip): - """ - root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read() - key = open('/etc/sonic/credentials/client.key.pem', 'rb').read() - cert_chain = open('/etc/sonic/credentials/client.cert.pem', 'rb').read() - """ """ Dummy values for lab for now TODO remove these once done @@ -381,23 +453,41 @@ def setup_grpc_channel_for_port(port, soc_ip): """ helper_logger.log_notice("Setting up gRPC channel for RPC's {} {}".format(port,soc_ip)) - retries = 3 - for _ in range(retries): - channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=[('grpc.keepalive_timeout_ms', 8000), - ('grpc.keepalive_time_ms', 4000), - ('grpc.keepalive_permit_without_calls', True), - ('grpc.http2.max_pings_without_data', 0)]) - stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + config_db,grpc_config = {}, {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + grpc_config[asic_id] = swsscommon.Table(config_db[asic_id], "GRPCCLIENT") - channel_ready = grpc.channel_ready_future(channel) + asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(port) - try: - channel_ready.result(timeout=2) - except grpc.FutureTimeoutError: - channel = None - stub = None - else: - break + #if no config from config DB, treat channel to be as insecure + type = "insecure" + level = "server" + + (status, fvs) = grpc_config[asic_index].get("config") + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table kvp config for {} for setting up channel type".format(port, grpc_config[asic_index].getTableName())) + else: + grpc_config_dict = dict(fvs) + type = grpc_config_dict.get("type", None) + level = grpc_config_dict.get("auth_level", None) + + + kvp = {} + if type == "secure": + (status, fvs) = grpc_config[asic_index].get("certs") + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table kvp certs for {} for setting up channel type".format(port, grpc_config[asic_index].getTableName())) + #if type is secure, must have certs defined + return (None, None) + kvp = dict(fvs) + + + channel, stub = create_channel(type, level, kvp, soc_ip) if stub is None: helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))