Skip to content

Commit

Permalink
Adds range operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Sep 13, 2015
1 parent ebe3386 commit 4695ab9
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ All notable changes to this project will be documented in this file.

* Renames `ImmediateScheduler` protocol to `ImmediateSchedulerType`
* Renames `Scheduler` protocol to `SchedulerType`
* Adds `CurrentThreadScheduler`
* Adds `generate` operator
* Cleanup of dead observer code.
* Removes `SpinLock`s in disposables in favor of more performant `OSAtomicCompareAndSwap32`.
* Adds `buffer` operator (version with time and count).
* Adds `range` operator.

## [2.0.0-alpha.2](https://github.com/ReactiveX/RxSwift/releases/tag/2.0.0-alpha.2)

Expand Down
6 changes: 6 additions & 0 deletions Rx.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@
C84B38EA1BA43380001B7D88 /* ScheduledItem.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38E71BA43380001B7D88 /* ScheduledItem.swift */; };
C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; };
C84B38EF1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; };
C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; };
C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; };
C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; };
C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; };
C88254181B8A752B00B02D69 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; };
Expand Down Expand Up @@ -456,6 +458,7 @@
C821DBA11BA4DCAB008F3809 /* Buffer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Buffer.swift; sourceTree = "<group>"; };
C84B38E71BA43380001B7D88 /* ScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ScheduledItem.swift; sourceTree = "<group>"; };
C84B38ED1BA433CD001B7D88 /* Generate.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Generate.swift; sourceTree = "<group>"; };
C86409FB1BA593F500D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = "<group>"; };
C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F41B8A752B00B02D69 /* ItemEvents.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemEvents.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -667,6 +670,7 @@
C8093C801B8A72BE0088E94D /* ObserveOn.swift */,
C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */,
C8093C831B8A72BE0088E94D /* Producer.swift */,
C86409FB1BA593F500D3C4E8 /* Range.swift */,
C8093C841B8A72BE0088E94D /* Reduce.swift */,
C8093C851B8A72BE0088E94D /* RefCount.swift */,
C8093C861B8A72BE0088E94D /* Sample.swift */,
Expand Down Expand Up @@ -1354,6 +1358,7 @@
C8093D421B8A72BE0088E94D /* Switch.swift in Sources */,
C8093DA01B8A72BE0088E94D /* BehaviorSubject.swift in Sources */,
C8093D181B8A72BE0088E94D /* DelaySubscription.swift in Sources */,
C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */,
C8093D221B8A72BE0088E94D /* Map.swift in Sources */,
C8093CD01B8A72BE0088E94D /* InfiniteSequence.swift in Sources */,
C8093D661B8A72BE0088E94D /* ObservableType.swift in Sources */,
Expand Down Expand Up @@ -1464,6 +1469,7 @@
C8093D411B8A72BE0088E94D /* Switch.swift in Sources */,
C8093D9F1B8A72BE0088E94D /* BehaviorSubject.swift in Sources */,
C8093D171B8A72BE0088E94D /* DelaySubscription.swift in Sources */,
C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */,
C8093D211B8A72BE0088E94D /* Map.swift in Sources */,
C8093CCF1B8A72BE0088E94D /* InfiniteSequence.swift in Sources */,
C8093D651B8A72BE0088E94D /* ObservableType.swift in Sources */,
Expand Down
8 changes: 6 additions & 2 deletions RxExample/RxExample.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@
C86409F81BA5909000D3C4E8 /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */; };
C86409F91BA5909000D3C4E8 /* SubjectType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098E1BA5909000D3C4E8 /* SubjectType.swift */; };
C86409FA1BA5909000D3C4E8 /* Variable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098F1BA5909000D3C4E8 /* Variable.swift */; };
C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FE1BA5A87200D3C4E8 /* Range.swift */; };
C86E2F3E1AE5A0CA00C31024 /* SearchResultViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */; };
C86E2F3F1AE5A0CA00C31024 /* SearchViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */; };
C86E2F451AE5A0CA00C31024 /* WikipediaAPI.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */; };
Expand Down Expand Up @@ -520,6 +521,7 @@
C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = "<group>"; };
C864098E1BA5909000D3C4E8 /* SubjectType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubjectType.swift; sourceTree = "<group>"; };
C864098F1BA5909000D3C4E8 /* Variable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Variable.swift; sourceTree = "<group>"; };
C86409FE1BA5A87200D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = "<group>"; };
C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchResultViewModel.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchViewModel.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaAPI.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -973,10 +975,10 @@
C864093F1BA5909000D3C4E8 /* AsObservable.swift */,
C86409401BA5909000D3C4E8 /* Buffer.swift */,
C86409411BA5909000D3C4E8 /* Catch.swift */,
C86409451BA5909000D3C4E8 /* CombineLatest.swift */,
C86409421BA5909000D3C4E8 /* CombineLatest+arity.swift */,
C86409431BA5909000D3C4E8 /* CombineLatest+arity.tt */,
C86409441BA5909000D3C4E8 /* CombineLatest+CollectionType.swift */,
C86409451BA5909000D3C4E8 /* CombineLatest.swift */,
C86409461BA5909000D3C4E8 /* Concat.swift */,
C86409471BA5909000D3C4E8 /* ConnectableObservable.swift */,
C86409481BA5909000D3C4E8 /* Debug.swift */,
Expand All @@ -997,6 +999,7 @@
C86409571BA5909000D3C4E8 /* ObserveOn.swift */,
C86409581BA5909000D3C4E8 /* ObserveOnSerialDispatchQueue.swift */,
C86409591BA5909000D3C4E8 /* Producer.swift */,
C86409FE1BA5A87200D3C4E8 /* Range.swift */,
C864095A1BA5909000D3C4E8 /* Reduce.swift */,
C864095B1BA5909000D3C4E8 /* RefCount.swift */,
C864095C1BA5909000D3C4E8 /* Sample.swift */,
Expand All @@ -1011,10 +1014,10 @@
C86409651BA5909000D3C4E8 /* TakeWhile.swift */,
C86409661BA5909000D3C4E8 /* Throttle.swift */,
C86409671BA5909000D3C4E8 /* Timer.swift */,
C864096B1BA5909000D3C4E8 /* Zip.swift */,
C86409681BA5909000D3C4E8 /* Zip+arity.swift */,
C86409691BA5909000D3C4E8 /* Zip+arity.tt */,
C864096A1BA5909000D3C4E8 /* Zip+CollectionType.swift */,
C864096B1BA5909000D3C4E8 /* Zip.swift */,
);
path = Implementations;
sourceTree = "<group>";
Expand Down Expand Up @@ -1405,6 +1408,7 @@
C86409B51BA5909000D3C4E8 /* ConnectableObservable.swift in Sources */,
C84B3A3A1BA4345A001B7D88 /* DeinitAction.swift in Sources */,
C84B3A621BA4345A001B7D88 /* UITableView+Rx.swift in Sources */,
C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */,
C86409E61BA5909000D3C4E8 /* ObserverBase.swift in Sources */,
C84B3A631BA4345A001B7D88 /* UITextField+Rx.swift in Sources */,
C84B3A571BA4345A001B7D88 /* UICollectionView+Rx.swift in Sources */,
Expand Down
3 changes: 1 addition & 2 deletions RxSwift/DataStructures/Queue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ public struct Queue<T>: SequenceType {
public mutating func enqueue(element: T) {
version++

_ = count == storage.count
if count == storage.count {
resizeTo(storage.count * resizeFactor)
resizeTo(max(storage.count, 1) * resizeFactor)
}

storage[pushNextIndex] = element
Expand Down
51 changes: 51 additions & 0 deletions RxSwift/Observables/Implementations/Range.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//
// Range.swift
// Rx
//
// Created by Krunoslav Zaher on 9/13/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

class RangeProducer<_CompilerWorkaround> : Producer<Int> {
let start: Int
let count: Int
let scheduler: ImmediateSchedulerType

init(start: Int, count: Int, scheduler: ImmediateSchedulerType) {
self.start = start
self.count = count
self.scheduler = scheduler
}

override func run<O : ObserverType where O.E == Int>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
}
}

class RangeSink<_CompilerWorkaround, O: ObserverType where O.E == Int> : Sink<O> {
typealias Parent = RangeProducer<_CompilerWorkaround>

let parent: Parent

init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}

func run() -> Disposable {
return self.parent.scheduler.scheduleRecursive(0) { i, recurse in
if i < self.parent.count {
self.observer?.on(.Next(self.parent.start + i))
recurse(i + 1)
}
else {
self.observer?.on(.Completed)
self.dispose()
}
}
}
}
20 changes: 20 additions & 0 deletions RxSwift/Observables/Observable+Creation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,24 @@ to run the loop send out observer messages.
*/
public func generate<E>(initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}

