Skip to content

Commit

Permalink
support tsdb_iter
Browse files Browse the repository at this point in the history
  • Loading branch information
pikasTech committed Jul 2, 2024
1 parent 00fabf7 commit a23a7a4
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 41 deletions.
18 changes: 17 additions & 1 deletion examples/flashdb/flashdb_tsdb1.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
import flashdb
import struct
import time
import os
DB_PATH = "test/out/fdb_tsdb"


tsdb = flashdb.TSDB("env", DB_PATH, max_len=512)

for i in range(100):
blob_i = struct.pack('i', i)
time.sleep(0.001)
ret = tsdb.append(blob_i)
ret = tsdb.tsl_append(blob_i)

assert ret == 0


def callback(tsl, user_data) -> int:
# print(tsl.get_time(), tsl.to_blob())
t = tsl.get_time()
blob_i = tsl.to_blob()
i = struct.unpack('i', blob_i)[0]
print(t, i, user_data)
return False # False: continue, True: stop


assert tsdb.tsl_iter(callback, 'user_data') == 0

print('PASS')
6 changes: 5 additions & 1 deletion package/flashdb/_flashdb.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from PikaObj import *


class kvdb_t:
...

Expand Down Expand Up @@ -37,7 +40,8 @@ class TSDB:

def __del__(self): ...

def append(self, blob: any) -> int: ...
def tsl_append(self, blob: any) -> int: ...
def tsl_iter(self, callback: any, user_data: any) -> int: ...


class TSL:
Expand Down
8 changes: 4 additions & 4 deletions package/flashdb/_flashdb_KVDB.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void _flashdb_KVDB_deinit(PikaObj* self) {
fdb_kvdb_deinit(_OBJ2KVDB(self));
}

