forked from lbenitez000/trparse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
trparse.py
164 lines (137 loc) · 5.1 KB
/
trparse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# -*- coding: utf-8 -*-
"""
Copyright (C) 2015 Luis Benitez
Parses the output of a traceroute execution into an AST (Abstract Syntax Tree).
"""
import re
# Destination header: "<name> (<address>)". Group 1 is the name, group 2 an
# IPv4 address and group 3 an IPv6 address (only one of groups 2/3 is set).
RE_HEADER = re.compile(r'(\S+)\s+\((?:(\d+\.\d+\.\d+\.\d+)|([0-9a-fA-F:]+))\)')

# One hop: group 1 is the hop index, group 2 the optional "[AS<n>]" number and
# group 3 everything up to the next line that starts with a hop index or with
# the "_EOS_" end-of-stream marker.
RE_HOP = re.compile(r'^\s*(\d+)\s+(?:\[AS(\d+)\]\s+)?([\s\S]+?(?=^\s*\d+\s+|^_EOS_))', re.M)

# A whole token that is a host name, a dotted IPv4 address or an IPv6 address.
# The host-name class used to be written "A-z", a range that also admits
# "[", "\", "]", "^" and "`"; it now lists letters, digits, "_", "." and "-"
# explicitly ("_" stays accepted, as it was under the old range).
RE_PROBE_NAME = re.compile(r'^([a-zA-Z0-9_\.-]+)$|^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$|^([0-9a-fA-F:]+)$')

# An IPv4 (group 1) or IPv6 (group 2) address wrapped in parentheses.
RE_PROBE_IP = re.compile(r'\((?:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([0-9a-fA-F:]+))\)+')

# A whole token that is a round-trip time: digits with an optional fraction.
RE_PROBE_RTT = re.compile(r'^(\d+(?:\.?\d+)?)$')

# A whole token that is an annotation: "!" followed by word characters.
RE_PROBE_ANNOTATION = re.compile(r'^(!\w*)$')

# A whole token that is a single "*", i.e. a probe that timed out.
RE_PROBE_TIMEOUT = re.compile(r'^(\*)$')
class Traceroute(object):
    """
    Root of a parsed traceroute: the destination plus its list of hops.
    """

    def __init__(self, dest_name, dest_ip):
        self.dest_name = dest_name
        self.dest_ip = dest_ip
        self.hops = []  # Hops in the order they were added

    def add_hop(self, hop):
        """Append *hop* to the end of the hop list."""
        self.hops.append(hop)

    def __str__(self):
        """Return a header line followed by the text of every hop."""
        header = "Traceroute for %s (%s)\n\n" % (self.dest_name, self.dest_ip)
        body = "".join(str(hop) for hop in self.hops)
        return header + body
class Hop(object):
    """
    One hop of a traceroute: its index, optional AS number and its probes.
    """

    def __init__(self, idx, asn=None):
        self.idx = idx  # Hop count, starting at 1
        self.asn = asn  # Autonomous System number
        self.probes = []  # Series of Probe instances

    def add_probe(self, probe):
        """Store *probe*, filling a missing address from the previous probe.

        When this hop already has probes and *probe* carries no IP, both
        its ``ip`` and ``name`` are copied from the most recent probe.
        """
        if self.probes and not probe.ip:
            previous = self.probes[-1]
            probe.ip = previous.ip
            probe.name = previous.name
        self.probes.append(probe)

    def __str__(self):
        """Return the hop prefix followed by one probe per line."""
        prefix = "{0:>3d} ".format(self.idx)
        if self.asn:
            prefix += "[AS{0:>5d}] ".format(self.asn)
        # Probes after the first are indented to line up under the first one.
        indent = " " * len(prefix)
        rendered = [str(probe) for probe in self.probes]
        return prefix + indent.join(rendered) + "\n"
