Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventLoop: yield fibers internally [fixup #14996] #15215

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/std/crystal/evented/poll_descriptor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require "spec"
class Crystal::Evented::FakeLoop < Crystal::Evented::EventLoop
getter operations = [] of {Symbol, Int32, Crystal::Evented::Arena::Index | Bool}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
end

private def interrupt : Nil
Expand Down
20 changes: 10 additions & 10 deletions src/crystal/system/unix/epoll/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
end
{% end %}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
Crystal.trace :evloop, "run", blocking: blocking ? 1 : 0

# wait for events (indefinitely when blocking)
Expand All @@ -72,41 +72,41 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
# TODO: panic if epoll_event.value.events != LibC::EPOLLIN (could be EPOLLERR or EPLLHUP)
Crystal.trace :evloop, "interrupted"
@eventfd.read
# OPTIMIZE: only reset interrupted before a blocking wait
@interrupted.clear
when @timerfd.fd
# TODO: panic if epoll_event.value.events != LibC::EPOLLIN (could be EPOLLERR or EPLLHUP)
Crystal.trace :evloop, "timer"
timer_triggered = true
else
process_io(epoll_event)
process_io(epoll_event) { |fiber| yield fiber }
end
end

process_timers(timer_triggered)
# OPTIMIZE: only process timers when timer_triggered (?)
process_timers(timer_triggered) { |fiber| yield fiber }
end

private def process_io(epoll_event : LibC::EpollEvent*) : Nil
private def process_io(epoll_event : LibC::EpollEvent*, &) : Nil
index = Evented::Arena::Index.new(epoll_event.value.data.u64)
events = epoll_event.value.events

Crystal.trace :evloop, "event", fd: index.index, index: index.to_i64, events: events

Evented.arena.get?(index) do |pd|
if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
return
end

if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN
[email protected]_one { |event| unsafe_resume_io(event) }
[email protected]_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end

if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT
[email protected]_one { |event| unsafe_resume_io(event) }
[email protected]_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
end
end
Expand Down
26 changes: 16 additions & 10 deletions src/crystal/system/unix/evented/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop

# NOTE: thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking)
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end
true
end

Expand Down Expand Up @@ -299,11 +301,15 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop

Evented.arena.free(index) do |pd|
[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end)
end

[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end)
end

pd.value.remove(io.fd)
Expand Down Expand Up @@ -418,15 +424,15 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
# Thread unsafe: we must hold the poll descriptor waiter lock for the whole
# duration of the dequeue/resume_io otherwise we might conflict with timers
# trying to cancel an IO event.
protected def unsafe_resume_io(event : Evented::Event*) : Bool
protected def unsafe_resume_io(event : Evented::Event*, &) : Bool
# we only partially own the poll descriptor; thanks to the lock we know that
# another thread won't dequeue it, yet it may still be in the timers queue,
# which at worst may be waiting on the lock to be released, so event* can be
# dereferenced safely.

if !event.value.wake_at? || delete_timer(event)
# no timeout or we canceled it: we fully own the event
Crystal::Scheduler.enqueue(event.value.fiber)
yield event.value.fiber
true
else
# failed to cancel the timeout so the timer owns the event (by rule)
Expand All @@ -439,7 +445,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
# Shall be called after processing IO events. IO events with a timeout that
# have succeeded shall already have been removed from `@timers` otherwise the
# fiber could be resumed twice!
private def process_timers(timer_triggered : Bool) : Nil
private def process_timers(timer_triggered : Bool, &) : Nil
# collect ready timers before processing them —this is safe— to avoids a
# deadlock situation when another thread tries to process a ready IO event
# (in poll descriptor waiters) with a timeout (same event* in timers)
Expand All @@ -458,11 +464,11 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
end

buffer.to_slice[0, size].each do |event|
process_timer(event)
process_timer(event) { |fiber| yield fiber }
end
end

private def process_timer(event : Evented::Event*)
private def process_timer(event : Evented::Event*, &)
# we dequeued the event from timers, and by rule we own it, so event* can
# safely be dereferenced:
fiber = event.value.fiber
Expand Down Expand Up @@ -492,7 +498,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
raise RuntimeError.new("BUG: unexpected event in timers: #{event.value}%s\n")
end

Crystal::Scheduler.enqueue(fiber)
yield fiber
end

# internals: system
Expand All @@ -505,7 +511,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
#
# The `PollDescriptor` of IO events can be retrieved using the *index*
# from the system event's user data.
private abstract def system_run(blocking : Bool) : Nil
private abstract def system_run(blocking : Bool, & : Fiber ->) : Nil

# Add *fd* to the polling system, setting *index* as user data.
protected abstract def system_add(fd : Int32, index : Index) : Nil
Expand Down
21 changes: 11 additions & 10 deletions src/crystal/system/unix/kqueue/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
end
{% end %}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
buffer = uninitialized LibC::Kevent[128]

Crystal.trace :evloop, "run", blocking: blocking ? 1 : 0
Expand All @@ -89,11 +89,12 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
# nothing special
timer_triggered = true
else
process_io(kevent)
process_io(kevent) { |fiber| yield fiber }
end
end

process_timers(timer_triggered)
# OPTIMIZE: only process timers when timer_triggered (?)
process_timers(timer_triggered) { |fiber| yield fiber }
end

private def process_interrupt?(kevent)
Expand All @@ -114,7 +115,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
false
end

private def process_io(kevent : LibC::Kevent*) : Nil
private def process_io(kevent : LibC::Kevent*, &) : Nil
index =
{% if flag?(:bits64) %}
Evented::Arena::Index.new(kevent.value.udata.address)
Expand All @@ -130,25 +131,25 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF
# apparently some systems may report EOF on write with EVFILT_READ instead
# of EVFILT_WRITE, so let's wake all waiters:
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
return
end

case kevent.value.filter
when LibC::EVFILT_READ
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
else
[email protected]_one { |event| unsafe_resume_io(event) }
[email protected]_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
when LibC::EVFILT_WRITE
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
[email protected]_all { |event| unsafe_resume_io(event) }
[email protected]_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
else
[email protected]_one { |event| unsafe_resume_io(event) }
[email protected]_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
end
end
Expand Down
Loading