Skip to content

Commit

Permalink
WIP Implement sample() operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Bittrance committed Jan 18, 2017
1 parent baf10b6 commit b6db908
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
50 changes: 50 additions & 0 deletions examples/sample.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
require 'rx'

# With an interval time
source = Rx::Observable.interval(0.05)
.delay(0.01)
.sample(0.15)
.take(2)

source.subscribe(
lambda { |x|
puts 'Next: ' + x.to_s
},
lambda { |err|
puts 'Error: ' + err.inspect
},
lambda {
puts 'Completed'
})

# => Next: 1
# => Next: 4
# => Completed

while Thread.list.size > 1
(Thread.list - [Thread.current]).each(&:join)
end

# With a sampler
source = Rx::Observable.interval(0.05)
.sample(Rx::Observable.interval(0.15).delay(0.01))
.take(2)

source.subscribe(
lambda { |x|
puts 'Next: ' + x.to_s
},
lambda { |err|
puts 'Error: ' + err.inspect
},
lambda {
puts 'Completed'
})

# => Next: 2
# => Next: 5
# => Completed

while Thread.list.size > 1
(Thread.list - [Thread.current]).each(&:join)
end
37 changes: 37 additions & 0 deletions lib/rx/linq/observable/sample.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module Rx
module Observable
# Return the latest item from this observable when another observable
# emits an item.
def sample(intervalOrSampler, scheduler = DefaultScheduler.instance)
sampler = if intervalOrSampler.is_a? Numeric
Observable.interval(intervalOrSampler, scheduler)
else
intervalOrSampler
end

AnonymousObservable.new do |observer|
latest = nil
sample_subscription = sampler.subscribe(
lambda { |x|
observer.on_next latest unless latest.nil?
latest = nil
},
lambda { |err| observer.on_error err },
lambda { observer.on_completed }
)

self_observer = Rx::Observer.configure do |me|
me.on_next do |value|
latest = value
end
me.on_error(&observer.method(:on_error))
me.on_completed(&observer.method(:on_completed))
end

self_subscription = subscribe self_observer

CompositeSubscription.new [sample_subscription, self_subscription]
end
end
end
end

0 comments on commit b6db908

Please sign in to comment.