# Complex Job Workflows with Batches
Whenever you use an asynchronous system, workflows become much harder to control. It's easy to create a lot of work in parallel, firing and forgetting 10 jobs, but synchronizing on those parallel jobs within a more complex workflow can quickly become a hard problem to solve yourself. Let's consider an example.
You want to upload 10 product images to S3 and mark the product as visible to customers only once the images are available on S3. We don't want customers to see broken images, right? Conceptually that's simple: upload them one at a time.
```ruby
class ProductImageUploader
  def upload(product, image_ids)
    image_ids.each do |image_id|
      s3_upload(data_for_image(image_id))
    end
    product.mark_visible!
  end
end
```
The problem is that it's slow because we're performing the work sequentially.
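You can see how that cost adds up with a quick single-process simulation. This is only an illustration, not Sidekiq code: the `fake_upload` helper is hypothetical and just sleeps to stand in for S3 latency, and threads stand in for parallel workers.

```ruby
# Hypothetical stand-in for the S3 upload: sleep to mimic network I/O.
def fake_upload(image_id)
  sleep 0.05
end

image_ids = (1..10).to_a

# Sequential: each upload waits for the previous one.
serial_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
image_ids.each { |id| fake_upload(id) }
serial_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - serial_start

# Parallel: fan the uploads out to threads, then join them all.
parallel_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
image_ids.map { |id| Thread.new { fake_upload(id) } }.each(&:join)
parallel_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - parallel_start

puts format('serial: %.2fs, parallel: %.2fs', serial_time, parallel_time)
```

The sequential loop takes roughly the sum of all the upload times, while the parallel version takes roughly the time of the slowest single upload.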
Sidekiq allows you to split that work easily. Instead of processing 10 images in a loop, you create a job for each image and have your Sidekiq workers process the jobs in parallel. The resulting upload process will finish much more quickly.
```ruby
class ProductImageUploader
  include Sidekiq::Job

  def perform(image_id)
    s3_upload(data_for_image(image_id))
  end

  def self.upload(product, image_ids)
    image_ids.each do |image_id|
      perform_async(image_id)
    end
    # Can't do this here!
    # product.mark_visible!
  end
end
```
That speed comes at a price, though: we've lost the ability to mark the product as visible! Because the image uploads happen asynchronously, we don't know when all of the images have finished uploading. We'd like to register a callback to be executed once all of the jobs have completed.
That's synchronization in workflow terms: we don't want to change product visibility until the parallel jobs have all completed. It's not too hard to implement when all of your work runs in one process or on one machine, but Sidekiq is designed to run across many machines. Sidekiq Pro makes this synchronization easy: it introduces the notion of a Batch of jobs, with callbacks that fire when all jobs in that Batch are finished.
```ruby
class ImageUploader
  include Sidekiq::Job

  def perform(image_id)
    s3_upload(data_for_image(image_id))
  end

  def on_success(status, options)
    product = Product.find(options['pid'])
    product.mark_visible!
  end

  # Kick off this entire workflow by calling something like:
  #   ImageUploader.upload(product, product.images.map(&:id))
  def self.upload(product, image_ids)
    batch = Sidekiq::Batch.new
    batch.on(:success, self, 'pid' => product.id)
    batch.jobs do
      image_ids.each do |image_id|
        perform_async(image_id)
      end
    end
  end
end
```
The `on_success` callback is invoked once the last image upload job executes successfully. Now we have the best of both worlds: the speed of parallel execution with proper synchronization, ensuring no broken images for your users!
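The `status` argument passed to the callback is a `Sidekiq::Batch::Status`, which you can also construct anywhere from the batch's `bid` to monitor progress. A minimal sketch, assuming the commercial `sidekiq-pro` gem and a running Redis:

```ruby
require 'sidekiq-pro' # commercial gem, not part of OSS Sidekiq

batch = Sidekiq::Batch.new
batch.jobs do
  # enqueue jobs as above
end

# Poll the batch from anywhere using its id:
status = Sidekiq::Batch::Status.new(batch.bid)
status.total     # number of jobs in the batch
status.pending   # jobs that have not yet succeeded
status.failures  # jobs that have failed so far
status.complete? # true once every job has run
```

Note that besides `:success`, a `:complete` callback is also available: `:complete` fires once every job has run at least once, even if some failed, whereas `:success` fires only when every job has succeeded.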
Remember: Sidekiq allows you to go from serial to parallel. Sidekiq Pro allows you to go from serial to parallel and back to serial. Easy.