Skip to content

Commit

Permalink
siprec: add replication over B2B support
Browse files Browse the repository at this point in the history
This enables replication of the siprec structures over multiple nodes,
allowing them to failover in case the main instance dissapears.

Many thanks to Voxtronic for sponsoring this work!
  • Loading branch information
razvancrainea committed Dec 19, 2023
1 parent d836f4a commit f2f68d5
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 59 deletions.
13 changes: 13 additions & 0 deletions modules/siprec/siprec_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ int src_init(void)
return -1;
}

if (srec_b2b.register_cb(src_event_received,
B2BCB_RECV_EVENT, &mod_name) < 0) {
LM_ERR("could not register SIPREC event receive callback!\n");
return -1;
}

if (srec_b2b.register_cb(src_event_trigger,
B2BCB_TRIGGER_EVENT, &mod_name) < 0) {
LM_ERR("could not register SIPREC event trigger callback!\n");
return -1;
}


return 0;
}

Expand Down
201 changes: 142 additions & 59 deletions modules/siprec/siprec_sess.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,15 @@ int src_add_participant(struct src_sess *sess, str *aor, str *name,

#define SIPREC_BIN_POP(_type, _value) \
do { \
if (bin_pop_##_type(&packet, _value) < 0) { \
if (bin_pop_##_type(packet, _value) < 0) { \
LM_ERR("cannot pop '" #_value "' from bin packet!\n"); \
goto error; \
} \
} while (0)

void srec_loaded_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params)

static int srec_pop_sess(struct dlg_cell *dlg, bin_packet_t *packet)
{
int_str buf;
int val_type;
struct src_sess *sess = NULL;
struct srs_node *node = NULL;
bin_packet_t packet;
int version;
time_t ts;
str tmp, media_ip, srs_uri, group;
Expand All @@ -288,37 +283,29 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
int p_type;
int flags;
str from_tag, to_tag;
struct srs_node *node = NULL;
struct src_sess *sess = NULL;

if (!dlg) {
LM_ERR("null dialog - cannot fetch siprec info!\n");
return;
/* first, double check if we've already done this */
sess = (struct src_sess *)srec_dlg.dlg_ctx_get_ptr(dlg, srec_dlg_idx);
if (sess) {
LM_DBG("SIPREC session already popped\n");
return 0;
}

/* retrieve the RTP information */
rtp = srec_rtp.get_ctx_dlg(dlg);
if (!rtp) {
LM_DBG("no RTP Relay context not available!\n");
return;
}

if (srec_dlg.fetch_dlg_value(dlg, &srec_dlg_name, &val_type, &buf, 0) < 0) {
LM_DBG("cannot fetch siprec info from the dialog\n");
return;
}

bin_init_buffer(&packet, buf.s.s, buf.s.len);

if (get_bin_pkg_version(&packet) != SIPREC_SESSION_VERSION) {
LM_ERR("invalid serialization version (%d != %d)\n",
get_bin_pkg_version(&packet), SIPREC_SESSION_VERSION);
return;
return -1;
}

SIPREC_BIN_POP(str, &tmp);

if (tmp.len != sizeof(ts)) {
LM_ERR("invalid length for timestamp (%d != %d)\n", tmp.len,
(int)sizeof(ts));
return;
return -1;
}
memcpy(&ts, tmp.s, tmp.len);
SIPREC_BIN_POP(int, &version);
Expand Down Expand Up @@ -348,7 +335,7 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
if (tmp.len != sizeof(siprec_uuid)) {
LM_ERR("invalid length for uuid (%d != %d)\n", tmp.len,
(int)sizeof(siprec_uuid));
return;
return -1;
}
memcpy(&uuid, tmp.s, tmp.len);

Expand All @@ -359,7 +346,7 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
(session_custom_extension.len ? &session_custom_extension : NULL));
if (!sess) {
LM_ERR("cannot create a new siprec session!\n");
return;
return -1;
}
sess->flags = flags;

Expand Down Expand Up @@ -416,7 +403,7 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
if (tmp.len != sizeof(ts)) {
LM_ERR("invalid length for timestamp (%d != %d)\n", tmp.len,
(int)sizeof(ts));
return;
goto error;
}
memcpy(&ts, tmp.s, tmp.len);
if (src_add_participant(sess, &aor, &name, xml, &uuid, &ts) < 0) {
Expand All @@ -442,29 +429,61 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
}
}

/* all good: continue with dialog support! */
SIPREC_REF(sess);
srec_hlog(sess, SREC_REF, "registered dlg");
sess->dlg = dlg;

/* restore b2b callbacks */
if (srec_restore_callback(sess) < 0) {
LM_ERR("cannot restore b2b callbacks!\n");
return;
goto error_unref;
}

/* all good: continue with dialog support! */
SIPREC_REF(sess);
srec_hlog(sess, SREC_REF, "registered dlg");
sess->dlg = dlg;
srec_dlg.dlg_ctx_put_ptr(dlg, srec_dlg_idx, sess);

if (srec_register_callbacks(sess) < 0) {
LM_ERR("cannot register callback for terminating session\n");
srec_hlog(sess, SREC_UNREF, "error registering dlg callbacks");
SIPREC_UNREF(sess);
goto error;
goto error_unref;
}

return;
return 0;
error_unref:
srec_hlog(sess, SREC_UNREF, "error registering dlg callbacks");
SIPREC_UNREF(sess);
error:
if (sess)
src_free_session(sess);
return -1;
}

void srec_loaded_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params)
{
int_str buf;
int val_type;
bin_packet_t packet;

if (!dlg) {
LM_ERR("null dialog - cannot fetch siprec info!\n");
return;
}

if (srec_dlg.fetch_dlg_value(dlg, &srec_dlg_name, &val_type, &buf, 0) < 0) {
LM_DBG("cannot fetch siprec info from the dialog\n");
return;
}

bin_init_buffer(&packet, buf.s.s, buf.s.len);

if (get_bin_pkg_version(&packet) != SIPREC_SESSION_VERSION) {
LM_ERR("invalid serialization version (%d != %d)\n",
get_bin_pkg_version(&packet), SIPREC_SESSION_VERSION);
return;
}

if (srec_pop_sess(dlg, &packet) < 0)
LM_ERR("failed to pop SIPREC session\n");
}
#undef SIPREC_BIN_POP

