Skip to content
This repository has been archived by the owner on Feb 2, 2025. It is now read-only.

Commit

Permalink
Merge pull request #7 from belozierov/NewSharedCoroutineDispatcher
Browse files Browse the repository at this point in the history
New shared coroutine dispatcher
  • Loading branch information
belozierov authored Apr 8, 2020
2 parents faa347f + 2cca971 commit d1fc1da
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@ import Darwin
extension CoroutineProtocol {

@inlinable internal func performAsCurrent<T>(_ block: () -> T) -> T {
let unmanaged = Unmanaged.passRetained(self)
defer { unmanaged.release() }
if let caller = pthread_getspecific(.coroutine) {
pthread_setspecific(.coroutine, unmanaged.toOpaque())
defer { pthread_setspecific(.coroutine, caller) }
return block()
} else {
pthread_setspecific(.coroutine, unmanaged.toOpaque())
defer { pthread_setspecific(.coroutine, nil) }
return block()
}
let caller = pthread_getspecific(.coroutine)
pthread_setspecific(.coroutine, Unmanaged.passUnretained(self).toOpaque())
defer { pthread_setspecific(.coroutine, caller) }
return block()
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,151 +6,105 @@
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

import Dispatch

internal final class SharedCoroutineDispatcher: CoroutineTaskExecutor {

internal struct Task {
let scheduler: CoroutineScheduler, task: () -> Void
}

private let mutex = PsxLock()
private let stackSize: Int
private var tasks = FifoQueue<Task>()

private var contextsCount: Int
private var freeQueues = [SharedCoroutineQueue]()
private var suspendedQueues = Set<SharedCoroutineQueue>()
private var freeCount: AtomicInt
private let queuesCount: Int
private let queues: UnsafeMutablePointer<SharedCoroutineQueue>
private var freeQueuesMask = AtomicBitMask()
private var suspendedQueuesMask = AtomicBitMask()
private var tasks = ThreadSafeFifoQueues<Task>()

internal init(contextsCount: Int, stackSize: Int) {
self.stackSize = stackSize
self.contextsCount = contextsCount
freeCount = AtomicInt(value: contextsCount)
freeQueues.reserveCapacity(contextsCount)
suspendedQueues.reserveCapacity(contextsCount)
startDispatchSource()
queuesCount = min(contextsCount, 63)
queues = .allocate(capacity: queuesCount)
(0..<queuesCount).forEach {
freeQueuesMask.insert($0)
(queues + $0).initialize(to: .init(tag: $0, stackSize: stackSize))
}
}

// MARK: - Start
// MARK: - Free

internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) {
func perform() {
freeCount.update { max(0, $0 - 1) }
mutex.lock()
if let queue = freeQueue {
mutex.unlock()
queue.start(dispatcher: self, task: .init(scheduler: scheduler, task: task))
} else {
tasks.push(.init(scheduler: scheduler, task: task))
mutex.unlock()
}
}
if freeCount.value == 0 {
mutex.lock()
defer { mutex.unlock() }
if freeCount.value == 0 {
return tasks.push(.init(scheduler: scheduler, task: task))
}
}
scheduler.scheduleTask(perform)
private var hasFree: Bool {
!freeQueuesMask.isEmpty || !suspendedQueuesMask.isEmpty
}

private var freeQueue: SharedCoroutineQueue? {
if let queue = freeQueues.popLast() { return queue }
if contextsCount > 0 {
contextsCount -= 1
return SharedCoroutineQueue(stackSize: stackSize)
} else if suspendedQueues.count < 2 {
return suspendedQueues.popFirst()
if !freeQueuesMask.isEmpty, let index = freeQueuesMask.pop() { return queues[index] }
if !suspendedQueuesMask.isEmpty, let index = suspendedQueuesMask
.pop(offset: suspendedQueuesMask.rawValue % queuesCount) {
return queues[index]
}
var min = suspendedQueues.first!
for queue in suspendedQueues {
if queue.started == 1 {
return suspendedQueues.remove(queue)
} else if queue.started < min.started {
min = queue
}
}
return suspendedQueues.remove(min)
return nil
}

// MARK: - Resume

internal func resume(_ coroutine: SharedCoroutine) {
mutex.lock()
if suspendedQueues.remove(coroutine.queue) == nil {
coroutine.queue.push(coroutine)
mutex.unlock()
} else {
mutex.unlock()
freeCount.decrease()
coroutine.scheduler.scheduleTask {
coroutine.queue.resume(coroutine: coroutine)
}
}
private func pushTask(_ task: Task) {
tasks.push(task)
if hasFree { tasks.pop().map(startTask) }
}

// MARK: - Next
// MARK: - Start

internal func performNext(for queue: SharedCoroutineQueue) {
mutex.lock()
if let coroutine = queue.pop() {
mutex.unlock()
coroutine.scheduler.scheduleTask {
queue.resume(coroutine: coroutine)
}
} else if let task = tasks.pop() {
mutex.unlock()
task.scheduler.scheduleTask {
internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) {
hasFree
? startTask(.init(scheduler: scheduler, task: task))
: pushTask(.init(scheduler: scheduler, task: task))
}

private func startTask(_ task: Task) {
task.scheduler.scheduleTask {
if let queue = self.freeQueue {
queue.start(dispatcher: self, task: task)
}
} else {
if queue.started == 0 {
freeQueues.append(queue)
} else {
suspendedQueues.insert(queue)
self.pushTask(task)
}
freeCount.increase()
mutex.unlock()
}
}

// MARK: - DispatchSourceMemoryPressure

#if os(Linux)

private func startDispatchSource() {}

#else

private lazy var memoryPressureSource: DispatchSourceMemoryPressure = {
let source = DispatchSource.makeMemoryPressureSource(eventMask: [.warning, .critical])
source.setEventHandler { [unowned self] in self.reset() }
return source
}()
// MARK: - Resume

private func startDispatchSource() {
if #available(OSX 10.12, iOS 10.0, *) {
memoryPressureSource.activate()
internal func resume(_ coroutine: SharedCoroutine) {
coroutine.queue.mutex.lock()
if suspendedQueuesMask.remove(coroutine.queue.tag) {
coroutine.queue.mutex.unlock()
coroutine.resumeOnQueue()
} else {
memoryPressureSource.resume()
coroutine.queue.prepared.push(coroutine)
coroutine.queue.mutex.unlock()
}
}

#endif
// MARK: - Next

internal func reset() {
mutex.lock()
contextsCount += freeQueues.count
freeCount.add(freeQueues.count)
freeQueues.removeAll(keepingCapacity: true)
mutex.unlock()
internal func performNext(for queue: SharedCoroutineQueue) {
queue.mutex.lock()
if let coroutine = queue.prepared.pop() {
queue.mutex.unlock()
coroutine.resumeOnQueue()
} else {
queue.started == 0
? freeQueuesMask.insert(queue.tag)
: suspendedQueuesMask.insert(queue.tag)
queue.mutex.unlock()
if hasFree { tasks.pop().map(startTask) }
}
}

deinit {
mutex.free()
queues.deinitialize(count: queuesCount)
queues.deallocate()
}

}

extension SharedCoroutine {

fileprivate func resumeOnQueue() {
scheduler.scheduleTask { self.queue.resume(coroutine: self) }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,18 @@ internal final class SharedCoroutineQueue {
case finished, suspended, restarting
}

internal let tag: Int
internal let context: CoroutineContext
internal let mutex = PsxLock()
internal var prepared = FifoQueue<SharedCoroutine>()
private var coroutine: SharedCoroutine?
private var prepared = FifoQueue<SharedCoroutine>()
private(set) var started = 0

internal init(stackSize size: Int) {
internal init(tag: Int, stackSize size: Int) {
self.tag = tag
context = CoroutineContext(stackSize: size)
}

// MARK: - Queue

internal func push(_ coroutine: SharedCoroutine) {
prepared.push(coroutine)
}

internal func pop() -> SharedCoroutine? {
prepared.pop()
}

// MARK: - Actions

internal func start(dispatcher: SharedCoroutineDispatcher, task: Task) {
Expand Down Expand Up @@ -69,16 +62,8 @@ internal final class SharedCoroutineQueue {
}
}

}

extension SharedCoroutineQueue: Hashable {

@inlinable internal static func == (lhs: SharedCoroutineQueue, rhs: SharedCoroutineQueue) -> Bool {
lhs === rhs
}

@inlinable internal func hash(into hasher: inout Hasher) {
ObjectIdentifier(self).hash(into: &hasher)
deinit {
mutex.free()
}

}
59 changes: 59 additions & 0 deletions Sources/SwiftCoroutine/Helpers/AtomicBitMask.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// AtomicBitMask.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 06.04.2020.
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

private let deBruijn = [00, 01, 48, 02, 57, 49, 28, 03,
61, 58, 50, 42, 38, 29, 17, 04,
62, 55, 59, 36, 53, 51, 43, 22,
45, 39, 33, 30, 24, 18, 12, 05,
63, 47, 56, 27, 60, 41, 37, 16,
54, 35, 52, 21, 44, 32, 23, 11,
46, 26, 40, 15, 34, 20, 31, 10,
25, 14, 19, 09, 13, 08, 07, 06]

struct AtomicBitMask {

private var atomic = AtomicInt(value: 0)
var rawValue: Int { atomic.value }
var isEmpty: Bool { atomic.value == 0 }

mutating func insert(_ index: Int) {
atomic.update { $0 | (1 << index) }
}

mutating func remove(_ index: Int) -> Bool {
if isEmpty { return false }
let (new, old) = atomic.update { $0 & ~(1 << index) }
return new != old
}

mutating func pop() -> Int? {
var index: Int!
atomic.update {
if $0 == 0 { index = nil; return $0 }
let uint = UInt(bitPattern: $0)
let value = uint & (0 &- uint)
index = deBruijn[Int((value &* 285870213051386505) >> 58)]
return $0 & ~(1 << index)
}
return index
}

mutating func pop(offset: Int) -> Int? {
var index: Int!
atomic.update {
if $0 == 0 { index = nil; return $0 }
let uint = UInt(bitPattern: ($0 << offset) + ($0 >> (64 - offset)))
let value = uint & (0 &- uint)
index = deBruijn[Int((value &* 285870213051386505) >> 58)] - offset
if index < 0 { index += 64 }
return $0 & ~(1 << index)
}
return index
}

}
3 changes: 0 additions & 3 deletions Sources/SwiftCoroutine/Helpers/AtomicInt.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ internal struct AtomicInt {
}
}

@inlinable mutating func increase() { add(1) }
@inlinable mutating func decrease() { add(-1) }

@discardableResult @inlinable
mutating func update(_ transform: (Int) -> Int) -> (old: Int, new: Int) {
withUnsafeMutablePointer(to: &_value) {
Expand Down
Loading

0 comments on commit d1fc1da

Please sign in to comment.