Skip to content

Commit

Permalink
Clean up and create additional tests for #126
Browse files Browse the repository at this point in the history
  • Loading branch information
tachyus-ryan committed Feb 3, 2020
1 parent 0ac220e commit d74410b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
13 changes: 4 additions & 9 deletions src/FSharp.Control.Reactive/Observable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ module Builders =
/// The Reactive module provides operators for working with IObservable<_> in F#.
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Observable =
open System.Reactive.Concurrency
open System.Reactive.Linq

type Observer with
/// Creates an observer from the specified onNext function.
Expand All @@ -128,15 +126,12 @@ module Observable =
type Observable with
/// Creates an observable sequence from the specified Subscribe method implementation.
static member Create (subscribe: IObserver<'T> -> unit -> unit) =
Observable.Create(Func<_,_>(fun o -> Action(subscribe o)))
let subscribe o = Action(subscribe o)
Observable.Create(subscribe)

/// Creates an observable sequence from the specified Subscribe method implementation.
static member Create subscribe =
Observable.Create(Func<_,IDisposable> subscribe)

/// Creates an observable sequence from the specified asynchronous Subscribe method implementation.
static member CreateAsync subscribe =
Observable.Create(Func<IObserver<'T>, System.Threading.Tasks.Task<IDisposable>> (subscribe >> Async.StartAsTask))
static member CreateAsync (subscribe: IObserver<'T> -> Async<IDisposable>) =
Observable.Create(subscribe >> Async.StartAsTask)

type IObservable<'T> with
/// Subscribes to the Observable with just a next-function.
Expand Down
37 changes: 30 additions & 7 deletions tests/ObservableSpecs.fs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
module FSharp.Reactive.Tests.ObservableSpecs

open System
open System.Reactive.Concurrency
open System.Reactive.Disposables
open System.Reactive.Linq
open FSharp.Control.Reactive
open Builders
open Microsoft.Reactive.Testing
open NUnit.Framework
open FsCheck
open Microsoft.Reactive.Testing
open System.Reactive.Subjects
open System.Reactive.Concurrency
open FSharp.Control.Reactive
open FSharp.Control.Reactive.Builders
open FSharp.Control.Reactive.Observable
open FSharp.Control.Reactive.Testing
open TestNotification
open FSharp.Control.Reactive.Testing.TestNotification
open System.Threading
open System.Reactive.Subjects


let ``should be`` expectedNext expectedError expectedCompleted (observable:'a IObservable) =
Expand Down Expand Up @@ -513,6 +512,18 @@ let ``timestampOn uses timestamps from the supplied scheduler``() =

[<Test>]
let ``Observable.Create should support a simple observable returning fun () -> ()``() =
let obs =
fun (o:IObserver<_>) ->
o.OnNext("xxx")
o.OnCompleted()
fun () -> ()
|> Observable.Create

use x = obs.Subscribe(fun result -> Assert.That(result, Is.EqualTo "xxx"))
()

[<Test>]
let ``Observable.Create should support a simple observable returning ignore``() =
let obs =
Observable.Create(fun (o : IObserver<_>) ->
o.OnNext("xxx")
Expand All @@ -522,6 +533,18 @@ let ``Observable.Create should support a simple observable returning fun () -> (
use x = obs.Subscribe(fun result -> Assert.That(result, Is.EqualTo "xxx"))
()

[<Test>]
let ``Observable.Create should support a simple observable returning Disposable.Empty``() =
let obs =
fun (o:IObserver<_>) ->
o.OnNext("xxx")
o.OnCompleted()
Disposable.Empty
|> Observable.Create

use x = obs.Subscribe(fun result -> Assert.That(result, Is.EqualTo "xxx"))
()

[<Test>]
let ``Observable.subscribeOn should run subscription on another thread`` () =
let expected = "Hello World"
Expand Down

0 comments on commit d74410b

Please sign in to comment.