-
Notifications
You must be signed in to change notification settings - Fork 112
/
mpscq.h
83 lines (71 loc) · 1.95 KB
/
mpscq.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once
#include "bits.h"
#include "helpers.h"
#include <utility>
namespace snmalloc
{
template<class T>
class MPSCQ
{
private:
static_assert(
std::is_same<decltype(T::next), std::atomic<T*>>::value,
"T->next must be a std::atomic<T*>");
std::atomic<T*> back = nullptr;
T* front = nullptr;
public:
void invariant()
{
#ifndef NDEBUG
assert(back != nullptr);
assert(front != nullptr);
#endif
}
void init(T* stub)
{
stub->next.store(nullptr, std::memory_order_relaxed);
front = stub;
back.store(stub, std::memory_order_relaxed);
invariant();
}
T* destroy()
{
T* fnt = front;
back.store(nullptr, std::memory_order_relaxed);
front = nullptr;
return fnt;
}
inline bool is_empty()
{
T* bk = back.load(std::memory_order_relaxed);
return bk == front;
}
void enqueue(T* first, T* last)
{
// Pushes a list of messages to the queue. Each message from first to
// last should be linked together through their next pointers.
invariant();
last->next.store(nullptr, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
T* prev = back.exchange(last, std::memory_order_relaxed);
prev->next.store(first, std::memory_order_relaxed);
}
std::pair<T*, bool> dequeue()
{
// Returns the front message, or null if not possible to return a message.
invariant();
T* first = front;
T* next = first->next.load(std::memory_order_relaxed);
if (next != nullptr)
{
front = next;
AAL::prefetch(&(next->next));
assert(front);
std::atomic_thread_fence(std::memory_order_acquire);
invariant();
return std::pair(first, true);
}
return std::pair(nullptr, false);
}
};
} // namespace snmalloc