-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EventLoop: direct epoll/kqueue integration #14959
Closed
ysbaddaden
wants to merge
73
commits into
crystal-lang:master
from
ysbaddaden:feature/single-epoll-kqueue-loop
Closed
Changes from all commits
Commits
Show all changes
73 commits
Select commit
Hold shift + click to select a range
2c20b91
Fix: don't cancel timeout select action event twice
ysbaddaden 26bbb20
Add :evloop to Crystal.trace
ysbaddaden 558e50f
Epoll: initial attempt (doesn't compile)
ysbaddaden ec1f4e0
Fix: epoll_event is only packed on x86_64
ysbaddaden c9e4554
Fix: disable EPOLLEXCLUSIVE for now
ysbaddaden 3c1dc8a
Fix: close in MT environment
ysbaddaden 24c56ee
Fix: add optional Crystal::EventLoop#after_fork_before_exec (MT)
ysbaddaden 8c186dc
Fix: after_fork (no MT) or after_fork_before_exec (MT only)
ysbaddaden 4089895
fixup! Fix: add optional Crystal::EventLoop#after_fork_before_exec (MT)
ysbaddaden b7d6fec
Prefer eventfd over pipe (only one fd, smaller struct in kernel)
ysbaddaden 3def18e
Save pointer to Node instead of fd (skips searches after wait)
ysbaddaden 887f29c
fixup! Save pointer to Node instead of fd (skips searches after wait)
ysbaddaden 1457032
fixup! Prefer eventfd over pipe (only one fd, smaller struct in kernel)
ysbaddaden f2a9b07
Add Crystal::System::EventFD abstraction
ysbaddaden f6fb444
Use generic :system event type instead of :interrupt
ysbaddaden 5c048f8
fixup! Add Crystal::System::EventFD abstraction
ysbaddaden 7c33a38
Extract timers + cleanup + one timerfd per eventloop
ysbaddaden 4cbf6e5
Fix: also check that timers are empty (not only events)
ysbaddaden b835d00
Fix: missing mutex sync
ysbaddaden 687f9df
Extract Evented::Eventloop from Epoll::EventLoop
ysbaddaden 2c7f8a5
Extract #system_pipe from Crystal::System::FileDescriptor.pipe
ysbaddaden 197a2a2
Add Crystal::Kqueue::EventLoop (*BSD, Darwin)
ysbaddaden 2c6ebd9
Fix: explicit none/read/write registration for EventQueue::Node
ysbaddaden 8185ecd
Fix: dequeue timer when io event is ready
ysbaddaden 1db5459
fix: format + cleanup
ysbaddaden 8d09249
Fix: pthread_mutex_unlock fails with EPERM in specs
ysbaddaden ff82437
Fix: cleanup sleep/select_timeout event
ysbaddaden b2292d5
Evented::Event#time => #wake_at
ysbaddaden 275f2cc
Fix: Eventfd#write raises on success
ysbaddaden a21f362
Experiment: single EventLoop for the process
ysbaddaden 6889332
Epoll: reenable timerfd, fix timers + review abstract API
ysbaddaden e8a0042
Epoll: enable EPOLLRDHUP
ysbaddaden ed5aae4
Kqueue: support single EventLoop per process (untested)
ysbaddaden e5e3381
Fix: runtime issues
ysbaddaden cb77996
fixup! Kqueue: support single EventLoop per process (untested)
ysbaddaden 03667ab
Fix: Process.run hangs forever
ysbaddaden 9a10710
cleanup
ysbaddaden a61cad9
Fix: broken interpreter build
ysbaddaden 3f94142
Kqueue: must add a kevent for each filter (one read, one write)
ysbaddaden a2519d4
Abstract process_timers + keep lock while processing
ysbaddaden 843cc51
Don't use evloop in Crystal::System::Process.spawn (unix)
ysbaddaden b432acb
Don't use evloop in Crystal::System::Process.reopen_io (unix)
ysbaddaden 8bdb283
Kqueue: be resilient to closed stdio/pipe
ysbaddaden 6328afc
Fix: crystal tool format
ysbaddaden 520ef7a
fixup! Don't use evloop in Crystal::System::Process.reopen_io (unix)
ysbaddaden 79c1c35
Fix: no need to re-create epoll/kqueue after fork before exec
ysbaddaden 3f286a2
Fix: spare timer update after processing timers
ysbaddaden fd02a1d
Fix: spec failures on darwin/kqueue (blind attempt)
ysbaddaden 5433e30
Fix: raise on fork (not supported by Crystal::Evented
ysbaddaden 835cbe1
Fix: disable fork codegen in compiler when Crystal::Evented
ysbaddaden 426cd73
Kqueue: compilation for OpenBSD
ysbaddaden d24c297
Fix: disable fork when Crystal::Evented is defined
ysbaddaden 095bbd3
Fix: EPOLLRDHUP musn't prevent handling EPOLLOUT
ysbaddaden 7d820a7
Revert "Fix: spec failures on darwin/kqueue (blind attempt)"
ysbaddaden 7ef7704
Test: remove fd from epoll on finalize
ysbaddaden a75ed0d
CI: set verbose mode to investigate segfault
ysbaddaden c4a9473
Fix: allocate poll descriptors in arena + lazy registration
ysbaddaden 4bd421c
Reenable fork since we know about registered fds
ysbaddaden 0e15128
Fix: add arena to kqueue evloop
ysbaddaden f278e04
Revert "CI: set verbose mode to investigate segfault"
ysbaddaden 187cebd
Fix: set @closed in System::FileDescriptor#file_descriptor_close
ysbaddaden 1d84083
Add generation to evloop arena
ysbaddaden f80622c
Fix: compilation error on crystal 1.0.0
ysbaddaden c0c1cfd
Fix: add generation arena index to kqueue evloop
ysbaddaden 8d808cc
fixup! Fix: add generation arena index to kqueue evloop
ysbaddaden ed41608
Fix: Crystal::SpinLock should be a struct
ysbaddaden 8b2fd43
Fix: compilation with preview_mt
ysbaddaden b3bc0ed
Allow multiple Evented::EventLoop instances (with ownership transfer)
ysbaddaden 4586892
Add :evloop_libevent to force libevent event loop
ysbaddaden 07220b2
Fix: global arena accessor (class vars aren't inherited)
ysbaddaden 086e0ee
Fix: close must cancel timers on the owning evloop (not the current one)
ysbaddaden e674803
Fix: ignore errno on epoll_del in IO finalizer
ysbaddaden d1d6d72
Add Crystal::EventLoop#delete to clean up on finalize
ysbaddaden File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
# OPTIMIZE: can the generation help to avoid the mutation lock (atomic)? | ||
# OPTIMIZE: consider a memory map (mmap, VirtualAlloc) with a maximum capacity | ||
class Crystal::Arena(T) | ||
struct Allocation(T) | ||
property generation = 0_u32 | ||
property? allocated = false | ||
@object = uninitialized T | ||
|
||
def pointer : Pointer(T) | ||
pointerof(@object) | ||
end | ||
|
||
def free : Nil | ||
@generation &+= 1_u32 | ||
@allocated = false | ||
pointer.clear(1) | ||
end | ||
end | ||
|
||
@buffer : Slice(Allocation(T)) | ||
|
||
def initialize | ||
@lock = SpinLock.new | ||
@buffer = Pointer(Allocation(T)).malloc(32).to_slice(32) | ||
end | ||
|
||
private def grow_buffer(capacity) | ||
buffer = Pointer(Allocation(T)).malloc(capacity).to_slice(capacity) | ||
buffer.to_unsafe.copy_from(@buffer.to_unsafe, @buffer.size) | ||
@buffer = buffer | ||
end | ||
|
||
# Returns a pointer to the object allocated at *gen_idx* (generation index). | ||
# Raises if the object isn't allocated. | ||
# Raises if the generation has changed (i.e. the object has been freed then reallocated) | ||
# Raises if *index* is negative. | ||
def get(gen_idx : Int64) : Pointer(T) | ||
index, generation = from_gen_index(gen_idx) | ||
|
||
in_bounds!(index) | ||
allocation = @buffer.to_unsafe + index | ||
|
||
unless allocation.value.allocated? | ||
raise RuntimeError.new("#{self.class.name}: object not allocated at index #{index}") | ||
end | ||
|
||
unless (actual = allocation.value.generation) == generation | ||
raise RuntimeError.new("#{self.class.name}: object generation changed at index #{index} (#{generation} => #{actual})") | ||
end | ||
|
||
allocation.value.pointer | ||
end | ||
|
||
# Yields and allocates the object at *index* unless already allocated. | ||
# Returns a pointer to the object at *index* and the generation index. | ||
# | ||
# There are no generational checks. | ||
# Raises if *index* is negative. | ||
def lazy_allocate(index : Int32, &) : {Pointer(T), Int64} | ||
# fast-path: check if already allocated | ||
if in_bounds?(index) | ||
allocation = @buffer.to_unsafe + index | ||
|
||
if allocation.value.allocated? | ||
return {allocation.value.pointer, to_gen_index(index, allocation)} | ||
end | ||
end | ||
|
||
# slow-path: allocate | ||
@lock.sync do | ||
if index >= @buffer.size | ||
# slowest-path: grow the buffer | ||
grow_buffer(Math.pw2ceil(Math.max(index, @buffer.size * 2))) | ||
end | ||
|
||
unsafe_allocate(index) do |pointer, gen_index| | ||
yield pointer, gen_index | ||
end | ||
end | ||
end | ||
|
||
private def unsafe_allocate(index : Int32, &) : {Pointer(T), Int64} | ||
allocation = @buffer.to_unsafe + index | ||
pointer = allocation.value.pointer | ||
gen_index = to_gen_index(index, allocation) | ||
|
||
unless allocation.value.allocated? | ||
allocation.value.allocated = true | ||
yield pointer, gen_index | ||
end | ||
|
||
{pointer, gen_index} | ||
end | ||
|
||
# Yields the object allocated at *index* then releases it. | ||
# Does nothing if the object wasn't allocated. | ||
# | ||
# Raises if *index* is negative. | ||
def free(index : Int32, &) : Nil | ||
return unless in_bounds?(index) | ||
|
||
@lock.sync do | ||
allocation = @buffer.to_unsafe + index | ||
return unless allocation.value.allocated? | ||
|
||
yield allocation.value.pointer | ||
allocation.value.free | ||
end | ||
end | ||
|
||
private def in_bounds?(index : Int32) : Bool | ||
if index.negative? | ||
raise ArgumentError.new("#{self.class.name}: negative index #{index}") | ||
else | ||
index < @buffer.size | ||
end | ||
end | ||
|
||
private def in_bounds!(index : Int32) : Nil | ||
if index.negative? | ||
raise ArgumentError.new("#{self.class.name}: negative index #{index}") | ||
elsif index >= @buffer.size | ||
raise IndexError.new("#{self.class.name}: out of bounds index #{index}") | ||
end | ||
end | ||
|
||
# Iterates all allocated objects, yields the actual index as well as the | ||
# generation index. | ||
def each(&) : Nil | ||
ptr = @buffer.to_unsafe | ||
|
||
@buffer.size.times do |index| | ||
allocation = ptr + index | ||
|
||
if allocation.value.allocated? | ||
yield index, to_gen_index(index, allocation) | ||
end | ||
end | ||
end | ||
|
||
private def to_gen_index(index : Int32, allocation : Pointer(Allocation(T))) : Int64 | ||
to_gen_index(index, allocation.value.generation) | ||
end | ||
|
||
private def to_gen_index(index : Int32, generation : UInt32) : Int64 | ||
(index.to_i64! << 32) | generation.to_u64! | ||
end | ||
|
||
private def from_gen_index(gen_index : Int64) : {Int32, UInt32} | ||
{(gen_index >> 32).to_i32!, gen_index.to_u32!} | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# :nodoc: | ||
class Crystal::SpinLock | ||
struct Crystal::SpinLock | ||
private UNLOCKED = 0 | ||
private LOCKED = 1 | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
{% skip_file unless flag?(:linux) || flag?(:solaris) %} | ||
|
||
require "c/sys/epoll" | ||
|
||
struct Crystal::System::Epoll | ||
def initialize | ||
@epfd = LibC.epoll_create1(LibC::EPOLL_CLOEXEC) | ||
raise RuntimeError.from_errno("epoll_create1") if @epfd == -1 | ||
end | ||
|
||
def fd : Int32 | ||
@epfd | ||
end | ||
|
||
def add(fd : Int32, epoll_event : LibC::EpollEvent*) : Nil | ||
if LibC.epoll_ctl(@epfd, LibC::EPOLL_CTL_ADD, fd, epoll_event) == -1 | ||
raise RuntimeError.from_errno("epoll_ctl(EPOLL_CTL_ADD)") unless Errno.value == Errno::EPERM | ||
end | ||
end | ||
|
||
def add(fd : Int32, events : UInt32, u64 : UInt64) : Nil | ||
epoll_event = uninitialized LibC::EpollEvent | ||
epoll_event.events = events | ||
epoll_event.data.u64 = u64 | ||
add(fd, pointerof(epoll_event)) | ||
end | ||
|
||
def modify(fd : Int32, epoll_event : LibC::EpollEvent*) : Nil | ||
if LibC.epoll_ctl(@epfd, LibC::EPOLL_CTL_MOD, fd, epoll_event) == -1 | ||
raise RuntimeError.from_errno("epoll_ctl(EPOLL_CTL_MOD)") | ||
end | ||
end | ||
|
||
# OPTIMIZE: we might be able to spare the errno checks for EPERM and ENOENT | ||
def delete(fd : Int32) : Nil | ||
delete(fd) do | ||
raise RuntimeError.from_errno("epoll_ctl(EPOLL_CTL_DEL)") unless Errno.value.in?(Errno::EPERM, Errno::ENOENT) | ||
end | ||
end | ||
|
||
def delete(fd : Int32, &) : Nil | ||
if LibC.epoll_ctl(@epfd, LibC::EPOLL_CTL_DEL, fd, nil) == -1 | ||
yield | ||
end | ||
end | ||
|
||
# `timeout` is in milliseconds; -1 will wait indefinitely; 0 will never wait. | ||
def wait(events : Slice(LibC::EpollEvent), timeout : Int32) : Slice(LibC::EpollEvent) | ||
count = LibC.epoll_wait(@epfd, events.to_unsafe, events.size, timeout) | ||
raise RuntimeError.from_errno("epoll_wait") if count == -1 && Errno.value != Errno::EINTR | ||
events[0, count.clamp(0..)] | ||
end | ||
|
||
def close : Nil | ||
LibC.close(@epfd) | ||
end | ||
end |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be renamed as
Crystal::Evented::Arena
since it's not a generic generational arena (memory region). It takes advantage that the OS kernels handle thefd
number (it's guaranteed unique) and always reuse closedfd
instead of growing (until it's needed).An actual generational arena would keep a list of free indexes.
Note: the goal of the arena is to: