-
Notifications
You must be signed in to change notification settings - Fork 2
/
hygraph.py
320 lines (279 loc) · 13.5 KB
/
hygraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import networkx as nx
import numpy as np
import pandas as pd
import xarray as xr
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import random
from igraph import Graph as IGraph
from IDGenerator import IDGenerator
from Oberserver import Subject
class Node(Subject):
    """Base class for graph vertices.

    ``HyGraph.add_node`` stores the node under ``oid``, attaches its
    ``GraphObserver`` to it and calls ``notify()``.
    """

    def __init__(self, oid, label):
        """
        :param oid: Identifier of the node (used as the networkx node key).
        :param label: Node label; custom metrics may filter on it.
        """
        super().__init__()  # Initialize the Subject (observer bookkeeping)
        self.oid = oid
        self.label = label
        # timestamp -> community index, written by HyGraph.graph_metrics_evolution.
        self.membership = {}
        # attribute name -> value / time-series id. Defined on the base class because
        # HyGraph.add_property and GraphObserver.update read ``properties`` on every
        # node type, and TSNode does not define it itself.
        self.properties = {}
class PGNode(Node):
    """Property-graph node with a validity interval and its own property map."""

    def __init__(self, oid, label, start_time, end_time=None):
        """
        :param oid: Identifier of the node.
        :param label: Node label.
        :param start_time: Start of the node's validity interval.
        :param end_time: End of the validity interval; None when not given.
        """
        super().__init__(oid, label)
        self.start_time, self.end_time = start_time, end_time
        self.properties = {}
class TSNode(Node):
    """Node whose payload is a time series."""

    def __init__(self, oid, label, time_series):
        """
        :param oid: Identifier of the node.
        :param label: Node label.
        :param time_series: Series carried by the node. HyGraph.create_similarity_edges
            reads ``series.data.values`` from it, so presumably a TimeSeries instance.
        """
        super().__init__(oid, label)
        self.series = time_series
class Edge(Subject):
    """Observable, time-bounded edge between two nodes."""

    def __init__(self, oid, source, target, label, start_time, end_time=None):
        """
        :param oid: Identifier of the edge (HyGraph uses it as the multigraph key).
        :param source: Endpoint passed as ``u`` to ``graph.add_edge``.
        :param target: Endpoint passed as ``v`` to ``graph.add_edge``.
        :param label: Edge label.
        :param start_time: Start of the edge's validity interval.
        :param end_time: End of the validity interval; None when not given.
        """
        super().__init__()
        self.oid = oid
        self.source, self.target = source, target
        self.label = label
        self.start_time, self.end_time = start_time, end_time
        self.properties = {}
        # timestamp -> community index (or "a,b"), written by graph_metrics_evolution.
        self.membership = {}
class TimeSeriesMetadata:
    """Records which graph element and attribute a time series was derived from.

    :param owner_id: ID of the owning element.
    :param edge_label: Label of the owning edge ('' when not applicable).
    :param element_type: Kind of owning element, as passed by the caller.
    :param attribute: Name of the attribute the series tracks.
    """

    def __init__(self, owner_id, edge_label='', element_type='', attribute=''):
        (self.owner_id, self.edge_label, self.element_type, self.attribute) = (
            owner_id,
            edge_label,
            element_type,
            attribute,
        )
class TimeSeries:
    """A (possibly multivariate) time series stored as an ``xarray.DataArray``.

    The data has dimensions ``('time', 'variable')`` and is named ``ts_<tsid>``.
    """

    def __init__(self, tsid, timestamps, variables, data, metadata=None):
        """
        Create a time series.

        :param tsid: Time series ID.
        :param timestamps: Sequence of timestamps (anything ``pd.to_datetime`` accepts).
        :param variables: List of variable names.
        :param data: 2D array-like of shape ``(len(timestamps), len(variables))``.
        :param metadata: Optional metadata object; defaults to an empty dict.
        """
        self.tsid = tsid
        time_index = pd.to_datetime(timestamps)
        self.data = xr.DataArray(
            data,
            coords=[time_index, variables],
            dims=['time', 'variable'],
            name=f'ts_{tsid}',
        )
        self.metadata = metadata if metadata is not None else {}

    def append_data(self, date, value):
        """
        Append one observation at ``date``.

        :param date: Timestamp of the new observation.
        :param value: A scalar for a single-variable series, or a sequence with
            one entry per variable for a multivariate series.
        :raises ValueError: If the number of values does not match the number
            of variables.
        """
        date = pd.to_datetime(date)
        variables = self.data.coords['variable'].values
        # One row: a scalar becomes [[value]], a sequence becomes [[v1, v2, ...]].
        row = np.asarray(value).reshape(1, -1)
        if row.shape[1] != len(variables):
            raise ValueError(
                f"Expected {len(variables)} value(s) for variables {list(variables)}, "
                f"got {row.shape[1]}."
            )
        new_data = xr.DataArray(
            row,
            coords=[[date], variables],
            dims=['time', 'variable'],
            name=self.data.name,
        )
        self.data = xr.concat([self.data, new_data], dim='time')
