-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbpartition_test.py
126 lines (118 loc) · 3.87 KB
/
bpartition_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import string
import shell
import collections
import xxh3
from hypothesis.database import ExampleDatabase
from hypothesis import given, settings
from hypothesis.strategies import text, lists, composite, integers, tuples
from test_util import unindent, rm_whitespace, clone_source
def setup_module(m):
    """Prepare the module: enter the cloned source tree and build the tools.

    Records on the module object ``m``:

    * ``tempdir`` -- the directory returned by ``clone_source()``
    * ``orig``    -- the working directory at the time of the call
    * ``path``    -- the value of ``PATH`` at the time of the call

    It then changes into ``tempdir``, replaces ``PATH`` so that the ``bin``
    directory under the new working directory comes first, and runs ``make``
    for the ``bsv``, ``csv``, ``bcat`` and ``bpartition`` targets.
    """
    clone_dir = clone_source()
    m.tempdir = clone_dir
    m.orig = os.getcwd()
    m.path = os.environ['PATH']
    os.chdir(clone_dir)
    # Read the cwd back after chdir rather than reusing clone_dir, exactly as
    # the PATH entry has always been derived.
    search_path = f'{os.getcwd()}/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin:/bin'
    os.environ['PATH'] = search_path
    build_command = 'make clean && make bsv csv bcat bpartition'
    shell.run(build_command, stream=True)
def teardown_module(m):
    """Restore the process state saved on ``m`` and remove ``m.tempdir``.

    Changes back to ``m.orig``, resets ``PATH`` to ``m.path``, and then
    deletes ``m.tempdir`` with ``rm -rf``.
    """
    os.chdir(m.orig)
    os.environ['PATH'] = m.path
    # Safety guard: never `rm -rf` a directory outside the expected temp roots.
    allowed_roots = ('/tmp/', '/private/var/folders/')
    assert m.tempdir.startswith(allowed_roots)
    shell.run('rm -rf', m.tempdir)
@composite
def inputs(draw):
    """Hypothesis strategy yielding a ``(num_buckets, csv)`` pair.

    ``num_buckets`` is drawn from 1..128. ``csv`` is text made of one or more
    rows, each terminated by a newline; every row holds at least one and at
    most ``max_width`` (drawn from 1..12) non-empty lowercase-ASCII fields
    separated by commas.
    """
    bucket_count = draw(integers(min_value=1, max_value=128))
    max_width = draw(integers(min_value=1, max_value=12))
    field = text(string.ascii_lowercase, min_size=1)
    row = lists(field, min_size=1, max_size=max_width)
    rows = draw(lists(row, min_size=1))
    rendered = '\n'.join(map(','.join, rows)) + '\n'
    return bucket_count, rendered
def expected(num_buckets, csv):
    """Compute the reference result of partitioning ``csv`` into buckets.

    Every line of ``csv`` is assigned to the bucket given by the xxh3 hash of
    its first comma-separated field modulo ``num_buckets``. Bucket numbers are
    zero-padded to the number of digits in ``num_buckets``.

    Returns one ``prefix_<bucket>:<line>`` entry per input line, ordered by
    bucket label and, within a bucket, by input order. Entries are separated
    by newlines and the final string is stripped of surrounding whitespace.
    """
    width = len(str(num_buckets))
    by_bucket = collections.defaultdict(list)
    for row in csv.splitlines():
        first_field = row.partition(',')[0]
        bucket = xxh3.oneshot_int(first_field.encode()) % num_buckets
        by_bucket[str(bucket).zfill(width)].append(row)
    rendered = ''.join(
        f'prefix_{label}:{row}\n'
        for label in sorted(by_bucket)
        for row in by_bucket[label]
    )
    return rendered.strip()
