Skip to content

Commit

Permalink
Add asynchronous event source
Browse files Browse the repository at this point in the history
  • Loading branch information
kmcbride committed Feb 27, 2024
1 parent 26dcbeb commit 4ab4b74
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019-2024 Spotify AB.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public extension CompositeEventSourceBuilder {
/// Returns a new `CompositeEventSourceBuilder` with the specified `AsyncSequence` added to it.
///
/// - Note: The `consumerQueue` parameter is intended to be used when building a `MobiusLoop`.
/// It can safely be omitted when building a `MobiusController`, which automatically handles sending events to the loop queue.
///
/// - Parameter sequence: An `AsyncSequence` producing `Event`s.
/// - Parameter consumerQueue: An optional callback queue to consume events on.
/// - Returns: A `CompositeEventSourceBuilder` that includes the given event source.
func addEventSource<Sequence: AsyncSequence>(
_ sequence: Sequence,
receiveOn consumerQueue: DispatchQueue? = nil
) -> CompositeEventSourceBuilder<Event> where Sequence.Element == Event {
addEventSource(AsyncSequenceEventSource(sequence: sequence, consumerQueue: consumerQueue))
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private final class AsyncSequenceEventSource<Sequence: AsyncSequence>: EventSource {
private let sequence: Sequence
private let consumerQueue: DispatchQueue?

init(sequence: Sequence, consumerQueue: DispatchQueue? = nil) {
self.sequence = sequence
self.consumerQueue = consumerQueue
}

func subscribe(consumer: @escaping Consumer<Sequence.Element>) -> Disposable {
// Prevents sending events after dispose by wrapping the consumer to enforce synchronous access.
let protectedConsumer = Synchronized<Consumer<Sequence.Element>?>(value: consumer)
let threadSafeConsumer = { event in protectedConsumer.read { consumer in consumer?(event) } }

let task = Task { [consumerQueue] in
for try await event in sequence {
if let consumerQueue {
consumerQueue.async { threadSafeConsumer(event) }
} else {
threadSafeConsumer(event)
}
}
}

return AnonymousDisposable {
protectedConsumer.value = nil
task.cancel()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2019-2024 Spotify AB.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import MobiusCore
import Nimble
import Quick

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
final class CompositeEventSourceBuilder_ConcurrencyTests: QuickSpec {
// swiftlint:disable:next function_body_length
override func spec() {
describe("CompositeEventSourceBuilder") {
var sequence: AsyncStream<String>!
var elementProducer: AsyncStream<String>.Continuation!

beforeEach {
sequence = AsyncStream<String> { continuation in
elementProducer = continuation
}
}

context("when configuring the composite event source builder") {
var compositeEventSource: AnyEventSource<String>!
var disposable: Disposable!
var receivedEvents: [String]!

context("with an AsyncSequence event source") {
beforeEach {
let sut = CompositeEventSourceBuilder<String>()
.addEventSource(sequence, receiveOn: .main)

compositeEventSource = sut.build()
receivedEvents = []

disposable = compositeEventSource.subscribe {
receivedEvents.append($0)
}
}

afterEach {
disposable.dispose()
}

it("should receive events from the sequence") {
elementProducer.yield("foo")
expect(receivedEvents).toEventually(equal(["foo"]))

elementProducer.yield("bar")
expect(receivedEvents).toEventually(equal(["foo", "bar"]))
}
}
}

describe("DelayedSequence") {
context("MobiusLoop with an AsyncSequence event source") {
var loop: MobiusLoop<String, String, String>!
var receivedModels: [String]!

beforeEach {
let effectHandler = EffectRouter<String, String>()
.asConnectable

let eventSource = CompositeEventSourceBuilder<String>()
.addEventSource(sequence, receiveOn: .main)
.build()

loop = Mobius
.loop(update: { _, event in .next(event) }, effectHandler: effectHandler)
.withEventSource(eventSource)
.start(from: "foo")

receivedModels = []
loop.addObserver { model in receivedModels.append(model) }
}

afterEach {
loop.dispose()
}

it("should prevent events from being submitted after dispose") {
elementProducer.yield("bar")
expect(receivedModels).toEventually(equal(["foo", "bar"]))

loop.dispose()

elementProducer.yield("baz")
expect(receivedModels).toNever(equal(["foo", "bar", "baz"]))
}
}

context("MobiusController with an AsyncSequence event source") {
let loopQueue = DispatchQueue(label: "loop queue")
let viewQueue = DispatchQueue(label: "view queue")

var controller: MobiusController<String, String, String>!
var view: RecordingTestConnectable!

beforeEach {
let effectHandler = EffectRouter<String, String>()
.asConnectable

let eventSource = CompositeEventSourceBuilder<String>()
.addEventSource(sequence)
.build()

controller = Mobius
.loop(update: { _, event in .next(String(event)) }, effectHandler: effectHandler)
.withEventSource(eventSource)
.makeController(from: "foo", loopQueue: loopQueue, viewQueue: viewQueue)

view = RecordingTestConnectable(expectedQueue: viewQueue)
controller.connectView(view)
}

it("should prevent events from being submitted after dispose") {
controller.start()

elementProducer.yield("bar")
expect(view.recorder.items).toEventually(equal(["foo", "bar"]))

controller.stop()

elementProducer.yield("baz")
expect(view.recorder.items).toNever(equal(["foo", "bar", "baz"]))
}
}
}
}
}
}

0 comments on commit 4ab4b74

Please sign in to comment.