-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathreduce.py
executable file
·168 lines (146 loc) · 5.3 KB
/
reduce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
import argparse
import time
import numpy as np
import arkouda as ak
# Names of the reduction methods exercised by this script; each one is looked
# up on the array object with getattr() and called with no arguments.
OPS = ("sum", "prod", "min", "max")
# Dtypes accepted for --dtype and iterated over by the correctness check.
TYPES = ("int64", "float64")
def time_ak_reduce(N_per_locale, trials, dtype, random, seed):
    """Time each reduction in OPS on an arkouda array and print the results.

    The total problem size is ``N_per_locale`` multiplied by the
    ``numLocales`` entry of ``ak.get_config()``.  For every op the last
    result, the average time over ``trials`` runs and the corresponding
    data rate are printed.

    Args:
        N_per_locale: Problem size per locale.
        trials: Number of times every reduction is run; must be >= 1.
        dtype: "int64" or "float64".
        random: If true, fill the array with random values instead of a range.
        seed: Seed passed to the arkouda random generators.  A non-None seed
            selects random data even when ``random`` is false.

    Raises:
        ValueError: If ``trials`` is less than 1, or if random data is
            requested for a dtype other than "int64" / "float64".
    """
    # Fail early with a clear message instead of a ZeroDivisionError when the
    # averages are computed below.
    if trials < 1:
        raise ValueError("trials must be a positive integer, not {}".format(trials))

    print(">>> arkouda {} reduce".format(dtype))
    cfg = ak.get_config()
    N = N_per_locale * cfg["numLocales"]
    print("numLocales = {}, N = {:,}".format(cfg["numLocales"], N))

    if random or seed is not None:
        if dtype == "int64":
            a = ak.randint(1, N, N, seed=seed)
        elif dtype == "float64":
            a = ak.uniform(N, seed=seed) + 0.5
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on `a`.
            raise ValueError("Unsupported dtype for random data: {}".format(dtype))
    else:
        a = ak.arange(1, N, 1)
        if dtype == "float64":
            # Multiplying by a float converts the integer range to floats.
            a = 1.0 * a

    timings = {op: [] for op in OPS}
    results = {}
    for _ in range(trials):
        for op in OPS:
            fxn = getattr(a, op)
            # perf_counter() is monotonic and high-resolution; time.time() can
            # jump with system clock adjustments and may be too coarse for
            # short reductions.
            start = time.perf_counter()
            r = fxn()
            end = time.perf_counter()
            timings[op].append(end - start)
            results[op] = r

    nbytes = a.size * a.itemsize
    for op, samples in timings.items():
        t = sum(samples) / trials
        print("{} = {}".format(op, results[op]))
        print("  {} Average time = {:.4f} sec".format(op, t))
        # Guard against a zero average (tiny arrays) so the report never
        # crashes with a division by zero.
        bytes_per_sec = nbytes / t if t > 0 else float("inf")
        print("  {} Average rate = {:.2f} GiB/sec".format(op, bytes_per_sec / 2**30))
def time_np_reduce(N, trials, dtype, random, seed):
    """Time each reduction in OPS on a NumPy array and print the results.

    For every op the last result, the average time over ``trials`` runs and
    the corresponding data rate are printed.

    Args:
        N: Problem size used to build the array.
        trials: Number of times every reduction is run; must be >= 1.
        dtype: "int64" or "float64".
        random: If true, fill the array with random values instead of a range.
        seed: Seed for NumPy's global random generator.  A non-None seed
            selects random data even when ``random`` is false.

    Raises:
        ValueError: If ``trials`` is less than 1, or if random data is
            requested for a dtype other than "int64" / "float64".
    """
    # Fail early with a clear message instead of a ZeroDivisionError when the
    # averages are computed below.
    if trials < 1:
        raise ValueError("trials must be a positive integer, not {}".format(trials))

    print(">>> numpy {} reduce".format(dtype))
    print("N = {:,}".format(N))

    if seed is not None:
        np.random.seed(seed)
    if random or seed is not None:
        if dtype == "int64":
            a = np.random.randint(1, N, N)
        elif dtype == "float64":
            a = np.random.random(N) + 0.5
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on `a`.
            raise ValueError("Unsupported dtype for random data: {}".format(dtype))
    else:
        a = np.arange(1, N, 1, dtype=dtype)

    timings = {op: [] for op in OPS}
    results = {}
    for _ in range(trials):
        for op in OPS:
            fxn = getattr(a, op)
            # perf_counter() is monotonic and high-resolution; time.time() can
            # jump with system clock adjustments and may be too coarse for
            # short reductions.
            start = time.perf_counter()
            r = fxn()
            end = time.perf_counter()
            timings[op].append(end - start)
            results[op] = r

    nbytes = a.size * a.itemsize
    for op, samples in timings.items():
        t = sum(samples) / trials
        print("{} = {}".format(op, results[op]))
        print("  {} Average time = {:.4f} sec".format(op, t))
        # Guard against a zero average (tiny arrays) so the report never
        # crashes with a division by zero.
        bytes_per_sec = nbytes / t if t > 0 else float("inf")
        print("  {} Average rate = {:.2f} GiB/sec".format(op, bytes_per_sec / 2**30))
