Merge pull request #676 from ngs-lang/feature/shell-poll
Feature/shell poll
ilyash-b authored Nov 2, 2024
2 parents e234f14 + b20aa3c commit d5cf719
Showing 10 changed files with 427 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* Fix `after_last(Str, Str)`
* `encode_json()` - now supports `{'pretty': 'best-effort'}` hint
* Various build improvements
* Fix edge case in `del(Hash, Any)`

### Deprecated
* Deprecated `Deep` in favor of `AtPath`
107 changes: 81 additions & 26 deletions lib/autoload/globals/IO.ngs
@@ -72,7 +72,7 @@ ns {
debug('server', "JsonRpcServer Read")

F send_error(code, message, data=null) {
error("JsonRpcServer on_data send_error -- ${code} ${message} ${data}")
error("JsonRpcServer on_data (or spawned thread) send_error -- ${code} ${message} ${data}")
ret = {
'jsonrpc': '2.0'
'id': req.get('id')
@@ -83,19 +83,25 @@
}.filterv()
}
hc.fire(events::Write(ret.encode_json() + "\n"))
b.return()
}

req = {} # allow send_error() to do req.get('id') if we crash in try
req = try {
data.decode_json()
} catch(e:JsonDecodeFail) {
send_error(-32700, 'Parse error', e.Hash().filterk(AnyOf('error', 'value', 'position')))
b.return()
}

if (req !~ _VALID_JSON_RPC_REQUEST) send_error(-32600, 'Invalid Request', 'See https://www.jsonrpc.org/specification')
if (req !~ _VALID_JSON_RPC_REQUEST) {
send_error(-32600, 'Invalid Request', 'See https://www.jsonrpc.org/specification')
b.return()
}

if (req.method not in jrs.methods) send_error(-32601, 'Method not found', "Method ${req.method} not found. Available methods: ${jrs.methods.keys()}")
if (req.method not in jrs.methods) {
send_error(-32601, 'Method not found', "Method ${req.method} not found. Available methods: ${jrs.methods.keys()}")
b.return()
}

args = []
kwargs = {}
@@ -106,26 +112,34 @@
}
}

debug('server', "JsonRpcServer on_data -- invoking ${req.method}")
result = try {
jrs.methods[req.method](*args, **kwargs)
} catch(mnf:MethodNotFound) {
guard mnf.callable === jrs.methods[req.method]
send_error(-32602, 'Invalid params', "Correct parameters can be seen in methods' descriptions: ${jrs.methods[req.method].Arr()}")
} catch(e:Error) {
send_error(-32603, 'Internal error', e.Str())
}

if 'id' in req {
# response expected
ret = {
'jsonrpc': '2.0'
'id': req.id
'result': result
}.filterv() # Why filterv()?
debug('server', 'JsonRpcServer on_data -- sending reply')
hc.fire(events::Write(ret.encode_json() + "\n"))
}
debug('server', "JsonRpcServer on_data -- invoking ${req.method} in a thread")

# TODO: join this thread
Thread(SeqId("JsonRpcServer-request-"), {
block b {
result = try {
jrs.methods[req.method](*args, **kwargs)
} catch(mnf:MethodNotFound) {
guard mnf.callable === jrs.methods[req.method]
send_error(-32602, 'Invalid params', "Correct parameters can be seen in methods' descriptions: ${jrs.methods[req.method].Arr()}")
b.return()
} catch(e:Error) {
send_error(-32603, 'Internal error', e.Str())
b.return()
}

if 'id' in req {
# response expected
ret = {
'jsonrpc': '2.0'
'id': req.id
'result': result
}.filterv() # Why filterv()?
debug('server', 'JsonRpcServer on_data -- sending reply')
hc.fire(events::Write(ret.encode_json() + "\n"))
}
}
})

} # handle Read

@@ -139,17 +139,24 @@
F init(jrc:JsonRpcClient) {
jrc.callbacks = {}
jrc.id = Iter(1..null)
jrc.pipeline = null
debug('client', 'JsonRpcClient()')
}

F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, a:events::Active) {
debug('client', 'JsonRpcClient Active')
jrc.pipeline = hc.pipeline
hc.fire(a)
}

F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, r:events::Read) block b {
data = r.data
debug('client', "JsonRpcClient Read")
}

F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, w:events::Write) block b {
data = w.data
debug('client', "JsonRpcClient Write")
debug('client', "JsonRpcClient Write ${w.data.SafeStr()}")
id = w.data.get('id', { jrc.id.next() })
w2 = events::Write({
'jsonrpc': '2.0'
Expand All @@ -166,7 +187,7 @@ ns {
# TODO: make this more resilient
F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, r:events::Read) block b {
data = r.data
debug('client', "JsonRpcClient Read ${data}")
debug('client', "JsonRpcClient Read ${data.SafeStr()}")

res = try {
data.decode_json()
@@ -184,6 +205,40 @@
)
}

# Is this the right way to do sync and async?
# TODO: what async should return?
global call
F call(jrc:JsonRpcClient, method:Str, params=[], callback:Fun=null) {
sync = not(callback)

lock = Lock()
result = null
if sync {
lock.acquire()
callback = F json_rpc_client_sync_callback(data) {
# Runs in the reader thread
debug('client', 'JsonRpcClient callback')
result = data
lock.release()
}
}

assert(jrc.pipeline, 'Before using JsonRpcClient#call, call fire(COR::Pipeline, IO::events::Active()) on the pipeline where the JsonRpcClient was pushed')
jrc.pipeline.fire(events::Write({
'call': {
'method': method
'params': params
}
'callback': callback
}))

not(sync) returns

debug('client', 'JsonRpcClient blocked on waiting for result')
lock.acquire().release()
debug('client', 'JsonRpcClient got result')
result
}
}
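
As an illustration of the new API (not part of the commit; the method name 'sum' and the pipeline variable are hypothetical), a caller activates the pipeline once and can then use call() either synchronously or with a callback:

    pipeline.fire(IO::events::Active())              # populates jrc.pipeline via the Active handler
    result = jrc.call('sum', [1, 2])                 # sync: blocks on the Lock until the reader thread delivers the result
    jrc.call('sum', [1, 2], { echo("got ${A}") })    # async: returns immediately, callback runs in the reader thread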


15 changes: 15 additions & 0 deletions lib/autoload/globals/SeqId.ngs
@@ -0,0 +1,15 @@
ns {
# TODO: more fine grained locking or lock-less mechanism
l = Lock()
ids = {}

global SeqId
doc %STATUS - Experimental
F SeqId(s:Str) {
i = l.acquire({
ids.dflt(s, 0)
ids[s] += 1
})
"${s}${i}"
}
}
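
SeqId() keeps one counter per prefix behind a lock, so concurrent callers always get distinct names; this is what the IO.ngs change uses to name its per-request threads. A small sketch of the expected behavior (illustrative, not part of the commit):

    echo(SeqId('JsonRpcServer-request-'))   # e.g. JsonRpcServer-request-1
    echo(SeqId('JsonRpcServer-request-'))   # e.g. JsonRpcServer-request-2
    echo(SeqId('worker-'))                  # e.g. worker-1, each prefix has its own counter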
62 changes: 59 additions & 3 deletions lib/autoload/globals/Timeline.ngs
@@ -2,7 +2,61 @@ ns {

# WIP - timeline, mainly for UI

global Timeline, Time, Lines, init, each, map, sort, push, echo_cli
global Time, Lines, init, each, map, sort, push, echo_cli

section "Item" {

doc time - Time
type Item

F init(ti:Item) throw InvalidArgument("Item() was called without arguments")

F init(ti:Item, time:Time=null) {
ti.id = "ti-${`line: uuidgen`}" # temporary
ti.time = time
}

F Time(ti:Item) ti.time
}

section "GroupItem" {

type GroupItem(Item)

F init(ti:GroupItem) {
super(ti)
ti.items = []
}

F push(gti:GroupItem, ti:Item) gti::{
if not(gti.items) {
gti.time = ti.time
}
gti.items.push(ti)
}
}

section "ResultItem" {

type ResultItem(Item)

F init(ti:ResultItem, time:Time, result) {
super(ti, time)
ti.result = result
}
}

section "TextualCommandItem" {

type TextualCommandItem(Item)

F init(ti:TextualCommandItem) throw InvalidArgument("TextualCommandItem() was called without arguments")

F init(ti:TextualCommandItem, time:Time, command:Str) {
super(ti, time)
ti.command = command
}
}

section "Timeline" {

@@ -16,6 +70,7 @@

F init(t:Timeline) {
t.id = "tl-${`line: uuidgen`}" # temporary
t.version = 1 # TODO: make sure it's thread safe
t.name = '(unnamed)'
t.status = 'TODO'
t.error = null
@@ -32,11 +87,12 @@
t.sort()
}

F push(t:Timeline, x:Timeline) t::{
A.items.push(x)
F push(t:Timeline, ti:Item) t::{
A.items.push(ti)
A.sort() # TODO: insert in the correct place, keeping .items sorted by time
# TODO: maximize time_last_update and time_end into t
# TODO: minimize time_start into t
t.version += 1
}

F each(t:Timeline, cb:Fun) t::{A.items.each(cb)}
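
With the new Item hierarchy, a Timeline now holds typed entries (textual commands, results, groups) rather than nested Timelines, kept sorted by time and versioned on every push. A hypothetical sketch of how the types compose (assumes Time() yields the current time; the command and result values are illustrative):

    t = Timeline()
    t.push(TextualCommandItem(Time(), 'ls -la'))    # command entry, stamped with its time
    t.push(ResultItem(Time(), {'exit_code': 0}))    # result entry; push() re-sorts and bumps t.version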
37 changes: 27 additions & 10 deletions lib/autoload/globals/net.ngs
@@ -83,7 +83,7 @@ ns {

global handle
F handle(cpd:ConnectionPipelineDelegate, e:IO::events::Write) {
debug('server', "ConnectionPipelineDelegate Write")
debug('server', "ConnectionPipelineDelegate Write ${e.data.SafeStr()}")
cpd.connection.send(e.data)
}
F handle(cpd:ConnectionPipelineDelegate, e:IO::events::Active) {
@@ -155,7 +155,7 @@
debug('server', 'ThreadedServerDelegate on_connect()')

# TODO: join the thread / use Executor
Thread("connection-${c.id}", {
Thread("ThreadedServerDelegate-Connection-${c.id}", {
try {
debug('server', 'ThreadedServerDelegate on_listen() - new thread')
c.pipeline.fire(IO::events::Active())
@@ -173,7 +173,7 @@
}
} catch(e) {
debug('server', "ThreadedServerDelegate before logging")
log("ThreadedServerDelegate - exception in thread: ${e}")
log("ThreadedServerDelegate - exception in thread: ${Str(e) tor '<Unable to convert exception to string>'}")
debug('server', "ThreadedServerDelegate after logging")
guard false
}
@@ -254,20 +254,37 @@
}

section "UnixStreamClient" {

block _ {
id = Iter(1..null)
global init
F init(usc:UnixStreamClient) {
super(usc)
usc.id = id.next()
}
}

F connect(usc:UnixStreamClient, path:Str, cd:ClientDelegate) {
debug('client', 'UnixStreamClient#connect')
sock = socket(C_PF_UNIX, C_SOCK_STREAM)
usc.sock = sock
connect(sock, c_sockaddr_un(path))
c = Connection(usc, sock)
debug('client', 'UnixStreamClient will call on_connect')
cd.on_connect(c)
while true {
data = recvfrom(sock, 1024, 0, c_sockaddr_un())
if data == '' {
cd.on_remote_close()
break
usc.reader = Thread("UnixStreamClient-${usc.id}-receiver", {
while true {
debug('client', 'UnixStreamClient will recvfrom()')
data = recvfrom(sock, 1024, 0, c_sockaddr_un())
debug('client', "UnixStreamClient did recvfrom() ${data.SafeStr()}")
if data == '' {
cd.on_remote_close()
break
}
c.pipeline.fire(IO::events::Read(data))
}
c.pipeline.fire(IO::events::Read(data))
}
})
usc
}
}
}