I recently took some time to improve the throughput of differential dataflow. One important step in differential dataflow is grouping a collection of (key, val, wgt) triples by (key, val), so that the wgt values can be accumulated. I did that by sorting the triples, which often leads to a fairly natural reaction of "wow, that's probably a bit heavyweight; wouldn't a HashMap work better?" I even thought this for a while.
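To make the grouping step concrete, here is a minimal sketch of accumulating weights by sorting. It is not differential dataflow's actual code: the function name `consolidate`, the `i64` weight type, and the choice to drop pairs whose weights cancel to zero are all assumptions made for illustration, and it uses the standard library's comparison sort as a stand-in.

```rust
/// Sorts (key, val, wgt) triples by (key, val) and sums the weights of equal pairs.
/// Dropping pairs whose weights cancel to zero is an assumption of this sketch.
fn consolidate(mut triples: Vec<(u32, u32, i64)>) -> Vec<(u32, u32, i64)> {
    // After sorting, triples with equal (key, val) sit next to each other.
    triples.sort_unstable_by_key(|&(key, val, _)| (key, val));

    let mut result: Vec<(u32, u32, i64)> = Vec::new();
    for (key, val, wgt) in triples {
        if let Some(last) = result.last_mut() {
            if last.0 == key && last.1 == val {
                // Same (key, val) as the previous triple: accumulate the weight.
                last.2 += wgt;
                continue;
            }
        }
        result.push((key, val, wgt));
    }

    // Discard pairs whose accumulated weight is zero.
    result.retain(|&(_, _, wgt)| wgt != 0);
    result
}

fn main() {
    let input = vec![(2, 7, 1), (1, 3, 1), (2, 7, 2), (1, 3, -1), (0, 9, 5)];
    // prints [(0, 9, 5), (2, 7, 3)]
    println!("{:?}", consolidate(input));
}
```

One sequential pass after the sort is enough because the sort has already put everything that needs to meet next to each other; no per-key lookup structure is involved.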
Constant time random access does sound better than doing super-linear work sorting. In fact, this has been the subject of a bunch of recent work in the database community, with this paper on sort joins versus hash joins being a great place to start. One thing you notice when you start to read is that by "hash join" they don't mean HashMap, they mean radix sort. The second thing you notice is that the performance the database community achieves completely blows the performance the systems community seems to expect out of the water.
I'm going to explain the first thing, and why "doing random access" isn't even on the table for performance-conscious people. The second thing will have to remain a mystery for now, much like sixth normal form and "The Chase".
Ok, so there was this paper back at SOSP 2013 called X-Stream, whose abstract indicates that its novelty lies at least partly in
... streaming completely unordered edge lists rather than performing random access.
The idea was that some graph datasets are so large, even sorting the data is prohibitively expensive. You'd rather just take the edges as they come, eat a random access into your per-node state, but avoid the scary task of sorting lots of data.
The statement above always weirded me out a bit, because I think of unordered edge lists as inducing a lot of random access: if you don't have any locality in the edge endpoints, you stab around in memory pretty much arbitrarily. I think what is meant is that some sorting algorithms (e.g. counting sort) end up doing random access into an amount of memory proportional to the number of edges you have. Naughty counting sort.
When X-Stream was presented at SOSP 2013, I asked the authors how come the time to process the unordered edges on their system was larger than the time to process ordered edges on my laptop (with all of its "random accesses").
Specifically, I asked
What overheads does X-Stream introduce, and at what scales does X-Stream outperform more specialized approaches? E.g., PageRank iterations on my laptop (40 lines single threaded C#) takes 64s, versus 75s for 16 cores in X-Stream
Clearly, even at this point I was already well on my way down the path of making friends and influencing people.
The answer, and the motivation for today's post, was that yes, it is about sorting being more expensive than you think.
X-Stream trades pre-processing overheads for inefficiency at runtime. We estimate it would take about 600 seconds to sort the Twitter edge list [1.48B edges] into the format you need for your implementation. Your single core implementation therefore takes around 664 seconds as compared to our 75 seconds. This is a very good question though and underlines the need for proper baselines and a comparative study of graph processing systems.
This got read aloud, so the whole audience got to hear just how silly I was. It is conventional wisdom that sorting is very, very expensive. At least, expensive relative to just dereferencing a memory location.
Let's challenge this conventional wisdom, and see how sorting stacks up against random access. How much slower is sorting really, and when should you switch from unordered access to first sorting the data?
We'll now take a brief tour through sorting algorithms, starting from the very easy, up to the slightly more intelligent, and on to what you find to be the state of the art when you go talk to the database crowd who apparently do this for a living.
I'm using Rust, and Rust has a sort method on mutable slices of types T: Ord. The implementation of sort uses merge sort and allocates twice the size of the original slice, so it's going to be our straw man.
Because it does all these allocations, I'm not really able to try it out with the 1.48B edges, because that would be 36GB or so. Instead, I'll sort some smaller things, and we will already be disappointed, which is the goal of this section anyhow.
We are going to sort 1 << 29 elements, which is a weird way to write things. When you see 1 << 29 you should think either "half a billion", or "one third of that graph Frank asked about", or just "some big-ish number of edges".
We are using 1 << 29 because that is what fits in my laptop's memory, and coincidentally that is the largest number of elements the X-Stream paper sorted when evaluating the cost of sorting (their Figure 18). We will be able to compare our results with their results! SCIENCE!
I'm going to sort random (u32, u32)
data, then flip the data around and do it again, ten times in total:
extern crate rand;

use rand::{Rng, SeedableRng, StdRng};

fn main() {
    // generate random (u32, u32) pairs from a fixed seed
    let mut vector = vec![];
    let seed: &[_] = &[1, 2, 3, 4];
    let mut rng: StdRng = SeedableRng::from_seed(seed);
    for _ in 0u32..(1 << 29) {
        vector.push((rng.next_u32(), rng.next_u32()));
    }

    for _ in 0..10 {
        // flip each pair so every iteration sorts unsorted data
        for index in 0..vector.len() {
            let (x, y) = vector[index];
            vector[index] = (y, x);
        }
        vector.sort();
    }
}
Here is what I got:
Echidnatron% time cargo run
Running `target/release/merge`
cargo run 1034.41s user 204.27s system 90% cpu 22:56.07 total
That is about 1376 seconds, 9 seconds of which are data generation, to do ten iterations, so 137.6 seconds per iteration.
By comparison, in X-Stream's Figure 18, at 1 << 29 edges gcc's sort doesn't finish in 500 seconds (and it looks like it is heading for 1000 seconds), and counting sort clocks in at a little over 250 seconds.
It isn't clear what has changed since 2013 (and perhaps I misunderstood the scale of graph they are working with), but if you multiply this by three (to get to 1.5B edges, per the graph in question), you do get about 400 seconds, plus some slop to account for a few years passing... Maybe 600 seconds is the right ballpark?
Although, Rust's sort is quite primitive. Isn't there anything better?
Counting sort is the other sorting algorithm X-Stream compares against. It is meant to be better than mergesort and quicksort because it is specialized to integer keys and does just two passes over the data.
Unfortunately, it isn't a very good sorting algorithm for large amounts of data, for exactly the reason that we are trying to sort the data in the first place: counting sort does lots of random accesses, and those random accesses get very expensive as the space of keys gets large.
Counting sort first determines each node's degree by scanning the elements and updating counts, then it accumulates these degrees to determine offsets in a larger array, where the sorted data will land. It then scans the elements again and writes them to the final array at the appropriate offset:
use std::mem;

// determine node degrees
let mut counts = vec![0usize; nodes];
for &(src, _dst) in &edges {
    counts[src as usize] += 1;
}

// accumulate degrees into starting offsets (an exclusive prefix sum),
// beginning at node 0, whose edges land at offset 0
let mut offset = 0;
for i in 0..nodes {
    mem::swap(&mut offset, &mut counts[i]);
    offset += counts[i];
}

// write edge destinations at offsets
let mut dests = vec![0; edges.len()];
for &(src, dst) in &edges {
    dests[counts[src as usize]] = dst;
    counts[src as usize] += 1;
}
If you do the math, X-Stream's 250 seconds to counting sort 1 << 29 edges works out to a bit under 500ns per edge. This seems a bit high to me based on what it needs to do, but looking at the code you can understand why it isn't going to get too much faster with all the random accesses. You can improve it with pre-fetching, but we aren't going to go in that direction.
Instead, we are going to go in the direction of awesome.
Radix sort is a very cool not-comparison-based sorting algorithm. Incidentally, it is also one of the first links on the counting sort Wikipedia page, filed under "what counting sort is actually useful for".
Radix sorting works by repeatedly partitioning a sequence of elements into some number of disjoint subsequences. That number is usually 256, and the partitioning is usually based on looking at a byte in each element. After partitioning, you put the subsequences back together in order, and repeat with the next byte.
If you look at it right, radix sort is a bit like counting sort, but used byte-at-a-time. Rather than disassemble the sequence and put it back together, we could just write out to another array at positions determined by accumulating the number of records with each byte. So, it's a bit like counting sort, but with totally different performance.
Each pass of radix sort only sorts based on one byte, but it is a stable sort, so any ties remain in the same order they were in before. If you do multiple passes looking at different bytes, the resulting sequence is ordered first by the byte you last looked at, then by the second most recent byte you examined, etc. Once you've looked at all bytes, the sequence is ordered by the binary representation of each element, assuming you looked at the bytes from least significant to most significant.
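To make the "counting sort, but byte-at-a-time" view concrete, here is a minimal sketch of a least-significant-byte-first radix sort that writes each pass into a second array. It is not the implementation measured in this post: the name `radix_sort_by_src`, the scratch array, and the assumption that edges are (u32, u32) pairs ordered by source are all made up for illustration.

```rust
/// Least-significant-byte-first radix sort of edges by source id.
/// Each pass is a small counting sort over 256 possible byte values.
fn radix_sort_by_src(edges: &mut Vec<(u32, u32)>) {
    let mut scratch = vec![(0u32, 0u32); edges.len()];
    for pass in 0..4 {
        let shift = 8 * pass;

        // Count how many records carry each value of the current byte.
        let mut offsets = [0usize; 256];
        for &(src, _) in edges.iter() {
            offsets[((src >> shift) & 0xFF) as usize] += 1;
        }

        // Turn the counts into starting positions (an exclusive prefix sum).
        let mut total = 0;
        for slot in offsets.iter_mut() {
            let count = *slot;
            *slot = total;
            total += count;
        }

        // Scatter records in input order, which keeps the pass stable.
        for &edge in edges.iter() {
            let byte = ((edge.0 >> shift) & 0xFF) as usize;
            scratch[offsets[byte]] = edge;
            offsets[byte] += 1;
        }

        // The scratch array now holds this pass's output.
        std::mem::swap(edges, &mut scratch);
    }
}

fn main() {
    let mut edges = vec![(0x0201, 1), (0x0102, 2), (0x0201, 3), (5, 4), (0, 6)];
    radix_sort_by_src(&mut edges);
    // prints [(0, 6), (5, 4), (258, 2), (513, 1), (513, 3)]
    println!("{:?}", edges);
}
```

The counts array has only 256 entries no matter how many distinct keys there are, so the "random" writes in each pass go to 256 sequentially advancing positions rather than to one position per node.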
Here is a quick sketch of what each pass of radix sort might look like, if we wanted to partition based on some get_byte method I just made up.
for element in input {
    output[get_byte(&element)].push(element);
}
Pretty easy, huh?
You do four of these sequential scans, and in each you write sequentially to one of 256 locations. Four passes, no random access. This is great. I don't even see a log n there, do you?
The great thing about radix sort is that in one scan through the data it does the equivalent of 8 comparisons which would normally take a bunch of conditional logic. Instead, it just pulled out a byte and used it to look up an address.
The downside to radix sort is that it just looks at bytes. If you had some deep and semantically meaningful ordering defined over your type (or, like, a pointer), that's great, but we're sorting by its bytes. In the context of graph edges, this works just fine.
Maybe the above algorithm is a little too simple. The code above has the sketchy property that each of the output[byte] lists needs to re-allocate as it grows, and that is a waste of everyone's time.
There are several variations here, but one I like fills buffers of some small size (e.g. 1024 elements), and enqueues them instead. These buffers can be recycled each round, like so:
for mut list in input {
    for element in list.drain(..) {
        let byte = get_byte(&element);
        buffer[byte].push(element);
        if buffer[byte].len() == 1024 {
            // swap the full buffer for a recycled (or fresh) one and enqueue it
            let free = stash.pop().unwrap_or_else(|| Vec::with_capacity(1024));
            let full = mem::replace(&mut buffer[byte], free);
            output[byte].push(full);
        }
    }
    // the drained list is empty now; keep its allocation for reuse
    stash.push(list);
}
Ah, that looks better. This takes input data in short lists (which is how timely dataflow provides them to operators), and re-uses the lists when it needs to write out byte-partitioned data. We just need to put a loop around it, add an unsafe { }, erase a few bounds checks, and we are good to go!
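For readers who want to see the loop around it, here is a safe-Rust sketch of the buffered approach under some assumptions: elements are (u32, u32) edges sorted by source, and the names `radix_pass`, `radix_sort`, `Edge`, and `BUFFER_LEN` are hypothetical. It skips the unsafe block and the bounds-check removal, so it is not the code that produced the timings in this post.

```rust
use std::mem;

type Edge = (u32, u32);
const BUFFER_LEN: usize = 1024;

/// One pass: partition the lists by one byte of the source id, keeping input
/// order within each byte value. Drained lists go to `stash` for reuse.
fn radix_pass(input: Vec<Vec<Edge>>, shift: u32, stash: &mut Vec<Vec<Edge>>) -> Vec<Vec<Edge>> {
    let mut buffer: Vec<Vec<Edge>> = (0..256).map(|_| Vec::with_capacity(BUFFER_LEN)).collect();
    let mut output: Vec<Vec<Vec<Edge>>> = (0..256).map(|_| Vec::new()).collect();

    for mut list in input {
        for element in list.drain(..) {
            let byte = ((element.0 >> shift) & 0xFF) as usize;
            buffer[byte].push(element);
            if buffer[byte].len() == BUFFER_LEN {
                // Swap the full buffer for a recycled (or fresh) one and enqueue it.
                let free = stash.pop().unwrap_or_else(|| Vec::with_capacity(BUFFER_LEN));
                let full = mem::replace(&mut buffer[byte], free);
                output[byte].push(full);
            }
        }
        // The drained list is empty; keep its allocation around.
        stash.push(list);
    }

    // Flush partially filled buffers, then stitch the 256 queues together in byte order.
    let mut result = Vec::new();
    for (byte, mut lists) in output.into_iter().enumerate() {
        let partial = mem::take(&mut buffer[byte]);
        if partial.is_empty() {
            stash.push(partial);
        } else {
            lists.push(partial);
        }
        result.extend(lists);
    }
    result
}

/// Four passes, least significant byte first.
fn radix_sort(mut lists: Vec<Vec<Edge>>) -> Vec<Vec<Edge>> {
    let mut stash = Vec::new();
    for shift in [0u32, 8, 16, 24] {
        lists = radix_pass(lists, shift, &mut stash);
    }
    lists
}

fn main() {
    // Pseudo-random edges from a multiplicative hash, delivered in lists of 300.
    let edges: Vec<Edge> = (0..1u32 << 20).map(|i| (i.wrapping_mul(2654435761), i)).collect();
    let lists: Vec<Vec<Edge>> = edges.chunks(300).map(|chunk| chunk.to_vec()).collect();
    let sorted: Vec<Edge> = radix_sort(lists).into_iter().flatten().collect();
    println!("sorted by source: {}", sorted.windows(2).all(|w| w[0].0 <= w[1].0));
}
```

Within a byte value, full buffers are enqueued in the order they fill and the partial buffer goes last, so each pass stays stable, which is what lets the four passes compose into a full sort.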
Echidnatron% cargo run --release
Running `target/release/radix`
data generated 6.605743866995908
sorted in 9.62945753801614
sorted in 7.738075986970216
sorted in 7.423567182035185
sorted in 7.664827859029174
sorted in 7.620034374995157
sorted in 7.483836957952008
sorted in 7.485334731056355
sorted in 7.414655723958276
sorted in 7.516964436043054
sorted in 7.4602396480040625
Oh wow. Is that 7.5 seconds to sort 1 << 29 edges?
Well, that sure is faster than the 250 seconds counting sort took in the X-Stream paper. Thanks Wikipedia!
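To put the three sorts on one scale, the snippet below turns the running times quoted so far into nanoseconds per edge. The helper name `ns_per_edge` is made up; the inputs are the figures from the text (137.6 seconds per merge sort iteration, about 250 seconds for X-Stream's counting sort, about 7.5 seconds for radix sort), and they come from different machines, so treat the comparison as rough.

```rust
/// Average nanoseconds per edge when sorting `edges` elements takes `seconds`.
fn ns_per_edge(seconds: f64, edges: u64) -> f64 {
    seconds * 1e9 / edges as f64
}

fn main() {
    let edges = 1u64 << 29;
    let timings = [("merge sort", 137.6), ("counting sort", 250.0), ("radix sort", 7.5)];
    for (name, seconds) in timings {
        println!("{:<14}{:6.1} ns/edge", name, ns_per_edge(seconds, edges));
    }
}
```

That works out to roughly 256ns, 466ns, and 14ns per edge respectively.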
Also, this time is for my 100 line radix sort implementation; there are better implementations out there, people writing new papers each year, and one should only expect things to get better. Personally, I'm planning on stealing as much as I can.
Let's re-test a simple instance of the main thesis of the X-Stream paper with an experiment.
Note: X-Stream is a weird and complicated system, and this simple experiment isn't going to tell us about X-Stream, so much as whether and when you should sort edges before you process them. There is also an important caveat just before the conclusions.
The simplest graph computation I can think of is computing node degrees. "For each source, how many edges does it have to other nodes?" We could either scan unordered edges and accumulate counts in an array, or we could sort the edge data and then accumulate counts in an array.
Let's do both and record how long they take for various numbers of graph nodes and edges. Get your lab notebooks out.
We first make some random graph data. We'll follow the X-Stream paper and use 16 times as many edges as nodes. We'll start with 1 << 25 nodes because that means 1 << 29 edges which we've discussed already.
let mut nodes = vec![0; 1 << 25];
let mut edges = vec![];
for _ in 0..(16 * nodes.len()) {
    // both endpoints are drawn uniformly from the node ids
    edges.push((rng.next_u32() % (nodes.len() as u32),
                rng.next_u32() % (nodes.len() as u32)));
}
We want to compute the degrees two ways: unordered, and after sorting. We'll do ten iterations of each, to warm up memory and stuff.
First, using random access:
let start = time::precise_time_s();
for _ in 0..10 {
    for &(src, _dst) in &edges {
        nodes[src as usize] += 1;
    }
}
println!("random {}", (time::precise_time_s() - start) / 10.0);
Next we'll do it by sorting the edges from scratch, each of the ten iterations.
To reiterate: each iteration, the data are sorted again, from the unsorted data.
let start = time::precise_time_s();
let mut sorter = RadixSorter::new();
for _ in 0..10 {
    // sort the edges by source, from scratch
    for &edge in &edges {
        sorter.push(edge, &|x| x.0);
    }
    let output = sorter.finish(&|x| x.0);
    // count degrees over the sorted edges
    for &(src, _dst) in output.iter().flat_map(|x| x.iter()) {
        nodes[src as usize] += 1;
    }
    sorter.recycle(output);
}
println!("sorted {}", (time::precise_time_s() - start) / 10.0);
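The snippets above lean on the rand and time crates and on a RadixSorter that isn't shown, so here is a self-contained sketch of the same experiment using only the standard library. Everything in it is an assumption for illustration: the xorshift generator `next_u32`, the function names, the inlined array-based radix sort, and the small default size. It is not the code that produced the numbers in this post.

```rust
use std::time::Instant;

/// Tiny xorshift generator so the sketch needs no external crates.
fn next_u32(state: &mut u32) -> u32 {
    let mut x = *state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    *state = x;
    x
}

/// Degree counting straight off the edge list, in whatever order it arrives.
fn degrees_unordered(edges: &[(u32, u32)], nodes: usize) -> Vec<u32> {
    let mut degrees = vec![0u32; nodes];
    for &(src, _dst) in edges {
        degrees[src as usize] += 1;
    }
    degrees
}

/// Radix sort a copy of the edges by source (four byte passes), then count degrees.
fn degrees_sorted(edges: &[(u32, u32)], nodes: usize) -> Vec<u32> {
    let mut sorted = edges.to_vec();
    let mut scratch = edges.to_vec();
    for shift in [0u32, 8, 16, 24] {
        // offsets[b + 1] counts byte value b; the prefix sum turns that into start positions.
        let mut offsets = [0usize; 257];
        for &(src, _) in &sorted {
            offsets[((src >> shift) & 0xFF) as usize + 1] += 1;
        }
        for i in 1..257 {
            offsets[i] += offsets[i - 1];
        }
        for &edge in &sorted {
            let byte = ((edge.0 >> shift) & 0xFF) as usize;
            scratch[offsets[byte]] = edge;
            offsets[byte] += 1;
        }
        std::mem::swap(&mut sorted, &mut scratch);
    }
    degrees_unordered(&sorted, nodes)
}

fn main() {
    let nodes = 1usize << 16; // small so the sketch runs quickly
    let mut state = 0x9E37_79B9u32;
    let edges: Vec<(u32, u32)> = (0..16 * nodes)
        .map(|_| (next_u32(&mut state) % nodes as u32, next_u32(&mut state) % nodes as u32))
        .collect();

    let start = Instant::now();
    let unordered = degrees_unordered(&edges, nodes);
    println!("random {:?}", start.elapsed());

    let start = Instant::now();
    let sorted = degrees_sorted(&edges, nodes);
    println!("sorted {:?}", start.elapsed());

    // Both routes must agree on every node's degree.
    assert_eq!(unordered, sorted);
}
```

At 1 << 16 nodes the counts array is small enough to sit in cache, so don't expect sorting to win here; the interesting sizes are the large ones, and `nodes` can be raised toward 1 << 25 if memory allows. Build with `--release` before reading anything into the timings.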
Ok, now in true science-y fashion, we start the experiment, step back, and close our eyes.
Echidnatron% cargo run --release
Running `target/release/xstream`
random 10.483409735094756
sorted 8.601547808106989
Science says you should sort your edges.
Sorting is faster than blind random updates, because radix sorting specifically has great locality of reference, and just does four passes over the data. Each of those passes is more than four times faster than a random read out of cold, dark main memory.
This isn't a fluke, and awkwardly, it only gets worse the larger the data gets. For massive graphs, processing unordered edges is an increasingly bad idea as memory gets colder, darker and farther away. Radix sorting, with its excellent locality, has a relatively flat running time that depends on the throughput of your storage rather than its latency.
Here is a picture where I computed the average number of nanoseconds per edge to do degree counting using unsorted data, and sorting the data using radix sort and then doing degree counting. The number of nodes goes from 2^16 to 2^25, and the number of edges is 16x the number of nodes. It tops out at 2^25 nodes and 2^29 edges, but you can see where this is going.
I did this by putting a loop around the bits of code I wrote up above.
Next up is to try out the graph invoked at the beginning of the post, twitter_rv, which has 65M nodes and 1.5B edges. Annoyingly for this line of work, the data come pre-sorted, so we'll have to fake some random data with the same number of nodes and edges.
I did that, and found I could sort it in 50 seconds on one core (admittedly using a machine with more than 16GB of memory), improving on prior estimates of 600 seconds. When you add 15 seconds, the time we got for single-threaded PageRank on adjacency-list data, you get 65 seconds, which (and this is totally a coincidence, seriously) ends up basically the same as the 64 seconds we started with.
Note: The above raises an important point: the performance of radix sort depends on the throughput of the storage where your edges live, and the throughput of memory is quite high, if you can swing memory. If your data were on disk, the throughput would be lower. However, it is the throughput of your storage that matters for sorting, not the latency (the X-Stream paper focuses on the latter, which I think is a bug). If your edge data is on disk you should compare the sequential throughput of your disk to the random throughput of your memory when trying to determine whether sorting is right for you.
Here is where you might expect that I would make some snarky, unkind comments about how not everyone is always right. Whatever; I was just as wrong (relatively) about how long PageRank should take as the X-Stream folks were in an on-the-spot performance guesstimate. Who knows, maybe I am the one who is wrong about sorting too.
Here is a bigger problem: in the big data processing session of one of the top computer systems conferences, no one (speaker, audience, me) seemed to know how long it should take to sort some numbers. I mean, we laugh at those database people who can't unstick their capslock key, but at least they have this figured out.
If you wanted to do an iterative graph computation like PageRank, it would literally be faster to sort the edges from scratch each and every iteration, than to use unsorted edges.
If you want to do graph computation, please sort your edges.
Actually, you know what: if you want to do any big data computation, please sort your records. Stop talking sass about how Hadoop sorts things it doesn't need to, read some papers, run some tests, and then sort your damned data. Or at least run faster than me when I sort your data for you.
(Aside: Who's the big data system that radix sorts all its records? (Differential dataflow!) You're damn right.)