From 08b9dd1c8aafcd9479ee9ce0cd72cdc124bdb57f Mon Sep 17 00:00:00 2001 From: Vlad Paiu Date: Fri, 3 Jan 2025 17:55:24 +0200 Subject: [PATCH 1/3] Add async/launch for rtpengine_offer / rtpengine_answer / rtpengine_delete --- modules/rtpengine/rtpengine.c | 657 +++++++++++++++++++++++++++++----- modules/rtpengine/rtpengine.h | 3 + 2 files changed, 566 insertions(+), 94 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 41c04e0ed4..4a9ff0b127 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -228,6 +228,15 @@ struct rtpe_ignore_node { struct rtpe_ignore_node *next; }; +typedef struct rtpe_async_param_ { + bencode_buffer_t *bencbuf; + enum rtpe_operation op; + struct rtpe_node *node; + char* cookie; + pv_spec_t *spvar; + pv_spec_t *bpvar; +} rtpe_async_param; + static const char *command_strings[] = { [OP_OFFER] = "offer", [OP_ANSWER] = "answer", @@ -273,6 +282,12 @@ static const str stat_maps[] = { [STAT_PACKETLOSS_MAX_AT] = str_init("packetloss-max-at") }; +static int rtpengine_offer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, + pv_spec_t *bpvar, str *body); +static int rtpengine_answer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, + pv_spec_t *bpvar, str *body); +static int rtpengine_delete_af(struct sip_msg* msg, async_ctx *ctx, str *flags, pv_spec_t *spvar); + static char *gencookie(); static int rtpe_test(struct rtpe_node*, int, int); static int stop_recording_f(struct sip_msg* msg, str *flags, pv_spec_t *spvar); @@ -436,6 +451,23 @@ static struct rtp_relay_hooks rtp_relay; static pv_elem_t *extra_id_pv = NULL; +static acmd_export_t acmds[] = { + {"rtpengine_offer", (acmd_function)rtpengine_offer_af, { + {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}}, + {"rtpengine_answer", (acmd_function)rtpengine_answer_af, { + {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}}, + {"rtpengine_delete", (acmd_function)rtpengine_delete_af, { + {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, + {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}}, + {0, 0, {{0, 0, 0}}} +}; + static const cmd_export_t cmds[] = { {"rtpengine_use_set", (cmd_function)set_rtpengine_set_f, { {CMD_PARAM_INT, fixup_set_id, fixup_free_set_id}, {0,0,0}}, @@ -773,7 +805,7 @@ struct module_exports exports = { 0, /* load function */ &deps, /* OpenSIPS module dependencies */ cmds, - 0, + acmds, params, 0, /* exported statistics */ mi_cmds, /* exported MI functions */ @@ -1814,6 +1846,10 @@ static inline int rtpengine_connect_node(struct rtpe_node *pnode) freeaddrinfo(res); return 0; } + + pnode->ai_addrlen = res->ai_addrlen; + memcpy(&(pnode->ai_addr), res->ai_addr, res->ai_addrlen); + freeaddrinfo(res); return 1; } @@ -2589,129 +2625,117 @@ static int rtpe_check_ignore_node(str *error) return ret; } -static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg, - enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar, - struct rtpe_set *set, str *snode, bencode_item_t *extra_dict) +static int rtpe_function_call_prepare(bencode_buffer_t *bencbuf, struct sip_msg *msg, enum rtpe_operation op, + struct ng_flags_parse *ng_flags, str *flags_str, str *body_in, bencode_item_t *extra_dict, char **err) { - struct ng_flags_parse ng_flags; - bencode_item_t *item, *resp; - str viabranch, error; + bencode_item_t *item; + str viabranch; int ret, flags_exist = 0, callid_exist = 0, from_tag_exist = 0, to_tag_exist = 0; - struct rtpe_node *node, *failed_node; - char *cp, *err = NULL; - pv_value_t val; str flags_nt = {0,0}; - struct rtpe_ignore_node *ignore_list = NULL; - - /*** get & init basic stuff needed ***/ - memset(&ng_flags, 0, sizeof(ng_flags)); - error.len = 0; - error.s = ""; + if (bencode_buffer_init(bencbuf)) { + LM_ERR("could not initialize bencode_buffer_t\n"); + return -1; + } if (!extra_dict) { - if (bencode_buffer_init(bencbuf)) { - err = "could not initialize bencode_buffer_t"; - bencbuf = NULL; - goto error; - } - ng_flags.dict = bencode_dictionary(bencbuf); + ng_flags->dict = bencode_dictionary(bencbuf); } else { - ng_flags.dict = extra_dict; + ng_flags->dict = extra_dict; - ng_flags.flags = bencode_dictionary_get(ng_flags.dict, "flags"); - if (ng_flags.flags) + ng_flags->flags = bencode_dictionary_get(ng_flags->dict, "flags"); + if (ng_flags->flags) flags_exist = 1; - bencode_dictionary_get_str(ng_flags.dict, "call-id", &ng_flags.call_id); - bencode_dictionary_get_str(ng_flags.dict, "from-tag", &ng_flags.from_tag); - bencode_dictionary_get_str(ng_flags.dict, "to-tag", &ng_flags.to_tag); - if (ng_flags.call_id.len) + bencode_dictionary_get_str(ng_flags->dict, "call-id", &ng_flags->call_id); + bencode_dictionary_get_str(ng_flags->dict, "from-tag", &ng_flags->from_tag); + bencode_dictionary_get_str(ng_flags->dict, "to-tag", &ng_flags->to_tag); + if (ng_flags->call_id.len) callid_exist = 1; - if (ng_flags.from_tag.len) + if (ng_flags->from_tag.len) from_tag_exist = 1; - if (ng_flags.to_tag.len) + if (ng_flags->to_tag.len) to_tag_exist = 1; } if (!flags_exist) - ng_flags.flags = bencode_list(bencbuf); + ng_flags->flags = bencode_list(bencbuf); if (op == OP_OFFER || op == OP_ANSWER) { - ng_flags.direction = bencode_list(bencbuf); - ng_flags.replace = bencode_list(bencbuf); - ng_flags.rtcp_mux = bencode_list(bencbuf); + ng_flags->direction = bencode_list(bencbuf); + ng_flags->replace = bencode_list(bencbuf); + ng_flags->rtcp_mux = bencode_list(bencbuf); - bencode_dictionary_add_str(ng_flags.dict, "sdp", body_in); + bencode_dictionary_add_str(ng_flags->dict, "sdp", body_in); } else if (op == OP_SUBSCRIBE_ANSWER) { - bencode_dictionary_add_str(ng_flags.dict, "sdp", body_in); + bencode_dictionary_add_str(ng_flags->dict, "sdp", body_in); } /*** parse flags & build dictionary ***/ - ng_flags.to = (op == OP_DELETE) ? 0 : 1; + ng_flags->to = (op == OP_DELETE) ? 0 : 1; if (flags_str && pkg_nt_str_dup(&flags_nt, flags_str) < 0) { - err = "No more pkg mem"; + *err = "No more pkg mem"; goto error; } - if (parse_flags(&ng_flags, msg, &op, flags_nt.s)) { - err = "could not parse flags"; + if (parse_flags(ng_flags, msg, &op, flags_nt.s)) { + *err = "could not parse flags"; goto error; } - if (!ng_flags.call_id.len && - (get_callid(msg, &ng_flags.call_id) == -1 || ng_flags.call_id.len == 0)) { - err = "can't get Call-Id field"; + if (!ng_flags->call_id.len && + (get_callid(msg, &ng_flags->call_id) == -1 || ng_flags->call_id.len == 0)) { + *err = "can't get Call-Id field"; goto error; } - if (!ng_flags.to_tag.len && - get_to_tag(msg, &ng_flags.to_tag) == -1) { - err = "can't get To tag"; + if (!ng_flags->to_tag.len && + get_to_tag(msg, &ng_flags->to_tag) == -1) { + *err = "can't get To tag"; goto error; } - if (!ng_flags.from_tag.len && - (get_from_tag(msg, &ng_flags.from_tag) == -1 || ng_flags.from_tag.len == 0)) { - err = "can't get From tag"; + if (!ng_flags->from_tag.len && + (get_from_tag(msg, &ng_flags->from_tag) == -1 || ng_flags->from_tag.len == 0)) { + *err = "can't get From tag"; goto error; } /* only add those if any flags were given at all */ - if (ng_flags.direction && ng_flags.direction->child) - bencode_dictionary_add(ng_flags.dict, "direction", ng_flags.direction); - if (!flags_exist && ng_flags.flags && ng_flags.flags->child) - bencode_dictionary_add(ng_flags.dict, "flags", ng_flags.flags); - if (ng_flags.replace && ng_flags.replace->child) - bencode_dictionary_add(ng_flags.dict, "replace", ng_flags.replace); - if ((ng_flags.transport & 0x100)) - bencode_dictionary_add_string(ng_flags.dict, "transport-protocol", - transports[ng_flags.transport & 0x007]); - if (ng_flags.rtcp_mux && ng_flags.rtcp_mux->child) - bencode_dictionary_add(ng_flags.dict, "rtcp-mux", ng_flags.rtcp_mux); + if (ng_flags->direction && ng_flags->direction->child) + bencode_dictionary_add(ng_flags->dict, "direction", ng_flags->direction); + if (!flags_exist && ng_flags->flags && ng_flags->flags->child) + bencode_dictionary_add(ng_flags->dict, "flags", ng_flags->flags); + if (ng_flags->replace && ng_flags->replace->child) + bencode_dictionary_add(ng_flags->dict, "replace", ng_flags->replace); + if ((ng_flags->transport & 0x100)) + bencode_dictionary_add_string(ng_flags->dict, "transport-protocol", + transports[ng_flags->transport & 0x007]); + if (ng_flags->rtcp_mux && ng_flags->rtcp_mux->child) + bencode_dictionary_add(ng_flags->dict, "rtcp-mux", ng_flags->rtcp_mux); if (!callid_exist) - bencode_dictionary_add_str(ng_flags.dict, "call-id", &ng_flags.call_id); + bencode_dictionary_add_str(ng_flags->dict, "call-id", &ng_flags->call_id); - if (ng_flags.via) { - if (ng_flags.via == 1 || ng_flags.via == 2) - ret = get_via_branch(msg, ng_flags.via, &viabranch); - else if (ng_flags.via == -1 && extra_id_pv) + if (ng_flags->via) { + if (ng_flags->via == 1 || ng_flags->via == 2) + ret = get_via_branch(msg, ng_flags->via, &viabranch); + else if (ng_flags->via == -1 && extra_id_pv) ret = get_extra_id(msg, &viabranch); - else if (ng_flags.via == 4 && ng_flags.viabranch.len) { - viabranch = ng_flags.viabranch; + else if (ng_flags->via == 4 && ng_flags->viabranch.len) { + viabranch = ng_flags->viabranch; ret = 1; } else ret = -1; if (ret == -1 || viabranch.len == 0) { - err = "can't get Via branch/extra ID"; + *err = "can't get Via branch/extra ID"; goto error; } - bencode_dictionary_add_str(ng_flags.dict, "via-branch", &viabranch); + bencode_dictionary_add_str(ng_flags->dict, "via-branch", &viabranch); } item = bencode_list(bencbuf); - bencode_dictionary_add(ng_flags.dict, "received-from", item); + bencode_dictionary_add(ng_flags->dict, "received-from", item); if (msg) { - if (!ng_flags.received_from.len) { + if (!ng_flags->received_from.len) { bencode_list_add_string(item, (msg->rcv.src_ip.af == AF_INET) ? "IP4" : ( (msg->rcv.src_ip.af == AF_INET6) ? "IP6" : "?" @@ -2720,11 +2744,11 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_ } else { struct ip_addr *ip_tmp = NULL; - ip_tmp = str2ip(&ng_flags.received_from); + ip_tmp = str2ip(&ng_flags->received_from); if (!ip_tmp) { - ip_tmp = str2ip6(&ng_flags.received_from); + ip_tmp = str2ip6(&ng_flags->received_from); if (!ip_tmp) { - err = "received-from value is not an IP"; + *err = "received-from value is not an IP"; goto error; } } @@ -2741,41 +2765,72 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_ op == OP_BLOCK_MEDIA || op == OP_UNBLOCK_MEDIA || op == OP_BLOCK_DTMF || op == OP_UNBLOCK_DTMF || op == OP_START_FORWARD || op == OP_STOP_FORWARD) { - if (ng_flags.directional && !from_tag_exist) - bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.from_tag); - } else if (ng_flags.directional + if (ng_flags->directional && !from_tag_exist) + bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->from_tag); + } else if (ng_flags->directional || (msg && ((msg->first_line.type == SIP_REQUEST && op != OP_ANSWER) || (msg->first_line.type == SIP_REPLY && op == OP_DELETE) || (msg->first_line.type == SIP_REPLY && op == OP_ANSWER)))) { if (!from_tag_exist) - bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.from_tag); + bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->from_tag); if (op != OP_START_MEDIA && op != OP_STOP_MEDIA) { /* no need of to-tag if we are just playing media */ - if (ng_flags.to && ng_flags.to_tag.s && ng_flags.to_tag.len && !to_tag_exist && !extra_dict) - bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.to_tag); + if (ng_flags->to && ng_flags->to_tag.s && ng_flags->to_tag.len && !to_tag_exist && !extra_dict) + bencode_dictionary_add_str(ng_flags->dict, "to-tag", &ng_flags->to_tag); } } else { - if (!ng_flags.to_tag.s || !ng_flags.to_tag.len) { - err = "No to-tag present"; + if (!ng_flags->to_tag.s || !ng_flags->to_tag.len) { + *err = "No to-tag present"; goto error; } if (!from_tag_exist) - bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.to_tag); + bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->to_tag); if (!to_tag_exist && !extra_dict) - bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.from_tag); + bencode_dictionary_add_str(ng_flags->dict, "to-tag", &ng_flags->from_tag); } - bencode_dictionary_add_string(ng_flags.dict, "command", command_strings[op]); - - /*** send it out ***/ + bencode_dictionary_add_string(ng_flags->dict, "command", command_strings[op]); if (bencbuf->error) { - err = "out of memory - bencode failed"; + LM_ERR("bencode failed\n"); goto error; } + if (flags_nt.s) + pkg_free(flags_nt.s); + + return 1; + +error: + if (flags_nt.s) + pkg_free(flags_nt.s); + return -1; +} + +static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg, + enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar, + struct rtpe_set *set, str *snode, bencode_item_t *extra_dict) +{ + struct ng_flags_parse ng_flags; + bencode_item_t *resp; + str error; + struct rtpe_node *node, *failed_node; + char *cp, *err = NULL; + pv_value_t val; + struct rtpe_ignore_node *ignore_list = NULL; + int ret; + + memset(&ng_flags, 0, sizeof(ng_flags)); + error.len = 0; + error.s = ""; + + /*** get & init basic stuff needed ***/ + if (rtpe_function_call_prepare(bencbuf, msg, op, &ng_flags, flags_str, body_in, extra_dict,&err) < 0) + goto error; + + /*** send it out ***/ if (!set && (set=rtpe_ctx_set_get())==NULL ) set = *default_rtpe_set; @@ -2853,15 +2908,10 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_ LM_ERR("setting rtpengine pvar failed\n"); } - if (flags_nt.s) - pkg_free(flags_nt.s); - return resp; error: rtpe_free_ignore_node(ignore_list); - if (flags_nt.s) - pkg_free(flags_nt.s); if (err) { LM_ERR("%s\n", err); init_str(&error, err); @@ -3311,6 +3361,425 @@ rtpengine_delete_f(struct sip_msg* msg, str *flags, pv_spec_t *spvar) return rtpengine_delete(msg, flags, NULL, NULL, spvar); } +static bencode_item_t *rtpe_function_call_process(bencode_buffer_t *bencbuf, char* cp, int ret) +{ + bencode_item_t *resp; + str error; + + resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY); + if (!resp) { + LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp); + return NULL; + } + if (!bencode_dictionary_get_strcmp(resp, "result", "error")) { + if (!bencode_dictionary_get_str(resp, "error-reason", &error)) + LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp); + else + LM_ERR("proxy replied with error: %.*s\n", error.len, error.s); + return NULL; + } + + return resp; +} + +enum async_ret_code resume_async_send_rtpe_command(int fd, struct sip_msg *msg, void *_param) +{ + int len = 0, cookielen = 0; + static char buf[0x10000]; + char* cp = buf; + bencode_item_t *dict; + str oldbody = { 0, 0 }; + str newbody; + struct lump *anchor; + pv_value_t val; + struct rtpe_ctx *ctx; + rtpe_async_param *param = (rtpe_async_param *)_param; + + LM_DBG("Need to resume async rtpe call \n"); + + if (param->node->rn_umode == 0) { + do { + len = read(fd, buf, sizeof(buf) - 1); + } while (len == -1 && errno == EINTR); + close(fd); + if (len <= 0) { + LM_ERR("can't read reply from a RTP Engine\n"); + goto error; + } + } else { + /* UDP got triggered, wait for 1second max to read now */ + struct timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); + + len = recv(fd, buf, sizeof(buf)-1, 0); + if (len <= 0) { + LM_ERR("can't read reply from a RTP Engine (%d, %d)\n", len, errno); + RTPE_IO_ERROR_CLOSE(param->node->idx); + goto error; + } + cookielen = strlen(param->cookie); + if (len >= (cookielen - 1) && + memcmp(buf, param->cookie, (cookielen - 1)) == 0) { + len -= (cookielen - 1); + cp += (cookielen - 1); + if (len != 0) { + len--; + cp++; + } + } else { + LM_ERR("read reply from a RTP Ending FD %d. Cookie does NOT match (%.*s <> %.*s)\n", + fd, cookielen - 1, param->cookie, cookielen - 1, buf); + goto error; + } + } + + /* store the value of the selected node */ + if (param->spvar) { + memset(&val, 0, sizeof(pv_value_t)); + val.flags = PV_VAL_STR; + val.rs = param->node->rn_url; + if(pv_set_value(msg, param->spvar, (int)EQ_T, &val)<0) + LM_ERR("setting rtpengine pvar failed\n"); + } + + /* process reply */ + dict = rtpe_function_call_process(param->bencbuf, cp, len); + if(!dict) { + goto error; + } + + if (param->op != OP_DELETE) { + LM_DBG("Got reply for a non-delete - yay \n"); + if (!bencode_dictionary_get_str_dup(dict, "sdp", &newbody)) { + LM_ERR("failed to extract sdp body from proxy reply\n"); + goto error; + } + + /* if we have a variable to store into, use it */ + if (param->bpvar) { + memset(&val, 0, sizeof(pv_value_t)); + val.flags = PV_VAL_STR; + val.rs = newbody; + if(pv_set_value(msg, param->bpvar, (int)EQ_T, &val)<0) + LM_ERR("setting PV failed\n"); + pkg_free(newbody.s); + } else if (extract_body(msg, &oldbody) > 0) { + /* otherwise directly set the body of the message */ + anchor = del_lump(msg, oldbody.s - msg->buf, oldbody.len, 0); + if (!anchor) { + LM_ERR("del_lump failed\n"); + goto error; + } + if (!insert_new_lump_after(anchor, newbody.s, newbody.len, 0)) { + LM_ERR("insert_new_lump_after failed\n"); + goto error; + } + } else { + LM_ERR("cannot parse old body!\n"); + goto error; + } + } + + if (param->op == OP_DELETE && rtpengine_stats_used) { + /* if statistics are to be used, store stats in the ctx, if possible */ + if ((ctx = rtpe_ctx_get())) { + if (ctx->stats) { + rtpe_stats_free(ctx->stats); /* release the buffer */ + pkg_free(&(ctx->stats->buf)); + } else + ctx->stats = pkg_malloc(sizeof *ctx->stats); + if (ctx->stats) { + ctx->stats->buf = *(param->bencbuf); + ctx->stats->dict = dict; + ctx->stats->json.s = 0; + /* return here to prevent buffer from being freed */ + pkg_free(param->bencbuf); + pkg_free(param->cookie); + pkg_free(param); + async_status = ASYNC_DONE_CLOSE_FD; + return 1; + } else + LM_WARN("no more pkg memory - cannot cache stats!\n"); + } + } + + pkg_free(param->cookie); + bencode_buffer_free(param->bencbuf); + pkg_free(param->bencbuf); + pkg_free(param); + async_status = ASYNC_DONE_CLOSE_FD; + return 1; + +error: + pkg_free(param->cookie); + bencode_buffer_free(param->bencbuf); + pkg_free(param->bencbuf); + pkg_free(param); + async_status = ASYNC_DONE_CLOSE_FD; + return -1; +} + +enum async_ret_code timeout_async_send_rtpe_command(int fd, struct sip_msg *msg, void *_param) +{ + rtpe_async_param *param = (rtpe_async_param *)_param; + LM_ERR("can't read reply from a RTP proxy - TIMEOUT on %s\n",param->node->rn_url.s); + + param->node->rn_disabled = 1; + param->node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; + + pkg_free(param->cookie); + bencode_buffer_free(param->bencbuf); + pkg_free(param->bencbuf); + pkg_free(param); + async_status = ASYNC_DONE_CLOSE_FD; + return -1; +} + +static int start_async_send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, char* cookie, enum async_ret_code *out_fd) +{ + struct sockaddr_un addr; + int fd=-1, len, vcnt; + char buf[0x10000]; + struct pollfd fds[1]; + struct iovec *v; + + v = bencode_iovec(dict, &vcnt, 1, 0); + if (!v) { + LM_ERR("error converting bencode to iovec\n"); + goto error; + } + + len = 0; + if (node->rn_umode == 0) { + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_LOCAL; + strncpy(addr.sun_path, node->rn_address, + sizeof(addr.sun_path) - 1); +#ifdef HAVE_SOCKADDR_SA_LEN + addr.sun_len = strlen(addr.sun_path); +#endif + + fd = socket(AF_LOCAL, SOCK_STREAM, 0); + if (fd < 0) { + LM_ERR("can't create socket %d \n",errno); + goto badproxy; + } + if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + LM_ERR("can't connect to RTP proxy %s (%d:%s)\n",node->rn_url.s,errno,strerror(errno)); + close(fd); + goto badproxy; + } + + do { + len = writev(fd, v + 1, vcnt); + } while (len == -1 && errno == EINTR); + if (len <= 0) { + LM_ERR("can't send command to RTP proxy %s (%d:%s)\n",node->rn_url.s, + errno, strerror(errno)); + close(fd); + goto badproxy; + } + *out_fd = fd; + } else { + if (rtpe_socks[node->idx] != -1) { + fds[0].fd = rtpe_socks[node->idx]; + fds[0].events = POLLIN; + fds[0].revents = 0; + /* Drain input buffer */ + while ((poll(fds, 1, 0) == 1) && + ((fds[0].revents & POLLIN) != 0)) { + if (fds[0].revents & (POLLERR|POLLNVAL|POLLHUP)) { + LM_WARN("error on rtpengine socket %d!\n", rtpe_socks[node->idx]); + RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]); + break; + } + fds[0].revents = 0; + if (recv(rtpe_socks[node->idx], buf, sizeof(buf) - 1, 0) < 0 && errno != EINTR) { + LM_WARN("error while draining rtpengine %d!\n", errno); + RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]); + break; + } + } + } + + v[0].iov_base = cookie; + v[0].iov_len = strlen(v[0].iov_base); + if (rtpe_socks[node->idx] == -1 && !rtpengine_connect_node(node)) { + LM_ERR("cannot reconnect RTP engine socket!\n"); + goto badproxy; + } + + fd = socket((node->rn_umode == 6) ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); + if (fd < 0) { + LM_ERR("can't create socket %d \n",errno); + goto badproxy; + } + if (connect(fd, &(node->ai_addr), node->ai_addrlen) < 0) { + LM_ERR("can't connect to RTP proxy %s (%d:%s)\n",node->rn_url.s,errno,strerror(errno)); + close(fd); + goto badproxy; + } + do { + len = writev(fd, v, vcnt + 1); + } while (len == -1 && (errno == EINTR || errno == ENOBUFS || errno == EMSGSIZE)); + + if (len <= 0) { + LM_ERR("can't send command to RTP proxy %s (%d:%s)\n",node->rn_url.s, + errno, strerror(errno)); + + RTPE_IO_ERROR_CLOSE(fd); + goto badproxy; + } + *out_fd = fd; + } + + return 1; + +badproxy: + LM_ERR("proxy <%s> does not respond, disable it\n", node->rn_url.s); + node->rn_disabled = 1; + node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout; +error: + if (fd>0) + close(fd); + + *out_fd = ASYNC_NO_IO; + return -1; +} + + +static int rtpe_function_call_async(struct sip_msg *msg, async_ctx *ctx, str *flags_str, + pv_spec_t *spvar, pv_spec_t *bpvar, str *body, enum rtpe_operation op) +{ + struct ng_flags_parse ng_flags; + str oldbody; + struct rtpe_node *node; + struct rtpe_set *set; + int ret, read_fd; + rtpe_async_param *param; + char* cookie = NULL; + char *err; + + bencode_buffer_t *bencbuf = pkg_malloc(sizeof(bencode_buffer_t)); + memset(&ng_flags, 0, sizeof(ng_flags)); + + /*** get & init basic stuff needed ***/ + + if (op != OP_DELETE) { + if (!body) { + if (extract_body(msg, &oldbody) == -1) { + LM_ERR("can't extract body from the message\n"); + goto error; + } + } else { + oldbody = *body; + } + } + + if(rtpe_function_call_prepare(bencbuf, msg, op, &ng_flags, flags_str, &oldbody, NULL,&err) < 0) + goto error; + + /*** send it out ***/ + if ( (set=rtpe_ctx_set_get())==NULL ) + set = *default_rtpe_set; + + RTPE_START_READ(); + + /* FIXME - failover logic for async is disabled for now */ + node = select_rtpe_node(ng_flags.call_id, set,NULL); + if (!node) { + LM_ERR("no available proxies\n"); + RTPE_STOP_READ(); + goto error; + } + + cookie = gencookie(); + ret = start_async_send_rtpe_command(node, ng_flags.dict, cookie, &read_fd); + + RTPE_STOP_READ(); + LM_DBG("async proxy reply: %d\n", ret); + + if (read_fd == ASYNC_NO_IO) { + ctx->resume_f = NULL; + ctx->resume_param = NULL; + bencode_buffer_free(bencbuf); + pkg_free(bencbuf); + return ret; + } else if (read_fd == ASYNC_SYNC) { + /* no need for async - transfer already completed! */ + async_status = ASYNC_SYNC; + bencode_buffer_free(bencbuf); + pkg_free(bencbuf); + return ret; + } + + param = pkg_malloc(sizeof(rtpe_async_param)); + if (!param) { + LM_ERR("no more pkg mem\n"); + goto error; + } + memset(param, 0, sizeof(rtpe_async_param)); + + param->bencbuf = bencbuf; + param->op = op; + param->node = node; + param->cookie = pkg_strdup(cookie); + param->bpvar = bpvar; + param->spvar = spvar; + + ctx->resume_f = resume_async_send_rtpe_command; + ctx->timeout_f = timeout_async_send_rtpe_command; + ctx->resume_param = param; + + /* async started with success */ + async_status = read_fd; + return 1; + +error: + bencode_buffer_free(bencbuf); + pkg_free(bencbuf); + return -1; +} + +static int +rtpengine_offer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, pv_spec_t *bpvar, str *body) +{ + LM_DBG("Async rtpengine_offer\n"); + + if (set_rtpengine_set_from_avp(msg) == -1) + return -1; + + return rtpe_function_call_async(msg, ctx, flags, spvar, bpvar, body, OP_OFFER); +} + +static int +rtpengine_answer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, pv_spec_t *bpvar, str *body) +{ + LM_DBG("Async rtpengine_answer\n"); + + if (set_rtpengine_set_from_avp(msg) == -1) + return -1; + + if (msg->first_line.type == SIP_REQUEST) + if (msg->first_line.u.request.method_value != METHOD_ACK) + return -1; + + return rtpe_function_call_async(msg, ctx, flags, spvar, bpvar, body, OP_ANSWER); +} + +static int +rtpengine_delete_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar) +{ + LM_DBG("Async rtpengine_delete\n"); + + if (set_rtpengine_set_from_avp(msg) == -1) + return -1; + + return rtpe_function_call_async(msg, ctx, flags, spvar, NULL, NULL, OP_DELETE); +} + /* This function assumes p points to a line of requested type. */ static int diff --git a/modules/rtpengine/rtpengine.h b/modules/rtpengine/rtpengine.h index a8c2bdd747..17241ae213 100644 --- a/modules/rtpengine/rtpengine.h +++ b/modules/rtpengine/rtpengine.h @@ -45,6 +45,9 @@ struct rtpe_node { unsigned int rn_recheck_ticks; unsigned int rn_last_ticks; int rn_flags; + socklen_t ai_addrlen; + struct sockaddr ai_addr; + struct rtpe_node *rn_next; }; From 953d83d80d33e12c06d843442b2690144a3f813a Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Wed, 20 Nov 2024 13:45:43 -0500 Subject: [PATCH 2/3] minor_fixes --- modules/presence/notify.c | 2 +- modules/presence/presence.c | 2 +- modules/rtpengine/rtpengine.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/presence/notify.c b/modules/presence/notify.c index 7b6bb186fd..601db3cec3 100644 --- a/modules/presence/notify.c +++ b/modules/presence/notify.c @@ -1520,7 +1520,7 @@ int get_subs_db(str* pres_uri, pres_ev_t* event, str* sender, if(result->n <=0 ) { - LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s" + LM_DBG("The query for subscription for [uri]= %.*s for [event]= %.*s" " returned no result\n",pres_uri->len, pres_uri->s, event->name.len, event->name.s); pa_dbf.free_result(pa_db, result); diff --git a/modules/presence/presence.c b/modules/presence/presence.c index 9e326b89e6..20987b9012 100644 --- a/modules/presence/presence.c +++ b/modules/presence/presence.c @@ -874,7 +874,7 @@ static mi_response_t *mi_list_shtable(const mi_params_t *params, str *from, str lock_get(&subs_htable[i].lock); for (s = subs_htable[i].entries->next; s; s = s->next) { if (from) { - /* print subscribtion if "from" and "to" match with given wildcard */ + /* print subscription if "from" and "to" match with given wildcard */ rc = from_to_match_subs(s, &match_from, &match_to, from_w, to_w); if (rc < 0) goto error; diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 4a9ff0b127..23f9432a1d 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -4615,7 +4615,7 @@ static int rtpengine_stopmedia_f(struct sip_msg* msg, str *flags, dict = rtpe_function_call_ok(&bencbuf, msg, OP_STOP_MEDIA, flags, NULL, spvar, NULL, NULL, NULL); if (!dict) { - LM_ERR("could not start media!\n"); + LM_ERR("could not stop media!\n"); return -1; } From 6291cff1aab104a61d433987811f9c1424295eb0 Mon Sep 17 00:00:00 2001 From: Razvan Crainea Date: Wed, 8 Jan 2025 18:19:57 +0200 Subject: [PATCH 3/3] rtpengine: add documentation for async commands --- modules/rtpengine/doc/rtpengine_admin.xml | 66 +++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/modules/rtpengine/doc/rtpengine_admin.xml b/modules/rtpengine/doc/rtpengine_admin.xml index d60d534b69..006a5fcf03 100644 --- a/modules/rtpengine/doc/rtpengine_admin.xml +++ b/modules/rtpengine/doc/rtpengine_admin.xml @@ -1349,6 +1349,72 @@ rtpengine_play_dtmf("0"); # send the 0 code upstream +
+ Exported Asyncronous Functions +
+ <function moreinfo="none">rtpengine_offer([flags[, sock_pvar[, sdp_pvar[, body]]]])</function> + + The asynchronous flavor of the + function. It receives the same parameters, with the same meanings. + + + Example of async rtpengine_offer() usage + +... +if (is_method("ACK") && has_totag() && has_body_part("application/sdp")) { + async(rtpengine_offer(), resume_invite); +} +... +route[resume_invite] { + t_relay(); +} +... + + +
+
+ <function moreinfo="none">rtpengine_answer([flags[, sock_pvar[, sdp_pvar[, body]]]])</function> + + The asynchronous flavor of the + function. It receives the same parameters, with the same meanings. + + + Example of async rtpengine_answer() usage + +... +if (is_method("ACK") && has_body_part("application/sdp")) { + # late negotiation + async(rtpengine_answer(), resume_ack); +} +... +route[resume_ack] { + t_relay(); +} +... + + +
+
+ + <function moreinfo="none">rtpengine_delete([flags[, sock_var]])</function> + + + The asynchronous flavor of the + function. It receives the same parameters, with the same meanings. + + + Example of async rtpengine_delete() usage + +... +if (is_method("BYE")) { + launch(rtpengine_delete()); +} +... + + +
+
+
Exported Pseudo-Variables