Skip to content

Commit

Permalink
Windows: Event loop based on IOCP (#12149)
Browse files Browse the repository at this point in the history
Co-authored-by: Ulrich Kramer <[email protected]>
  • Loading branch information
straight-shoota and wonderix authored Jun 29, 2022
1 parent cd74e1d commit 632a86b
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 72 deletions.
2 changes: 1 addition & 1 deletion spec/std/concurrent/select_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ describe "select" do
x.should eq 2
end

it "stress select with send/receive in multiple fibers" do
pending_win32 "stress select with send/receive in multiple fibers" do
fibers = 4
msg_per_sender = 1000
ch = Array.new(fibers) { Array.new(fibers) { Channel(Int32).new } }
Expand Down
37 changes: 36 additions & 1 deletion spec/std/socket/tcp_socket_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe TCPSocket, tags: "network" do
end
end

pending_win32 "sync from server" do
it "sync from server" do
port = unused_local_port

TCPServer.open("::", port) do |server|
Expand Down Expand Up @@ -155,4 +155,39 @@ describe TCPSocket, tags: "network" do
end
end
end

it "sends and receives messages" do
port = unused_local_port

channel = Channel(Exception?).new
spawn do
TCPServer.open("::", port) do |server|
channel.send nil
sock = server.accept
sock.read_timeout = 3.second
sock.write_timeout = 3.second

sock.gets(4).should eq("ping")
sock << "pong"
channel.send nil
end
rescue exc
channel.send exc
end

if exc = channel.receive
raise exc
end

TCPSocket.open("localhost", port) do |client|
client.read_timeout = 3.second
client.write_timeout = 3.second
client << "ping"
client.gets(4).should eq("pong")
end

if exc = channel.receive
raise exc
end
end
end
1 change: 1 addition & 0 deletions spec/std/socket/unix_server_spec.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% skip_file if flag?(:win32) %}
require "../spec_helper"
require "socket"
require "../../support/fibers"
Expand Down
1 change: 1 addition & 0 deletions spec/std/socket/unix_socket_spec.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% skip_file if flag?(:win32) %}
require "spec"
require "socket"
require "../../support/tempfile"
Expand Down
4 changes: 2 additions & 2 deletions spec/win32_std_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require "./std/big/number_spec.cr"
require "./std/bit_array_spec.cr"
require "./std/bool_spec.cr"
require "./std/box_spec.cr"
# require "./std/channel_spec.cr" (failed codegen)
require "./std/channel_spec.cr"
require "./std/char/reader_spec.cr"
require "./std/char_spec.cr"
require "./std/class_spec.cr"
Expand All @@ -27,7 +27,7 @@ require "./std/compress/zip/zip_spec.cr"
require "./std/compress/zlib/reader_spec.cr"
require "./std/compress/zlib/stress_spec.cr"
require "./std/compress/zlib/writer_spec.cr"
# require "./std/concurrent/select_spec.cr" (failed to run)
require "./std/concurrent/select_spec.cr"
require "./std/concurrent_spec.cr"
require "./std/crypto/bcrypt/base64_spec.cr"
require "./std/crypto/bcrypt/password_spec.cr"
Expand Down
34 changes: 28 additions & 6 deletions src/crystal/system/win32/event_loop_iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,29 @@ module Crystal::EventLoop
next_event = @@queue.min_by { |e| e.wake_at }

if next_event
sleep_time = next_event.wake_at - Time.monotonic
now = Time.monotonic

if sleep_time > Time::Span.zero
LibC.Sleep(sleep_time.total_milliseconds)
if next_event.wake_at > now
sleep_time = next_event.wake_at - now
timed_out = IO::Overlapped.wait_queued_completions(sleep_time.total_milliseconds) do |fiber|
Crystal::Scheduler.enqueue fiber
end

return unless timed_out
end

dequeue next_event

Crystal::Scheduler.enqueue next_event.fiber
fiber = next_event.fiber

unless fiber.dead?
if next_event.timeout? && (select_action = fiber.timeout_select_action)
fiber.timeout_select_action = nil
select_action.time_expired(fiber)
else
Crystal::Scheduler.enqueue fiber
end
end
else
Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n"
::exit
Expand Down Expand Up @@ -80,21 +94,29 @@ module Crystal::EventLoop
def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event
Crystal::Event.new(Fiber.current)
end

def self.create_timeout_event(fiber)
Crystal::Event.new(fiber, timeout: true)
end
end

struct Crystal::Event
getter fiber
getter wake_at
getter? timeout

def initialize(@fiber : Fiber)
@wake_at = Time.monotonic
def initialize(@fiber : Fiber, @wake_at = Time.monotonic, *, @timeout = false)
end

# Frees the event
def free : Nil
Crystal::EventLoop.dequeue(self)
end

def delete
free
end

def add(time_span : Time::Span) : Nil
@wake_at = Time.monotonic + time_span
Crystal::EventLoop.enqueue(self)
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/win32/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ module Crystal::System::Socket
private def unbuffered_read(slice : Bytes)
wsabuf = wsa_buffer(slice)

bytes_read = overlapped_read(fd, "WSARecv") do |overlapped|
bytes_read = overlapped_operation(fd, "WSARecv", read_timeout, connreset_is_error: false) do |overlapped|
flags = 0_u32
LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil)
end
Expand Down
Loading

0 comments on commit 632a86b

Please sign in to comment.