Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds as_relation flag to EnumeratorBuilder#active_record_on_batches #86

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@ class BatchesJob < ApplicationJob

def each_iteration(batch_of_comments, product_id)
# batch_of_comments will contain batches of 100 records
Comment.where(id: batch_of_comments.map(&:id)).update_all(deleted: true)
batch_of_comments.each do |comment|
DeleteCommentJob.perform_later(comment)
end
end
end
```

```ruby
class BatchesAsRelationJob < ApplicationJob
  include JobIteration::Iteration

  # Enumerates the comments of the given product in batches of 100,
  # using the relation-yielding batch enumerator.
  def build_enumerator(product_id, cursor:)
    comments = Product.find(product_id).comments
    enumerator_builder.active_record_on_batches_as_relation(comments, cursor: cursor, batch_size: 100)
  end

  # Marks every comment in the batch as deleted with a single update_all call.
  def each_iteration(batch_of_comments, product_id)
    # batch_of_comments will be a Comment::ActiveRecord_Relation
    batch_of_comments.update_all(deleted: true)
  end
end
```
Expand Down
51 changes: 36 additions & 15 deletions lib/job-iteration/active_record_cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@ def position=(position)
@position = position
end

# Sets the cursor position from +record+: for each configured column, the
# part after the last "." is used as the method name to read from the record.
def update_from_record(record)
  values = @columns.map { |column| record.send(column.to_s.split(".").last.to_sym) }
  self.position = values
end

def next_batch(batch_size)
def next_batch(batch_size, as_relation = false)
return nil if @reached_end

relation = @base_relation.limit(batch_size)
Expand All @@ -64,14 +57,11 @@ def next_batch(batch_size)
relation = relation.where(*conditions)
end

records = relation.uncached do
relation.to_a
if as_relation
prepare_next_batch_as_relation(relation)
else
prepare_next_batch_as_records(batch_size, relation)
end

update_from_record(records.last) unless records.empty?
@reached_end = records.size < batch_size

records.empty? ? nil : records
end

protected
Expand All @@ -93,5 +83,36 @@ def conditions
ret.pop
ret
end

private

# Loads +relation+ into an array inside its +uncached+ block, moves the cursor
# to the last loaded record, and flags the end once fewer than +batch_size+
# records came back. Returns the array, or nil when nothing was loaded.
def prepare_next_batch_as_records(batch_size, relation)
  batch = relation.uncached { relation.to_a }
  has_rows = !batch.empty?

  update_from_record(batch.last) if has_rows
  # A short batch means there is nothing left to fetch after this one.
  @reached_end = batch.size < batch_size

  batch if has_rows
end

# Returns +relation+ itself as the batch after moving the cursor to its last
# record. When the relation has no last record, marks the end and returns nil.
def prepare_next_batch_as_relation(relation)
  tail = relation.last

  unless tail
    @reached_end = true
    return nil
  end

  update_from_record(tail)
  relation
end

# Moves the cursor to +record+ by reading one value per configured column.
# Only the segment after the last "." of each column name is sent to the record.
def update_from_record(record)
  cursor_values = @columns.map do |qualified_name|
    attribute = qualified_name.to_s.split(".").last
    record.send(attribute.to_sym)
  end
  self.position = cursor_values
end
end
end
5 changes: 3 additions & 2 deletions lib/job-iteration/active_record_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ module JobIteration
class ActiveRecordEnumerator
SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N"

def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
# Stores the relation and the enumeration options. When +columns+ is not
# given, the relation's "table_name.primary_key" is used as the only column.
def initialize(relation, columns: nil, batch_size: 100, as_relation: false, cursor: nil)
  columns ||= "#{relation.table_name}.#{relation.primary_key}"

  @columns = Array(columns)
  @relation = relation
  @cursor = cursor
  @batch_size = batch_size
  @as_relation = as_relation
end
Expand All @@ -26,7 +27,7 @@ def records
def batches
cursor = finder_cursor
Enumerator.new(method(:size)) do |yielder|
while (records = cursor.next_batch(@batch_size))
while (records = cursor.next_batch(@batch_size, @as_relation))
yielder.yield(records, cursor_value(records.last)) if records.any?
end
end
Expand Down
13 changes: 13 additions & 0 deletions lib/job-iteration/enumerator_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ def build_active_record_enumerator_on_batches(scope, cursor:, **args)
wrap(self, enum)
end

# Builds Enumerator from Active Record Relation and enumerates on batches, yielding Active Record Relations.
# See documentation for #build_active_record_enumerator_on_batches.
#
# +scope+ and +cursor+ are passed through unchanged; any extra keyword
# arguments are forwarded to #build_active_record_enumerator.
def build_active_record_enumerator_on_batches_as_relation(scope, cursor:, **args)
  # Merge the flag last so a stray `as_relation:` in +args+ cannot switch this
  # builder back to yielding arrays of records.
  options = args.merge(as_relation: true)
  enum = build_active_record_enumerator(scope, cursor: cursor, **options).batches
  wrap(self, enum)
end

def build_throttle_enumerator(enum, throttle_on:, backoff:)
JobIteration::ThrottleEnumerator.new(
enum,
Expand All @@ -124,6 +136,7 @@ def build_throttle_enumerator(enum, throttle_on:, backoff:)
alias_method :array, :build_array_enumerator
alias_method :active_record_on_records, :build_active_record_enumerator_on_records
alias_method :active_record_on_batches, :build_active_record_enumerator_on_batches
alias_method :active_record_on_batches_as_relation, :build_active_record_enumerator_on_batches_as_relation
alias_method :throttle, :build_throttle_enumerator

private
Expand Down
25 changes: 25 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def each_iteration(record)
end
end

class BatchActiveRecordRelationIterationJob < SimpleIterationJob
  # Enumerates all products in batches of three via the relation-yielding builder.
  def build_enumerator(cursor:)
    all_products = Product.all
    enumerator_builder.active_record_on_batches_as_relation(all_products, cursor: cursor, batch_size: 3)
  end

  # Appends each yielded batch to the class-level records_performed list.
  def each_iteration(relation)
    self.class.records_performed.push(relation)
  end
end

class AbortingActiveRecordIterationJob < ActiveRecordIterationJob
def each_iteration(*)
abort_strategy if self.class.records_performed.size == 2
Expand Down Expand Up @@ -412,6 +426,17 @@ def test_activerecord_batches_complete
assert_equal(processed_records, BatchActiveRecordIterationJob.records_performed.flatten.map(&:id))
end

# Runs the relation-batching job to completion and checks that every batch
# it recorded is an ActiveRecord::Relation, with sizes 3, 3, 3 and 1.
def test_activerecord_batches_as_relation
  push(BatchActiveRecordRelationIterationJob)

  work_one_job
  assert_jobs_in_queue(0)

  # Read the results from the job class that was actually pushed above,
  # not from the sibling record-batching job.
  records_performed = BatchActiveRecordRelationIterationJob.records_performed
  assert_equal([3, 3, 3, 1], records_performed.map(&:size))
  assert(records_performed.all? { |record| record.is_a?(ActiveRecord::Relation) })
end

def test_activerecord_batches
iterate_exact_times(1.times)

Expand Down
43 changes: 42 additions & 1 deletion test/unit/active_record_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@ class ActiveRecordEnumeratorTest < IterationUnitTest
end
end

test "#batches yielding records performs one query per batch, plus one, when relation_size % batch_size is 0" do
  batches = build_enumerator.batches
  batch_total = 0

  # Count only the queries issued while the enumerator is being consumed.
  queries = count_uncached_queries do
    batches.each { batch_total += 1 }
  end

  assert_equal batch_total + 1, queries
end

test "#batches yielding records performs one query for each batch when relation_size % batch_size isn't 0" do
  batches = build_enumerator(batch_size: 3).batches
  batch_total = 0

  # Count only the queries issued while the enumerator is being consumed.
  queries = count_uncached_queries do
    batches.each { batch_total += 1 }
  end

  assert_equal batch_total, queries
end

test "#batches yielding relations performs one query per batch, plus one" do
  batches = build_enumerator(as_relation: true).batches
  batch_total = 0

  # Count only the queries issued while the enumerator is being consumed.
  queries = count_uncached_queries do
    batches.each { batch_total += 1 }
  end

  assert_equal batch_total + 1, queries
end

test "batch size is configurable" do
enum = build_enumerator(batch_size: 4).batches
shops = Product.order(:id).take(4)
Expand Down Expand Up @@ -95,13 +127,22 @@ class ActiveRecordEnumeratorTest < IterationUnitTest

private

def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil)
# Test helper: builds an ActiveRecordEnumerator over +relation+ (all products
# by default), forwarding the remaining options as keyword arguments.
def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil, as_relation: false)
  options = { batch_size: batch_size, columns: columns, cursor: cursor, as_relation: as_relation }
  JobIteration::ActiveRecordEnumerator.new(relation, **options)
end

# Runs the block while subscribed to "sql.active_record" notifications and
# returns how many of them did not carry a truthy :cached payload entry.
def count_uncached_queries(&block)
  uncached = 0

  tally = lambda do |*_event, payload|
    next if payload[:cached]

    uncached += 1
  end

  ActiveSupport::Notifications.subscribed(tally, "sql.active_record", &block)
  uncached
end
end
end