forked from sonic-net/sonic-buildimage
-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[device/alibaba] - Update get_last_firmware_upgrade api to support mu…
…ltiple upgrade
- Loading branch information
Wirut Getbamrung
committed
Mar 28, 2019
1 parent
ff0ca79
commit e6f3779
Showing
3 changed files
with
381 additions
and
180 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import pexpect | ||
import base64 | ||
import time | ||
import json | ||
from datetime import datetime | ||
|
||
try: | ||
|
@@ -28,6 +29,7 @@ def __init__(self): | |
self.bmc_raw_command_url = "http://240.1.1.1:8080/api/sys/raw" | ||
self.fw_upgrade_url = "http://240.1.1.1:8080/api/sys/upgrade" | ||
self.onie_config_file = "/host/machine.conf" | ||
self.fw_upgrade_logger_path = "/usr/local/etc/last_fw_upgrade_logger" | ||
self.cpldb_version_path = "/sys/devices/platform/%s.cpldb/getreg" % self.platform_name | ||
self.fpga_version_path = "/sys/devices/platform/%s.switchboard/FPGA/getreg" % self.platform_name | ||
self.switchboard_cpld1_path = "/sys/devices/platform/%s.switchboard/CPLD1/getreg" % self.platform_name | ||
|
@@ -71,17 +73,19 @@ def __fpga_pci_rescan(self): | |
return 0 | ||
|
||
def get_bmc_pass(self): | ||
with open(self.bmc_pwd_path) as file: | ||
data = file.read() | ||
|
||
key = "bmc" | ||
dec = [] | ||
enc = base64.urlsafe_b64decode(data) | ||
for i in range(len(enc)): | ||
key_c = key[i % len(key)] | ||
dec_c = chr((256 + ord(enc[i]) - ord(key_c)) % 256) | ||
dec.append(dec_c) | ||
return "".join(dec) | ||
if os.path.exists(self.bmc_pwd_path): | ||
with open(self.bmc_pwd_path) as file: | ||
data = file.read() | ||
|
||
key = "bmc" | ||
dec = [] | ||
enc = base64.urlsafe_b64decode(data) | ||
for i in range(len(enc)): | ||
key_c = key[i % len(key)] | ||
dec_c = chr((256 + ord(enc[i]) - ord(key_c)) % 256) | ||
dec.append(dec_c) | ||
return "".join(dec) | ||
return False | ||
|
||
def get_bmc_version(self): | ||
"""Get BMC version from SONiC | ||
|
@@ -103,8 +107,8 @@ def upload_file_bmc(self, fw_path): | |
fw_path) | ||
child = pexpect.spawn(scp_command) | ||
i = child.expect(["[email protected]'s password:"], timeout=30) | ||
if i == 0: | ||
bmc_pwd = self.get_bmc_pass() | ||
bmc_pwd = self.get_bmc_pass() | ||
if i == 0 and bmc_pwd: | ||
child.sendline(bmc_pwd) | ||
data = child.read() | ||
print(data) | ||
|
@@ -227,6 +231,13 @@ def get_fpga_version(self): | |
int(version[2:][:4], 16), int(version[2:][4:], 16)) | ||
return str(version) | ||
|
||
def upgrade_logger(self, upgrade_list): | ||
try: | ||
with open(self.fw_upgrade_logger_path, 'w') as filetowrite: | ||
json.dump(upgrade_list, filetowrite) | ||
except Exception as e: | ||
pass | ||
|
||
def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | ||
""" | ||
@fw_type MANDATORY, firmware type, should be one of the strings: 'cpld', 'fpga', 'bios', 'bmc' | ||
|
@@ -237,14 +248,21 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
or 'cpld_fan_come_board', etc. If None, upgrade all CPLD/FPGA firmware. for fw_type 'bios' and 'bmc', | ||
value should be one of 'master' or 'slave' or 'both' | ||
""" | ||
fw_type = fw_type.lower() | ||
upgrade_list = [] | ||
bmc_pwd = self.get_bmc_pass() | ||
if not bmc_pwd and fw_type != "fpga": | ||
print("Failed: BMC credential not found") | ||
return False | ||
|
||
if fw_type == 'bmc': | ||
# Copy BMC image file to BMC | ||
print("BMC Upgrade") | ||
print("Uploading image to BMC...") | ||
upload_file = self.upload_file_bmc(fw_path) | ||
|
||
if not upload_file: | ||
print("Failed") | ||
print("Failed: Unable to upload BMC image to BMC") | ||
return False | ||
|
||
filename_w_ext = os.path.basename(fw_path) | ||
|
@@ -261,7 +279,7 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
json_data["flash"] = "master" | ||
r = requests.post(self.bmc_info_url, json=json_data) | ||
if r.status_code != 200 or 'success' not in r.json().get('result'): | ||
print("Failed") | ||
print("Failed: Unable to install BMC image") | ||
return False | ||
print("Done") | ||
json_data["flash"] = "slave" | ||
|
@@ -274,11 +292,13 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
reboot_dict["reboot"] = "yes" | ||
r = requests.post(self.bmc_info_url, json=reboot_dict) | ||
print("Done") | ||
return True | ||
else: | ||
print("Failed") | ||
print("Failed: Unable to install BMC image") | ||
return False | ||
|
||
upgrade_list.append(filename_w_ext.lower()) | ||
return True | ||
|
||
elif fw_type == 'fpga': | ||
print("FPGA Upgrade") | ||
command = 'fpga_prog ' + fw_path | ||
|
@@ -295,14 +315,16 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
|
||
if process.returncode == 0: | ||
rc = self.__fpga_pci_rescan() | ||
if rc == 0: | ||
return True | ||
else: | ||
print("Fails to load new FPGA firmware") | ||
if rc != 0: | ||
print("Failed: Unable to load new FPGA firmware") | ||
return False | ||
else: | ||
print("Failed: Invalid fpga image") | ||
return False | ||
|
||
print("Done") | ||
return True | ||
|
||
elif 'cpld' in fw_type: | ||
print("CPLD Upgrade") | ||
# Check input | ||
|
@@ -369,6 +391,7 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
return False | ||
|
||
print("%s cpld upgrade completed\n" % fw_extra_str) | ||
upgrade_list.append(filename_w_ext.lower()) | ||
|
||
# Refresh CPLD | ||
if "COMBO_CPLD" in fw_extra_str_list or "BASE_CPLD" in fw_extra_str_list or "CPU_CPLD" in fw_extra_str_list: | ||
|
@@ -385,6 +408,8 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
return False | ||
|
||
filename_w_ext = os.path.basename(fw_path) | ||
upgrade_list.append(filename_w_ext.lower()) | ||
self.upgrade_logger(upgrade_list) | ||
json_data = dict() | ||
json_data["image_path"] = "[email protected]:/home/root/%s" % filename_w_ext | ||
json_data["password"] = bmc_pwd | ||
|
@@ -395,6 +420,7 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
print("Failed: Invalid refresh image") | ||
return False | ||
|
||
self.upgrade_logger(upgrade_list) | ||
print("Done") | ||
return True | ||
|
||
|
@@ -417,6 +443,8 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
data = child.read() | ||
print(data) | ||
child.close | ||
if not os.path.exists(fw_path): | ||
return False | ||
|
||
json_data = dict() | ||
json_data["data"] = "/usr/bin/ipmitool -b 1 -t 0x2c raw 0x2e 0xdf 0x57 0x01 0x00 0x01" | ||
|
@@ -438,11 +466,15 @@ def firmware_upgrade(self, fw_type, fw_path, fw_extra=None): | |
if r.status_code != 200 or 'success' not in r.json().get('result'): | ||
print("Failed") | ||
return False | ||
|
||
upgrade_list.append(filename_w_ext.lower()) | ||
print("Done") | ||
return True | ||
else: | ||
print("Failed: Invalid firmware type") | ||
return False | ||
|
||
print("Failed: Invalid firmware type") | ||
return False | ||
self.upgrade_logger(upgrade_list) | ||
return True | ||
|
||
def get_last_upgrade_result(self): | ||
""" | ||
|
@@ -451,55 +483,90 @@ def get_last_upgrade_result(self): | |
2) FwPath: path and file name of firmware(passed by method 'firmware_upgrade'), string | ||
3) FwExtra: designated string, econdings of this string is determined by vendor(passed by method 'firmware_upgrade') | ||
4) Result: indicates whether the upgrade action is performed and success/failure status if performed. Values should be one of: "DONE"/"FAILED"/"NOT_PERFORMED". | ||
dict object: | ||
{ | ||
"FwType": "cpld", | ||
"FwPath": "/tmp/fw/x86_64-alibaba-as13-48f8h/cpld_come_fan_board.vme" | ||
"FwExtra": "specific_encoded_string" | ||
"Result": "DONE"/"FAILED"/"NOT_PERFORMED" | ||
} | ||
list of object: | ||
[ | ||
{ | ||
"FwType": "cpld", | ||
"FwPath": "cpu_cpld.vme" | ||
"FwExtra":"CPU_CPLD" | ||
"Result": "DONE" | ||
}, | ||
{ | ||
"FwType": "cpld", | ||
"FwPath": "fan_cpld.vme" | ||
"FwExtra": "FAN_CPLD" | ||
"Result": "FAILED" | ||
}, | ||
{ | ||
"FwType": "cpld", | ||
"FwPath": "refresh_cpld.vme" | ||
"FwExtra": "REFRESH_CPLD" | ||
"Result": "DONE" | ||
} | ||
] | ||
""" | ||
last_update_dict = dict() | ||
last_update_list = [] | ||
local_log = [] | ||
|
||
if os.path.exists(self.fw_upgrade_logger_path): | ||
with open(self.fw_upgrade_logger_path, 'r') as file: | ||
data = file.read() | ||
|
||
local_log = json.loads(data) | ||
upgrade_info_req = requests.get(self.fw_upgrade_url) | ||
if upgrade_info_req.status_code == 200 and 'success' in upgrade_info_req.json().get('result'): | ||
upgrade_info_json = upgrade_info_req.json() | ||
|
||
for info_json_key in upgrade_info_json.keys(): | ||
if info_json_key == "result" or upgrade_info_json[info_json_key] == "None": | ||
continue | ||
|
||
update_dict = dict() | ||
raw_data = upgrade_info_json[info_json_key][-1] | ||
raw_data_list = raw_data.split(",") | ||
fw_path = raw_data_list[1].split("firmware:")[1].strip() | ||
fw_extra_raw = raw_data_list[0].split(":")[0].strip() | ||
fw_result_raw = raw_data_list[0].split(":")[1].strip() | ||
raw_datetime = raw_data_list[2].split("time:")[1].strip() | ||
reformat_datetime = raw_datetime.replace("CST", "UTC") | ||
fw_extra_time = datetime.strptime( | ||
reformat_datetime, '%a %b %d %H:%M:%S %Z %Y') | ||
|
||
fw_extra_str = fw_extra_raw | ||
if info_json_key == "CPLD upgrade log": | ||
for x in range(0, len(upgrade_info_json[info_json_key])): | ||
update_dict = dict() | ||
index = x if info_json_key == "CPLD upgrade log" else -1 | ||
raw_data = upgrade_info_json[info_json_key][index] | ||
raw_data_list = raw_data.split(",") | ||
fw_path = raw_data_list[1].split("firmware:")[1].strip() | ||
fw_extra_raw = raw_data_list[0].split(":")[0].strip() | ||
fw_result_raw = raw_data_list[0].split(":")[1].strip() | ||
raw_datetime = raw_data_list[2].split("time:")[1].strip() | ||
reformat_datetime = raw_datetime.replace("CST", "UTC") | ||
fw_extra_time = datetime.strptime( | ||
reformat_datetime, '%a %b %d %H:%M:%S %Z %Y') | ||
fw_extra_str = { | ||
"top_lc": "TOP_LC_CPLD", | ||
"bot_lc": "BOT_LC_CPLD", | ||
"fan": "FAN_CPLD", | ||
"cpu": "CPU_CPLD", | ||
"base": "BASE_CPLD", | ||
"combo": "COMBO_CPLD", | ||
"switch": "SW_CPLD" | ||
}.get(fw_extra_raw, None) | ||
fw_result = "DONE" if fw_result_raw == "success" else fw_result_raw.upper() | ||
fw_result = "FAILED" if "FAILED" in fw_result else fw_result | ||
fw_result = "NOT_PERFORMED" if fw_result != "DONE" and fw_result != "FAILED" else fw_result | ||
update_dict["FwType"] = info_json_key.split(" ")[0].lower() | ||
update_dict["FwPath"] = fw_path | ||
update_dict["FwExtra"] = fw_extra_str | ||
update_dict["Result"] = fw_result | ||
update_dict["FwTime"] = fw_extra_time | ||
if last_update_dict == dict() or update_dict["FwTime"] > last_update_dict["FwTime"]: | ||
last_update_dict = update_dict | ||
|
||
last_update_dict.pop('FwTime', None) | ||
return last_update_dict | ||
"switch": "SW_CPLD", | ||
"enable": "REFRESH_CPLD", | ||
}.get(fw_extra_raw, fw_extra_raw) | ||
fw_result = "DONE" if fw_result_raw == "success" else fw_result_raw.upper() | ||
fw_result = "FAILED" if "FAILED" in fw_result else fw_result | ||
fw_result = "NOT_PERFORMED" if fw_result != "DONE" and fw_result != "FAILED" else fw_result | ||
update_dict["FwType"] = info_json_key.split(" ")[0].lower() | ||
update_dict["FwPath"] = fw_path | ||
update_dict["FwExtra"] = fw_extra_str | ||
update_dict["Result"] = fw_result | ||
update_dict["FwTime"] = fw_extra_time | ||
last_update_list.append(update_dict) | ||
if info_json_key != "CPLD upgrade log": | ||
break | ||
|
||
last_update_list = sorted(last_update_list, key=lambda i: i['FwTime']) | ||
map(lambda d: d.pop('FwTime', None), last_update_list) | ||
if len(last_update_list) >= len(local_log) and len(last_update_list) != 0: | ||
if str(last_update_list[-1].get('FwType')).lower() != 'cpld' or len(local_log) <= 1: | ||
last_update_list = [last_update_list[-1]] | ||
else: | ||
remote_list = last_update_list[-len(local_log):] | ||
remote_list_value = [d['FwPath'].lower() | ||
for d in remote_list if 'FwPath' in d] | ||
last_update_list = remote_list if remote_list_value == local_log else [ | ||
last_update_list[-1]] | ||
elif len(last_update_list) != 0: | ||
last_update_list = [last_update_list[-1]] | ||
|
||
return last_update_list |
Oops, something went wrong.