-
Notifications
You must be signed in to change notification settings - Fork 613
/
discovery.py
144 lines (127 loc) · 5.32 KB
/
discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# python standard library
import socket
# local pret classes
from helper import output, conv
# third party modules
try:
from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.proto import api
from pyasn1.codec.ber import encoder, decoder
snmp_modules_found = True
except ImportError:
snmp_modules_found = False
########################################################
### Most of this code comes from the PySNMP examples ###
### and needs to be refactored into an 'snmp' class! ###
########################################################
# Module-level scan state shared by the timer() and recv() callbacks below.
start = conv().now()  # reference time for the timeout, taken when this module is loaded
timeout = 0.5  # be quick and dirty; timer() compares this against `date - start`
maxhost = 999  # max printers to list
results = {}  # dict of found printers, keyed by responder IP address
try:
    # use snmp v1 because it is most widely supported among printers
    pmod = api.protoModules[api.protoVersion1]
    pdu_send = pmod.GetRequestPDU()  # build protocol data unit (pdu)
except Exception:
    # 'api' is unbound when the optional pysnmp import above failed. That case
    # is reported to the user by discovery() through snmp_modules_found, so
    # importing this module must stay silent. Catching Exception rather than
    # using a bare 'except' keeps KeyboardInterrupt and SystemExit working.
    pass
# cause timeout interrupt
class stop_waiting(Exception):
    """Signal raised by the timer callback to leave the dispatcher loop."""
# check for timeout
def timer(date):
    """Abort the scan once the allowed time has passed.

    Registered as the dispatcher's timer callback.

    Args:
        date: value handed in by the dispatcher; presumably the current
            timestamp in the same unit as conv().now() -- TODO confirm.

    Raises:
        stop_waiting: when `date` lies more than `timeout` after `start`.
    """
    elapsed = date - start
    if elapsed > timeout:
        raise stop_waiting()
# noinspection PyUnusedLocal,PyUnusedLocal
def recv(dispatcher, domain, address, msg):
    """Dispatcher receive callback: record printers that answer the broadcast.

    Decodes every SNMP message contained in `msg`, keeps only replies whose
    request id matches the request we sent, and stores one entry per
    responder in the module-level `results` dict as
    ``results[ip] = [device, uptime, status, prstat]``.

    Args:
        dispatcher: transport dispatcher; its job counter is decremented for
            each accepted reply.
        domain: transport domain (unused).
        address: sender address; only ``address[0]`` (the IP) is used.
        msg: raw received data, consumed message by message.

    Note:
        A reply whose hrDeviceType is not the printer type ends the callback
        immediately: the responder is not recorded, the job counter is left
        untouched and any remaining data in `msg` is not processed.
    """
    while msg:
        reply, msg = decoder.decode(msg, asn1Spec=pmod.Message())
        reply_pdu = pmod.apiMessage.getPDU(reply)
        # we are broadcasting, so only accept answers to our own request
        if pmod.apiPDU.getRequestID(pdu_send) != pmod.apiPDU.getRequestID(reply_pdu):
            continue
        sender = address[0]
        # defaults shown for properties the device did not report
        device, uptime, status, prstat = '?', '?', '?', 0
        # collect the device properties; the OIDs are mutually exclusive
        for name, value in pmod.apiPDU.getVarBinds(reply_pdu):
            name, value = name.prettyPrint(), value.prettyPrint()
            if name == '1.3.6.1.2.1.25.3.2.1.2.1':
                # hrDeviceType: ignore anything that is not a printer
                if value != '1.3.6.1.2.1.25.3.1.5':
                    return
            elif name == '1.3.6.1.2.1.25.3.2.1.3.1':
                # hrDeviceDescr
                device = value
            elif name == '1.3.6.1.2.1.1.3.0':
                # sysUpTime, formatted by the conv helper
                uptime = conv().elapsed(value, 100, True)
            elif name == '1.3.6.1.2.1.43.16.5.1.2.1.1':
                # printer status
                status = value
            elif name == '1.3.6.1.2.1.25.3.2.1.5.1' and value:
                # hrDeviceStatus: keep only the first character
                prstat = value[:1]
        dispatcher.jobFinished(1)
        results[sender] = [device, uptime, status, prstat]
