From e552497cac9428ef385528476cb6765074d24f68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20B=C3=B6ving?= Date: Mon, 2 Dec 2024 11:27:56 +0100 Subject: [PATCH] refactor: move Channel/Mutex to Std --- src/Init/Data/Channel.lean | 14 ++++ src/Init/System/Mutex.lean | 25 ++++-- src/Lean/Server/FileWorker.lean | 14 ++-- src/Lean/Server/Watchdog.lean | 6 +- src/Std.lean | 1 + src/Std/Sync.lean | 8 ++ src/Std/Sync/Channel.lean | 137 ++++++++++++++++++++++++++++++++ src/Std/Sync/Mutex.lean | 121 ++++++++++++++++++++++++++++ 8 files changed, 309 insertions(+), 17 deletions(-) create mode 100644 src/Std/Sync.lean create mode 100644 src/Std/Sync/Channel.lean create mode 100644 src/Std/Sync/Mutex.lean diff --git a/src/Init/Data/Channel.lean b/src/Init/Data/Channel.lean index 087e36052bdf..716be9161fc8 100644 --- a/src/Init/Data/Channel.lean +++ b/src/Init/Data/Channel.lean @@ -8,6 +8,8 @@ import Init.Data.Queue import Init.System.Promise import Init.System.Mutex +set_option linter.deprecated false + namespace IO /-- @@ -15,6 +17,7 @@ Internal state of an `Channel`. We maintain the invariant that at all times either `consumers` or `values` is empty. -/ +@[deprecated "Use Std.Channel.State from Std.Sync.Channel instead" (since := "2024-12-02")] structure Channel.State (α : Type) where values : Std.Queue α := ∅ consumers : Std.Queue (Promise (Option α)) := ∅ @@ -27,12 +30,14 @@ FIFO channel with unbounded buffer, where `recv?` returns a `Task`. A channel can be closed. Once it is closed, all `send`s are ignored, and `recv?` returns `none` once the queue is empty. -/ +@[deprecated "Use Std.Channel from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel (α : Type) : Type := Mutex (Channel.State α) instance : Nonempty (Channel α) := inferInstanceAs (Nonempty (Mutex _)) /-- Creates a new `Channel`. -/ +@[deprecated "Use Std.Channel.new from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.new : BaseIO (Channel α) := Mutex.new {} @@ -41,6 +46,7 @@ Sends a message on an `Channel`. This function does not block. -/ +@[deprecated "Use Std.Channel.send from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.send (ch : Channel α) (v : α) : BaseIO Unit := ch.atomically do let st ← get @@ -54,6 +60,7 @@ def Channel.send (ch : Channel α) (v : α) : BaseIO Unit := /-- Closes an `Channel`. -/ +@[deprecated "Use Std.Channel.close from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.close (ch : Channel α) : BaseIO Unit := ch.atomically do let st ← get @@ -67,6 +74,7 @@ Every message is only received once. Returns `none` if the channel is closed and the queue is empty. -/ +@[deprecated "Use Std.Channel.recv? from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) := ch.atomically do let st ← get @@ -85,6 +93,7 @@ def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) := Note that if this function is called twice, each `forAsync` only gets half the messages. -/ +@[deprecated "Use Std.Channel.forAsync from Std.Sync.Channel instead" (since := "2024-12-02")] partial def Channel.forAsync (f : α → BaseIO Unit) (ch : Channel α) (prio : Task.Priority := .default) : BaseIO (Task Unit) := do BaseIO.bindTask (prio := prio) (← ch.recv?) fun @@ -96,11 +105,13 @@ Receives all currently queued messages from the channel. Those messages are dequeued and will not be returned by `recv?`. -/ +@[deprecated "Use Std.Channel.recvAllCurrent from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.recvAllCurrent (ch : Channel α) : BaseIO (Array α) := ch.atomically do modifyGet fun st => (st.values.toArray, { st with values := ∅ }) /-- Type tag for synchronous (blocking) operations on a `Channel`. -/ +@[deprecated "Use Std.Channel.Sync from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.Sync := Channel /-- @@ -110,6 +121,7 @@ For example, `ch.sync.recv?` blocks until the next message, and `for msg in ch.sync do ...` iterates synchronously over the channel. These functions should only be used in dedicated threads. -/ +@[deprecated "Use Std.Channel.sync from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.sync (ch : Channel α) : Channel.Sync α := ch /-- @@ -118,9 +130,11 @@ Synchronously receives a message from the channel. Every message is only received once. Returns `none` if the channel is closed and the queue is empty. -/ +@[deprecated "Use Std.Channel.Sync.recv? from Std.Sync.Channel instead" (since := "2024-12-02")] def Channel.Sync.recv? (ch : Channel.Sync α) : BaseIO (Option α) := do IO.wait (← Channel.recv? ch) +@[deprecated "Use Std.Channel.Sync.forIn from Std.Sync.Channel instead" (since := "2024-12-02")] private partial def Channel.Sync.forIn [Monad m] [MonadLiftT BaseIO m] (ch : Channel.Sync α) (f : α → β → m (ForInStep β)) : β → m β := fun b => do match ← ch.recv? with diff --git a/src/Init/System/Mutex.lean b/src/Init/System/Mutex.lean index b4ee170eb7a6..c081be44a462 100644 --- a/src/Init/System/Mutex.lean +++ b/src/Init/System/Mutex.lean @@ -7,6 +7,9 @@ prelude import Init.System.IO import Init.Control.StateRef + +set_option linter.deprecated false + namespace IO private opaque BaseMutexImpl : NonemptyType.{0} @@ -16,12 +19,13 @@ Mutual exclusion primitive (a lock). If you want to guard shared state, use `Mutex α` instead. -/ +@[deprecated "Use Std.BaseMutex from Std.Sync.Mutex instead" (since := "2024-12-02")] def BaseMutex : Type := BaseMutexImpl.type instance : Nonempty BaseMutex := BaseMutexImpl.property /-- Creates a new `BaseMutex`. -/ -@[extern "lean_io_basemutex_new"] +@[extern "lean_io_basemutex_new", deprecated "Use Std.BaseMutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque BaseMutex.new : BaseIO BaseMutex /-- @@ -30,7 +34,7 @@ Locks a `BaseMutex`. Waits until no other thread has locked the mutex. The current thread must not have already locked the mutex. Reentrant locking is undefined behavior (inherited from the C++ implementation). -/ -@[extern "lean_io_basemutex_lock"] +@[extern "lean_io_basemutex_lock", deprecated "Use Std.BaseMutex.lock from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque BaseMutex.lock (mutex : @& BaseMutex) : BaseIO Unit /-- @@ -39,33 +43,35 @@ Unlocks a `BaseMutex`. The current thread must have already locked the mutex. Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation). -/ -@[extern "lean_io_basemutex_unlock"] +@[extern "lean_io_basemutex_unlock", deprecated "Use Std.BaseMutex.unlock from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque BaseMutex.unlock (mutex : @& BaseMutex) : BaseIO Unit private opaque CondvarImpl : NonemptyType.{0} /-- Condition variable. -/ +@[deprecated "Use Std.Condvar from Std.Sync.Mutex instead" (since := "2024-12-02")] def Condvar : Type := CondvarImpl.type instance : Nonempty Condvar := CondvarImpl.property /-- Creates a new condition variable. -/ -@[extern "lean_io_condvar_new"] +@[extern "lean_io_condvar_new", deprecated "Use Std.Condvar.new from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque Condvar.new : BaseIO Condvar /-- Waits until another thread calls `notifyOne` or `notifyAll`. -/ -@[extern "lean_io_condvar_wait"] +@[extern "lean_io_condvar_wait", deprecated "Use Std.Condvar.wait from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque Condvar.wait (condvar : @& Condvar) (mutex : @& BaseMutex) : BaseIO Unit /-- Wakes up a single other thread executing `wait`. -/ -@[extern "lean_io_condvar_notify_one"] +@[extern "lean_io_condvar_notify_one", deprecated "Use Std.Condvar.notifyOne from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque Condvar.notifyOne (condvar : @& Condvar) : BaseIO Unit /-- Wakes up all other threads executing `wait`. -/ -@[extern "lean_io_condvar_notify_all"] +@[extern "lean_io_condvar_notify_all", deprecated "Use Std.Condvar.notifyAll from Std.Sync.Mutex instead" (since := "2024-12-02")] opaque Condvar.notifyAll (condvar : @& Condvar) : BaseIO Unit /-- Waits on the condition variable until the predicate is true. -/ +@[deprecated "Use Std.Condvar.waitUntil from Std.Sync.Mutex instead" (since := "2024-12-02")] def Condvar.waitUntil [Monad m] [MonadLift BaseIO m] (condvar : Condvar) (mutex : BaseMutex) (pred : m Bool) : m Unit := do while !(← pred) do @@ -78,6 +84,7 @@ The type `Mutex α` is similar to `IO.Ref α`, except that concurrent accesses are guarded by a mutex instead of atomic pointer operations and busy-waiting. -/ +@[deprecated "Use Std.Mutex from Std.Sync.Mutex instead" (since := "2024-12-02")] structure Mutex (α : Type) where private mk :: private ref : IO.Ref α mutex : BaseMutex @@ -86,6 +93,7 @@ structure Mutex (α : Type) where private mk :: instance : CoeOut (Mutex α) BaseMutex where coe := Mutex.mutex /-- Creates a new mutex. -/ +@[deprecated "Use Std.Mutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")] def Mutex.new (a : α) : BaseIO (Mutex α) := return { ref := ← mkRef a, mutex := ← BaseMutex.new } @@ -94,9 +102,11 @@ def Mutex.new (a : α) : BaseIO (Mutex α) := with outside monad `m`. The action has access to the state `α` of the mutex (via `get` and `set`). -/ +@[deprecated "Use Std.AtomicT from Std.Sync.Mutex instead" (since := "2024-12-02")] abbrev AtomicT := StateRefT' IO.RealWorld /-- `mutex.atomically k` runs `k` with access to the mutex's state while locking the mutex. -/ +@[deprecated "Use Std.Mutex.atomically from Std.Sync.Mutex instead" (since := "2024-12-02")] def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m] (mutex : Mutex α) (k : AtomicT α m β) : m β := do try @@ -110,6 +120,7 @@ def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m] waiting on `condvar` until `pred` returns true. Both `k` and `pred` have access to the mutex's state. -/ +@[deprecated "Use Std.Mutex.atomicallyOnce from Std.Sync.Mutex instead" (since := "2024-12-02")] def Mutex.atomicallyOnce [Monad m] [MonadLiftT BaseIO m] [MonadFinally m] (mutex : Mutex α) (condvar : Condvar) (pred : AtomicT α m Bool) (k : AtomicT α m β) : m β := diff --git a/src/Lean/Server/FileWorker.lean b/src/Lean/Server/FileWorker.lean index a6a4dccbc36d..a10dda3aba85 100644 --- a/src/Lean/Server/FileWorker.lean +++ b/src/Lean/Server/FileWorker.lean @@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki -/ prelude import Init.System.IO -import Init.Data.Channel +import Std.Sync.Channel import Lean.Data.RBMap import Lean.Environment @@ -64,7 +64,7 @@ open Widget in structure WorkerContext where /-- Synchronized output channel for LSP messages. Notifications for outdated versions are discarded on read. -/ - chanOut : IO.Channel JsonRpc.Message + chanOut : Std.Channel JsonRpc.Message /-- Latest document version received by the client, used for filtering out notifications from previous versions. @@ -75,7 +75,7 @@ structure WorkerContext where Channel that receives a message for every a `$/lean/fileProgress` notification, indicating whether the notification suggests that the file is currently being processed. -/ - chanIsProcessing : IO.Channel Bool + chanIsProcessing : Std.Channel Bool /-- Diagnostics that are included in every single `textDocument/publishDiagnostics` notification. -/ @@ -271,7 +271,7 @@ open Language Lean in Callback from Lean language processor after parsing imports that requests necessary information from Lake for processing imports. -/ -def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Channel JsonRpc.Message) +def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Std.Channel JsonRpc.Message) (srcSearchPathPromise : Promise SearchPath) (stx : Syntax) : Language.ProcessingT IO (Except Language.Lean.HeaderProcessedSnapshot SetupImportsResult) := do let importsAlreadyLoaded ← importsLoadedRef.modifyGet ((·, true)) @@ -337,7 +337,7 @@ section Initialization let clientHasWidgets := initParams.initializationOptions?.bind (·.hasWidgets?) |>.getD false let maxDocVersionRef ← IO.mkRef 0 let freshRequestIdRef ← IO.mkRef (0 : Int) - let chanIsProcessing ← IO.Channel.new + let chanIsProcessing ← Std.Channel.new let stickyDiagnosticsRef ← IO.mkRef ∅ let chanOut ← mkLspOutputChannel maxDocVersionRef chanIsProcessing let srcSearchPathPromise ← IO.Promise.new @@ -380,8 +380,8 @@ section Initialization the output FS stream after discarding outdated notifications. This is the only component of the worker with access to the output stream, so we can synchronize messages from parallel elaboration tasks here. -/ - mkLspOutputChannel maxDocVersion chanIsProcessing : IO (IO.Channel JsonRpc.Message) := do - let chanOut ← IO.Channel.new + mkLspOutputChannel maxDocVersion chanIsProcessing : IO (Std.Channel JsonRpc.Message) := do + let chanOut ← Std.Channel.new let _ ← chanOut.forAsync (prio := .dedicated) fun msg => do -- discard outdated notifications; note that in contrast to responses, notifications can -- always be silently discarded diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index 67586ecf2fc8..a665908f22be 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki -/ prelude import Init.System.IO -import Init.System.Mutex +import Std.Sync.Mutex import Init.Data.ByteArray import Lean.Data.RBMap @@ -113,7 +113,7 @@ section FileWorker structure FileWorker where doc : DocumentMeta proc : Process.Child workerCfg - exitCode : IO.Mutex (Option UInt32) + exitCode : Std.Mutex (Option UInt32) commTask : Task WorkerEvent state : WorkerState -- This should not be mutated outside of namespace FileWorker, @@ -392,7 +392,7 @@ section ServerM -- open session for `kill` above setsid := true } - let exitCode ← IO.Mutex.new none + let exitCode ← Std.Mutex.new none let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap) let initialDependencyBuildMode := m.dependencyBuildMode let updatedDependencyBuildMode := diff --git a/src/Std.lean b/src/Std.lean index e48e80154693..0a2f53c641b6 100644 --- a/src/Std.lean +++ b/src/Std.lean @@ -6,6 +6,7 @@ Authors: Sebastian Ullrich prelude import Std.Data import Std.Sat +import Std.Sync import Std.Time import Std.Tactic import Std.Internal diff --git a/src/Std/Sync.lean b/src/Std/Sync.lean new file mode 100644 index 000000000000..dcac3c49a83a --- /dev/null +++ b/src/Std/Sync.lean @@ -0,0 +1,8 @@ +/- +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Henrik Böving +-/ +prelude +import Std.Sync.Channel +import Std.Sync.Mutex diff --git a/src/Std/Sync/Channel.lean b/src/Std/Sync/Channel.lean new file mode 100644 index 000000000000..6b668b746bf6 --- /dev/null +++ b/src/Std/Sync/Channel.lean @@ -0,0 +1,137 @@ +/- +Copyright (c) 2022 Microsoft Corporation. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Gabriel Ebner +-/ +prelude +import Init.System.Promise +import Init.Data.Queue +import Std.Sync.Mutex + +namespace Std + +/-- +Internal state of an `Channel`. + +We maintain the invariant that at all times either `consumers` or `values` is empty. +-/ +structure Channel.State (α : Type) where + values : Std.Queue α := ∅ + consumers : Std.Queue (IO.Promise (Option α)) := ∅ + closed := false + deriving Inhabited + +/-- +FIFO channel with unbounded buffer, where `recv?` returns a `Task`. + +A channel can be closed. Once it is closed, all `send`s are ignored, and +`recv?` returns `none` once the queue is empty. +-/ +def Channel (α : Type) : Type := Mutex (Channel.State α) + +instance : Nonempty (Channel α) := + inferInstanceAs (Nonempty (Mutex _)) + +/-- Creates a new `Channel`. -/ +def Channel.new : BaseIO (Channel α) := + Mutex.new {} + +/-- +Sends a message on an `Channel`. + +This function does not block. +-/ +def Channel.send (ch : Channel α) (v : α) : BaseIO Unit := + ch.atomically do + let st ← get + if st.closed then return + if let some (consumer, consumers) := st.consumers.dequeue? then + consumer.resolve (some v) + set { st with consumers } + else + set { st with values := st.values.enqueue v } + +/-- +Closes an `Channel`. +-/ +def Channel.close (ch : Channel α) : BaseIO Unit := + ch.atomically do + let st ← get + for consumer in st.consumers.toArray do consumer.resolve none + set { st with closed := true, consumers := ∅ } + +/-- +Receives a message, without blocking. +The returned task waits for the message. +Every message is only received once. + +Returns `none` if the channel is closed and the queue is empty. +-/ +def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) := + ch.atomically do + let st ← get + if let some (a, values) := st.values.dequeue? then + set { st with values } + return .pure a + else if !st.closed then + let promise ← IO.Promise.new + set { st with consumers := st.consumers.enqueue promise } + return promise.result + else + return .pure none + +/-- +`ch.forAsync f` calls `f` for every messages received on `ch`. + +Note that if this function is called twice, each `forAsync` only gets half the messages. +-/ +partial def Channel.forAsync (f : α → BaseIO Unit) (ch : Channel α) + (prio : Task.Priority := .default) : BaseIO (Task Unit) := do + BaseIO.bindTask (prio := prio) (← ch.recv?) fun + | none => return .pure () + | some v => do f v; ch.forAsync f prio + +/-- +Receives all currently queued messages from the channel. + +Those messages are dequeued and will not be returned by `recv?`. +-/ +def Channel.recvAllCurrent (ch : Channel α) : BaseIO (Array α) := + ch.atomically do + modifyGet fun st => (st.values.toArray, { st with values := ∅ }) + +/-- Type tag for synchronous (blocking) operations on a `Channel`. -/ +def Channel.Sync := Channel + +/-- +Accesses synchronous (blocking) version of channel operations. + +For example, `ch.sync.recv?` blocks until the next message, +and `for msg in ch.sync do ...` iterates synchronously over the channel. +These functions should only be used in dedicated threads. +-/ +def Channel.sync (ch : Channel α) : Channel.Sync α := ch + +/-- +Synchronously receives a message from the channel. + +Every message is only received once. +Returns `none` if the channel is closed and the queue is empty. +-/ +def Channel.Sync.recv? (ch : Channel.Sync α) : BaseIO (Option α) := do + IO.wait (← Channel.recv? ch) + +private partial def Channel.Sync.forIn [Monad m] [MonadLiftT BaseIO m] + (ch : Channel.Sync α) (f : α → β → m (ForInStep β)) : β → m β := fun b => do + match ← ch.recv? with + | some a => + match ← f a b with + | .done b => pure b + | .yield b => ch.forIn f b + | none => pure b + +/-- `for msg in ch.sync do ...` receives all messages in the channel until it is closed. -/ +instance [MonadLiftT BaseIO m] : ForIn m (Channel.Sync α) α where + forIn ch b f := ch.forIn f b + +end Std diff --git a/src/Std/Sync/Mutex.lean b/src/Std/Sync/Mutex.lean new file mode 100644 index 000000000000..62011aef4b30 --- /dev/null +++ b/src/Std/Sync/Mutex.lean @@ -0,0 +1,121 @@ +/- +Copyright (c) 2022 Microsoft Corporation. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Gabriel Ebner +-/ +prelude +import Init.System.IO +import Init.Control.StateRef + +namespace Std + +private opaque BaseMutexImpl : NonemptyType.{0} + +/-- +Mutual exclusion primitive (a lock). + +If you want to guard shared state, use `Mutex α` instead. +-/ +def BaseMutex : Type := BaseMutexImpl.type + +instance : Nonempty BaseMutex := BaseMutexImpl.property + +/-- Creates a new `BaseMutex`. -/ +@[extern "lean_io_basemutex_new"] +opaque BaseMutex.new : BaseIO BaseMutex + +/-- +Locks a `BaseMutex`. Waits until no other thread has locked the mutex. + +The current thread must not have already locked the mutex. +Reentrant locking is undefined behavior (inherited from the C++ implementation). +-/ +@[extern "lean_io_basemutex_lock"] +opaque BaseMutex.lock (mutex : @& BaseMutex) : BaseIO Unit + +/-- +Unlocks a `BaseMutex`. + +The current thread must have already locked the mutex. +Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation). +-/ +@[extern "lean_io_basemutex_unlock"] +opaque BaseMutex.unlock (mutex : @& BaseMutex) : BaseIO Unit + +private opaque CondvarImpl : NonemptyType.{0} + +/-- Condition variable. -/ +def Condvar : Type := CondvarImpl.type + +instance : Nonempty Condvar := CondvarImpl.property + +/-- Creates a new condition variable. -/ +@[extern "lean_io_condvar_new"] +opaque Condvar.new : BaseIO Condvar + +/-- Waits until another thread calls `notifyOne` or `notifyAll`. -/ +@[extern "lean_io_condvar_wait"] +opaque Condvar.wait (condvar : @& Condvar) (mutex : @& BaseMutex) : BaseIO Unit + +/-- Wakes up a single other thread executing `wait`. -/ +@[extern "lean_io_condvar_notify_one"] +opaque Condvar.notifyOne (condvar : @& Condvar) : BaseIO Unit + +/-- Wakes up all other threads executing `wait`. -/ +@[extern "lean_io_condvar_notify_all"] +opaque Condvar.notifyAll (condvar : @& Condvar) : BaseIO Unit + +/-- Waits on the condition variable until the predicate is true. -/ +def Condvar.waitUntil [Monad m] [MonadLift BaseIO m] + (condvar : Condvar) (mutex : BaseMutex) (pred : m Bool) : m Unit := do + while !(← pred) do + condvar.wait mutex + +/-- +Mutual exclusion primitive (lock) guarding shared state of type `α`. + +The type `Mutex α` is similar to `IO.Ref α`, +except that concurrent accesses are guarded by a mutex +instead of atomic pointer operations and busy-waiting. +-/ +structure Mutex (α : Type) where private mk :: + private ref : IO.Ref α + mutex : BaseMutex + deriving Nonempty + +instance : CoeOut (Mutex α) BaseMutex where coe := Mutex.mutex + +/-- Creates a new mutex. -/ +def Mutex.new (a : α) : BaseIO (Mutex α) := + return { ref := ← IO.mkRef a, mutex := ← BaseMutex.new } + +/-- +`AtomicT α m` is the monad that can be atomically executed inside a `Mutex α`, +with outside monad `m`. +The action has access to the state `α` of the mutex (via `get` and `set`). +-/ +abbrev AtomicT := StateRefT' IO.RealWorld + +/-- `mutex.atomically k` runs `k` with access to the mutex's state while locking the mutex. -/ +def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m] + (mutex : Mutex α) (k : AtomicT α m β) : m β := do + try + mutex.mutex.lock + k mutex.ref + finally + mutex.mutex.unlock + +/-- +`mutex.atomicallyOnce condvar pred k` runs `k`, +waiting on `condvar` until `pred` returns true. +Both `k` and `pred` have access to the mutex's state. +-/ +def Mutex.atomicallyOnce [Monad m] [MonadLiftT BaseIO m] [MonadFinally m] + (mutex : Mutex α) (condvar : Condvar) + (pred : AtomicT α m Bool) (k : AtomicT α m β) : m β := + let _ : MonadLift BaseIO (AtomicT α m) := ⟨liftM⟩ + mutex.atomically do + condvar.waitUntil mutex pred + k + +end Std