diff --git a/ping.py b/ping.py index a3ea1b7..dffe7da 100755 --- a/ping.py +++ b/ping.py @@ -103,7 +103,7 @@ def _checksum(source_string): def single_ping(destIP, hostname, timeout, mySeqNumber, numDataBytes, - myStats=None, quiet=False, ipv6=False): + myStats=None, quiet=False, ipv6=False): """ Returns either the delay (in ms) or None on timeout. """ @@ -316,6 +316,23 @@ def verbose_ping(hostname, timeout=3000, count=3, """ Send >count< ping to >destIP< with the given >timeout< and display the result. + + To continuously attempt ping requests, set >count< to None. + + To consume the generator, use the following syntax: + >>> import ping + >>> for return_val in ping.verbose_ping('google.ca'): + pass # COLLECT YIELDS AND PERFORM LOGIC. + + Alternatively, you can consume the generator by using list comprehension: + >>> import ping + >>> consume = list(ping.verbose_ping('google.ca')) + + Via the same syntax, you can successfully get the exit code via: + >>> import ping + >>> consume = list(ping.verbose_ping('google.ca')) + >>> exit_code = consume[:-1] # The last yield is the exit code. + >>> sys.exit(exit_code) """ signal.signal(signal.SIGINT, _signal_handler) # Handle Ctrl-C if hasattr(signal, "SIGBREAK"): @@ -341,9 +358,10 @@ def verbose_ping(hostname, timeout=3000, count=3, myStats.thisIP = destIP - for i in range(count): + i = 0 + while 1: delay = single_ping(destIP, hostname, timeout, mySeqNumber, - numDataBytes, ipv6=ipv6, myStats=myStats) + numDataBytes, ipv6=ipv6, myStats=myStats) if delay is None: delay = 0 @@ -353,15 +371,23 @@ def verbose_ping(hostname, timeout=3000, count=3, if (MAX_SLEEP > delay): time.sleep((MAX_SLEEP - delay)/1000) + if count is not None and i < count: + i += 1 + yield myStats.pktsRcvd + elif count is None: + yield myStats.pktsRcvd + elif count is not None and i >= count: + break + _dump_stats(myStats) # 0 if we receive at least one packet # 1 if we don't receive any packets - return not myStats.pktsRcvd + yield not myStats.pktsRcvd def quiet_ping(hostname, timeout=3000, count=3, numDataBytes=64, path_finder=False, ipv6=False): - """ Same as verbose_ping, but the results are returned as tuple """ + """ Same as verbose_ping, but the results are yielded as a tuple """ myStats = MyStats() # Reset the stats mySeqNumber = 0 # Starting value @@ -372,7 +398,7 @@ def quiet_ping(hostname, timeout=3000, count=3, else: destIP = socket.gethostbyname(hostname) except socket.gaierror: - return False + yield False myStats.thisIP = destIP @@ -382,12 +408,14 @@ def quiet_ping(hostname, timeout=3000, count=3, if path_finder: fakeStats = MyStats() single_ping(fakeStats, destIP, hostname, timeout, - mySeqNumber, numDataBytes, quiet=True, ipv6=ipv6) + mySeqNumber, numDataBytes, quiet=True, ipv6=ipv6) time.sleep(0.5) - for i in range(count): - delay = single_ping(destIP, hostname, timeout, mySeqNumber, numDataBytes, - quiet=True, ipv6=ipv6, myStats=myStats) + i = 0 + while 1: + delay = single_ping(destIP, hostname, timeout, mySeqNumber, + numDataBytes, quiet=True, ipv6=ipv6, + myStats=myStats) if delay is None: delay = 0 @@ -398,6 +426,15 @@ def quiet_ping(hostname, timeout=3000, count=3, if (MAX_SLEEP > delay): time.sleep((MAX_SLEEP - delay) / 1000) + yield myStats.pktsSent + + if count is not None and i < count: + i += 1 + elif count is not None: + yield myStats.pktsSent + elif count is not None and i >= count: + break + if myStats.pktsSent > 0: myStats.fracLoss = (myStats.pktsSent - myStats.pktsRcvd) / \ myStats.pktsSent @@ -406,7 +443,7 @@ def quiet_ping(hostname, timeout=3000, count=3, myStats.avrgTime = myStats.totTime / myStats.pktsRcvd # return tuple(max_rtt, min_rtt, avrg_rtt, percent_lost) - return myStats.maxTime, myStats.minTime, myStats.avrgTime, myStats.fracLoss + yield myStats.maxTime, myStats.minTime, myStats.avrgTime, myStats.fracLoss if __name__ == '__main__': @@ -414,24 +451,34 @@ def quiet_ping(hostname, timeout=3000, count=3, if sys.argv.count('-T') or sys.argv.count('--test_case'): print('Running PYTHON PING test case.') # These should work: - verbose_ping("127.0.0.1") - verbose_ping("8.8.8.8") - verbose_ping("heise.de") - verbose_ping("google.com") + for val in verbose_ping("127.0.0.1"): + pass + for val in verbose_ping("8.8.8.8"): + pass + for val in verbose_ping("heise.de"): + pass + for val in verbose_ping("google.com"): + pass # Inconsistent on Windows w/ ActivePython (Python 3.2 resolves # correctly to the local host, but 2.7 tries to resolve to the local # *gateway*) - verbose_ping("localhost") + for val in verbose_ping("localhost"): + pass # Should fail with 'getaddrinfo failed': - verbose_ping("foobar_url.foobar") + for val in verbose_ping("foobar_url.fooobar"): + pass # Should fail (timeout), but it depends on the local network: - verbose_ping("192.168.255.254") + for val in verbose_ping("192.168.255.254"): + pass # Should fails with 'The requested address is not valid in its context' - verbose_ping("0.0.0.0") + for val in verbose_ping("0.0.0.0"): + pass + + exit() parser = argparse.ArgumentParser(prog='python-ping', description='A pure python implementation\ @@ -444,7 +491,9 @@ def quiet_ping(hostname, timeout=3000, count=3, parser.add_argument('-c', '--request_count', help='The number of attempts \ to make. See --infinite to attempt requests until \ stopped.', type=int, default=3) - # TODO Implement the ability to continuously attempt pings. + + parser.add_argument('-i', '--infinite', help='Flag to continuously ping \ + a host until stopped.', action='store_true') parser.add_argument('-I', '--ipv6', action='store_true', help='Flag to \ use IPv6.') @@ -457,5 +506,12 @@ def quiet_ping(hostname, timeout=3000, count=3, parsed = parser.parse_args() - sys.exit(verbose_ping(parsed.address, parsed.timeout, parsed.request_count, - parsed.packet_size, ipv6=parsed.ipv6)) + if parsed.infinite: + sys.exit(list(verbose_ping(parsed.address, parsed.timeout, + None, parsed.packet_size, + ipv6=parsed.ipv6))[:-1]) + + else: + sys.exit(list(verbose_ping(parsed.address, parsed.timeout, + parsed.request_count, parsed.packet_size, + ipv6=parsed.ipv6))[:-1])