diff --git a/weave/channels/channels_mpsc_unbounded_batch.nim b/weave/channels/channels_mpsc_unbounded_batch.nim index cde8d01..6236bf0 100644 --- a/weave/channels/channels_mpsc_unbounded_batch.nim +++ b/weave/channels/channels_mpsc_unbounded_batch.nim @@ -11,8 +11,9 @@ import # - https://github.com/nim-lang/Nim/issues/12714 # - https://github.com/nim-lang/Nim/issues/13048 -macro derefMPSC*(T: typedesc): typedesc = - # This somehows isn't bound properly when used in a typesection +macro derefMPSC*(keepCount: static bool, T: typedesc): typedesc = + # This somehows isn't bound properly when used in a typesection (it needs to be exported in this module) + # and it's even worse if that type section involves a static (it needs to be reexported without hops to all modules!) let instantiated = T.getTypeInst instantiated.expectkind(nnkBracketExpr) doAssert instantiated[0].eqIdent"typeDesc" @@ -44,7 +45,7 @@ type x.next is Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 - ChannelMpscUnboundedBatch*[T: Enqueueable] = object + ChannelMpscUnboundedBatch*[keepCount: static bool, T: Enqueueable] = object ## Lockless multi-producer single-consumer channel ## ## Properties: @@ -64,7 +65,7 @@ type # Producers and consumer slow-path back{.align: MpscPadding.}: Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 - # Accessed by all + # Accessed by all - don't remove it even when not keeping count. This pads back/front by 2x cache lines. count{.align: MpscPadding.}: Atomic[int] # Consumer only - front is a dummy node front{.align: MpscPadding.}: derefMPSC(T) @@ -87,12 +88,13 @@ type # Implementation # -------------------------------------------------------------- -proc initialize*[T](chan: var ChannelMpscUnboundedBatch[T]) {.inline.}= +proc initialize*[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T]) {.inline.}= chan.front.next.store(nil, moRelaxed) chan.back.store(chan.front.addr, moRelaxed) - chan.count.store(0, moRelaxed) + when keepCount: + chan.count.store(0, moRelaxed) -proc trySend*[T](chan: var ChannelMpscUnboundedBatch[T], src: sink T): bool {.inline.}= +proc trySend*[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], src: sink T): bool {.inline.}= ## Send an item to the back of the channel ## As the channel has unbounded capacity, this should never fail @@ -106,12 +108,13 @@ proc trySend*[T](chan: var ChannelMpscUnboundedBatch[T], src: sink T): bool {.in return true -proc trySendBatch*[T](chan: var ChannelMpscUnboundedBatch[T], first, last: sink T, count: SomeInteger): bool {.inline.}= +proc trySendBatch*[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], first, last: sink T, count: SomeInteger): bool {.inline.}= ## Send a list of items to the back of the channel ## They should be linked together by their next field ## As the channel has unbounded capacity this should never fail - discard chan.count.fetchAdd(int(count), moRelaxed) + when keepCount: + discard chan.count.fetchAdd(int(count), moRelaxed) last.next.store(nil, moRelaxed) fence(moRelease) let oldBack = chan.back.exchange(last, moRelaxed) @@ -121,7 +124,7 @@ proc trySendBatch*[T](chan: var ChannelMpscUnboundedBatch[T], first, last: sink return true -proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool = +proc tryRecv*[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], dst: var T): bool = ## Try receiving the next item buffered in the channel ## Returns true if successful (channel was not empty) ## This can fail spuriously on the last element if producer @@ -137,7 +140,8 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool = if not next.isNil: # Not competing with producers prefetch(first) - discard chan.count.fetchSub(1, moRelaxed) + when keepCount: + discard chan.count.fetchSub(1, moRelaxed) chan.front.next.store(next, moRelaxed) fence(moAcquire) dst = first @@ -153,7 +157,8 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool = chan.front.next.store(nil, moRelaxed) if compareExchange(chan.back, last, chan.front.addr, moAcquireRelease): # We won and replaced the last node with the channel front - discard chan.count.fetchSub(1, moRelaxed) + when keepCount: + discard chan.count.fetchSub(1, moRelaxed) dst = first return true @@ -170,13 +175,14 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool = next = first.next.load(moRelaxed) prefetch(first) - discard chan.count.fetchSub(1, moRelaxed) + when keepCount: + discard chan.count.fetchSub(1, moRelaxed) chan.front.next.store(next, moRelaxed) fence(moAcquire) dst = first return true -proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var T): int32 = +proc tryRecvBatch*[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], bFirst, bLast: var T): int32 = ## Try receiving all items buffered in the channel ## Returns true if at least some items are dequeued. ## There might be competition with producers for the last item @@ -210,8 +216,9 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var if front != last: # We lose the competition, bail out chan.front.next.store(front, moRelaxed) - discard chan.count.fetchSub(result, moRelaxed) - postCondition: chan.count.load(moRelaxed) >= 0 # TODO: somehow it can be negative + when keepCount: + discard chan.count.fetchSub(result, moRelaxed) + postCondition: chan.count.load(moRelaxed) >= 0 # TODO: somehow it can be negative return # front == last @@ -220,9 +227,10 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var # We won and replaced the last node with the channel front prefetch(front) result += 1 - discard chan.count.fetchSub(result, moRelaxed) bLast = front - postCondition: chan.count.load(moRelaxed) >= 0 + when keepCount: + discard chan.count.fetchSub(result, moRelaxed) + postCondition: chan.count.load(moRelaxed) >= 0 return # We lost but now we know that there is an extra node @@ -242,11 +250,13 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var prefetch(front) result += 1 - discard chan.count.fetchSub(result, moRelaxed) chan.front.next.store(next, moRelaxed) fence(moAcquire) bLast = front - postCondition: chan.count.load(moRelaxed) >= 0 + + when keepCount: + discard chan.count.fetchSub(result, moRelaxed) + postCondition: chan.count.load(moRelaxed) >= 0 func peek*(chan: var ChannelMpscUnboundedBatch): int32 {.inline.} = ## Estimates the number of items pending in the channel @@ -257,6 +267,7 @@ func peek*(chan: var ChannelMpscUnboundedBatch): int32 {.inline.} = ## the consumer removes them concurrently. ## ## This is a non-locking operation. + static: doAssert chan.keepCount result = int32 chan.count.load(moAcquire) # For the consumer it's always positive or zero @@ -278,13 +289,13 @@ when isMainModule: when not compileOption("threads"): {.error: "This requires --threads:on compilation flag".} - template sendLoop[T](chan: var ChannelMpscUnboundedBatch[T], + template sendLoop[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], data: sink T, body: untyped): untyped = while not chan.trySend(data): body - template recvLoop[T](chan: var ChannelMpscUnboundedBatch[T], + template recvLoop[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], data: var T, body: untyped): untyped = while not chan.tryRecv(data): @@ -319,7 +330,7 @@ when isMainModule: ThreadArgs = object ID: WorkerKind - chan: ptr ChannelMpscUnboundedBatch[Val] + chan: ptr ChannelMpscUnboundedBatch[true, Val] template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} = if args.ID == id: @@ -374,7 +385,7 @@ when isMainModule: echo "Testing if 15 threads can send data to 1 consumer" echo "------------------------------------------------------------------------" var threads: array[WorkerKind, Thread[ThreadArgs]] - let chan = createSharedU(ChannelMpscUnboundedBatch[Val]) # CreateU is not zero-init + let chan = createSharedU(ChannelMpscUnboundedBatch[true, Val]) # CreateU is not zero-init chan[].initialize() createThread(threads[Receiver], thread_func_receiver, ThreadArgs(ID: Receiver, chan: chan)) @@ -425,7 +436,7 @@ when isMainModule: echo "Testing if 15 threads can send data to 1 consumer with batch receive" echo "------------------------------------------------------------------------" var threads: array[WorkerKind, Thread[ThreadArgs]] - let chan = createSharedU(ChannelMpscUnboundedBatch[Val]) # CreateU is not zero-init + let chan = createSharedU(ChannelMpscUnboundedBatch[true, Val]) # CreateU is not zero-init chan[].initialize() # log("Channel address 0x%.08x (dummy 0x%.08x)\n", chan, chan.front.addr) diff --git a/weave/channels/pledges.nim b/weave/channels/pledges.nim index 66b6c97..7e65e0f 100644 --- a/weave/channels/pledges.nim +++ b/weave/channels/pledges.nim @@ -167,7 +167,7 @@ type # The MPSC Channel is intrusive to the PledgeImpl. # The end fields in the channel should be the consumer # to avoid cache-line conflicts with producer threads. - chan{.align: WV_CacheLinePadding div 2.}: ChannelMpscUnboundedBatch[TaskNode] + chan{.align: WV_CacheLinePadding div 2.}: ChannelMpscUnboundedBatch[false, TaskNode] deferredIn: Atomic[int32] deferredOut: Atomic[int32] fulfilled: Atomic[bool] @@ -513,8 +513,8 @@ macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, pledges: varargs # TODO: Once upstream fixes https://github.com/nim-lang/Nim/issues/13122 # the size here will be wrong -assert sizeof(ChannelMpscUnboundedBatch[TaskNode]) == 56, # Upstream {.align.} bug - "MPSC channel size was " & $sizeof(ChannelMpscUnboundedBatch[TaskNode]) +assert sizeof(ChannelMpscUnboundedBatch[false, TaskNode]) == 56, # Upstream {.align.} bug + "MPSC channel size was " & $sizeof(ChannelMpscUnboundedBatch[false, TaskNode]) assert sizeof(PledgeImpl) == 128, "PledgeImpl size was " & $sizeof(PledgeImpl) diff --git a/weave/contexts.nim b/weave/contexts.nim index 52706c5..2c98a36 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -12,6 +12,8 @@ import ./config, ./instrumentation/[profilers, loggers, contracts] +export derefMPSC # Need to be reexported due to a static early resolution bug :/. + when defined(WV_metrics): import system/ansi_c, ./primitives/barriers @@ -49,10 +51,10 @@ template isRootTask*(task: Task): bool = template myTodoBoxes*: Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]] = globalCtx.com.tasks[localCtx.worker.ID] -template myThieves*: ChannelMpscUnboundedBatch[StealRequest] = +template myThieves*: ChannelMpscUnboundedBatch[true, StealRequest] = globalCtx.com.thefts[localCtx.worker.ID] -template getThievesOf*(worker: WorkerID): ChannelMpscUnboundedBatch[StealRequest] = +template getThievesOf*(worker: WorkerID): ChannelMpscUnboundedBatch[true, StealRequest] = globalCtx.com.thefts[worker] template myMemPool*: TLPoolAllocator = diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index 8d8010c..03f9fb2 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -36,7 +36,7 @@ type # per channel and a known max number of workers # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker" - thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]] + thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[true, StealRequest]] tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] when static(WV_Backoff): parking*: ptr UncheckedArray[EventNotifier] diff --git a/weave/memory/memory_pools.nim b/weave/memory/memory_pools.nim index 7ec9f08..b347a21 100644 --- a/weave/memory/memory_pools.nim +++ b/weave/memory/memory_pools.nim @@ -35,6 +35,8 @@ import ./allocs, ./thread_id, std/atomics +export derefMPSC # Need to be reexported due to a static early resolution bug :/. + # Constants (move in config.nim) # ---------------------------------------------------------------------------------- @@ -112,7 +114,7 @@ type # ⚠️ Consumer thread field must be at the end # to prevent cache-line contention # and save on space (no padding on the next field) - remoteFree {.align: WV_CacheLinePadding.}: ChannelMpscUnboundedBatch[ptr MemBlock] + remoteFree {.align: WV_CacheLinePadding.}: ChannelMpscUnboundedBatch[true, ptr MemBlock] # Freed blocks, kept separately to deterministically trigger slow path # after an amortized amount of allocation localFree: ptr MemBlock @@ -623,8 +625,8 @@ proc takeover*(pool: var TLPoolAllocator, target: sink TLPoolAllocator) = # TODO: Once upstream fixes https://github.com/nim-lang/Nim/issues/13122 # the size here will likely be wrong -assert sizeof(ChannelMpscUnboundedBatch[ptr MemBlock]) == 272, - "MPSC channel size was " & $sizeof(ChannelMpscUnboundedBatch[ptr MemBlock]) +assert sizeof(ChannelMpscUnboundedBatch[true, ptr MemBlock]) == 272, + "MPSC channel size was " & $sizeof(ChannelMpscUnboundedBatch[true, ptr MemBlock]) assert sizeof(Arena) == WV_MemArenaSize, "The real arena size was " & $sizeof(Arena) & @@ -688,13 +690,13 @@ when isMainModule: when not compileOption("threads"): {.error: "This requires --threads:on compilation flag".} - template sendLoop[T](chan: var ChannelMpscUnboundedBatch[T], + template sendLoop[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], data: sink T, body: untyped): untyped = while not chan.trySend(data): body - template recvLoop[T](chan: var ChannelMpscUnboundedBatch[T], + template recvLoop[keepCount: static bool, T](chan: var ChannelMpscUnboundedBatch[keepCount, T], data: var T, body: untyped): untyped = while not chan.tryRecv(data): @@ -726,7 +728,7 @@ when isMainModule: ThreadArgs = object ID: WorkerKind - chan: ptr ChannelMpscUnboundedBatch[Val] + chan: ptr ChannelMpscUnboundedBatch[true, Val] pool: ptr TLPoolAllocator AllocKind = enum @@ -808,7 +810,7 @@ when isMainModule: var threads: array[WorkerKind, Thread[ThreadArgs]] var pools: ptr array[WorkerKind, TLPoolAllocator] - let chan = createSharedU(ChannelMpscUnboundedBatch[Val]) + let chan = createSharedU(ChannelMpscUnboundedBatch[true, Val]) chan[].initialize() pools = cast[typeof pools](createSharedU(TLPoolAllocator, pools[].len))