Skip to content

Commit

Permalink
clusterer: Fix potential AB/BA sync deadlock on DONOR nodes
Browse files Browse the repository at this point in the history
This deadlock was reproduced with usrloc, but may also affect other
sync-based modules, as follows:

---- sync DONOR node ----
1) [sync send Worker-1] grabs (A) cluster lock, then (B) usrloc hash lock
    while building/sending the sync packets
2) [handle SIP REGISTER Worker-2] grabs (B) usrloc hash lock, then (A)
    cluster lock when replicating data with cl_api.send_all()

This patch converts the "A+B" sequence in 1) into a "B,A" sequence.
  • Loading branch information
liviuchircu committed Oct 22, 2024
1 parent ae5fdc6 commit d89f170
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 52 deletions.
29 changes: 21 additions & 8 deletions bin_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void set_len(bin_packet_t *packet) {
}

/**
* bin_init - begins the construction of a new binary packet (header part):
* _bin_init - begins the construction of a new binary packet (header part):
*
* +-----------------------------+-------------------------------------------------------+
* | 12-byte HEADER | BODY max 65535 bytes |
Expand All @@ -48,8 +48,8 @@ void set_len(bin_packet_t *packet) {
* @param: { LEN, CAP } + CMD + VERSION
* @length: initial packet size. specify 0 to use the default size (BIN_MAX_BUF_LEN)
*/
int bin_init(bin_packet_t *packet, str *capability, int packet_type,
short version, int length)
int _bin_init(bin_packet_t *packet, str *capability, int packet_type,
short version, int length, int use_sysmalloc)
{
if (length != 0 && length < MIN_BIN_PACKET_SIZE + capability->len) {
LM_ERR("Length parameter has to be greater than: %zu\n",
Expand All @@ -60,14 +60,20 @@ int bin_init(bin_packet_t *packet, str *capability, int packet_type,
if (!length)
length = BIN_MAX_BUF_LEN;

packet->type = packet_type;

packet->buffer.s = pkg_malloc(length);
if (use_sysmalloc) {
packet->buffer.s = malloc(length);
packet->flags = BINFL_SYSMEM;
} else {
packet->buffer.s = pkg_malloc(length);
packet->flags = 0;
}
if (!packet->buffer.s) {
LM_ERR("No more pkg memory!\n");
return -1;
}

packet->buffer.len = 0;
packet->type = packet_type;
packet->size = length;

/* binary packet header: marker + pkg_len */
Expand Down Expand Up @@ -112,6 +118,7 @@ void bin_init_buffer(bin_packet_t *packet, char *buffer, int length)
packet->buffer.len = length;
packet->buffer.s = buffer;
packet->size = length;
packet->flags = 0;

bin_get_capability(packet, &capability);

Expand Down Expand Up @@ -441,6 +448,7 @@ void call_callbacks(char* buffer, struct receive_info *rcv)

packet.buffer.len = pkg_len;
packet.size = pkg_len + 50;
packet.flags = 0;
memcpy(packet.buffer.s, buffer, pkg_len);

bin_get_capability(&packet, &capability);
Expand Down Expand Up @@ -481,7 +489,9 @@ static int bin_extend(bin_packet_t *packet, int size)
else
packet->size = 2 * required;

packet->buffer.s = pkg_realloc(packet->buffer.s, packet->size);
packet->buffer.s = (packet->flags & BINFL_SYSMEM) ?
realloc(packet->buffer.s, packet->size) :
pkg_realloc(packet->buffer.s, packet->size);
if (!packet->buffer.s) {
LM_ERR("pkg realloc failed\n");
return -1;
Expand All @@ -493,7 +503,10 @@ static int bin_extend(bin_packet_t *packet, int size)
void bin_free_packet(bin_packet_t *packet)
{
if (packet->buffer.s) {
pkg_free(packet->buffer.s);
if (packet->flags & BINFL_SYSMEM)
free(packet->buffer.s);
else
pkg_free(packet->buffer.s);
packet->buffer.s = NULL;
} else {
LM_INFO("atempting to free uninitialized binary packet\n");
Expand Down
11 changes: 9 additions & 2 deletions bin_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@
} while (0)
#define ensure_bin_version(pkt, needed) _ensure_bin_version(pkt, needed, "")

typedef unsigned bin_packet_flags_t;
#define BINFL_SYSMEM (1U<<0)

typedef struct bin_packet {
str buffer;
char *front_pointer;
struct bin_packet *next;
int size;
int type;
bin_packet_flags_t flags;
/* not populated by bin_interface */
int src_id;
} bin_packet_t;
Expand Down Expand Up @@ -123,8 +128,10 @@ int bin_register_cb(str *cap, void (*cb)(bin_packet_t *, int,
*
* @return: 0 on success
*/
int bin_init(bin_packet_t *packet, str *capability, int packet_type, short version,
int length);
int _bin_init(bin_packet_t *packet, str *capability, int packet_type, short version,
int length, int use_sysmalloc);
#define bin_init(_pk, _cap, _pt, _ver, _len) \
_bin_init(_pk, _cap, _pt, _ver, _len, 0)

/**
* function called to build a binary packet with a known buffer
Expand Down
90 changes: 48 additions & 42 deletions modules/clusterer/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
int sync_packet_size = DEFAULT_SYNC_PACKET_SIZE;
int _sync_from_id = 0;

static bin_packet_t *sync_packet_snd;
static bin_packet_t *sync_packet_last;
static int sync_prev_buf_len;
static int *sync_last_chunk_sz;
static bin_packet_t *sync_packets;
static unsigned sync_packets_cnt;

int send_sync_req(str *capability, int cluster_id, int source_id)
{
Expand Down Expand Up @@ -204,8 +206,8 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id,
int aloc_new_pkt = 0;
bin_packet_t *new_packet = NULL;

if (sync_packet_snd) {
bin_get_buffer(sync_packet_snd, &bin_buffer);
if (sync_packet_last) {
bin_get_buffer(sync_packet_last, &bin_buffer);
prev_chunk_size = bin_buffer.len - sync_prev_buf_len;
/* assume this chunk will have aprox the same size as the previous one
* and check if there is enough space in the packet */
Expand All @@ -215,62 +217,59 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id,
aloc_new_pkt = 1;

if (aloc_new_pkt) { /* next chunk will be in a new packet */
if (sync_packet_snd) {
if (sync_packet_last) {
*sync_last_chunk_sz = prev_chunk_size;

/* send and free the previous packet */
msg_add_trailer(sync_packet_snd, cluster_id, dst_id);

if (clusterer_send_msg(sync_packet_snd, cluster_id, dst_id, 0,
1 /* we should be in a SYNC_REQ_RCV callback here so
* already locked*/) < 0)
LM_ERR("Failed to send sync packet\n");

bin_free_packet(sync_packet_snd);
pkg_free(sync_packet_snd);
sync_packet_snd = NULL;
/* properly end the previous packet (to be sent later) */
msg_add_trailer(sync_packet_last, cluster_id, dst_id);
sync_last_chunk_sz = NULL;
}

new_packet = pkg_malloc(sizeof *new_packet);
new_packet = malloc(sizeof *new_packet);
if (!new_packet) {
LM_ERR("No more pkg memory\n");
return NULL;
}
new_packet->next = NULL;

if (bin_init(new_packet,&cl_extra_cap,CLUSTERER_SYNC,BIN_SYNC_VERSION,0)<0) {
if (_bin_init(new_packet,&cl_extra_cap,CLUSTERER_SYNC,BIN_SYNC_VERSION,0,1)<0) {
LM_ERR("Failed to init bin packet\n");
pkg_free(new_packet);
free(new_packet);
return NULL;
}

bin_push_str(new_packet, capability);
bin_push_int(new_packet, data_version);
sync_packet_snd = new_packet;
if (sync_packet_last)
sync_packet_last->next = new_packet;
else
sync_packets = new_packet;
sync_packet_last = new_packet;
sync_packets_cnt++;
}

if (sync_last_chunk_sz)
*sync_last_chunk_sz = prev_chunk_size;

/* reserve and remember a holder for the upcoming data chunk size */
bin_get_buffer(sync_packet_snd, &bin_buffer);
bin_push_int(sync_packet_snd, 0);
bin_get_buffer(sync_packet_last, &bin_buffer);
bin_push_int(sync_packet_last, 0);
sync_last_chunk_sz = (int *)(bin_buffer.s + bin_buffer.len);

bin_push_int(sync_packet_snd, SYNC_CHUNK_START_MARKER);
bin_push_int(sync_packet_last, SYNC_CHUNK_START_MARKER);

bin_get_buffer(sync_packet_snd, &bin_buffer);
bin_get_buffer(sync_packet_last, &bin_buffer);
sync_prev_buf_len = bin_buffer.len;

no_sync_chunks_sent++;

return sync_packet_snd;
return sync_packet_last;
}

int no_sync_chunks_iter;

/* this mechanism allows modules to ignore all or part of a sync chunk
* without disrupting the sequencing / consuming of the remaining data */
/* this mechanism allows modules to ignore all or part of a sync chunk on the
* receiver node, without affecting their consuming of remaining sync chunks */
char *next_data_chunk;

int cl_sync_chunk_iter(bin_packet_t *packet)
Expand Down Expand Up @@ -324,42 +323,49 @@ int cl_sync_chunk_iter(bin_packet_t *packet)

void send_sync_repl(int sender, void *param)
{
bin_packet_t sync_end_pkt;
bin_packet_t sync_end_pkt, *pkt, *next_pkt;
str bin_buffer;
struct local_cap *cap;
int rc, cluster_id;
int rc, cluster_id, pkt_no;
struct reply_rpc_params *p = (struct reply_rpc_params *)param;

lock_start_read(cl_list_lock);

for (cap = p->cluster->capabilities; cap; cap = cap->next)
if (!str_strcmp(&p->cap_name, &cap->reg.name))
break;
if (!cap) {
LM_ERR("Sync request for unknown capability: %.*s\n",
p->cap_name.len, p->cap_name.s);
lock_stop_read(cl_list_lock);
return;
}

no_sync_chunks_sent = 0;

cap->reg.event_cb(SYNC_REQ_RCV, p->node_id);

if (sync_packet_snd) {
bin_get_buffer(sync_packet_snd, &bin_buffer);
lock_start_read(cl_list_lock);

if (sync_packets) {
bin_get_buffer(sync_packet_last, &bin_buffer);
*sync_last_chunk_sz = bin_buffer.len - sync_prev_buf_len;

/* send and free the lastly built packet */
msg_add_trailer(sync_packet_snd, p->cluster->cluster_id, p->node_id);
msg_add_trailer(sync_packet_last, p->cluster->cluster_id, p->node_id);

for (pkt = sync_packets; pkt; pkt = next_pkt) {
next_pkt = pkt->next;

if ((rc = clusterer_send_msg(sync_packet_snd, p->cluster->cluster_id,
p->node_id, 0, 1))<0)
LM_ERR("Failed to send sync packet, rc=%d\n", rc);
if ((rc = clusterer_send_msg(pkt, p->cluster->cluster_id,
p->node_id, 0, 1))<0)
LM_ERR("Failed to send sync packet, rc=%d\n", rc);

bin_free_packet(pkt);
free(pkt);
}

bin_free_packet(sync_packet_snd);
pkg_free(sync_packet_snd);
sync_packet_snd = NULL;
sync_packets = NULL;
pkt_no = sync_packets_cnt;
sync_packets_cnt = 0;
sync_packet_last = NULL;
sync_last_chunk_sz = NULL;
}

Expand All @@ -386,8 +392,8 @@ void send_sync_repl(int sender, void *param)

bin_free_packet(&sync_end_pkt);

LM_INFO("Sent all sync packets for capability '%.*s' to node %d, cluster "
"%d\n", p->cap_name.len, p->cap_name.s, p->node_id, cluster_id);
LM_INFO("Sent all sync packets (%d) for capability '%.*s' to node %d, cluster "
"%d\n", pkt_no, p->cap_name.len, p->cap_name.s, p->node_id, cluster_id);

shm_free(param);
}
Expand Down

0 comments on commit d89f170

Please sign in to comment.