diff --git a/modules/mqueue/Makefile b/modules/mqueue/Makefile new file mode 100644 index 00000000000..ae97d8aee7b --- /dev/null +++ b/modules/mqueue/Makefile @@ -0,0 +1,7 @@ +# WARNING: do not run this directly, it should be run by the master Makefile + +include ../../Makefile.defs +auto_gen= +NAME=mqueue.so + +include ../../Makefile.modules diff --git a/modules/mqueue/api.h b/modules/mqueue/api.h new file mode 100644 index 00000000000..ad08a04ceab --- /dev/null +++ b/modules/mqueue/api.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com) + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + + +#ifndef _MQUEUE_EXT_API_H_ +#define _MQUEUE_EXT_API_H_ + +typedef int (*mq_add_f)(str *, str *, str *); +typedef struct mq_api +{ + mq_add_f add; +} mq_api_t; + +typedef int (*bind_mq_f)(mq_api_t *api); + +static inline int load_mq_api(mq_api_t *api) +{ + bind_mq_f bindmq; + + bindmq = (bind_mq_f)find_export("bind_mq", 0); + if(bindmq == 0) { + LM_ERR("cannot find bind_mq\n"); + return -1; + } + if(bindmq(api) < 0) { + LM_ERR("cannot bind mq api\n"); + return -1; + } + return 0; +} + +#endif diff --git a/modules/mqueue/doc/contributors.xml b/modules/mqueue/doc/contributors.xml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/modules/mqueue/doc/mqueue.xml b/modules/mqueue/doc/mqueue.xml new file mode 100644 index 00000000000..964bd0d0e0c --- /dev/null +++ b/modules/mqueue/doc/mqueue.xml @@ -0,0 +1,29 @@ + + + + + + +%docentities; + +]> + + + + mqueue Module + &osipsname; + + + + &admin; + &contrib; + + &docCopyrights; + ©right; 2010 Elena-Ramona Modroiu + ©right; 2018-2020 Julien chavanton, Flowroute + ©right; 2024 Ovidiu Sas, VoIP Embedded, Inc. + diff --git a/modules/mqueue/doc/mqueue_admin.xml b/modules/mqueue/doc/mqueue_admin.xml new file mode 100644 index 00000000000..0f01a6ac0bd --- /dev/null +++ b/modules/mqueue/doc/mqueue_admin.xml @@ -0,0 +1,338 @@ + + + + + &adminguide; + +
+ Overview + + The mqueue module offers a generic message queue system in shared + memory for inter-process communication using the config file. + One example of usage is to send time consuming operations to one or + several timer processes that consumes items in the queue, without + affecting SIP message handling in the socket-listening process. + + + There can be many defined queues. Access to queued values is done via + pseudo variables. + +
+
+ Dependencies +
+ &osips; Modules + + The following modules must be loaded before this module: + + + None. + + + +
+
+ External Libraries or Applications + + The following libraries or applications must be installed before + running &osips; with this module loaded: + + + None. + + + +
+
+ + +
+ Exported Parameters + +
+ <varname>db_url</varname> (str) + + The URL to connect to database for loading values + in mqueue table at start up and/or saving values at shutdown. + + + Default value is NULL (do not connect). + + + Set <varname>db_url</varname> parameter + +... +modparam("mqueue", "db_url", "&defaultdb;") + +# Example of table in sqlite, +# you have the set the fields to support the length according +# to the data that will be present in the mqueue +CREATE TABLE mqueue_name ( +id INTEGER PRIMARY KEY AUTOINCREMENT, +key character varying(64) DEFAULT "" NOT NULL, +val character varying(4096) DEFAULT "" NOT NULL +); +... + + +
+
+ <varname>mqueue</varname> (string) + Definition of a memory queue + + + Default value is none. + + + + Value must be a list of parameters: attr=value;... + + + Mandatory attributes: + + + + name: name of the queue. + + + + + + Optional attributes: + + + + size: size of the queue. + Specifies the maximum number of items in queue. + If exceeded the oldest one is removed. + If not set the queue will be limitless. + + + + + dbmode: If set to 1, the content of the + queue is written to database table when the SIP server is + stopped (i.e., ensure persistency over restarts). + If set to 2, it is written at shutdown but not read at startup. + If set to 3, it is read at sartup but not written at shutdown. + Default value is 0 (no db table interaction). + + + + + addmode: how to add new (key,value) pairs. + + + + 0: + Will push all new (key,value) pairs at the end of + the queue. (default) + + + + + 1: + Will keep oldest (key,value) pair in the queue, + based on the key. + + + + + 2: + Will keep newest (key,value) pair in the queue, + based on the key. + + + + + + + + + + + The parameter can be set many times, each holding the + definition of one queue. + + + Set <varname>mqueue</varname> parameter + +... +modparam("mqueue", "mqueue", "name=myq;size=20;") +modparam("mqueue", "mqueue", "name=myq;size=10000;addmode=2") +modparam("mqueue", "mqueue", "name=qaz") +modparam("mqueue", "mqueue", "name=qaz;addmode=1") +... + + +
+
+ +
+ Exported Functions +
+ + <function moreinfo="none">mq_add(queue, key, value)</function> + + + Add a new item (key, value) in the queue. If max size of queue is + exceeded, the oldest one is removed. + + + <function>mq_add</function> usage + +... +mq_add("myq", "$rU", "call from $fU"); +... + + +
+ +
+ + <function moreinfo="none">mq_fetch(queue)</function> + + + Take oldest item from queue and fill $mqk(queue) and + $mqv(queue) pseudo variables. + + + Return: true on success (1); false on failure (-1) or + no item fetched (-2). + + + <function>mq_fetch</function> usage + +... +while(mq_fetch("myq")) +{ + xlog("$mqk(myq) - $mqv(myq)\n"); +} +... + + +
+ +
+ + <function moreinfo="none">mq_pv_free(queue)</function> + + + Free the item fetched in pseudo-variables. It is optional, + a new fetch frees the previous values. + + + <function>mq_pv_free</function> usage + +... +mq_pv_free("myq"); +... + + +
+ +
+ + <function moreinfo="none">mq_size(queue)</function> + + + Returns the current number of elements in the mqueue. + + + If the mqueue is empty, the function returns -1. If the + mqueue is not found, the function returns -2. + + + <function>mq_size</function> usage + +... +$var(q_size) = mq_size("queue"); +xlog("L_INFO", "Size of queue is: $var(q_size)\n"); +... + + +
+
+ + +
+ Exported MI Functions +
+ mq_get_size + Get the size of a memory queue. + Parameters: + + + name - the name of memory queue + + + + <function>mq_get_size</function> usage + +... +opensips-cli -x mq_get_size xyz +... + + +
+
+ mq_fetch + Fetch a key-value pair from a memory queue. + Parameters: + + + name - the name of memory queue + + + + <function>mq_fetch</function> usage + +... +opensips-cli -x mq_fetch xyz +... + + +
+ +
+ mq_get_sizes + Get the size for all memory queues. + Parameters: none + + <function>mq_get_sizes</function> usage + +... +opensips-cli -x mq_get_sizes +... + + +
+
+ + +
+ Exported Pseudo-Variables +
+ <varname>$mqk(mqueue)</varname> + + The variable is read-only and returns the most recent item key + fetched from the specified mqueue. + +
+
+ <varname>$mqv(mqueue)</varname> + + The variable is read-only and returns the most recent item value + fetched from the specified mqueue. + +
+
+ <varname>$mq_size(mqueue)</varname> + + The variable is read-only and returns the size of the specified + mqueue. + +
+
+ +
diff --git a/modules/mqueue/mqueue_api.c b/modules/mqueue/mqueue_api.c new file mode 100644 index 00000000000..ff7f4d364b1 --- /dev/null +++ b/modules/mqueue/mqueue_api.c @@ -0,0 +1,530 @@ +/** + * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com) + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + + +#include +#include +#include +#include + +#include "../../dprint.h" +#include "../../mem/mem.h" +#include "../../mem/shm_mem.h" +#include "../../parser/parse_param.h" +#include "../../ut.h" + +#include "mqueue_api.h" +#include "mqueue_db.h" + +/** + * + */ +//static mq_head_t *_mq_head_list = NULL; +mq_head_t *_mq_head_list = NULL; + +/** + * + */ +static mq_pv_t *_mq_pv_list = NULL; + +/** + * + */ +int mq_head_defined(void) +{ + if(_mq_head_list != NULL) + return 1; + return 0; +} + +/** + * + */ +void mq_destroy(void) +{ + mq_head_t *mh = NULL; + mq_pv_t *mp = NULL; + mq_item_t *mi = NULL; + mq_head_t *mh1 = NULL; + mq_pv_t *mp1 = NULL; + mq_item_t *mi1 = NULL; + + mh = _mq_head_list; + while(mh != NULL) { + if(mh->dbmode == 1 || mh->dbmode == 3) { + LM_INFO("mqueue[%.*s] dbmode[%d]\n", mh->name.len, mh->name.s, + mh->dbmode); + mqueue_db_save_queue(&mh->name); + } + mi = mh->ifirst; + while(mi != NULL) { + mi1 = mi; + mi = mi->next; + shm_free(mi1); + } + mh1 = mh; + mh = mh->next; + lock_destroy(&mh1->lock); + shm_free(mh1); + } + _mq_head_list = 0; + mp = _mq_pv_list; + while(mp != NULL) { + mp1 = mp; + mp = mp->next; + pkg_free(mp1); + } +} + +/** + * + */ +int mq_head_add(str *name, int msize, int addmode) +{ + mq_head_t *mh = NULL; + mq_pv_t *mp = NULL; + int len; + + //FIXME: + //if(!shm_initialized()) { + // LM_ERR("shm not initialized - cannot define mqueue now\n"); + // return 0; + //} + + mh = _mq_head_list; + while(mh != NULL) { + if(name->len == mh->name.len + && strncmp(mh->name.s, name->s, name->len) == 0) { + LM_ERR("mqueue redefined: %.*s\n", name->len, name->s); + return -1; + } + mh = mh->next; + } + + mp = (mq_pv_t *)pkg_malloc(sizeof(mq_pv_t)); + if(mp == NULL) { + LM_ERR("no more pkg for: %.*s\n", name->len, name->s); + return -1; + } + memset(mp, 0, sizeof(mq_pv_t)); + + len = sizeof(mq_head_t) + name->len + 1; + mh = (mq_head_t *)shm_malloc(len); + if(mh == NULL) { + LM_ERR("no more shm for: %.*s\n", name->len, name->s); + pkg_free(mp); + return -1; + } + memset(mh, 0, len); + if(lock_init(&mh->lock) == 0) { + LM_CRIT("failed to init lock\n"); + pkg_free(mp); + shm_free(mh); + return -1; + } + + mh->name.s = (char *)mh + sizeof(mq_head_t); + memcpy(mh->name.s, name->s, name->len); + mh->name.len = name->len; + mh->name.s[name->len] = '\0'; + mh->msize = msize; + mh->addmode = addmode; + mh->next = _mq_head_list; + _mq_head_list = mh; + + mp->name = &mh->name; + mp->next = _mq_pv_list; + _mq_pv_list = mp; + + return 0; +} + +/** + * + */ +mq_head_t *mq_head_get(str *name) +{ + mq_head_t *mh = NULL; + + mh = _mq_head_list; + if(!name) { + return mh; + } + while(mh != NULL) { + if(name->len == mh->name.len + && strncmp(mh->name.s, name->s, name->len) == 0) { + return mh; + } + mh = mh->next; + } + return NULL; +} + +/** + * + */ +int mq_set_dbmode(str *name, int dbmode) +{ + mq_head_t *mh = NULL; + + mh = _mq_head_list; + while(mh != NULL) { + if(name->len == mh->name.len + && strncmp(mh->name.s, name->s, name->len) == 0) { + mh->dbmode = dbmode; + return 0; + } + mh = mh->next; + } + return -1; +} + +/** + * + */ +int mq_get_dbmode(str *name) +{ + mq_head_t *mh = NULL; + + mh = _mq_head_list; + while(mh != NULL) { + if(name->len == mh->name.len + && strncmp(mh->name.s, name->s, name->len) == 0) + return mh->dbmode; + mh = mh->next; + } + return -1; +} + +/** + * + */ +mq_pv_t *mq_pv_get(str *name) +{ + mq_pv_t *mp = NULL; + + mp = _mq_pv_list; + while(mp != NULL) { + if(mp->name->len == name->len + && strncmp(mp->name->s, name->s, name->len) == 0) + return mp; + mp = mp->next; + } + return NULL; +} + +/** + * + */ +int mq_head_fetch(str *name) +{ + mq_head_t *mh = NULL; + mq_pv_t *mp = NULL; + + mp = mq_pv_get(name); + if(mp == NULL) + return -1; + if(mp->item != NULL) { + shm_free(mp->item); + mp->item = NULL; + } + mh = mq_head_get(name); + if(mh == NULL) + return -1; + lock_get(&mh->lock); + + if(mh->ifirst == NULL) { + /* empty queue */ + lock_release(&mh->lock); + return -2; + } + + mp->item = mh->ifirst; + mh->ifirst = mh->ifirst->next; + if(mh->ifirst == NULL) { + mh->ilast = NULL; + } + mh->csize--; + + lock_release(&mh->lock); + return 0; +} + +/** + * + */ +void mq_pv_free(str *name) +{ + mq_pv_t *mp = NULL; + + mp = mq_pv_get(name); + if(mp == NULL) + return; + if(mp->item != NULL) { + shm_free(mp->item); + mp->item = NULL; + } +} + +/** + * + */ +int mq_item_add(str *qname, str *key, str *val) +{ + mq_head_t *mh = NULL; + mq_item_t *mi = NULL; + mq_item_t *miter = NULL; + mq_item_t *miter_prev = NULL; + int oplock = 0; + int len; + + mh = mq_head_get(qname); + if(mh == NULL) { + LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s); + return -1; + } + + if(mh->addmode == 1 || mh->addmode == 2) { + lock_get(&mh->lock); + oplock = 1; + miter = mh->ifirst; + miter_prev = mh->ifirst; + while(miter) { + // found mqueue item + if(miter->key.len == key->len + && strncmp(miter->key.s, key->s, key->len) == 0) { + // mode unique and keep oldest: just return + if(mh->addmode == 1) { + lock_release(&mh->lock); + return 0; + } + + // mode unique and keep newest: free oldest and further add newest + if(miter == mh->ifirst && miter == mh->ilast) { + mh->ifirst = NULL; + mh->ilast = NULL; + } else if(miter == mh->ifirst) { + mh->ifirst = miter->next; + } else if(miter == mh->ilast) { + mh->ilast = miter_prev; + mh->ilast->next = NULL; + } else { + miter_prev->next = miter->next; + } + + shm_free(miter); + mh->csize--; + + break; + } + miter_prev = miter; + miter = miter->next; + } + } + + // mode default + len = sizeof(mq_item_t) + key->len + val->len + 2; + mi = (mq_item_t *)shm_malloc(len); + if(mi == NULL) { + LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s); + if(oplock) { + lock_release(&mh->lock); + } + return -1; + } + memset(mi, 0, len); + mi->key.s = (char *)mi + sizeof(mq_item_t); + memcpy(mi->key.s, key->s, key->len); + mi->key.len = key->len; + mi->key.s[key->len] = '\0'; + + mi->val.s = mi->key.s + mi->key.len + 1; + memcpy(mi->val.s, val->s, val->len); + mi->val.len = val->len; + mi->val.s[val->len] = '\0'; + + if(oplock == 0) { + lock_get(&mh->lock); + } + if(mh->ifirst == NULL) { + mh->ifirst = mi; + mh->ilast = mi; + } else { + mh->ilast->next = mi; + mh->ilast = mi; + } + mh->csize++; + if(mh->msize > 0 && mh->csize > mh->msize) { + mi = mh->ifirst; + mh->ifirst = mh->ifirst->next; + if(mh->ifirst == NULL) { + mh->ilast = NULL; + } + mh->csize--; + shm_free(mi); + } + lock_release(&mh->lock); + return 0; +} + +/** + * + */ +int pv_parse_mq_name(pv_spec_t *sp, const str *in) +{ + sp->pvp.pvn.u.isname.name.s = *in; + sp->pvp.pvn.type = PV_NAME_INTSTR; + sp->pvp.pvn.u.isname.type = 1; + return 0; +} + +/** + * + */ +int pv_get_mqk(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) +{ + mq_pv_t *mp = NULL; + + if (!param) + return pv_get_null(msg, param, res); + + if(pv_get_spec_name(msg, param, res)!=0 || (!(res->flags&PV_VAL_STR))) { + LM_ERR("invalid name\n"); + return -1; + } + + LM_NOTICE("Getting key from [%.*s]\n", res->rs.len, res->rs.s); + if(mq_head_get(&res->rs) == NULL) { + LM_ERR("mqueue not found: %.*s\n", res->rs.len, res->rs.s); + return -1; + } + + mp = mq_pv_get(&res->rs); + if(mp == NULL || mp->item == NULL || mp->item->key.len <= 0) + return pv_get_null(msg, param, res); + return pv_get_strval(msg, param, res, &mp->item->key); +} + +/** + * + */ +str *get_mqk(str *in) +{ + mq_pv_t *mp = NULL; + + if(mq_head_get(in) == NULL) { + LM_ERR("mqueue not found: %.*s\n", in->len, in->s); + return NULL; + } + + mp = mq_pv_get(in); + if(mp == NULL || mp->item == NULL || mp->item->key.len <= 0) + return NULL; + return &mp->item->key; +} + +/** + * + */ +str *get_mqv(str *in) +{ + mq_pv_t *mp = NULL; + + if(mq_head_get(in) == NULL) { + LM_ERR("mqueue not found: %.*s\n", in->len, in->s); + return NULL; + } + + mp = mq_pv_get(in); + if(mp == NULL || mp->item == NULL || mp->item->val.len <= 0) + return NULL; + return &mp->item->val; +} + +/** + * + */ +int pv_get_mqv(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) +{ + mq_pv_t *mp = NULL; + + if (!param) + return pv_get_null(msg, param, res); + + if(pv_get_spec_name(msg, param, res)!=0 || (!(res->flags&PV_VAL_STR))) { + LM_ERR("invalid name\n"); + return -1; + } + + LM_NOTICE("Getting val from [%.*s]\n", res->rs.len, res->rs.s); + if(mq_head_get(&res->rs) == NULL) { + LM_ERR("mqueue not found: %.*s\n", res->rs.len, res->rs.s); + return -1; + } + + mp = mq_pv_get(&res->rs); + if(mp == NULL || mp->item == NULL || mp->item->val.len <= 0) + return pv_get_null(msg, param, res); + return pv_get_strval(msg, param, res, &mp->item->val); +} + +/** + * + */ +int pv_get_mq_size(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) +{ + int mqs = -1; + + if (!param) + return pv_get_null(msg, param, res); + + if(pv_get_spec_name(msg, param, res)!=0 || (!(res->flags&PV_VAL_STR))) { + LM_ERR("invalid name\n"); + return -1; + } + + LM_NOTICE("Getting size of [%.*s]\n", res->rs.len, res->rs.s); + mqs = _mq_get_csize(&res->rs); + + if(mqs < 0) { + LM_ERR("mqueue not found: %.*s\n", res->rs.len, res->rs.s); + return -1; + } + + return pv_get_sintval(msg, param, res, mqs); +} +/** + * Return head->csize for a given queue + */ +int _mq_get_csize(str *name) +{ + mq_head_t *mh = mq_head_get(name); + int mqueue_size = 0; + + if(mh == NULL) + return -1; + + lock_get(&mh->lock); + mqueue_size = mh->csize; + lock_release(&mh->lock); + + return mqueue_size; +} diff --git a/modules/mqueue/mqueue_api.h b/modules/mqueue/mqueue_api.h new file mode 100644 index 00000000000..48ba16eada7 --- /dev/null +++ b/modules/mqueue/mqueue_api.h @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com) + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + + +#ifndef _MQUEUE_API_H_ +#define _MQUEUE_API_H_ + +#include "../../pvar.h" +#include "../../parser/msg_parser.h" + +/** + * + */ +typedef struct _mq_item +{ + str key; + str val; + struct _mq_item *next; +} mq_item_t; + +/** + * + */ +typedef struct _mq_head +{ + str name; + int msize; + int csize; + int dbmode; + int addmode; + gen_lock_t lock; + mq_item_t *ifirst; + mq_item_t *ilast; + struct _mq_head *next; +} mq_head_t; + +/** + * + */ +typedef struct _mq_pv +{ + str *name; + mq_item_t *item; + struct _mq_pv *next; +} mq_pv_t; + +mq_pv_t *mq_pv_get(str *name); +int pv_parse_mq_name(pv_spec_p sp, const str *in); +int pv_get_mqk(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); +int pv_get_mqv(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); +int pv_get_mq_size(struct sip_msg *msg, pv_param_t *param, pv_value_t *res); +str *get_mqk(str *name); +str *get_mqv(str *name); +int mq_head_defined(void); +void mq_destroy(void); +int mq_head_add(str *name, int msize, int addmode); +int mq_head_fetch(str *name); +void mq_pv_free(str *name); +int mq_item_add(str *qname, str *key, str *val); +mq_head_t *mq_head_get(str *name); + +int _mq_get_csize(str *); +int mq_set_dbmode(str *, int dbmode); +int mq_get_dbmode(str *); + +#endif diff --git a/modules/mqueue/mqueue_db.c b/modules/mqueue/mqueue_db.c new file mode 100644 index 00000000000..fbc941b9c3f --- /dev/null +++ b/modules/mqueue/mqueue_db.c @@ -0,0 +1,323 @@ +/** + * Copyright (C) 2020 Julien Chavanton + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include "../../db/db.h" +#include "mqueue_api.h" + +/** database connection */ +db_con_t *mqueue_db_con = NULL; +db_func_t mq_dbf; + +/** db parameters */ +str mqueue_db_url = {0, 0}; +str mq_db_key_column = str_init("key"); +str mq_db_val_column = str_init("val"); +str mq_db_id_column = str_init("id"); + +/** + * initialize database connection + */ +int mqueue_db_init_con(void) +{ + LM_NOTICE("mqueue_db_url=[%.*s]\n", mqueue_db_url.len, mqueue_db_url.s); + if(mqueue_db_url.len <= 0) { + LM_ERR("failed to connect to the database, no db url\n"); + return -1; + } + /* binding to DB module */ + if(db_bind_mod(&mqueue_db_url, &mq_dbf)) { + LM_ERR("database module not found\n"); + return -1; + } + + if(!DB_CAPABILITY(mq_dbf, DB_CAP_ALL)) { + LM_ERR("database module does not " + "implement all functions needed by the module\n"); + return -1; + } + return 0; +} + +/** + * open database connection + */ +int mqueue_db_open_con(void) +{ + if(mqueue_db_init_con() == 0) { + mqueue_db_con = mq_dbf.init(&mqueue_db_url); + if(mqueue_db_con == NULL) { + LM_ERR("failed to connect to the database\n"); + return -1; + } + LM_NOTICE("database connection opened successfully\n"); + return 0; + } + return 0; +} + +/** + * close database connection + */ +int mqueue_db_close_con(void) +{ + if(mqueue_db_con != NULL && mq_dbf.close != NULL) + mq_dbf.close(mqueue_db_con); + mqueue_db_con = NULL; + return 0; +} + +int mqueue_db_load_queue(str *name) +{ + int ncols = 2; + db_res_t *db_res = NULL; + db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column}; + db_key_t db_ord = &mq_db_id_column; + int mq_fetch_rows = 100; + int ret = 0; + str val = str_init(""); + str key = str_init(""); + int i; + int cnt = 0; + + if(mqueue_db_open_con() != 0) { + LM_ERR("no db connection\n"); + return -1; + } + + if(mq_dbf.use_table(mqueue_db_con, name) < 0) { + LM_ERR("failed to use_table\n"); + goto error; + } + + LM_INFO("=============== loading queue table [%.*s] from database\n", + name->len, name->s); + + if(DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) { + if(mq_dbf.query(mqueue_db_con, 0, 0, 0, db_cols, 0, ncols, db_ord, 0) + < 0) { + LM_ERR("Error while querying db\n"); + goto error; + } + if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows) < 0) { + LM_ERR("Error while fetching result\n"); + if(db_res) + mq_dbf.free_result(mqueue_db_con, db_res); + goto error; + } else { + if(RES_ROW_N(db_res) == 0) { + mq_dbf.free_result(mqueue_db_con, db_res); + LM_NOTICE("Nothing to be loaded in queue\n"); + mqueue_db_close_con(); + return 0; + } + } + } else { + if((ret = mq_dbf.query(mqueue_db_con, NULL, NULL, NULL, db_cols, 0, + ncols, 0, &db_res)) + != 0 + || RES_ROW_N(db_res) <= 0) { + if(ret == 0) { + mq_dbf.free_result(mqueue_db_con, db_res); + mqueue_db_close_con(); + return 0; + } else { + goto error; + } + } + } + + do { + for(i = 0; i < RES_ROW_N(db_res); i++) { + if(VAL_NULL(&RES_ROWS(db_res)[i].values[0])) { + LM_ERR("mqueue [%.*s] row [%d] has NULL key string\n", + name->len, name->s, i); + goto error; + } + if(VAL_NULL(&RES_ROWS(db_res)[i].values[1])) { + LM_ERR("mqueue [%.*s] row [%d] has NULL value string\n", + name->len, name->s, i); + goto error; + } + switch(RES_ROWS(db_res)[i].values[0].type) { + case DB_STR: + key.s = (RES_ROWS(db_res)[i].values[0].val.str_val.s); + if(key.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL key\n", + name->len, name->s, i); + goto error; + } + key.len = (RES_ROWS(db_res)[i].values[0].val.str_val.len); + break; + case DB_BLOB: + key.s = (RES_ROWS(db_res)[i].values[0].val.blob_val.s); + if(key.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL key\n", + name->len, name->s, i); + goto error; + } + key.len = (RES_ROWS(db_res)[i].values[0].val.blob_val.len); + break; + case DB_STRING: + key.s = (char *)(RES_ROWS(db_res)[i] + .values[0] + .val.string_val); + if(key.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL key\n", + name->len, name->s, i); + goto error; + } + key.len = strlen(key.s); + break; + default: + LM_ERR("key type must be string (type=%d)\n", + RES_ROWS(db_res)[i].values[0].type); + goto error; + } + switch(RES_ROWS(db_res)[i].values[1].type) { + case DB_STR: + val.s = (RES_ROWS(db_res)[i].values[1].val.str_val.s); + if(val.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL value\n", + name->len, name->s, i); + goto error; + } + val.len = (RES_ROWS(db_res)[i].values[1].val.str_val.len); + break; + case DB_BLOB: + val.s = (RES_ROWS(db_res)[i].values[1].val.blob_val.s); + if(val.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL value\n", + name->len, name->s, i); + goto error; + } + val.len = (RES_ROWS(db_res)[i].values[1].val.blob_val.len); + break; + case DB_STRING: + val.s = (char *)(RES_ROWS(db_res)[i] + .values[1] + .val.string_val); + if(val.s == NULL) { + LM_ERR("mqueue [%.*s] row [%d] has NULL value\n", + name->len, name->s, i); + goto error; + } + val.len = strlen(val.s); + break; + default: + LM_ERR("key type must be string (type=%d)\n", + RES_ROWS(db_res)[i].values[1].type); + goto error; + } + cnt++; + LM_NOTICE("adding item[%d] key[%.*s] value[%.*s]\n", cnt, key.len, + key.s, val.len, val.s); + mq_item_add(name, &key, &val); + } + + if(DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) { + if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows) < 0) { + LM_ERR("Error while fetching!\n"); + goto error; + } + } else { + break; + } + } while(RES_ROW_N(db_res) > 0); + + mq_dbf.free_result(mqueue_db_con, db_res); + + if(mq_dbf.delete(mqueue_db_con, 0, 0, 0, 0) < 0) { + LM_ERR("failed to clear table\n"); + goto error; + } + + LM_NOTICE("loaded %d values in queue\n", cnt); + mqueue_db_close_con(); + return 0; +error: + mqueue_db_close_con(); + return -1; +} + +int mqueue_db_save_queue(str *name) +{ + int ncols = 2; + db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column}; + db_val_t db_vals[2]; + int i; + int mqueue_sz = 0; + int ret = 0; + + if(mqueue_db_open_con() != 0) { + LM_ERR("no db connection\n"); + return -1; + } + + if(mq_dbf.use_table(mqueue_db_con, name) < 0) { + LM_ERR("failed to use_table\n"); + goto error; + } + + if(name->len <= 0 || name->s == NULL) { + LM_ERR("bad mqueue name\n"); + goto error; + } + + mqueue_sz = _mq_get_csize(name); + + if(mqueue_sz < 0) { + LM_ERR("no such mqueue\n"); + goto error; + } + for(i = 0; i < mqueue_sz; i++) { + ret = mq_head_fetch(name); + if(ret != 0) + break; + str *key = NULL; + str *val = NULL; + key = get_mqk(name); + val = get_mqv(name); + LM_NOTICE("inserting mqueue[%.*s] name[%.*s] value[%.*s]\n", name->len, + name->s, key->len, key->s, val->len, val->s); + db_vals[0].type = DB_STR; + db_vals[0].nul = 0; + db_vals[0].val.str_val.s = key->s; + db_vals[0].val.str_val.len = key->len; + db_vals[1].type = DB_STR; + db_vals[1].nul = 0; + db_vals[1].val.str_val.s = val->s; + db_vals[1].val.str_val.len = val->len; + LM_NOTICE("mq_dbf.insert()\n"); + if(mq_dbf.insert(mqueue_db_con, db_cols, db_vals, ncols) < 0) { + LM_ERR("failed to store key [%.*s] val [%.*s]\n", key->len, key->s, + val->len, val->s); + LM_NOTICE("done mq_dbf.insert()\n"); + } + } + + LM_INFO("queue [%.*s] saved in db\n", name->len, name->s); + mqueue_db_close_con(); + return 0; +error: + mqueue_db_close_con(); + return -1; +} diff --git a/modules/mqueue/mqueue_db.h b/modules/mqueue/mqueue_db.h new file mode 100644 index 00000000000..56fcd078564 --- /dev/null +++ b/modules/mqueue/mqueue_db.h @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2020 Julien Chavanton + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifndef _MQUEUE_DB_H_ +#define _MQUEUE_DB_H_ + +#include "../../db/db.h" +#include "mqueue_api.h" + +extern str mqueue_db_url; + +int mqueue_db_load_queue(str *name); +int mqueue_db_save_queue(str *name); +#endif diff --git a/modules/mqueue/mqueue_mod.c b/modules/mqueue/mqueue_mod.c new file mode 100644 index 00000000000..72ffe8e277c --- /dev/null +++ b/modules/mqueue/mqueue_mod.c @@ -0,0 +1,435 @@ +/** + * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com) + * + * This file is part of opensips, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include +#include +#include +#include + +#include "../../sr_module.h" +#include "../../dprint.h" +#include "../../ut.h" +#include "../../pvar.h" +#include "../../mod_fix.h" +#include "../../parser/parse_param.h" +#include "../../mem/mem.h" + +#include "mqueue_api.h" +#include "mqueue_db.h" +#include "api.h" + + +static int mod_init(void); +static int child_init(int rank); +static void mod_destroy(void); + +static int w_mq_add(struct sip_msg *msg, str *mq, str *key, str *val); +static int w_mq_fetch(struct sip_msg *msg, str *mq); +static int w_mq_size(struct sip_msg *msg, str *mq_val); +static int w_mq_pv_free(struct sip_msg *msg, str *mq); +int mq_param(modparam_t type, void *val); +static int bind_mq(mq_api_t *api); +mi_response_t *mi_get_sizes(const mi_params_t *params, + struct mi_handler *async_hdl); +mi_response_t *mi_get_size(const mi_params_t *params, + struct mi_handler *async_hdl); +mi_response_t *mi_fetch(const mi_params_t *params, + struct mi_handler *async_hdl); + +static pv_export_t mod_pvs[] = { + { {"mqk", sizeof("mqk") - 1}, 1090, pv_get_mqk, 0, + pv_parse_mq_name, 0, 0, 0}, + { {"mqv", sizeof("mqv") - 1}, 1090, pv_get_mqv, 0, + pv_parse_mq_name, 0, 0, 0}, + { {"mq_size", sizeof("mq_size") - 1}, 1090, pv_get_mq_size, 0, + pv_parse_mq_name, 0, 0, 0}, + { {0, 0}, 0, 0, 0, 0, 0, 0, 0} +}; + +static cmd_export_t cmds[] = { + {"mq_add", (cmd_function)w_mq_add, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"mq_fetch", (cmd_function)w_mq_fetch, { + {CMD_PARAM_STR,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"mq_size", (cmd_function)w_mq_size, { + {CMD_PARAM_STR,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"mq_pv_free", (cmd_function)w_mq_pv_free, { + {CMD_PARAM_STR,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"bind_mq", (cmd_function)bind_mq, { + {0,0,0}}, + 0}, + {0,0,{{0,0,0}},0} +}; + +static param_export_t params[] = { + {"db_url", STR_PARAM, &mqueue_db_url.s}, + {"mqueue", STR_PARAM|USE_FUNC_PARAM, (void *)&mq_param}, + {0, 0, 0} +}; + +static const stat_export_t mod_stats[] = { + {0,0,0} +}; + +#define MQH1 "Params: none ; Get the size of all memory queues." +#define MQH2 "Params: [mqueue] ; Get the size of a memory queue." +#define MQH3 "Params: [mqueue] ; Fetch a key-value pair from a memory queue." + +static const mi_export_t mi_cmds[] = { + {"mq_get_sizes", MQH1, 0, 0, { + {mi_get_sizes, {0}}, + {EMPTY_MI_RECIPE}} + }, + {"mq_get_size", MQH2, 0, 0, { + {mi_get_size, {"name", 0}}, + {EMPTY_MI_RECIPE}} + }, + {"mq_fetch", MQH3, 0, 0, { + {mi_fetch, {"name", 0}}, + {EMPTY_MI_RECIPE}} + }, + {EMPTY_MI_EXPORT} +}; + +static const dep_export_t deps = { + { /* OpenSIPS module dependencies */ + { MOD_TYPE_NULL, NULL, 0 }, + }, + { /* modparam dependencies */ + { "db_url", get_deps_sqldb_url}, + { NULL, NULL }, + }, +}; + +struct module_exports exports = { + "mqueue", /* module's name */ + MOD_TYPE_DEFAULT,/* class of this module */ + MODULE_VERSION, + DEFAULT_DLFLAGS, /* dlopen flags */ + 0, /* load function */ + &deps, /* OpenSIPS module dependencies */ + cmds, /* exported functions */ + 0, /* exported async functions */ + params, /* param exports */ + mod_stats, /* exported statistics */ + mi_cmds, /* exported MI functions */ + mod_pvs, /* exported pseudo-variables */ + 0, /* exported transformations */ + 0, /* extra processes */ + 0, /* module pre-initialization function */ + mod_init, /* module initialization function */ + 0, /* reply processing function */ + mod_destroy, /* destroy function */ + child_init, /* per-child init function */ + 0 /* reload confirm function */ +}; + +extern mq_head_t *_mq_head_list; + +/** + * init module function + */ +static int mod_init(void) +{ + mq_head_t *mh = NULL; + + LM_NOTICE("initializing...\n"); + + init_db_url( mqueue_db_url , 1 /*can be null*/); + + if(!mq_head_defined()) + LM_WARN("no mqueue defined\n"); + else { + mh = _mq_head_list; + while(mh != NULL) { + if (mh->dbmode == 1 || mh->dbmode == 2) { + LM_NOTICE("queue=[%.*s]\n", mh->name.len, mh->name.s); + if(mqueue_db_load_queue(&mh->name) < 0) { + LM_ERR("error loading mqueue: %.*s from DB\n", mh->name.len, mh->name.s); + return -1; + } + } + mh = mh->next; + } + } + + + return 0; +} + +static int child_init(int rank) +{ + //FIXME: + return 0; +} + +/** + * destroy module function + */ +static void mod_destroy(void) +{ + mq_destroy(); +} + +static int w_mq_fetch(struct sip_msg *msg, str *mq) +{ + int ret; + + ret = mq_head_fetch(mq); + if(ret < 0) + return ret; + return 1; +} + +static int w_mq_size(struct sip_msg *msg, str *mq_val) +{ + int ret; + + ret = _mq_get_csize(mq_val); + + if(ret < 0) + LM_ERR("mqueue %.*s not found\n", mq_val->len, mq_val->s); + if(ret <= 0) + ret--; + + return ret; +} + +static int w_mq_add(struct sip_msg *msg, str *mq, str *key, str *val) +{ + if(mq_item_add(mq, key, val) < 0) + return -1; + return 1; +} + +static int w_mq_pv_free(struct sip_msg *msg, str *mq) +{ + mq_pv_free(mq); + return 1; +} + +int mq_param(modparam_t type, void *val) +{ + str mqs; + param_t *params_list = NULL; + param_hooks_t phooks; + param_t *pit = NULL; + str qname = {0, 0}; + int msize = 0; + int dbmode = 0; + int addmode = 0; + + if(val == NULL) + return -1; + + //FIXME: + //if(!shm_initialized()) { + // LM_ERR("shm not initialized - cannot define mqueue now\n"); + // return 0; + //} + + mqs.s = (char *)val; + mqs.len = strlen(mqs.s); + if(mqs.s[mqs.len - 1] == ';') + mqs.len--; + if(parse_params(&mqs, CLASS_ANY, &phooks, ¶ms_list) < 0) + return -1; + for(pit = params_list; pit; pit = pit->next) { + if(pit->name.len == 4 && strncasecmp(pit->name.s, "name", 4) == 0) { + qname = pit->body; + } else if(pit->name.len == 4 + && strncasecmp(pit->name.s, "size", 4) == 0) { + str2sint(&pit->body, &msize); + } else if(pit->name.len == 6 + && strncasecmp(pit->name.s, "dbmode", 6) == 0) { + str2sint(&pit->body, &dbmode); + } else if(pit->name.len == 7 + && strncasecmp(pit->name.s, "addmode", 7) == 0) { + str2sint(&pit->body, &addmode); + } else { + LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s); + free_params(params_list); + return -1; + } + } + if(qname.len <= 0) { + LM_ERR("mqueue name not defined: %.*s\n", mqs.len, mqs.s); + free_params(params_list); + return -1; + } + if(mq_head_add(&qname, msize, addmode) < 0) { + LM_ERR("cannot add mqueue: %.*s\n", mqs.len, mqs.s); + free_params(params_list); + return -1; + } + LM_INFO("mqueue param: [%.*s|%d|%d|%d]\n", qname.len, qname.s, dbmode, + addmode, msize); + //if(dbmode == 1 || dbmode == 2) { + // if(mqueue_db_load_queue(&qname) < 0) { + // LM_ERR("error loading mqueue: %.*s from DB\n", qname.len, qname.s); + // free_params(params_list); + // return -1; + // } + //} + mq_set_dbmode(&qname, dbmode); + free_params(params_list); + return 0; +} + +static int bind_mq(mq_api_t *api) +{ + if(!api) + return -1; + api->add = mq_item_add; + return 0; +} + +mi_response_t *mi_get_size(const mi_params_t *params, + struct mi_handler *async_hdl) +{ + mi_response_t *resp; + mi_item_t *resp_obj, *mq_obj; + str mqueue_name; + int mqueue_sz = 0; + + if (get_mi_string_param(params, "name", &mqueue_name.s, &mqueue_name.len) < 0) + return init_mi_param_error(); + + mqueue_sz = _mq_get_csize(&mqueue_name); + if(mqueue_sz < 0) + return init_mi_error(404, MI_SSTR("No such queue")); + + resp = init_mi_result_object(&resp_obj); + if (!resp) + return NULL; + + mq_obj = add_mi_object(resp_obj, MI_SSTR("Queue")); + if (!mq_obj) + goto error; + + if (add_mi_string_fmt(mq_obj, MI_SSTR("name"), mqueue_name.s, mqueue_name.len) < 0) + goto error; + if (add_mi_number(mq_obj, MI_SSTR("size"), mqueue_sz) < 0) + goto error; + + return resp; + +error: + LM_ERR("Unable to create reply\n"); + free_mi_response(resp); + return NULL; +} + +mi_response_t *mi_get_sizes(const mi_params_t *params, + struct mi_handler *async_hdl) +{ + mi_response_t *resp; + mi_item_t *resp_obj, *mq_array, *mq_item; + mq_head_t *mh = mq_head_get(NULL); + int size; + + resp = init_mi_result_object(&resp_obj); + if (!resp) + return NULL; + + mq_array = add_mi_array(resp_obj, MI_SSTR("Queue")); + if (!mq_array) + goto error; + + while(mh != NULL) { + lock_get(&mh->lock); + size = mh->csize; + lock_release(&mh->lock); + mq_item = add_mi_object(mq_array, MI_SSTR("")); + if (!mq_item) + goto error; + if (add_mi_string_fmt(mq_item, MI_SSTR("name"), mh->name.s, mh->name.len) < 0) + goto error; + if (add_mi_number(mq_item, MI_SSTR("size"), size) < 0) + goto error; + mh = mh->next; + } + + return resp; + +error: + LM_ERR("Unable to create reply\n"); + free_mi_response(resp); + return NULL; +} + +mi_response_t *mi_fetch(const mi_params_t *params, + struct mi_handler *async_hdl) +{ + mi_response_t *resp; + mi_item_t *resp_obj, *mq_item; + str mqueue_name; + int mqueue_sz = 0; + int ret = 0; + str *key = NULL; + str *val = NULL; + + if (get_mi_string_param(params, "name", &mqueue_name.s, &mqueue_name.len) < 0) + return init_mi_param_error(); + + mqueue_sz = _mq_get_csize(&mqueue_name); + if(mqueue_sz < 0) + return init_mi_error(404, MI_SSTR("No such queue")); + + ret = mq_head_fetch(&mqueue_name); + if(ret == -2) + return init_mi_error(404, MI_SSTR("Empty queue")); + else if(ret < 0) + return init_mi_error(404, MI_SSTR("Unexpected error (fetch)")); + + key = get_mqk(&mqueue_name); + val = get_mqv(&mqueue_name); + + if(!val || !key) + return init_mi_error(404, MI_SSTR("Unexpected error (result)")); + + resp = init_mi_result_object(&resp_obj); + if (!resp) + return NULL; + + mq_item = add_mi_object(resp_obj, MI_SSTR("Item")); + if (!mq_item) + goto error; + if (add_mi_string_fmt(mq_item, MI_SSTR("key"), key->s, key->len) < 0) + goto error; + if (add_mi_string_fmt(mq_item, MI_SSTR("value"), val->s, val->len) < 0) + goto error; + + return resp; + +error: + LM_ERR("Unable to create reply\n"); + free_mi_response(resp); + return NULL; +} +