// Copyright (c) 2015, The Regents of the University of California (Regents)
// See LICENSE.txt for license details
#include <cinttypes>
#include <limits>
#include <iostream>
#include <queue>
#include <vector>
#include "benchmark.h"
#include "builder.h"
#include "command_line.h"
#include "graph.h"
#include "platform_atomics.h"
#include "pvector.h"
#include "timer.h"

/*
GAP Benchmark Suite
Kernel: Single-source Shortest Paths (SSSP)
Authors: Scott Beamer, Yunming Zhang

Returns array of distances for all vertices from given source vertex

This SSSP implementation makes use of the ∆-stepping algorithm [1]. The type
used for weights and distances (WeightT) is typedef'd in benchmark.h. The
delta parameter (-d) should be set for each input graph. This implementation
incorporates a new bucket fusion optimization [2] that significantly reduces
the number of iterations (& barriers) needed.

The bins of width delta are actually all thread-local and of type std::vector,
so they can grow but are otherwise capacity-proportional. Each iteration is
done in two phases separated by barriers. In the first phase, the current
shared bin is processed by all threads. As they find vertices whose distance
they are able to improve, they add them to their thread-local bins. During
this phase, each thread also votes on what the next bin should be (the
smallest non-empty bin). In the next phase, each thread copies its selected
thread-local bin into the shared bin.

Once a vertex is added to a bin, it is not removed, even if its distance is
later updated and it now appears in a lower bin. We find that ignoring
vertices whose distance is already less than the minimum distance of the
current bin removes enough redundant work to be faster than removing them
from older bins.

The bucket fusion optimization [2] processes a thread's next local bin within
the same iteration if its vertices have the same priority as those in the
current shared bin. This optimization greatly reduces the number of iterations
needed without violating the priority-based execution order, leading to
significant speedups on large-diameter road networks.

[1] Ulrich Meyer and Peter Sanders. "∆-stepping: a parallelizable shortest
    path algorithm." Journal of Algorithms, 49(1):114-152, 2003.

[2] Yunming Zhang, Ajay Brahmakshatriya, Xinyi Chen, Laxman Dhulipala,
    Shoaib Kamil, Saman Amarasinghe, and Julian Shun. "Optimizing ordered
    graph algorithms with GraphIt." The 18th International Symposium on Code
    Generation and Optimization (CGO), pages 158-170, 2020.
*/
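
// Illustrative example of the binning rule used in RelaxEdges below: with
// -d 2 and integer weights, improving a vertex's distance to 7 places it in
// bin 7/2 = 3, i.e. bin i holds tentative distances in [i*delta, (i+1)*delta).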
using namespace std;
const WeightT kDistInf = numeric_limits<WeightT>::max()/2;
const size_t kMaxBin = numeric_limits<size_t>::max()/2;
const size_t kBinSizeThreshold = 1000;

inline
void RelaxEdges(const WGraph &g, NodeID u, WeightT delta,
                pvector<WeightT> &dist, vector<vector<NodeID>> &local_bins) {
  for (WNode wn : g.out_neigh(u)) {
    WeightT old_dist = dist[wn.v];
    WeightT new_dist = dist[u] + wn.w;
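    // Try to claim the improved distance with a CAS; on success, append the
    // vertex to the thread-local bin for its new distance, otherwise re-read
    // and retry while the relaxation still improves dist[wn.v]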
    while (new_dist < old_dist) {
      if (compare_and_swap(dist[wn.v], old_dist, new_dist)) {
        size_t dest_bin = new_dist/delta;
        if (dest_bin >= local_bins.size())
          local_bins.resize(dest_bin+1);
        local_bins[dest_bin].push_back(wn.v);
        break;
      }
      old_dist = dist[wn.v];  // swap failed, recheck dist update & retry
    }
  }
}

pvector<WeightT> DeltaStep(const WGraph &g, NodeID source, WeightT delta,
                           bool logging_enabled = false) {
  Timer t;
  pvector<WeightT> dist(g.num_nodes(), kDistInf);
  dist[source] = 0;
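  // Frontier is conservatively sized to the directed edge count because a
  // vertex can be re-added each time its tentative distance improves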
  pvector<NodeID> frontier(g.num_edges_directed());
  // two element arrays for double buffering curr=iter&1, next=(iter+1)&1
  size_t shared_indexes[2] = {0, kMaxBin};
  size_t frontier_tails[2] = {1, 0};
  frontier[0] = source;
  t.Start();
  #pragma omp parallel
  {
    vector<vector<NodeID>> local_bins(0);
    size_t iter = 0;
    while (shared_indexes[iter&1] != kMaxBin) {
      size_t &curr_bin_index = shared_indexes[iter&1];
      size_t &next_bin_index = shared_indexes[(iter+1)&1];
      size_t &curr_frontier_tail = frontier_tails[iter&1];
      size_t &next_frontier_tail = frontier_tails[(iter+1)&1];
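      // Phase 1: all threads process the current shared bin, skipping any
      // vertex whose distance has already dropped below this bin's range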
      #pragma omp for nowait schedule(dynamic, 64)
      for (size_t i = 0; i < curr_frontier_tail; i++) {
        NodeID u = frontier[i];
        if (dist[u] >= delta * static_cast<WeightT>(curr_bin_index))
          RelaxEdges(g, u, delta, dist, local_bins);
      }
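      // Bucket fusion [2]: while this thread's local bin for the current
      // priority stays small, drain it immediately in the same iteration
      // rather than waiting for the next barrier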
      while (curr_bin_index < local_bins.size() &&
             !local_bins[curr_bin_index].empty() &&
             local_bins[curr_bin_index].size() < kBinSizeThreshold) {
        vector<NodeID> curr_bin_copy = local_bins[curr_bin_index];
        local_bins[curr_bin_index].resize(0);
        for (NodeID u : curr_bin_copy)
          RelaxEdges(g, u, delta, dist, local_bins);
      }
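      // Vote for the next shared bin: each thread proposes its smallest
      // non-empty local bin at or above the current priority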
      for (size_t i = curr_bin_index; i < local_bins.size(); i++) {
        if (!local_bins[i].empty()) {
          #pragma omp critical
          next_bin_index = min(next_bin_index, i);
          break;
        }
      }
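      // End of phase 1: after the barrier, one thread logs the step and
      // resets this buffer's bin index and frontier tail for later reuse
      // (double buffering)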
      #pragma omp barrier
      #pragma omp single nowait
      {
        t.Stop();
        if (logging_enabled)
          PrintStep(curr_bin_index, t.Millisecs(), curr_frontier_tail);
        t.Start();
        curr_bin_index = kMaxBin;
        curr_frontier_tail = 0;
      }
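      // Phase 2: each thread appends its local bin for the winning priority
      // to the shared frontier, claiming a destination range via fetch_and_add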
      if (next_bin_index < local_bins.size()) {
        size_t copy_start = fetch_and_add(next_frontier_tail,
                                          local_bins[next_bin_index].size());
        copy(local_bins[next_bin_index].begin(),
             local_bins[next_bin_index].end(), frontier.data() + copy_start);
        local_bins[next_bin_index].resize(0);
      }
      iter++;
      #pragma omp barrier
    }
    #pragma omp single
    if (logging_enabled)
      cout << "took " << iter << " iterations" << endl;
  }
  return dist;
}

void PrintSSSPStats(const WGraph &g, const pvector<WeightT> &dist) {
  auto NotInf = [](WeightT d) { return d != kDistInf; };
  int64_t num_reached = count_if(dist.begin(), dist.end(), NotInf);
  cout << "SSSP Tree reaches " << num_reached << " nodes" << endl;
}

// Compares against simple serial implementation
bool SSSPVerifier(const WGraph &g, NodeID source,
                  const pvector<WeightT> &dist_to_test) {
  // Serial Dijkstra implementation to get oracle distances
  pvector<WeightT> oracle_dist(g.num_nodes(), kDistInf);
  oracle_dist[source] = 0;
  typedef pair<WeightT, NodeID> WN;
  priority_queue<WN, vector<WN>, greater<WN>> mq;
  mq.push(make_pair(0, source));
  while (!mq.empty()) {
    WeightT td = mq.top().first;
    NodeID u = mq.top().second;
    mq.pop();
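    // Lazy deletion: a popped entry is acted on only if its distance still
    // matches the vertex's best known distance (stale entries are skipped)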
    if (td == oracle_dist[u]) {
      for (WNode wn : g.out_neigh(u)) {
        if (td + wn.w < oracle_dist[wn.v]) {
          oracle_dist[wn.v] = td + wn.w;
          mq.push(make_pair(td + wn.w, wn.v));
        }
      }
    }
  }
  // Report any mismatches
  bool all_ok = true;
  for (NodeID n : g.vertices()) {
    if (dist_to_test[n] != oracle_dist[n]) {
      cout << n << ": " << dist_to_test[n] << " != " << oracle_dist[n] << endl;
      all_ok = false;
    }
  }
  return all_ok;
}

int main(int argc, char* argv[]) {
  CLDelta<WeightT> cli(argc, argv, "single-source shortest-path");
  if (!cli.ParseArgs())
    return -1;
  WeightedBuilder b(cli);
  WGraph g = b.MakeGraph();
  SourcePicker<WGraph> sp(g, cli.start_vertex());
  auto SSSPBound = [&sp, &cli] (const WGraph &g) {
    return DeltaStep(g, sp.PickNext(), cli.delta(), cli.logging_en());
  };
  SourcePicker<WGraph> vsp(g, cli.start_vertex());
  auto VerifierBound = [&vsp] (const WGraph &g, const pvector<WeightT> &dist) {
    return SSSPVerifier(g, vsp.PickNext(), dist);
  };
  BenchmarkKernel(cli, g, SSSPBound, PrintSSSPStats, VerifierBound);
  return 0;
}
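
// Example invocation (a sketch; the graph file path is hypothetical and the
// exact flag handling is defined in command_line.h):
//   ./sssp -f benchmark/graphs/road.wsg -d 2 -n 16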