From 4695ab9de78ef77107d7b370fafa65f04ea8b51e Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 13 Sep 2015 14:54:47 +0200 Subject: [PATCH] Adds `range` operator. --- CHANGELOG.md | 3 ++ Rx.xcodeproj/project.pbxproj | 6 +++ RxExample/RxExample.xcodeproj/project.pbxproj | 8 ++- RxSwift/DataStructures/Queue.swift | 3 +- .../Observables/Implementations/Range.swift | 51 +++++++++++++++++++ RxSwift/Observables/Observable+Creation.swift | 20 ++++++++ .../Schedulers/CurrentThreadScheduler.swift | 27 +++++++++- .../Tests/Observable+CreationTest.swift | 29 +++++++++++ .../Tests/Observable+TimeTest.swift | 10 ++++ 9 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 RxSwift/Observables/Implementations/Range.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index 88ccfcba2..e996643f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,12 @@ All notable changes to this project will be documented in this file. * Renames `ImmediateScheduler` protocol to `ImmediateSchedulerType` * Renames `Scheduler` protocol to `SchedulerType` +* Adds `CurrentThreadScheduler` * Adds `generate` operator * Cleanup of dead observer code. * Removes `SpinLock`s in disposables in favor of more performant `OSAtomicCompareAndSwap32`. +* Adds `buffer` operator (version with time and count). +* Adds `range` operator. ## [2.0.0-alpha.2](https://github.com/ReactiveX/RxSwift/releases/tag/2.0.0-alpha.2) diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 6d891aaad..a680fccc1 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -262,6 +262,8 @@ C84B38EA1BA43380001B7D88 /* ScheduledItem.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38E71BA43380001B7D88 /* ScheduledItem.swift */; }; C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; }; C84B38EF1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; }; + C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; }; + C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; }; C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; }; C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; }; C88254181B8A752B00B02D69 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; }; @@ -456,6 +458,7 @@ C821DBA11BA4DCAB008F3809 /* Buffer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Buffer.swift; sourceTree = ""; }; C84B38E71BA43380001B7D88 /* ScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ScheduledItem.swift; sourceTree = ""; }; C84B38ED1BA433CD001B7D88 /* Generate.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Generate.swift; sourceTree = ""; }; + C86409FB1BA593F500D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = ""; }; C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = ""; }; C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = ""; }; C88253F41B8A752B00B02D69 /* ItemEvents.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemEvents.swift; sourceTree = ""; }; @@ -667,6 +670,7 @@ C8093C801B8A72BE0088E94D /* ObserveOn.swift */, C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */, C8093C831B8A72BE0088E94D /* Producer.swift */, + C86409FB1BA593F500D3C4E8 /* Range.swift */, C8093C841B8A72BE0088E94D /* Reduce.swift */, C8093C851B8A72BE0088E94D /* RefCount.swift */, C8093C861B8A72BE0088E94D /* Sample.swift */, @@ -1354,6 +1358,7 @@ C8093D421B8A72BE0088E94D /* Switch.swift in Sources */, C8093DA01B8A72BE0088E94D /* BehaviorSubject.swift in Sources */, C8093D181B8A72BE0088E94D /* DelaySubscription.swift in Sources */, + C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */, C8093D221B8A72BE0088E94D /* Map.swift in Sources */, C8093CD01B8A72BE0088E94D /* InfiniteSequence.swift in Sources */, C8093D661B8A72BE0088E94D /* ObservableType.swift in Sources */, @@ -1464,6 +1469,7 @@ C8093D411B8A72BE0088E94D /* Switch.swift in Sources */, C8093D9F1B8A72BE0088E94D /* BehaviorSubject.swift in Sources */, C8093D171B8A72BE0088E94D /* DelaySubscription.swift in Sources */, + C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */, C8093D211B8A72BE0088E94D /* Map.swift in Sources */, C8093CCF1B8A72BE0088E94D /* InfiniteSequence.swift in Sources */, C8093D651B8A72BE0088E94D /* ObservableType.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index f94f4864b..0cbe8afde 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -240,6 +240,7 @@ C86409F81BA5909000D3C4E8 /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */; }; C86409F91BA5909000D3C4E8 /* SubjectType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098E1BA5909000D3C4E8 /* SubjectType.swift */; }; C86409FA1BA5909000D3C4E8 /* Variable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098F1BA5909000D3C4E8 /* Variable.swift */; }; + C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FE1BA5A87200D3C4E8 /* Range.swift */; }; C86E2F3E1AE5A0CA00C31024 /* SearchResultViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */; }; C86E2F3F1AE5A0CA00C31024 /* SearchViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */; }; C86E2F451AE5A0CA00C31024 /* WikipediaAPI.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */; }; @@ -520,6 +521,7 @@ C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = ""; }; C864098E1BA5909000D3C4E8 /* SubjectType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubjectType.swift; sourceTree = ""; }; C864098F1BA5909000D3C4E8 /* Variable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Variable.swift; sourceTree = ""; }; + C86409FE1BA5A87200D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = ""; }; C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchResultViewModel.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchViewModel.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaAPI.swift; sourceTree = ""; }; @@ -973,10 +975,10 @@ C864093F1BA5909000D3C4E8 /* AsObservable.swift */, C86409401BA5909000D3C4E8 /* Buffer.swift */, C86409411BA5909000D3C4E8 /* Catch.swift */, + C86409451BA5909000D3C4E8 /* CombineLatest.swift */, C86409421BA5909000D3C4E8 /* CombineLatest+arity.swift */, C86409431BA5909000D3C4E8 /* CombineLatest+arity.tt */, C86409441BA5909000D3C4E8 /* CombineLatest+CollectionType.swift */, - C86409451BA5909000D3C4E8 /* CombineLatest.swift */, C86409461BA5909000D3C4E8 /* Concat.swift */, C86409471BA5909000D3C4E8 /* ConnectableObservable.swift */, C86409481BA5909000D3C4E8 /* Debug.swift */, @@ -997,6 +999,7 @@ C86409571BA5909000D3C4E8 /* ObserveOn.swift */, C86409581BA5909000D3C4E8 /* ObserveOnSerialDispatchQueue.swift */, C86409591BA5909000D3C4E8 /* Producer.swift */, + C86409FE1BA5A87200D3C4E8 /* Range.swift */, C864095A1BA5909000D3C4E8 /* Reduce.swift */, C864095B1BA5909000D3C4E8 /* RefCount.swift */, C864095C1BA5909000D3C4E8 /* Sample.swift */, @@ -1011,10 +1014,10 @@ C86409651BA5909000D3C4E8 /* TakeWhile.swift */, C86409661BA5909000D3C4E8 /* Throttle.swift */, C86409671BA5909000D3C4E8 /* Timer.swift */, + C864096B1BA5909000D3C4E8 /* Zip.swift */, C86409681BA5909000D3C4E8 /* Zip+arity.swift */, C86409691BA5909000D3C4E8 /* Zip+arity.tt */, C864096A1BA5909000D3C4E8 /* Zip+CollectionType.swift */, - C864096B1BA5909000D3C4E8 /* Zip.swift */, ); path = Implementations; sourceTree = ""; @@ -1405,6 +1408,7 @@ C86409B51BA5909000D3C4E8 /* ConnectableObservable.swift in Sources */, C84B3A3A1BA4345A001B7D88 /* DeinitAction.swift in Sources */, C84B3A621BA4345A001B7D88 /* UITableView+Rx.swift in Sources */, + C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */, C86409E61BA5909000D3C4E8 /* ObserverBase.swift in Sources */, C84B3A631BA4345A001B7D88 /* UITextField+Rx.swift in Sources */, C84B3A571BA4345A001B7D88 /* UICollectionView+Rx.swift in Sources */, diff --git a/RxSwift/DataStructures/Queue.swift b/RxSwift/DataStructures/Queue.swift index 238784a53..8545acb43 100644 --- a/RxSwift/DataStructures/Queue.swift +++ b/RxSwift/DataStructures/Queue.swift @@ -113,9 +113,8 @@ public struct Queue: SequenceType { public mutating func enqueue(element: T) { version++ - _ = count == storage.count if count == storage.count { - resizeTo(storage.count * resizeFactor) + resizeTo(max(storage.count, 1) * resizeFactor) } storage[pushNextIndex] = element diff --git a/RxSwift/Observables/Implementations/Range.swift b/RxSwift/Observables/Implementations/Range.swift new file mode 100644 index 000000000..5ef7c934b --- /dev/null +++ b/RxSwift/Observables/Implementations/Range.swift @@ -0,0 +1,51 @@ +// +// Range.swift +// Rx +// +// Created by Krunoslav Zaher on 9/13/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class RangeProducer<_CompilerWorkaround> : Producer { + let start: Int + let count: Int + let scheduler: ImmediateSchedulerType + + init(start: Int, count: Int, scheduler: ImmediateSchedulerType) { + self.start = start + self.count = count + self.scheduler = scheduler + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let sink = RangeSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run() + } +} + +class RangeSink<_CompilerWorkaround, O: ObserverType where O.E == Int> : Sink { + typealias Parent = RangeProducer<_CompilerWorkaround> + + let parent: Parent + + init(parent: Parent, observer: O, cancel: Disposable) { + self.parent = parent + super.init(observer: observer, cancel: cancel) + } + + func run() -> Disposable { + return self.parent.scheduler.scheduleRecursive(0) { i, recurse in + if i < self.parent.count { + self.observer?.on(.Next(self.parent.start + i)) + recurse(i + 1) + } + else { + self.observer?.on(.Completed) + self.dispose() + } + } + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Creation.swift b/RxSwift/Observables/Observable+Creation.swift index f06ff9aa8..61d20cd4e 100644 --- a/RxSwift/Observables/Observable+Creation.swift +++ b/RxSwift/Observables/Observable+Creation.swift @@ -128,4 +128,24 @@ to run the loop send out observer messages. */ public func generate(initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable { return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler) +} + +/** +Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to generate and send out observer messages. + +- parameter start: The value of the first integer in the sequence. +- parameter count: The number of sequential integers to generate. +- parameter scheduler: Scheduler to run the generator loop on. +- returns: An observable sequence that contains a range of sequential integral numbers. +*/ +public func range(start: Int, _ count: Int, _ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { + if count < 0 { + rxFatalError("count can't be negative") + } + + if start &+ (count - 1) < start { + rxFatalError("overflow of count") + } + + return RangeProducer(start: start, count: count, scheduler: scheduler) } \ No newline at end of file diff --git a/RxSwift/Schedulers/CurrentThreadScheduler.swift b/RxSwift/Schedulers/CurrentThreadScheduler.swift index 1c467d6d6..d8f2b2d2a 100644 --- a/RxSwift/Schedulers/CurrentThreadScheduler.swift +++ b/RxSwift/Schedulers/CurrentThreadScheduler.swift @@ -26,9 +26,19 @@ class CurrentThreadSchedulerKey : NSObject, NSCopying { } } +/** +Represents an object that schedules units of work on the current thread. + +This is the default scheduler for operators that generate elements. + +This scheduler is also sometimes called `trampoline scheduler`. +*/ public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox> + /** + The singleton instance of the current thread scheduler. + */ public static let instance = CurrentThreadScheduler() static var queue : ScheduleQueue? { @@ -40,10 +50,23 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { } } - static var isScheduleRequired: Bool { + /** + Gets a value that indicates whether the caller must call a `schedule` method. + */ + public static var isScheduleRequired: Bool { return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil } + /** + Schedules an action to be executed as soon as possible on current thread. + + If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be + automatically installed and uninstalled after all work is performed. + + - parameter state: State passed to the action to be executed. + - parameter action: Action to be executed. + - returns: The disposable object used to cancel the scheduled action (best effort). + */ public func schedule(state: StateType, action: (StateType) -> Disposable) -> Disposable { let queue = CurrentThreadScheduler.queue @@ -53,7 +76,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { return scheduledItem } - let newQueue = RxMutableBox(Queue(capacity: 10)) + let newQueue = RxMutableBox(Queue(capacity: 0)) CurrentThreadScheduler.queue = newQueue action(state) diff --git a/RxTests/RxSwiftTests/Tests/Observable+CreationTest.swift b/RxTests/RxSwiftTests/Tests/Observable+CreationTest.swift index 747756a8f..3fa3468f7 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+CreationTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+CreationTest.swift @@ -99,3 +99,32 @@ extension ObservableCreationTests { XCTAssertEqual(count, 3) } } + +extension ObservableCreationTests { + func testRange_Boundaries() { + let scheduler = TestScheduler(initialClock: 0) + + let res = scheduler.start { + range(Int.max, 1, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(201, Int.max), + completed(202) + ]) + } + + func testRange_Dispose() { + let scheduler = TestScheduler(initialClock: 0) + + let res = scheduler.start(204) { + range(-10, 5, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(201, -10), + next(202, -9), + next(203, -8) + ]) + } +} \ No newline at end of file diff --git a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift index 3cf8d618a..82a1cba70 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1289,4 +1289,14 @@ extension ObservableTimeTest { ]) } + func bufferWithTimeOrCount_Default() { + let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) + + let result = try! range(1, 10, backgroundScheduler) + .buffer(timeSpan: 1000, count: 3, scheduler: backgroundScheduler) + .skip(1) + .first() + + XCTAssertEqual(result!, [4, 5, 6]) + } } \ No newline at end of file