Expand All @@ -480,35 +499,18 @@ static inline str *srec_serialize(void *field, int size)

#define SIPREC_BIN_PUSH(_type, _value) \
do { \
if (bin_push_##_type(&packet, _value) < 0) { \
if (bin_push_##_type(packet, _value) < 0) { \
LM_ERR("cannot push '" #_value "' in bin packet!\n"); \
bin_free_packet(&packet); \
return; \
return -1; \
} \
} while (0)

void srec_dlg_write_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params)
static int srec_push_sess(struct src_sess *ss, bin_packet_t *packet)
{
str name = str_init("siprec");
str empty = str_init("");
bin_packet_t packet;
struct src_sess *ss;
int p, c;
int_str buffer;
struct list_head *l;
struct srs_sdp_stream *s;

if (!params) {
LM_ERR("no parameter specified to dlg callback!\n");
return;
}
ss = *params->param;

if (bin_init(&packet, &name, 0, SIPREC_SESSION_VERSION, 0) < 0) {
LM_ERR("cannot initialize bin packet!\n");
return;
}
int p, c;

SIPREC_BIN_PUSH(str, SIPREC_SERIALIZE(ss->ts));
SIPREC_BIN_PUSH(int, ss->version);
Expand Down Expand Up @@ -569,6 +571,33 @@ void srec_dlg_write_callback(struct dlg_cell *dlg, int type,
SIPREC_BIN_PUSH(str, SIPREC_SERIALIZE(s->uuid));
}
}
return 0;
}

void srec_dlg_write_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params)
{
str name = str_init("siprec");
bin_packet_t packet;
struct src_sess *ss;
int_str buffer;

if (!params) {
LM_ERR("no parameter specified to dlg callback!\n");
return;
}
ss = *params->param;

if (bin_init(&packet, &name, 0, SIPREC_SESSION_VERSION, 0) < 0) {
LM_ERR("cannot initialize bin packet!\n");
return;
}
if (srec_push_sess(ss, &packet) < 0) {
LM_ERR("cannot push session in bin packet!\n");
bin_free_packet(&packet);
return;
}

bin_get_buffer(&packet, &buffer.s);
bin_free_packet(&packet);

Expand All @@ -577,5 +606,59 @@ void srec_dlg_write_callback(struct dlg_cell *dlg, int type,
return;
}
}

static void src_event_trigger_create(struct src_sess *sess, bin_packet_t *store)
{
if (srec_push_sess(sess, store) < 0)
LM_WARN("could not create replicated session!\n");
}

void src_event_trigger(enum b2b_entity_type et, str *key,
str *logic_key, void *param, enum b2b_event_type event_type,
bin_packet_t *store, int backend)
{
struct src_sess *sess = (struct src_sess *)param;

switch (event_type) {
case B2B_EVENT_CREATE:
src_event_trigger_create(sess, store);
break;
default:
/* nothing else for now */
break;
}
}

static void src_event_receive_create(str *key, bin_packet_t *packet)
{
struct dlg_cell *dlg;
/* search for the dialog based on the key */
dlg = srec_dlg.get_dlg_by_callid(key, 0);
if (!dlg) {
LM_ERR("cannot find replicated dialog for callid %.*s\n", key->len, key->s);
return;
}

if (srec_pop_sess(dlg, packet) < 0)
LM_ERR("failed to pop SIPREC session\n");
srec_dlg.dlg_unref(dlg, 1);
}

void src_event_received(enum b2b_entity_type et, str *key,
str *logic_key, void *param, enum b2b_event_type event_type,
bin_packet_t *store, int backend)
{
if (!store)
return;

switch (event_type) {
case B2B_EVENT_CREATE:
src_event_receive_create(logic_key, store);
break;
default:
/* nothing else for now */
break;
}
}
#undef SIPREC_SERIALIZE
#undef SIPREC_BIN_PUSH
6 changes: 6 additions & 0 deletions modules/siprec/siprec_sess.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,11 @@ void srec_loaded_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params);
void srec_dlg_write_callback(struct dlg_cell *dlg, int type,
struct dlg_cb_params *params);
void src_event_trigger(enum b2b_entity_type et, str *key,
str *logic_key, void *param, enum b2b_event_type event_type,
bin_packet_t *store, int backend);
void src_event_received(enum b2b_entity_type et, str *key,
str *logic_key, void *param, enum b2b_event_type event_type,
bin_packet_t *store, int backend);

#endif /* _SIPREC_SESS_H_ */

0 comments on commit f2f68d5

Please sign in to comment.