Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add migration #147

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@
/test/replication-stress-test
/test/search-stress-test
/ylwrap
hyperdex-migrate-data
22 changes: 22 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ noinst_HEADERS += common/schema.h
noinst_HEADERS += common/serialization.h
noinst_HEADERS += common/server.h
noinst_HEADERS += common/transfer.h
noinst_HEADERS += common/migration.h
noinst_HEADERS += tools/common.h

check_PROGRAMS += common/test/ordered_encoding
Expand Down Expand Up @@ -201,6 +202,9 @@ noinst_HEADERS += daemon/state_transfer_manager.h
noinst_HEADERS += daemon/state_transfer_manager_pending.h
noinst_HEADERS += daemon/state_transfer_manager_transfer_in_state.h
noinst_HEADERS += daemon/state_transfer_manager_transfer_out_state.h
noinst_HEADERS += daemon/migration_manager.h
noinst_HEADERS += daemon/migration_manager_pending.h
noinst_HEADERS += daemon/migration_out_state.h

EXTRA_DIST += man/hyperdex-daemon.1.md
EXTRA_DIST += man/hyperdex-daemon.1.h2m
Expand Down Expand Up @@ -232,6 +236,7 @@ hyperdex_daemon_SOURCES += common/schema.cc
hyperdex_daemon_SOURCES += common/serialization.cc
hyperdex_daemon_SOURCES += common/server.cc
hyperdex_daemon_SOURCES += common/transfer.cc
hyperdex_daemon_SOURCES += common/migration.cc
hyperdex_daemon_SOURCES += cityhash/city.cc
hyperdex_daemon_SOURCES += daemon/communication.cc
hyperdex_daemon_SOURCES += daemon/coordinator_link_wrapper.cc
Expand Down Expand Up @@ -260,6 +265,9 @@ hyperdex_daemon_SOURCES += daemon/state_transfer_manager.cc
hyperdex_daemon_SOURCES += daemon/state_transfer_manager_pending.cc
hyperdex_daemon_SOURCES += daemon/state_transfer_manager_transfer_in_state.cc
hyperdex_daemon_SOURCES += daemon/state_transfer_manager_transfer_out_state.cc
hyperdex_daemon_SOURCES += daemon/migration_manager.cc
hyperdex_daemon_SOURCES += daemon/migration_manager_pending.cc
hyperdex_daemon_SOURCES += daemon/migration_out_state.cc
hyperdex_daemon_CXXFLAGS = $(AM_CXXFLAGS) $(CXXFLAGS)
hyperdex_daemon_LDADD =
hyperdex_daemon_LDADD += $(E_LIBS)
Expand Down Expand Up @@ -308,6 +316,7 @@ libhyperdex_coordinator_la_SOURCES += common/schema.cc
libhyperdex_coordinator_la_SOURCES += common/serialization.cc
libhyperdex_coordinator_la_SOURCES += common/server.cc
libhyperdex_coordinator_la_SOURCES += common/transfer.cc
libhyperdex_coordinator_la_SOURCES += common/migration.cc
libhyperdex_coordinator_la_SOURCES += coordinator/coordinator.cc
libhyperdex_coordinator_la_SOURCES += coordinator/replica_sets.cc
libhyperdex_coordinator_la_SOURCES += coordinator/server_barrier.cc
Expand Down Expand Up @@ -387,6 +396,7 @@ libhyperdex_client_la_SOURCES += common/schema.cc
libhyperdex_client_la_SOURCES += common/server.cc
libhyperdex_client_la_SOURCES += common/serialization.cc
libhyperdex_client_la_SOURCES += common/transfer.cc
libhyperdex_client_la_SOURCES += common/migration.cc
libhyperdex_client_la_SOURCES += cityhash/city.cc
libhyperdex_client_la_SOURCES += client/c.cc
libhyperdex_client_la_SOURCES += client/client.cc
Expand Down Expand Up @@ -472,6 +482,7 @@ libhyperdex_admin_la_SOURCES += common/schema.cc
libhyperdex_admin_la_SOURCES += common/serialization.cc
libhyperdex_admin_la_SOURCES += common/server.cc
libhyperdex_admin_la_SOURCES += common/transfer.cc
libhyperdex_admin_la_SOURCES += common/migration.cc
libhyperdex_admin_la_SOURCES += cityhash/city.cc
libhyperdex_admin_la_SOURCES += admin/admin.cc
libhyperdex_admin_la_SOURCES += admin/backup_state_machine.cc
Expand Down Expand Up @@ -983,6 +994,7 @@ hyperdexexec_PROGRAMS += hyperdex-wait-until-stable
hyperdexexec_PROGRAMS += hyperdex-backup
hyperdexexec_PROGRAMS += hyperdex-backup-manager
hyperdexexec_PROGRAMS += hyperdex-raw-backup
hyperdexexec_PROGRAMS += hyperdex-migrate-data
dist_man_MANS += man/hyperdex-add-space.1
dist_man_MANS += man/hyperdex-rm-space.1
dist_man_MANS += man/hyperdex-list-spaces.1
Expand All @@ -1000,6 +1012,7 @@ dist_man_MANS += man/hyperdex-wait-until-stable.1
dist_man_MANS += man/hyperdex-backup.1
dist_man_MANS += man/hyperdex-backup-manager.1
dist_man_MANS += man/hyperdex-raw-backup.1
dist_man_MANS += man/hyperdex-migrate-data.1
endif