class Subgraph(Subject):
    """Observable, time-bounded subgraph defined by a filter predicate."""

    def __init__(self, subgraph_id, label, start_time, end_time=None, filter_func=None):
        """
        :param subgraph_id: Identifier of the subgraph.
        :param label: Subgraph label.
        :param start_time: Start of the validity interval.
        :param end_time: End of the validity interval; None when not given.
        :param filter_func: Predicate HyGraph.add_subgraph hands to networkx as
            both the node filter and the edge filter.
        """
        super().__init__()
        self.subgraph_id = subgraph_id
        self.label = label
        self.start_time, self.end_time = start_time, end_time
        self.properties = {}
        self.filter_func = filter_func
class GraphObserver:
    """Observer that maintains time series for registered custom metrics.

    HyGraph attaches one instance to every node, edge and subgraph it stores.
    When a subject notifies, each custom metric matching the subject's element
    type (and label, when the metric sets one) is evaluated. The value is either
    appended to the subject's existing series for that attribute, or a new
    series is built from the graph when the subject has no such property yet.
    """

    def __init__(self, hygraph):
        """
        :param hygraph: Owning HyGraph; provides ``custom_metrics``,
            ``time_series`` and ``create_time_series_from_graph``.
        """
        self.hygraph = hygraph
        # Subjects whose update is in progress; guards against re-entrant notifications.
        self.currently_updating = set()

    @staticmethod
    def _describe(subject):
        """Return ``(element_type, oid)`` for a node, edge or subgraph subject."""
        if isinstance(subject, Node):
            return 'node', subject.oid
        if isinstance(subject, Edge):
            return 'edge', subject.oid
        # Subgraph exposes ``subgraph_id`` rather than ``oid``.
        return 'subgraph', getattr(subject, 'subgraph_id', getattr(subject, 'oid', None))

    def update(self, subject):
        """
        React to ``subject.notify()`` by refreshing its custom-metric time series.

        :param subject: The Node, Edge or Subgraph that notified.
        """
        element_type, oid = self._describe(subject)
        print(f"GraphObserver: Update called for {type(subject).__name__} with ID {oid}")
        if subject in self.currently_updating:
            return  # already being processed further up the call stack
        self.currently_updating.add(subject)
        try:
            # One timestamp per notification, so the evaluated value and the stored sample agree.
            now = datetime.now()
            for metric in self.hygraph.custom_metrics:
                if metric['element_type'] != element_type:
                    continue  # Skip if the element type does not match
                if metric.get('label') and subject.label != metric['label']:
                    continue  # Skip if the label does not match
                attribute = metric['attribute']
                aggregate_function = metric['aggregate_function']
                current_value = aggregate_function(self.hygraph, element_type, oid, attribute, now)
                if attribute in subject.properties:
                    print(f"Updating time series for {oid}, attribute {attribute}")
                    tsid = subject.properties[attribute]
                    # HyGraph keeps TimeSeries objects in ``time_series`` keyed by tsid.
                    self.hygraph.time_series[tsid].append_data(now, current_value)
                else:
                    print(f"Creating time series for {oid}, attribute {attribute}")
                    # TSNode has no start_time; fall back to "now" so the date range is valid.
                    start_date = getattr(subject, 'start_time', None)
                    self.hygraph.create_time_series_from_graph(
                        element_type=element_type,
                        oid=oid,
                        attribute=attribute,
                        start_date=now if start_date is None else start_date,
                        end_date=None,
                        aggregate_function=aggregate_function,
                        edge_label=subject.label if isinstance(subject, Edge) else None,
                    )
        finally:
            # Always release the guard, even if a metric raises.
            self.currently_updating.discard(subject)
