Skip to content

Commit

Permalink
Improves locking behavior of merge and switch operators. #1344
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Jul 20, 2017
1 parent e782081 commit b343881
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 52 deletions.
94 changes: 57 additions & 37 deletions RxSwift/Observables/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ fileprivate final class MergeLimitedBasicSink<SourceSequence: ObservableConverti

fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>
: Sink<Observer>
, LockOwnerType
, SynchronizedOnType where Observer.E == SourceSequence.E {
, ObserverType where Observer.E == SourceSequence.E {
typealias QueueType = Queue<SourceSequence>

let _maxConcurrent: Int
Expand Down Expand Up @@ -267,14 +266,10 @@ fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConv
func performMap(_ element: SourceElement) throws -> SourceSequence {
rxAbstractMethod()
}

func on(_ event: Event<SourceElement>) {
synchronizedOn(event)
}

func _synchronized_on(_ event: Event<SourceElement>) {
switch event {
case .next(let element):
@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
_lock.lock(); defer { _lock.unlock() } // {
let subscribe: Bool
if _activeCount < _maxConcurrent {
_activeCount += 1
Expand All @@ -293,17 +288,31 @@ fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConv

if subscribe {
do {
let value = try performMap(element)
self.subscribe(value, group: _group)
return try performMap(element)
} catch {
forwardOn(.error(error))
dispose()
}
}

return nil
// }
}

func on(_ event: Event<SourceElement>) {
switch event {
case .next(let element):
if let sequence = self.nextElementArrived(element: element) {
self.subscribe(sequence, group: _group)
}
case .error(let error):
_lock.lock(); defer { _lock.unlock() }

forwardOn(.error(error))
dispose()
case .completed:
_lock.lock(); defer { _lock.unlock() }

if _activeCount == 0 {
forwardOn(.completed)
dispose()
Expand Down Expand Up @@ -452,48 +461,59 @@ fileprivate class MergeSink<SourceElement, SourceSequence: ObservableConvertible
func performMap(_ element: SourceElement) throws -> SourceSequence {
rxAbstractMethod()
}

@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
_lock.lock(); defer { _lock.unlock() } // {
if !subscribeNext {
return nil
}

do {
let value = try performMap(element)
_activeCount += 1
return value
}
catch let e {
forwardOn(.error(e))
dispose()
return nil
}
// }
}

func on(_ event: Event<SourceElement>) {
_lock.lock(); defer { _lock.unlock() } // lock {
switch event {
case .next(let element):
if !subscribeNext {
return
}
do {
let value = try performMap(element)
subscribeInner(value.asObservable())
}
catch let e {
forwardOn(.error(e))
dispose()
}
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
_stopped = true
_sourceSubscription.dispose()
checkCompleted()
switch event {
case .next(let element):
if let value = nextElementArrived(element: element) {
subscribeInner(value.asObservable())
}
//}
case .error(let error):
_lock.lock(); defer { _lock.unlock() }
forwardOn(.error(error))
dispose()
case .completed:
_lock.lock(); defer { _lock.unlock() }
_stopped = true
_sourceSubscription.dispose()
checkCompleted()
}
}

func subscribeInner(_ source: Observable<Observer.E>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = _group.insert(iterDisposable) {
_activeCount += 1
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.setDisposable(subscription)
}
}

func run(_ sources: [SourceElement]) -> Disposable {
let _ = _group.insert(_sourceSubscription)
func run(_ sources: [Observable<Observer.E>]) -> Disposable {
_activeCount += sources.count

for source in sources {
self.on(.next(source))
subscribeInner(source)
}

_stopped = true
Expand Down
35 changes: 20 additions & 15 deletions RxSwift/Observables/Switch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ extension ObservableType where E : ObservableConvertibleType {

fileprivate class SwitchSink<SourceType, S: ObservableConvertibleType, O: ObserverType>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType where S.E == O.E {
, ObserverType where S.E == O.E {
typealias E = SourceType

fileprivate let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
Expand All @@ -69,39 +67,46 @@ fileprivate class SwitchSink<SourceType, S: ObservableConvertibleType, O: Observ
_subscriptions.setDisposable(subscription)
return Disposables.create(_subscriptions, _innerSubscription)
}

func on(_ event: Event<E>) {
synchronizedOn(event)
}

func performMap(_ element: SourceType) throws -> S {
rxAbstractMethod()
}

func _synchronized_on(_ event: Event<E>) {
switch event {
case .next(let element):
@inline(__always)
final private func nextElementArrived(element: E) -> (Int, Observable<S.E>)? {
_lock.lock(); defer { _lock.unlock() } // {
do {
let observable = try performMap(element).asObservable()
_hasLatest = true
_latest = _latest &+ 1
let latest = _latest
return (_latest, observable)
}
catch let error {
forwardOn(.error(error))
dispose()
}

return nil
// }
}

func on(_ event: Event<E>) {
switch event {
case .next(let element):
if let (latest, observable) = nextElementArrived(element: element) {
let d = SingleAssignmentDisposable()
_innerSubscription.disposable = d

let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
let disposable = observable.subscribe(observer)
d.setDisposable(disposable)
}
catch let error {
forwardOn(.error(error))
dispose()
}
case .error(let error):
_lock.lock(); defer { _lock.unlock() }
forwardOn(.error(error))
dispose()
case .completed:
_lock.lock(); defer { _lock.unlock() }
_stopped = true

_subscriptions.dispose()
Expand Down
1 change: 1 addition & 0 deletions Sources/AllTestz/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ final class AnomaliesTest_ : AnomaliesTest, RxTestCase {
static var allTests: [(String, (AnomaliesTest_) -> () -> ())] { return [
("test936", AnomaliesTest.test936),
("test1323", AnomaliesTest.test1323),
("test1344", AnomaliesTest.test1344),
("testSeparationBetweenOnAndSubscriptionLocks", AnomaliesTest.testSeparationBetweenOnAndSubscriptionLocks),
] }
}
Expand Down
24 changes: 24 additions & 0 deletions Tests/RxSwiftTests/Anomalies.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,30 @@ extension AnomaliesTest {
}
}

func test1344(){
let disposeBag = DisposeBag()
let foo = Observable<Int>.create({ observer in
observer.on(.next(1))
Thread.sleep(forTimeInterval: 0.1)
observer.on(.completed)
return Disposables.create()
})
.flatMap { (int) -> Observable<[Int]> in
return Observable.create { (observer) -> Disposable in
DispatchQueue.global().async {
observer.onNext([int])
}
self.sleep(0.1)
return Disposables.create()
}
}

Observable.merge(foo, .just([42]))
.subscribe { (e) in
}
.disposed(by: disposeBag)
}

func testSeparationBetweenOnAndSubscriptionLocks() {
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
for i in 0 ..< 1 {
Expand Down

0 comments on commit b343881

Please sign in to comment.