# hyperdex
Expand Down Expand Up @@ -1167,6 +1180,15 @@ man/hyperdex-raw-backup.1: man/hyperdex-raw-backup.1.h2m tools/raw-backup.cc
@$(MAKE) --silent $(AM_MAKEFLAGS) hyperdex-raw-backup$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-raw-backup$(EXEEXT)

# hyperdex-migrate-data
EXTRA_DIST += man/hyperdex-migrate-data.1.md
EXTRA_DIST += man/hyperdex-migrate-data.1.h2m
hyperdex_migrate_data_SOURCES = tools/migrate-data.cc
hyperdex_migrate_data_LDADD = libhyperdex-admin.la -lpopt
man/hyperdex-migrate-data.1: man/hyperdex-migrate-data.1.h2m tools/migrate-data.cc
@$(MAKE) --silent $(AM_MAKEFLAGS) hyperdex-migrate-data$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-migrate-data$(EXEEXT)

################################################################################
################################# Documentation ################################
################################################################################
Expand Down
42 changes: 42 additions & 0 deletions admin/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ admin :: fault_tolerance(const char* space, uint64_t ft,
}
}

int64_t
admin :: migrate_data(const char* space_from, const char* space_to,
enum hyperdex_admin_returncode* status)
{
if (!maintain_coord_connection(status))
{
return -1;
}

int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "migrate");
uint64_t space_from_sz = strlen(space_from);
uint64_t space_to_sz = strlen(space_to);

// Pack
size_t total_sz = sizeof(uint64_t) * 2 + space_from_sz + space_to_sz;
char buf[total_sz];
char* pos = buf;
e::pack64be(space_from_sz, pos);
pos += sizeof(uint64_t);
memcpy(pos, space_from, space_from_sz);
pos += space_from_sz;
e::pack64be(space_to_sz, pos);
pos += sizeof(uint64_t);
memcpy(pos, space_to, space_to_sz);

int64_t cid = m_coord.rpc("migrate", buf, total_sz,
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
m_coord_ops[cid] = op;
return op->admin_visible_id();
}
else
{
interpret_rpc_request_failure(op->repl_status, status);
return -1;
}
}

int
admin :: validate_space(const char* description,
hyperdex_admin_returncode* status)
Expand Down
2 changes: 2 additions & 0 deletions admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class admin
enum hyperdex_admin_returncode* status);
int64_t list_spaces(enum hyperdex_admin_returncode* status,
const char** spaces);
int64_t migrate_data(const char* space_from, const char* space_to,
enum hyperdex_admin_returncode* status);
// manage servers
int64_t server_register(uint64_t token, const char* address,
enum hyperdex_admin_returncode* status);
Expand Down
12 changes: 12 additions & 0 deletions admin/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ hyperdex_admin_list_spaces(struct hyperdex_admin* _adm,
);
}

