From 749b934f4e124156d540ce998eff27a20c27bf96 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Wed, 10 Feb 2016 13:55:35 -0500 Subject: [PATCH] identical to the regular sample, this one registers as a foreign device --- samples/WhoIsIAmForeign.py | 220 +++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100755 samples/WhoIsIAmForeign.py diff --git a/samples/WhoIsIAmForeign.py b/samples/WhoIsIAmForeign.py new file mode 100755 index 00000000..a2e9f889 --- /dev/null +++ b/samples/WhoIsIAmForeign.py @@ -0,0 +1,220 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for Who-Is and I-Am +commands which create the related APDUs, then lines up the coorresponding I-Am +for incoming traffic and prints out the contents. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address, GlobalBroadcast +from bacpypes.app import LocalDeviceObject, BIPForeignApplication + +from bacpypes.apdu import WhoIsRequest, IAmRequest +from bacpypes.basetypes import ServicesSupported +from bacpypes.errors import DecodingError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# WhoIsIAmApplication +# + +class WhoIsIAmApplication(BIPForeignApplication): + + def __init__(self, *args): + if _debug: WhoIsIAmApplication._debug("__init__ %r", args) + BIPForeignApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: WhoIsIAmApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPForeignApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu) + + # forward it along + BIPForeignApplication.confirmation(self, apdu) + + def indication(self, apdu): + if _debug: WhoIsIAmApplication._debug("indication %r", apdu) + + if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): + device_type, device_instance = apdu.iAmDeviceIdentifier + if device_type != 'device': + raise DecodingError("invalid object type") + + if (self._request.deviceInstanceRangeLowLimit is not None) and \ + (device_instance < self._request.deviceInstanceRangeLowLimit): + pass + elif (self._request.deviceInstanceRangeHighLimit is not None) and \ + (device_instance > self._request.deviceInstanceRangeHighLimit): + pass + else: + # print out the contents + sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n') + sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n') + sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n') + sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n') + sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n') + sys.stdout.flush() + + # forward it along + BIPForeignApplication.indication(self, apdu) + +bacpypes_debugging(WhoIsIAmApplication) + +# +# WhoIsIAmConsoleCmd +# + +class WhoIsIAmConsoleCmd(ConsoleCmd): + + def do_whois(self, args): + """whois [ ] [ ]""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) + + try: + # build a request + request = WhoIsRequest() + if (len(args) == 1) or (len(args) == 3): + request.pduDestination = Address(args[0]) + del args[0] + else: + request.pduDestination = GlobalBroadcast() + + if len(args) == 2: + request.deviceInstanceRangeLowLimit = int(args[0]) + request.deviceInstanceRangeHighLimit = int(args[1]) + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception as err: + WhoIsIAmConsoleCmd._exception("exception: %r", err) + + def do_iam(self, args): + """iam""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) + + try: + # build a request + request = IAmRequest() + request.pduDestination = GlobalBroadcast() + + # set the parameters from the device object + request.iAmDeviceIdentifier = this_device.objectIdentifier + request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted + request.segmentationSupported = this_device.segmentationSupported + request.vendorID = this_device.vendorIdentifier + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception as err: + WhoIsIAmConsoleCmd._exception("exception: %r", err) + + def do_rtn(self, args): + """rtn ... """ + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) + + # safe to assume only one adapter + adapter = this_application.nsap.adapters[0] + if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter) + + # provide the address and a list of network numbers + router_address = Address(args[0]) + network_list = [int(arg) for arg in args[1:]] + + # pass along to the service access point + this_application.nsap.add_router_references(adapter, router_address, network_list) + +bacpypes_debugging(WhoIsIAmConsoleCmd) + +# +# main +# + +def main(): + global this_device, this_application, this_console + + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + if _debug: _log.debug(" - this_device: %r", this_device) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = WhoIsIAmApplication( + this_device, args.ini.address, + Address(args.ini.foreignbbmd), + int(args.ini.foreignttl), + ) + if _debug: _log.debug(" - this_application: %r", this_application) + + # get the services supported + services_supported = this_application.get_services_supported() + if _debug: _log.debug(" - services_supported: %r", services_supported) + + # let the device object know + this_device.protocolServicesSupported = services_supported.value + + # make a console + this_console = WhoIsIAmConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + _log.debug("running") + + run() + + _log.debug("finally") + +if __name__ == "__main__": + main()