Support batch record collections #402

Closed
wants to merge 1 commit into from
35 changes: 35 additions & 0 deletions README.md
@@ -111,6 +111,41 @@ end
title,content
My Title,Hello World!
```
### Processing Batch Collections

The Maintenance Tasks gem supports processing Active Record records in batches,
which can reduce the number of database calls your Task makes. Call `in_batches`
in the Task class to process records in batches. The default batch size is 100;
pass an integer to `in_batches` to use a custom size.

```ruby
# app/tasks/maintenance/update_posts_in_batches_task.rb
module Maintenance
  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
    in_batches 50

    def collection
      Post.all
    end

    def count
      collection.count
    end

    def process(batch_of_posts)
      Post.where(id: batch_of_posts.map(&:id)).update_all(
        content: "New content added on #{Time.now.utc}"
      )
    end
  end
end
```

Ensure that you've implemented the following methods:
* `collection`: return an Active Record Relation to be iterated over.
* `process`: do the work of your Task on a _batch_ of records.
* `count`: return the number of rows that will be iterated over (this should
still be the number of records to be processed, not the number of batches).
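
To make the point about fewer database calls concrete, here is a minimal sketch in plain Ruby with no Rails dependency. `FakeStore` and its `update` method are hypothetical stand-ins for the database, not part of the gem; the sketch only counts how many write calls each strategy issues for 120 records with a batch size of 50.

```ruby
# Hypothetical stand-in for a database table; it only counts write calls.
class FakeStore
  attr_reader :calls, :rows

  def initialize(ids)
    @rows = ids.to_h { |id| [id, "old"] }
    @calls = 0
  end

  # One call updates every row whose id is in `ids`, like a single UPDATE.
  def update(ids, content)
    @calls += 1
    ids.each { |id| @rows[id] = content }
  end
end

ids = (1..120).to_a

# Record-at-a-time: one write call per record.
per_record = FakeStore.new(ids)
ids.each { |id| per_record.update([id], "new") }

# Batched: one write call per batch of up to 50 records.
batched = FakeStore.new(ids)
batch_sizes = ids.each_slice(50).map do |batch|
  batched.update(batch, "new")
  batch.size
end

puts "per-record calls: #{per_record.calls}"
puts "batched calls:    #{batched.calls}"
puts "batch sizes:      #{batch_sizes.inspect}"
puts "records seen:     #{batch_sizes.sum}"
```

The final line is the reason `count` should still report records: the batch sizes add up to the number of rows, whatever the number of batches.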

### Considerations when writing Tasks

16 changes: 14 additions & 2 deletions app/jobs/maintenance_tasks/task_job.rb
@@ -31,7 +31,18 @@ def build_enumerator(_run, cursor:)

       case collection
       when ActiveRecord::Relation
-        enumerator_builder.active_record_on_records(collection, cursor: cursor)
+        if @task.in_batches?
+          enumerator_builder.active_record_on_batches(
+            collection,
+            cursor: cursor,
+            batch_size: @task.batch_size
+          )
+        else
+          enumerator_builder.active_record_on_records(
+            collection,
+            cursor: cursor
+          )
+        end
       when Array
         enumerator_builder.build_array_enumerator(collection, cursor: cursor)
       when CSV
@@ -50,7 +61,8 @@ def build_enumerator(_run, cursor:)
     def each_iteration(input, _run)
       throw(:abort, :skip_complete_callbacks) if @run.stopping?
       task_iteration(input)
-      @ticker.tick
+      batch_size = @task.in_batches? ? input.size : 1
+      @ticker.tick(batch_size)
       @run.reload_status
     end
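
The two hunks above pair up: the enumerator yields either single records or batches, and each iteration ticks once per record it covered. The following is a rough sketch of that accounting in plain Ruby. `run_task` is a hypothetical helper, and `each_slice` stands in for the batch enumerator; neither is the gem's actual implementation.

```ruby
# Hypothetical sketch of the iteration loop; not the gem's TaskJob.
def run_task(records, batch_size: nil)
  processed_inputs = []
  tick_count = 0

  # Batched tasks iterate over slices; other tasks iterate over single records.
  inputs = batch_size ? records.each_slice(batch_size) : records.each

  inputs.each do |input|
    processed_inputs << input
    # A batch counts for as many ticks as it has records.
    tick_count += batch_size ? input.size : 1
  end

  [processed_inputs, tick_count]
end

posts = (1..7).to_a

batches, batch_ticks = run_task(posts, batch_size: 5)
singles, single_ticks = run_task(posts)

puts "batched inputs: #{batches.inspect}, ticks: #{batch_ticks}"
puts "single inputs:  #{singles.size}, ticks: #{single_ticks}"
```

With 7 records and a batch size of 5, the loop runs twice (batches of 5 and 2), yet the tick count still matches the number of records, which is the behaviour the job test in this PR checks for.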

10 changes: 5 additions & 5 deletions app/models/maintenance_tasks/ticker.rb
@@ -28,11 +28,11 @@ def initialize(throttle_duration, &persist)
       @ticks_recorded = 0
     end

-    # Increments the tick count by one, and may persist the new value if the
-    # threshold duration has passed since initialization or the tick count was
-    # last persisted.
-    def tick
-      @ticks_recorded += 1
+    # Increments the tick count, which defaults to one. It may persist the new
+    # value if the threshold duration has passed since initialization or the
+    # tick count was last persisted.
+    def tick(increment = 1)
+      @ticks_recorded += increment
       persist if persist?
     end
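
For readers who have not seen the rest of `ticker.rb`, here is a self-contained sketch of how a throttled ticker with a variable increment might behave. `SketchTicker` is hypothetical: only `tick`, `persist`, `persist?`, `@ticks_recorded`, and the `throttle_duration` argument appear in the diff, and the injectable `clock` is an assumption added so the example runs without sleeping.

```ruby
# Hypothetical sketch of a throttled ticker; not the gem's Ticker class.
class SketchTicker
  MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  # `clock` is an assumption of this sketch so that time can be faked.
  def initialize(throttle_duration, clock: MONOTONIC_CLOCK, &persist)
    @throttle_duration = throttle_duration
    @clock = clock
    @persist = persist
    @last_persisted = @clock.call
    @ticks_recorded = 0
  end

  # Adds `increment` ticks and flushes them if the throttle window has passed.
  def tick(increment = 1)
    @ticks_recorded += increment
    persist if persist?
  end

  # Hands the buffered ticks to the block and resets the buffer.
  def persist
    return if @ticks_recorded.zero?

    @last_persisted = @clock.call
    @persist.call(@ticks_recorded)
    @ticks_recorded = 0
  end

  private

  def persist?
    @clock.call - @last_persisted >= @throttle_duration
  end
end

now = 0.0
persisted = []
ticker = SketchTicker.new(1.0, clock: -> { now }) { |ticks| persisted << ticks }

ticker.tick(5) # within the throttle window, so only buffered
now = 1.5
ticker.tick(2) # window elapsed, so the buffered ticks are flushed together
ticker.tick    # buffered again
ticker.persist # explicit flush of whatever remains

puts persisted.inspect
```

Because ticks are buffered between flushes, passing the batch size as the increment costs no extra persistence calls compared with ticking once per record.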

25 changes: 25 additions & 0 deletions app/tasks/maintenance_tasks/task.rb
@@ -8,6 +8,8 @@ class Task
     class NotFoundError < NameError; end

     class << self
+      attr_reader :batch_size
+
       # Finds a Task with the given name.
       #
       # @param name [String] the name of the Task to be found.
@@ -72,6 +74,14 @@ def count
         new.count
       end

+      # Make this a Task that processes batches.
+      #
+      # @param batch_size [Integer] optionally, a custom batch size can be
+      # specified. Defaults to 100 if one is not provided.
+      def in_batches(batch_size = 100)
+        @batch_size = batch_size
+      end
+
       private

       def load_constants
@@ -110,5 +120,20 @@ def process(_item)
     # @return [Integer, nil]
     def count
     end
+
+    # The batch size for this Task instance, if any.
+    #
+    # @return [Integer, nil] the batch size for this Task instance, or nil if
+    # the Task does not process batches.
+    def batch_size
+      self.class.batch_size
+    end
+
+    # Whether this Task instance processes batches.
+    #
+    # @return [Boolean] whether this Task instance processes batches.
+    def in_batches?
+      batch_size.present?
+    end
   end
 end
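
The class-level `in_batches` macro stores the batch size in a class-level instance variable, which the instance methods then read through `self.class`. Below is a minimal sketch of that pattern in plain Ruby. `SketchTask` and its subclasses are hypothetical, and `!batch_size.nil?` stands in for ActiveSupport's `present?` so the example runs without Rails.

```ruby
# Hypothetical base class mirroring the shape of the macro in this diff.
class SketchTask
  class << self
    attr_reader :batch_size

    # Class-level macro: marks the task as batched and records the size.
    def in_batches(batch_size = 100)
      @batch_size = batch_size
    end
  end

  def batch_size
    self.class.batch_size
  end

  # Plain Ruby stand-in for ActiveSupport's `present?`.
  def in_batches?
    !batch_size.nil?
  end
end

class DefaultBatchTask < SketchTask
  in_batches
end

class CustomBatchTask < SketchTask
  in_batches 50
end

class PlainTask < SketchTask
end

# The value lives on each class, so a subclass does not inherit it.
class ChildOfCustom < CustomBatchTask
end

[DefaultBatchTask, CustomBatchTask, PlainTask, ChildOfCustom].each do |klass|
  task = klass.new
  puts "#{klass}: batch_size=#{task.batch_size.inspect} in_batches?=#{task.in_batches?}"
end
```

One property of this pattern worth checking against the intended behaviour: a class-level instance variable is not inherited, so in this sketch a subclass of a batched task is not batched unless it calls `in_batches` itself.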
20 changes: 20 additions & 0 deletions test/dummy/app/tasks/maintenance/update_posts_in_batches_task.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+module Maintenance
+  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
+    in_batches 5
+
+    def collection
+      Post.all
+    end
+
+    def count
+      collection.count
+    end
+
+    def process(batch_of_posts)
+      Post.where(id: batch_of_posts.map(&:id)).update_all(
Review comment (Member):

If we get the batches, we shouldn't need the call to `where`. Shouldn't we be able to call `update_all` directly on the batch?

Reply (Contributor Author, @adrianna-chang-shopify, May 6, 2021):

Although, there's no reason we couldn't turn this back into a Relation / BatchEnumerator again on our end, so it probably makes sense for us to do that (despite it being annoying to move this back and forth between a BatchEnumerator / Array / Relation 😛).

+        content: "New content added on #{Time.now.utc}"
+      )
+    end
+  end
+end
13 changes: 13 additions & 0 deletions test/jobs/maintenance_tasks/task_job_test.rb
@@ -337,5 +337,18 @@ class << self
     ensure
       MaintenanceTasks.error_handler = error_handler_before
     end
+
+    test ".perform_now handles batched tasks" do
+      5.times do |i|
+        Post.create!(title: "Another Post ##{i}", content: "Content ##{i}")
+      end
+      # We expect 2 batches (7 posts => 5 + 2)
+      Maintenance::UpdatePostsInBatchesTask.any_instance.expects(:process).twice
+
+      run = Run.create!(task_name: "Maintenance::UpdatePostsInBatchesTask")
+      TaskJob.perform_now(run)
+
+      assert_equal Post.count, run.reload.tick_count
+    end
   end
 end
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_data_test.rb
@@ -26,6 +26,7 @@ class TaskDataTest < ActiveSupport::TestCase
         "Maintenance::ErrorTask",
         "Maintenance::ImportPostsTask",
         "Maintenance::TestTask",
+        "Maintenance::UpdatePostsInBatchesTask",
         "Maintenance::UpdatePostsTask",
       ]
       assert_equal expected, TaskData.available_tasks.map(&:name)
1 change: 1 addition & 0 deletions test/system/maintenance_tasks/tasks_test.rb
@@ -23,6 +23,7 @@ class TasksTest < ApplicationSystemTestCase
         "Maintenance::ErrorTask\nNew",
         "Maintenance::ImportPostsTask\nNew",
         "Maintenance::TestTask\nNew",
+        "Maintenance::UpdatePostsInBatchesTask\nNew",
         "Completed Tasks",
         "Maintenance::UpdatePostsTask\nSucceeded",
       ]
1 change: 1 addition & 0 deletions test/tasks/maintenance_tasks/task_test.rb
@@ -10,6 +10,7 @@ class TaskTest < ActiveSupport::TestCase
         "Maintenance::ErrorTask",
         "Maintenance::ImportPostsTask",
         "Maintenance::TestTask",
+        "Maintenance::UpdatePostsInBatchesTask",
         "Maintenance::UpdatePostsTask",
       ]
       assert_equal expected,