Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wan manager #218

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*.lo
*.o
*.pyc
*.py
*.sh
/bindings/java/org/hyperdex/*/*.class
/test/java/*.class
/man/hyperdex*.1
Expand Down Expand Up @@ -80,6 +82,9 @@
/hyperdex.upack
/hyperdex-validate-space
/hyperdex-wait-until-stable
/hyperdex-set-backup-cluster
/hyperdex-set-primary-cluster
/hyperdex-set-backup-affinity
/install-sh
/leveldb-dump
/libtool
Expand All @@ -94,3 +99,10 @@
/test/search-stress-test
/test/simple-consistency-stress-test
/ylwrap
MENG_TODO.txt
ac/*
ad/*
bc/*
bd/*
cc/*
cdae/*
35 changes: 35 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ noinst_HEADERS += daemon/auth.h
noinst_HEADERS += daemon/background_thread.h
noinst_HEADERS += daemon/communication.h
noinst_HEADERS += daemon/coordinator_link_wrapper.h
noinst_HEADERS += daemon/wan_manager.h
noinst_HEADERS += daemon/wan_manager_pending.h
noinst_HEADERS += daemon/wan_manager_transfer_in_state.h
noinst_HEADERS += daemon/wan_manager_transfer_out_state.h
noinst_HEADERS += daemon/daemon.h
noinst_HEADERS += daemon/datalayer_checkpointer_thread.h
noinst_HEADERS += daemon/datalayer_encodings.h
Expand Down Expand Up @@ -266,6 +270,10 @@ hyperdex_daemon_SOURCES += daemon/auth.cc
hyperdex_daemon_SOURCES += daemon/background_thread.cc
hyperdex_daemon_SOURCES += daemon/communication.cc
hyperdex_daemon_SOURCES += daemon/coordinator_link_wrapper.cc
hyperdex_daemon_SOURCES += daemon/wan_manager.cc
hyperdex_daemon_SOURCES += daemon/wan_manager_pending.cc
hyperdex_daemon_SOURCES += daemon/wan_manager_transfer_in_state.cc
hyperdex_daemon_SOURCES += daemon/wan_manager_transfer_out_state.cc
hyperdex_daemon_SOURCES += daemon/daemon.cc
hyperdex_daemon_SOURCES += daemon/datalayer.cc
hyperdex_daemon_SOURCES += daemon/datalayer_checkpointer_thread.cc
Expand Down Expand Up @@ -1143,6 +1151,9 @@ hyperdexexec_PROGRAMS += hyperdex-wait-until-stable
hyperdexexec_PROGRAMS += hyperdex-backup
hyperdexexec_PROGRAMS += hyperdex-backup-manager
hyperdexexec_PROGRAMS += hyperdex-raw-backup
hyperdexexec_PROGRAMS += hyperdex-set-backup-cluster
hyperdexexec_PROGRAMS += hyperdex-set-primary-cluster
hyperdexexec_PROGRAMS += hyperdex-set-backup-affinity
hyperdexexec_SCRIPTS += hyperdex-noc
dist_man_MANS += man/hyperdex-add-space.1
dist_man_MANS += man/hyperdex-rm-space.1
Expand Down Expand Up @@ -1300,6 +1311,30 @@ hyperdex_set_read_only_LDADD = libhyperdex-admin.la -lpopt
man/hyperdex-set-read-only.1: man/hyperdex-set-read-only.1.h2m tools/set-read-only.cc | hyperdex-set-read-only$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-read-only$(EXEEXT)

# hyperdex-set-primary-cluster
EXTRA_DIST += man/hyperdex-set-primary-cluster.1.md
EXTRA_DIST += man/hyperdex-set-primary-cluster.1.h2m
hyperdex_set_primary_cluster_SOURCES = tools/set-primary-cluster.cc
hyperdex_set_primary_cluster_LDADD = libhyperdex-admin.la -lpopt
man/hyperdex-set-primary-cluster.1: man/hyperdex-set-primary-cluster.1.h2m tools/set-primary-cluster.cc | hyperdex-set-primary-cluster$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-primary-cluster$(EXEEXT)

# hyperdex-set-backup-affinity
EXTRA_DIST += man/hyperdex-set-backup-affinity.1.md
EXTRA_DIST += man/hyperdex-set-backup-affinity.1.h2m
hyperdex_set_backup_affinity_SOURCES = tools/set-backup-affinity.cc
hyperdex_set_backup_affinity_LDADD = libhyperdex-admin.la -lpopt
man/hyperdex-set-backup-affinity.1: man/hyperdex-set-backup-affinity.1.h2m tools/set-backup-affinity.cc | hyperdex-set-backup-affinity$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-backup-affinity$(EXEEXT)

# hyperdex-set-backup-cluster
EXTRA_DIST += man/hyperdex-set-backup-cluster.1.md
EXTRA_DIST += man/hyperdex-set-backup-cluster.1.h2m
hyperdex_set_backup_cluster_SOURCES = tools/set-backup-cluster.cc
hyperdex_set_backup_cluster_LDADD = libhyperdex-admin.la -lpopt
man/hyperdex-set-backup-cluster.1: man/hyperdex-set-backup-cluster.1.h2m tools/set-backup-cluster.cc | hyperdex-set-backup-cluster$(EXEEXT)
$(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-backup-cluster$(EXEEXT)

# hyperdex-set-read-write
EXTRA_DIST += man/hyperdex-set-read-write.1.md
EXTRA_DIST += man/hyperdex-set-read-write.1.h2m
Expand Down
91 changes: 91 additions & 0 deletions admin/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,97 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status)
}
}

int64_t
admin :: set_primary_cluster(int prim, hyperdex_admin_returncode* status)
{
if (!maintain_coord_connection(status)) {
return -1;
}

int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "set primary-cluster");

char buf[sizeof(uint8_t)];
buf[0] = prim ? 1 : 0;

int64_t cid = m_coord.rpc("set_primary_cluster", buf, sizeof(uint8_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
m_coord_ops[cid] = op;
return op->admin_visible_id();
}
else
{
interpret_rpc_request_failure(op->repl_status, status);
return -1;
}
}

int64_t
admin :: set_backup_affinity(const char* host, int64_t port, hyperdex_admin_returncode* status)
{
if (!maintain_coord_connection(status)) {
return -1;
}

int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "set backup-affinity");
int64_t host_sz = strlen(host);

std::vector<char> buf(host_sz + sizeof(uint64_t));
memcpy(&buf[0], host, host_sz);
e::pack64be(port, &buf[0] + host_sz);

int64_t cid = m_coord.rpc("set_backup_affinity", &buf[0], host_sz + sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
m_coord_ops[cid] = op;
return op->admin_visible_id();
}
else
{
interpret_rpc_request_failure(op->repl_status, status);
return -1;
}
}

int64_t
admin :: set_backup_cluster(const char* host, int64_t port, hyperdex_admin_returncode* status)
{
if (!maintain_coord_connection(status)) {
return -1;
}

int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "set backup-cluster");
int64_t host_sz = strlen(host);

std::vector<char> buf(host_sz + sizeof(uint64_t));
memcpy(&buf[0], host, host_sz);
e::pack64be(port, &buf[0] + host_sz);

int64_t cid = m_coord.rpc("set_backup_cluster", &buf[0], host_sz + sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
m_coord_ops[cid] = op;
return op->admin_visible_id();
}
else
{
interpret_rpc_request_failure(op->repl_status, status);
return -1;
}
}

int64_t
admin :: wait_until_stable(enum hyperdex_admin_returncode* status)
{
Expand Down
6 changes: 6 additions & 0 deletions admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class admin
// cluster
int64_t read_only(int ro,
enum hyperdex_admin_returncode* status);
int64_t set_primary_cluster(int prim,
enum hyperdex_admin_returncode* status);
int64_t set_backup_cluster(const char* host, const int64_t port,
enum hyperdex_admin_returncode* status);
int64_t set_backup_affinity(const char* host, const int64_t port,
enum hyperdex_admin_returncode* status);
int64_t wait_until_stable(enum hyperdex_admin_returncode* status);
int64_t fault_tolerance(const char* space, uint64_t ft,
enum hyperdex_admin_returncode* status);
Expand Down
34 changes: 34 additions & 0 deletions admin/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ hyperdex_admin_dump_config(struct hyperdex_admin* _adm,
return adm->dump_config(status, config);
);
}

HYPERDEX_API int64_t
hyperdex_admin_read_only(struct hyperdex_admin* _adm,
int ro,
Expand All @@ -137,6 +138,39 @@ hyperdex_admin_read_only(struct hyperdex_admin* _adm,
);
}

HYPERDEX_API int64_t
hyperdex_admin_set_primary_cluster(struct hyperdex_admin* _adm,
int prim,
enum hyperdex_admin_returncode* status)
{
C_WRAP_EXCEPT(
hyperdex::admin* adm = reinterpret_cast<hyperdex::admin*>(_adm);
return adm->set_primary_cluster(prim, status);
);
}

HYPERDEX_API int64_t
hyperdex_admin_set_backup_affinity(struct hyperdex_admin* _adm,
const char* host, const int64_t port,
enum hyperdex_admin_returncode* status)
{
C_WRAP_EXCEPT(
hyperdex::admin* adm = reinterpret_cast<hyperdex::admin*>(_adm);
return adm->set_backup_affinity(host, port, status);
);
}

HYPERDEX_API int64_t
hyperdex_admin_set_backup_cluster(struct hyperdex_admin* _adm,
const char* host, const int64_t port,
enum hyperdex_admin_returncode* status)
{
C_WRAP_EXCEPT(
hyperdex::admin* adm = reinterpret_cast<hyperdex::admin*>(_adm);
return adm->set_backup_cluster(host, port, status);
);
}

HYPERDEX_API int64_t
hyperdex_admin_wait_until_stable(struct hyperdex_admin* _adm,
enum hyperdex_admin_returncode* status)
Expand Down
15 changes: 15 additions & 0 deletions common/attribute.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

// C
#include <string.h>

// HyperDex
#include "common/attribute.h"

Expand Down Expand Up @@ -55,3 +58,15 @@ attribute :: operator = (const attribute& rhs)
type = rhs.type;
return *this;
}

bool
attribute :: operator == (const attribute rhs) const
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a type is more than 64 bits it usually makes sense to do a "const T&".

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cheers, Kai! I pushed the changes.

{
return ((strcmp(name, rhs.name) == 0) && type == rhs.type);
}

bool
attribute :: operator != (const attribute rhs) const
{
return ((strcmp(name, rhs.name) != 0) || type != rhs.type);
}
2 changes: 2 additions & 0 deletions common/attribute.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class attribute

public:
attribute& operator = (const attribute& rhs);
bool operator == (const attribute rhs) const;
bool operator != (const attribute rhs) const;

public:
const char* name;
Expand Down
37 changes: 36 additions & 1 deletion common/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ configuration :: configuration()
, m_point_leaders_by_virtual()
, m_spaces()
, m_transfers()
, m_primary_coord()
{
refill_cache();
}
Expand All @@ -89,6 +90,7 @@ configuration :: configuration(const configuration& other)
, m_point_leaders_by_virtual(other.m_point_leaders_by_virtual)
, m_spaces(other.m_spaces)
, m_transfers(other.m_transfers)
, m_primary_coord(other.m_primary_coord)
{
refill_cache();
}
Expand All @@ -115,6 +117,18 @@ configuration :: read_only() const
return m_flags & HYPERDEX_CONFIG_READ_ONLY;
}

bool
configuration :: is_backup_cluster() const
{
return (m_flags & HYPERDEX_BACKUP_CLUSTER) >> 1;
}

po6::net::location
configuration :: get_primary_location()
{
return m_primary_coord;
}

void
configuration :: get_all_addresses(std::vector<std::pair<server_id, po6::net::location> >* addrs) const
{
Expand Down Expand Up @@ -201,6 +215,20 @@ configuration :: get_server_id(const virtual_server_id& id) const
return server_id();
}

virtual_server_id
configuration :: get_virtual(const server_id& si) const
{
for (std::vector<pair_uint64_t>::const_iterator it = m_server_ids_by_virtual.begin();
it != m_server_ids_by_virtual.end();
++it) {
if (it->second == si.get()) {
return virtual_server_id(it->first);
}
}

return virtual_server_id();
}

const schema*
configuration :: get_schema(const char* sname) const
{
Expand Down Expand Up @@ -395,6 +423,12 @@ configuration :: point_leaders(const server_id& si, std::vector<region_id>* serv
}
}

std::vector<hyperdex::space>
configuration :: get_spaces()
{
return m_spaces;
}

void
configuration :: key_regions(const server_id& si, std::vector<region_id>* regions) const
{
Expand Down Expand Up @@ -1064,6 +1098,7 @@ configuration :: operator = (const configuration& rhs)
m_cluster = rhs.m_cluster;
m_version = rhs.m_version;
m_flags = rhs.m_flags;
m_primary_coord = rhs.m_primary_coord;
m_servers = rhs.m_servers;
m_region_ids_by_virtual = rhs.m_region_ids_by_virtual;
m_server_ids_by_virtual = rhs.m_server_ids_by_virtual;
Expand Down Expand Up @@ -1183,7 +1218,7 @@ hyperdex :: operator >> (e::unpacker up, configuration& c)
uint64_t num_servers;
uint64_t num_spaces;
uint64_t num_transfers;
up = up >> c.m_cluster >> c.m_version >> c.m_flags
up = up >> c.m_cluster >> c.m_version >> c.m_flags >> c.m_primary_coord
>> num_servers >> num_spaces
>> num_transfers;
c.m_servers.clear();
Expand Down
Loading