Skip to content

Commit

Permalink
feat: better log
Browse files Browse the repository at this point in the history
use info and warning level
remove some useless debug log
  • Loading branch information
altor committed Sep 28, 2021
1 parent d2039b2 commit e9c6ac1
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 45 deletions.
4 changes: 3 additions & 1 deletion powerapi/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def receiveMsg_StartMessage(self, message: StartMessage, sender: ActorAddress):

self.initialized = True
self.send(sender, OKMessage(self.name))
self.log_info(self.name + ' started')

def receiveMsg_ErrorMessage(self, message: ErrorMessage, _: ActorAddress):
"""
Expand All @@ -145,14 +146,15 @@ def receiveUnrecognizedMessage(self, message: Any, sender: ActorAddress):
"""
When receiving a message with a type that can't be handle, the actor answer with an ErrorMessage
"""
self.log_debug('received message ' + str(message))
self.log_warning('received unrecognized message : ' + str(message))
self.send(sender, ErrorMessage(self.name, "did not recognize the message type : " + str(type(message))))

def receiveMsg_ActorExitRequest(self, message: ActorExitRequest, _: ActorAddress):
"""
When receive ActorExitRequestMessage log it and exit
"""
self.log_debug('received message ' + str(message))
self.log_info(self.name + ' exit')

def _initialization(self, start_message: StartMessage):
pass
Expand Down
12 changes: 7 additions & 5 deletions powerapi/dispatcher/dispatcher_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,18 @@ def receiveMsg_PoisonMessage(self, message: PoisonMessage, sender: ActorAddress)
log_line += 'with this error stack : ' + message.details
self.log_debug(log_line)
blocking_detector.notify_poison_received(poison_message)
self.log_debug('formula ' + formula_name + ' is blocked : ' + str(blocking_detector.is_blocked()))
if blocking_detector.is_blocked():
self.log_debug('formula ' + formula_name + ' is blocked : ' + str(blocking_detector.is_blocked()))
self.log_debug('restart formula ' + formula_name)
self.log_error('formula ' + formula_name + ' is blocked after this error : ' + message.details)
self._restart_formula(formula_name)
return

def receiveMsg_ActorExitRequest(self, message: ActorExitRequest, _: ActorAddress):
def receiveMsg_ActorExitRequest(self, message: ActorExitRequest, sender: ActorAddress):
"""
When receiving ActorExitRequest, forward it to all formula
"""
self.log_debug('received message ' + str(message))
Actor.receiveMsg_ActorExitRequest(self, message, sender)
for _, (formula, __) in self.formula_pool.items():
self.send(formula, ActorExitRequest())
for _, formula in self.formula_waiting_service.get_all_formula():
Expand Down Expand Up @@ -203,7 +204,7 @@ def receiveMsg_Report(self, message: Report, _: ActorAddress):
self._send_message(formula_name, message)
except KeyError:
formula_name = self._gen_formula_name(formula_id)
self.log_debug('create formula ' + formula_name)
self.log_info('create formula ' + formula_name)
formula = self._create_formula(formula_id, formula_name)
self.formula_name_service.add(formula_id, formula_name)
self.formula_waiting_service.add(formula_name, formula)
Expand Down Expand Up @@ -238,7 +239,7 @@ def receiveMsg_ErrorMessage(self, message: ErrorMessage, _: ActorAddress):
"""
When receiving an ErrorMessage after trying to start a formula, remove formula from waiting service
"""
self.log_debug('received error message ' + str(message))
self.log_info('error while trying to start ' + message.sender_name + ' : ' + message.error_message)
self.formula_waiting_service.remove_formula(message.sender_name)

def receiveMsg_OKMessage(self, message: OKMessage, sender: ActorAddress):
Expand All @@ -251,6 +252,7 @@ def receiveMsg_OKMessage(self, message: OKMessage, sender: ActorAddress):
self.formula_pool[formula_name] = (sender, BlockingDetector())
for waiting_msg in waiting_messages:
self._send_message(formula_name, waiting_msg)
self.log_info('formula ' + formula_name + 'started')

def receiveMsg_EndMessage(self, message: EndMessage, _: ActorAddress):
"""
Expand Down
7 changes: 5 additions & 2 deletions powerapi/puller.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,13 @@ def _launch_task(self):
if self.stream_mode:
self.wakeupAfter(self._time_interval)
return
self.log_info('input source empty, stop system')
self._terminate()
return
except BadInputData:
pass
except BadInputData as exn:
log_line = 'BadinputData exception raised for input data' + str(exn.input_data)
log_line += ' with message : ' + exn.msg
self.log_warning(log_line)