/**
Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to generate and send out observer messages.

- parameter start: The value of the first integer in the sequence.
- parameter count: The number of sequential integers to generate.
- parameter scheduler: Scheduler to run the generator loop on.
- returns: An observable sequence that contains a range of sequential integral numbers.
*/
public func range(start: Int, _ count: Int, _ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Int> {
if count < 0 {
rxFatalError("count can't be negative")
}

if start &+ (count - 1) < start {
rxFatalError("overflow of count")
}

return RangeProducer<Int>(start: start, count: count, scheduler: scheduler)
}
27 changes: 25 additions & 2 deletions RxSwift/Schedulers/CurrentThreadScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,19 @@ class CurrentThreadSchedulerKey : NSObject, NSCopying {
}
}

/**
Represents an object that schedules units of work on the current thread.

This is the default scheduler for operators that generate elements.

This scheduler is also sometimes called `trampoline scheduler`.
*/
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()

static var queue : ScheduleQueue? {
Expand All @@ -40,10 +50,23 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
}
}

static var isScheduleRequired: Bool {
/**
Gets a value that indicates whether the caller must call a `schedule` method.
*/
public static var isScheduleRequired: Bool {
return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil
}

/**
Schedules an action to be executed as soon as possible on current thread.

If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.

- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
let queue = CurrentThreadScheduler.queue

Expand All @@ -53,7 +76,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
return scheduledItem
}

let newQueue = RxMutableBox(Queue<ScheduledItemType>(capacity: 10))
let newQueue = RxMutableBox(Queue<ScheduledItemType>(capacity: 0))
CurrentThreadScheduler.queue = newQueue

action(state)
Expand Down
29 changes: 29 additions & 0 deletions RxTests/RxSwiftTests/Tests/Observable+CreationTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,32 @@ extension ObservableCreationTests {
XCTAssertEqual(count, 3)
}
}

extension ObservableCreationTests {
func testRange_Boundaries() {
let scheduler = TestScheduler(initialClock: 0)

let res = scheduler.start {
range(Int.max, 1, scheduler)
}

XCTAssertEqual(res.messages, [
next(201, Int.max),
completed(202)
])
}

func testRange_Dispose() {
let scheduler = TestScheduler(initialClock: 0)

let res = scheduler.start(204) {
range(-10, 5, scheduler)
}

XCTAssertEqual(res.messages, [
next(201, -10),
next(202, -9),
next(203, -8)
])
}
}
10 changes: 10 additions & 0 deletions RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1289,4 +1289,14 @@ extension ObservableTimeTest {
])
}

func bufferWithTimeOrCount_Default() {
let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)

let result = try! range(1, 10, backgroundScheduler)
.buffer(timeSpan: 1000, count: 3, scheduler: backgroundScheduler)
.skip(1)
.first()

XCTAssertEqual(result!, [4, 5, 6])
}
}

0 comments on commit 4695ab9

Please sign in to comment.