Skip to content

Commit

Permalink
Closes #1527 - Add is_ipv4() and is_ipv6() for ipaddresses (#1538)
Browse files Browse the repository at this point in the history
* Adds is_ipv4 is_ipv6. Adds testing.

* Updating following code review comments.
  • Loading branch information
Ethan-DeBandi99 authored Jun 29, 2022
1 parent 059c85b commit 9d4489a
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
76 changes: 76 additions & 0 deletions arkouda/client_dtypes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from functools import partial
from ipaddress import ip_address as _ip_address
from typing import Optional, Union

import numpy as np # type: ignore
from typeguard import typechecked

from arkouda.dtypes import bitType, intTypes, isSupportedInt
from arkouda.groupbyclass import GroupBy, broadcast
Expand Down Expand Up @@ -547,3 +549,77 @@ def opeq(self, other, op):
else:
return NotImplemented
self.values.opeq(otherdata, op)


@typechecked
def is_ipv4(ip: Union[pdarray, IPv4], ip2: Optional[pdarray] = None) -> pdarray:
"""
Indicate which values are ipv4 when passed data containing IPv4 and IPv6 values.
Parameters
----------
ip: pdarray (int64) or ak.IPv4
IPv4 value. High Bits of IPv6 if IPv6 is passed in.
ip2: pdarray (int64), Optional
Low Bits of IPv6. This is added for support when dealing with data that contains IPv6 as well.
Returns
-------
pdarray of bools indicating which indexes are IPv4.
See Also
--------
ak.is_ipv6
"""
# grab the ipv4 pdarray of values
if isinstance(ip, IPv4):
ip = ip.values

if ip.dtype not in intTypes or (ip2 is not None and ip2.dtype not in intTypes):
raise TypeError("ip and ip2 must be int64 pdarrays. ip2 is Optional.")

if ip2 is not None and ip.size != ip2.size:
raise RuntimeError("When supplying a value for ip2, ip and ip2 must be the same size.")

if ip2 is not None:
ans = ip < 2 ** 32
ans2 = ip2 == 0
return ans & ans2
else:
return ip < 2 ** 32


@typechecked
def is_ipv6(ip: Union[pdarray, IPv4], ip2: Optional[pdarray] = None) -> pdarray:
"""
Indicate which values are ipv6 when passed data containing IPv4 and IPv6 values.
Parameters
----------
ip: pdarray (int64) or ak.IPv4
High Bits of IPv6.
ip2: pdarray (int64), Optional
Low Bits of IPv6
Returns
-------
pdarray of bools indicating which indexes are IPv6.
See Also
--------
ak.is_ipv4
"""
# grab the ipv4 pdarray of values
if isinstance(ip, IPv4):
ip = ip.values
if ip.dtype not in intTypes or (ip2 is not None and ip2.dtype not in intTypes):
raise TypeError("ip and ip2 must be int64 pdarrays. ip2 is Optional.")
if ip2 is not None and ip.size != ip2.size:
raise RuntimeError("When supplying a value for ip2, ip and ip2 must be the same size.")

if ip2 is not None:
ans = ip >= 2 ** 32
ans2 = ip2 != 0
return ans | ans2
else:
return ip >= 2 ** 32
40 changes: 40 additions & 0 deletions tests/client_dtypes_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import random

from base_test import ArkoudaTest
from context import arkouda as ak

Expand Down Expand Up @@ -119,3 +121,41 @@ def test_ipv4_normalization(self):
ipv4 = ak.IPv4(ip_list)
ip_as_int = ipv4.normalize("192.168.1.1")
self.assertEqual(3232235777, ip_as_int)

def test_is_ipv4(self):
x = [random.getrandbits(32) for i in range(100)]

ans = ak.is_ipv4(ak.array(x, dtype=ak.uint64))
self.assertListEqual(ans.to_ndarray().tolist(), [True]*100)

ipv4 = ak.IPv4(ak.array(x))
ans = ak.is_ipv4(ipv4)
self.assertListEqual(ans.to_ndarray().tolist(), [True] * 100)

x = [random.getrandbits(64) if i < 5 else random.getrandbits(32) for i in range(10)]
ans = ak.is_ipv4(ak.array(x, ak.uint64))
self.assertListEqual(ans.to_ndarray().tolist(), [i >= 5 for i in range(10)])

with self.assertRaises(TypeError):
ak.is_ipv4(ak.array(x))

with self.assertRaises(RuntimeError):
ak.is_ipv4(ak.array(x, dtype=ak.uint64), ak.arange(2, dtype=ak.uint64))

def test_is_ipv6(self):
x = [random.getrandbits(128) for i in range(100)]
low = ak.array([i & (2 ** 64 - 1) for i in x], dtype=ak.uint64)
high = ak.array([i >> 64 for i in x], dtype=ak.uint64)

ans = ak.is_ipv6(high, low)
self.assertListEqual(ans.to_ndarray().tolist(), [True] * 100)

x = [random.getrandbits(64) if i < 5 else random.getrandbits(32) for i in range(10)]
ans = ak.is_ipv6(ak.cast(ak.array(x), ak.int64))
self.assertListEqual(ans.to_ndarray().tolist(), [i < 5 for i in range(10)])

with self.assertRaises(TypeError):
ak.is_ipv6(ak.array(x))

with self.assertRaises(RuntimeError):
ak.is_ipv6(ak.cast(ak.array(x), ak.int64), ak.cast(ak.arange(2), ak.int64))

0 comments on commit 9d4489a

Please sign in to comment.