Initial implementation of async commits
Alexander Stigsen committed Jun 25, 2013
1 parent 809a392 commit db10ce5
Showing 4 changed files with 128 additions and 4 deletions.
87 changes: 86 additions & 1 deletion src/tightdb/group_shared.cpp
@@ -4,6 +4,7 @@
#include <tightdb/safe_int_ops.hpp>
#include <tightdb/string_buffer.hpp>
#include <tightdb/group_shared.hpp>
#include <tightdb/group_writer.hpp>

// FIXME: We should not include files from the test directory here. A
// solution would probably be to move the definition of
@@ -89,7 +90,7 @@ class ScopedMutexLock {
// - Possible reinitialization due to temporary unlocking during downgrade of file lock

void SharedGroup::open(const string& file, bool no_create_file,
DurabilityLevel dlevel)
DurabilityLevel dlevel, bool is_backend)
{
TIGHTDB_ASSERT(!is_attached());

@@ -183,6 +184,16 @@ void SharedGroup::open(const string& file, bool no_create_file,
// change the db
m_version = 0;

// In async mode we need a separate process to do the async commits.
// We start it here during init so that it only gets started once.
if (dlevel == durability_Async) {
if (fork() == 0) {
unattached_tag tag;
SharedGroup async_committer(tag);
async_committer.open(file, true, durability_Async, true);
}
}

// FIXME: This downgrading of the lock is not guaranteed to be atomic

// Downgrade lock to shared now that it is initialized,
@@ -215,6 +226,18 @@ void SharedGroup::open(const string& file, bool no_create_file,
#ifdef TIGHTDB_DEBUG
m_transact_stage = transact_Ready;
#endif

if (dlevel == durability_Async) {
if (is_backend) {
do_async_commits(); // will never return
}
else {
// In async mode we need to wait for the commit process to get ready,
// so we wait for the first read lock to be taken by the async_commit process.
SharedInfo* const info = m_file_map.get_addr();
while (info->put_pos == 0) usleep(2);
}
}
}


@@ -239,6 +262,11 @@ SharedGroup::~SharedGroup()

SharedInfo* info = m_file_map.get_addr();

// In async mode, cleanup is handled by the async_commit process
// (but we might still be able to get the exclusive lock, as the backend
// releases it while doing its own try_lock_exclusive()).
if (info->flags == durability_Async) return;

// If the db file is just backing for a transient data structure,
// we can delete it when done.
if (info->flags == durability_MemOnly) {
@@ -287,6 +315,63 @@ bool SharedGroup::has_changed() const TIGHTDB_NOEXCEPT
return is_changed;
}

void SharedGroup::do_async_commits()
{
bool shutdown = false;
SharedInfo* info = m_file_map.get_addr();
TIGHTDB_ASSERT(info->current_version == 0);

// We always want to keep a read lock on the last version
// that was committed to disk, to protect it against being
// overwritten by commits being made to memory by others.
// Note that taking this lock also signals to the other
// processes that they can start committing to the db.
begin_read();
size_t last_version = m_version;
m_group.invalidate();

while (true) {
// If we can get an exclusive lock, we know that we are
// the last process using the db, so we can shut down
// (all other processes using the db hold shared locks).
if (m_file.try_lock_exclusive()) {
shutdown = true;
}

if (has_changed()) {
// Get a read lock on the (current) version that we want
// to commit to disk.
#ifdef TIGHTDB_DEBUG
m_transact_stage = transact_Ready;
#endif
begin_read();
size_t current_version = m_version;
size_t current_top_ref = m_group.get_top_array().GetRef();

GroupWriter writer(m_group, true);
writer.DoCommit(current_top_ref);

// Now we can release the version that was previously committed
// to disk and just keep the lock on the latest version.
m_version = last_version;
end_read();
last_version = current_version;
}
else if (!shutdown) {
usleep(20);
}

if (shutdown) {
// Being the backend process, we own the lock file, so we
// have to clean up when we shut down.
pthread_mutex_destroy(&info->readmutex);
pthread_mutex_destroy(&info->writemutex);
remove(m_file_path.c_str());
exit(EXIT_SUCCESS);
}
}
}


const Group& SharedGroup::begin_read()
{
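The heart of this change is the fork()-based backend above: the first process to open the file in async mode forks a child that reopens it with is_backend set, keeps a read lock on the last version flushed to disk, and shuts down and cleans up once it can take the exclusive file lock, i.e. once every client has dropped its shared lock. The standalone sketch below illustrates that lifecycle with plain POSIX calls; it is not TightDB code, the lock-file name is hypothetical, and it glosses over the start-up ordering that the real code gets by forking while the parent still holds the exclusive lock taken during initialization.

#include <sys/file.h>   // flock
#include <fcntl.h>      // open, O_CREAT, O_RDWR
#include <unistd.h>     // fork, usleep, close, unlink
#include <cstdlib>      // std::exit

// Backend loop: keep flushing until the exclusive lock can be taken,
// which means every client has dropped its shared lock and exited.
static void backend_loop(int lock_fd, const char* lock_path)
{
    for (;;) {
        if (flock(lock_fd, LOCK_EX | LOCK_NB) == 0) {
            // Last user of the file: clean up the lock file and stop.
            unlink(lock_path);
            std::exit(EXIT_SUCCESS);
        }
        // ... here the real backend writes any new in-memory commits to disk ...
        usleep(20);
    }
}

int main()
{
    const char* lock_path = "example.lock"; // hypothetical path

    if (fork() == 0) {
        // Child: open its own descriptor so its locks do not alias the
        // parent's, then run the commit/cleanup loop forever.
        int backend_fd = open(lock_path, O_CREAT | O_RDWR, 0644);
        backend_loop(backend_fd, lock_path);
    }

    // Parent: a normal client holds a shared lock for as long as it uses
    // the database, much like SharedGroup::open() above.
    int fd = open(lock_path, O_CREAT | O_RDWR, 0644);
    flock(fd, LOCK_SH);
    // ... transactions happen here ...
    flock(fd, LOCK_UN);
    close(fd); // backend now takes the exclusive lock and cleans up
    return 0;
}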
7 changes: 5 additions & 2 deletions src/tightdb/group_shared.hpp
@@ -35,7 +35,8 @@ class SharedGroup {
public:
enum DurabilityLevel {
durability_Full,
durability_MemOnly
durability_MemOnly,
durability_Async
};

/// Equivalent to calling open(const std::string&, bool,
@@ -84,7 +85,7 @@ class SharedGroup {
/// thrown. Note that InvalidDatabase is among these derived
/// exception types.
void open(const std::string& file, bool no_create = false,
DurabilityLevel dlevel=durability_Full);
DurabilityLevel dlevel=durability_Full, bool is_backend=false);

#ifdef TIGHTDB_ENABLE_REPLICATION

@@ -189,6 +190,8 @@ class SharedGroup {
// mutex.
void low_level_commit(std::size_t new_version);

void do_async_commits();

friend class ReadTransaction;
friend class WriteTransaction;
};
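For client code, the only visible part of the header change is the new durability level; the added is_backend parameter keeps its default of false and is passed as true only by the forked commit process. A minimal usage sketch, following the constructor form used in test/testshared.cpp further down and assuming WriteTransaction is available from the same header (the friend declarations suggest it is); the file name is hypothetical:

#include <tightdb/group_shared.hpp>

int main()
{
    using namespace tightdb;

    // Open (or create) a database with asynchronous durability.
    // is_backend keeps its default of false for normal clients.
    SharedGroup db("example.tightdb", false, SharedGroup::durability_Async);

    // Transactions look the same as with durability_Full; only the point
    // at which changes reach the disk differs.
    {
        WriteTransaction wt(db);
        // ... create and modify tables through wt ...
        wt.commit();
    }
    return 0;
}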
2 changes: 1 addition & 1 deletion src/tightdb/group_writer.hpp
@@ -47,9 +47,9 @@ class GroupWriter {
void ZeroFreeSpace();
#endif

private:
void DoCommit(uint64_t topPos);

private:
std::size_t get_free_space(size_t len);
std::size_t reserve_free_space(size_t len, size_t start=0);
void add_free_space(size_t pos, size_t len, size_t version=0);
36 changes: 36 additions & 0 deletions test/testshared.cpp
@@ -875,3 +875,39 @@ TEST(StringIndex_Bug)
}
}
}

TEST(Shared_Async)
{
// Clean up old state
File::try_remove("asynctest.tightdb");
File::try_remove("asynctest.tightdb.lock");

// Do some changes in an async db
{
SharedGroup db("asynctest.tightdb", false, SharedGroup::durability_Async);

for (size_t n = 0; n < 100; ++n) {
//printf("t %d\n", (int)n);
WriteTransaction wt(db);
TestTableShared::Ref t1 = wt.get_table<TestTableShared>("test");
t1->add(1, n, false, "test");
wt.commit();
}
}

// Wait for the async_commit process to shut down
while (File::exists("asynctest.tightdb.lock")) {
sleep(1);
}

// Read the db again in normal mode to verify
{
SharedGroup db("asynctest.tightdb");

for (size_t n = 0; n < 100; ++n) {
ReadTransaction rt(db);
TestTableShared::ConstRef t1 = rt.get_table<TestTableShared>("test");
CHECK(t1->size() == 100);
}
}
}
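The test detects backend shutdown by polling for the lock file, which the backend removes just before it exits. A hypothetical helper (not part of the commit) that bounds this wait, so a wedged backend process fails the test instead of hanging it; it uses POSIX access() and sleep() rather than the File API:

#include <unistd.h>   // access, F_OK, sleep

// Hypothetical helper: poll for the lock file the backend removes on exit,
// but give up after max_seconds instead of waiting forever.
static bool wait_for_backend_shutdown(const char* lock_path, int max_seconds)
{
    for (int i = 0; i < max_seconds; ++i) {
        if (access(lock_path, F_OK) != 0) // lock file gone -> backend exited
            return true;
        sleep(1);
    }
    return false;
}

The unbounded loop in the test would then become CHECK(wait_for_backend_shutdown("asynctest.tightdb.lock", 60)).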