class discovery:
    """Discover local network printers with an SNMPv1 broadcast.

    Instantiating the class performs the whole scan and prints the outcome
    (result table, info message or error message). Nothing is stored on the
    instance; found printers are kept in the module-level `results` dict.
    """

    # discover local network printers
    def __init__(self, usage=False):
        """Broadcast an SNMP GetRequest and list the printers that reply.

        Args:
            usage: when true, an introductory "No target given" line and
                blank-line padding are printed. The original comment
                indicates this is left false when 'discover' is run in
                interactive mode.

        All exceptions raised while scanning or printing are reported via
        ``output().errmsg("SNMP Error", ...)`` instead of propagating.
        """
        # abort if pysnmp is not installed
        if not snmp_modules_found:
            output().warning("Please install the 'pysnmp' module for SNMP support.")
            if usage:
                print("")
            return
        # skip when running 'discover' in interactive mode
        if usage:
            print("No target given, discovering local printers")
        oid = (
            # HOST-RESOURCES-MIB → hrDeviceType
            ('1.3.6.1.2.1.25.3.2.1.2.1', None),
            # HOST-RESOURCES-MIB → hrDeviceDescr
            ('1.3.6.1.2.1.25.3.2.1.3.1', None),
            # HOST-RESOURCES-MIB → hrDeviceStatus
            ('1.3.6.1.2.1.25.3.2.1.5.1', None),
            # Printer-MIB → Printer status
            ('1.3.6.1.2.1.43.16.5.1.2.1.1', None),
            # SNMPv2-MIBv → sysUpTime
            ('1.3.6.1.2.1.1.3.0', None),
        )
        try:
            self._scan(oid)
            self._report(usage)
        except Exception as e:
            output().errmsg("SNMP Error", e)
            if usage:
                print("")

    def _scan(self, oid):
        """Send the broadcast request and collect replies until timeout."""
        global start
        # build protocol data unit (pdu)
        pmod.apiPDU.setDefaults(pdu_send)
        pmod.apiPDU.setVarBinds(pdu_send, oid)
        # build message
        msg_send = pmod.Message()
        pmod.apiMessage.setDefaults(msg_send)
        pmod.apiMessage.setCommunity(msg_send, 'public')
        pmod.apiMessage.setPDU(msg_send, pdu_send)
        # forget printers found by an earlier scan in this process; cleared in
        # place so other references to the module-level dict stay valid
        results.clear()
        dispatcher = AsyncoreDispatcher()
        try:
            dispatcher.registerRecvCbFun(recv)
            dispatcher.registerTimerCbFun(timer)
            # use ipv4 udp broadcast
            transport = udp.UdpSocketTransport().openClientMode().enableBroadcast()
            dispatcher.registerTransport(udp.domainName, transport)
            # pass message to dispatcher
            target = ('255.255.255.255', 161)
            dispatcher.sendMessage(encoder.encode(msg_send), udp.domainName, target)
            # wait for timeout or max hosts
            dispatcher.jobStarted(1, maxhost)
            # the module-level 'start' was taken at import time; measure the
            # timeout from the beginning of this scan instead
            start = conv().now()
            # dispatcher will finish as all jobs counter reaches zero
            try:
                dispatcher.runDispatcher()
            except stop_waiting:
                # raised by timer(): the scan window is over, not an error
                pass
        finally:
            # always release the transport, also on normal completion or error
            dispatcher.closeDispatcher()

    def _report(self, usage):
        """Print the table of found printers, or a note that none replied."""
        # list found network printers
        if results:
            print("")
            output().discover(('address', ('device', 'uptime', 'status', None)))
            output().hline(79)
            # sort numerically by IPv4 address rather than as text
            for printer in sorted(results.items(), key=lambda item: socket.inet_aton(item[0])):
                output().discover(printer)
        else:
            output().info("No printers found via SNMP broadcast")
        if usage or results:
            print("")