Skip to content

Commit

Permalink
cachedb_redis: Implement the "column oriented" CacheDB API
Browse files Browse the repository at this point in the history
This allows OpenSIPS to use Redis as storage for the User Location data,
in both "Federation" and "Full Sharing" scenarios.

Requirement: *Redis JSON support* (using Redis Stack Server or similar).
    (Redis JSON support is auto-detected, with appropriate info logs)

Federation Index Creation:
    FT.CREATE idx:usrloc ON JSON PREFIX 1 usrloc: SCORE 1.0 SCHEMA
	$.id AS id TEXT WEIGHT 1.0 $.aor AS aor TEXT WEIGHT 1.0
	$.home_ip AS home_ip TEXT WEIGHT 1.0

Full Sharing Index Creation:
    FT.CREATE idx:usrloc ON JSON PREFIX 1 usrloc: SCORE 1.0 SCHEMA
	$.aor AS aor TEXT WEIGHT 1.0 $.aorhash AS aorhash NUMERIC

Testing: no crashes during stress-tests (in either usrloc scenario).
  • Loading branch information
liviuchircu committed Jan 9, 2025
1 parent af495d7 commit 1977206
Show file tree
Hide file tree
Showing 7 changed files with 1,309 additions and 6 deletions.
261 changes: 261 additions & 0 deletions cachedb/cachedb_dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

#include "cachedb_types.h"
#include "../lib/cJSON.h"

int cdb_dict_add_str(cdb_dict_t *dest, const char *key, int key_len,
const str *val)
Expand Down Expand Up @@ -337,6 +338,20 @@ int cdb_dict_has_pair(const cdb_dict_t *haystack, const cdb_pair_t *pair)
return !val_cmp(&needle->val, &pair->val);
}

int cdb_dict_has_subkeys(const cdb_dict_t *dict)
{
struct list_head *it;
cdb_pair_t *p;

list_for_each (it, dict) {
p = list_entry(it, cdb_pair_t, list);
if (!ZSTR(p->subkey))
return 1;
}

return 0;
}

cdb_pair_t *nth_pair(const cdb_dict_t *dict, int nth)
{
struct list_head *_;
Expand All @@ -355,6 +370,252 @@ cdb_pair_t *nth_pair(const cdb_dict_t *dict, int nth)
return NULL;
}

char *cdb_dict_to_json(const cdb_dict_t *dict,
unsigned int (*escape)(char *dst, const str *src),
unsigned int (*calc_escaped_len)(str *in))
{
#define MAX_OBJ_PAIRS 32
struct list_head *_;
cdb_pair_t *pair;
unsigned int len = 1; /* { */
char *obj = NULL, *objs[MAX_OBJ_PAIRS];
unsigned int i = 0, cnt = 0, o = 0;
int obj_cnt = 0;

list_for_each (_, dict) {
pair = list_entry(_, cdb_pair_t, list);

switch (pair->val.type) {
case CDB_DICT:
if (obj_cnt == MAX_OBJ_PAIRS) {
LM_ERR("max DICT pairs limit exceeded (%d)\n", MAX_OBJ_PAIRS);
goto error;
}

objs[obj_cnt] = cdb_dict_to_json(
&pair->val.val.dict, escape, calc_escaped_len);
if (!objs[obj_cnt]) {
LM_ERR("oom\n");
goto error;
}

len += strlen(objs[obj_cnt++]);
goto next_item;

case CDB_STR:
len += 2; /* "" */
if (!calc_escaped_len)
len += pair->val.val.st.len;
else
len += calc_escaped_len(&pair->val.val.st);
break;

case CDB_INT32:
len += 1 + 10;
break;

case CDB_INT64:
len += 1 + 20;
break;

case CDB_NULL:
len += 4;
break;

default:
LM_ERR("unsupported CDB type: %d\n", pair->val.type);
continue;
}

next_item:
if (cnt++ > 0)
len++; /* , */

len += 1 + pair->key.name.len + 1 + 1; /* "" : ??? */
}

len += 2; /* }\0 */
obj = pkg_malloc(len);
if (!obj) {
LM_ERR("oom\n");
goto error;
}

obj[i++] = '{';

cnt = 0;
list_for_each (_, dict) {
pair = list_entry(_, cdb_pair_t, list);
if (cnt++ > 0)
obj[i++] = ',';

i += sprintf(obj+i, "\"%.*s\":", pair->key.name.len, pair->key.name.s);

switch (pair->val.type) {
case CDB_DICT:
if (o == obj_cnt && o > 0) {
LM_BUG("bufer overflow (%d) in JSON key %.*s\n",
o, pair->key.name.len, pair->key.name.s);
goto error;
}

i += sprintf(obj+i, "%s", objs[o++]);
break;

case CDB_STR:
if (!escape) {
i += sprintf(obj+i, "\"%.*s\"",
pair->val.val.st.len, pair->val.val.st.s);
} else {
obj[i++] = '\"';
i += escape(obj+i, &pair->val.val.st);
obj[i++] = '\"';
}
break;

case CDB_INT32:
i += sprintf(obj+i, "%d", pair->val.val.i32);
break;

case CDB_INT64:
i += sprintf(obj+i, "%ld", pair->val.val.i64);
break;

case CDB_NULL:
i += sprintf(obj+i, "null");
break;

default:
LM_ERR("unsupported value type (%d), key: %.*s\n", pair->val.type,
pair->key.name.len, pair->key.name.s);
break;
}
}

obj[i++] = '}';
obj[i++] = '\0';
if (i > len)
LM_BUG("buffer overflow (%u > %u) in JSON: '%s'\n", i, len, obj);

while (obj_cnt > 0)
pkg_free(objs[--obj_cnt]);
return obj;

error:
while (obj_cnt > 0)
pkg_free(objs[--obj_cnt]);
pkg_free(obj);
return NULL;
}



