-
Notifications
You must be signed in to change notification settings - Fork 2
/
sliding_queue.h
168 lines (148 loc) · 3.83 KB
/
sliding_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#pragma once
#include <cstdio>
#include <emu_cxx_utils/for_each.h>
#include <emu_cxx_utils/pointer_manipulation.h>
#include <emu_cxx_utils/repl_array.h>
#include <emu_cxx_utils/replicated.h>
// Replicated sliding queue for level-synchronous graph traversals (e.g. BFS)
// on the Emu architecture. One copy of the queue lives on each nodelet; each
// copy stores only the vertices local to that nodelet. Items are appended to
// the tail, then slide_window()/slide_all_windows() freezes the appended range
// as the "current window" that begin()/end() iterate over.
class sliding_queue
{
private:
    // Index of the next free slot in the local buffer (advanced atomically
    // by push_back, so concurrent producers on one nodelet are safe).
    long next_;
    // Start and end of the current window within the local buffer.
    long start_;
    long end_;
    // Index of the current window (how many times the window has slid).
    long window_;
    // Storage for items in the queue (one replicated copy per nodelet).
    emu::repl_array<long> buffers_;
    // Storage for the end position of each window that has been closed.
    emu::repl_array<long> heads_;
    // Pointers to the copies of heads_/buffers_ on the same nodelet as this
    // queue copy, cached so the hot paths avoid repeated address translation.
    long * head_ptr_;
    long * buffer_ptr_;
public:
    // Iterator range over the items in the current window of the local queue.
    long * begin() { return &buffer_ptr_[start_]; }
    long * end() { return &buffer_ptr_[end_]; }

    // @param size Total number of vertices across all nodelets.
    explicit sliding_queue(long size)
    // Each queue only needs to store vertices on the local nodelet.
    // Round up so the per-nodelet capacity is sufficient even when size
    // is not an exact multiple of NODELETS(); floor division would let a
    // nodelet holding ceil(size/NODELETS()) vertices overflow its buffer.
    : buffers_((size + NODELETS() - 1) / NODELETS())
    , heads_((size + NODELETS() - 1) / NODELETS())
    , head_ptr_(heads_.get_localto(this))
    , buffer_ptr_(buffers_.get_localto(this))
    {
        reset();
    }

    // Shallow copy constructor: shares the replicated storage of `other`,
    // but re-localizes the cached pointers for the nodelet this copy is on.
    sliding_queue(const sliding_queue& other, emu::shallow_copy)
    : next_(other.next_)
    , start_(other.start_)
    , end_(other.end_)
    , window_(other.window_)
    , buffers_(other.buffers_)
    , heads_(other.heads_)
    , head_ptr_(heads_.get_localto(this))
    , buffer_ptr_(buffers_.get_localto(this))
    {}

    // Empty the local queue and rewind to the first window.
    void
    reset()
    {
        next_ = 0;
        start_ = 0;
        end_ = 0;
        window_ = 0;
    }

    // Reset every replicated copy of the queue.
    void
    reset_all()
    {
        // Call reset on each copy of the queue
        emu::repl_for_each(emu::parallel_policy<8>(), *this,
            [](sliding_queue& self) { self.reset(); });
    }

    // Returns a reference to the copy of the queue on the Nth nodelet.
    sliding_queue&
    get_nth(long n)
    {
        return *emu::pmanip::get_nth(this, n);
    }

    // Close the current window: everything pushed since the previous slide
    // becomes the new [start_, end_) range, and its end is recorded in heads_.
    void
    slide_window()
    {
        start_ = window_ == 0 ? 0 : head_ptr_[window_ - 1];
        end_ = next_;
        head_ptr_[window_] = end_;
        window_ += 1;
    }

    // Slide the window on every replicated copy of the queue.
    void
    slide_all_windows()
    {
        // Call slide_window on each replicated copy
        emu::repl_for_each(emu::parallel_policy<8>(), *this,
            [](sliding_queue& self) { self.slide_window(); });
    }

    // Append an item to the local queue. Safe to call concurrently from
    // multiple threads on the same nodelet (slot is claimed atomically).
    void
    push_back(long v)
    {
        long pos = emu::atomic_addms(&next_, 1);
        buffer_ptr_[pos] = v;
    }

    // True when the current window of the local queue holds no items.
    bool
    is_empty() const
    {
        return start_ == end_;
    }

    // Number of items in the current window of the local queue.
    long
    size() const
    {
        return end_ - start_;
    }

    // True when the current window is empty on every nodelet.
    bool
    all_empty()
    {
        // TODO we could parallelize this. But in the common case, we will
        // find that the first queue is non-empty and exit early.
        for (long n = 0; n < NODELETS(); ++n) {
            if (!get_nth(n).is_empty()) {
                return false;
            }
        }
        return true;
    }

    // Total number of items in the current window, summed over all nodelets.
    long
    combined_size()
    {
        long size = 0;
        emu::repl_for_each(emu::parallel_policy<8>(), *this,
            [&size](sliding_queue& self) {
                // remote_add accumulates into the single `size` local safely
                // across nodelets.
                emu::remote_add(&size, self.size());
            }
        );
        return size;
    }

    // Print the items in the local queue's current window to stdout.
    void
    dump()
    {
        for (long i = start_; i < end_; ++i) {
            printf("%li ", buffer_ptr_[i]);
        }
    }

    // Print the current window of every replicated copy, one nodelet at a
    // time (sequential so the output is not interleaved).
    void
    dump_all()
    {
        // Call dump() on each copy
        emu::repl_for_each(emu::seq, *this,
            [](sliding_queue& self) { self.dump(); });
    }

    // Apply `worker` to every item in the current window on every nodelet.
    // @param worker Callable taking a long (or long&); must be safe to run
    //               concurrently, since items are processed in parallel.
    template<class Function>
    void forall_items(Function worker)
    {
        // First, spawn a thread on each nodelet to handle the local queue
        emu::repl_for_each(emu::parallel_policy<1>(), *this,
            [&](sliding_queue& queue){
                // Spawn threads to dynamically pull items off of this queue
                emu::parallel::for_each(
                    emu::dyn,
                    queue.begin(), queue.end(), worker
                );
            }
        );
    }
};