Skip to content

Commit

Permalink
test: start node
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Apr 24, 2024
1 parent fdd535c commit 8d32e3e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 92 deletions.
3 changes: 3 additions & 0 deletions library/libwaku.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ extern "C" {

typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData);

// Initializes the library. Should be called before any other function
void waku_setup();

// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
// Returns a pointer to the Context needed by the rest of the API functions.
Expand Down
98 changes: 53 additions & 45 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
{.passl: "-Wl,-soname,libwaku.so"}

if defined(linux):
{.passl: "-Wl,-soname,libwaku.so"}

import std/[json, sequtils, atomics, times, strformat, options, atomics, strutils, os]
import chronicles, chronos
Expand Down Expand Up @@ -40,6 +42,17 @@ const RET_MISSING_CALLBACK: cint = 2
################################################################################
### Not-exported components


template foreignThreadGc(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()

body

when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()


proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =
return proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
Expand All @@ -55,14 +68,16 @@ proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =

try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
foreignThreadGc:
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
foreignThreadGc:
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

### End of not-exported components
################################################################################
Expand All @@ -77,38 +92,30 @@ proc NimMain() {.importc.}
# To control when the library has been initialized
var initialized: Atomic[bool]

proc initDispatcher() {.thread.} =
## Begins a never ending global dispatcher poll loop.
## Raises different exceptions depending on the platform.
while initialized.load:
poll()
if defined(android):
# Redirect chronicles to Android System logs
when compiles(defaultChroniclesStream.outputs[0].writer):
defaultChroniclesStream.outputs[0].writer =
proc (logLevel: LogLevel, msg: LogOutputStr) {.raises: [].} =
echo logLevel, msg


### End of library setup
################################################################################


################################################################################
### Exported procs
proc waku_hello() {.dynlib, exportc.} =
proc waku_setup() {.dynlib, exportc.} =
NimMain()
if not initialized.load:
initialized.store(true)

when declared(setupForeignThreadGc):
setupForeignThreadGc()

# TODO: ask Ivan what is nimGC_setStackBottom for
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)

var t: Thread[void]
createThread(t, initDispatcher)
sleep(500) # TODO: the dispatcher must be running before any async operation is executed. Is there a way to avoid this sleep?

proc waku_bye() {.dynlib, exportc.} =
tearDownForeignThreadGc()

proc waku_new(
configJson: cstring, callback: WakuCallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
Expand All @@ -119,24 +126,23 @@ proc waku_new(

## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_thread.createWakuThread().valueOr:
let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
foreignThreadGc:
let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

ctx.userData = userData
echo "======================= WAKU_CREATE_NODE!"
let sendReqRes = waku_thread.sendRequestToWakuThread(

waku_thread.sendRequestToWakuThread(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson),
)
# echo "======================= REQUEST SENT"
# if sendReqRes.isErr():
# let msg = $sendReqRes.error
# echo "======================= ERROR SENDING REQUEST"
# callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
# return nil
# echo "======================= SUCCESS SENDING REQUEST"
).isOkOr:
foreignThreadGc:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

return ctx

proc waku_destroy(
Expand All @@ -146,9 +152,10 @@ proc waku_destroy(
return RET_MISSING_CALLBACK

waku_thread.stopWakuThread(ctx).isOkOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
foreignThreadGc:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

return RET_OK

Expand All @@ -160,12 +167,13 @@ proc waku_version(
if isNil(callback):
return RET_MISSING_CALLBACK

callback(
RET_OK,
cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)),
userData,
)
foreignThreadGc:
callback(
RET_OK,
cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)),
userData,
)

return RET_OK

Expand Down
82 changes: 35 additions & 47 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,28 @@ var running: Atomic[bool]
proc runWaku(ctx: ptr Context) {.async.} =
## This is the worker body. This runs the Waku node
## and attends library user requests (stop, connect_to, etc.)

var node: WakuNode
while running.load == true:
echo "::::::::: WAITING FOR SIGNAL"
await ctx.reqSignal.wait()
echo "::::::::: SIGNAL RECEIVED"

## Trying to get a request from the libwaku main thread
# var request: ptr InterThreadRequest
# let recvOk = ctx.reqChannel.tryRecv(request)
# if recvOk == true:
# let resultResponse = await InterThreadRequest.process(request, addr node)
# ## Converting a `Result` into a thread-safe transferable response type
# let threadSafeResp = InterThreadResponse.createShared(resultResponse)

# ## The error-handling is performed in the main thread
# discard ctx.respChannel.trySend(threadSafeResp)
# discard ctx.respSignal.fireSync()

try:
discard ctx.reqSignal.waitSync()

# Trying to get a request from the libwaku main thread
var request: ptr InterThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse = await InterThreadRequest.process(request, addr node)
## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)

## The error-handling is performed in the main thread
discard ctx.respChannel.trySend(threadSafeResp)
discard ctx.respSignal.fireSync()
except:
echo "ERROR!!!"

proc run(ctx: ptr Context) {.thread.} =
## Launch waku worker
asyncSpawn runWaku(ctx)
waitFor runWaku(ctx)

proc createWakuThread*(): Result[ptr Context, string] =
## This proc is called from the main thread and it creates
Expand Down Expand Up @@ -94,39 +93,28 @@ proc stopWakuThread*(ctx: ptr Context): Result[void, string] =
proc sendRequestToWakuThread*(
ctx: ptr Context, reqType: RequestType, reqContent: pointer
): Result[string, string] =

let req = InterThreadRequest.createShared(reqType, reqContent)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
return err("Couldn't send a request to the waku thread: " & $req[])

let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
echo "::::: COULD NOT SEND SIGNAL"
else:
echo "::::: SIGNAL SENT", fireSyncRes.get()




# let req = InterThreadRequest.createShared(reqType, reqContent)
# ## Sending the request
# let sentOk = ctx.reqChannel.trySend(req)
# if not sentOk:
# return err("Couldn't send a request to the waku thread: " & $req[])

# let fireSyncRes = ctx.reqSignal.fireSync()
# if fireSyncRes.isErr():
# return err("failed fireSync: " & $fireSyncRes.error)
return err("failed fireSync: " & $fireSyncRes.error)

#if fireSyncRes.get() == false:
# return err("Couldn't fireSync in time")
if fireSyncRes.get() == false:
return err("Couldn't fireSync in time")

## Waiting for the response
#let res = waitSync(ctx.respSignal)
#if res.isErr():
# return err("Couldnt receive response signal")
# Waiting for the response
let res = waitSync(ctx.respSignal)
if res.isErr():
return err("Couldnt receive response signal")

# var response: ptr InterThreadResponse
# var recvOk = ctx.respChannel.tryRecv(response)
# if recvOk == false:
# return err("Couldn't receive response from the waku thread: " & $req[])
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
if recvOk == false:
return err("Couldn't receive response from the waku thread: " & $req[])

# ## Converting the thread-safe response into a managed/CG'ed `Result`
# return InterThreadResponse.process(response)
## Converting the thread-safe response into a managed/CG'ed `Result`
return InterThreadResponse.process(response)

0 comments on commit 8d32e3e

Please sign in to comment.