Skip to content

Commit

Permalink
feat: PowerReport doesn't have socket and core attributes anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
altor committed Aug 30, 2021
1 parent e1f79e8 commit 15f910a
Show file tree
Hide file tree
Showing 24 changed files with 137 additions and 128 deletions.
19 changes: 15 additions & 4 deletions powerapi/cli/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def __init__(self):

subparser_prom_output = ComponentSubParser('prom')
subparser_prom_output.add_argument('a', 'addr', help='specify server address')
subparser_prom_output.add_argument('t', 'tags', help='specify report tags')
subparser_prom_output.add_argument('p', 'port', help='specify server port', type=int)
subparser_prom_output.add_argument('M', 'metric_name', help='speify metric name')
subparser_prom_output.add_argument('d', 'metric_description', help='specify metric description', default='energy consumption')
Expand All @@ -142,6 +143,7 @@ def __init__(self):

subparser_direct_prom_output = ComponentSubParser('direct_prom')
subparser_direct_prom_output.add_argument('a', 'addr', help='specify server address')
subparser_direct_prom_output.add_argument('t', 'tags', help='specify report tags')
subparser_direct_prom_output.add_argument('p', 'port', help='specify server port', type=int)
subparser_direct_prom_output.add_argument('M', 'metric_name', help='speify metric name')
subparser_direct_prom_output.add_argument('d', 'metric_description', help='specify metric description', default='energy consumption')
Expand All @@ -156,12 +158,15 @@ def __init__(self):
help='specify directory where where output csv files will be writen')
subparser_csv_output.add_argument('m', 'model', help='specify data type that will be storen in the database',
default='PowerReport')

subparser_csv_output.add_argument('t', 'tags', help='specify report tags')
subparser_csv_output.add_argument('n', 'name', help='specify pusher name', default='pusher_csv')
self.add_actor_subparser('output', subparser_csv_output,
help_str='specify a database input : --db_output database_name ARG1 ARG2 ... ')

subparser_influx_output = ComponentSubParser('influxdb')
subparser_influx_output.add_argument('u', 'uri', help='specify InfluxDB uri')
subparser_influx_output.add_argument('t', 'tags', help='specify report tags')
subparser_influx_output.add_argument('d', 'db', help='specify InfluxDB database name')
subparser_influx_output.add_argument('p', 'port', help='specify InfluxDB connection port', type=int)
subparser_influx_output.add_argument('m', 'model', help='specify data type that will be storen in the database',
Expand Down Expand Up @@ -276,6 +281,12 @@ def __init__(self, database_name):
self.database_name = database_name


def gen_tag_list(db_config: Dict):
    """
    Extract the list of report tag names from a database configuration.

    :param db_config: database configuration dictionary; may contain a
                      'tags' key holding a comma-separated string of tag names
    :return: list of tag names, or an empty list when no tags are configured
             (missing key or empty string)
    """
    tags = db_config.get('tags')
    if not tags:
        # No 'tags' key, or an empty string: splitting '' would yield ['']
        # (a bogus empty tag name), so return an empty list instead.
        return []
    return tags.split(',')


class DBActorGenerator(Generator):

def __init__(self, component_group_name):
Expand All @@ -290,14 +301,14 @@ def __init__(self, component_group_name):
self.db_factory = {
'mongodb': lambda db_config: MongoDB(db_config['model'], db_config['uri'], db_config['db'], db_config['collection']),
'socket': lambda db_config: SocketDB(db_config['model'], db_config['port']),
'csv': lambda db_config: CsvDB(db_config['model'], current_path=os.getcwd() if 'directory' not in db_config else db_config['directory'],
'csv': lambda db_config: CsvDB(db_config['model'], gen_tag_list(db_config), current_path=os.getcwd() if 'directory' not in db_config else db_config['directory'],
files=[] if 'files' not in db_config else db_config['files']),
'influxdb': lambda db_config: InfluxDB(db_config['model'], db_config['uri'], db_config['port'], db_config['db']),
'influxdb': lambda db_config: InfluxDB(db_config['model'], db_config['uri'], db_config['port'], db_config['db'], gen_tag_list(db_config)),
'opentsdb': lambda db_config: OpenTSDB(db_config['model'], db_config['uri'], db_config['port'], db_config['metric_name']),
'prom': lambda db_config: PrometheusDB(db_config['model'], db_config['port'], db_config['addr'], db_config['metric_name'],
db_config['metric_description'], db_config['aggregation_period']),
db_config['metric_description'], db_config['aggregation_period'], gen_tag_list(db_config)),
'direct_prom': lambda db_config: DirectPrometheusDB(db_config['model'], db_config['port'], db_config['addr'], db_config['metric_name'],
db_config['metric_description']),
db_config['metric_description'], gen_tag_list(db_config)),
'virtiofs': lambda db_config: VirtioFSDB(db_config['model'], db_config['vm_name_regexp'], db_config['root_directory_name'], db_config['vm_directory_name_prefix'], db_config['vm_directory_name_suffix']),
}

