This post is the third in a series (part 1, part 2) about writing timely dataflow programs.
Today we are going to take what we've learned about timely dataflow for a spin, and write a "real program": breadth-first search on random graphs.
Ok, so you see the quotes, right? Clearly this could be more real, but we'll do this to learn about using timely dataflow, and then you can take it to the real problems and get all the fame and fortune and bug reports.
So what is breadth-first search on a random graph? What's a random graph, or even a plain old graph?
Let's start from the beginning.
Graphs are a fun combinatorial object that you should think of as a big pile of pairs of things. We are going to use "pairs of integers", but it makes sense beyond that. These "pairs of things" represent relations between the things. If you have two things that like each other, maybe you write down the pair of the two of them. Each thing can be found in more than one pair, if you like.
A common example is web pages, where each page is a thing, and there is a pair (a, b) if web page a has a hyperlink to web page b. Another popular example: in social networks, people (or their accounts) are things, and we have a pair (a, b) if account a follows b, or messages with b, or otherwise recommends b. You get to choose!
In the context of graphs, a "thing" is usually called a "node". Each pair of nodes is called an "edge". The whole pile of edges is the "graph".
Let's imagine you've committed to a set of things, where we are going to use integers from 0 up to some number nodes. If you would like a graph, but don't have any great ideas about where to find one, you can just make something up. Let's say we need to create some number edges of random edges for our graph. We could just have each edge pick two random nodes, and make a bunch of edges this way:
// seed the RNG with the worker index, so each worker draws different (but reproducible) edges.
let seed: &[_] = &[1, 2, 3, root.index()];
let mut rng: StdRng = SeedableRng::from_seed(seed);

// each edge is just a uniformly random pair of node identifiers.
let mut graph = Vec::with_capacity(edges);
for _ in 0..edges {
    let a = rng.gen_range(0u32, nodes as u32);
    let b = rng.gen_range(0u32, nodes as u32);
    graph.push((a, b));
}
This is a random graph. Technically, it is a member of the G(n, m) family of random graphs, which just means that you have n nodes and m edges chosen at random.
There are other models of random graphs, with the most common being G(n, p), which has n nodes and in which each possible pair exists independently with probability p. It is more of a pain to produce this sort of graph accurately, and there are some theoretical results that say that there aren't too many things that distinguish the two models. There is a whole cottage industry of random graph models that "look like real graphs" (but invariably don't), and we are just going to ignore all of that for this post.
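If you did want a G(n, p) graph, the most naive way to produce one exactly is to flip a coin for every possible pair. Here is a sketch using the same rand types as above, with p standing in for the inclusion probability (a parameter I'm making up for the illustration); for large n you would want to skip ahead geometrically over the absent edges rather than test every pair.

// naive G(n, p): keep each ordered pair (a, b) independently with probability p.
// fine for small graphs; hopeless for 100M nodes, where you'd skip ahead instead.
let mut gnp = Vec::new();
for a in 0..nodes as u32 {
    for b in 0..nodes as u32 {
        if rng.gen::<f64>() < p {
            gnp.push((a, b));
        }
    }
}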
Given a graph, random or otherwise, we might be interested to learn something about its structure (let's pretend).
One pretty easy thing to ask is, from some specific node, for each other node how few edges must you traverse to go from one to the other? Perhaps even, for each of those other nodes, which immediately adjacent node leads you most directly to that specific node? This problem is often solved by "breadth-first search".
You can do breadth-first search in a few ways, but we are going to go for the simplest: we'll start from the source node, and repeatedly expand the set of reachable nodes by one step. Each node that is reachable can cause each of its immediate neighbors (nodes with which it shares an edge) to become reachable in one step.
Let's write down some pseudo-code to do this, where for each node we track the node that led us to it.
let mut done = vec![None; nodes];
let mut todo = VecDeque::new();

todo.push_back((root, root));

// for each edge reaching a `node`, have a think.
while let Some((node, prev)) = todo.pop_front() {
    if done[node].is_none() {
        done[node] = Some(prev);
        for next in graph.neighbors(node) {
            todo.push_back((next, node));
        }
    }
}
This algorithm has the property that, because todo is a queue, we visit all nodes at distance d before visiting any nodes at distance d+1.
You might say, "hey, Frank: your algorithm could be lots better" and you would be correct. For example, before enqueueing (next, node) maybe we could check and see if done[next] is set already; if it is, we shouldn't even bother enqueueing (next, node) because we know we won't use it.
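That tweak would look something like this in the pseudo-code above (just a sketch of the idea):

// only enqueue `next` if we haven't already settled it.
for next in graph.neighbors(node) {
    if done[next].is_none() {
        todo.push_back((next, node));
    }
}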
This is a good point, and brings us to an important design constraint: data-parallelism.
Although we've described breadth-first search as an algorithm for one worker, can we make it work for multiple workers, just by carefully directing the data to each worker? If we choose to do that, how does it limit the optimizations we might do?
We could assign graph edges to workers based on the source of the edge (all edges with the same source go to the same worker). That way, we could make this worker the source of truth for information about this node.
We have the problem that the todo queue is not shared, but we can change that pretty easily, by switching to an exchange channel abstraction: records go in and records come out; the same set of records comes out as went in, but maybe not at the same places they went in. If we partition these records by the first field, next in (next, node), then statements about reachability make their way to the worker that is the source of truth about the next node. That worker is excellently positioned to act on the statement, and make new statements from next if needed.
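To make the partitioning concrete, here is the routing rule we are relying on (a sketch; in the actual program we just hand timely a key function, and its exchange channel does the routing for us):

// partition (next, node) records by `next`, so that every statement about a node
// lands at the worker that owns it. the modulo rule is how the exchange channel
// currently assigns keys to workers.
fn owning_worker(record: &(u32, u32), peers: u64) -> u64 {
    (record.0 as u64) % peers
}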
So, this is almost already a nice data-parallel algorithm. We do need to introduce the use of channels and make the data movement more explicit, and we need a little more structure to recover the "first in, first out" property we were exploiting to make sure we saw nodes in breadth-first order. We want each worker to process all round d messages from each worker before processing any round d+1 messages.
That sounds a bit like notifications, from the previous post, right?
The whole program is about 100 lines of code, which is some boiler-plate, some comments, and just a few important moments. I'm going to start with a stylized version that breaks it down into these important moments, and then we'll flesh out each of them.
Here is the high-level view of the algorithm: we'll start from a stream of edges (pairs of nodes), prepare a stream of (next, node) announcements to be used as a loop variable, and then write the logic that takes these two as input and produces the next round of announcements.
I've left much of the important logic as comments for now, so that you can see the overall structure of the algorithm.
timely::execute_from_args(std::env::args(), move |root| {
    root.scoped(move |scope| {

        let graph = generate_graph(scope);
        let (handle, cycle) = scope.loop_variable(usize::max_value(), 1);

        graph.binary_notify(
            &cycle,
            Exchange::new(|x: &(u32, u32)| x.0 as u64),
            Exchange::new(|x: &(u32, u32)| x.0 as u64),
            "BFS",
            vec![],
            move |input1, input2, output, notify| {

                input1.for_each(|time, data| { /* recv edge data */ });
                input2.for_each(|time, data| { /* recv node data */ });

                notify.for_each(|time, _num| {
                    if time.inner == 0 { /* initialize the graph from edges */ }
                    else { /* drain node data and act on it */ }
                });
            }
        )
        .concat(&(vec![(source, source)].into_iter()).to_stream(scope))
        .connect_loop(handle);
    });
});
We are using binary_notify for the first time, so it is worth walking through its arguments. They are similar to unary_notify, extended as you might expect when you have a second input:
- We need to specify the other input.
- We need partitioning information for the first input.
- We need partitioning information for the second input.
- We still want a descriptive name.
- If we want any notifications at start of day, we put them here.
- We must specify the operator logic in terms of two inputs, one output, and a notificator.
Although most of the meat is in the operator logic, it is important to get the partitioning information correct: we want to partition both edges and node reachability announcements by their source node. If we don't do that, the right data doesn't get where we need it, and although we will compute something, it won't be the right thing.
Let's walk through each of the commented-out parts, no one of which is particularly complicated.
The edge input of the operator is where we initially get most of our data from. A large pile of edges will arrive here, and we will eventually need to organize them. For now, we will just take the batch and put it in a queue. We use the replace_with method defined on the type of data timely gives you (Content<D>, which looks a lot like a Vec<D>) to swap out a Vec<D> and leave behind an empty vector. Why do we have to swap things? A good question we'll dive into later on.
// receive, stash batches of edges.
input1.for_each(|time, data| {
    notify.notify_at(time);
    edge_list.push(data.replace_with(Vec::new()));
});
We will do the sorting in the /* initialize the graph from edges */ section.
The second input provides our operator with reachability announcements of the form (node, prev). Each announcement means that node is reachable from the root, by way of prev. At this point we could try and look into done[node] to see if it has been set yet, but let's hold off on that for the moment. There is the subtle issue that we might be seeing messages for different depths at the same time, and we don't want to do this logic if the message is from anything other than the smallest active depth.
For the moment, we will do the same as with the edges: stash the vector of data we got. Actually, we'll put it in a stash keyed by time, which lets us easily tell these stashes apart. This involves a HashMap lookup by time, but this happens only once per batch, so it isn't all that expensive in total.
// receive (node, root) pairs, note any new ones.
input2.for_each(|&time, data| {
    node_lists.entry(time)
              .or_insert_with(|| {
                  notify.notify_at(&time);
                  Vec::new()
              })
              .push(data.replace_with(Vec::new()));
});
Of course, we called notify_at here as well, because we do want to be notified. Unlike above, we tried to be a bit clever and only call notify_at in the case that we need to create a new stash. It would be fine to call it unconditionally too, but I wanted to show off this pattern.
You can skip this part if you trust that I can put a graph together, and don't really care how it happens. If you are skeptical or interested, this is the code that I used. The good news is that if you start from random edge data, you are probably going to end up with a random graph, even if you botch the algorithm.
I use two additional bits of state to represent the graph:
offsets: Vec<u32>,
targets: Vec<u32>,
where the slice targets[offsets[node]..offsets[node+1]] holds the neighbors of node. If we are doing really big graph processing on just a few workers, you might want to have offsets be a Vec<usize> (if you have more than 4 billion edges per worker). It goes just a bit slower if you do that.
These two get populated by finishing the sorting of edges by source, and then just reading the edges back out in sorted order:
// sort the edges
sorter.sort(&mut edge_list, &|x| x.0);

// allocate sufficient memory, to avoid resizing.
let mut count = 0;
for buffer in &edge_list { count += buffer.len(); }
offsets = Vec::with_capacity(1 + (nodes / peers));
targets = Vec::with_capacity(count);

// construct the graph
offsets.push(0);
let mut prev_node = 0;
for buffer in edge_list.drain_temp() {
    for (node, edge) in buffer {
        let temp = node / peers as u32;
        while prev_node < temp {
            prev_node += 1;
            offsets.push(targets.len() as u32);
        }
        targets.push(edge);
    }
}
offsets.push(targets.len() as u32);
You might notice that offsets has some redundant values added, which makes the offsets[node+1] access easy to handle without special-casing the boundaries.
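Just to spell out how this layout gets read later (a hypothetical helper; the operator below simply inlines these two lines):

// read the neighbors of a node (by its local index) out of the offsets/targets layout.
fn neighbors<'a>(offsets: &[u32], targets: &'a [u32], local: usize) -> &'a [u32] {
    let lower = offsets[local] as usize;
    let upper = offsets[local + 1] as usize;   // safe at the end, thanks to the extra offset.
    &targets[lower..upper]
}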
The drain_temp() method is something I copy/pasted from the Rust source. They have a drain(..) method which empties a Vec<D>, giving you ownership of the elements, but they haven't stabilized it yet (nor have they started to, as far as I can tell). It is a really important method for performant programming, because it is the only way (I think) to get owned data from a Vec<T> without de-allocating the Vec. Actually, in this case it is an even bigger problem, because the closure isn't allowed to de-allocate edge_list; it can mutate it (like with drain_temp()) but since we have to be able to call the closure multiple times, if the closure consumed edge_list in one call, it wouldn't be safe to call it again.
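If you would rather not copy code out of the standard library, a rough stand-in for what drain_temp() buys us here is to swap an empty vector in and iterate over the old one (a sketch of the idea, not the actual drain_temp(); unlike drain it does give up edge_list's own allocation, but it does let the closure keep the binding alive):

// take the stashed buffers out of `edge_list` without consuming the binding itself;
// `edge_list` is left as a fresh empty Vec, so the closure can be called again later.
for buffer in std::mem::replace(&mut edge_list, Vec::new()) {
    // `buffer: Vec<(u32, u32)>` is owned here, so its contents can be moved into `targets`.
}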
Here is where we do the exciting BFS logic! The complicated moment we've been building up to! We have a queue containing the nodes we want to process, and we need to swing through each of them and send announcements to the neighbors of newly reached nodes.
Specifically, for each newly reached node, for each edge (node, next), we want to announce (next, node) to whoever is in charge of next.
The main bit of non-obvious cleverness we are going to use here is to notice that if there are peers workers, the nodes each worker handles all have the same value of node % peers. That is just how the exchange channel works at the moment. Consequently, each worker can index per-node state using the index node / peers, which means a more compact array without risking collision. This also means that as we scale up the number of workers, the per-worker state scales down linearly rather than staying put, which is important if we want to work with teh big dataz.
We'll use the same done array as indicated above, except rather than use None and Some(prev) to indicate whether we have seen a node or not, we'll use u32::max_value() for None and any other value to indicate a valid prev. Rust has mechanisms to make this less error-prone (the NonZero struct, which optimizes Option<NonZero<u32>> to be a u32, among other things), but they aren't stable yet. So we fake things out for now.
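Concretely, the per-worker state might be set up like this (a sketch; the names match the code below):

// one entry per node this worker owns; u32::max_value() plays the role of `None`.
let mut done = vec![u32::max_value(); 1 + (nodes / peers)];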
if let Some(mut todo) = node_lists.remove(&time) {
    let mut session = output.session(&time);
    for buffer in todo.drain_temp() {
        for (node, prev) in buffer {
            // index per-worker state by `node / peers`.
            let temp = (node as usize) / peers;
            // if `node` is newly reached, record `prev` and announce to its neighbors.
            if done[temp] == u32::max_value() {
                done[temp] = prev;
                let lower = offsets[temp] as usize;
                let upper = offsets[temp + 1] as usize;
                for &next in &targets[lower..upper] {
                    session.give((next, node));
                }
            }
        }
    }
}
Well, that was pretty painless.
If you are curious, I'd check out the full code in the repo. There you will also find a random graph generator, the variable definitions, and some diagnostics. The code also runs, and you can follow along at home with the next part.
The BFS example is rigged to take a number of nodes and a number of edges as command line parameters, and report a bit about how long until various moments are reached (edge data ready to be sorted, and then each of the iterations). Let's start small:
Echidnatron% cargo run -- 1000 10000
Running `target/debug/bfs 1000 10000`
0.00726696290075779: sorting
0.021402814891189337: time: (Root, 0)
0.02150881290435791: time: (Root, 1)
0.02165887097362429: time: (Root, 2)
0.021808998892083764: time: (Root, 3)
0.022090064943768084: time: (Root, 4)
0.022902220953255892: time: (Root, 5)
0.023720855941064656: time: (Root, 6)
Echidnatron%
Ten thousand edges in 17 milliseconds (0.024 - 0.007)! That is like 1.7 microseconds per edge. Wooooo! Wooo!
Woooooooooo!
Oops. Wait. That was a debug build. Classic blunder. Wait a sec...
Echidnatron% cargo run --release -- 1000 10000
Running `target/release/bfs 1000 10000`
0.0006168830441311002: sorting
0.0014280889881774783: time: (Root, 0)
0.0014428909635171294: time: (Root, 1)
0.0014523250283673406: time: (Root, 2)
0.0014579229755327106: time: (Root, 3)
0.0014682130422443151: time: (Root, 4)
0.001544128987006843: time: (Root, 5)
0.001647635013796389: time: (Root, 6)
Echidnatron%
WOOOOOOOOO!!!
One millisecond to do BFS on 10,000 edges, which is about 100ns per edge. I don't even know if that is any good. Let's assume not, and do better.
Let's go up a bit in size, to 100M nodes and 1B edges:
Echidnatron% cargo run --release -- 100000000 1000000000
Running `target/release/bfs 100000000 1000000000`
47.3983994909795: sorting
78.94112817093264: time: (Root, 0)
78.94117277988698: time: (Root, 1)
78.94120144797489: time: (Root, 2)
78.94121939688921: time: (Root, 3)
78.94141164899338: time: (Root, 4)
78.94323138298932: time: (Root, 5)
78.96418087789789: time: (Root, 6)
79.17357097589411: time: (Root, 7)
80.4100593709154: time: (Root, 8)
83.38715377799235: time: (Root, 9)
101.84932650893461: time: (Root, 10)
132.68470372690354: time: (Root, 11)
147.00046391994692: time: (Root, 12)
147.11236464395188: time: (Root, 13)
Echidnatron%
One billion edges in 147.11 - 47.39 = 99.72 seconds. So, about 100ns per edge here too, counting the edge sorting. I guess that is the answer, then? Maybe.
Let's try it with a few more workers. Here it is with two workers, just by adding -w 2 to the command line arguments.
Echidnatron% cargo run --release -- 100000000 1000000000 -w 2
Running `target/release/bfs 100000000 1000000000 -w 2`
24.963413452962413: sorting
41.05532361601945: time: (Root, 0)
...
77.45530807902105: time: (Root, 13)
Echidnatron%
Now with four workers (-w 4). Note: I do not have four physical cores, just some hyperthreading over my two cores.
Echidnatron% cargo run --release -- 100000000 1000000000 -w 4
Running `target/release/bfs 100000000 1000000000 -w 4`
16.311206860933453: sorting
30.02261466695927: time: (Root, 0)
...
61.09994436590932: time: (Root, 13)
Echidnatron%
Well, that sped up by a bunch. It was almost a 2x speed-up from one to two workers (that is bad; I'll explain why later), and less going from two to four workers. Part of that is evident in the graph generation (the first time measurement): there just aren't actually 2x the compute resources, which means some things just aren't going to get much faster.
Yeah, that was the title of the blog post from a few weeks back. In the post, I tried to explain how sorting might be good for you, in that it can be faster to sort and then do sequential access rather than blindly do random access.
So what? Well, we get a big queue todo of node values to look up, and they are pretty much random. In fact, you can use the principle of deferred decisions to prove that they are actually, literally random. Random look-ups suck.
Hey let's sort them? Like, we'll just add a line
sorter.sort(&mut todo, &|x| x.0);
before we start processing todo.
Echidnatron% cargo run --release -- 100000000 1000000000
Running `target/release/bfs 100000000 1000000000`
48.442140622995794: sorting
76.83494656998664: time: (Root, 0)
76.83500196388923: time: (Root, 1)
76.83592683402821: time: (Root, 2)
76.83855341793969: time: (Root, 3)
76.85178517387249: time: (Root, 4)
76.88814573083073: time: (Root, 5)
77.28426942881197: time: (Root, 6)
77.953474214999: time: (Root, 7)
78.88436190597713: time: (Root, 8)
80.1611772258766: time: (Root, 9)
85.42607392696664: time: (Root, 10)
100.22155155683868: time: (Root, 11)
112.15617105201818: time: (Root, 12)
112.28759473399259: time: (Root, 13)
Echidnatron%
I left all the measurements in there so you can compare with the unsorted run above. The take-away, though, is that the 100ns per edge time drops down to about 64ns per edge. If we focus in on the part of the computation we changed, starting only after we finish sorting the edges, the time drops from 68ns per edge to 36ns per edge. That is almost a 2x improvement due to sorting.
Here are the two and four worker numbers with sorting. The total times above are 77s and 61s, respectively, so we see some nice improvements here too.
Echidnatron% cargo run --release -- 100000000 1000000000 -w 2
Running `target/release/bfs 100000000 1000000000 -w 2`
24.986130700912327: sorting
40.65206292690709: time: (Root, 0)
...
60.18016406102106: time: (Root, 13)
Echidnatron%
Echidnatron% cargo run --release -- 100000000 1000000000 -w 4
Running `target/release/bfs 100000000 1000000000 -w 4`
15.913500403054059: sorting
28.71496215602383: time: (Root, 0)
...
47.13655676995404: time: (Root, 13)
Echidnatron%
Not quite a 2x speed-up any more. Cuz we are cutting out the fat.
The single-threaded worker literally does less work than with more workers. When you say Exchange, the single-threaded code says Pipeline; timely optimizes out the exchange channel, and just swings some pointers around rather than moving records between buffers. So, we shouldn't expect a 2x improvement, and that kind of scaling may be just because we left performance on the table.
Let's take a look at the number of nodes that have been reached by the end of each iteration. Remember that we have 100M nodes, and average degree 10, so we expect this to go up by about 10x each iteration, until it starts to saturate as we approach 100M.
Echidnatron% cargo run --release -- 100000000 1000000000 -w 1
Running `target/release/bfs 100000000 1000000000 -w 1`
(Root, 1): number: 1
(Root, 2): number: 10
(Root, 3): number: 101
(Root, 4): number: 940
(Root, 5): number: 9206
(Root, 6): number: 92093
(Root, 7): number: 916506
(Root, 8): number: 8757098
(Root, 9): number: 58338155
(Root, 10): number: 99706855
(Root, 11): number: 99995407
(Root, 12): number: 99995536
(Root, 13): number: 99995536
Echidnatron%
If we correlate this with the single-threaded numbers above (annoyingly, reported for the beginning of each iteration) we see that, from the point at which we finish sorting the edges, we fairly quickly reach 0.0875 of the nodes (in about four seconds) and are at 0.583 of the nodes within nine seconds. We then spend another almost thirty seconds finishing out the computation.
What is happening here is that by the time we've reached 50% of the nodes, the large majority of work we will do is redundant with work we have already done. Most of the edges we will process will reach already-reached nodes.
This observation is the basis of a very nice, short, and readable paper by Beamer et al. They observe that while push-based breadth-first search is great for a while, you may eventually want to switch to a pull-based approach. In the pull-based approach, all of the nodes who are not yet reachable get panicky, and start asking their neighbors "are you reachable yet?" The advantage this approach has is that they can stop asking once they've found a reachable neighbor, and if the graph is 50% reached this will only require a few edges on average (note: technically more than two on average, in case you are a random-graph weenie).
That's a cool observation! And they show that things speed up by a bunch.
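To make the idea concrete, here is a rough single-worker sketch of one bottom-up round. The names in_frontier and next_frontier are made up for the illustration, done/offsets/targets are indexed directly by node since there is only one worker, and the sketch quietly assumes the edges are symmetric (or that we also built the reverse adjacency).

// one bottom-up round: every unreached node scans its neighbors and stops at the
// first one that was reached in the previous round.
for node in 0..nodes {
    if done[node] == u32::max_value() {
        let (lower, upper) = (offsets[node] as usize, offsets[node + 1] as usize);
        for &next in &targets[lower..upper] {
            if in_frontier[next as usize] {
                done[node] = next;               // found a reached neighbor; stop asking.
                next_frontier.push(node as u32);
                break;
            }
        }
    }
}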
Let's do that!
Ok. How do we do that? Dataflow really is about pushing data, rather than pulling data, like they want to do. Have we found a horrible pit of badness for dataflow?
Sort-of, but not exactly.
Each of the memory requests a panicky node makes is a lot like sending a message. When your processor requests the data at some memory location, it doesn't block waiting for the response, it pushes the address to your memory management unit which eventually responds by pushing a response back to the processor. We can do the same thing in software, which sounds horrible but really doesn't have much lower throughput. We've actually already seen above how batching and sorting memory requests can have higher throughput than just hammering random locations in memory.
My hobby is looking at high-performance parallel algorithms and trying to figure out if I can just batch their memory requests in software. Often: yes, if throughput is the important metric. In the case of BFS: holy crap there are some good numbers already out there. Time to get batching!
The "bottom up BFS" algorithm saves because each node can stop making requests once it finds a reachable neighbor. To get the same benefit, we can't just have all unreached nodes issue requests to all of their neighbors. But then how do they know to which of their neighbors to issue requests in each iteration?
We want each unreached node to make a request of its first neighbor. Then, each still-unreached node should make a request of its second neighbor, then third, and so on. It is almost like we have another, nested loop going on here. Another, nested ...
O. M. G.
I think I know a data-parallel dataflow system that supports nested loops!
Tune in next time to find out which one it is.
What would you need to do to make the BFS code sort the edge data once, and then respond to a sequence of BFS requests (each providing a root), perhaps generated by some external new_input source? You'll want to clean up state between requests, because that done vector gets pretty big, and you may even want to sequence the requests if someone gives you 1,000 all at once (probably didn't read the manual; just crash and mark it as "user error").
Is it lots more complicated than that? (Honest question; I haven't done it yet).
My homework is to do that Hybrid-BFS thing the Berkeley folks proposed. I'm off on a trip for a few days, so it may not be right away.