it should be simple and easy to process data at the speed of sequential io.
a simple and efficient data format for easily manipulating chunks of rows of columns while minimizing allocations and copies.
minimal cli tools for rapidly composing performant data flow pipelines.
column: 0-65536 bytes.
row: 0-65536 columns.
chunk: up to 5MB containing 1 or more complete rows.
note: row data cannot exceed chunk size.
chunk:
| i32:size | u8[]:row | ... |
row:
| u16:max | u16:size | ... | u8[]:column | ... |
note: column bytes are always followed by a single null byte.
note: max is the maximum zero based index into the row.
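the row layout above can be sketched in c. this is a minimal sketch, not the bsv source: `sketch_encode_row` and `sketch_decode_row` are hypothetical names, a little endian host is assumed, and it is assumed that all sizes are written before any column bytes.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

// hypothetical helpers for illustration, not part of bsv.
// assumes a little endian host and that every size precedes the column bytes.

// write one row as | u16:max | u16:size ... | column \0 ... | and return bytes written.
static size_t sketch_encode_row(uint8_t *buf, const char **columns, const uint16_t *sizes, int count) {
    assert(count >= 1 && count <= 65536);
    uint16_t max = (uint16_t)(count - 1);      // max is the highest zero based index
    size_t offset = 0;
    memcpy(buf + offset, &max, sizeof(max));
    offset += sizeof(max);
    memcpy(buf + offset, sizes, (size_t)count * sizeof(uint16_t));
    offset += (size_t)count * sizeof(uint16_t);
    for (int i = 0; i < count; i++) {
        memcpy(buf + offset, columns[i], sizes[i]);
        offset += sizes[i];
        buf[offset++] = '\0';                  // column bytes are followed by a null byte
    }
    return offset;
}

// point columns[] into buf without copying and return bytes consumed.
static size_t sketch_decode_row(const uint8_t *buf, const uint8_t **columns, uint16_t *sizes, uint16_t *max) {
    size_t offset = 0;
    memcpy(max, buf, sizeof(*max));
    offset += sizeof(*max);
    size_t count = (size_t)*max + 1;
    memcpy(sizes, buf + offset, count * sizeof(uint16_t));
    offset += count * sizeof(uint16_t);
    for (size_t i = 0; i < count; i++) {
        columns[i] = buf + offset;
        offset += (size_t)sizes[i] + 1;        // skip the column and its trailing null
    }
    return offset;
}
```

under these assumptions a row with columns `a` and `bc` takes 11 bytes: 2 for max, 4 for the two sizes, and 5 for the column bytes with their trailing nulls. decoding only sets pointers into the buffer, so no column bytes are copied.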
>> curl https://raw.githubusercontent.com/nathants/bsv/master/scripts/install_archlinux.sh | bash
>> git clone https://github.com/nathants/bsv
>> cd bsv
>> make -j
>> sudo mv -fv bin/* /usr/local/bin
note: for best pipeline performance increase maximum pipe size
>> sudo sysctl fs.pipe-max-size=5242880
>> tox
>> docker build -t bsv:debian -f Dockerfile.debian .
>> docker run -v $(pwd):/code --rm -it bsv:debian bash -c 'cd /code && py.test -vvx --tb native -n auto test/'
>> docker build -t bsv:alpine -f Dockerfile.alpine .
>> docker run -v $(pwd):/code --rm -it bsv:alpine bash -c 'cd /code && py.test -vvx --tb native -n auto test/'
increase the number of generated test cases with environment variable: TEST_FACTOR=5
add bsumall.c to bsv/src/:
#include "util.h"
#include "load.h"
#include "dump.h"
#define DESCRIPTION "sum columns of u16 as i64\n\n"
#define USAGE "... | bsumall \n\n"
#define EXAMPLE ">> echo '\n1,2\n3,4\n' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv\n4,6\n"
int main(int argc, char **argv) {
    // setup state
    SETUP();
    readbuf_t rbuf = rbuf_init((FILE*[]){stdin}, 1, false);
    writebuf_t wbuf = wbuf_init((FILE*[]){stdout}, 1, false);
    i64 sums[MAX_COLUMNS] = {0};
    row_t row;
    // process input row by row
    while (1) {
        load_next(&rbuf, &row, 0);
        if (row.stop)
            break;
        for (i32 i = 0; i <= row.max; i++) {
            ASSERT(sizeof(u16) == row.sizes[i], "fatal: bad data\n");
            sums[i] += *(u16*)row.columns[i];
        }
    }
    // generate output row
    row.max = -1;
    for (i32 i = 0; i < MAX_COLUMNS; i++) {
        if (!sums[i])
            break;
        row.sizes[i] = sizeof(i64);
        row.columns[i] = &sums[i];
        row.max++;
    }
    // dump output
    if (row.max >= 0)
        dump(&wbuf, &row, 0);
    dump_flush(&wbuf, 0);
}
build and run:
>> ./scripts/makefile.sh
>> make bsumall
>> bsumall -h
sum columns of u16 as i64
usage: ... | bsumall
>> echo '
1,2
3,4
' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv
4,6
support of hardware other than little endian.
types and schemas as a part of the data format.
quickcheck style testing with python implementations to verify correct behavior for arbitrary inputs and varying buffer sizes.
performance experiments and alternate implementations.
s4 - a storage cluster that is cheap and fast, with data local compute and efficient shuffle.
optimizing a bsv data processing pipeline
performant batch processing with bsv, s4, and presto
discovering a baseline for data processing performance
refactoring common distributed data patterns into s4
scaling python data processing horizontally
scaling python data processing vertically
structured analysis of nyc taxi data with bsv and hive
name | description |
---|---|
bcat | cat some bsv files to csv |
bcombine | prepend a new column by combining values from existing columns |
bcounteach | count as i64 each contiguous identical row by the first column |
bcounteach-hash | count as i64 by hash of the first column |
bcountrows | count rows as i64 |
bcut | select some columns |
bdedupe | dedupe identical contiguous rows by the first column, keeping the first |
bdedupe-hash | dedupe rows by hash of the first column, keeping the first |
bdropuntil | for sorted input, drop until the first column is gte to VALUE |
bhead | keep the first n rows |
blz4 | compress bsv data |
blz4d | decompress bsv data |
bmerge | merge sorted files from stdin |
bpartition | split into multiple files by consistent hash of the first column value |
bquantile-merge | merge ddsketches and output quantile value pairs as f64 |
bquantile-sketch | collapse the first column into a single row ddsketch |
bschema | validate and convert row data with a schema of columns |
bsort | timsort rows by the first column |
bsplit | split a stream into multiple files |
bsum | sum the first column |
bsumeach | sum the second column of each contiguous identical row by the first column |
bsumeach-hash | sum as i64 the second column by hash of the first column |
bsv | convert csv to bsv |
btake | take while the first column is VALUE |
btakeuntil | for sorted input, take until the first column is gte to VALUE |
btopn | accumulate the top n rows in a heap by first column value |
bunzip | split a multi column input into single column outputs |
bzip | combine single column inputs into a multi column output |
csv | convert bsv to csv |
xxh3 | xxh3_64 hash stdin |
cat some bsv files to csv
usage: bcat [-l|--lz4] [-p|--prefix] [-h N|--head N] FILE1 ... FILEN
>> for char in a a b b c c; do
echo $char | bsv >> /tmp/$char
done
>> bcat --head 1 --prefix /tmp/{a,b,c}
/tmp/a:a
/tmp/b:b
/tmp/c:c
prepend a new column by combining values from existing columns
usage: ... | bcombine COL1,...,COLN
>> echo a,b,c | bsv | bcombine 3,2 | csv
b:a,a,b,c
count as i64 each contiguous identical row by the first column
usage: ... | bcounteach
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach | bschema *,i64:a | csv
a,2
b,3
a,1
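counting contiguous identical values needs only a comparison against the current run, which is why `a` appears twice in the output above. a minimal sketch over an in-memory array of strings, where `sketch_count_each` is a hypothetical name and not the bsv implementation:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

// hypothetical helper, not bsv source: count runs of identical contiguous values.
// writes the index of the first value of each run to starts[] and the run length
// to counts[], and returns the number of runs.
static int sketch_count_each(const char **values, int n, int *starts, int64_t *counts) {
    assert(n >= 0);
    int runs = 0;
    for (int i = 0; i < n; i++) {
        if (runs > 0 && strcmp(values[i], values[starts[runs - 1]]) == 0) {
            counts[runs - 1]++;                // same as the current run, extend it
        } else {
            starts[runs] = i;                  // value changed, start a new run
            counts[runs] = 1;
            runs++;
        }
    }
    return runs;
}
```

for the input a, a, b, b, b, a this yields three runs with counts 2, 3, and 1. unsorted input gives one count per run rather than one per distinct value.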
count as i64 by hash of the first column
usage: ... | bcounteach-hash
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach-hash | bschema *,i64:a | bsort | csv
a,3
b,3
count rows as i64
usage: ... | bcountrows
>> echo '
1
2
3
4
' | bsv | bcountrows | csv
4
select some columns
usage: ... | bcut COL1,...,COLN
>> echo a,b,c | bsv | bcut 3,3,3,2,2,1 | csv
c,c,c,b,b,a
dedupe identical contiguous rows by the first column, keeping the first
usage: ... | bdedupe
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe | csv
a
b
a
dedupe rows by hash of the first column, keeping the first
usage: ... | bdedupe-hash
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe-hash | csv
a
b
for sorted input, drop until the first column is gte to VALUE
usage: ... | bdropuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | bdropuntil c | csv
c
d
keep the first n rows
usage: ... | bhead N
>> echo '
a
b
c
' | bsv | bhead 2 | csv
a
b
compress bsv data
usage: ... | blz4
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c
decompress bsv data
usage: ... | blz4d
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c
merge sorted files from stdin
usage: echo FILE1 ... FILEN | bmerge [TYPE] [-r|--reversed] [-l|--lz4]
>> echo -e 'a
c
e
' | bsv > a.bsv
>> echo -e 'b
d
f
' | bsv > b.bsv
>> echo a.bsv b.bsv | bmerge | csv
a
b
c
d
e
f
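merging sorted inputs means repeatedly emitting the smallest value at the head of any input. a minimal sketch over in-memory arrays of strings, assuming ascending byte order. `sketch_merge` is a hypothetical name, and the linear scan over heads is a simplification chosen for brevity, not a claim about how bmerge picks the next row.

```c
#include <assert.h>
#include <string.h>

// hypothetical helper, not bsv source: merge k sorted arrays of strings into out[].
// inputs[j] holds lens[j] values in ascending order. pos[] is caller provided
// scratch of size k. returns the number of values written.
static int sketch_merge(const char ***inputs, const int *lens, int k, int *pos, const char **out) {
    assert(k >= 1);
    int written = 0;
    for (int j = 0; j < k; j++)
        pos[j] = 0;
    while (1) {
        int best = -1;
        for (int j = 0; j < k; j++) {          // linear scan for the smallest head value
            if (pos[j] >= lens[j])
                continue;                      // this input is exhausted
            if (best < 0 || strcmp(inputs[j][pos[j]], inputs[best][pos[best]]) < 0)
                best = j;
        }
        if (best < 0)
            break;                             // every input is exhausted
        out[written] = inputs[best][pos[best]];
        pos[best]++;
        written++;
    }
    return written;
}
```

with inputs a, c, e and b, d, f the output is a, b, c, d, e, f. the result is only sorted if every input is already sorted.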
split into multiple files by consistent hash of the first column value
usage: ... | bpartition NUM_BUCKETS [PREFIX] [-l|--lz4]
>> echo '
a
b
c
' | bsv | bpartition 10 prefix
prefix03
prefix06
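partitioning by hash sends every row with the same first column value to the same bucket. a minimal sketch of choosing a bucket, assuming hash modulo NUM_BUCKETS. FNV-1a is used only as a self-contained stand-in; the hash bpartition actually uses is not stated here, so bucket numbers from this sketch will not match the file names above. `sketch_fnv1a` and `sketch_bucket` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// FNV-1a 64 bit, a stand-in hash for illustration only.
static uint64_t sketch_fnv1a(const uint8_t *data, size_t size) {
    uint64_t hash = 14695981039346656037ULL;   // FNV offset basis
    for (size_t i = 0; i < size; i++) {
        hash ^= data[i];
        hash *= 1099511628211ULL;              // FNV prime
    }
    return hash;
}

// hypothetical helper: pick a bucket in [0, num_buckets) for a first column value.
static uint64_t sketch_bucket(const uint8_t *value, size_t size, uint64_t num_buckets) {
    assert(num_buckets > 0);
    return sketch_fnv1a(value, size) % num_buckets;
}
```

because the bucket depends only on the value bytes and the bucket count, separate processes partitioning different inputs agree on where a given key goes.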
merge ddsketches and output quantile value pairs as f64
usage: ... | bquantile-merge QUANTILES
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405
collapse the first column into a single row ddsketch
usage: ... | bquantile-sketch TYPE [-a|--alpha] [-b|--max-bins] [-m|--min-value]
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405
validate and convert row data with a schema of columns
usage: ... | bschema SCHEMA [--filter]
--filter remove bad rows instead of erroring
example schemas:
*,*,* = 3 columns of any size
8,* = a column with 8 bytes followed by a column of any size
8,*,... = same as above, but ignore any trailing columns
a:u16,a:i32,a:f64 = convert ascii to numerics
u16:a,i32:a,f64:a = convert numerics to ascii
4*,*4 = keep the first 4 bytes of column 1 and the last 4 of column 2
>> echo aa,bbb,cccc | bsv | bschema 2,3,4 | csv
aa,bbb,cccc
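the `a:u16` and `u16:a` schemas convert between ascii digits and fixed width binary values. a minimal sketch of that round trip for u16, assuming a little endian host. `sketch_ascii_to_u16` and `sketch_u16_to_ascii` are hypothetical names, and the validation rules here are an assumption rather than bschema's exact behavior.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// hypothetical helpers, not bsv source. assume a little endian host.

// a:u16, parse ascii digits and store the value as 2 raw bytes. returns 0 on success.
static int sketch_ascii_to_u16(const char *ascii, uint8_t out[2]) {
    char *end;
    unsigned long value = strtoul(ascii, &end, 10);
    if (end == ascii || *end != '\0' || value > UINT16_MAX)
        return -1;                             // not a number, trailing junk, or out of range
    uint16_t narrow = (uint16_t)value;
    memcpy(out, &narrow, sizeof(narrow));
    return 0;
}

// u16:a, read 2 raw bytes and format them as ascii digits. returns chars written.
static int sketch_u16_to_ascii(const uint8_t in[2], char *out, size_t out_size) {
    assert(out_size >= 6);                     // 5 digits plus the terminating null
    uint16_t value;
    memcpy(&value, in, sizeof(value));
    return snprintf(out, out_size, "%u", (unsigned)value);
}
```

the ascii value `258` becomes the two bytes 0x02 0x01, and converting those bytes back gives `258` again. a numeric column therefore has a fixed size, which is what a tool can check before reading the bytes as a number.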
timsort rows by the first column
usage: ... | bsort [-r|--reversed] [TYPE]
>> echo '
3
2
1
' | bsv | bschema a:i64 | bsort i64 | bschema i64:a | csv
1
2
3
split a stream into multiple files
usage: ... | bsplit PREFIX [chunks_per_file=1]
>> echo -n a,b,c | bsv | bsplit prefix
prefix_0000000000
sum the first column
usage: ... | bsum TYPE
>> echo '
1
2
3
4
' | bsv | bschema a:i64 | bsum i64 | bschema i64:a | csv
10
sum the second column of each contiguous identical row by the first column
usage: ... | bsumeach TYPE
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach i64 | bschema *,i64:a | csv
a,3
b,12
a,6
sum as i64 the second column by hash of the first column
usage: ... | bsumeach-hash i64
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach-hash i64 | bschema *,i64:a | bsort | csv
a,9
b,12
convert csv to bsv
usage: ... | bsv
>> echo a,b,c | bsv | bcut 3,2,1 | csv
c,b,a
take while the first column is VALUE
usage: ... | btake VALUE
>> echo '
a
b
c
d
' | bsv | bdropuntil c | btake c | csv
c
for sorted input, take until the first column is gte to VALUE
usage: ... | btakeuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | btakeuntil c | csv
a
b
accumulate the top n rows in a heap by first column value
usage: ... | btopn N [TYPE] [-r|--reversed]
>> echo '
1
3
2
' | bsv | bschema a:i64 | btopn 2 i64 | bschema i64:a | csv
3
2
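keeping the top n rows in a heap bounds memory by n instead of by input size. a minimal sketch for i64 values using a min heap, so the smallest kept value sits at the root and is the one evicted when a larger value arrives. `sketch_sift_down` and `sketch_top_n` are hypothetical names, not the bsv implementation.

```c
#include <assert.h>
#include <stdint.h>

// restore the min heap property below index i. heap[0] is the smallest kept value.
static void sketch_sift_down(int64_t *heap, int size, int i) {
    while (1) {
        int smallest = i;
        int left = 2 * i + 1;
        int right = 2 * i + 2;
        if (left < size && heap[left] < heap[smallest])
            smallest = left;
        if (right < size && heap[right] < heap[smallest])
            smallest = right;
        if (smallest == i)
            return;
        int64_t tmp = heap[i];
        heap[i] = heap[smallest];
        heap[smallest] = tmp;
        i = smallest;
    }
}

// hypothetical helper: keep the n largest of count values in heap[], which must
// have room for n values, then order them largest first. returns how many were kept.
static int sketch_top_n(const int64_t *values, int count, int n, int64_t *heap) {
    assert(n >= 1);
    int size = 0;
    for (int i = 0; i < count; i++) {
        if (size < n) {
            int j = size++;                    // heap not full: append, then sift up
            heap[j] = values[i];
            while (j > 0 && heap[j] < heap[(j - 1) / 2]) {
                int64_t tmp = heap[j];
                heap[j] = heap[(j - 1) / 2];
                heap[(j - 1) / 2] = tmp;
                j = (j - 1) / 2;
            }
        } else if (values[i] > heap[0]) {
            heap[0] = values[i];               // evict the smallest kept value
            sketch_sift_down(heap, size, 0);
        }
    }
    // heapsort in place: moving the smallest to the end each pass leaves descending order
    for (int end = size - 1; end > 0; end--) {
        int64_t tmp = heap[0];
        heap[0] = heap[end];
        heap[end] = tmp;
        sketch_sift_down(heap, end, 0);
    }
    return size;
}
```

for the input 1, 3, 2 with n of 2 this keeps 3 and 2, in that order, matching the example above.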
split a multi column input into single column outputs
usage: ... | bunzip PREFIX [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip col && echo col_1 col_3 | bzip | csv
a,c
1,3
combine single column inputs into a multi column output
usage: ls column_* | bzip [COL1,...COLN] [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip column && ls column_* | bzip 1,3 | csv
a,c
1,3
convert bsv to csv
usage: ... | csv
>> echo a,b,c | bsv | csv
a,b,c
xxh3_64 hash stdin
usage: ... | xxh3 [--stream|--int]
--stream pass stdin through to stdout with hash on stderr
--int output hash as int not hex
>> echo abc | xxh3
079364cbfdf9f4cb