Expand Down
6 changes: 3 additions & 3 deletions powerapi/database/csvdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class CsvDB(BaseDB):
a CsvDB instance can be define by his ReportModel and its current path
"""

def __init__(self, report_type: Type[Report], current_path="/tmp/csvdbtest", files=[]):
def __init__(self, report_type: Type[Report], tags: List[str], current_path="/tmp/csvdbtest", files=[]):
"""
:param current_path: Current path where read/write files
"""
Expand All @@ -212,6 +212,7 @@ def __init__(self, report_type: Type[Report], current_path="/tmp/csvdbtest", fil
#: (int): allow to know if we read a new report, or the same
#: current timestamp
self.saved_timestamp = utils.timestamp_to_datetime(0)
self.tags = tags

self.add_files(files)

Expand Down Expand Up @@ -270,7 +271,7 @@ def save(self, report: Report):
:param report: Report
:param report_model: ReportModel
"""
csv_header, data = self.report_type.to_csv_lines(report)
csv_header, data = self.report_type.to_csv_lines(report, self.tags)

# If the repository doesn't exist, create it
rep_path = self.current_path + report.sensor + "-" + report.target
Expand All @@ -280,7 +281,6 @@ def save(self, report: Report):
pass

for filename, values in data.items():
print(values)
rep_path_with_file = rep_path + '/' + filename + '.csv'

# Get the header and check if it's ok
Expand Down
14 changes: 8 additions & 6 deletions powerapi/database/direct_prometheus_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ class DirectPrometheusDB(BaseDB):
Could only be used with a pusher actor
"""

def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str):
def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str, tags: List[str]):
"""
:param address: address that expose the metric
:param port:
:param metric_name:
:param metric_description: short sentence that describe the metric
:param tags: metadata used to tag metric
"""
BaseDB.__init__(self, report_type)
self.address = address
self.port = port
self.metric_name = metric_name
self.metric_description = metric_description
self.tags = tags

self.energy_metric = None

Expand All @@ -67,19 +69,19 @@ def connect(self):
"""
Start a HTTP server exposing one metric
"""
self.energy_metric = Gauge(self.metric_name, self.metric_description, self.report_type.get_tags())
self.energy_metric = Gauge(self.metric_name, self.metric_description, ['sensor', 'target'] + self.tags)
start_http_server(self.port)

def _expose_data(self, key, measure):
kwargs = {label: measure['tags'][label] for label in self.report_type.get_tags()}
kwargs = {label: measure['tags'][label] for label in measure['tags']}
try:
self.energy_metric.labels(**kwargs).set(measure['value'])
except TypeError:
self.energy_metric.labels(kwargs).set(measure['value'])

def _report_to_measure_and_key(self, report):
value = self.report_type.to_prometheus(report)
key = ''.join([str(value['tags'][tag]) for tag in self.report_type.get_tags()])
value = self.report_type.to_prometheus(report, self.tags)
key = ''.join([str(value['tags'][tag]) for tag in value['tags']])
return key, value

def _update_exposed_measure(self):
Expand All @@ -105,7 +107,7 @@ def save(self, report: Report):

self._expose_data(key, measure)
if key not in self.measure_for_current_period:
args = [measure['tags'][label] for label in self.report_type.get_tags()]
args = [measure['tags'][label] for label in measure['tags']]
self.measure_for_current_period[key] = args

def save_many(self, reports: List[Report]):
Expand Down
13 changes: 8 additions & 5 deletions powerapi/database/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class InfluxDB(BaseDB):
Allow to handle a InfluxDB database in reading or writing.
"""

