//
// ConcurrentMap.cc
//
// Copyright 2020-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
//
#include "ConcurrentMap.hh"
#include <algorithm>
#include <cmath>
#if defined(_WIN32) || defined(_WIN64)
#include <Windows.h>
#endif
using namespace std;
namespace fleece {

    /*
        This is based on the “folklore” table described in “Concurrent Hash Tables: Fast and
        General(?)!” by Maier et al. <https://arxiv.org/pdf/1601.04017.pdf>. It's a pretty basic
        open hash table with linear probing. Atomic compare-and-swap operations are used to
        update entries, but not to read them.

        It doesn't support modifying the value of an entry, simply because SharedKeys doesn't
        need it.

        It cannot grow past its initial capacity; this is rather difficult to do in a concurrent
        map (the paper describes how) and would add a lot of complexity ... and again, SharedKeys
        has a fixed capacity of 2048, so it doesn't need this.

        Since insertions are not very common, it's worth the expense to materialize the count in
        an atomic integer variable, and update it on insert/delete, instead of using the more
        complex techniques described in the paper.

        Maier et al. state (without explanation) that, unlike ordinary open hash tables, the
        "folklore" table cannot reuse 'tombstones' for new entries. I believe the reason is that
        there could be incorrect results from "torn reads" -- non-atomic reads where the two
        fields of the Entry are not consistent with each other. However, this implementation
        does not suffer from torn reads, since its Entry is only 32 bits, as opposed to two
        words (128 bits). So to the best of my knowledge, reusing deleted entries is safe.

        Still, this table does have the common problem that large numbers of tombstones degrade
        read performance, since the `find` method has to scan past tombstones just as if they
        were occupied. The solution would be to copy the extant entries to a new table, but (as
        with growing) this is pretty complex in a lockless concurrent table.
    */
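
    /*
        A minimal usage sketch (illustrative only, not part of the library). It assumes that
        `hashCode` is a static member function (it's called unqualified in `dump()` below) and
        that the `result` struct declared in ConcurrentMap.hh exposes the key slice and value in
        that order, as the return statements in this file suggest:

            ConcurrentMap map(2048, 0);                // SharedKeys-sized table, default string space
            slice key = "color";
            hash_t h = ConcurrentMap::hashCode(key);
            map.insert(key, 42, h);                    // returns the stored entry, or {} on overflow
            auto hit = map.find(key, h);               // safe to call concurrently with inserts
            map.remove(key, h);                        // replaces the entry with a tombstone
    */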

    // Minimum size [not capacity] of table to create initially
    static constexpr size_t kMinInitialSize = 16;

    // Max fraction of table entries that should be occupied (else lookups slow down)
    static constexpr float kMaxLoad = 0.6f;

    // Special values of Entry::keyOffset
    static constexpr uint16_t kEmptyKeyOffset   = 0,    // an empty Entry
                              kDeletedKeyOffset = 1,    // a deleted Entry
                              kMinKeyOffset     = 2;    // first actual key offset

    // Cross-platform atomic test-and-set primitive:
    // If `*value == oldValue`, stores `newValue` and returns true. Else returns false.
    // Note: This could work with values up to 128 bits, on current 64-bit CPUs.
    static inline bool atomicCompareAndSwap(uint32_t *value, uint32_t oldValue, uint32_t newValue) {
#if defined(_WIN32) || defined(_WIN64)
        // https://docs.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-interlockedcompareexchange
        return InterlockedCompareExchange(value, newValue, oldValue) == oldValue;
#else
        // https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html (also in Clang)
        return __sync_bool_compare_and_swap(value, oldValue, newValue);
#endif
    }
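
    // Usage pattern (illustrative): lock-free updates built on this primitive retry until the
    // swap succeeds. For example, a sketch of an atomic increment would look like:
    //     uint32_t old;
    //     do { old = *counter; } while (!atomicCompareAndSwap(counter, old, old + 1));
    // `insert` and `remove` below use the same retry loop on whole 32-bit Entry values.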

    ConcurrentMap::ConcurrentMap(int capacity, int stringCapacity) {
        precondition(capacity <= kMaxCapacity);
        int size;
        for (size = kMinInitialSize; size * kMaxLoad < capacity; size *= 2)
            ;
        _capacity = int(floor(size * kMaxLoad));
        _sizeMask = size - 1;
        if (stringCapacity == 0)
            stringCapacity = 17 * _capacity;    // assume 16-byte strings (plus NUL) by default
        stringCapacity = min(stringCapacity, int(kMaxStringCapacity));
        size_t tableSize = size * sizeof(Entry);
        _heap = ConcurrentArena(tableSize + stringCapacity);
        _entries = ConcurrentArenaAllocator<Entry, true>(_heap).allocate(size);
        _keysOffset = tableSize - kMinKeyOffset;
        postcondition(stringCapacity >= 0 && _heap.available() == size_t(stringCapacity));
    }
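
    // Worked example (illustrative): for capacity = 2048, SharedKeys' fixed size, the loop
    // stops at size = 4096, since 2048 * 0.6 = 1228.8 < 2048 but 4096 * 0.6 = 2457.6 >= 2048.
    // Then _capacity = floor(2457.6) = 2457, _sizeMask = 4095, and the default stringCapacity
    // is 17 * 2457 = 41769 bytes, subject to the kMaxStringCapacity clamp.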

    ConcurrentMap::ConcurrentMap(ConcurrentMap &&map) {
        *this = std::move(map);
    }

    ConcurrentMap& ConcurrentMap::operator=(ConcurrentMap &&map) {
        _sizeMask = map._sizeMask;
        _capacity = map._capacity;
        _count = map._count.load();
        _entries = map._entries;
        _keysOffset = map._keysOffset;  // must be copied too, else offsetToKey breaks after a move
        _heap = std::move(map._heap);
        return *this;
    }

    int ConcurrentMap::stringBytesCapacity() const {
        return int(_heap.capacity() - (_keysOffset + kMinKeyOffset));
    }

    int ConcurrentMap::stringBytesCount() const {
        return int(_heap.allocated() - (_keysOffset + kMinKeyOffset));
    }

    __hot
    bool ConcurrentMap::Entry::compareAndSwap(Entry expected, Entry swapWith) {
        static_assert(sizeof(Entry) == 4);
        return atomicCompareAndSwap(&asInt32(), expected.asInt32(), swapWith.asInt32());
    }

    __hot FLPURE
    static inline bool equalKeys(const char *keyPtr, slice key) {
        return memcmp(keyPtr, key.buf, key.size) == 0 && keyPtr[key.size] == 0;
    }

    __hot
    inline uint16_t ConcurrentMap::keyToOffset(const char *allocedKey) const {
        ptrdiff_t result = _heap.toOffset(allocedKey) - _keysOffset;
        assert(result >= kMinKeyOffset && result <= UINT16_MAX);
        return uint16_t(result);
    }

    __hot
    inline const char* ConcurrentMap::offsetToKey(uint16_t offset) const {
        assert(offset >= kMinKeyOffset);
        return (const char*)_heap.toPointer(_keysOffset + offset);
    }

    __hot
    ConcurrentMap::result ConcurrentMap::find(slice key, hash_t hash) const noexcept {
        assert_precondition(key);
        for (int i = indexOfHash(hash); true; i = wrap(i + 1)) {
            Entry current = _entries[i];
            switch (current.keyOffset) {
                case kEmptyKeyOffset:
                    return {};
                case kDeletedKeyOffset:
                    break;
                default:
                    if (auto keyPtr = offsetToKey(current.keyOffset); equalKeys(keyPtr, key))
                        return {slice(keyPtr, key.size), current.value};
                    break;
            }
        }
    }
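
    // Probe-sequence example (illustrative): with table size 4096 (_sizeMask = 4095), a key
    // whose hash lands at index 4094 probes 4094, 4095, 0, 1, ... -- since the size is a power
    // of two, `wrap` keeps the index inside the table, so a linear probe that reaches the end
    // continues from slot 0 rather than running off the table.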

    __hot
    ConcurrentMap::result ConcurrentMap::insert(slice key, value_t value, hash_t hash) {
        assert_precondition(key);
        const char *allocedKey = nullptr;
        int i = indexOfHash(hash);
        while (true) {
        retry:
            Entry current = _entries[i];
            switch (current.keyOffset) {
                case kEmptyKeyOffset:
                case kDeletedKeyOffset: {
                    // Found an empty or deleted entry to use. First allocate the string:
                    if (!allocedKey) {
                        if (_count >= _capacity)
                            return {};          // Hash table overflow
                        allocedKey = allocKey(key);
                        if (!allocedKey)
                            return {};          // Key-strings overflow
                    }
                    Entry newEntry = {keyToOffset(allocedKey), value};
                    // Try to store my new entry, if another thread didn't beat me to it:
                    if (_usuallyFalse(!_entries[i].compareAndSwap(current, newEntry))) {
                        // I was beaten to it; retry (at the same index,
                        // in case CAS was a false negative)
                        goto retry;
                    }
                    // Success!
                    ++_count;
                    assert(_count <= _capacity);
                    return {slice(allocedKey, key.size), value};
                }
                default:
                    if (auto keyPtr = offsetToKey(current.keyOffset); equalKeys(keyPtr, key)) {
                        // Key already exists in table. Deallocate any string I allocated:
                        freeKey(allocedKey);
                        return {slice(keyPtr, key.size), current.value};
                    }
                    break;
            }
            i = wrap(i + 1);
        }
    }

    bool ConcurrentMap::remove(slice key, hash_t hash) {
        assert_precondition(key);
        int i = indexOfHash(hash);
        while (true) {
        retry:
            Entry current = _entries[i];
            switch (current.keyOffset) {
                case kEmptyKeyOffset:
                    // Not found.
                    return false;
                case kDeletedKeyOffset:
                    break;
                default:
                    if (auto keyPtr = offsetToKey(current.keyOffset); equalKeys(keyPtr, key)) {
                        // Found it -- now replace with a tombstone. Leave the value alone in case
                        // a concurrent torn read sees the prior offset + new value.
                        Entry tombstone = {kDeletedKeyOffset, current.value};
                        if (_usuallyFalse(!_entries[i].compareAndSwap(current, tombstone))) {
                            // I was beaten to it; retry (at the same index,
                            // in case CAS was a false negative)
                            goto retry;
                        }
                        // Success!
                        --_count;
                        // Freeing the key string will only do anything if it was the latest key
                        // to be added, but it's worth a try.
                        (void)freeKey(keyPtr);
                        return true;
                    }
                    break;
            }
            i = wrap(i + 1);
        }
    }

    const char* ConcurrentMap::allocKey(slice key) {
        auto result = (char*)_heap.alloc(key.size + 1);
        if (result) {
            key.copyTo(result);
            result[key.size] = 0;
        }
        return result;
    }

    bool ConcurrentMap::freeKey(const char *allocedKey) {
        return allocedKey == nullptr || _heap.free((void*)allocedKey, strlen(allocedKey) + 1);
    }
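
    // Note: keys are stored NUL-terminated. That terminator is what lets `equalKeys` verify an
    // exact match without a stored length, and what lets `freeKey` recover the allocation size
    // via strlen().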

    __cold
    void ConcurrentMap::dump() const {
        int size = tableSize();
        int realCount = 0, tombstones = 0, totalDistance = 0, maxDistance = 0;
        for (int i = 0; i < size; i++) {
            auto e = _entries[i];
            switch (e.keyOffset) {
                case kEmptyKeyOffset:
                    printf("%6d\n", i);
                    break;
                case kDeletedKeyOffset:
                    ++tombstones;
                    printf("%6d xxx\n", i);
                    break;
                default: {
                    ++realCount;
                    auto keyPtr = offsetToKey(e.keyOffset);
                    hash_t hash = hashCode(slice(keyPtr));
                    int bestIndex = indexOfHash(hash);
                    printf("%6d: %-10s = %08x [%5d]", i, keyPtr, (uint32_t)hash, bestIndex);
                    if (i != bestIndex) {
                        if (bestIndex > i)
                            bestIndex -= size;
                        auto distance = i - bestIndex;
                        printf(" +%d", distance);
                        totalDistance += distance;
                        maxDistance = max(maxDistance, distance);
                    }
                    printf("\n");
                }
            }
        }
        printf("Occupancy = %d / %d (%.0f%%), with %d tombstones\n",
               realCount, size, realCount / double(size) * 100.0, tombstones);
        printf("Average probes = %.1f, max probes = %d\n",
               1.0 + (totalDistance / (double)realCount), maxDistance);
    }

}