HYPERDEX_API int64_t
hyperdex_admin_migrate_data(struct hyperdex_admin* _adm,
const char* space_from,
const char* space_to,
enum hyperdex_admin_returncode* status)
{
C_WRAP_EXCEPT(
hyperdex::admin* adm = reinterpret_cast<hyperdex::admin*>(_adm);
return adm->migrate_data(space_from, space_to, status);
);
}

HYPERDEX_API int64_t
hyperdex_admin_server_register(struct hyperdex_admin* _adm,
uint64_t token, const char* address,
Expand Down
78 changes: 77 additions & 1 deletion common/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using hyperdex::schema;
using hyperdex::server;
using hyperdex::server_id;
using hyperdex::subspace;
using hyperdex::space_id;
using hyperdex::subspace_id;
using hyperdex::virtual_server_id;

Expand All @@ -66,6 +67,7 @@ configuration :: configuration()
, m_point_leaders_by_virtual()
, m_spaces()
, m_transfers()
, m_migrations()
{
refill_cache();
}
Expand All @@ -88,6 +90,7 @@ configuration :: configuration(const configuration& other)
, m_point_leaders_by_virtual(other.m_point_leaders_by_virtual)
, m_spaces(other.m_spaces)
, m_transfers(other.m_transfers)
, m_migrations(other.m_migrations)
{
refill_cache();
}
Expand Down Expand Up @@ -282,6 +285,21 @@ configuration :: get_virtual(const region_id& ri, const server_id& si) const
return virtual_server_id();
}

space_id
configuration :: space_of(const region_id& ri) const
{
subspace_id ssid = subspace_of(ri);
for (size_t s = 0; s < m_spaces.size(); ++s)
{
for (size_t ss = 0; ss < m_spaces[s].subspaces.size(); ++ss)
{
if (m_spaces[s].subspaces[ss].id == ssid) {
return m_spaces[s].id;
}
}
}
}

subspace_id
configuration :: subspace_of(const region_id& ri) const
{
Expand Down Expand Up @@ -454,6 +472,39 @@ configuration :: point_leader(const char* sname, const e::slice& key) const
return virtual_server_id();
}

virtual_server_id
configuration :: point_leader(const space_id& sid, const e::slice& key) const
{
for (size_t s = 0; s < m_spaces.size(); ++s)
{
if (sid != m_spaces[s].id)
{
continue;
}

uint64_t h;
hash(m_spaces[s].sc, key, &h);

for (size_t pl = 0; pl < m_spaces[s].subspaces[0].regions.size(); ++pl)
{
if (m_spaces[s].subspaces[0].regions[pl].lower_coord[0] <= h &&
h <= m_spaces[s].subspaces[0].regions[pl].upper_coord[0])
{
if (m_spaces[s].subspaces[0].regions[pl].replicas.empty())
{
return virtual_server_id();
}

return m_spaces[s].subspaces[0].regions[pl].replicas[0].vsi;
}
}

abort();
}

return virtual_server_id();
}

virtual_server_id
configuration :: point_leader(const region_id& rid, const e::slice& key) const
{
Expand Down Expand Up @@ -614,6 +665,13 @@ configuration :: transfers_out_regions(const server_id& si, std::vector<region_i
}
}

void configuration :: migrations_out(const server_id& sid, std::vector<migration>* migrations) const
{
for (size_t m = 0; m < m_migrations.size(); ++m) {
migrations->push_back(m_migrations[m]);
}
}

void
configuration :: lookup_region(const subspace_id& ssid,
const std::vector<uint64_t>& hashes,
Expand Down Expand Up @@ -876,6 +934,11 @@ configuration :: dump() const
out << m_transfers[i] << std::endl;
}

for (size_t i = 0; i < m_migrations.size(); ++i)
{
out << m_migrations[i] << std::endl;
}

return out.str();
}