@given(inputs())
@settings(
    database=ExampleDatabase(':memory:'),
    max_examples=100 * int(os.environ.get('TEST_FACTOR', 1)),
    # Environment values arrive as strings and Hypothesis does not accept a
    # str deadline, so convert to a number (milliseconds) before passing it.
    deadline=float(os.environ.get("TEST_DEADLINE", 1000 * 60)),
)
def test_props(args):
    """Property test: bpartition output agrees with the ``expected`` reference.

    For each generated ``(num_buckets, csv)`` pair, runs
    ``bsv | bpartition <num_buckets> prefix`` in a temporary directory and
    checks that:

    * its stdout equals the sorted, de-duplicated ``prefix_<bucket>`` labels
      appearing in the reference result, one per line;
    * ``bcat --prefix prefix*`` over the files written equals the reference
      result itself.

    The number of examples scales with the ``TEST_FACTOR`` environment
    variable, and the per-example deadline (in milliseconds) can be
    overridden with ``TEST_DEADLINE``.
    """
    num_buckets, csv = args
    result = expected(num_buckets, csv)
    with shell.tempdir():
        # Each reference entry looks like "prefix_NN:<row>"; keep only the label.
        labels = {entry.split(':')[0] for entry in result.splitlines()}
        stdout = '\n'.join(sorted(labels))
        assert stdout == shell.run(f'bsv | bpartition {num_buckets} prefix', stdin=csv, echo=True)
        assert result == shell.run('bcat --prefix prefix*')
def test_without_prefix():
    """Check bpartition's stdout when it is invoked without a prefix argument.

    Feeds three CSV rows through ``bsv | bpartition 10`` and expects the
    output to equal the whitespace-normalised listing ``02``, ``04``, ``05``.
    """
    csv_rows = (
        "\n"
        "b,c,d\n"
        "e,f,g\n"
        "h,i,j\n"
    )
    bucket_listing = (
        "\n"
        "02\n"
        "04\n"
        "05\n"
    )
    with shell.tempdir():
        wanted = rm_whitespace(unindent(bucket_listing))
        assert wanted == shell.run('bsv | bpartition 10', stdin=unindent(csv_rows))
def test_basic():
    """Check a single bpartition run with a prefix, end to end.

    Verifies three things in a temporary directory: the stdout of
    ``bsv | bpartition 10 prefix``, the row contents reported by
    ``bcat --prefix prefix*``, and the file names reported by ``ls prefix*``.
    """
    csv_rows = (
        "\n"
        "b,c,d\n"
        "e,f,g\n"
        "h,i,j\n"
    )
    # The same listing is expected both on bpartition's stdout and from `ls`.
    bucket_files = (
        "\n"
        "prefix_02\n"
        "prefix_04\n"
        "prefix_05\n"
    )
    bucket_contents = (
        "\n"
        "prefix_02:h,i,j\n"
        "prefix_04:e,f,g\n"
        "prefix_05:b,c,d\n"
    )
    with shell.tempdir():
        wanted = rm_whitespace(unindent(bucket_files))
        assert wanted == shell.run('bsv | bpartition 10 prefix', stdin=unindent(csv_rows))
        wanted = unindent(bucket_contents).strip()
        assert wanted == shell.run('bcat --prefix prefix*')
        wanted = unindent(bucket_files).strip()
        assert wanted == shell.run('ls prefix*')
def test_appends():
    """Check that a second bpartition run adds to the existing bucket files.

    Runs ``bsv | bpartition 10 prefix`` twice on the same input in one
    temporary directory. Both runs must print the same bucket listing,
    ``bcat --prefix prefix*`` must then show every row twice, and
    ``ls prefix*`` must still list the same three files.
    """
    csv_rows = (
        "\n"
        "b,c,d\n"
        "e,f,g\n"
        "h,i,j\n"
    )
    # The same listing is expected from both bpartition runs and from `ls`.
    bucket_files = (
        "\n"
        "prefix_02\n"
        "prefix_04\n"
        "prefix_05\n"
    )
    doubled_contents = (
        "\n"
        "prefix_02:h,i,j\n"
        "prefix_02:h,i,j\n"
        "prefix_04:e,f,g\n"
        "prefix_04:e,f,g\n"
        "prefix_05:b,c,d\n"
        "prefix_05:b,c,d\n"
    )
    with shell.tempdir():
        for _ in range(2):
            wanted = rm_whitespace(unindent(bucket_files))
            assert wanted == shell.run('bsv | bpartition 10 prefix', stdin=unindent(csv_rows))
        wanted = unindent(doubled_contents).strip()
        assert wanted == shell.run('bcat --prefix prefix*')
        wanted = unindent(bucket_files).strip()
        assert wanted == shell.run('ls prefix*')