Skip to content

Commit

Permalink
track unexpected discs (fixes #223)
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Apr 10, 2021
1 parent ab1ef96 commit 10832ac
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
16 changes: 8 additions & 8 deletions R/qsys.r
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ QSys = R6::R6Class("QSys",

# Evaluate an arbitrary expression on a worker
send_call = function(expr, env=list(), ref=substitute(expr)) {
private$send(id="DO_CALL", expr=substitute(expr), env=env, ref=ref)
private$send_work(id="DO_CALL", expr=substitute(expr), env=env, ref=ref)
},

# Sets the common data as a ZeroMQ message object
Expand Down Expand Up @@ -89,17 +89,17 @@ QSys = R6::R6Class("QSys",
send_common_data = function() {
if (is.null(private$common_data))
stop("Need to set_common_data() first")
private$zmq$send(private$common_data)
private$zmq$send_work(private$common_data)
},

# Send iterated data to one worker
send_job_data = function(...) {
private$send(id="DO_CHUNK", token=private$token, ...)
private$send_work(id="DO_CHUNK", token=private$token, ...)
},

# Wait for a total of 50 ms
send_wait = function(wait=0.05*self$workers_running) {
private$send(id="WORKER_WAIT", wait=wait)
private$send_work(id="WORKER_WAIT", wait=wait)
},

# Read data from the socket
Expand Down Expand Up @@ -150,7 +150,7 @@ QSys = R6::R6Class("QSys",

# Send shutdown signal to worker
send_shutdown_worker = function() {
private$send(id="WORKER_STOP")
private$zmq$send_shutdown(data=list(id="WORKER_STOP"))
},

# Make sure all resources are closed properly
Expand Down Expand Up @@ -212,12 +212,12 @@ QSys = R6::R6Class("QSys",
pkg_warn = utils::packageVersion("clustermq"),
auth = "",

send = function(...) {
private$zmq$send(data = list(...))
send_work = function(...) {
private$zmq$send_work(data = list(...))
},

disconnect_worker = function(msg) {
private$send()
private$zmq$send_shutdown(list())
# private$zmq$disconnect() #FIXME: disconnect worker, don't close socket
private$workers_up = private$workers_up - 1
private$workers_total = private$workers_total - 1
Expand Down
32 changes: 20 additions & 12 deletions src/CMQMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,34 @@ class CMQMaster : public ZeroMQ {
return sock->listen(addrs);
}
// Temporary for the refactor: Rcpp errors if these are only defined in the base class (or have the same name)
void disconnect2() {
disconnect("master");
void send_work(SEXP data) {
send_work(data, false);
}
void send2(SEXP data) {
send2(data, false);
}
void send2(SEXP data, bool send_more=false) {
void send_work(SEXP data, bool send_more=false) {
std::cerr << "setting worker " << cur_s << " active\n";
peer_active[cur_s] = true;
send(cur, "master", false, true);
send_null("master", false, true);
send(data, "master", false, send_more);
}
// Send `data` to the current peer (identified by `cur` / `cur_s`) as a
// shutdown message. The peer is marked inactive before sending; receive2()
// raises "Unexpected worker disconnect" only for a peer still marked active.
void send_shutdown(SEXP data) {
// Debug trace on stderr.
std::cerr << "setting worker " << cur_s << " inactive\n";
peer_active[cur_s] = false;
// Three sends on the "master" socket: the peer id (`cur`), a null, then `data`.
// NOTE(review): presumably the last argument is the send-more flag, making this
// one multipart message (identity, empty delimiter, payload) - confirm against
// the base-class send()/send_null().
send(cur, "master", false, true);
send_null("master", false, true);
send(data, "master", false, false);
}
SEXP receive2() {
cur = receive("master", false, false);
cur_s = std::string(reinterpret_cast<const char*>(RAW(cur)), Rf_xlength(cur));
auto null = receive("master", false, false);
SEXP msg;
if (rcv_more("master")) {
msg = receive("master", true, true);
peer_active[cur_s] = false;
} else {
std::cerr << "notify disconnect from " << cur_s << "\n";
if (peer_active[cur_s])
Rf_error("Unexpected worker disconnect: check your logs");
peer_active.erase(cur_s);
msg = R_NilValue;
}
Expand All @@ -78,16 +85,17 @@ class CMQMaster : public ZeroMQ {

RCPP_MODULE(cmq_master) {
using namespace Rcpp;
void (CMQMaster::*send_1)(SEXP) = &CMQMaster::send2 ;
void (CMQMaster::*send_2)(SEXP, bool) = &CMQMaster::send2 ;
void (CMQMaster::*send_1)(SEXP) = &CMQMaster::send_work ;
void (CMQMaster::*send_2)(SEXP, bool) = &CMQMaster::send_work ;
void (CMQMaster::*send_3)(SEXP) = &CMQMaster::send_shutdown ;
class_<CMQMaster>("CMQMaster")
.constructor()
// .constructor<zmq::context_t*>()
.method("main_loop", &CMQMaster::main_loop)
.method("listen", &CMQMaster::listen2)
.method("disconnect", &CMQMaster::disconnect2)
.method("send", send_1)
.method("send", send_2)
.method("send_work", send_1)
.method("send_work", send_2)
.method("send_shutdown", send_3)
.method("receive", &CMQMaster::receive2)
.method("poll", &CMQMaster::poll2)
;
Expand Down

0 comments on commit 10832ac

Please sign in to comment.