Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Enumerator Yielding Relations #91

Merged
merged 4 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@ class BatchesJob < ApplicationJob

def each_iteration(batch_of_comments, product_id)
# batch_of_comments will contain batches of 100 records
Comment.where(id: batch_of_comments.map(&:id)).update_all(deleted: true)
batch_of_comments.each do |comment|
DeleteCommentJob.perform_later(comment)
end
end
end
```

```ruby
class BatchesAsRelationJob < ApplicationJob
include JobIteration::Iteration

def build_enumerator(product_id, cursor:)
enumerator_builder.active_record_on_batch_relations(
Product.find(product_id).comments,
cursor: cursor,
batch_size: 100,
)
end

def each_iteration(batch_of_comments, product_id)
# batch_of_comments will be a Comment::ActiveRecord_Relation
batch_of_comments.update_all(deleted: true)
end
end
```
Expand Down
116 changes: 116 additions & 0 deletions lib/job-iteration/active_record_batch_enumerator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# frozen_string_literal: true

module JobIteration
# Builds Batch Enumerator based on ActiveRecord Relation.
# @see EnumeratorBuilder
class ActiveRecordBatchEnumerator
include Enumerable

SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N"

def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
@batch_size = batch_size
@primary_key = "#{relation.table_name}.#{relation.primary_key}"
@columns = Array(columns&.map(&:to_s) || @primary_key)
@primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key)
@pluck_columns = if @primary_key_index
@columns
else
@columns.dup << @primary_key
end
@cursor = Array.wrap(cursor)
raise ArgumentError, "Must specify at least one column" if @columns.empty?
if relation.joins_values.present? && [email protected]? { |column| column.to_s.include?(".") }
raise ArgumentError, "You need to specify fully-qualified columns if you join a table"
end

if relation.arel.orders.present? || relation.arel.taken.present?
raise ConditionNotSupportedError
end

@base_relation = relation.reorder(@columns.join(","))
end

def each
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
if block_given?
while (relation = next_batch)
break if @cursor.empty?
yield relation, cursor_value
end
else
to_enum(:each)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot the size here 😄

end
end

def size
@base_relation.count
end

private

def next_batch
relation = @base_relation.limit(@batch_size)
if conditions.any?
relation = relation.where(*conditions)
end

cursor_values, ids = relation.uncached do
pluck_columns(relation)
end

cursor = cursor_values.last
# The primary key was plucked, but original cursor did not include it
cursor.pop unless @primary_key_index || cursor.nil?
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
@cursor = Array.wrap(cursor)

# Yields relations by selecting the primary keys of records in the batch.
# Post.where(published: nil) results in an enumerator of relations like: Post.where(ids: batch_of_ids)
@base_relation.where(@primary_key => ids)
end

def pluck_columns(relation)
if @pluck_columns.size == 1 # only the primary key
column_values = relation.pluck(*@pluck_columns)
return [column_values, column_values]
end

column_values = relation.pluck(*@pluck_columns)
primary_key_index = @primary_key_index || -1
primary_key_values = column_values.map { |values| values[primary_key_index] }

serialize_column_values!(column_values)
[column_values, primary_key_values]
end

def cursor_value
return @cursor.first if @cursor.size == 1
@cursor
end

def conditions
column_index = @cursor.size - 1
column = @columns[column_index]
where_clause = if @columns.size == @cursor.size
"#{column} > ?"
else
"#{column} >= ?"
end
while column_index > 0
column_index -= 1
column = @columns[column_index]
where_clause = "#{column} > ? OR (#{column} = ? AND (#{where_clause}))"
end
ret = @cursor.reduce([where_clause]) { |params, value| params << value << value }
ret.pop
ret
end

def serialize_column_values!(column_values)
column_values.map! { |values| values.map! { |value| column_value(value) } }
end

def column_value(value)
value.is_a?(Time) ? value.strftime(SQL_DATETIME_WITH_NSEC) : value
end
end
end
14 changes: 13 additions & 1 deletion lib/job-iteration/enumerator_builder.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# frozen_string_literal: true
require_relative "./active_record_batch_enumerator"
require_relative "./active_record_enumerator"
require_relative "./csv_enumerator"
require_relative "./throttle_enumerator"
Expand Down Expand Up @@ -100,7 +101,7 @@ def build_active_record_enumerator_on_records(scope, cursor:, **args)
wrap(self, enum)
end

# Builds Enumerator from Active Record Relation and enumerates on batches.
# Builds Enumerator from Active Record Relation and enumerates on batches of records.
# Each Enumerator tick moves the cursor +batch_size+ rows forward.
#
# +batch_size:+ sets how many records will be fetched in one batch. Defaults to 100.
Expand All @@ -115,6 +116,16 @@ def build_active_record_enumerator_on_batches(scope, cursor:, **args)
wrap(self, enum)
end

# Builds Enumerator from Active Record Relation and enumerates on batches, yielding Active Record Relations.
# See documentation for #build_active_record_enumerator_on_batches.
def build_active_record_enumerator_on_batch_relations(scope, cursor:, **args)
JobIteration::ActiveRecordBatchEnumerator.new(
scope,
cursor: cursor,
**args
).each
end

def build_throttle_enumerator(enum, throttle_on:, backoff:)
JobIteration::ThrottleEnumerator.new(
enum,
Expand All @@ -129,6 +140,7 @@ def build_throttle_enumerator(enum, throttle_on:, backoff:)
alias_method :array, :build_array_enumerator
alias_method :active_record_on_records, :build_active_record_enumerator_on_records
alias_method :active_record_on_batches, :build_active_record_enumerator_on_batches
alias_method :active_record_on_batch_relations, :build_active_record_enumerator_on_batch_relations
alias_method :throttle, :build_throttle_enumerator

private
Expand Down
26 changes: 26 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def each_iteration(record)
end
end

class BatchActiveRecordRelationIterationJob < SimpleIterationJob
def build_enumerator(cursor:)
enumerator_builder.active_record_on_batch_relations(
Product.all,
cursor: cursor,
batch_size: 3,
)
end

def each_iteration(relation)
self.class.records_performed << relation
end
end

class AbortingActiveRecordIterationJob < ActiveRecordIterationJob
def each_iteration(*)
abort_strategy if self.class.records_performed.size == 2
Expand Down Expand Up @@ -448,6 +462,18 @@ def test_activerecord_batches
assert_equal(1, BatchActiveRecordIterationJob.on_complete_called)
end

def test_activerecord_batch_relations
push(BatchActiveRecordRelationIterationJob)

work_one_job
assert_jobs_in_queue(0)

records_performed = BatchActiveRecordIterationJob.records_performed
assert_equal([3, 3, 3, 1], records_performed.map(&:size))
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
assert(records_performed.all? { |relation| relation.is_a?(ActiveRecord::Relation) })
assert(records_performed.none?(&:loaded?))
end

def test_multiple_columns
iterate_exact_times(3.times)

Expand Down
108 changes: 108 additions & 0 deletions test/unit/active_record_batch_enumerator_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# frozen_string_literal: true

require "test_helper"

module JobIteration
class ActiveRecordBatchEnumeratorTest < IterationUnitTest
SQL_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%N"
test "#each yields batches as relation with the last record's cursor position" do
enum = build_enumerator
product_batches = Product.order(:id).take(4).in_groups_of(2).map { |product| [product, product.last.id] }

enum.first(2).each_with_index do |(batch, cursor), index|
assert batch.is_a?(ActiveRecord::Relation)
assert_equal product_batches[index].first, batch
assert_equal product_batches[index].last, cursor
end
end

test "#each yields unloaded relations" do
enum = build_enumerator
relation, _ = enum.first

refute_predicate relation, :loaded?
end

test "#each yields enumerator when called without a block" do
enum = build_enumerator
assert enum.each.is_a?(Enumerator)
end

test "#each doesn't yield anything if the relation is empty" do
enum = build_enumerator(relation: Product.none)

assert_equal([], enum.to_a)
end

test "batch size is configurable" do
enum = build_enumerator(batch_size: 4)
products = Product.order(:id).take(4)

assert_equal([products, products.last.id], enum.first)
end

test "columns are configurable" do
enum = build_enumerator(columns: [:updated_at])
products = Product.order(:updated_at).take(2)

expected_product_cursor = products.last.updated_at.strftime(SQL_TIME_FORMAT)
assert_equal([products, expected_product_cursor], enum.first)
end

test "columns can be an array" do
enum = build_enumerator(columns: [:updated_at, :id])
products = Product.order(:updated_at, :id).take(2)

expected_product_cursor = [products.last.updated_at.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, expected_product_cursor], enum.first)
end

test "columns configured with primary key only queries primary key column once" do
queries = track_uncached_queries do
enum = build_enumerator(columns: [:updated_at, :id])
enum.first
end
assert_equal 1, queries.first.scan(/`products`.`id`/).count
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
end

test "cursor can be used to resume" do
products = Product.order(:id).take(3)

enum = build_enumerator(cursor: products.shift.id)

assert_equal([products, products.last.id], enum.first)
end

test "cursor can be used to resume on multiple columns" do
enum = build_enumerator(columns: [:created_at, :id])
products = Product.order(:created_at, :id).take(2)

cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, cursor], enum.first)

enum = build_enumerator(columns: [:created_at, :id], cursor: cursor)
products = Product.order(:created_at, :id).offset(2).take(2)

cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, cursor], enum.first)
end

adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
private

def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil)
JobIteration::ActiveRecordBatchEnumerator.new(
relation,
batch_size: batch_size,
columns: columns,
cursor: cursor,
)
end

def track_uncached_queries(&block)
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
queries = []
query_cb = ->(*, payload) { queries << payload[:sql] unless payload[:cached] }
adrianna-chang-shopify marked this conversation as resolved.
Show resolved Hide resolved
ActiveSupport::Notifications.subscribed(query_cb, "sql.active_record", &block)
queries
end
end
end