int cdb_cjson_to_dict(const cJSON *obj, cdb_dict_t *out_dict,
void (*unescape)(char *))
{
cJSON *it;
cdb_key_t key;
cdb_pair_t *pair;
union cdb_val_u *val;

INIT_LIST_HEAD(out_dict);

for (it = obj->child; it; it = it->next) {
LM_DBG(" iter object item, type %d\n", it->type);

init_str(&key.name, it->string);
key.is_pk = 0;

pair = cdb_mk_pair(&key, NULL);
if (!pair) {
LM_ERR("oom\n");
continue;
}

val = &pair->val.val;

switch (it->type) {
case cJSON_Object:
pair->val.type = CDB_DICT;
if (cdb_cjson_to_dict(it, &val->dict, unescape) != 0) {
LM_ERR("failed to convert cJSON to cdb_dict_t (oom)\n");
continue;
}
break;

case cJSON_NULL:
pair->val.type = CDB_NULL;
break;

case cJSON_True:
case cJSON_False:
pair->val.type = CDB_INT32;
val->i32 = it->valueint;
break;

case cJSON_Number:
pair->val.type = CDB_INT32;
val->i32 = it->valueint;
break;

case cJSON_String:
if (unescape)
unescape(it->valuestring);

str st = {.s = it->valuestring, .len = strlen(it->valuestring)};

pair->val.type = CDB_STR;
if (pkg_nt_str_dup(&val->st, &st) != 0) {
LM_ERR("oom\n");
pkg_free(pair);
continue;
}
break;

default:
LM_ERR("unimplemented type (%d), skipping!\n", it->type);
pkg_free(pair);
continue;
}

cdb_dict_add(pair, out_dict);
}


return 0;
}

int cdb_json_to_dict(const char *json, cdb_dict_t *out, void (*unescape)(char *))
{
cJSON *root;

root = cJSON_Parse(json);
if (!root) {
LM_ERR("failed to parse JSON: '%s'\n", json);
return -1;
}

LM_DBG("parsed JSON '%s'\n", json);

if (root->type != cJSON_Object) {
LM_ERR("unexpected root JSON type (%d), need Object in json: '%s'\n",
root->type, json);
goto error;
}

if (cdb_cjson_to_dict(root, out, unescape) != 0) {
LM_ERR("failed to convert cJSON to cdb_dict_t (oom)\n");
goto error;
}

cJSON_Delete(root);
return 0;

error:
cJSON_Delete(root);
return -1;
}

void cdb_free_entries(cdb_dict_t *dict, void (*free_val_str) (void *val))
{
struct list_head *_, *__;
Expand Down
6 changes: 6 additions & 0 deletions cachedb/cachedb_dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ void cdb_free_entries(cdb_dict_t *dict, void (*free_val_str) (void *val));
struct cdb_pair *cdb_dict_fetch(const struct cdb_key *key,
const cdb_dict_t *dict);
int cdb_dict_has_pair(const cdb_dict_t *haystack, const struct cdb_pair *pair);
int cdb_dict_has_subkeys(const cdb_dict_t *dict);
struct cdb_pair *nth_pair(const cdb_dict_t *dict, int nth);
char *cdb_dict_to_json(const cdb_dict_t *dict,
unsigned int (*escape)(char *dst, const str *src),
unsigned int (*calc_escaped_len)(str *in));
int cdb_json_to_dict(const char *json, cdb_dict_t *out,
void (*unescape)(char *inout));
int dict_cmp(const cdb_dict_t *a, const cdb_dict_t *b);
int val_cmp(const struct cdb_val *v1, const struct cdb_val *v2);

Expand Down
17 changes: 17 additions & 0 deletions modules/cachedb_redis/cachedb_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ static const param_export_t params[]={
{ "shutdown_on_error", INT_PARAM, &shutdown_on_error },
{ "cachedb_url", STR_PARAM|USE_FUNC_PARAM, (void *)&set_connection},
{ "use_tls", INT_PARAM, &use_tls},
{ "ftsearch_index_name", STR_PARAM, &fts_index_name.s},
{ "ftsearch_json_prefix", STR_PARAM, &fts_json_prefix.s},
{ "ftsearch_max_results", INT_PARAM, &fts_max_results},
{ "ftsearch_json_mset_expire", INT_PARAM, &fts_json_mset_expire},
{0,0,0}
};

Expand Down Expand Up @@ -121,6 +125,16 @@ static int mod_init(void)
cachedb_engine cde;

LM_NOTICE("initializing module cachedb_redis ...\n");

/* quick validations */
if (fts_max_results > 10000) {
LM_INFO("lowering 'fts_max_results' to 10000 (max value allowed)\n");
fts_max_results = 10000;
}

fts_index_name.len = strlen(fts_index_name.s);
fts_json_prefix.len = strlen(fts_json_prefix.s);

memset(&cde,0,sizeof(cachedb_engine));

cde.name = cache_mod_name;
Expand All @@ -131,8 +145,11 @@ static int mod_init(void)
cde.cdb_func.get_counter = redis_get_counter;
cde.cdb_func.set = redis_set;
cde.cdb_func.remove = redis_remove;
cde.cdb_func._remove = _redis_remove;
cde.cdb_func.add = redis_add;
cde.cdb_func.sub = redis_sub;
cde.cdb_func.query = redis_query;
cde.cdb_func.update = redis_update;
cde.cdb_func.raw_query = redis_raw_query;
cde.cdb_func.map_get = redis_map_get;
cde.cdb_func.map_set = redis_map_set;
Expand Down
Loading

0 comments on commit 1977206

Please sign in to comment.