class Probe(object):
    """
    One probe of a hop: the responding host, its address and the RTT.

    A probe whose ``rtt`` is None represents a timeout.
    """

    def __init__(self, name=None, ip=None, rtt=None, anno=''):
        self.name = name
        self.ip = ip
        self.rtt = rtt  # RTT in ms
        self.anno = anno  # Annotation, such as !H, !N, !X, etc

    def __str__(self):
        """Return "<name> (<ip>) <rtt> ms <anno>", or "*" for a timeout."""
        # Compare against None rather than truthiness: an RTT of 0.0 is a
        # real measurement and must not be printed as a timeout.
        if self.rtt is not None:
            text = "{0:s} ({1:s}) {2:1.3f} ms {3:s}\n".format(self.name, self.ip, self.rtt, self.anno)
        else:
            text = "*\n"
        return text
def loads(data):
    """Parser entry point. Parses the output of a traceroute execution.

    Args:
        data: The traceroute output as a string, including the header
            that names the destination as "<name> (<address>)".

    Returns:
        A Traceroute holding one Hop per hop found in *data*, each with
        the Probe instances parsed from that hop.

    Raises:
        ParseError: If no destination header is found, if a hop contains
            a token that is not a name, RTT, timeout or annotation, or if
            a host name is not followed by an address and a numeric RTT.
    """
    data += "\n_EOS_"  # Append EOS token. Helps to match last RE_HOP

    # Get headers
    match_dest = RE_HEADER.search(data)
    if match_dest is None:
        raise ParseError("Parse error \nno destination header found")
    dest_name = match_dest.group(1)
    # RE_HEADER captures an IPv4 address in group 2 and an IPv6 address in
    # group 3; only one of them is set, so take whichever matched.
    dest_ip = match_dest.group(2) or match_dest.group(3)

    # The Traceroute is the root of the tree.
    traceroute = Traceroute(dest_name, dest_ip)

    # Get hops
    for match_hop in RE_HOP.findall(data):
        # Initialize a hop
        idx = int(match_hop[0])
        asn = int(match_hop[1]) if match_hop[1] else None
        hop = Hop(idx, asn)

        # Probe tokens: <name> | <(IP)> | <rtt> | '*' | <!anno>.
        # The 'ms' unit carries no information and is dropped.
        probes_data = [tok for tok in match_hop[2].split() if tok.lower() != 'ms']

        i = 0
        while i < len(probes_data):
            # For each hop parse probes
            name = None
            ip = None
            rtt = None
            anno = ''
            token = probes_data[i]

            # RTT check comes first because RE_PROBE_NAME can confuse rtt with an IP as name
            if RE_PROBE_RTT.match(token):
                # Matched rtt, so name and IP have been parsed before
                rtt = float(token)
                i += 1
            elif RE_PROBE_NAME.match(token):
                # Matched a name, so next elements are IP and rtt
                name = token
                try:
                    ip = probes_data[i + 1].strip('()')
                    rtt = float(probes_data[i + 2])
                except (IndexError, ValueError):
                    # Truncated hop or non-numeric RTT: report it as a
                    # parse failure instead of leaking a low-level error.
                    raise _probe_parse_error(i, probes_data, name, ip, rtt, anno)
                i += 3
            elif RE_PROBE_TIMEOUT.match(token):
                # Its a timeout, so maybe name and IP have been parsed before
                # or maybe not. But it's Hop job to deal with it
                i += 1
            else:
                raise _probe_parse_error(i, probes_data, name, ip, rtt, anno)

            # Check for annotation (only if a token is left to look at)
            if i < len(probes_data) and RE_PROBE_ANNOTATION.match(probes_data[i]):
                anno = probes_data[i]
                i += 1

            hop.add_probe(Probe(name, ip, rtt, anno))

        traceroute.add_hop(hop)

    return traceroute


def _probe_parse_error(i, probes_data, name, ip, rtt, anno):
    """Build the ParseError describing the parser state at token *i*."""
    ext = "i: %d\nprobes_data: %s\nname: %s\nip: %s\nrtt: %s\nanno: %s" % (i, probes_data, name, ip, rtt, anno)
    return ParseError("Parse error \n%s" % ext)
def load(data):
    """Read all of *data* via its ``read()`` method and parse it with loads()."""
    text = data.read()
    return loads(text)
class ParseError(Exception):
    """Raised by loads() when traceroute output cannot be parsed."""