Skip to content

Commit

Permalink
Apply custom Si settings via CMIS: SONiC xcvrd platform common changes (
Browse files Browse the repository at this point in the history
sonic-net#384)


Co-authored-by: Prince George <[email protected]>
  • Loading branch information
2 people authored and noaOrMlnx committed Sep 12, 2023
1 parent a5d8321 commit ddac2d4
Show file tree
Hide file tree
Showing 4 changed files with 657 additions and 12 deletions.
322 changes: 320 additions & 2 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ def __init__(self, xcvr_eeprom):
self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None
self.cdb = CmisCdbApi(xcvr_eeprom) if not self.is_flat_memory() else None

def get_manufacturer(self):
'''
This function returns the manufacturer of the module
'''
return self.xcvr_eeprom.read(consts.VENDOR_NAME_FIELD)

def get_model(self):
'''
This function returns the part number of the module
Expand Down Expand Up @@ -2069,7 +2075,7 @@ def get_application(self, lane):

return (appl & 0xf)

def set_application(self, channel, appl_code):
def set_application(self, channel, appl_code, ec=0):
"""
Update the selected application code to the specified lanes on the host side
Expand All @@ -2092,11 +2098,323 @@ def set_application(self, channel, appl_code):
lane_first = lane
addr = "{}_{}_{}".format(consts.STAGED_CTRL_APSEL_FIELD, 0, lane + 1)
data = (appl_code << 4) | (lane_first << 1)
#set EC bit
data|= ec
self.xcvr_eeprom.write(addr, data)

# Apply DataPathInit
def scs_apply_datapath_init(self, channel):
'''
This function applies DataPathInit
'''
return self.xcvr_eeprom.write("%s_%d" % (consts.STAGED_CTRL_APPLY_DPINIT_FIELD, 0), channel)

def get_rx_output_amp_max_val(self):
'''
This function returns the supported RX output amp val
'''
rx_amp_max_val = self.xcvr_eeprom.read(consts.RX_OUTPUT_LEVEL_SUPPORT)
if rx_amp_max_val is None:
return None
return rx_amp_max_val

def get_rx_output_eq_pre_max_val(self):
'''
This function returns the supported RX output eq pre cursor val
'''
rx_pre_max_val = self.xcvr_eeprom.read(consts.RX_OUTPUT_EQ_PRE_CURSOR_MAX)
if rx_pre_max_val is None:
return None
return rx_pre_max_val

def get_rx_output_eq_post_max_val(self):
'''
This function returns the supported RX output eq post cursor val
'''
rx_post_max_val = self.xcvr_eeprom.read(consts.RX_OUTPUT_EQ_POST_CURSOR_MAX)
if rx_post_max_val is None:
return None
return rx_post_max_val

def get_tx_input_eq_max_val(self):
'''
This function returns the supported TX input eq val
'''
tx_input_max_val = self.xcvr_eeprom.read(consts.TX_INPUT_EQ_MAX)
if tx_input_max_val is None:
return None
return tx_input_max_val

def get_tx_cdr_supported(self):
'''
This function returns the supported TX CDR field
'''
tx_cdr_support = self.xcvr_eeprom.read(consts.TX_CDR_SUPPORT_FIELD)
if not tx_cdr_support or tx_cdr_support is None:
return False
return tx_cdr_support

def get_rx_cdr_supported(self):
'''
This function returns the supported RX CDR field
'''
rx_cdr_support = self.xcvr_eeprom.read(consts.RX_CDR_SUPPORT_FIELD)
if not rx_cdr_support or rx_cdr_support is None:
return False
return rx_cdr_support

def get_tx_input_eq_fixed_supported(self):
'''
This function returns the supported TX input eq field
'''
tx_fixed_support = self.xcvr_eeprom.read(consts.TX_INPUT_EQ_FIXED_MANUAL_CTRL_SUPPORT_FIELD)
if not tx_fixed_support or tx_fixed_support is None:
return False
return tx_fixed_support

def get_tx_input_adaptive_eq_supported(self):
'''
This function returns the supported TX input adaptive eq field
'''
tx_adaptive_support = self.xcvr_eeprom.read(consts.TX_INPUT_ADAPTIVE_EQ_SUPPORT_FIELD)
if not tx_adaptive_support or tx_adaptive_support is None:
return False
return tx_adaptive_support

def get_tx_input_recall_buf1_supported(self):
'''
This function returns the supported TX input recall buf1 field
'''
tx_recall_buf1_support = self.xcvr_eeprom.read(consts.TX_INPUT_EQ_RECALL_BUF1_SUPPORT_FIELD)
if not tx_recall_buf1_support or tx_recall_buf1_support is None:
return False
return tx_recall_buf1_support

def get_tx_input_recall_buf2_supported(self):
'''
This function returns the supported TX input recall buf2 field
'''
tx_recall_buf2_support = self.xcvr_eeprom.read(consts.TX_INPUT_EQ_RECALL_BUF2_SUPPORT_FIELD)
if not tx_recall_buf2_support or tx_recall_buf2_support is None:
return False
return tx_recall_buf2_support

def get_rx_ouput_amp_ctrl_supported(self):
'''
This function returns the supported RX output amp control field
'''
rx_amp_support = self.xcvr_eeprom.read(consts.RX_OUTPUT_AMP_CTRL_SUPPORT_FIELD)
if not rx_amp_support or rx_amp_support is None:
return False
return rx_amp_support

def get_rx_output_eq_pre_ctrl_supported(self):
'''
This function returns the supported RX output eq pre control field
'''
rx_pre_support = self.xcvr_eeprom.read(consts.RX_OUTPUT_EQ_PRE_CTRL_SUPPORT_FIELD)
if not rx_pre_support or rx_pre_support is None:
return False
return rx_pre_support

def get_rx_output_eq_post_ctrl_supported(self):
'''
This function returns the supported RX output eq post control field
'''
rx_post_support = self.xcvr_eeprom.read(consts.RX_OUTPUT_EQ_POST_CTRL_SUPPORT_FIELD)
if not rx_post_support or rx_post_support is None:
return False
return rx_post_support

def scs_lane_write(self, si_param, host_lanes_mask, si_settings_dict):
'''
This function sets each lane val based on SI param
'''
for lane in range(self.NUM_CHANNELS):
if ((1 << lane) & host_lanes_mask) == 0:
continue
lane = lane+1
si_param_lane = "{}{}".format(si_param, lane)
si_param_lane_val = si_settings_dict[si_param_lane]
if si_param_lane_val is None:
return False
if not self.xcvr_eeprom.write(si_param_lane, si_param_lane_val):
return False
return True

def stage_output_eq_pre_cursor_target_rx(self, host_lanes_mask, si_settings_dict):
'''
This function applies RX output eq pre cursor settings
'''
rx_pre_max_val = self.get_rx_output_eq_pre_max_val()
if rx_pre_max_val is None:
return False
for lane in range(self.NUM_CHANNELS):
if ((1 << lane) & host_lanes_mask) == 0:
continue
lane = lane+1
si_param_lane = "{}{}".format(consts.OUTPUT_EQ_PRE_CURSOR_TARGET_RX, lane)
si_param_lane_val = si_settings_dict[si_param_lane]
if si_param_lane_val is None or si_param_lane_val > rx_pre_max_val:
return False
if not self.xcvr_eeprom.write(si_param_lane, si_param_lane_val):
return False
return True

def stage_output_eq_post_cursor_target_rx(self, host_lanes_mask, si_settings_dict):
'''
This function applies RX output eq post cursor settings
'''
rx_post_max_val = self.get_rx_output_eq_post_max_val()
if rx_post_max_val is None:
return False
for lane in range(self.NUM_CHANNELS):
if ((1 << lane) & host_lanes_mask) == 0:
continue
lane = lane+1
si_param_lane = "{}{}".format(consts.OUTPUT_EQ_POST_CURSOR_TARGET_RX, lane)
si_param_lane_val = si_settings_dict[si_param_lane]
if si_param_lane_val is None or si_param_lane_val > rx_post_max_val:
return False
if not self.xcvr_eeprom.write(si_param_lane, si_param_lane_val):
return False
return True

def stage_output_amp_target_rx(self, host_lanes_mask, si_settings_dict):
'''
This function applies RX output amp settings
'''
rx_amp_max_val = self.get_rx_output_amp_max_val()
if rx_amp_max_val is None:
return False
for lane in range(self.NUM_CHANNELS):
if ((1 << lane) & host_lanes_mask) == 0:
continue
lane = lane+1
si_param_lane = "{}{}".format(consts.OUTPUT_AMPLITUDE_TARGET_RX, lane)
si_param_lane_val = si_settings_dict[si_param_lane]
if si_param_lane_val is None or si_param_lane_val > rx_amp_max_val:
return False
if not self.xcvr_eeprom.write(si_param_lane, si_param_lane_val):
return False
return True

def stage_fixed_input_target_tx(self, host_lanes_mask, si_settings_dict):
'''
This function applies fixed TX input si settings
'''
tx_fixed_input = self.get_tx_input_eq_max_val()
if tx_fixed_input is None:
return False
for lane in range(self.NUM_CHANNELS):
if ((1 << lane) & host_lanes_mask) == 0:
continue
lane = lane+1
si_param_lane = "{}{}".format(consts.FIXED_INPUT_EQ_TARGET_TX, lane)
si_param_lane_val = si_settings_dict[si_param_lane]
if si_param_lane_val is None or si_param_lane_val > tx_fixed_input:
return False
if not self.xcvr_eeprom.write(si_param_lane, si_param_lane_val):
return False
return True

def stage_adaptive_input_eq_recall_tx(self, host_lanes_mask, si_settings_dict):
'''
This function applies adaptive TX input recall si settings.
'''
if si_settings_dict is None:
return False
return self.scs_lane_write(consts.ADAPTIVE_INPUT_EQ_RECALLED_TX, host_lanes_mask, si_settings_dict)

def stage_adaptive_input_eq_enable_tx(self, host_lanes_mask, si_settings_dict):
'''
This function applies adaptive TX input enable si settings
'''
if si_settings_dict is None:
return False
return self.scs_lane_write(consts.ADAPTIVE_INPUT_EQ_ENABLE_TX, host_lanes_mask, si_settings_dict)

def stage_cdr_tx(self, host_lanes_mask, si_settings_dict):
'''
This function applies TX CDR si settings
'''
if si_settings_dict is None:
return False
return self.scs_lane_write(consts.CDR_ENABLE_TX, host_lanes_mask, si_settings_dict)

def stage_cdr_rx(self, host_lanes_mask, si_settings_dict):
'''
This function applies RX CDR si settings
'''
if si_settings_dict is None:
return False
return self.scs_lane_write(consts.CDR_ENABLE_RX, host_lanes_mask, si_settings_dict)

def stage_rx_si_settings(self, host_lanes_mask, si_settings_dict):
for si_param in si_settings_dict:
if si_param == consts.OUTPUT_EQ_PRE_CURSOR_TARGET_RX:
if self.get_rx_output_eq_pre_ctrl_supported():
if not self.stage_output_eq_pre_cursor_target_rx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.OUTPUT_EQ_POST_CURSOR_TARGET_RX:
if self.get_rx_output_eq_post_ctrl_supported():
if not self.stage_output_eq_post_cursor_target_rx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.OUTPUT_AMPLITUDE_TARGET_RX:
if self.get_rx_ouput_amp_ctrl_supported():
if not self.stage_output_amp_target_rx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.CDR_ENABLE_RX:
if self.get_rx_cdr_supported():
if not self.stage_cdr_rx(host_lanes_mask, si_settings_dict[si_param]):
return False
else:
return False

return True

def stage_tx_si_settings(self, host_lanes_mask, si_settings_dict):
for si_param in si_settings_dict:
if si_param == consts.FIXED_INPUT_EQ_TARGET_TX:
if self.get_tx_input_eq_fixed_supported():
if not self.stage_fixed_input_target_tx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.ADAPTIVE_INPUT_EQ_RECALLED_TX:
if self.get_tx_input_recall_buf1_supported() or self.get_tx_input_recall_buf2_supported():
if not self.stage_adaptive_input_eq_recall_tx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.ADAPTIVE_INPUT_EQ_ENABLE_TX:
if self.get_tx_input_adaptive_eq_supported():
if not self.stage_adaptive_input_eq_enable_tx(host_lanes_mask, si_settings_dict[si_param]):
return False
elif si_param == consts.CDR_ENABLE_TX:
if self.get_tx_cdr_supported():
if not self.stage_cdr_tx(host_lanes_mask, si_settings_dict[si_param]):
return False
else:
return False

return True

def stage_custom_si_settings(self, host_lanes_mask, optics_si_dict):
# Create TX/RX specific si_dict
rx_si_settings = {}
tx_si_settings = {}
for si_param in optics_si_dict:
if si_param.endswith("Tx"):
tx_si_settings[si_param] = optics_si_dict[si_param]
elif si_param.endswith("Rx"):
rx_si_settings[si_param] = optics_si_dict[si_param]

# stage RX si settings
if not self.stage_rx_si_settings(host_lanes_mask, rx_si_settings):
return False

# stage TX si settings
if not self.stage_tx_si_settings(host_lanes_mask, tx_si_settings):
return False

return True

def get_error_description(self):
dp_state = self.get_datapath_state()
conf_state = self.get_config_datapath_hostlane_status()
Expand Down
Loading

0 comments on commit ddac2d4

Please sign in to comment.