From f8324ebea3a4c4a24a36ed8ff74e1b66557a43fd Mon Sep 17 00:00:00 2001 From: Adrianna Chang Date: Wed, 12 May 2021 10:02:28 -0400 Subject: [PATCH 1/3] Allow ActiveRecordCursor#next_batch to return either records or relations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Étienne Barrié --- lib/job-iteration/active_record_cursor.rb | 51 ++++++++++++++++------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/lib/job-iteration/active_record_cursor.rb b/lib/job-iteration/active_record_cursor.rb index 2fc3b7f3..0fa649fc 100644 --- a/lib/job-iteration/active_record_cursor.rb +++ b/lib/job-iteration/active_record_cursor.rb @@ -48,14 +48,7 @@ def position=(position) @position = position end - def update_from_record(record) - self.position = @columns.map do |column| - method = column.to_s.split(".").last - record.send(method.to_sym) - end - end - - def next_batch(batch_size) + def next_batch(batch_size, as_relation = false) return nil if @reached_end relation = @base_relation.limit(batch_size) @@ -64,14 +57,11 @@ def next_batch(batch_size) relation = relation.where(*conditions) end - records = relation.uncached do - relation.to_a + if as_relation + prepare_next_batch_as_relation(relation) + else + prepare_next_batch_as_records(batch_size, relation) end - - update_from_record(records.last) unless records.empty? - @reached_end = records.size < batch_size - - records.empty? ? nil : records end protected @@ -93,5 +83,36 @@ def conditions ret.pop ret end + + private + + def prepare_next_batch_as_records(batch_size, relation) + records = relation.uncached do + relation.to_a + end + + update_from_record(records.last) unless records.empty? + @reached_end = records.size < batch_size + + records.empty? ? nil : records + end + + def prepare_next_batch_as_relation(relation) + last_record = relation.last + if last_record + update_from_record(last_record) + relation + else + @reached_end = true + nil + end + end + + def update_from_record(record) + self.position = @columns.map do |column| + method = column.to_s.split(".").last + record.send(method.to_sym) + end + end end end From 1d0c5708dee9d299a352772e751fe86a034dd874 Mon Sep 17 00:00:00 2001 From: Adrianna Chang Date: Wed, 12 May 2021 10:04:24 -0400 Subject: [PATCH 2/3] ActiveRecordEnumerator takes +as_relation+ options and #batches can yield relations --- lib/job-iteration/active_record_enumerator.rb | 5 ++- test/unit/active_record_enumerator_test.rb | 43 ++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index c9feb5f3..eb91fb11 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -6,9 +6,10 @@ module JobIteration class ActiveRecordEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, as_relation: false, cursor: nil) @relation = relation @batch_size = batch_size + @as_relation = as_relation @columns = Array(columns || "#{relation.table_name}.#{relation.primary_key}") @cursor = cursor end @@ -26,7 +27,7 @@ def records def batches cursor = finder_cursor Enumerator.new(method(:size)) do |yielder| - while (records = cursor.next_batch(@batch_size)) + while (records = cursor.next_batch(@batch_size, @as_relation)) yielder.yield(records, cursor_value(records.last)) if records.any? end end diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index dfedc6fb..f5a17eed 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -50,6 +50,38 @@ class ActiveRecordEnumeratorTest < IterationUnitTest end end + test "#batches yielding records performs one query per batch, plus one, when relation_size % batch_size is 0" do + enum = build_enumerator.batches + num_batches = 0 + query_count = count_uncached_queries do + enum.each { num_batches += 1 } + end + + expected_num_queries = num_batches + 1 + assert_equal expected_num_queries, query_count + end + + test "#batches yielding records performs one query for each batch when relation_size % batch_size isn't 0" do + enum = build_enumerator(batch_size: 3).batches + num_batches = 0 + query_count = count_uncached_queries do + enum.each { num_batches += 1 } + end + + assert_equal num_batches, query_count + end + + test "#batches yielding relations performs one query per batch, plus one" do + enum = build_enumerator(as_relation: true).batches + num_batches = 0 + query_count = count_uncached_queries do + enum.each { num_batches += 1 } + end + + expected_num_queries = num_batches + 1 + assert_equal expected_num_queries, query_count + end + test "batch size is configurable" do enum = build_enumerator(batch_size: 4).batches shops = Product.order(:id).take(4) @@ -95,13 +127,22 @@ class ActiveRecordEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil, as_relation: false) JobIteration::ActiveRecordEnumerator.new( relation, batch_size: batch_size, columns: columns, cursor: cursor, + as_relation: as_relation, ) end + + def count_uncached_queries(&block) + count = 0 + + query_cb = ->(*, payload) { count += 1 unless payload[:cached] } + ActiveSupport::Notifications.subscribed(query_cb, "sql.active_record", &block) + count + end end end From bae9b71987bba9e4dc03e3e60c3e1ab28013f0ff Mon Sep 17 00:00:00 2001 From: Adrianna Chang Date: Wed, 12 May 2021 10:04:51 -0400 Subject: [PATCH 3/3] Create new API EnumeratorBuilder#active_record_on_batches_as_relation --- README.md | 23 ++++++++++++++++++++++- lib/job-iteration/enumerator_builder.rb | 13 +++++++++++++ test/unit/active_job_iteration_test.rb | 25 +++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ea00cfcf..818e5c8e 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,28 @@ class BatchesJob < ApplicationJob def each_iteration(batch_of_comments, product_id) # batch_of_comments will contain batches of 100 records - Comment.where(id: batch_of_comments.map(&:id)).update_all(deleted: true) + batch_of_comments.each do |comment| + DeleteCommentJob.perform_later(comment) + end + end +end +``` + +```ruby +class BatchesAsRelationJob < ApplicationJob + include JobIteration::Iteration + + def build_enumerator(product_id, cursor:) + enumerator_builder.active_record_on_batches_as_relation( + Product.find(product_id).comments, + cursor: cursor, + batch_size: 100, + ) + end + + def each_iteration(batch_of_comments, product_id) + # batch_of_comments will be a Comment::ActiveRecord_Relation + batch_of_comments.update_all(deleted: true) end end ``` diff --git a/lib/job-iteration/enumerator_builder.rb b/lib/job-iteration/enumerator_builder.rb index f803c9bc..fb90b6ad 100644 --- a/lib/job-iteration/enumerator_builder.rb +++ b/lib/job-iteration/enumerator_builder.rb @@ -110,6 +110,18 @@ def build_active_record_enumerator_on_batches(scope, cursor:, **args) wrap(self, enum) end + # Builds Enumerator from Active Record Relation and enumerates on batches, yielding Active Record Relations. + # See documentation for #build_active_record_enumerator_on_batches. + def build_active_record_enumerator_on_batches_as_relation(scope, cursor:, **args) + enum = build_active_record_enumerator( + scope, + cursor: cursor, + as_relation: true, + **args + ).batches + wrap(self, enum) + end + def build_throttle_enumerator(enum, throttle_on:, backoff:) JobIteration::ThrottleEnumerator.new( enum, @@ -124,6 +136,7 @@ def build_throttle_enumerator(enum, throttle_on:, backoff:) alias_method :array, :build_array_enumerator alias_method :active_record_on_records, :build_active_record_enumerator_on_records alias_method :active_record_on_batches, :build_active_record_enumerator_on_batches + alias_method :active_record_on_batches_as_relation, :build_active_record_enumerator_on_batches_as_relation alias_method :throttle, :build_throttle_enumerator private diff --git a/test/unit/active_job_iteration_test.rb b/test/unit/active_job_iteration_test.rb index 78d5c6ba..534afef8 100644 --- a/test/unit/active_job_iteration_test.rb +++ b/test/unit/active_job_iteration_test.rb @@ -75,6 +75,20 @@ def each_iteration(record) end end + class BatchActiveRecordRelationIterationJob < SimpleIterationJob + def build_enumerator(cursor:) + enumerator_builder.active_record_on_batches_as_relation( + Product.all, + cursor: cursor, + batch_size: 3, + ) + end + + def each_iteration(relation) + self.class.records_performed << relation + end + end + class AbortingActiveRecordIterationJob < ActiveRecordIterationJob def each_iteration(*) abort_strategy if self.class.records_performed.size == 2 @@ -412,6 +426,17 @@ def test_activerecord_batches_complete assert_equal(processed_records, BatchActiveRecordIterationJob.records_performed.flatten.map(&:id)) end + def test_activerecord_batches_as_relation + push(BatchActiveRecordRelationIterationJob) + + work_one_job + assert_jobs_in_queue(0) + + records_performed = BatchActiveRecordIterationJob.records_performed + assert_equal([3, 3, 3, 1], records_performed.map(&:size)) + assert(records_performed.all? { |record| record.is_a?(ActiveRecord::Relation) }) + end + def test_activerecord_batches iterate_exact_times(1.times)