Enhancement: add hostname to metadata events (#215)
* add hostnames to metadata.

* added pid, tid, and hhash columns to metadata
hariharan-devarajan authored Oct 5, 2024
1 parent 3a4ce79 commit c8c6533
Showing 2 changed files with 18 additions and 14 deletions.
28 changes: 16 additions & 12 deletions dfanalyzer/main.py
@@ -170,6 +170,12 @@ def load_objects(line, fn, time_granularity, time_approximate, condition_fn, loa
d["name"] = val["name"]
if "cat" in val:
d["cat"] = val["cat"]
if "pid" in val:
d["pid"] = val["pid"]
if "tid" in val:
d["tid"] = val["tid"]
if "args" in val and "hhash" in val["args"]:
d["hhash"] = val["args"]["hhash"]
if "M" == val["ph"]:
if d["name"] == "FH":
d["type"] = 1 # 1-> file hash
@@ -186,17 +192,18 @@ def load_objects(line, fn, time_granularity, time_approximate, condition_fn, loa
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["hash"] = val["args"]["value"]
elif d["name"] == "PR":
d["type"] = 5 # 5-> process metadata
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["hash"] = val["args"]["value"]
else:
d["type"] = 4 # 4-> others
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["value"] = str(val["args"]["value"])
else:
d["type"] = 0 # 0->regular event
if "pid" in val:
d["pid"] = val["pid"]
if "tid" in val:
d["tid"] = val["tid"]
if "dur" in val:
val["dur"] = int(val["dur"])
val["ts"] = int(val["ts"])
@@ -261,8 +268,6 @@ def io_function(json_object, current_dict, time_approximate, condition_fn):
if "args" in json_object:
if "fhash" in json_object["args"]:
d["fhash"] = json_object["args"]["fhash"]
if "hhash" in json_object["args"]:
d["hhash"] = json_object["args"]["hhash"]

if "POSIX" == json_object["cat"] and "ret" in json_object["args"]:
size = int(json_object["args"]["ret"])
@@ -281,7 +286,6 @@ def io_function(json_object, current_dict, time_approximate, condition_fn):
def io_columns():
    conf = get_dft_configuration()
    return {
-         'hhash': "uint64[pyarrow]",
        'compute_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
        'io_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
        'app_io_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
@@ -453,10 +457,10 @@ def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}, met
           'tinterval': "string[pyarrow]" if not self.conf.time_approximate else "uint64[pyarrow]", 'trange': "uint64[pyarrow]"}
columns.update(io_columns())
columns.update(load_cols)
- file_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]"}
- hostname_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]"}
- string_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]"}
- other_metadata_columns = {'name': "string[pyarrow]", 'value': "string[pyarrow]"}
+ file_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]", 'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]", 'hhash': "uint64[pyarrow]"}
+ hostname_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]", 'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]", 'hhash': "uint64[pyarrow]"}
+ string_hash_columns = {'name': "string[pyarrow]", 'hash': "uint64[pyarrow]", 'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]", 'hhash': "uint64[pyarrow]"}
+ other_metadata_columns = {'name': "string[pyarrow]", 'value': "string[pyarrow]", 'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]", 'hhash': "uint64[pyarrow]"}
if "FH" in metadata_cols:
    file_hash_columns.update(metadata_cols["FH"])
if "HH" in metadata_cols:
@@ -475,7 +479,7 @@ def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}, met
self.file_hash = self.all_events.query("type == 1")[list(file_hash_columns.keys())].groupby('hash').first().persist()
self.host_hash = self.all_events.query("type == 2")[list(hostname_hash_columns.keys())].groupby('hash').first().persist()
self.string_hash = self.all_events.query("type == 3")[list(string_hash_columns.keys())].groupby('hash').first().persist()
self.metadata = self.all_events.query("type == 4")[list(other_metadata_columns.keys())].persist()
self.n_partition = math.ceil(total_size.compute() / (128 * 1024 ** 2))
logging.debug(f"Number of partitions used are {self.n_partition}")
self.events = events.repartition(npartitions=self.n_partition).persist()
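
A hedged usage sketch (not from the commit) of what the new pid, tid, and hhash columns enable: metadata rows can now be tied back to the process, thread, and host that produced them. Here analyzer is a hypothetical instance of the class above, and the merge keys assume each row's hhash value matches the hash index of host_hash:

    # Attach a hostname to each file-hash row via the shared hhash value.
    hosts = analyzer.host_hash[["name"]].rename(columns={"name": "hostname"})
    file_hosts = analyzer.file_hash.merge(
        hosts, left_on="hhash", right_index=True, how="left")
    print(file_hosts[["name", "pid", "tid", "hostname"]].head())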
4 changes: 2 additions & 2 deletions src/dftracer/writer/chrome_writer.cpp
@@ -255,12 +255,12 @@ void dftracer::ChromeWriter::convert_json_metadata(
if (is_string) {
written_size = sprintf(
buffer.data() + current_index,
R"(%s{"id":%d,"name":"%s","cat":"dftracer","pid":%lu,"tid":%lu,"ph":"M","args":{"name":"%s","value":"%s"}})",
R"(%s{"id":%d,"name":"%s","cat":"dftracer","pid":%lu,"tid":%lu,"ph":"M","args":{"hhash":%d,"name":"%s","value":"%s"}})",
is_first_char, index, ph, process_id, thread_id, name, value);
} else {
written_size = sprintf(
buffer.data() + current_index,
R"(%s{"id":%d,"name":"%s","cat":"dftracer","pid":%lu,"tid":%lu,"ph":"M","args":{"name":"%s","value":%s}})",
R"(%s{"id":%d,"name":"%s","cat":"dftracer","pid":%lu,"tid":%lu,"ph":"M","args":{"hhash":%d,"name":"%s","value":%s}})",
is_first_char, index, ph, process_id, thread_id, name, value);
}
current_index += written_size;
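
For reference, a sketch of a metadata line the writer would emit after this change (is_string branch; all values are invented, and the hhash value is assumed to be supplied as an additional sprintf argument):

    {"id":1,"name":"FH","cat":"dftracer","pid":4242,"tid":4242,"ph":"M","args":{"hhash":11111,"name":"a.dat","value":"22222"}}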
