Implement basic event loop for win32 #9957
Conversation
Couple of questions regarding scope and some implementation notes:

Scope and remaining work

I just want to make sure that we're on the same page as far as what is included in the scope of this PR. Based on my understanding, here's some of the remaining work we'd need in subsequent PRs to really take advantage of this:
Implementation
Is there anything else I'm missing? Feedback is definitely welcome as this is a work in progress :) Edit: Grammar and clarify stuff
File will have to be reworked to use FILE_FLAG_OVERLAPPED as well as CreateFileW, ReadFile, and WriteFile.
It, uh, might be prudent to leave asynchronous file I/O for some later point and use synchronous I/O for now. Making these asynchronous has a distinct risk of exploding in scope.
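For reference, opting a file handle in to overlapped I/O would look roughly like the sketch below. This is only an illustration, not part of the PR; the constant names (GENERIC_READ, FILE_FLAG_OVERLAPPED, INVALID_HANDLE_VALUE, etc.) and the UTF-16 conversion helper are assumed to be available in the bindings.

# Sketch only: open a file for overlapped (asynchronous) I/O and tie it to the
# thread's completion port. Constant and helper names are assumptions.
utf16_path = path.check_no_null_byte.to_utf16

handle = LibC.CreateFileW(
  utf16_path,
  LibC::GENERIC_READ,
  LibC::FILE_SHARE_READ,
  nil,
  LibC::OPEN_EXISTING,
  LibC::FILE_FLAG_OVERLAPPED, # opt the handle in to asynchronous ReadFile/WriteFile
  Pointer(Void).null
)
raise RuntimeError.from_winerror("CreateFileW") if handle == LibC::INVALID_HANDLE_VALUE

# Associate the handle with the completion port once, right after opening it.
LibC.CreateIoCompletionPort(handle, Thread.current.iocp, 0, 0)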
if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false)
  if removed == 1 && io_entry.first.lpOverlapped
    next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)
  end
If removed is 0, this means that the sleepy_time passed without hitting any completed events, right? If so, if I read the code further down the min_value above is enqueued in the scheduler at line 27. But does anything actually cancel the completion of the event or keep track that it has timed out? Will it continue and eventually be completed? Because if it will then you will have a strange situation potentially enqueuing a fiber in unknown status unless I'm totally confused.
> If removed is 0, this means that the sleepy_time passed without hitting any completed events, right?
Not exactly. If we make the GetQueuedCompletionStatusEx call, it times out, returns true, but removed is 0, then that means a sleep event completed. The idea was to treat sleep events the same as real i/o and use GetQueuedCompletionStatusEx's timeout as the mechanism for completing sleep events.
> If so, if I read the code further down the min_value above is enqueued in the scheduler at line 27. But does anything actually cancel the completion of the event or keep track that it has timed out?
Yes, we then signal completion by removing the event from the event loop queue and telling the scheduler to switch to the event's associated fiber.
> Will it continue and eventually be completed? Because if it will then you will have a strange situation potentially enqueuing a fiber in unknown status unless I'm totally confused.
As another example, let's say we're in the middle of a 5 second sleep event blocking on the GetQueuedCompletionStatusEx call and some real i/o (like a socket or file read) comes in. In this case, GetQueuedCompletionStatusEx will return as soon as possible (likely before our 5 second sleep is up) with the information about the real i/o. The removed arg will be 1 and we'll have a pointer to lpOverlapped, which contains the crystal event/fiber that started the real i/o. Since the 5 second sleep was interrupted, we can safely put aside handling the sleep event and instead set the real i/o as the next_event, dequeue it from the event loop, and tell the scheduler to run that associated fiber.

The sleep event that was put aside will get handled on the next iteration after we calculate time_elapsed (which will likely be a hair over 5 seconds), at which point we completely skip the GetQueuedCompletionStatusEx call and instead enqueue the fiber associated with the sleep event. Hopefully that example adds more context.
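For illustration, one iteration of the loop described above might look roughly like the sketch below. It is based on the explanation, not on the PR's actual code; the @queue, wake_at, timeout_in_ms, and fiber names are assumptions.

# Sketch of one event-loop iteration (illustrative names, not the PR's exact code).
min_event = @queue.min_by(&.wake_at)   # earliest pending sleep event (assumed helpers)
sleepy_time = timeout_in_ms(min_event) # how long we may block waiting for real i/o

io_entry = uninitialized LibC::OVERLAPPED_ENTRY[1]
next_event = nil

# BOOL is an integer, so compare explicitly.
if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false) != 0
  if removed == 1 && io_entry.first.lpOverlapped
    # Real i/o completed before the sleep expired: handle it first.
    next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)
  else
    # The wait timed out, which here means the earliest sleep event completed.
    next_event = min_event
  end
end

if next_event
  @queue.delete(next_event)                    # remove the completed event from the loop
  Crystal::Scheduler.enqueue(next_event.fiber) # resume its fiber (accessor assumed)
end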
> use GetQueuedCompletionStatusEx's timeout as the mechanism for completing sleep events
I see. I guess I got confused by having spent quite a lot of time staring at a different completion mechanism on Linux (io_uring) that does support arbitrary wait events directly.
Ok, I don't know enough about Windows to say whether that or using CreateTimerQueueTimer or something similar would make the most sense. But it makes a lot more sense than my initial impression at least. :)
How would event timeouts work? I suppose there is some way of putting that information in the entry on submission, but how would the result be communicated and would there need to be some handling in here for that?
> How would event timeouts work? I suppose there is some way of putting that information in the entry on submission, but how would the result be communicated and would there need to be some handling in here for that?
I missed this. After GetQueuedCompletionStatusEx is called and times out, it could be a sleep event completing or an I/O timeout, but I'm not handling that second case. There's an IO::Evented instance passed in to create read/write events, with a read/write_timed_out property on it. I will add some logic in the event loop to check if next_event was a read/write event. If so, set that boolean accordingly and thereby communicate the result.
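Presumably something along these lines; a rough sketch where the evented and kind accessors on the event are assumptions, not existing API:

# Sketch: after a timed-out wait, record the timeout on the IO::Evented instance
# so the pending read/write can raise IO::TimeoutError instead of resuming normally.
if next_event && (io = next_event.evented) # `evented` accessor is assumed
  case next_event.kind                     # `kind` (:read or :write) is assumed
  when :read  then io.read_timed_out = true
  when :write then io.write_timed_out = true
  end
end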
Great work!
I was thinking about the same thing. What happens if two (or more) events are completed and returned? You could use GetQueuedCompletionStatus, which only returns one event. The alternative would be to enqueue multiple fibers on the scheduler. The latter potentially saves a few syscalls under high load. It seems like you could simply call
There is only one non-obvious thing I think is worth thinking about early. We don't need it to get something working, but I think it can be hard to retrofit late in the process and should at least be considered. Since we're "lending" out buffers to the OS for an arbitrary time span, we should consider a strategy to limit the number of buffers we can have outstanding at the same time, by creating a high-water mark where we queue calls instead of submitting buffers to the OS. The reason is that we can drain system memory or cause heap fragmentation to the point of failure for the GC (it can even be used with malicious intent). See: http://www.serverframework.com/asynchronousevents/2011/06/tcp-flow-control-and-asynchronous-writes.html
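Regarding the multiple-completions question above: draining several entries per call and enqueuing each affected fiber could look roughly like this. A sketch, not part of the PR; the fiber accessor on the event is assumed.

# Sketch: ask for up to 16 completion entries at once and enqueue every
# affected fiber, instead of resuming a single one per syscall.
entries = uninitialized LibC::OVERLAPPED_ENTRY[16]
if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, entries, 16, out removed, sleepy_time, false) != 0
  removed.times do |i|
    if overlapped = entries[i].lpOverlapped
      event = overlapped.value.cEvent.unsafe_as(Crystal::Event)
      Crystal::Scheduler.enqueue(event.fiber) # let the scheduler resume them in turn
    end
  end
end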
    next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)
  end
else
  raise RuntimeError.from_winerror("Error getting i/o completion status")
@incognitorobito I think you need to check the error code here, since a timeout will throw a RuntimeError in this case. See the GetQueuedCompletionStatusEx docs on the timeout parameter:

> The number of milliseconds that the caller is willing to wait for a completion packet to appear at the completion port. If a completion packet does not appear within the specified time, the function times out and returns FALSE.

We should call GetLastError and check if the error code is WAIT_TIMEOUT (code 258). If that's the case, you know it was either an event that timed out or a timer which has expired.
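Roughly like this; a sketch that assumes the bindings declare GetLastError and a WAIT_TIMEOUT constant:

# Sketch: distinguish an expected timeout from a real failure.
if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false) == 0
  if LibC.GetLastError == LibC::WAIT_TIMEOUT # 258: the wait expired
    # Not an error: an event timed out or a timer expired, handle it as such.
  else
    raise RuntimeError.from_winerror("Error getting i/o completion status")
  end
end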
  def initialize(crystal_event : Crystal::Event)
    @cEvent = crystal_event.unsafe_as(Pointer(Void))
  end
end
I think that this struct should be moved to src/crystal/system/win32/event_loop_iocp.cr. That's because it has the additional property cEvent compared with WSAOVERLAPPED in C.
> WSAOVERLAPPED is the primary communication structure for async I/O on Windows.
I don't think so. The primary communication structure is OVERLAPPED. WSAOVERLAPPED is defined as #define WSAOVERLAPPED OVERLAPPED in winsock2.h and is used only in the Windows Socket API; OVERLAPPED is used everywhere else. In addition, OVERLAPPED is already in fileapi.cr. So I think that the struct should be changed as follows:
require "c/fileapi"
@[Extern]
struct Overlapped
overlapped : LibC::OVERRLAPPED
property cEvent : Void*
def initialize(crystal_event : Crystal::Event)
@cEvent = crystal_event.unsafe_as(Pointer(Void))
end
end
  dwMilliseconds : DWORD,
  fAlertable : BOOL
) : BOOL
end
Should the definitions in this iocp.cr move to other places? That's because almost all definitions in this directory are placed in the .cr files corresponding to .h files. For example, CreateFileW is in fileapi.cr because it is declared in fileapi.h, and _wmkdir is in dirent.cr because it is declared in dirent.h. Well, I think this is just a custom and not documented, and not all definitions comply; for example, OVERLAPPED is in fileapi.cr though it is declared in minwinbase.h.
I think that it is better to move them as follows:

- OVERLAPPED_ENTRY (along with OVERLAPPED?) -> minwinbase.cr
- CreateIoCompletionPort, GetQueuedCompletionStatus, GetQueuedCompletionStatusEx -> ioapiset.cr
property cEvent : Void*

def initialize(crystal_event : Crystal::Event)
  @cEvent = crystal_event.unsafe_as(Pointer(Void))
This will only work when the size of Crystal::Event is less than or equal to 8 bytes. Crystal::Event is converted to Pointer(Void) here and converted back by the following code in event_loop_iocp.cr:

next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)

I suspected that it converted back only the first 8 bytes, so I wrote the following code and ran it:
struct DummyEvent
  property buf : UInt8[32] = StaticArray(UInt8, 32).new { |i| i.to_u8 }
end

crystal_event = DummyEvent.new
puts(crystal_event)
cEvent = crystal_event.unsafe_as(Pointer(Void))
puts(cEvent.unsafe_as(DummyEvent))
This printed:

DummyEvent(@buf=StaticArray[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31])
DummyEvent(@buf=StaticArray[0, 1, 2, 3, 4, 5, 6, 7, 7, 226, 105, 253, 3, 2, 0, 0, 10, 251, 215, 22, 252, 0, 0, 0, 9, 116, 106, 253, 3, 2, 0, 0])
Only the first 8 bytes were converted back; the remaining bytes appear to be uninitialized.
It's been a long time since I worked with pointers in Crystal, but it seems like this gives the correct result.

Store the pointer to Crystal::Event:

def initialize(crystal_event : Crystal::Event)
  @cEvent = pointerof(crystal_event)
  ...

Get the pointed-to value back:

next_event = io_entry.first.lpOverlapped.value.cEvent.value
When you do unsafe_as(Pointer(Void)), the value is interpreted as a pointer. That doesn't work because crystal_event is not a pointer, but the actual value.

pointerof(crystal_event) isn't reliable either, because it points at the stack, which will be invalidated once the initialize method returns. You either need to ensure that the stack pointer is valid during the lifetime of its use (not sure if that's the case here), or put it on the heap.
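One way to put it on the heap is Crystal's Box, which exists for passing Crystal values through a Void* to C and back. A sketch only; note that a reference to the boxed value still has to be retained on the Crystal side so the GC doesn't collect it while the OS holds the pointer.

# Sketch: box the event so the Void* stored in the OVERLAPPED struct
# points at heap memory instead of the initialize method's stack frame.
def initialize(crystal_event : Crystal::Event)
  @cEvent = Box.box(crystal_event)
end

# Later, in the event loop:
next_event = Box(Crystal::Event).unbox(io_entry.first.lpOverlapped.value.cEvent)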
@incognitorobito Are you still up for continuing with this?
Sorry for the radio silence. I can't continue with this at the moment. I don't have the bandwidth I originally thought I would.
Don't worry, there's no obligation on your part. I'm happy to take over from here.
I appreciate it. And not necessarily, I'm more or less able to run the non-IO fiber examples straight from the docs. This repo was my scratch space.
I looked a bit more deeply into this and discovered that while the general idea is a good start, there are a couple of issues with the implementation.
It's pretty much impossible to work on the event loop implementation without supporting actual use cases.
@straight-shoota Regarding 2: Can you expand on your reasoning on why it doesn't make sense?

Regarding 3: I'd go further and claim that the whole of … It might make sense to extract all event-loop-specific implementations into platform-specific files. One example of this is how I do it in nested_scheduler, where the actual implementation (depending on whether it is libevent or the completion-based io_uring) is either https://github.com/yxhuvud/nested_scheduler/blob/main/src/nested_scheduler/libevent_context.cr#L19 or https://github.com/yxhuvud/nested_scheduler/blob/main/src/nested_scheduler/io_uring_context.cr#L49 (the latter doesn't yet implement timeouts etc., but the general idea should be clear).
I already mentioned that IO handles are shared between fibers and threads. I'm not even sure how associating an IO handle with different completion ports across threads would work. And then we would need to keep track of which thread's completion port an IO handle has been associated with. And then we would have to duplicate waiting for the completion status per thread. This all seems unnecessarily complicated.
Hmm. Wait, completion ports require the file handles to be associated with them? Ok, fair enough then. That makes things a lot more complicated for the multi-handle variant. Then your stance makes total sense :) It seems completion ports have basically the opposite behavior to io_uring, which is simpler to use by creating a new one per thread, as that avoids any need for synchronization.

Well, not really, as it would have been fine if only one of the handles/threads had waited on it. But if file handles are not interchangeable like on Linux then it doesn't matter.

If the file handles had been independent of the completion port there would have been less to synchronize, as there is less shared data. I still recommend gathering event-loop-specific code in a separate file instead of sprinkling it into the different IO classes. It is a lot easier to see what is going on if everything is in the same place, and the places of use wouldn't have to have conditional macros or whatever (ew).
Yes, as far as I understand it works like this (simplified pseudocode):

# 1) event loop setup
iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nil)

# 2) file handle setup
file_descriptor = CreateFile(filename)
CreateIoCompletionPort(file_descriptor, iocp)

# 3a) async op
overlapped = Overlapped.new
ReadFile(file_descriptor, buffer, overlapped)
sleep_fiber

# 3b) wait for completion status
GetQueuedCompletionStatus(iocp, out completed_overlapped)
resume_fiber

Step 1) is done once, step 2) for every file descriptor. Step 3a) happens for every async operation on the file descriptor and 3b) in the event loop to let the fiber continue after the IO operation has completed.
Closing this in favor of #12149.
I added the appropriate IOCP functions from the Win32 API and wrapped Crystal::Event in the WSAOVERLAPPED struct as a void pointer. Then I put together a very simple algorithm for the event loop. This has got fibers working in an identical fashion to Crystal on Linux in my tests.