def check_correctness(dtype, random, seed):
    """Check that every reduction in OPS gives matching NumPy and arkouda results.

    A small NumPy array is built (random values, or a fixed range when neither
    ``random`` nor ``seed`` is given), copied into arkouda with ``ak.array``,
    and each reduction is run on both copies and compared with ``np.isclose``.

    Args:
        dtype: "int64" or "float64".
        random: If true, use random values instead of a range.
        seed: Seed for NumPy's global random generator.  A non-None seed
            selects random data even when ``random`` is false.

    Raises:
        ValueError: If ``dtype`` is not "int64" or "float64".
        AssertionError: If a NumPy result and an arkouda result disagree.
    """
    # Previously an unsupported dtype left `a` unbound and failed later with
    # an UnboundLocalError.
    if dtype not in ("int64", "float64"):
        raise ValueError("Unsupported dtype: {}".format(dtype))

    N = 10**4
    if seed is not None:
        np.random.seed(seed)
    if random or seed is not None:
        if dtype == "int64":
            a = np.random.randint(1, N, N)
        else:
            a = np.random.random(N) + 0.5
    else:
        if dtype == "int64":
            a = np.arange(1, N, 1, dtype=dtype)
        else:
            # Values packed closely above 1.0, in steps of 1/N**2.
            a = np.arange(1, 1 + 1 / N, (1 / N) / N, dtype=dtype)

    # The arkouda copy does not depend on the op, so build it once rather
    # than once per reduction.
    aka = ak.array(a)
    for op in OPS:
        npr = getattr(a, op)()
        akr = getattr(aka, op)()
        # Because np.prod() returns an integer type with no infinity, it returns
        # zero on overflow.
        # By contrast, ak.prod() returns float64, so it returns inf on overflow
        if dtype == "int64" and op == "prod" and npr == 0 and akr == np.inf:
            continue
        # Raise explicitly instead of using `assert`, which is stripped under
        # `python -O` and would turn this check into a no-op.
        if not np.isclose(npr, akr):
            raise AssertionError(
                "{} {} mismatch: numpy = {}, arkouda = {}".format(dtype, op, npr, akr)
            )
def create_parser():
    """Build and return the argparse parser for this benchmark script.

    Positional arguments are the arkouda server ``hostname`` and ``port``;
    the options select problem size, trial count, dtype, random fill, a NumPy
    comparison run, correctness-only mode and the random seed.
    """
    cli = argparse.ArgumentParser(
        description="Measure performance of reductions over arrays."
    )
    add = cli.add_argument

    # Where to find the arkouda server.
    add("hostname", help="Hostname of arkouda server")
    add("port", type=int, help="Port of arkouda server")

    # Shape of the benchmark.
    add(
        "-n",
        "--size",
        type=int,
        default=10**8,
        help="Problem size: length of array to reduce",
    )
    add(
        "-t",
        "--trials",
        type=int,
        default=6,
        help="Number of times to run the benchmark",
    )
    dtype_help = "Dtype of array ({})".format(", ".join(TYPES))
    add("-d", "--dtype", default="int64", help=dtype_help)

    # Boolean switches, all off unless given on the command line.
    add(
        "-r",
        "--randomize",
        action="store_true",
        default=False,
        help="Fill array with random values instead of range",
    )
    add(
        "--numpy",
        action="store_true",
        default=False,
        help="Run the same operation in NumPy to compare performance.",
    )
    add(
        "--correctness-only",
        action="store_true",
        default=False,
        help="Only check correctness, not performance.",
    )

    add(
        "-s",
        "--seed",
        type=int,
        default=None,
        help="Value to initialize random number generator",
    )
    return cli
if __name__ == "__main__":
    import sys

    # Parse the command line and reject dtypes this script does not handle.
    opts = create_parser().parse_args()
    if opts.dtype not in TYPES:
        message = "Dtype must be {}, not {}".format("/".join(TYPES), opts.dtype)
        raise ValueError(message)

    ak.verbose = False
    ak.connect(opts.hostname, opts.port)

    # Correctness-only mode: check every supported dtype, then stop without
    # running any timings.
    if opts.correctness_only:
        for checked_dtype in TYPES:
            check_correctness(checked_dtype, opts.randomize, opts.seed)
        sys.exit(0)

    print("array size = {:,}".format(opts.size))
    print("number of trials = ", opts.trials)

    # The same settings drive the arkouda run and the optional NumPy run.
    bench_args = (opts.size, opts.trials, opts.dtype, opts.randomize, opts.seed)
    time_ak_reduce(*bench_args)
    if opts.numpy:
        time_np_reduce(*bench_args)
    sys.exit(0)