def __init__(self, report_type: Type[Report], uri: str, port, db_name: str):
def __init__(self, report_type: Type[Report], uri: str, port, db_name: str, tags: List[str]):
"""
:param url: URL of the InfluxDB server
:param port: port of the InfluxDB server
Expand All @@ -62,13 +62,15 @@ def __init__(self, report_type: Type[Report], uri: str, port, db_name: str):
(ex: "powerapi")
:param report_type: Type of the report handled by this database
:param tags: metadata used to tag metric
"""
BaseDB.__init__(self, report_type)
self.uri = uri
self.port = port
self.db_name = db_name

self.tags = tags

self.client = None

def _ping_client(self):
Expand Down Expand Up @@ -108,10 +110,11 @@ def save(self, report: Report):
:param report: Report to save
"""
data = self.report_type.to_influxdb(report)
data = self.report_type.to_influxdb(report, self.tags)
for tag in data['tags']:
data['tags'][tag] = str(data['tags'][tag])
self.client.write_points([data])


def save_many(self, reports: List[Report]):
"""
Save a batch of data
Expand All @@ -120,5 +123,5 @@ def save_many(self, reports: List[Report]):
:param report_model: ReportModel
"""

data_list = list(map(lambda r: self.report_type.to_influxdb(r), reports))
data_list = list(map(lambda r: self.report_type.to_influxdb(r, self.tags), reports))
self.client.write_points(data_list)
21 changes: 12 additions & 9 deletions powerapi/database/prometheus_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,23 @@ class PrometheusDB(BaseDB):
Could only be used with a pusher actor
"""

def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str, aggregation_periode: int):
def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str, aggregation_periode: int, tags: List[str]):
"""
:param address: address that expose the metric
:param port:
:param metric_name:
:param metric_description: short sentence that describe the metric
:param aggregation_periode: number of second for the value must be aggregated before compute statistics on them
:param tags: metadata used to tag metric
"""
BaseDB.__init__(self, report_type)
self.address = address
self.port = port
self.metric_name = metric_name
self.metric_description = metric_description
self.aggregation_periode = aggregation_periode
self.tags = tags
self.final_tags = ['sensor', 'target'] + tags

