-
Notifications
You must be signed in to change notification settings - Fork 168
/
Copy pathtransaction.hpp
543 lines (451 loc) · 18.3 KB
/
transaction.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
/*************************************************************************
*
* Copyright 2016 Realm Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**************************************************************************/
#ifndef REALM_TRANSACTION_HPP
#define REALM_TRANSACTION_HPP
#include <realm/db.hpp>
namespace realm {
/// A transaction on a DB, layered on top of Group. It records the read lock
/// (snapshot version) it is bound to and its current transaction stage
/// (DB::transact_Ready / transact_Reading / transact_Writing / transact_Frozen
/// are the stages referenced in this header).
///
/// Several members are annotated REQUIRES(!m_async_mutex). The inline members
/// carrying that annotation lock m_async_mutex themselves.
/// NOTE(review): REQUIRES / GUARDED_BY are presumably Clang thread-safety
/// annotation macros defined elsewhere - confirm against their definition.
class Transaction : public Group {
public:
/// Defined out of line. NOTE(review): presumably binds the transaction to
/// `_db` with read lock `rli` in the given `stage` - confirm in the .cpp.
Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage);
// convenience, so you don't need to carry a reference to the DB around
~Transaction();
/// Version of the snapshot this transaction's read lock refers to.
DB::version_type get_version() const noexcept
{
return m_read_lock.m_version;
}
/// Forwards to DB::get_version_of_latest_snapshot() on the owning DB.
DB::version_type get_version_of_latest_snapshot()
{
return db->get_version_of_latest_snapshot();
}
/// Get a version id which may be used to request a different transaction locked to specific version.
/// The id is built from the version and reader index of the current read lock.
DB::VersionID get_version_of_current_transaction() const noexcept
{
return VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
}
void close() REQUIRES(!m_async_mutex);
/// True when the transaction is in any stage other than transact_Ready and
/// the owning DB reports itself as attached.
bool is_attached()
{
return m_transact_stage != DB::transact_Ready && db->is_attached();
}
/// Get the approximate size of the data that would be written to the file if
/// a commit were done at this point. The reported size will always be bigger
/// than what will eventually be needed as we reserve a bit more memory than
/// what will be needed.
size_t get_commit_size() const;
DB::version_type commit() REQUIRES(!m_async_mutex);
void rollback() REQUIRES(!m_async_mutex);
void end_read() REQUIRES(!m_async_mutex);
/// Feed the changesets in the version range [begin, end) - as passed to
/// _impl::ChangesetInputStream - to `observer`. Defined at the end of this
/// header.
template <class O>
void parse_history(O& observer, DB::version_type begin, DB::version_type end);
// Live transactions state changes, often taking an observer functor:
VersionID commit_and_continue_as_read(bool commit_to_disk = true) REQUIRES(!m_async_mutex);
VersionID commit_and_continue_writing();
/// Replays the uncommitted changes to `observer` and then rolls back to a
/// read transaction. Throws WrongTransactionState unless currently writing.
template <class O>
void rollback_and_continue_as_read(O& observer) REQUIRES(!m_async_mutex);
void rollback_and_continue_as_read() REQUIRES(!m_async_mutex);
/// Advance a read transaction to `target_version`; `observer` may be null.
/// Throws WrongTransactionState unless currently reading, and
/// IllegalOperation if the target precedes the current version or no
/// history is available.
template <class O>
void advance_read(O* observer, VersionID target_version = VersionID());
/// Observer-less overload: forwards a null _impl::NullInstructionObserver.
void advance_read(VersionID target_version = VersionID())
{
_impl::NullInstructionObserver* o = nullptr;
advance_read(o, target_version);
}
/// Promote a read transaction to a write transaction. Returns false only
/// when `nonblocking` is true and the write lock could not be obtained;
/// otherwise returns true or throws.
template <class O>
bool promote_to_write(O* observer, bool nonblocking = false) REQUIRES(!m_async_mutex);
/// Observer-less overload: forwards a null _impl::NullInstructionObserver.
bool promote_to_write(bool nonblocking = false) REQUIRES(!m_async_mutex)
{
_impl::NullInstructionObserver* o = nullptr;
return promote_to_write(o, nonblocking);
}
TransactionRef freeze();
// Frozen transactions are created by freeze() or DB::start_frozen()
bool is_frozen() const noexcept override
{
return m_transact_stage == DB::transact_Frozen;
}
/// True when the async stage is anything other than Idle. Takes
/// m_async_mutex for the duration of the check.
bool is_async() noexcept REQUIRES(!m_async_mutex)
{
util::CheckedLockGuard lck(m_async_mutex);
return m_async_stage != AsyncState::Idle;
}
TransactionRef duplicate();
void copy_to(TransactionRef dest) const;
_impl::History* get_history() const;
// direct handover of accessor instances
Obj import_copy_of(const Obj& original);
TableRef import_copy_of(const ConstTableRef original);
LnkLst import_copy_of(const LnkLst& original);
LnkSet import_copy_of(const LnkSet& original);
LstBasePtr import_copy_of(const LstBase& original);
SetBasePtr import_copy_of(const SetBase& original);
CollectionBasePtr import_copy_of(const CollectionBase& original);
LnkLstPtr import_copy_of(const LnkLstPtr& original);
LnkSetPtr import_copy_of(const LnkSetPtr& original);
LinkCollectionPtr import_copy_of(const LinkCollectionPtr& original);
// handover of the heavier Query and TableView
std::unique_ptr<Query> import_copy_of(Query&, PayloadPolicy);
std::unique_ptr<TableView> import_copy_of(TableView&, PayloadPolicy);
/// Get the current transaction type
DB::TransactStage get_transact_stage() const noexcept
{
return m_transact_stage;
}
void upgrade_file_format(int target_file_format_version);
/// Task oriented/async interface for continuous transactions.
// true if this transaction already holds the write mutex
// (i.e. the async stage is HasLock or HasCommits)
bool holds_write_mutex() const noexcept REQUIRES(!m_async_mutex)
{
util::CheckedLockGuard lck(m_async_mutex);
return m_async_stage == AsyncState::HasLock || m_async_stage == AsyncState::HasCommits;
}
// Convert an existing write transaction to an async write transaction
void promote_to_async() REQUIRES(!m_async_mutex);
// request full synchronization to stable storage for all writes done since
// last sync - or just release write mutex.
// The write mutex is released after full synchronization.
void async_complete_writes(util::UniqueFunction<void()> when_synchronized = nullptr) REQUIRES(!m_async_mutex);
// Complete all pending async work and return once the async stage is Idle.
// If currently in an async write transaction that transaction is cancelled,
// and any async writes which were committed are synchronized.
void prepare_for_close() REQUIRES(!m_async_mutex);
// true if sync to disk has been requested
// (i.e. the async stage is Syncing)
bool is_synchronizing() noexcept REQUIRES(!m_async_mutex)
{
util::CheckedLockGuard lck(m_async_mutex);
return m_async_stage == AsyncState::Syncing;
}
/// Hand the stored commit exception (possibly null) to the caller and reset
/// the stored value to null, all while holding m_async_mutex. A second call
/// therefore returns null unless a new exception has been stored meanwhile.
std::exception_ptr get_commit_exception() noexcept REQUIRES(!m_async_mutex)
{
util::CheckedLockGuard lck(m_async_mutex);
auto err = std::move(m_commit_exception);
m_commit_exception = nullptr;
return err;
}
/// True when m_oldest_version_not_persisted currently holds a value. The
/// check is made while holding m_async_mutex.
bool has_unsynced_commits() noexcept REQUIRES(!m_async_mutex)
{
util::CheckedLockGuard lck(m_async_mutex);
return static_cast<bool>(m_oldest_version_not_persisted);
}
/// Logger of the owning DB; may be null (callers in this header test
/// db->m_logger before logging).
util::Logger* get_logger() const noexcept
{
return db->m_logger.get();
}
private:
// Stages of the async write machinery; Idle means no async work is pending.
enum class AsyncState { Idle, Requesting, HasLock, HasCommits, Syncing };
// Returns a copy of the owning DB reference.
DBRef get_db() const
{
return db;
}
// Forwards to DB::get_repl() on the owning DB.
Replication* const* get_repl() const final
{
return db->get_repl();
}
// Shared implementation of advance_read() and promote_to_write(). Returns
// true if the transaction moved to a different version, false if it was
// already on the requested one.
template <class O>
bool internal_advance_read(O* observer, VersionID target_version, _impl::History&, bool) REQUIRES(!db->m_mutex);
void set_transact_stage(DB::TransactStage stage) noexcept;
void do_end_read() noexcept REQUIRES(!m_async_mutex);
void initialize_replication();
void replicate(Transaction* dest, Replication& repl) const;
void complete_async_commit();
void acquire_write_lock() REQUIRES(!m_async_mutex);
void cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit);
void close_read_with_lock() REQUIRES(!m_async_mutex, db->m_mutex);
// The DB this transaction belongs to.
DBRef db;
mutable std::unique_ptr<_impl::History> m_history_read;
// Set to the replication's write history by promote_to_write() and reset
// to null on rollback or on a failed promotion.
mutable _impl::History* m_history = nullptr;
// Read lock describing the snapshot this transaction is currently bound to.
DB::ReadLockInfo m_read_lock;
util::Optional<DB::ReadLockInfo> m_oldest_version_not_persisted;
std::exception_ptr m_commit_exception GUARDED_BY(m_async_mutex);
bool m_async_commit_has_failed = false;
// Mutex is protecting access to members just below
// (only the members explicitly annotated GUARDED_BY(m_async_mutex) are
// declared as guarded; m_commit_exception above is one of them too)
util::CheckedMutex m_async_mutex;
std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
std::chrono::steady_clock::time_point m_request_time_point;
bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;
DB::TransactStage m_transact_stage = DB::transact_Ready;
// Identifier used as the "Tr %1" argument in trace log messages.
// No in-class initializer; NOTE(review): presumably set by the constructor.
unsigned m_log_id;
friend class DB;
friend class DisableReplication;
};
/*
* Classes providing backward compatibility with the older
* ReadTransaction and WriteTransaction types.
*/
class ReadTransaction {
public:
ReadTransaction(DBRef sg)
: trans(sg->start_read())
{
}
~ReadTransaction() noexcept {}
operator Transaction&()
{
return *trans;
}
bool has_table(StringData name) const noexcept
{
return trans->has_table(name);
}
ConstTableRef get_table(TableKey key) const
{
return trans->get_table(key); // Throws
}
ConstTableRef get_table(StringData name) const
{
return trans->get_table(name); // Throws
}
const Group& get_group() const noexcept
{
return *trans.get();
}
/// Get the version of the snapshot to which this read transaction is bound.
DB::version_type get_version() const noexcept
{
return trans->get_version();
}
private:
TransactionRef trans;
};
class WriteTransaction {
public:
WriteTransaction(DBRef sg)
: trans(sg->start_write())
{
}
~WriteTransaction() noexcept {}
operator Transaction&()
{
return *trans;
}
bool has_table(StringData name) const noexcept
{
return trans->has_table(name);
}
TableRef get_table(TableKey key) const
{
return trans->get_table(key); // Throws
}
TableRef get_table(StringData name) const
{
return trans->get_table(name); // Throws
}
TableRef add_table(StringData name, Table::Type table_type = Table::Type::TopLevel) const
{
return trans->add_table(name, table_type); // Throws
}
TableRef get_or_add_table(StringData name, Table::Type table_type = Table::Type::TopLevel,
bool* was_added = nullptr) const
{
return trans->get_or_add_table(name, table_type, was_added); // Throws
}
Group& get_group() const noexcept
{
return *trans.get();
}
/// Get the version of the snapshot on which this write transaction is
/// based.
DB::version_type get_version() const noexcept
{
return trans->get_version();
}
DB::version_type commit()
{
return trans->commit();
}
void rollback() noexcept
{
trans->rollback();
}
private:
TransactionRef trans;
};
// Implementation:
template <class O>
inline void Transaction::advance_read(O* observer, VersionID version_id)
{
if (m_transact_stage != DB::transact_Reading)
throw WrongTransactionState("Not a read transaction");
// It is an error if the new version precedes the currently bound one.
if (version_id.version < m_read_lock.m_version)
throw IllegalOperation("Requesting an older version when advancing");
auto hist = get_history(); // Throws
if (!hist)
throw IllegalOperation("No transaction log when advancing");
internal_advance_read(observer, version_id, *hist, false); // Throws
}
template <class O>
inline bool Transaction::promote_to_write(O* observer, bool nonblocking)
{
if (m_transact_stage != DB::transact_Reading)
throw WrongTransactionState("Not a read transaction");
if (!holds_write_mutex()) {
if (nonblocking) {
bool succes = db->do_try_begin_write();
if (!succes) {
return false;
}
}
else {
auto t1 = std::chrono::steady_clock::now();
acquire_write_lock(); // Throws
if (db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
db->m_logger->log(util::Logger::Level::trace, "Tr %1: Acquired write lock in %2 us", m_log_id,
std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
}
}
}
auto old_version = m_read_lock.m_version;
try {
Replication* repl = db->get_replication();
if (!repl)
throw IllegalOperation("No transaction log when promoting to write");
VersionID version = VersionID(); // Latest
m_history = repl->_get_history_write();
bool history_updated = internal_advance_read(observer, version, *m_history, true); // Throws
REALM_ASSERT(repl); // Presence of `repl` follows from the presence of `hist`
DB::version_type current_version = m_read_lock.m_version;
m_alloc.init_mapping_management(current_version);
repl->initiate_transact(*this, current_version, history_updated); // Throws
// If the group has no top array (top_ref == 0), create a new node
// structure for an empty group now, to be ready for modifications. See
// also Group::attach_shared().
if (!m_top.is_attached())
create_empty_group(); // Throws
}
catch (...) {
if (!holds_write_mutex())
db->end_write_on_correct_thread();
m_history = nullptr;
throw;
}
if (db->m_logger) {
db->m_logger->log(util::Logger::Level::trace, "Tr %1: Promote to write: %2 -> %3", m_log_id, old_version,
m_read_lock.m_version);
}
set_transact_stage(DB::transact_Writing);
return true;
}
template <class O>
inline void Transaction::rollback_and_continue_as_read(O& observer)
{
if (m_transact_stage != DB::transact_Writing)
throw WrongTransactionState("Not a write transaction");
Replication* repl = db->get_replication();
if (!repl)
throw IllegalOperation("No transaction log when rolling back");
BinaryData uncommitted_changes = repl->get_uncommitted_changes();
if (uncommitted_changes.size()) {
util::SimpleInputStream in(uncommitted_changes);
_impl::parse_transact_log(in, observer); // Throws
}
rollback_and_continue_as_read();
}
inline void Transaction::rollback_and_continue_as_read()
{
if (m_transact_stage != DB::transact_Writing)
throw WrongTransactionState("Not a write transaction");
Replication* repl = db->get_replication();
if (!repl)
throw IllegalOperation("No transaction log when rolling back");
// Mark all managed space (beyond the attached file) as free.
db->reset_free_space_tracking(); // Throws
m_read_lock.check();
ref_type top_ref = m_read_lock.m_top_ref;
size_t file_size = m_read_lock.m_file_size;
// since we had the write lock, we already have the latest encrypted pages in memory
m_alloc.update_reader_view(file_size); // Throws
update_allocator_wrappers(false);
advance_transact(top_ref, nullptr, false); // Throws
if (!holds_write_mutex())
db->end_write_on_correct_thread();
if (db->m_logger) {
db->m_logger->log(util::Logger::Level::trace, "Tr %1, Rollback", m_log_id);
}
m_history = nullptr;
set_transact_stage(DB::transact_Reading);
}
/// Shared implementation behind advance_read() and promote_to_write().
///
/// Grabs a live read lock on `version_id` and, if that is a different version
/// from the one currently bound, moves the transaction to it: the reader view
/// and allocator wrappers are refreshed, `hist` is updated to the new
/// snapshot, the changesets between the old and new version are replayed to
/// `observer` (when non-null) and then applied through advance_transact().
/// Finally the old read lock is released and replaced by the new one.
///
/// `writable` is forwarded to update_allocator_wrappers() and
/// advance_transact().
///
/// Returns false if the transaction was already on the requested version
/// (in which case `hist` is left untouched), true otherwise.
template <class O>
inline bool Transaction::internal_advance_read(O* observer, VersionID version_id, _impl::History& hist, bool writable)
{
DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, version_id); // Throws
REALM_ASSERT(new_read_lock.m_version >= m_read_lock.m_version);
if (new_read_lock.m_version == m_read_lock.m_version) {
// Same version: the freshly grabbed lock is redundant, give it back.
db->release_read_lock(new_read_lock);
// hist.update_from_ref_and_version() was not called on this path;
// update allocator wrappers merely to update write protection
update_allocator_wrappers(writable);
if (db->m_logger) {
db->m_logger->log(util::Logger::Level::trace, "Tr %1: Already on version: %2", m_log_id,
m_read_lock.m_version);
}
return false;
}
DB::version_type old_version = m_read_lock.m_version;
// From here until g.release() below, the guard owns the new read lock.
// NOTE(review): presumably it releases the lock if an exception unwinds
// through this function - confirm against DB::ReadLockGuard.
DB::ReadLockGuard g(*db, new_read_lock);
DB::version_type new_version = new_read_lock.m_version;
size_t new_file_size = new_read_lock.m_file_size;
ref_type new_top_ref = new_read_lock.m_top_ref;
// Synchronize readers view of the file
SlabAlloc& alloc = m_alloc;
alloc.update_reader_view(new_file_size);
update_allocator_wrappers(writable);
using gf = _impl::GroupFriend;
ref_type hist_ref = gf::get_history_ref(alloc, new_top_ref);
hist.update_from_ref_and_version(hist_ref, new_version);
if (observer) {
// This has to happen in the context of the originally bound snapshot
// and while the read transaction is still in a fully functional state.
_impl::ChangesetInputStream in(hist, old_version, new_version);
_impl::parse_transact_log(in, *observer); // Throws
}
// The old read lock must be retained for as long as the change history is
// accessed (until Group::advance_transact() returns). This ensures that the
// oldest needed changeset remains in the history, even when the history is
// implemented as a separate unversioned entity outside the Realm (i.e., the
// old implementation and ShortCircuitHistory in
// test_lang_Bind_helper.cpp). On the other hand, if it had been the case,
// that the history was always implemented as a versioned entity, that was
// part of the Realm state, then it would not have been necessary to retain
// the old read lock beyond this point.
// A fresh input stream over the same version range is created for
// advance_transact(); the one handed to the observer above is not reused.
_impl::ChangesetInputStream in(hist, old_version, new_version);
advance_transact(new_top_ref, &in, writable); // Throws
// Success: the guard gives up the new lock, the old lock is released and
// the new lock becomes the transaction's current read lock.
g.release();
db->release_read_lock(m_read_lock);
m_read_lock = new_read_lock;
if (db->m_logger) {
db->m_logger->log(util::Logger::Level::trace, "Tr %1: Advance read: %2 -> %3 ref %4", m_log_id, old_version,
m_read_lock.m_version, m_read_lock.m_top_ref);
}
return true; // hist.update_from_ref_and_version() was called
}
template <class O>
void Transaction::parse_history(O& observer, DB::version_type begin, DB::version_type end)
{
REALM_ASSERT(m_transact_stage != DB::transact_Ready);
REALM_ASSERT(end <= m_read_lock.m_version);
auto hist = get_history(); // Throws
REALM_ASSERT(hist);
hist->ensure_updated(m_read_lock.m_version);
_impl::ChangesetInputStream in(*hist, begin, end);
_impl::parse_transact_log(in, observer); // Throws
}
} // namespace realm
#endif /* REALM_TRANSACTION_HPP */