From b6db90887e349347ead3ff8fc736a65f14ae03b4 Mon Sep 17 00:00:00 2001 From: Bittrance Date: Wed, 18 Jan 2017 23:06:12 +0100 Subject: [PATCH] WIP Implement sample() operator --- examples/sample.rb | 50 ++++++++++++++++++++++++++++++++ lib/rx/linq/observable/sample.rb | 37 +++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 examples/sample.rb create mode 100644 lib/rx/linq/observable/sample.rb diff --git a/examples/sample.rb b/examples/sample.rb new file mode 100644 index 0000000..db3ecbd --- /dev/null +++ b/examples/sample.rb @@ -0,0 +1,50 @@ +require 'rx' + +# With an interval time +source = Rx::Observable.interval(0.05) + .delay(0.01) + .sample(0.15) + .take(2) + +source.subscribe( + lambda { |x| + puts 'Next: ' + x.to_s + }, + lambda { |err| + puts 'Error: ' + err.inspect + }, + lambda { + puts 'Completed' + }) + +# => Next: 1 +# => Next: 4 +# => Completed + +while Thread.list.size > 1 + (Thread.list - [Thread.current]).each(&:join) +end + +# With a sampler +source = Rx::Observable.interval(0.05) + .sample(Rx::Observable.interval(0.15).delay(0.01)) + .take(2) + +source.subscribe( + lambda { |x| + puts 'Next: ' + x.to_s + }, + lambda { |err| + puts 'Error: ' + err.inspect + }, + lambda { + puts 'Completed' + }) + +# => Next: 2 +# => Next: 5 +# => Completed + +while Thread.list.size > 1 + (Thread.list - [Thread.current]).each(&:join) +end diff --git a/lib/rx/linq/observable/sample.rb b/lib/rx/linq/observable/sample.rb new file mode 100644 index 0000000..1f0b93e --- /dev/null +++ b/lib/rx/linq/observable/sample.rb @@ -0,0 +1,37 @@ +module Rx + module Observable + # Return the latest item from this observable when another observable + # emits an item. + def sample(intervalOrSampler, scheduler = DefaultScheduler.instance) + sampler = if intervalOrSampler.is_a? Numeric + Observable.interval(intervalOrSampler, scheduler) + else + intervalOrSampler + end + + AnonymousObservable.new do |observer| + latest = nil + sample_subscription = sampler.subscribe( + lambda { |x| + observer.on_next latest unless latest.nil? + latest = nil + }, + lambda { |err| observer.on_error err }, + lambda { observer.on_completed } + ) + + self_observer = Rx::Observer.configure do |me| + me.on_next do |value| + latest = value + end + me.on_error(&observer.method(:on_error)) + me.on_completed(&observer.method(:on_completed)) + end + + self_subscription = subscribe self_observer + + CompositeSubscription.new [sample_subscription, self_subscription] + end + end + end +end