Commit b77e041: "reviewed most of the code"
zah committed May 29, 2018 (1 parent: 6451088)
Showing 4 changed files with 72 additions and 2 deletions.
48 changes: 46 additions & 2 deletions asyncdispatch2/asyncfutures2.nim
@@ -18,17 +18,25 @@ type
function*: CallbackFunc
udata*: pointer

# ZAH: This can probably be stored with a cheaper representation
# until the moment it needs to be printed to the screen (e.g. seq[StackTraceEntry])
StackTrace = string

FutureBase* = ref object of RootObj ## Untyped future.
callbacks: Deque[AsyncCallback]

finished: bool
error*: ref Exception ## Stored exception
errorStackTrace*: string
errorStackTrace*: StackTrace

@arnetheduck (Member), May 30, 2018:
isn't this stored in Exception??

@cheatfate (Collaborator), May 30, 2018:
Nim's default exception carries only the traditional stack trace, which is useless for async procedures.

when not defined(release):
stackTrace: string ## For debugging purposes only.
stackTrace: StackTrace ## For debugging purposes only.
id: int
fromProc: string

# ZAH: we have discussed some possible optimizations where
# the future can be stored within the caller's stack frame.
# How much refactoring is needed to make this a regular non-ref type?
# Obviously, it will still be allocated on the heap when necessary.
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
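The cheaper `StackTrace` representation suggested in the ZAH note above (keep `seq[StackTraceEntry]`, render lazily) could be sketched as follows. `LazyTrace` and `captureTrace` are hypothetical names for illustration, not part of this library:

```nim
# Hypothetical sketch: store raw StackTraceEntry values (from Nim's system
# module) at capture time; pay for string formatting only when printing.
type LazyTrace = object
  entries: seq[StackTraceEntry]

proc captureTrace(): LazyTrace =
  # cheap: no string building happens here
  result.entries = getStackTraceEntries()

proc `$`(t: LazyTrace): string =
  # the expensive formatting is deferred until the trace is shown
  for e in t.entries:
    result.add($e.filename & "(" & $e.line & ") " & $e.procname & "\n")
```

Rendering happens only in `$`, so futures that never fail never pay the formatting cost.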

@@ -42,6 +50,8 @@ type
when not defined(release):
var currentID = 0

# ZAH: This seems unnecessary. Isn't it easy to introduce a separate
# module for the dispatcher type, so it can be directly referenced here?
var callSoonHolder {.threadvar.}: CallSoonProc

proc getCallSoonProc*(): CallSoonProc {.gcsafe.} =
@@ -65,6 +75,11 @@ template setupFutureBase(fromProc: string) =
result.fromProc = fromProc
currentID.inc()

## ZAH: As far as I understand `fromProc` is just a debugging helper.
## It would be more efficient if it's represented as a simple statically
## known `char *` in the final program (so it needs to be a `cstring` in Nim).
## The public API can be defined as a template expecting a `static[string]`
## and converting this immediately to a `cstring`.
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
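The `static[string]`-to-`cstring` approach from the ZAH note above might look roughly like this. `DbgInfo` and `initDbgInfo` are hypothetical illustration names, not the library's API:

```nim
# Hypothetical sketch: take the proc name as a compile-time string and keep
# only a cstring pointing at static program data, avoiding a GC-allocated
# Nim string per future.
type DbgInfo = object
  fromProc: cstring          # points at static data in the final binary

template initDbgInfo(name: static[string]): DbgInfo =
  DbgInfo(fromProc: cstring(name))

let info = initDbgInfo("readLoop")
doAssert $info.fromProc == "readLoop"
```

Because `name` is `static`, the conversion can happen once per call site rather than per allocated future.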
@@ -106,6 +121,7 @@ proc checkFinished[T](future: Future[T]) =
err.cause = future
raise err

# ZAH: I've seen this code in asyncloop
proc call(callbacks: var Deque[AsyncCallback]) =
var count = len(callbacks)
if count > 0:
@@ -115,13 +131,24 @@ proc call(callbacks: var Deque[AsyncCallback]) =
dec(count)

proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
# ZAH: perhaps this is the default behavior with latest Nim (no need for the `len` check)
if len(callbacks) == 0:
callbacks = initDeque[AsyncCallback]()
callbacks.addLast(item)

proc remove(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
if len(callbacks) > 0:
var count = len(callbacks)
# ZAH: This is not the most efficient way to implement this.
# When you discover an element suitable for removal, you can put the last
# element in its place and reduce the length. The problem is that the
# order of callbacks will be changed, which is unfortunate.
#
# Shifting the elements in-place will still be more efficient than the
# current copying due to the CPU cache (because otherwise we may end up
# touching memory that's residing on a different cache line).
#
# I recommend implementing this proper remove logic in the Deque type.

@cheatfate (Collaborator), May 30, 2018:
optimized.

@arnetheduck (Member), May 30, 2018:
how many callbacks are expected to exist, and how often do they get removed?

@cheatfate (Collaborator), May 30, 2018:
@arnetheduck this was already fixed, you can check the codebase; the number of callbacks depends on the developer.

@arnetheduck (Member), May 30, 2018:
yeah, but curious about the expected order of magnitude.. one per socket? millions? what's the working idea?

@cheatfate (Collaborator), May 30, 2018:
@arnetheduck, Future[T] callbacks are something different from socket callbacks: Future[T] callbacks are used to get notified when a Future[T] completes. Socket event callbacks are not stored in Future[T], and there is no need to keep a list of callbacks per socket.

while count > 0:
var p = callbacks.popFirst()
if p.function != item.function or p.udata != item.udata:
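The trade-off described in the ZAH comment above (swap-with-last versus order-preserving shifting) can be sketched on a plain seq. The helper names are illustrative; the real code works on a Deque:

```nim
# Two removal strategies for a dynamic array of callbacks.
proc swapRemove[T](s: var seq[T], i: int) =
  ## O(1), but does not preserve callback order.
  s[i] = s[^1]
  s.setLen(s.len - 1)

proc orderedRemove[T](s: var seq[T], i: int) =
  ## O(n) shift that preserves order; still cache-friendlier than
  ## copying every element into a fresh container.
  for j in i ..< s.len - 1:
    s[j] = s[j + 1]
  s.setLen(s.len - 1)

var xs = @[1, 2, 3, 4]
xs.swapRemove(0)
doAssert xs == @[4, 2, 3]
xs.orderedRemove(1)
doAssert xs == @[4, 3]
```

Nim's seq already ships both semantics as `del` (unordered, O(1)) and `delete` (ordered).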
@@ -176,6 +203,7 @@ proc fail*[T](future: Future[T], error: ref Exception) =

proc clearCallbacks(future: FutureBase) =
if len(future.callbacks) > 0:
# ZAH: This could have been a single call to `setLen`
var count = len(future.callbacks)
while count > 0:
discard future.callbacks.popFirst()
@@ -187,6 +215,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## If future has already completed then ``cb`` will be called immediately.
assert cb != nil
if future.finished:
# ZAH: it seems that the Future needs to know its associated Dispatcher

@cheatfate (Collaborator), May 30, 2018:
Yes: callSoon() works with the Dispatcher, and only the Dispatcher processes callSoon'ed callbacks.

callSoon(cb, udata)
else:
let acb = AsyncCallback(function: cb, udata: udata)
@@ -214,6 +243,7 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## If future has already completed then ``cb`` will be called immediately.
##
## It's recommended to use ``addCallback`` or ``then`` instead.
# ZAH: how about `setLen(1); callbacks[0] = cb`
future.clearCallbacks
future.addCallback(cb, udata)

@@ -358,19 +388,31 @@ proc asyncCheck*[T](future: Future[T]) =
##
## This should be used instead of ``discard`` to discard void futures.
assert(not future.isNil, "Future is nil")
# ZAH: This should probably add a callback instead of replacing all callbacks.
# Perhaps a new API can be introduced to avoid the breaking change.
future.callback = asyncCheckProxy[T]
# proc (udata: pointer) =
# if future.failed:
# injectStacktrace(future)
# raise future.error

proc spawn*[T](future: Future[T]) =
# ZAH: What is the purpose of this?
assert(not future.isNil, "Future is nil")
future.callback = spawnProxy[T]

# ZAH: The return type here could be a Future[(T, Y)]
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
# ZAH: The Rust implementation of futures is making the case that the
# `and` combinator can be implemented in a more efficient way without
# resorting to closures and callbacks. I haven't thought this through
# completely yet, but here is their write-up:
# http://aturon.github.io/2016/09/07/futures-design/
#
# We should investigate this further, before settling on the final design.
# The same reasoning applies to `or` and `all`.
var retFuture = newFuture[void]("asyncdispatch.`and`")
proc cb(data: pointer) =
if not retFuture.finished:
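The `Future[(T, Y)]` return type suggested in the ZAH comment above can be illustrated with a toy future type. `SimpleFuture` is not the library's `Future`; this sketch also assumes the callback is attached before either future completes:

```nim
# Minimal `and` combinator whose result carries both values as a tuple.
type SimpleFuture[T] = ref object
  finished: bool
  value: T
  cb: proc () {.closure.}

proc complete[T](f: SimpleFuture[T], v: T) =
  f.value = v
  f.finished = true
  if f.cb != nil: f.cb()

proc `and`[T, Y](a: SimpleFuture[T], b: SimpleFuture[Y]): SimpleFuture[(T, Y)] =
  var ret = SimpleFuture[(T, Y)]()
  proc check() =
    if a.finished and b.finished and not ret.finished:
      ret.complete((a.value, b.value))
  a.cb = check
  b.cb = check
  ret

let x = SimpleFuture[int]()
let y = SimpleFuture[string]()
let both = x and y
x.complete(42)
y.complete("ok")
doAssert both.finished and both.value == (42, "ok")
```

Returning the tuple means the caller no longer has to read the two source futures after awaiting the combined one.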
@@ -402,6 +444,8 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb
return retFuture

# ZAH: The return type here could be a tuple
# This will enable waiting on a heterogeneous collection of futures.
proc all*[T](futs: varargs[Future[T]]): auto =
## Returns a future which will complete once
## all futures in ``futs`` complete.
13 changes: 13 additions & 0 deletions asyncdispatch2/asyncloop.nim
@@ -250,6 +250,8 @@ when defined(windows) or defined(nimdoc):
## (Unix) for the specified dispatcher.
return disp.ioPort

# ZAH: Shouldn't all of these procs be defined over the Dispatcher type?
# The "global" variants can be defined as templates passing the global dispatcher

@cheatfate (Collaborator), May 30, 2018:
I don't see any reason why these should be defined over the Dispatcher type.

@zah (Author, Contributor), May 30, 2018:
Well, because they use the global dispatcher internally, preventing programs with multiple dispatchers. Admittedly, an edge case, but probably something that a general-purpose library should not impose on you.

@cheatfate (Collaborator), May 30, 2018:
Using multiple dispatchers in one thread?

@zah (Author, Contributor), May 30, 2018:
Aha, it's a thread-local value. I didn't consider this. Given that the thread should be running a loop, this should be fine indeed.

proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
@@ -263,6 +265,7 @@ when defined(windows) or defined(nimdoc):
var curTime = fastEpochTime()
var curTimeout = DWORD(0)

# ZAH: Please extract this code in a template

@cheatfate (Collaborator), May 30, 2018:
I'd rather not extract this code into a template; I'd leave the duplication here, because this way the behavior reads more clearly and logically. This procedure is the core of all the async machinery, and hiding pieces of it in sub-procedures or templates makes it harder to understand.

@zah (Author, Contributor), May 30, 2018:
This code duplication will only get worse once some of the other suggestions are implemented (e.g. using a priority queue), but I won't insist too much.

EDIT: Sorry, I wrote this before noticing the priority queue comment.

# Moving expired timers to `loop.callbacks` and calculate timeout
var count = len(loop.timers)
if count > 0:
@@ -308,6 +311,8 @@ when defined(windows) or defined(nimdoc):
if int32(errCode) != WAIT_TIMEOUT:
raiseOSError(errCode)

# ZAH: Please extract the code below in a template

@cheatfate (Collaborator), May 30, 2018:
Answered above: b77e041#r29173252


# Moving expired timers to `loop.callbacks`.
curTime = fastEpochTime()
count = len(loop.timers)
@@ -322,6 +327,8 @@ when defined(windows) or defined(nimdoc):
# poll() call.
count = len(loop.callbacks)
for i in 0..<count:
# ZAH: instead of calling `popFirst` here in a loop, why don't we
# call `setLen(0)` at the end after iterating over all callbacks?

@cheatfate (Collaborator), May 30, 2018:
Inside every callable.function(callable.udata) invocation, more callbacks can be added to loop.callbacks, so we can't iterate over all callbacks without reintroducing the problems of nim-lang/Nim#7193.

@zah (Author, Contributor), May 30, 2018:
I've considered this, but there is still a different way to implement it. You can take note of the length prior to the loop and compare it with the length at the end. If they differ, execute the new callbacks again until the length doesn't change. Finally, call setLen(0).

@cheatfate (Collaborator), May 30, 2018:
New callbacks must not be executed in the same tick; please read the issue, which shows what happens in that case. It works the way it must, and I don't think you can optimize it further.

@zah (Author, Contributor), May 30, 2018:
Aha, I see, but then isn't it even easier? The Deque can have a contract proc that allows you to shrink it from the front by exactly count elements after the loop.

@cheatfate (Collaborator), May 30, 2018:
So what would be the point of this shrink? It moves elements from one memory address to another, and I still need to iterate over them and call callSoon().

var callable = loop.callbacks.popFirst()
callable.function(callable.udata)
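The fixed-count drain agreed on in the thread above can be modeled in isolation. This is a minimal sketch, not the dispatcher's actual code:

```nim
import std/deques

# Snapshot the queue length, pop exactly that many callbacks, and let
# anything enqueued while running wait for the next poll() iteration
# (cf. nim-lang/Nim#7193).
type Callback = proc () {.closure.}

var queue = initDeque[Callback]()

proc runOneTick() =
  let count = len(queue)       # snapshot: new arrivals are not run now
  for _ in 0 ..< count:
    let cb = queue.popFirst()
    cb()                       # may push more callbacks onto `queue`

var ran = 0
queue.addLast(proc () =
  inc ran
  queue.addLast(proc () = inc ran))  # re-queued work waits for the next tick

runOneTick()
doAssert ran == 1 and queue.len == 1
```

Running only the snapshotted count is what keeps a callback that perpetually re-queues itself from starving the rest of the loop.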

@@ -527,6 +534,7 @@ else:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}

# ZAH: Please extract this code in a template

@cheatfate (Collaborator), May 30, 2018:
Answered above: b77e041#r29173252.

# Moving expired timers to `loop.callbacks` and calculate timeout.
var count = len(loop.timers)
if count > 0:
@@ -570,6 +578,8 @@ else:
withData(loop.selector, fd, adata) do:
loop.callbacks.addLast(adata.reader)

# ZAH: Please extract the code below in a template

@cheatfate (Collaborator), May 30, 2018:
Answered above: b77e041#r29173252.


# Moving expired timers to `loop.callbacks`.
curTime = fastEpochTime()
count = len(loop.timers)
@@ -585,6 +595,8 @@ else:
# poll() call.
count = len(loop.callbacks)
for i in 0..<count:
# ZAH: instead of calling `popFirst` here in a loop, why don't we
# call `setLen(0)` at the end after iterating over all callbacks?

@cheatfate (Collaborator), May 30, 2018:
Answered above: b77e041#r29173324

var callable = loop.callbacks.popFirst()
callable.function(callable.udata)

@@ -597,6 +609,7 @@ proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
let loop = getGlobalDispatcher()
var tcb = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
# ZAH: This should use a priority queue (e.g. a binary heap)

@cheatfate (Collaborator), May 30, 2018:
Am I missing something, or is heapqueue not a binary heap anymore?

@zah (Author, Contributor), May 30, 2018:
Oh, I'm sorry. I saw loop.timers being defined as a Deque (perhaps nimsuggest misled me, or I just looked at the callbacks field below it).

loop.timers.push(tcb)

proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
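As the timer thread above settles, loop.timers already uses HeapQueue, Nim's binary heap. Its ordering behavior can be shown in miniature; `TimerEntry` here is an illustrative stand-in, not the library's type:

```nim
import std/heapqueue

# A binary heap keyed on the deadline keeps the earliest timer on top.
type TimerEntry = object
  finishAt: uint64

proc `<`(a, b: TimerEntry): bool = a.finishAt < b.finishAt

var timers = initHeapQueue[TimerEntry]()
timers.push TimerEntry(finishAt: 300)
timers.push TimerEntry(finishAt: 100)
timers.push TimerEntry(finishAt: 200)
doAssert timers[0].finishAt == 100   # earliest deadline is always on top
```

Push and pop are O(log n), so the poll loop can peek at the nearest deadline in O(1) to compute its timeout.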
1 change: 1 addition & 0 deletions asyncdispatch2/transports/datagram.nim
@@ -443,6 +443,7 @@ else:
result.resumeRead()

proc close*(transp: DatagramTransport) =
## ZAH: This could use a destructor as well

@cheatfate (Collaborator), May 30, 2018:
An explicit close is much more suitable than relying on destructors, because you can put close exactly where you need it; otherwise the transport keeps wasting resources (receiving/writing) until the destructor runs.

@zah (Author, Contributor), May 30, 2018:
Well, one is not incompatible with the other. You can have both close and a destructor. My remark is that we can drop the dependency on the GC.

## Closes and frees resources of transport ``transp``.
if ReadClosed notin transp.state and WriteClosed notin transp.state:
closeAsyncSocket(transp.fd)
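The "both close and a destructor" point from the thread above can be sketched with a stand-in type (`Transport` here is not the library's type; this uses the modern Nim `=destroy` hook syntax, which was still in flux in 2018):

```nim
# Explicit close() releases the resource early; the destructor is a
# safety net for the case where close() was never called.
type Transport = object
  fd: int
  closed: bool

proc close(t: var Transport) =
  if not t.closed:
    # the real code would call closeAsyncSocket(t.fd) here
    t.closed = true

proc `=destroy`(t: var Transport) =
  # last-resort cleanup; a no-op when close() already ran
  if not t.closed:
    t.closed = true

var tr = Transport(fd: 3)
tr.close()
doAssert tr.closed
```

Explicit close keeps resource lifetime under the caller's control, while the destructor removes the leak hazard.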
12 changes: 12 additions & 0 deletions asyncdispatch2/transports/stream.nim
@@ -39,6 +39,12 @@ type
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: Future[void] # Current reader Future
# ZAH: I'm not quite certain, but it seems to me that the intermediate
# buffer is not necessary. The receiving code needs to know how to grow
# the output buffer of the future attached to the read operation. If this
# is the case, the buffering can be replaced with direct writing to this
# output buffer. Furthermore, we'll be able to signal additional 'progress'
# events for the future to make the API more complete.

@cheatfate (Collaborator), May 30, 2018:
This buffer allows reading without addReader/read/removeReader cycles, so it not only caches data, it also reduces the number of syscalls needed to receive that amount of data.

@zah (Author, Contributor), May 30, 2018:
Yes, but why can't we read directly into the output buffer of the associated Future? We'll just need to use some type-erasure trick to handle different output buffer types (the buffer resize operation in particular must be captured in a proc pointer).

@cheatfate (Collaborator), May 30, 2018:
Because reading runs as a separate task that keeps reading from the socket until the buffer is full, even when there are no pending read operations on the transport.

@cheatfate (Collaborator), May 30, 2018:
More reasons to keep it on the heap:

  • it is allocated only once, when the connection is established.
  • it is passed around as a pointer, so readStreamLoop() and writeStreamLoop() know which transport to work with.

@zah (Author, Contributor), May 30, 2018:
I'm entering crazy territory now, but it would be possible to play with the capacity/len of the Future's buffer to store elements past its requested end. You are also saying that the buffered reading will start before a read is explicitly requested by the client code, so no Future exists initially; but when the read is finally requested, we can move or swap the buffer here with the one in the future, thus avoiding the copy. Too bad strings and sequences are not the same type.

@cheatfate (Collaborator), May 30, 2018:
@zah this is what the utility functions do: they move the whole buffer or a part of it according to the procedure's purpose (e.g. read until some characters are found).

buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
error: ref Exception # Current error
@@ -110,6 +116,7 @@ template checkPending(t: untyped) =
raise newException(TransportError, "Read operation already pending!")

template shiftBuffer(t, c: untyped) =
# ZAH: Nim is not C, you don't need to put () around template parameters

@cheatfate (Collaborator), May 30, 2018:
Maybe, but it still looks better with the parentheses than without them.

if (t).offset > c:
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
(t).offset = (t).offset - (c)
@@ -341,6 +348,10 @@ when defined(windows):
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("stream.socket.transport")
# ZAH: If these objects are going to be manually managed, why do we bother
# with using the GC at all? It's better to rely on a destructor. If someone
# wants to share a Transport reference, they can still create a GC-managed
# wrapping object.

@cheatfate (Collaborator), May 30, 2018:
We are using the GC here because transp.future and transp.queue are from the GC world. If this object were allocated manually, then all references to GC objects would have to be protected with system.protect.

@zah (Author, Contributor), May 30, 2018:
transp.queue is backed by a seq, so it won't be a GC object eventually. In another comment, I've suggested that perhaps we can do something about the futures as well (they don't need to be a ref type).

@cheatfate (Collaborator), May 30, 2018:
@zah it looks like too many prerequisites are required before this can be removed:

  1. Destructors get finally implemented.
  2. Asynchronous procedures allow var arguments.
  3. Future[T] becomes non-GC.
  4. seq[T] becomes non-GC.

After all of this is done, I can easily remove this GC_ref(transp).

@zah (Author, Contributor), May 30, 2018:
The destructors already work. They only have limitations and issues with temporary anonymous values. I think even defining C++-like smart pointer types for the GC-reliant fields will work correctly (i.e. wrapping objects that take care of the GC_ref/unref business). If you try to introduce such fields, it would be easy to set new names for them and use templates that dereference them to preserve the current code working.

I'll have to verify these claims a bit, though.

GC_ref(transp)
result = cast[StreamTransport](transp)
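The "C++-like smart pointer" idea from the thread above could be sketched like this. `Payload`, `GcGuard`, and `initGuard` are hypothetical illustration names; a production version would also have to define or forbid copying via a `=copy` hook to avoid double-unref:

```nim
# A value-type guard that GC_refs a GC object on creation and GC_unrefs
# it in its destructor, so a manually-managed owner can hold GC references.
type
  Payload = ref object
    data: int

  GcGuard = object
    held: Payload

proc `=destroy`(g: var GcGuard) =
  if g.held != nil:
    GC_unref(g.held)
    g.held = nil

proc initGuard(p: Payload): GcGuard =
  GC_ref(p)                  # pin the object for the guard's lifetime
  result.held = p

let p = Payload(data: 7)
block:
  let g = initGuard(p)
  doAssert g.held.data == 7  # object stays pinned while the guard exists
doAssert p.data == 7
```

This keeps the GC_ref/GC_unref pairing in one place instead of scattering manual calls through the transport code.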

@@ -1060,6 +1071,7 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
while true:
if (ReadError in transp.state):
raise transp.getError()
# ZAH: Shouldn't this be {ReadEof, ReadClosed} * transp.state != {}
if (ReadEof in transp.state) or (ReadClosed in transp.state):
break

