Skip to content

Commit

Permalink
Adds timeout to PrimitiveSequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Jul 17, 2017
1 parent c6d9ca0 commit eb5d238
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 0 deletions.
29 changes: 29 additions & 0 deletions RxSwift/Traits/PrimitiveSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,35 @@ extension PrimitiveSequence {
return try primitiveSequenceFactory(resource).asObservable()
}))
}

/**
Applies a timeout policy for each element in the observable sequence. If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutError is propagated to the observer.

- seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

- parameter dueTime: Maximum duration between values before a timeout occurs.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: An observable sequence with a `RxError.timeout` in case of a timeout.
*/
public func timeout(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> PrimitiveSequence<Trait, Element> {
return PrimitiveSequence(raw: source.timeout(dueTime, scheduler: scheduler))
}

/**
Applies a timeout policy for each element in the observable sequence, using the specified scheduler to run timeout timers. If the next element isn't received within the specified timeout duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on.

- seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

- parameter dueTime: Maximum duration between values before a timeout occurs.
- parameter other: Sequence to return in case of a timeout.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: The source sequence switching to the other sequence in case of a timeout.
*/
public func timeout(_ dueTime: RxTimeInterval, other: PrimitiveSequence<Trait, Element>, scheduler: SchedulerType)
-> PrimitiveSequence<Trait, Element> {
return PrimitiveSequence(raw: source.timeout(dueTime, other: other.source, scheduler: scheduler))
}
}

extension PrimitiveSequenceType where ElementType: SignedInteger
Expand Down
168 changes: 168 additions & 0 deletions Tests/RxSwiftTests/PrimitiveSequenceTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,174 @@ extension PrimitiveSequenceTest {
300
])
}

func testSingle_timeout() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
next(10, 1),
completed(20)
]).asSingle()

let res = scheduler.start { () -> Observable<Int> in
let singleResult: Single<Int> = xs.timeout(5.0, scheduler: scheduler)

return singleResult.asObservable()
}

XCTAssertEqual(res.events, [
error(205, RxError.timeout)
])
}

func testSingle_timeout_other() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
next(10, 1),
completed(20)
]).asSingle()

let xs2 = scheduler.createColdObservable([
next(20, 1),
completed(20)
]).asSingle()

let res = scheduler.start { () -> Observable<Int> in
let singleResult: Single<Int> = xs.timeout(5.0, other: xs2, scheduler: scheduler)

return singleResult.asObservable()
}

XCTAssertEqual(res.events, [
next(225, 1),
completed(225)
])
}

func testMaybe_timeout() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
next(10, 1),
completed(20)
]).asMaybe()

let res = scheduler.start { () -> Observable<Int> in
let result: Maybe<Int> = xs.timeout(5.0, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
error(205, RxError.timeout)
])
}

func testMaybe_timeout_other() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
next(10, 1),
completed(20)
]).asMaybe()

let xs2 = scheduler.createColdObservable([
next(20, 1),
completed(20)
]).asMaybe()

let res = scheduler.start { () -> Observable<Int> in
let result: Maybe<Int> = xs.timeout(5.0, other: xs2, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
next(225, 1),
completed(225)
])
}

func testCompletable_timeout() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
completed(20, Never.self)
]).asCompletable()

let res = scheduler.start { () -> Observable<Never> in
let result: Completable = xs.timeout(5.0, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
error(205, RxError.timeout)
])
}

func testCompletable_timeout_other() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
completed(20, Never.self)
]).asCompletable()

let xs2 = scheduler.createColdObservable([
completed(20, Never.self)
]).asCompletable()

let res = scheduler.start { () -> Observable<Never> in
let result: Completable = xs.timeout(5.0, other: xs2, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
completed(225)
])
}

func testCompletable_timeout_succeeds() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
completed(2, Never.self)
]).asCompletable()

let res = scheduler.start { () -> Observable<Never> in
let result: Completable = xs.timeout(5.0, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
completed(202)
])
}

func testCompletable_timeout_other_succeeds() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createColdObservable([
completed(2, Never.self)
]).asCompletable()

let xs2 = scheduler.createColdObservable([
completed(20, Never.self)
]).asCompletable()

let res = scheduler.start { () -> Observable<Never> in
let result: Completable = xs.timeout(5.0, other: xs2, scheduler: scheduler)

return result.asObservable()
}

XCTAssertEqual(res.events, [
completed(202)
])
}
}

extension PrimitiveSequenceTest {
Expand Down

0 comments on commit eb5d238

Please sign in to comment.