def _terminate(self):
self.send(self.parent, EndMessage(self.name))
Expand Down
17 changes: 15 additions & 2 deletions powerapi/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from powerapi.actor import Actor, InitializationException
from powerapi.message import PusherStartMessage, EndMessage
from powerapi.database import DBError
from powerapi.report import PowerReport
from powerapi.report import PowerReport, BadInputData
from powerapi.exception import PowerAPIExceptionWithMessage, PowerAPIException


class PusherActor(Actor):
Expand Down Expand Up @@ -59,7 +60,19 @@ def receiveMsg_PowerReport(self, message: PowerReport, _: ActorAddress):
When receiving a PowerReport save it to database
"""
self.log_debug('received message ' + str(message))
self.database.save(message)
try:
self.database.save(message)
self.log_debug(str(message) + 'saved to database')
except BadInputData as exn:
log_line = 'BadinputData exception raised for report' + str(exn.input_data)
log_line += ' with message : ' + exn.msg
self.log_warning(log_line)
except PowerAPIExceptionWithMessage as exn:
log_line = 'exception ' + str(exn) + 'was raised while trying to save ' + str(message)
log_line += 'with message : ' + str(exn.msg)
self.log_warning(log_line)
except PowerAPIException as exn:
self.log_warning('exception ' + str(exn) + 'was raised while trying to save ' + str(message))

def receiveMsg_EndMessage(self, message: EndMessage, _: ActorAddress):
"""
Expand Down
41 changes: 24 additions & 17 deletions powerapi/report/hwpc_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ def from_json(data: Dict) -> HWPCReport:
ts = Report._extract_timestamp(data['timestamp'])
return HWPCReport(ts, data['sensor'], data['target'], data['groups'])
except KeyError as exn:
raise BadInputData('no field ' + str(exn.args[0]) + ' in json document') from exn
raise BadInputData('no field ' + str(exn.args[0]) + ' in json document', data) from exn
except ValueError as exn:
raise BadInputData(exn.args[0], data) from exn

@staticmethod
def to_json(report: HWPCReport) -> Dict:
Expand Down Expand Up @@ -134,27 +136,32 @@ def from_csv_lines(lines: List[Tuple[str, Dict[str, str]]]) -> HWPCReport:
timestamp = HWPCReport._extract_timestamp(row['timestamp'])
else:
if sensor_name != row['sensor']:
raise BadInputData('csv line with different sensor name are mixed into one report')
raise BadInputData('csv line with different sensor name are mixed into one report', row)
if target != row['target']:
raise BadInputData('csv line with different target are mixed into one report')
raise BadInputData('csv line with different target are mixed into one report', row)
if timestamp != HWPCReport._extract_timestamp(row['timestamp']):
raise BadInputData('csv line with different timestamp are mixed into one report')
raise BadInputData('csv line with different timestamp are mixed into one report', row)

if group_name not in groups:
groups[group_name] = {}
HWPCReport._create_group(row, groups, group_name)

if row['socket'] not in groups[group_name]:
groups[group_name][row['socket']] = {}
except KeyError as exn:
raise BadInputData('missing field ' + str(exn.args[0]) + ' in csv file ' + file_name, row) from exn
except ValueError as exn:
raise BadInputData(exn.args[0], row) from exn

return HWPCReport(timestamp, sensor_name, target, groups)

if row['cpu'] not in groups[group_name][row['socket']]:
groups[group_name][row['socket']][row['cpu']] = {}
@staticmethod
def _create_group(row, groups, group_name):
if group_name not in groups:
groups[group_name] = {}