self.mean_metric = None
self.std_metric = None
Expand All @@ -77,18 +80,18 @@ def connect(self):
Start a HTTP server exposing one metric
"""

self.mean_metric = Gauge(self.metric_name + '_mean', self.metric_description + '(MEAN)', self.report_type.get_tags())
self.std_metric = Gauge(self.metric_name + '_std', self.metric_description + '(STD)', self.report_type.get_tags())
self.min_metric = Gauge(self.metric_name + '_min', self.metric_description + '(MIN)', self.report_type.get_tags())
self.max_metric = Gauge(self.metric_name + '_max', self.metric_description + '(MAX)', self.report_type.get_tags())
self.mean_metric = Gauge(self.metric_name + '_mean', self.metric_description + '(MEAN)', self.final_tags)
self.std_metric = Gauge(self.metric_name + '_std', self.metric_description + '(STD)', self.final_tags)
self.min_metric = Gauge(self.metric_name + '_min', self.metric_description + '(MIN)', self.final_tags)
self.max_metric = Gauge(self.metric_name + '_max', self.metric_description + '(MAX)', self.final_tags)
start_http_server(self.port)

def _expose_data(self, key):
aggregated_value = self.buffer.get_stats(key)
if aggregated_value is None:
return

kwargs = {label: aggregated_value['tags'][label] for label in self.report_type.get_tags()}
kwargs = {label: aggregated_value['tags'][label] for label in self.final_tags}
try:
self.mean_metric.labels(**kwargs).set(aggregated_value['mean'])
self.std_metric.labels(**kwargs).set(aggregated_value['std'])
Expand All @@ -101,8 +104,8 @@ def _expose_data(self, key):
self.max_metric.labels(kwargs).set(aggregated_value['max'])

def _report_to_measure_and_key(self, report):
value = self.report_type.to_prometheus(report)
key = ''.join([str(value['tags'][tag]) for tag in self.report_type.get_tags()])
value = self.report_type.to_prometheus(report, self.tags)
key = ''.join([str(value['tags'][tag]) for tag in self.final_tags])
return key, value

def _update_exposed_measure(self):
Expand Down Expand Up @@ -142,7 +145,7 @@ def save(self, report: Report):
self._reinit_persiod(measure['time'])

if key not in self.exposed_measure:
args = [measure['tags'][label] for label in self.report_type.get_tags()]
args = [measure['tags'][label] for label in self.final_tags]
self.exposed_measure[key] = args

if key not in self.measure_for_current_period:
Expand Down
4 changes: 2 additions & 2 deletions powerapi/dispatch_rule/power_dispatch_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def extract_id_from_report(report, depth):
return (report.sensor,)

if depth == PowerDepthLevel.SOCKET:
return extract_id_from_report(report, depth - 1) + (report.socket,)
return extract_id_from_report(report, depth - 1) + (report.metadata['socket'],)

if depth == PowerDepthLevel.CORE:
return extract_id_from_report(report, depth - 1) + (report.core,)
return extract_id_from_report(report, depth - 1) + (report.metadata['core'],)

class PowerDispatchRule(DispatchRule):
"""
Expand Down
3 changes: 2 additions & 1 deletion powerapi/dispatcher/dispatcher_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def receiveMsg_PoisonMessage(self, message: PoisonMessage, sender: ActorAddress)
poison_message = message.poisonMessage
for formula_name, (formula, blocking_detector) in self.formula_pool.items():
if sender == formula:
self.log_debug('received poison messsage from formula ' + formula_name + ' : ' + str(poison_message))
self.log_debug('poison_message')
self.log_debug('received poison messsage from formula ' + formula_name + ' for message ' + str(poison_message) + 'with this error stack : ' + message.details)
blocking_detector.notify_poison_received(poison_message)
self.log_debug('formula ' + formula_name + ' is blocked : ' + str(blocking_detector.is_blocked()))
if blocking_detector.is_blocked():
Expand Down
2 changes: 1 addition & 1 deletion powerapi/formula/dummy/dummy_formula_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ def _initialization(self, message: FormulaStartMessage):
def receiveMsg_Report(self, message: Report, sender: ActorAddress):
self.log_debug('received message ' + str(message))
time.sleep(self.sleeping_time)
power_report = PowerReport(message.timestamp, message.sensor, message.target, self.socket, 42, {})
power_report = PowerReport(message.timestamp, message.sensor, message.target, 42, {'socket': self.socket})
for _, pusher in self.pushers.items():
self.send(pusher, power_report)
2 changes: 0 additions & 2 deletions powerapi/report/hwpc_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ def from_csv_lines(lines: List[Tuple[str, Dict]]) -> HWPCReport:
if timestamp is None:
timestamp = HWPCReport._extract_timestamp(row['timestamp'])
else:
print(timestamp)
print(HWPCReport._extract_timestamp(row['timestamp']))
if timestamp != HWPCReport._extract_timestamp(row['timestamp']):
raise BadInputData('csv line with different timestamp are mixed into one report')

Expand Down
Loading

0 comments on commit 15f910a

Please sign in to comment.