Expand Down Expand Up @@ -917,6 +980,7 @@ configuration :: operator = (const configuration& rhs)
m_point_leaders_by_virtual = rhs.m_point_leaders_by_virtual;
m_spaces = rhs.m_spaces;
m_transfers = rhs.m_transfers;
m_migrations = rhs.m_migrations;
refill_cache();
return *this;
}
Expand Down Expand Up @@ -1022,9 +1086,11 @@ hyperdex :: operator >> (e::unpacker up, configuration& c)
uint64_t num_servers;
uint64_t num_spaces;
uint64_t num_transfers;
uint64_t num_migrations;
up = up >> c.m_cluster >> c.m_version >> c.m_flags
>> num_servers >> num_spaces
>> num_transfers;
>> num_transfers >> num_migrations;

c.m_servers.clear();
c.m_servers.reserve(num_servers);

Expand Down Expand Up @@ -1055,6 +1121,16 @@ hyperdex :: operator >> (e::unpacker up, configuration& c)
c.m_transfers.push_back(xfer);
}

c.m_migrations.clear();
c.m_migrations.reserve(num_migrations);

for (size_t i = 0; !up.error() && i < num_migrations; ++i)
{
migration m;
up = up >> m;
c.m_migrations.push_back(m);
}

c.refill_cache();
return up;
}
8 changes: 8 additions & 0 deletions common/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "common/schema.h"
#include "common/server.h"
#include "common/transfer.h"
#include "common/migration.h"

BEGIN_HYPERDEX_NAMESPACE

Expand Down Expand Up @@ -78,6 +79,7 @@ class configuration
const schema* get_schema(const region_id& ri) const;
const subspace* get_subspace(const region_id& ri) const;
virtual_server_id get_virtual(const region_id& ri, const server_id& si) const;
space_id space_of(const region_id& ri) const;
subspace_id subspace_of(const region_id& ri) const;
subspace_id subspace_prev(const subspace_id& ss) const;
subspace_id subspace_next(const subspace_id& ss) const;
Expand All @@ -88,6 +90,7 @@ class configuration
void key_regions(const server_id& s, std::vector<region_id>* servers) const;
bool is_point_leader(const virtual_server_id& e) const;
virtual_server_id point_leader(const char* space, const e::slice& key) const;
virtual_server_id point_leader(const space_id& sid, const e::slice& key) const;
// point leader for this key in the same space as ri
virtual_server_id point_leader(const region_id& ri, const e::slice& key) const;
// lhs and rhs are in adjacent subspaces such that lhs sends CHAIN_PUT
Expand All @@ -109,6 +112,10 @@ class configuration
void transfers_in_regions(const server_id& s, std::vector<region_id>* transfers) const;
void transfers_out_regions(const server_id& s, std::vector<region_id>* transfers) const;

// migrations
public:
void migrations_out(const server_id& s, std::vector<migration>* migrations) const;

// hashing functions
public:
void lookup_region(const subspace_id& subspace,
Expand Down Expand Up @@ -155,6 +162,7 @@ class configuration
std::vector<uint64_t> m_point_leaders_by_virtual;
std::vector<space> m_spaces;
std::vector<transfer> m_transfers;
std::vector<migration> m_migrations;
};

e::buffer::packer
Expand Down
6 changes: 4 additions & 2 deletions common/datatype_int64.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ datatype_int64 :: apply(const e::slice& old_value,
for (size_t i = 0; i < funcs_sz; ++i)
{
const funcall* func = funcs + i;
int64_t arg;
e::unpack64le(func->arg1.data(), &arg);
int64_t arg = 0;
if (func->arg1.size() == sizeof(int64_t)) {
e::unpack64le(func->arg1.data(), &arg);
}

switch (func->name)
{
Expand Down
1 change: 1 addition & 0 deletions common/ids.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ CREATE_ID(server)
CREATE_ID(space)
CREATE_ID(subspace)
CREATE_ID(transfer)
CREATE_ID(migration)
CREATE_ID(virtual_server)

END_HYPERDEX_NAMESPACE
Expand Down
1 change: 1 addition & 0 deletions common/ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ CREATE_ID(server)
CREATE_ID(space)
CREATE_ID(subspace)
CREATE_ID(transfer)
CREATE_ID(migration)
CREATE_ID(virtual_server)

END_HYPERDEX_NAMESPACE
Expand Down
Loading