for key, value in row.items():
if key not in CSV_HEADER_HWPC:
groups[group_name][
row['socket']][row['cpu']][key] = int(value)
if row['socket'] not in groups[group_name]:
groups[group_name][row['socket']] = {}

except KeyError as exn:
raise BadInputData('missing field ' + str(exn.args[0]) + ' in csv file ' + file_name) from exn
if row['cpu'] not in groups[group_name][row['socket']]:
groups[group_name][row['socket']][row['cpu']] = {}

return HWPCReport(timestamp, sensor_name, target, groups)
for key, value in row.items():
if key not in CSV_HEADER_HWPC:
groups[group_name][row['socket']][row['cpu']][key] = int(value)
16 changes: 10 additions & 6 deletions powerapi/report/power_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def from_json(data: Dict) -> Report:
ts = Report._extract_timestamp(data['timestamp'])
return PowerReport(ts, data['sensor'], data['target'], data['power'], data['metadata'])
except KeyError as exn:
raise BadInputData('no field ' + str(exn.args[0]) + ' in json document') from exn
raise BadInputData('no field ' + str(exn.args[0]) + ' in json document', data) from exn
except ValueError as exn:
raise BadInputData(exn.args[0], data) from exn

@staticmethod
def from_csv_lines(lines: List[Tuple[str, Dict]]) -> PowerReport:
Expand All @@ -81,7 +83,7 @@ def from_csv_lines(lines: List[Tuple[str, Dict]]) -> PowerReport:
:return: a PowerReport that contains value from the given lines
"""
if len(lines) != 1:
raise BadInputData('a power report could only be parsed from one csv line')
raise BadInputData('a power report could only be parsed from one csv line', None)
file_name, row = lines[0]

try:
Expand All @@ -97,7 +99,9 @@ def from_csv_lines(lines: List[Tuple[str, Dict]]) -> PowerReport:
return PowerReport(timestamp, sensor_name, target, power, metadata)

except KeyError as exn:
raise BadInputData('missing field ' + str(exn.args[0]) + ' in csv file ' + file_name) from exn
raise BadInputData('missing field ' + str(exn.args[0]) + ' in csv file ' + file_name, row) from exn
except ValueError as exn:
raise BadInputData(exn.args[0], row) from exn

@staticmethod
def to_csv_lines(report: PowerReport, tags: List[str]) -> Tuple[List[str], Dict]:
Expand All @@ -117,7 +121,7 @@ def to_csv_lines(report: PowerReport, tags: List[str]) -> Tuple[List[str], Dict]
}
for tag in tags:
if tag not in report.metadata:
raise BadInputData('no tag ' + tag + ' in power report')
raise BadInputData('no tag ' + tag + ' in power report', report)
line[tag] = report.metadata[tag]

final_dict = {'PowerReport': [line]}
Expand All @@ -129,7 +133,7 @@ def to_virtiofs_db(report: PowerReport) -> Tuple[str, str]:
return a tuple containing the power value and the name of the file to store the value.
"""
if 'socket' not in report:
raise BadInputData('no tag socket in power report')
raise BadInputData('no tag socket in power report', report)
filename = 'power_consumption_package' + str(report['socket'])
power = report.power
return filename, power
Expand All @@ -141,7 +145,7 @@ def _gen_tag(self, metadata_keept):

for metadata_name in metadata_keept:
if metadata_name not in self.metadata:
raise BadInputData('no tag ' + metadata_name + ' in power report')
raise BadInputData('no tag ' + metadata_name + ' in power report', self)
else:
tags[metadata_name] = self.metadata[metadata_name]

Expand Down
10 changes: 5 additions & 5 deletions powerapi/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class BadInputData(PowerAPIExceptionWithMessage):
"""
Exception raised when input data can't be converted to a Report
"""
def __init__(self, msg, input_data):
PowerAPIExceptionWithMessage.__init__(self, msg)
self.input_data = input_data