class HyGraph:
    """Hybrid graph: a temporal property graph plus attached time series.

    Storage layout:

    * ``graph``: a ``networkx.MultiGraph``. Nodes are keyed by ``node.oid`` and
      carry the Node object under the ``'data'`` attribute. Edges are keyed by
      ``edge.oid`` and carry the Edge object under ``'data'``.
    * ``time_series``: maps a time-series id to its ``TimeSeries``.
    * ``subgraphs``: maps a subgraph id to ``{'view': <networkx view>, 'data': <Subgraph>}``.
    * ``custom_metrics``: metric specs added by ``register_custom_metric``; the
      ``GraphObserver`` evaluates them when an element notifies.
    """

    def __init__(self):
        self.graph = nx.MultiGraph()
        self.time_series = {}
        self.subgraphs = {}
        self.id_generator = IDGenerator()
        self.graph_observer = GraphObserver(self)
        self.custom_metrics = []

    # ------------------------------------------------------------------ #
    # Construction
    # ------------------------------------------------------------------ #
    def add_node(self, node):
        """Store ``node`` under its oid, attach the graph observer and notify it."""
        self.graph.add_node(node.oid, data=node)
        node.attach(self.graph_observer)
        node.notify()

    def add_edge(self, edge):
        """Store ``edge`` between ``edge.source`` and ``edge.target`` (node ids),
        keyed by ``edge.oid``; then attach the graph observer and notify it."""
        self.graph.add_edge(edge.source, edge.target, key=edge.oid, data=edge)
        edge.attach(self.graph_observer)
        edge.notify()

    def add_subgraph(self, subgraph):
        """
        Register ``subgraph`` as a filtered networkx view of the graph.

        ``subgraph.filter_func`` is used as both node and edge filter.
        NOTE(review): networkx calls node filters as ``f(n)`` and MultiGraph edge
        filters as ``f(u, v, k)``, so the function must accept both call shapes
        (e.g. via ``*args``).
        """
        def _keep_all(*_):
            return True

        # networkx would call a None filter and fail; no filter means keep everything.
        filter_func = subgraph.filter_func if subgraph.filter_func is not None else _keep_all
        subgraph_view = nx.subgraph_view(self.graph, filter_node=filter_func, filter_edge=filter_func)
        self.subgraphs[subgraph.subgraph_id] = {'view': subgraph_view, 'data': subgraph}
        subgraph.attach(self.graph_observer)

    # ------------------------------------------------------------------ #
    # Lookup
    # ------------------------------------------------------------------ #
    def _find_edge(self, oid):
        """Return the ``(source, target, key)`` triple of the edge keyed ``oid``.

        :raises ValueError: If no such edge exists.
        """
        for source, target, key in self.graph.edges(keys=True):
            if key == oid:
                return source, target, key
        raise ValueError(f"Edge with ID {oid} does not exist.")

    def get_element(self, element_type, oid):
        """
        Return the Node, Edge or Subgraph object with the given id.

        :param element_type: 'node', 'edge' or 'subgraph'.
        :param oid: Element id (edge key for edges).
        :raises ValueError: If the element does not exist or the type is unknown.
        """
        if element_type == 'node':
            if oid not in self.graph.nodes:
                raise ValueError(f"Node with ID {oid} does not exist.")
            return self.graph.nodes[oid]['data']
        if element_type == 'edge':
            return self.graph.edges[self._find_edge(oid)]['data']
        if element_type == 'subgraph':
            if oid not in self.subgraphs:
                raise ValueError(f"Subgraph with ID {oid} does not exist.")
            return self.subgraphs[oid]['data']
        raise ValueError(f"Unknown element type: {element_type!r}")

    def add_property(self, element_type, oid, property_key, value):
        """Set ``properties[property_key] = value`` on the given element."""
        element = self.get_element(element_type, oid)
        element.properties[property_key] = value
        print(f"addProperty {element.properties}")

    def add_membership(self, element_type, oid, tsid):
        """
        Add a membership time series to a node or edge.

        :param element_type: Type of the element ('node' or 'edge').
        :param oid: ID of the node or edge.
        :param tsid: Time series ID of the membership.
        :raises ValueError: If the element does not exist or the type is unsupported.
        """
        if element_type == 'node':
            if oid not in self.graph.nodes:
                raise ValueError(f"Node with ID {oid} does not exist.")
            self.graph.nodes[oid]['memberships'] = tsid
        elif element_type == 'edge':
            self.graph.edges[self._find_edge(oid)]['memberships'] = tsid
        else:
            raise ValueError(f"Unsupported element type for membership: {element_type!r}")

    def get_node(self, oid):
        """Return the networkx attribute dict of node ``oid``."""
        if oid not in self.graph.nodes:
            raise ValueError(f"Node with ID {oid} does not exist.")
        return self.graph.nodes[oid]

    def get_edge(self, oid):
        """Return the networkx attribute dict of the edge keyed ``oid``.

        A MultiGraph's ``edges`` view is indexed by ``(u, v, key)`` tuples, so the
        edge is located by its key instead of testing ``oid in self.graph.edges``.
        """
        return self.graph.edges[self._find_edge(oid)]

    def get_time_series(self, tsid):
        """Return the ``xarray.DataArray`` of time series ``tsid``."""
        if tsid not in self.time_series:
            raise ValueError(f"Time series with ID {tsid} does not exist.")
        return self.time_series[tsid].data

    def append_time_series(self, tsid, date, value):
        """Append one observation to time series ``tsid``.

        :raises ValueError: If the time series does not exist.
        """
        if tsid not in self.time_series:
            raise ValueError(f"Time series with ID {tsid} does not exist.")
        self.time_series[tsid].append_data(date, value)

    def get_subgraph(self, subgraph_id):
        """Return the ``{'view': ..., 'data': ...}`` entry of a subgraph."""
        if subgraph_id not in self.subgraphs:
            raise ValueError(f"Subgraph with ID {subgraph_id} does not exist.")
        return self.subgraphs[subgraph_id]

    # ------------------------------------------------------------------ #
    # Derived structure
    # ------------------------------------------------------------------ #
    def create_similarity_edges(self, similarity_threshold):
        """
        Connect every pair of TSNodes whose DTW distance is within a threshold.

        For each such pair a ``'similarTo'`` edge is added between the two node
        ids, and the distance is stored as a daily time series referenced by the
        edge property ``'degree_similarity_over_time'``.

        :param similarity_threshold: Maximum fastdtw distance for an edge.
        """
        from itertools import combinations

        ts_nodes = [
            (node_id, attrs['data'])
            for node_id, attrs in self.graph.nodes(data=True)
            if isinstance(attrs.get('data'), TSNode)
        ]
        for (id1, node1), (id2, node2) in combinations(ts_nodes, 2):
            ts1 = node1.series.data.values
            ts2 = node2.series.data.values
            distance, _ = fastdtw(ts1, ts2, dist=euclidean)
            if distance > similarity_threshold:
                continue
            start_time = datetime.now()
            # Fresh id per edge (a shared id would give every similarity edge the same
            # key), and endpoints are node ids so no new object-keyed nodes appear.
            edge = Edge(
                oid=self.id_generator.generate_edge_id(),
                source=id1,
                target=id2,
                label='similarTo',
                start_time=start_time,
            )
            self.add_edge(edge)
            # Calculate and add the degree of similarity over time as a property
            similarity_over_time = [distance] * len(ts1)
            timestamps = pd.date_range(start=start_time, periods=len(similarity_over_time), freq='D')
            tsid = self.id_generator.generate_timeseries_id()
            # Column vector: one row per timestamp, one 'similarity' column.
            data = np.array(similarity_over_time)[:, np.newaxis]
            metadata = TimeSeriesMetadata(edge.oid, 'similarTo', 'edge', 'degree_similarity_over_time')
            self.time_series[tsid] = TimeSeries(tsid, timestamps, ['similarity'], data, metadata)
            self.add_property('edge', edge.oid, 'degree_similarity_over_time', tsid)

    def create_time_series_from_graph(self, element_type, oid, attribute, start_date, aggregate_function, end_date=None, freq='D', edge_label=None):
        """
        Generic function to create a time series from the graph.

        ``aggregate_function(self, element_type, oid, attribute, date)`` is sampled
        on ``pd.date_range(start_date, end_date, freq)``. A sample is kept only
        when it differs from the previous kept value. The series is stored in
        ``time_series`` and its id is written to the element's ``attribute`` property.

        :param element_type: Type of the element ('node', 'edge', or 'subgraph').
        :param oid: ID of the element.
        :param attribute: Attribute to create the time series for.
        :param start_date: Start date for the time series.
        :param aggregate_function: User-defined function to aggregate data.
        :param end_date: End date for the time series (defaults to now).
        :param freq: Frequency of the time series (default is daily).
        :param edge_label: Edge label recorded in the metadata ('' when None).
        :returns: The new time-series id, or None if no sample was produced.
        :raises ValueError: If ``aggregate_function`` is None.
        """
        if aggregate_function is None:
            raise ValueError("An aggregate_function must be provided.")
        if end_date is None:
            end_date = datetime.now()
        date_range = pd.date_range(start=start_date, end=end_date, freq=freq)
        values = []
        last_value = None
        for date in date_range:
            current_value = aggregate_function(self, element_type, oid, attribute, date)
            if current_value != last_value:
                values.append((date, current_value))
                last_value = current_value
        if not values:
            return None
        timestamps, data_values = zip(*values)
        reshaped_data_values = np.array(data_values)[:, np.newaxis]
        tsid = self.id_generator.generate_timeseries_id()
        metadata = TimeSeriesMetadata(oid, edge_label if edge_label else '', element_type, attribute)
        self.time_series[tsid] = TimeSeries(tsid, timestamps, [attribute], reshaped_data_values, metadata)
        self.add_property(element_type, oid, attribute, tsid)
        return tsid

    def graph_metrics_evolution(self):
        """
        Detect communities with igraph's Infomap and record them as memberships.

        Each node gets ``membership[timestamp] = community index``. Each edge gets
        the shared index, or ``"src,dst"`` when its endpoints are in different
        communities.
        """
        node_ids = list(self.graph.nodes())
        if not node_ids:
            return
        index_of = {node_id: i for i, node_id in enumerate(node_ids)}
        # Build from explicit vertex indices so vertex i is exactly node_ids[i] and
        # isolated nodes are included. Graph.TupleList orders vertices by first
        # appearance in the edge list, so relabelling it with node order mislabels them.
        igraph_g = IGraph(
            n=len(node_ids),
            edges=[(index_of[u], index_of[v]) for u, v in self.graph.edges()],
            directed=False,
        )
        igraph_g.vs["name"] = node_ids
        communities = igraph_g.community_infomap()
        timestamp = datetime.now()
        community_of = {}
        for idx, community in enumerate(communities):
            for vertex in community:
                node_id = node_ids[vertex]
                community_of[node_id] = idx
                node = self.graph.nodes[node_id].get('data')
                if node is not None:
                    node.membership[timestamp] = idx
        for source, target, attrs in self.graph.edges(data=True):
            edge = attrs.get('data')
            if edge is None:
                continue
            source_community = community_of.get(source)
            target_community = community_of.get(target)
            if source_community == target_community:
                edge.membership[timestamp] = source_community
            else:
                edge.membership[timestamp] = f"{source_community},{target_community}"

    def register_custom_metric(self, element_type, attribute, aggregate_function, label=None):
        """
        Register a metric that GraphObserver evaluates on notifications.

        :param element_type: 'node', 'edge' or 'subgraph'.
        :param attribute: Property name under which the series id is stored.
        :param aggregate_function: Called as ``f(hygraph, element_type, oid, attribute, date)``.
        :param label: Optional label filter.
        """
        self.custom_metrics.append({
            'element_type': element_type,
            'attribute': attribute,
            'aggregate_function': aggregate_function,
            'label': label,
        })

    def display(self):
        """Print nodes, edges, subgraphs and time series to stdout."""
        print("Nodes:")
        for node_id, data in self.graph.nodes(data=True):
            print(f"Node {node_id}: {data}")
        print("\nEdges:")
        for source, target, key, data in self.graph.edges(keys=True, data=True):
            print(f"Edge {key} from {source} to {target}: {data}")
        print("\nSubgraphs:")
        for subgraph_id, data in self.subgraphs.items():
            print(f"Subgraph {subgraph_id}: {data}")
        print("\nTime Series:")
        for tsid, ts in self.time_series.items():
            # TimeSeries.metadata defaults to a plain dict, which has no owner_id attribute.
            metadata = ts.metadata
            if isinstance(metadata, dict):
                owner_id = metadata.get('owner_id')
            else:
                owner_id = getattr(metadata, 'owner_id', None)
            print(f"Time Series {tsid}: {owner_id}")
            variables = [str(var) for var in ts.data.coords['variable'].values]
            print(f"Variables: {', '.join(variables)}")
            ts_df = ts.data.to_dataframe('value').reset_index()
            for time, group in ts_df.groupby('time'):
                values = [f"{row['variable']}: {row['value']}" for _, row in group.iterrows()]
                row_str = ", ".join(values)
                print(f"{time}, {row_str}")