Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asyncStream and asyncThrowingStream for Signal and SignalProducer #847

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# master
*Please add new entries at the top.*
1. Add Swift Concurrency extensions `asyncStream` and `asyncThrowingStream` to `Signal` and `SignalProducer` (#847)

1. Fix some issues related to locking, bumped min OS versions to iOS 10, macOS 10.12, tvOS 10, watchOS 3 (#859, kudos to @mluisbrown)
1. Add `async` helpers to Schedulers (#857, kudos to @p4checo)
Expand All @@ -16,7 +17,6 @@

# 6.7.0
# 6.7.0-rc1

1. New operator `SignalProducer.Type.interval(_:interval:on:)` for emitting elements from a given sequence regularly. (#810, kudos to @mluisbrown)

1. `Signal` offers two special variants for advanced users: unserialized and reentrant-unserialized. (#797)
Expand Down
28 changes: 28 additions & 0 deletions ReactiveSwift.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@
A9B315C61B3940810001CB9C /* Bag.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0C312BC19EF2A5800984962 /* Bag.swift */; };
A9B315C81B3940810001CB9C /* FoundationExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = D03B4A3C19F4C39A009E02AC /* FoundationExtensions.swift */; };
A9B315CA1B3940AB0001CB9C /* ReactiveSwift.h in Headers */ = {isa = PBXBuildFile; fileRef = D04725EF19E49ED7006002AA /* ReactiveSwift.h */; settings = {ATTRIBUTES = (Public, ); }; };
A9F3C403273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */; };
A9F3C404273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */; };
A9F3C405273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */; };
A9F3C406273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */; };
A9F3C407273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */; };
A9F3C408273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */; };
A9F3C409273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */; };
A9F3C40A273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */; };
A9F3C40C273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C40B273E43E9000F0E18 /* SwiftConcurrencyTests.swift */; };
A9F3C40D273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C40B273E43E9000F0E18 /* SwiftConcurrencyTests.swift */; };
A9F3C40E273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = A9F3C40B273E43E9000F0E18 /* SwiftConcurrencyTests.swift */; };
A9F793341B60D0140026BCBA /* Optional.swift in Sources */ = {isa = PBXBuildFile; fileRef = D871D69E1B3B29A40070F16C /* Optional.swift */; };
B696FB811A7640C00075236D /* TestError.swift in Sources */ = {isa = PBXBuildFile; fileRef = B696FB801A7640C00075236D /* TestError.swift */; };
B696FB821A7640C00075236D /* TestError.swift in Sources */ = {isa = PBXBuildFile; fileRef = B696FB801A7640C00075236D /* TestError.swift */; };
Expand Down Expand Up @@ -411,6 +422,9 @@
A97451351B3A935E00F48E55 /* watchOS-Framework.xcconfig */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.xcconfig; path = "watchOS-Framework.xcconfig"; sourceTree = "<group>"; };
A97451361B3A935E00F48E55 /* watchOS-StaticLibrary.xcconfig */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.xcconfig; path = "watchOS-StaticLibrary.xcconfig"; sourceTree = "<group>"; };
A9B315541B3940610001CB9C /* ReactiveSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SignalProducer+SwiftConcurrency.swift"; sourceTree = "<group>"; };
A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Signal+SwiftConcurrency.swift"; sourceTree = "<group>"; };
A9F3C40B273E43E9000F0E18 /* SwiftConcurrencyTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SwiftConcurrencyTests.swift; sourceTree = "<group>"; };
B696FB801A7640C00075236D /* TestError.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TestError.swift; sourceTree = "<group>"; };
BE9CF3941D751B6B003AE479 /* UnidirectionalBinding.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBinding.swift; sourceTree = "<group>"; };
BFA6B94A1A76044800C846D1 /* SignalProducerNimbleMatchers.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SignalProducerNimbleMatchers.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -573,6 +587,8 @@
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */,
D08C54B11A69A2AC00AD8286 /* Signal.swift */,
D08C54B21A69A2AC00AD8286 /* SignalProducer.swift */,
A9F3C402273E43C5000F0E18 /* Signal+SwiftConcurrency.swift */,
A9F3C401273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift */,
BE9CF3941D751B6B003AE479 /* UnidirectionalBinding.swift */,
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */,
);
Expand Down Expand Up @@ -669,6 +685,7 @@
0A0C8D68291BCFF000D1EAB7 /* TestSchedulerAsyncTestCase.swift */,
9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */,
9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */,
A9F3C40B273E43E9000F0E18 /* SwiftConcurrencyTests.swift */,
);
name = ReactiveSwiftTests;
path = Tests/ReactiveSwiftTests;
Expand Down Expand Up @@ -1026,6 +1043,7 @@
9A2D5CE8259F852B005682ED /* CombinePrevious.swift in Sources */,
9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D06259F8C39005682ED /* Reduce.swift in Sources */,
A9F3C406273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */,
9A2D5D56259FA000005682ED /* Throttle.swift in Sources */,
9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491424E9A196003D263C /* Map.swift in Sources */,
Expand Down Expand Up @@ -1053,6 +1071,7 @@
4A0E11021D2A92720065D310 /* Lifetime.swift in Sources */,
9A2D5CB1259F8112005682ED /* TakeLast.swift in Sources */,
BE9CF3981D751B71003AE479 /* UnidirectionalBinding.swift in Sources */,
A9F3C40A273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */,
9A2D5CCF259F8263005682ED /* SkipWhile.swift in Sources */,
9A2D5C84259F7E3E005682ED /* DematerializeResults.swift in Sources */,
9A2D5C52259F7B21005682ED /* MapError.swift in Sources */,
Expand All @@ -1071,6 +1090,7 @@
7DFBED281CDB8DE300EE435B /* PropertySpec.swift in Sources */,
0A0C8D6B291BCFF000D1EAB7 /* TestSchedulerAsyncTestCase.swift in Sources */,
7DFBED291CDB8DE300EE435B /* SchedulerSpec.swift in Sources */,
A9F3C40E273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */,
7DFBED2A1CDB8DE300EE435B /* SignalLifetimeSpec.swift in Sources */,
7DFBED2B1CDB8DE300EE435B /* SignalProducerSpec.swift in Sources */,
9A681AA01E5A241B00B097CF /* DeprecationSpec.swift in Sources */,
Expand Down Expand Up @@ -1114,6 +1134,7 @@
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D05259F8C39005682ED /* Reduce.swift in Sources */,
9AFA491324E9A196003D263C /* Map.swift in Sources */,
A9F3C405273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */,
9A2D5CFB259F8634005682ED /* UniqueValues.swift in Sources */,
9A2D5C65259F7B47005682ED /* MaterializeAsResult.swift in Sources */,
5792972A26DE7623007A9F64 /* TakeUntil.swift in Sources */,
Expand Down Expand Up @@ -1141,6 +1162,7 @@
4A0E11011D2A92720065D310 /* Lifetime.swift in Sources */,
9A2D5CB0259F8112005682ED /* TakeLast.swift in Sources */,
BE9CF3971D751B71003AE479 /* UnidirectionalBinding.swift in Sources */,
A9F3C409273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */,
9A2D5CCE259F8263005682ED /* SkipWhile.swift in Sources */,
9A2D5C83259F7E3E005682ED /* DematerializeResults.swift in Sources */,
9A2D5C51259F7B21005682ED /* MapError.swift in Sources */,
Expand Down Expand Up @@ -1172,6 +1194,7 @@
9A2D5CE5259F852B005682ED /* CombinePrevious.swift in Sources */,
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D03259F8C39005682ED /* Reduce.swift in Sources */,
A9F3C403273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */,
9A2D5D53259FA000005682ED /* Throttle.swift in Sources */,
9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491124E9A196003D263C /* Map.swift in Sources */,
Expand Down Expand Up @@ -1199,6 +1222,7 @@
D08C54B81A69A9D000AD8286 /* SignalProducer.swift in Sources */,
9A2D5CAE259F8112005682ED /* TakeLast.swift in Sources */,
BE9CF3951D751B6B003AE479 /* UnidirectionalBinding.swift in Sources */,
A9F3C407273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */,
9A2D5CCC259F8263005682ED /* SkipWhile.swift in Sources */,
9A2D5C81259F7E3E005682ED /* DematerializeResults.swift in Sources */,
9A2D5C4F259F7B21005682ED /* MapError.swift in Sources */,
Expand All @@ -1217,6 +1241,7 @@
D8170FC11B100EBC004192AD /* FoundationExtensionsSpec.swift in Sources */,
0A0C8D69291BCFF000D1EAB7 /* TestSchedulerAsyncTestCase.swift in Sources */,
C79B64741CD38B2B003F2376 /* TestLogger.swift in Sources */,
A9F3C40C273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */,
CA6F28501C52626B001879D2 /* FlattenSpec.swift in Sources */,
4A0E11041D2A95200065D310 /* LifetimeSpec.swift in Sources */,
9A681A9E1E5A241B00B097CF /* DeprecationSpec.swift in Sources */,
Expand Down Expand Up @@ -1260,6 +1285,7 @@
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D04259F8C39005682ED /* Reduce.swift in Sources */,
9AFA491224E9A196003D263C /* Map.swift in Sources */,
A9F3C404273E43C5000F0E18 /* SignalProducer+SwiftConcurrency.swift in Sources */,
9A2D5CFA259F8634005682ED /* UniqueValues.swift in Sources */,
9A2D5C64259F7B47005682ED /* MaterializeAsResult.swift in Sources */,
5792972926DE7623007A9F64 /* TakeUntil.swift in Sources */,
Expand Down Expand Up @@ -1287,6 +1313,7 @@
D0D11ABA1A6AE87700C1F8B1 /* Action.swift in Sources */,
9A2D5CAF259F8112005682ED /* TakeLast.swift in Sources */,
BE9CF3961D751B70003AE479 /* UnidirectionalBinding.swift in Sources */,
A9F3C408273E43C5000F0E18 /* Signal+SwiftConcurrency.swift in Sources */,
9A2D5CCD259F8263005682ED /* SkipWhile.swift in Sources */,
9A2D5C82259F7E3E005682ED /* DematerializeResults.swift in Sources */,
9A2D5C50259F7B21005682ED /* MapError.swift in Sources */,
Expand All @@ -1305,6 +1332,7 @@
D8024DB31B2E1BB0005E6B9A /* SignalProducerLiftingSpec.swift in Sources */,
0A0C8D6A291BCFF000D1EAB7 /* TestSchedulerAsyncTestCase.swift in Sources */,
BFA6B94E1A7604D500C846D1 /* SignalProducerNimbleMatchers.swift in Sources */,
A9F3C40D273E43E9000F0E18 /* SwiftConcurrencyTests.swift in Sources */,
B696FB821A7640C00075236D /* TestError.swift in Sources */,
D8170FC21B100EBC004192AD /* FoundationExtensionsSpec.swift in Sources */,
9A681A9F1E5A241B00B097CF /* DeprecationSpec.swift in Sources */,
Expand Down
52 changes: 52 additions & 0 deletions Sources/Signal+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Signal+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import Foundation

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Signal {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use AsyncStream<Result<Value, Error>> to maintain the error type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump

AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the interrupted event throw a swift CancellationError?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Signal where Error == Never {
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}
#endif
52 changes: 52 additions & 0 deletions Sources/SignalProducer+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// SignalProducer+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import Foundation

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension SignalProducer {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension SignalProducer where Error == Never {
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}
#endif
129 changes: 129 additions & 0 deletions Tests/ReactiveSwiftTests/SwiftConcurrencyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// SwiftConcurrencyTests.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//

#if compiler(>=5.5.2) && canImport(_Concurrency)
import Foundation
import ReactiveSwift
import XCTest

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
class SwiftConcurrencyTests: XCTestCase {
func testValuesAsyncSignalProducer() async {
let values = [1,2,3]
var sum = 0
let asyncStream = SignalProducer(values).asyncStream
for await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testValuesAsyncThrowingSignalProducer() async throws {
let values = [1,2,3]
var sum = 0
let asyncStream = SignalProducer(values).asyncThrowingStream
for try await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testCompleteAsyncSignalProducer() async {
let asyncStream = SignalProducer<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignalProducer() async throws {
let asyncStream = SignalProducer<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignalProducer() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let asyncStream = SignalProducer<String, Error>(error: error).asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}

func testValuesAsyncSignal() async {
let signal = Signal<Int, Never> { observer, _ in
DispatchQueue.main.async {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var sum = 0
let asyncStream = signal.asyncStream
for await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testValuesAsyncThrowingSignal() async throws {
let signal = Signal<Int, Never> { observer, _ in
DispatchQueue.main.async {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var sum = 0
let asyncStream = signal.asyncThrowingStream
for try await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testCompleteAsyncSignal() async {
let asyncStream = Signal<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignal() async throws {
let asyncStream = Signal<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignal() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let signal = Signal<String, Error> { observer, _ in
DispatchQueue.main.async {
observer.send(error: error)
}
}
let asyncStream = signal.asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}
}
// Extension to allow Throw assertion for async expressions
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
fileprivate extension XCTest {
func XCTAssertThrowsError<T: Sendable>(
_ expression: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath,
line: UInt = #line,
_ errorHandler: (_ error: Error) -> Void = { _ in }
) async {
do {
_ = try await expression()
XCTFail(message(), file: file, line: line)
} catch {
errorHandler(error)
}
}
}
#endif