class Report(Message):
Expand Down Expand Up @@ -89,11 +92,8 @@ def _extract_timestamp(ts):
try:
return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
try:
return datetime.fromtimestamp(int(ts) / 1000)
except ValueError as exn:
raise BadInputData(exn.args) from exn
return datetime.fromtimestamp(int(ts) / 1000)
if isinstance(ts, datetime):
return ts

raise BadInputData('timestamp must be a datetime.datetime or a string')
raise ValueError('timestamp must be a datetime.datetime or a string')
7 changes: 4 additions & 3 deletions powerapi/test_utils/db/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import powerapi.test_utils.db as parent_module

OUTPUT_PATH = '/tmp/'
OUTPUT_PATH = '/tmp/powerapi_test_csv/'
ROOT_PATH = parent_module.__path__[0] + '/csv_files/'
FILES = [ROOT_PATH + 'core2.csv',
ROOT_PATH + "rapl2.csv",
Expand All @@ -43,6 +43,7 @@ def files():
"""
fixture that remove csv files used by test module before launching the test and after the test end
"""
os.system('rm -Rf ' + OUTPUT_PATH + 'grvingt-12-system')
os.system('rm -Rf ' + OUTPUT_PATH)
os.system('mkdir -p ' + OUTPUT_PATH)
yield None
os.system('rm -Rf ' + OUTPUT_PATH + 'grvingt-12-system')
os.system('rm -Rf ' + OUTPUT_PATH)
45 changes: 41 additions & 4 deletions tests/acceptation/test_simple_architecture.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def launch_simple_architecture(config, supervisor):
pusher_generator = PusherGenerator()
pusher_info = pusher_generator.generate(config)
pusher_cls, pusher_start_message = pusher_info['test_pusher']

pusher = supervisor.launch(pusher_cls, pusher_start_message)

# Dispatcher
Expand Down Expand Up @@ -195,7 +195,7 @@ def test_run_mongo_to_influx(mongo_database, influx_database, shutdown_system):
##############
def check_output_file():
input_file = open(ROOT_PATH + 'rapl2.csv', 'r')
output_file = open(OUTPUT_PATH + 'all/PowerReport.csv', 'r')
output_file = open(OUTPUT_PATH + 'grvingt-12-system/PowerReport.csv', 'r')

# line count
l_output = -1
Expand Down Expand Up @@ -316,9 +316,46 @@ def test_run_socket_with_delay_between_message_to_mongo(mongo_database, unused_t
##############
# Socket to CSV #
##############
def check_output_file2():
input_reports = extract_rapl_reports_with_2_sockets(10)
output_file = open(OUTPUT_PATH + 'test_sensor-all/PowerReport.csv', 'r')

# line count
l_output = -1
for _ in output_file:
l_output += 1

assert len(input_reports) * 2 == l_output
output_file.seek(0)

output_file.readline()

# split socket0 report from socket1 report
output_socket0 = []
output_socket1 = []

for output_line in map(lambda x: x.split(','), output_file):
if output_line[4] == '\0\n':
output_socket0.append(output_line)
else:
output_socket1.append(output_line)

def test_run_socket_to_csv(unused_tcp_port,shutdown_system):
# check value
for report, output_line_s0, output_line_s1 in zip(input_reports, output_socket0, output_socket1):
ts = datetime.strptime(report['timestamp'], "%Y-%m-%dT%H:%M:%S.%f")
assert ts == output_line_s0[0]
assert ts == output_line_s1[0]

assert report['sensor'] == output_line_s0[1]
assert report['sensor'] == output_line_s1[1]

assert report['target'] == output_line_s0[2]
assert report['target'] == output_line_s1[2]

int(output_line_s0[3]) == 42
int(output_line_s1[3]) == 42

def test_run_socket_to_csv(unused_tcp_port, files, shutdown_system):
config = {'verbose': True,
'stream': False,
'output': {'test_pusher': {'type': 'csv',
Expand All @@ -336,4 +373,4 @@ def test_run_socket_to_csv(unused_tcp_port,shutdown_system):
client = ClientThread(extract_rapl_reports_with_2_sockets(10), unused_tcp_port)
client.start()
supervisor.monitor()
check_output_file()
check_output_file2()

0 comments on commit e9c6ac1

Please sign in to comment.