Skip to content

Commit

Permalink
Improves locking behavior of Observable.merge (optimized array versio…
Browse files Browse the repository at this point in the history
…n). #1344
  • Loading branch information
kzaher committed Jul 19, 2017
1 parent 79ac3d0 commit 37a44d9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
8 changes: 4 additions & 4 deletions RxSwift/Observables/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ fileprivate class MergeSink<SourceElement, SourceSequence: ObservableConvertible
}
do {
let value = try performMap(element)
_activeCount += 1
subscribeInner(value.asObservable())
}
catch let e {
Expand All @@ -482,18 +483,17 @@ fileprivate class MergeSink<SourceElement, SourceSequence: ObservableConvertible
func subscribeInner(_ source: Observable<Observer.E>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = _group.insert(iterDisposable) {
_activeCount += 1
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.setDisposable(subscription)
}
}

func run(_ sources: [SourceElement]) -> Disposable {
let _ = _group.insert(_sourceSubscription)
func run(_ sources: [Observable<Observer.E>]) -> Disposable {
_activeCount += sources.count

for source in sources {
self.on(.next(source))
subscribeInner(source)
}

_stopped = true
Expand Down
1 change: 1 addition & 0 deletions Sources/AllTestz/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ final class AnomaliesTest_ : AnomaliesTest, RxTestCase {
static var allTests: [(String, (AnomaliesTest_) -> () -> ())] { return [
("test936", AnomaliesTest.test936),
("test1323", AnomaliesTest.test1323),
("test1344", AnomaliesTest.test1344),
("testSeparationBetweenOnAndSubscriptionLocks", AnomaliesTest.testSeparationBetweenOnAndSubscriptionLocks),
] }
}
Expand Down
24 changes: 24 additions & 0 deletions Tests/RxSwiftTests/Anomalies.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,30 @@ extension AnomaliesTest {
}
}

func test1344(){
let disposeBag = DisposeBag()
let foo = Observable<Int>.create({ observer in
observer.on(.next(1))
Thread.sleep(forTimeInterval: 0.1)
observer.on(.completed)
return Disposables.create()
})
.flatMap { (int) -> Observable<[Int]> in
return Observable.create { (observer) -> Disposable in
DispatchQueue.global().async {
observer.onNext([int])
}
self.sleep(0.1)
return Disposables.create()
}
}

Observable.merge(foo, .just([42]))
.subscribe { (e) in
}
.disposed(by: disposeBag)
}

func testSeparationBetweenOnAndSubscriptionLocks() {
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
for i in 0 ..< 1 {
Expand Down

0 comments on commit 37a44d9

Please sign in to comment.