Skip to content

Commit

Permalink
Fix: global backaddr is assumed to be static #84
Browse files Browse the repository at this point in the history
  • Loading branch information
dmatetelki committed Oct 10, 2017
1 parent 99ef536 commit 511340e
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 50 deletions.
7 changes: 7 additions & 0 deletions src/configuration.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#define CFG_WORKERS "workers"
#define CFG_BACKLOG "backlog"
#define CFG_KEEPALIVE "keepalive"
#define CFG_BACKEND_REFRESH "backendrefresh"
#define CFG_CHROOT "chroot"
#define CFG_USER "user"
#define CFG_GROUP "group"
Expand Down Expand Up @@ -218,6 +219,7 @@ config_new(void)
r->SYSLOG = 0;
r->SYSLOG_FACILITY = LOG_DAEMON;
r->TCP_KEEPALIVE_TIME = 3600;
r->BACKEND_REFRESH_TIME = 1000;
r->DAEMONIZE = 0;
r->PREFER_SERVER_CIPHERS = 0;
r->TEST = 0;
Expand Down Expand Up @@ -748,6 +750,8 @@ config_param_validate(char *k, char *v, hitch_config *cfg,
r = config_param_val_int(v, &cfg->BACKLOG, 0);
} else if (strcmp(k, CFG_KEEPALIVE) == 0) {
r = config_param_val_int(v, &cfg->TCP_KEEPALIVE_TIME, 1);
} else if (strcmp(k, CFG_BACKEND_REFRESH) == 0) {
r = config_param_val_int(v, &cfg->BACKEND_REFRESH_TIME, 1);
}
#ifdef USE_SHARED_CACHE
else if (strcmp(k, CFG_SHARED_CACHE) == 0) {
Expand Down Expand Up @@ -1048,6 +1052,7 @@ config_print_usage_fd(char *prog, FILE *out)
fprintf(out, " -n --workers=NUM Number of worker processes (Default: %ld)\n", cfg->NCORES);
fprintf(out, " -B --backlog=NUM Set listen backlog size (Default: %d)\n", cfg->BACKLOG);
fprintf(out, " -k --keepalive=SECS TCP keepalive on client socket (Default: %d)\n", cfg->TCP_KEEPALIVE_TIME);
fprintf(out, " -R --backendrefresh=USECS Periodic backend IP lookup (Default: %d, 0 to disable)\n", cfg->BACKEND_REFRESH_TIME);

#ifdef USE_SHARED_CACHE
fprintf(out, " -C --session-cache=NUM Enable and set SSL session cache to specified number\n");
Expand Down Expand Up @@ -1187,6 +1192,7 @@ config_parse_cli(int argc, char **argv, hitch_config *cfg, int *retval)
#endif
{ CFG_PIDFILE, 1, NULL, 'p' },
{ CFG_KEEPALIVE, 1, NULL, 'k' },
{ CFG_BACKEND_REFRESH, 1, NULL, 'R' },
{ CFG_CHROOT, 1, NULL, 'r' },
{ CFG_USER, 1, NULL, 'u' },
{ CFG_GROUP, 1, NULL, 'g' },
Expand Down Expand Up @@ -1282,6 +1288,7 @@ CFG_ARG('M', CFG_SHARED_CACHE_MCASTIF);
#endif
CFG_ARG('p', CFG_PIDFILE);
CFG_ARG('k', CFG_KEEPALIVE);
CFG_ARG('R', CFG_BACKEND_REFRESH);
CFG_ARG('r', CFG_CHROOT);
CFG_ARG('u', CFG_USER);
CFG_ARG('g', CFG_GROUP);
Expand Down
1 change: 1 addition & 0 deletions src/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ struct __hitch_config {
int SYSLOG;
int SYSLOG_FACILITY;
int TCP_KEEPALIVE_TIME;
int BACKEND_REFRESH_TIME;
int DAEMONIZE;
int PREFER_SERVER_CIPHERS;
int BACKEND_CONNECT_TIMEOUT;
Expand Down
185 changes: 135 additions & 50 deletions src/hitch.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ enum worker_state_e {

static enum worker_state_e worker_state;

struct simple_addrinfo {
int family;
int len;
struct sockaddr_storage addr;
};

struct worker_proc {
unsigned magic;
#define WORKER_PROC_MAGIC 0xbc7fe9e6
Expand All @@ -148,6 +154,7 @@ struct worker_proc {
pid_t pid;
unsigned gen;
int core_id;
struct simple_addrinfo backaddr;
VTAILQ_ENTRY(worker_proc) list;
};

Expand Down Expand Up @@ -250,6 +257,20 @@ struct ha_proxy_v2_hdr {
union ha_proxy_v2_addr addr;
};

enum worker_update_type {
WORKER_GEN,
BACKEND_REFRESH
};

union worker_update_payload {
unsigned gen;
struct simple_addrinfo addr;
};

struct worker_update {
enum worker_update_type type;
union worker_update_payload payload;
};

/* set a file descriptor (socket) to non-blocking mode */
static int
Expand Down Expand Up @@ -1930,6 +1951,17 @@ static void end_handshake(proxystate *ps) {
} else if (CONFIG->WRITE_IP_OCTET) {
write_ip_octet(ps);
}

int back = create_back_socket();
if (back == -1) {
ERR("{backend-socket}: %s\n", strerror(errno));
return;
}
ps->fd_down = back;
ev_io_init(&ps->ev_w_connect, handle_connect, back, EV_WRITE);
ev_io_init(&ps->ev_w_clear, clear_write, back, EV_WRITE);
ev_io_init(&ps->ev_r_clear, clear_read, back, EV_READ);

/* start connect now */
if (0 != start_connect(ps))
return;
Expand Down Expand Up @@ -2272,13 +2304,6 @@ handle_accept(struct ev_loop *loop, ev_io *w, int revents)

settcpkeepalive(client);

int back = create_back_socket();
if (back == -1) {
(void) close(client);
ERR("{backend-socket}: %s\n", strerror(errno));
return;
}

CAST_OBJ_NOTNULL(fr, w->data, FRONTEND_MAGIC);
if (fr->ssl_ctxs != NULL)
CAST_OBJ_NOTNULL(so, fr->ssl_ctxs, SSLCTX_MAGIC);
Expand All @@ -2287,7 +2312,6 @@ handle_accept(struct ev_loop *loop, ev_io *w, int revents)

SSL *ssl = SSL_new(so->ctx);
if (ssl == NULL) {
(void)close(back);
(void)close(client);
ERR("{SSL_new}: %s\n", strerror(errno));
return;
Expand All @@ -2296,7 +2320,6 @@ handle_accept(struct ev_loop *loop, ev_io *w, int revents)
ALLOC_OBJ(ps, PROXYSTATE_MAGIC);
if (ps == NULL) {
SSL_free(ssl);
(void)close(back);
(void)close(client);
ERR("{malloc-err}: %s\n", strerror(errno));
return;
Expand All @@ -2311,7 +2334,7 @@ handle_accept(struct ev_loop *loop, ev_io *w, int revents)
SSL_set_fd(ssl, client);

ps->fd_up = client;
ps->fd_down = back;
ps->fd_down = 0;
ps->ssl = ssl;
ps->want_shutdown = 0;
ps->clear_connected = 0;
Expand All @@ -2336,13 +2359,9 @@ handle_accept(struct ev_loop *loop, ev_io *w, int revents)

ev_io_init(&ps->ev_proxy, client_proxy_proxy, client, EV_READ);

ev_io_init(&ps->ev_w_connect, handle_connect, back, EV_WRITE);
ev_timer_init(&ps->ev_t_connect, connect_timeout,
CONFIG->BACKEND_CONNECT_TIMEOUT, 0.);

ev_io_init(&ps->ev_w_clear, clear_write, back, EV_WRITE);
ev_io_init(&ps->ev_r_clear, clear_read, back, EV_READ);

ps->ev_r_ssl.data = ps;
ps->ev_w_ssl.data = ps;
ps->ev_r_clear.data = ps;
Expand Down Expand Up @@ -2394,13 +2413,13 @@ check_ppid(struct ev_loop *loop, ev_timer *w, int revents)
static void
handle_mgt_rd(struct ev_loop *loop, ev_io *w, int revents)
{
unsigned cg;
ssize_t r;
struct frontend *fr;
struct listen_sock *ls;
struct worker_update wu;

(void) revents;
r = read(w->fd, &cg, sizeof(cg));
r = read(w->fd, &wu, sizeof(wu));
if (r == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN)
return;
Expand All @@ -2416,7 +2435,7 @@ handle_mgt_rd(struct ev_loop *loop, ev_io *w, int revents)
_exit(1);
}

if (cg != worker_gen) {
if (wu.type == WORKER_GEN && wu.payload.gen != worker_gen) {
/* This means this process has reached its retirement age. */
worker_state = WORKER_EXITING;

Expand All @@ -2431,10 +2450,16 @@ handle_mgt_rd(struct ev_loop *loop, ev_io *w, int revents)
}

check_exit_state();
}

LOGL("Worker %d (gen: %d): State %s\n", core_id, worker_gen,
(worker_state == WORKER_EXITING) ? "EXITING" : "ACTIVE");
LOGL("Worker %d (gen: %d): State %s\n", core_id, worker_gen,
(worker_state == WORKER_EXITING) ? "EXITING" : "ACTIVE");
}
else if (wu.type == BACKEND_REFRESH) {
backaddr->ai_family = wu.payload.addr.family;
backaddr->ai_addrlen = wu.payload.addr.len;
memcpy(backaddr->ai_addr, &wu.payload.addr.addr,
sizeof(struct sockaddr_storage));
}
}

static void
Expand Down Expand Up @@ -2725,25 +2750,40 @@ drop_privileges(void)
#endif
}

void
init_globals(void)
{
/* backaddr */
int
get_backend_addrinfo() {
static struct addrinfo *addrinfo;
struct addrinfo hints;

VTAILQ_INIT(&frontends);
VTAILQ_INIT(&worker_procs);

memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
const int gai_err = getaddrinfo(CONFIG->BACK_IP, CONFIG->BACK_PORT,
&hints, &backaddr);
&hints, &addrinfo);
if (gai_err != 0) {
ERR("{getaddrinfo-backend}: %s\n", gai_strerror(gai_err));
exit(1);
}
if (memcmp(addrinfo->ai_addr, backaddr->ai_addr,
sizeof(struct sockaddr_storage)) != 0) {
backaddr = addrinfo;
return 1;
}
return 0;
}

void
init_globals(void)
{
struct addrinfo hints;

VTAILQ_INIT(&frontends);
VTAILQ_INIT(&worker_procs);

get_backend_addrinfo();

(void)hints;

#ifdef USE_SHARED_CACHE
if (CONFIG->SHARED_CACHE) {
Expand Down Expand Up @@ -3413,15 +3453,47 @@ cert_query(hitch_config *cfg, struct cfg_tpc_obj_head *cfg_objs)
}

static void
reconfigure(int argc, char **argv)
notify_workers(struct worker_update *wu)
{
struct worker_proc *c;
int i;
VTAILQ_FOREACH(c, &worker_procs, list) {
if ((wu->type == WORKER_GEN && wu->payload.gen != c->gen) ||
(wu->type == BACKEND_REFRESH)) {
errno = 0;
do {
i = write(c->pfd, (void*)wu, sizeof(wu));
if (i == -1 && errno != EINTR) {
if (wu->type == WORKER_GEN)
ERR("WARNING: {core} Unable to "
"gracefully reload worker %d"
" (%s).\n",
c->pid, strerror(errno));
else
ERR("WARNING: {core} Unable to "
"notify worker %d "
"with changed backend address (%s).\n",
c->pid, strerror(errno));

(void)kill(c->pid, SIGTERM);
break;
}
} while (i == -1 && errno == EINTR);
(void)close(c->pfd);
}
}
}

static void
reconfigure(int argc, char **argv)
{
hitch_config *cfg_new;
int i, rv;
int rv;
struct cfg_tpc_obj_head cfg_objs;
struct cfg_tpc_obj *cto, *cto_tmp;
struct timeval tv;
double t0, t1;
struct worker_update wu;

LOGL("Received SIGHUP: Initiating configuration reload.\n");
AZ(gettimeofday(&tv, NULL));
Expand Down Expand Up @@ -3465,24 +3537,10 @@ reconfigure(int argc, char **argv)

worker_gen++;
start_workers(0, CONFIG->NCORES);
VTAILQ_FOREACH(c, &worker_procs, list) {
if (c->gen != worker_gen) {
errno = 0;
do {
i = write(c->pfd, &worker_gen,
sizeof(worker_gen));
if (i == -1 && errno != EINTR) {
ERR("WARNING: {core} Unable to "
"gracefully reload worker %d"
" (%s).\n",
c->pid, strerror(errno));
(void)kill(c->pid, SIGTERM);
break;
}
} while (i == -1 && errno == EINTR);
(void)close(c->pfd);
}
}

wu.type = WORKER_GEN;
wu.payload.gen = worker_gen;
notify_workers(&wu);

if (CONFIG->OCSP_DIR != NULL) {
(void) kill(ocsp_proc_pid, SIGTERM);
Expand All @@ -3496,6 +3554,32 @@ reconfigure(int argc, char **argv)
CONFIG = cfg_new;
}

void
sleep_and_refresh(hitch_config *CONFIG)
{
if (!CONFIG->BACKEND_CONNECT_TIMEOUT) {
pause();
return;
}

int rv = 0;
while (1) {
rv = usleep(CONFIG->BACKEND_CONNECT_TIMEOUT);
if (rv == EINTR)
break;
else if(get_backend_addrinfo()) {
struct worker_update wu;
wu.type = BACKEND_REFRESH;
wu.payload.addr.len = backaddr->ai_addrlen;
wu.payload.addr.family = backaddr->ai_family;
memcpy(&wu.payload.addr.addr,
backaddr->ai_addr,
sizeof(struct sockaddr_storage));
notify_workers(&wu);
}
}
}

/* Process command line args, create the bound socket,
* spawn child (worker) processes, and respawn if any die */
int
Expand Down Expand Up @@ -3628,9 +3712,10 @@ main(int argc, char **argv)
ev_loop(loop, EVRUN_ONCE);
}
} else
pause();
sleep_and_refresh(CONFIG);
#else
pause();

sleep_and_refresh(CONFIG);
/* Sleep and let the children work.
* Parent will be woken up if a signal arrives */
#endif /* USE_SHARED_CACHE */
Expand Down

0 comments on commit 511340e

Please sign in to comment.