Skip to content

Commit

Permalink
Merge pull request #18 from l4m3rx/infinite-pings
Browse files Browse the repository at this point in the history
Updated quiet and verbose_ping to generator objects for efficiency
  • Loading branch information
l4m3rx authored Nov 3, 2016
2 parents 48b9ea3 + db9553f commit a0f488c
Showing 1 changed file with 78 additions and 22 deletions.
100 changes: 78 additions & 22 deletions ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _checksum(source_string):


def single_ping(destIP, hostname, timeout, mySeqNumber, numDataBytes,
myStats=None, quiet=False, ipv6=False):
myStats=None, quiet=False, ipv6=False):
"""
Returns either the delay (in ms) or None on timeout.
"""
Expand Down Expand Up @@ -319,6 +319,23 @@ def verbose_ping(hostname, timeout=3000, count=3,
"""
Send >count< ping to >destIP< with the given >timeout< and display
the result.
To continuously attempt ping requests, set >count< to None.
To consume the generator, use the following syntax:
>>> import ping
>>> for return_val in ping.verbose_ping('google.ca'):
pass # COLLECT YIELDS AND PERFORM LOGIC.
Alternatively, you can consume the generator by using list comprehension:
>>> import ping
>>> consume = list(ping.verbose_ping('google.ca'))
Via the same syntax, you can successfully get the exit code via:
>>> import ping
>>> consume = list(ping.verbose_ping('google.ca'))
>>> exit_code = consume[:-1] # The last yield is the exit code.
>>> sys.exit(exit_code)
"""
signal.signal(signal.SIGINT, _signal_handler) # Handle Ctrl-C
if hasattr(signal, "SIGBREAK"):
Expand All @@ -344,9 +361,10 @@ def verbose_ping(hostname, timeout=3000, count=3,

myStats.thisIP = destIP

for i in range(count):
i = 0
while 1:
delay = single_ping(destIP, hostname, timeout, mySeqNumber,
numDataBytes, ipv6=ipv6, myStats=myStats)
numDataBytes, ipv6=ipv6, myStats=myStats)
if delay is None:
delay = 0

Expand All @@ -356,15 +374,23 @@ def verbose_ping(hostname, timeout=3000, count=3,
if (MAX_SLEEP > delay):
time.sleep((MAX_SLEEP - delay)/1000)

if count is not None and i < count:
i += 1
yield myStats.pktsRcvd
elif count is None:
yield myStats.pktsRcvd
elif count is not None and i >= count:
break

_dump_stats(myStats)
# 0 if we receive at least one packet
# 1 if we don't receive any packets
return not myStats.pktsRcvd
yield not myStats.pktsRcvd


def quiet_ping(hostname, timeout=3000, count=3,
numDataBytes=64, path_finder=False, ipv6=False):
""" Same as verbose_ping, but the results are returned as tuple """
""" Same as verbose_ping, but the results are yielded as a tuple """
myStats = MyStats() # Reset the stats
mySeqNumber = 0 # Starting value

Expand All @@ -375,7 +401,7 @@ def quiet_ping(hostname, timeout=3000, count=3,
else:
destIP = socket.gethostbyname(hostname)
except socket.gaierror:
return False
yield False

myStats.thisIP = destIP

Expand All @@ -385,12 +411,14 @@ def quiet_ping(hostname, timeout=3000, count=3,
if path_finder:
fakeStats = MyStats()
single_ping(fakeStats, destIP, hostname, timeout,
mySeqNumber, numDataBytes, quiet=True, ipv6=ipv6)
mySeqNumber, numDataBytes, quiet=True, ipv6=ipv6)
time.sleep(0.5)

for i in range(count):
delay = single_ping(destIP, hostname, timeout, mySeqNumber, numDataBytes,
quiet=True, ipv6=ipv6, myStats=myStats)
i = 0
while 1:
delay = single_ping(destIP, hostname, timeout, mySeqNumber,
numDataBytes, quiet=True, ipv6=ipv6,
myStats=myStats)

if delay is None:
delay = 0
Expand All @@ -401,6 +429,15 @@ def quiet_ping(hostname, timeout=3000, count=3,
if (MAX_SLEEP > delay):
time.sleep((MAX_SLEEP - delay) / 1000)

yield myStats.pktsSent

if count is not None and i < count:
i += 1
elif count is not None:
yield myStats.pktsSent
elif count is not None and i >= count:
break

if myStats.pktsSent > 0:
myStats.fracLoss = (myStats.pktsSent - myStats.pktsRcvd) / \
myStats.pktsSent
Expand All @@ -409,32 +446,42 @@ def quiet_ping(hostname, timeout=3000, count=3,
myStats.avrgTime = myStats.totTime / myStats.pktsRcvd

# return tuple(max_rtt, min_rtt, avrg_rtt, percent_lost)
return myStats.maxTime, myStats.minTime, myStats.avrgTime, myStats.fracLoss
yield myStats.maxTime, myStats.minTime, myStats.avrgTime, myStats.fracLoss


if __name__ == '__main__':
# FIXME: Add a real CLI (mostly fixed)
if sys.argv.count('-T') or sys.argv.count('--test_case'):
print('Running PYTHON PING test case.')
# These should work:
verbose_ping("127.0.0.1")
verbose_ping("8.8.8.8")
verbose_ping("heise.de")
verbose_ping("google.com")
for val in verbose_ping("127.0.0.1"):
pass
for val in verbose_ping("8.8.8.8"):
pass
for val in verbose_ping("heise.de"):
pass
for val in verbose_ping("google.com"):
pass

# Inconsistent on Windows w/ ActivePython (Python 3.2 resolves
# correctly to the local host, but 2.7 tries to resolve to the local
# *gateway*)
verbose_ping("localhost")
for val in verbose_ping("localhost"):
pass

# Should fail with 'getaddrinfo failed':
verbose_ping("foobar_url.foobar")
for val in verbose_ping("foobar_url.fooobar"):
pass

# Should fail (timeout), but it depends on the local network:
verbose_ping("192.168.255.254")
for val in verbose_ping("192.168.255.254"):
pass

# Should fails with 'The requested address is not valid in its context'
verbose_ping("0.0.0.0")
for val in verbose_ping("0.0.0.0"):
pass

exit()

parser = argparse.ArgumentParser(prog='python-ping',
description='A pure python implementation\
Expand All @@ -447,7 +494,9 @@ def quiet_ping(hostname, timeout=3000, count=3,
parser.add_argument('-c', '--request_count', help='The number of attempts \
to make. See --infinite to attempt requests until \
stopped.', type=int, default=3)
# TODO Implement the ability to continuously attempt pings.

parser.add_argument('-i', '--infinite', help='Flag to continuously ping \
a host until stopped.', action='store_true')

parser.add_argument('-I', '--ipv6', action='store_true', help='Flag to \
use IPv6.')
Expand All @@ -460,5 +509,12 @@ def quiet_ping(hostname, timeout=3000, count=3,

parsed = parser.parse_args()

sys.exit(verbose_ping(parsed.address, parsed.timeout, parsed.request_count,
parsed.packet_size, ipv6=parsed.ipv6))
if parsed.infinite:
sys.exit(list(verbose_ping(parsed.address, parsed.timeout,
None, parsed.packet_size,
ipv6=parsed.ipv6))[:-1])

else:
sys.exit(list(verbose_ping(parsed.address, parsed.timeout,
parsed.request_count, parsed.packet_size,
ipv6=parsed.ipv6))[:-1])

0 comments on commit a0f488c

Please sign in to comment.