diff --git a/spec/std/thread/mutex_spec.cr b/spec/std/thread/mutex_spec.cr index 3346a1575615..405c812b0888 100644 --- a/spec/std/thread/mutex_spec.cr +++ b/spec/std/thread/mutex_spec.cr @@ -15,11 +15,11 @@ describe Thread::Mutex do a = 0 mutex = Thread::Mutex.new - threads = 10.times.map do + threads = Array.new(10) do Thread.new do mutex.synchronize { a += 1 } end - end.to_a + end threads.each(&.join) a.should eq(10) @@ -29,15 +29,16 @@ describe Thread::Mutex do mutex = Thread::Mutex.new mutex.try_lock.should be_true mutex.try_lock.should be_false - expect_raises(RuntimeError, "pthread_mutex_lock: ") { mutex.lock } + expect_raises(RuntimeError) { mutex.lock } mutex.unlock + Thread.new { mutex.synchronize { } }.join end it "won't unlock from another thread" do mutex = Thread::Mutex.new mutex.lock - expect_raises(RuntimeError, "pthread_mutex_unlock: ") do + expect_raises(RuntimeError) do Thread.new { mutex.unlock }.join end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 71f6286d9d36..3666b7ad512a 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -27,12 +27,12 @@ class Thread end require "./thread_linked_list" +require "./thread_condition_variable" {% if flag?(:wasi) %} require "./wasi/thread" {% elsif flag?(:unix) %} require "./unix/pthread" - require "./unix/pthread_condition_variable" {% elsif flag?(:win32) %} require "./win32/thread" {% else %} diff --git a/src/crystal/system/thread_condition_variable.cr b/src/crystal/system/thread_condition_variable.cr new file mode 100644 index 000000000000..ea5923601aec --- /dev/null +++ b/src/crystal/system/thread_condition_variable.cr @@ -0,0 +1,31 @@ +class Thread + class ConditionVariable + # Creates a new condition variable. + # def initialize + + # Unblocks one thread that is waiting on `self`. + # def signal : Nil + + # Unblocks all threads that are waiting on `self`. + # def broadcast : Nil + + # Causes the calling thread to wait on `self` and unlock the given *mutex* + # atomically. + # def wait(mutex : Thread::Mutex) : Nil + + # Causes the calling thread to wait on `self` and unlock the given *mutex* + # atomically within the given *time* span. Yields to the given block if a + # timeout occurs. + # def wait(mutex : Thread::Mutex, time : Time::Span, & : ->) + end +end + +{% if flag?(:wasi) %} + require "./wasi/thread_condition_variable" +{% elsif flag?(:unix) %} + require "./unix/pthread_condition_variable" +{% elsif flag?(:win32) %} + require "./win32/thread_condition_variable" +{% else %} + {% raise "thread condition variable not supported" %} +{% end %} diff --git a/src/crystal/system/thread_mutex.cr b/src/crystal/system/thread_mutex.cr index 1a5f3006d6eb..e3cf9ffeb6cb 100644 --- a/src/crystal/system/thread_mutex.cr +++ b/src/crystal/system/thread_mutex.cr @@ -24,5 +24,5 @@ end {% elsif flag?(:win32) %} require "./win32/thread_mutex" {% else %} - {% raise "thread not supported" %} + {% raise "thread mutex not supported" %} {% end %} diff --git a/src/crystal/system/unix/pthread_condition_variable.cr b/src/crystal/system/unix/pthread_condition_variable.cr index 225aad1c7105..a09811c79281 100644 --- a/src/crystal/system/unix/pthread_condition_variable.cr +++ b/src/crystal/system/unix/pthread_condition_variable.cr @@ -33,7 +33,7 @@ class Thread raise RuntimeError.from_os_error("pthread_cond_wait", Errno.new(ret)) unless ret == 0 end - def wait(mutex : Thread::Mutex, time : Time::Span) + def wait(mutex : Thread::Mutex, time : Time::Span, & : ->) ret = {% if flag?(:darwin) %} ts = uninitialized LibC::Timespec diff --git a/src/crystal/system/wasi/thread.cr b/src/crystal/system/wasi/thread.cr index b10439852f55..805c7fbb77a6 100644 --- a/src/crystal/system/wasi/thread.cr +++ b/src/crystal/system/wasi/thread.cr @@ -54,20 +54,4 @@ class Thread # TODO: Implement Pointer(Void).null end - - # :nodoc: - # TODO: Implement - class ConditionVariable - def signal : Nil - end - - def broadcast : Nil - end - - def wait(mutex : Thread::Mutex) : Nil - end - - def wait(mutex : Thread::Mutex, time : Time::Span, &) - end - end end diff --git a/src/crystal/system/wasi/thread_condition_variable.cr b/src/crystal/system/wasi/thread_condition_variable.cr new file mode 100644 index 000000000000..eb205333acdd --- /dev/null +++ b/src/crystal/system/wasi/thread_condition_variable.cr @@ -0,0 +1,16 @@ +# TODO: Implement +class Thread + class ConditionVariable + def signal : Nil + end + + def broadcast : Nil + end + + def wait(mutex : Thread::Mutex) : Nil + end + + def wait(mutex : Thread::Mutex, time : Time::Span, &) + end + end +end diff --git a/src/crystal/system/win32/process.cr b/src/crystal/system/win32/process.cr index 75c7e581e425..36ad7e31dd38 100644 --- a/src/crystal/system/win32/process.cr +++ b/src/crystal/system/win32/process.cr @@ -21,7 +21,7 @@ struct Crystal::System::Process end def wait - if LibC.WaitForSingleObject(@process_handle, LibC::INFINITE) != 0 + if LibC.WaitForSingleObject(@process_handle, LibC::INFINITE) != LibC::WAIT_OBJECT_0 raise RuntimeError.from_winerror("WaitForSingleObject") end diff --git a/src/crystal/system/win32/thread.cr b/src/crystal/system/win32/thread.cr index 1af2ee9660f2..5e7ae49b30fc 100644 --- a/src/crystal/system/win32/thread.cr +++ b/src/crystal/system/win32/thread.cr @@ -1,10 +1,11 @@ require "c/processthreadsapi" +require "c/synchapi" -# TODO: Implement for multithreading. class Thread # all thread objects, so the GC can see them (it doesn't scan thread locals) - @@threads = Thread::LinkedList(Thread).new + protected class_getter(threads) { Thread::LinkedList(Thread).new } + @th : LibC::HANDLE @exception : Exception? @detached = Atomic(UInt8).new(0) @main_fiber : Fiber? @@ -16,42 +17,87 @@ class Thread property previous : Thread? def self.unsafe_each - @@threads.unsafe_each { |thread| yield thread } + threads.unsafe_each { |thread| yield thread } end + # Starts a new system thread. + def initialize(&@func : ->) + @th = uninitialized LibC::HANDLE + + @th = GC.beginthreadex( + security: Pointer(Void).null, + stack_size: LibC::UInt.zero, + start_address: ->(data : Void*) { data.as(Thread).start; LibC::UInt.zero }, + arglist: self.as(Void*), + initflag: LibC::UInt.zero, + thrdaddr: Pointer(LibC::UInt).null) + end + + # Used once to initialize the thread object representing the main thread of + # the process (that already exists). def initialize + # `GetCurrentThread` returns a _constant_ and is only meaningful as an + # argument to Win32 APIs; to uniquely identify it we must duplicate the handle + @th = uninitialized LibC::HANDLE + cur_proc = LibC.GetCurrentProcess + LibC.DuplicateHandle(cur_proc, LibC.GetCurrentThread, cur_proc, pointerof(@th), 0, true, LibC::DUPLICATE_SAME_ACCESS) + + @func = ->{} @main_fiber = Fiber.new(stack_address, self) - @@threads.push(self) + + Thread.threads.push(self) + end + + private def detach + if @detached.compare_and_set(0, 1).last + yield + end end - @@current : Thread? = nil + # Suspends the current thread until this thread terminates. + def join : Nil + detach do + if LibC.WaitForSingleObject(@th, LibC::INFINITE) != LibC::WAIT_OBJECT_0 + @exception ||= RuntimeError.from_winerror("WaitForSingleObject") + end + if LibC.CloseHandle(@th) == 0 + @exception ||= RuntimeError.from_winerror("CloseHandle") + end + end - # Associates the Thread object to the running system thread. - protected def self.current=(@@current : Thread) : Thread + if exception = @exception + raise exception + end end + @[ThreadLocal] + @@current : Thread? + # Returns the Thread object associated to the running system thread. def self.current : Thread - @@current || raise "BUG: Thread.current returned NULL" + @@current ||= new + end + + # Associates the Thread object to the running system thread. + protected def self.current=(@@current : Thread) : Thread end - # Create the thread object for the current thread (aka the main thread of the - # process). - # - # TODO: consider moving to `kernel.cr` or `crystal/main.cr` - self.current = new + def self.yield : Nil + LibC.SwitchToThread + end # Returns the Fiber representing the thread's main stack. - def main_fiber + def main_fiber : Fiber @main_fiber.not_nil! end # :nodoc: - def scheduler + def scheduler : Crystal::Scheduler @scheduler ||= Crystal::Scheduler.new(main_fiber) end protected def start + Thread.threads.push(self) Thread.current = self @main_fiber = fiber = Fiber.new(stack_address, self) @@ -60,9 +106,9 @@ class Thread rescue ex @exception = ex ensure - @@threads.delete(self) + Thread.threads.delete(self) Fiber.inactive(fiber) - detach_self + detach { LibC.CloseHandle(@th) } end end @@ -71,4 +117,9 @@ class Thread Pointer(Void).new(low_limit) end + + # :nodoc: + def to_unsafe + @th + end end diff --git a/src/crystal/system/win32/thread_condition_variable.cr b/src/crystal/system/win32/thread_condition_variable.cr new file mode 100644 index 000000000000..423de9dc57f3 --- /dev/null +++ b/src/crystal/system/win32/thread_condition_variable.cr @@ -0,0 +1,41 @@ +require "c/synchapi" + +# :nodoc: +class Thread + # :nodoc: + class ConditionVariable + def initialize + @cond = uninitialized LibC::CONDITION_VARIABLE + LibC.InitializeConditionVariable(self) + end + + def signal : Nil + LibC.WakeConditionVariable(self) + end + + def broadcast : Nil + LibC.WakeAllConditionVariable(self) + end + + def wait(mutex : Thread::Mutex) : Nil + ret = LibC.SleepConditionVariableCS(self, mutex, LibC::INFINITE) + raise RuntimeError.from_winerror("SleepConditionVariableCS") if ret == 0 + end + + def wait(mutex : Thread::Mutex, time : Time::Span, & : ->) + ret = LibC.SleepConditionVariableCS(self, mutex, time.total_milliseconds) + return if ret != 0 + + error = WinError.value + if error == WinError::ERROR_TIMEOUT + yield + else + raise RuntimeError.from_os_error("SleepConditionVariableCS", error) + end + end + + def to_unsafe + pointerof(@cond) + end + end +end diff --git a/src/crystal/system/win32/thread_mutex.cr b/src/crystal/system/win32/thread_mutex.cr index be681c31398a..afd4cb1fbdcb 100644 --- a/src/crystal/system/win32/thread_mutex.cr +++ b/src/crystal/system/win32/thread_mutex.cr @@ -1,8 +1,61 @@ -# TODO: Implement +require "c/synchapi" + +# :nodoc: class Thread + # :nodoc: + # for Win32 condition variable interop we must use either a critical section + # or a slim reader/writer lock, not a Win32 mutex + # also note critical sections are reentrant; to match the behaviour in + # `../unix/pthread_mutex.cr` we must do extra housekeeping ourselves class Mutex + def initialize + @cs = uninitialized LibC::CRITICAL_SECTION + LibC.InitializeCriticalSectionAndSpinCount(self, 1000) + end + + def lock : Nil + LibC.EnterCriticalSection(self) + if @cs.recursionCount > 1 + LibC.LeaveCriticalSection(self) + raise RuntimeError.new "Attempt to lock a mutex recursively (deadlock)" + end + end + + def try_lock : Bool + if LibC.TryEnterCriticalSection(self) != 0 + if @cs.recursionCount > 1 + LibC.LeaveCriticalSection(self) + false + else + true + end + else + false + end + end + + def unlock : Nil + # `owningThread` is declared as `LibC::HANDLE` for historical reasons, so + # the following comparison is correct + unless @cs.owningThread == LibC::HANDLE.new(LibC.GetCurrentThreadId) + raise RuntimeError.new "Attempt to unlock a mutex locked by another thread" + end + LibC.LeaveCriticalSection(self) + end + def synchronize - yield + lock + yield self + ensure + unlock + end + + def finalize + LibC.DeleteCriticalSection(self) + end + + def to_unsafe + pointerof(@cs) end end end diff --git a/src/gc/boehm.cr b/src/gc/boehm.cr index 2a76eb950a4b..144092e84e03 100644 --- a/src/gc/boehm.cr +++ b/src/gc/boehm.cr @@ -107,8 +107,11 @@ lib LibGC fun size = GC_size(addr : Void*) : LibC::SizeT - {% unless flag?(:win32) || flag?(:wasm32) %} - # Boehm GC requires to use GC_pthread_create and GC_pthread_join instead of pthread_create and pthread_join + # Boehm GC requires to use its own thread manipulation routines instead of pthread's or Win32's + {% if flag?(:win32) %} + fun beginthreadex = GC_beginthreadex(security : Void*, stack_size : LibC::UInt, start_address : Void* -> LibC::UInt, + arglist : Void*, initflag : LibC::UInt, thrdaddr : LibC::UInt*) : Void* + {% elsif !flag?(:wasm32) %} fun pthread_create = GC_pthread_create(thread : LibC::PthreadT*, attr : LibC::PthreadAttrT*, start : Void* -> Void*, arg : Void*) : LibC::Int fun pthread_join = GC_pthread_join(thread : LibC::PthreadT, value : Void**) : LibC::Int fun pthread_detach = GC_pthread_detach(thread : LibC::PthreadT) : LibC::Int @@ -241,7 +244,14 @@ module GC reclaimed_bytes_before_gc: stats.reclaimed_bytes_before_gc) end - {% unless flag?(:win32) %} + {% if flag?(:win32) %} + # :nodoc: + def self.beginthreadex(security : Void*, stack_size : LibC::UInt, start_address : Void* -> LibC::UInt, arglist : Void*, initflag : LibC::UInt, thrdaddr : LibC::UInt*) : LibC::HANDLE + ret = LibGC.beginthreadex(security, stack_size, start_address, arglist, initflag, thrdaddr) + raise RuntimeError.from_errno("GC_beginthreadex") if ret.null? + ret.as(LibC::HANDLE) + end + {% else %} # :nodoc: def self.pthread_create(thread : LibC::PthreadT*, attr : LibC::PthreadAttrT*, start : Void* -> Void*, arg : Void*) LibGC.pthread_create(thread, attr, start, arg) diff --git a/src/gc/none.cr b/src/gc/none.cr index c243a071d3b9..2c0530e7599d 100644 --- a/src/gc/none.cr +++ b/src/gc/none.cr @@ -1,3 +1,7 @@ +{% if flag?(:win32) %} + require "c/process" +{% end %} + module GC def self.init end @@ -65,7 +69,14 @@ module GC reclaimed_bytes_before_gc: 0) end - {% unless flag?(:win32) || flag?(:wasm32) %} + {% if flag?(:win32) %} + # :nodoc: + def self.beginthreadex(security : Void*, stack_size : LibC::UInt, start_address : Void* -> LibC::UInt, arglist : Void*, initflag : LibC::UInt, thrdaddr : LibC::UInt*) : LibC::HANDLE + ret = LibC._beginthreadex(security, stack_size, start_address, arglist, initflag, thrdaddr) + raise RuntimeError.from_errno("_beginthreadex") if ret.null? + ret.as(LibC::HANDLE) + end + {% elsif !flag?(:wasm32) %} # :nodoc: def self.pthread_create(thread : LibC::PthreadT*, attr : LibC::PthreadAttrT*, start : Void* -> Void*, arg : Void*) LibC.pthread_create(thread, attr, start, arg) diff --git a/src/lib_c/x86_64-windows-msvc/c/process.cr b/src/lib_c/x86_64-windows-msvc/c/process.cr new file mode 100644 index 000000000000..e567068fb1e7 --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/process.cr @@ -0,0 +1,5 @@ +require "lib_c" + +lib LibC + fun _beginthreadex(security : Void*, stack_size : UInt, start_address : Void* -> UInt, arglist : Void*, initflag : UInt, thrdaddr : UInt*) : Void* +end diff --git a/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr b/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr index 6d0336a37e16..441d9acac697 100644 --- a/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr +++ b/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr @@ -33,6 +33,7 @@ lib LibC end fun GetCurrentThread : HANDLE + fun GetCurrentThreadId : DWORD fun GetCurrentThreadStackLimits(lowLimit : ULONG_PTR*, highLimit : ULONG_PTR*) : Void fun GetCurrentProcess : HANDLE fun GetCurrentProcessId : DWORD @@ -46,6 +47,7 @@ lib LibC fun SetThreadStackGuarantee(stackSizeInBytes : DWORD*) : BOOL fun GetProcessTimes(hProcess : HANDLE, lpCreationTime : FILETIME*, lpExitTime : FILETIME*, lpKernelTime : FILETIME*, lpUserTime : FILETIME*) : BOOL + fun SwitchToThread : BOOL PROCESS_QUERY_INFORMATION = 0x0400 end diff --git a/src/lib_c/x86_64-windows-msvc/c/synchapi.cr b/src/lib_c/x86_64-windows-msvc/c/synchapi.cr index 23804d0f3aaf..e101b7f6284b 100644 --- a/src/lib_c/x86_64-windows-msvc/c/synchapi.cr +++ b/src/lib_c/x86_64-windows-msvc/c/synchapi.cr @@ -1,7 +1,35 @@ require "c/basetsd" require "c/int_safe" +require "c/winbase" +require "c/wtypesbase" lib LibC + # the meanings of these fields are documented not in the Win32 API docs but in + # https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/displaying-a-critical-section + struct CRITICAL_SECTION + debugInfo : Void* # PRTL_CRITICAL_SECTION_DEBUG + lockCount : LONG + recursionCount : LONG + owningThread : HANDLE + lockSemaphore : HANDLE + spinCount : UInt64 + end + + struct CONDITION_VARIABLE + ptr : Void* + end + + fun InitializeCriticalSectionAndSpinCount(lpCriticalSection : CRITICAL_SECTION*, dwSpinCount : DWORD) : BOOL + fun DeleteCriticalSection(lpCriticalSection : CRITICAL_SECTION*) + fun EnterCriticalSection(lpCriticalSection : CRITICAL_SECTION*) + fun TryEnterCriticalSection(lpCriticalSection : CRITICAL_SECTION*) : BOOL + fun LeaveCriticalSection(lpCriticalSection : CRITICAL_SECTION*) + + fun InitializeConditionVariable(conditionVariable : CONDITION_VARIABLE*) + fun SleepConditionVariableCS(conditionVariable : CONDITION_VARIABLE*, criticalSection : CRITICAL_SECTION*, dwMilliseconds : DWORD) : BOOL + fun WakeConditionVariable(conditionVariable : CONDITION_VARIABLE*) + fun WakeAllConditionVariable(conditionVariable : CONDITION_VARIABLE*) + fun Sleep(dwMilliseconds : DWORD) fun WaitForSingleObject(hHandle : HANDLE, dwMilliseconds : DWORD) : DWORD end diff --git a/src/lib_c/x86_64-windows-msvc/c/winbase.cr b/src/lib_c/x86_64-windows-msvc/c/winbase.cr index a5d2ccbe72ae..5b245479df25 100644 --- a/src/lib_c/x86_64-windows-msvc/c/winbase.cr +++ b/src/lib_c/x86_64-windows-msvc/c/winbase.cr @@ -31,6 +31,11 @@ lib LibC INFINITE = 0xFFFFFFFF + WAIT_OBJECT_0 = 0x00000000_u32 + WAIT_IO_COMPLETION = 0x000000C0_u32 + WAIT_TIMEOUT = 0x00000102_u32 + WAIT_FAILED = 0xFFFFFFFF_u32 + STARTF_USESTDHANDLES = 0x00000100 MOVEFILE_REPLACE_EXISTING = 0x1_u32 diff --git a/src/lib_c/x86_64-windows-msvc/c/winsock2.cr b/src/lib_c/x86_64-windows-msvc/c/winsock2.cr index 4cf807c18622..223c2366b072 100644 --- a/src/lib_c/x86_64-windows-msvc/c/winsock2.cr +++ b/src/lib_c/x86_64-windows-msvc/c/winsock2.cr @@ -1,6 +1,7 @@ require "./ws2def" require "./basetsd" require "./guiddef" +require "./winbase" @[Link("WS2_32")] lib LibC @@ -65,13 +66,8 @@ lib LibC WSA_INVALID_EVENT = Pointer(WSAEVENT).null WSA_MAXIMUM_WAIT_EVENTS = MAXIMUM_WAIT_OBJECTS WSA_WAIT_FAILED = WAIT_FAILED - STATUS_WAIT_0 = 0_i64 - WAIT_OBJECT_0 = ((STATUS_WAIT_0) + 0) WSA_WAIT_EVENT_0 = WAIT_OBJECT_0 - STATUS_USER_APC = 0xc0 - WAIT_IO_COMPLETION = STATUS_USER_APC WSA_WAIT_IO_COMPLETION = WAIT_IO_COMPLETION - WAIT_TIMEOUT = 258_i64 WSA_WAIT_TIMEOUT = WAIT_TIMEOUT WSA_INFINITE = INFINITE