struct _flashdb_foreach_context {
struct _kvdb_foreach_context {
struct fdb_default_kv_node* def_kv_table;
PikaObj* self;
};
Expand All @@ -163,8 +163,8 @@ int32_t _flashdb_foreach(PikaObj* self_dict,
void* context) {
char* key = arg_getStr(keyEach);
ArgType argt_val = arg_getType(valEach);
struct _flashdb_foreach_context* foreach_context =
(struct _flashdb_foreach_context*)context;
struct _kvdb_foreach_context* foreach_context =
(struct _kvdb_foreach_context*)context;
struct fdb_default_kv_node* def_kv_table = foreach_context->def_kv_table;
PikaObj* self = foreach_context->self;

Expand Down Expand Up @@ -234,7 +234,7 @@ void _flashdb_KVDB___init__(PikaObj* self,
_FDBBUFFS, (4 * sizeof(struct fdb_default_kv_node)));
g_def_kv_table_idx = 0;

struct _flashdb_foreach_context context = {
struct _kvdb_foreach_context context = {
.def_kv_table = def_kv_table,
.self = self,
};
Expand Down
90 changes: 80 additions & 10 deletions package/flashdb/_flashdb_TSDB.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Arg* _flashdb_TSDB_kv_get(PikaObj *self, Arg* tsdb, char* key){
}
*/

int _flashdb_TSDB_append(PikaObj* self, Arg* blob_in) {
int _flashdb_TSDB_tsl_append(PikaObj* self, Arg* blob_in) {
fdb_err_t res = FDB_NO_ERR;
FDB_TSDB* tsdb = _OBJ2TSDB(self);

Expand All @@ -94,10 +94,6 @@ int _flashdb_TSDB_set_default(PikaObj* self, Arg* tsdb) {
return 0;
}

Arg* _flashdb_TSDB_to_blob(PikaObj* self, Arg* kv, Arg* blob) {
return NULL;
}

int _flashdb_TSDB_control(PikaObj* self, int cmd, Arg* arg) {
return -1;
}
Expand All @@ -106,7 +102,7 @@ void _flashdb_TSDB_deinit(PikaObj* self) {
fdb_tsdb_deinit(_OBJ2TSDB(self));
}

struct _flashdb_foreach_context {
struct _kvdb_foreach_context {
struct fdb_default_kv_node* def_kv_table;
PikaObj* self;
};
Expand Down Expand Up @@ -173,14 +169,88 @@ void _flashdb_TSDB_CTRL___init__(PikaObj* self) {
obj_setInt(self, "SET_NOT_FORMAT", FDB_TSDB_CTRL_SET_NOT_FORMAT);
}

#define _OBJ2TSL(self) obj_getPtr(self, "TSL")
#define _OBJSETTSL(self, tsl) obj_setPtr(self, "TSL", tsl)

int64_t _flashdb_TSL_get_time(PikaObj* self) {
//! TODO
return -1;
fdb_tsl_t tsl = _OBJ2TSL(self);
if (NULL == tsl) {
return -1;
}
return tsl->time;
}

fdb_blob_t blob_alloc(fdb_blob_t blob) {
uint8_t* buf = (uint8_t*)pikaMalloc(blob->saved.len + 1);
if (!buf) {
pika_platform_printf("alloc fail\n");
return NULL;
}
blob->buf = buf;
blob->size = blob->saved.len;
return blob;
}

int fdb_blob_free(fdb_blob_t blob) {
if (blob) {
pikaFree(blob->buf, blob->size + 1);
blob->buf = NULL;
blob->size = 0;
}
return 0;
}

Arg* _flashdb_TSL_to_blob(PikaObj* self) {
//! TODO
return NULL;
fdb_tsl_t tsl = _OBJ2TSL(self);
fdb_tsdb_t tsdb = obj_getPtr(self, "tsdb");
if (NULL == tsl) {
return NULL;
}
struct fdb_blob blob;
fdb_tsl_to_blob(tsl, &blob);
if (NULL == blob_alloc(&blob)) {
return NULL;
}
fdb_blob_read((fdb_db_t)tsdb, &blob);
Arg* res = arg_newBytes((uint8_t*)blob.buf, blob.size);
fdb_blob_free(&blob);
return res;
}

typedef struct _tsdb_foreach_context {
Arg* callback;
Arg* user_data;
fdb_tsdb_t tsdb;
} tsdb_foreach_context;

PikaObj* New__flashdb_TSDB(Args* args);
pika_bool _flashdb_TSL_iter_callback(fdb_tsl_t tsl, void* arg) {
tsdb_foreach_context* context = (tsdb_foreach_context*)arg;
Arg* callback = context->callback;
Arg* user_data = context->user_data;
PikaObj* tsl_obj = newNormalObj(New__flashdb_TSL);
_OBJSETTSL(tsl_obj, tsl);
obj_setPtr(tsl_obj, "tsdb", context->tsdb);
Arg* ret = pika_runFunction2(arg_copy(callback), arg_newObj(tsl_obj),
arg_copy(user_data));

if (NULL == ret) {
return pika_true;
}
pika_bool res = arg_getBool(ret);
arg_deinit(ret);
return res;
}

int _flashdb_TSDB_tsl_iter(PikaObj* self, Arg* callback, Arg* user_data) {
fdb_tsdb_t tsdb = _OBJ2TSDB(self);
tsdb_foreach_context context = {
.callback = callback,
.user_data = user_data,
.tsdb = tsdb,
};
fdb_tsl_iter(tsdb, _flashdb_TSL_iter_callback, &context);
return 0;
}

#undef strudp
4 changes: 4 additions & 0 deletions package/flashdb/flashdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ class TSDB(_flashdb.TSDB):
def __init__(self, name: str, path: str, max_len: int = 1024,
user_data=None):
super().__init__(name, path, max_len, user_data)


class TSL(_flashdb.TSL):
pass
6 changes: 5 additions & 1 deletion port/linux/package/pikascript/_flashdb.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from PikaObj import *


class kvdb_t:
...

Expand Down Expand Up @@ -37,7 +40,8 @@ class TSDB:

def __del__(self): ...

def append(self, blob: any) -> int: ...
def tsl_append(self, blob: any) -> int: ...
def tsl_iter(self, callback: any, user_data: any) -> int: ...


class TSL:
Expand Down
4 changes: 4 additions & 0 deletions port/linux/package/pikascript/flashdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ class TSDB(_flashdb.TSDB):
def __init__(self, name: str, path: str, max_len: int = 1024,
user_data=None):
super().__init__(name, path, max_len, user_data)


class TSL(_flashdb.TSL):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void _flashdb_KVDB_deinit(PikaObj* self) {
fdb_kvdb_deinit(_OBJ2KVDB(self));
}

struct _flashdb_foreach_context {
struct _kvdb_foreach_context {
struct fdb_default_kv_node* def_kv_table;
PikaObj* self;
};
Expand All @@ -163,8 +163,8 @@ int32_t _flashdb_foreach(PikaObj* self_dict,
void* context) {
char* key = arg_getStr(keyEach);
ArgType argt_val = arg_getType(valEach);
struct _flashdb_foreach_context* foreach_context =
(struct _flashdb_foreach_context*)context;
struct _kvdb_foreach_context* foreach_context =
(struct _kvdb_foreach_context*)context;
struct fdb_default_kv_node* def_kv_table = foreach_context->def_kv_table;
PikaObj* self = foreach_context->self;

Expand Down Expand Up @@ -234,7 +234,7 @@ void _flashdb_KVDB___init__(PikaObj* self,
_FDBBUFFS, (4 * sizeof(struct fdb_default_kv_node)));
g_def_kv_table_idx = 0;

struct _flashdb_foreach_context context = {
struct _kvdb_foreach_context context = {
.def_kv_table = def_kv_table,
.self = self,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Arg* _flashdb_TSDB_kv_get(PikaObj *self, Arg* tsdb, char* key){
}
*/

int _flashdb_TSDB_append(PikaObj* self, Arg* blob_in) {
int _flashdb_TSDB_tsl_append(PikaObj* self, Arg* blob_in) {
fdb_err_t res = FDB_NO_ERR;
FDB_TSDB* tsdb = _OBJ2TSDB(self);

Expand All @@ -94,10 +94,6 @@ int _flashdb_TSDB_set_default(PikaObj* self, Arg* tsdb) {
return 0;
}

Arg* _flashdb_TSDB_to_blob(PikaObj* self, Arg* kv, Arg* blob) {
return NULL;
}

int _flashdb_TSDB_control(PikaObj* self, int cmd, Arg* arg) {
return -1;
}
Expand All @@ -106,7 +102,7 @@ void _flashdb_TSDB_deinit(PikaObj* self) {
fdb_tsdb_deinit(_OBJ2TSDB(self));
}

struct _flashdb_foreach_context {
struct _kvdb_foreach_context {
struct fdb_default_kv_node* def_kv_table;
PikaObj* self;
};
Expand Down Expand Up @@ -173,14 +169,88 @@ void _flashdb_TSDB_CTRL___init__(PikaObj* self) {
obj_setInt(self, "SET_NOT_FORMAT", FDB_TSDB_CTRL_SET_NOT_FORMAT);
}

#define _OBJ2TSL(self) obj_getPtr(self, "TSL")
#define _OBJSETTSL(self, tsl) obj_setPtr(self, "TSL", tsl)

int64_t _flashdb_TSL_get_time(PikaObj* self) {
//! TODO
return -1;
fdb_tsl_t tsl = _OBJ2TSL(self);
if (NULL == tsl) {
return -1;
}
return tsl->time;
}

fdb_blob_t blob_alloc(fdb_blob_t blob) {
uint8_t* buf = (uint8_t*)pikaMalloc(blob->saved.len + 1);
if (!buf) {
pika_platform_printf("alloc fail\n");
return NULL;
}
blob->buf = buf;
blob->size = blob->saved.len;
return blob;
}

int fdb_blob_free(fdb_blob_t blob) {
if (blob) {
pikaFree(blob->buf, blob->size + 1);
blob->buf = NULL;
blob->size = 0;
}
return 0;
}

Arg* _flashdb_TSL_to_blob(PikaObj* self) {
//! TODO
return NULL;
fdb_tsl_t tsl = _OBJ2TSL(self);
fdb_tsdb_t tsdb = obj_getPtr(self, "tsdb");
if (NULL == tsl) {
return NULL;
}
struct fdb_blob blob;
fdb_tsl_to_blob(tsl, &blob);
if (NULL == blob_alloc(&blob)) {
return NULL;
}
fdb_blob_read((fdb_db_t)tsdb, &blob);
Arg* res = arg_newBytes((uint8_t*)blob.buf, blob.size);
fdb_blob_free(&blob);
return res;
}

typedef struct _tsdb_foreach_context {
Arg* callback;
Arg* user_data;
fdb_tsdb_t tsdb;
} tsdb_foreach_context;

PikaObj* New__flashdb_TSDB(Args* args);
pika_bool _flashdb_TSL_iter_callback(fdb_tsl_t tsl, void* arg) {
tsdb_foreach_context* context = (tsdb_foreach_context*)arg;
Arg* callback = context->callback;
Arg* user_data = context->user_data;
PikaObj* tsl_obj = newNormalObj(New__flashdb_TSL);
_OBJSETTSL(tsl_obj, tsl);
obj_setPtr(tsl_obj, "tsdb", context->tsdb);
Arg* ret = pika_runFunction2(arg_copy(callback), arg_newObj(tsl_obj),
arg_copy(user_data));

if (NULL == ret) {
return pika_true;
}
pika_bool res = arg_getBool(ret);
arg_deinit(ret);
return res;
}

int _flashdb_TSDB_tsl_iter(PikaObj* self, Arg* callback, Arg* user_data) {
fdb_tsdb_t tsdb = _OBJ2TSDB(self);
tsdb_foreach_context context = {
.callback = callback,
.user_data = user_data,
.tsdb = tsdb,
};
fdb_tsl_iter(tsdb, _flashdb_TSL_iter_callback, &context);
return 0;
}

#undef strudp
Loading

0 comments on commit a23a7a4

Please sign in to comment.