Commit

Merge pull request #4625 from rmosolgo/dataloader-rewrite
Rewrite dataloader to use Fiber#transfer
rmosolgo authored Dec 11, 2023
2 parents 05d120d + 0b30f64 commit fee1832
Showing 2 changed files with 108 additions and 141 deletions.
232 changes: 100 additions & 132 deletions lib/graphql/dataloader.rb
@@ -118,7 +118,12 @@ def with(source_class, *batch_args, **batch_kwargs)
#
# @return [void]
def yield
Fiber.yield
if use_fiber_resume?
Fiber.yield
else
parent_fiber = Thread.current[:parent_fiber]
parent_fiber.transfer
end
nil
end
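The hunk above is the heart of the change: when a compatible scheduler isn't in play, `yield` transfers control to a parent fiber instead of calling `Fiber.yield`. A minimal standalone sketch of the difference between the two handoff styles (plain Ruby 3.x, not code from this commit):

# Style 1: Fiber.yield / Fiber#resume -- control implicitly returns to the caller of #resume.
worker = Fiber.new do
  puts "step 1"
  Fiber.yield          # pause; control goes back to whoever resumed us
  puts "step 2"
end
worker.resume          # prints "step 1"
worker.resume          # prints "step 2"

# Style 2: Fiber#transfer -- the pausing fiber must name its successor explicitly,
# which is why the dataloader stashes the parent fiber in a fiber-local variable.
parent = Fiber.current
child = Fiber.new do
  puts "step 1"
  parent.transfer      # hand control back to the parent by name
  puts "step 2"
end
child.transfer         # prints "step 1"
child.transfer         # prints "step 2"; on exit, control returns to the main fiber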

@@ -167,120 +172,121 @@ def run_isolated
end
end

# @api private Move along, move along
def run
if @nonblocking && !Fiber.scheduler
raise "`nonblocking: true` requires `Fiber.scheduler`, assign one with `Fiber.set_scheduler(...)` before executing GraphQL."
end
# At a high level, the algorithm is:
#
# A) Inside Fibers, run jobs from the queue one-by-one
# - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
# - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
# - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
# B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
# - Similarly, create a Fiber to consume pending sources and tell them to load their data.
# - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
# - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
# C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
# - Those Fibers assume that source caches will have been populated with the data they were waiting for.
# - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
# D) Once all pending fibers have been resumed once, return to `A` above.
#
# For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
# on the first pass. I couldn't find a DRYer or more readable way to write the loops.
#
pending_fibers = []
next_fibers = []
pending_source_fibers = []
job_fibers = []
next_job_fibers = []
source_fibers = []
next_source_fibers = []
first_pass = true

while first_pass || (f = pending_fibers.shift)
if first_pass
manager = spawn_fiber do
while first_pass || job_fibers.any?
first_pass = false
else
# These fibers were previously waiting for sources to load data,
# resume them. (They might wait again, in which case, re-enqueue them.)
resume(f)
if f.alive?
next_fibers << f
end
end

while @pending_jobs.any?
# Create a Fiber to consume jobs until one of the jobs yields
# or jobs run out
f = spawn_fiber {
while (job = @pending_jobs.shift)
job.call
while (f = job_fibers.shift || spawn_job_fiber)
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
}
resume(f)
# In this case, the job yielded. Queue it up to run again after
# we load whatever it's waiting for.
if f.alive?
next_fibers << f
end
end

if pending_fibers.empty?
# Now, run all Sources which have become pending _before_ resuming GraphQL execution.
# Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
#
# This is where an evented approach would be even better -- can we tell which
# fibers are ready to continue, and continue execution there?
#
if (first_source_fiber = create_source_fiber)
pending_source_fibers << first_source_fiber
end

while pending_source_fibers.any?
while (outer_source_fiber = pending_source_fibers.pop)
resume(outer_source_fiber)
if outer_source_fiber.alive?
next_source_fibers << outer_source_fiber
end
if (next_source_fiber = create_source_fiber)
pending_source_fibers << next_source_fiber
if job_fibers.empty?
any_pending_sources = true
while any_pending_sources
while (f = source_fibers.shift || spawn_source_fiber)
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
end
end
join_queues(source_fibers, next_source_fibers)
any_pending_sources = @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
end
join_queues(pending_source_fibers, next_source_fibers)
next_source_fibers.clear
end
# Move newly-enqueued Fibers on to the list to be resumed.
# Clear out the list of next-round Fibers, so that
# any Fibers that pause can be put on it.
join_queues(pending_fibers, next_fibers)
next_fibers.clear
join_queues(job_fibers, next_job_fibers)
end
end

if @pending_jobs.any?
raise "Invariant: #{@pending_jobs.size} pending jobs"
elsif pending_fibers.any?
raise "Invariant: #{pending_fibers.size} pending fibers"
elsif next_fibers.any?
raise "Invariant: #{next_fibers.size} next fibers"
run_fiber(manager)

rescue UncaughtThrowError => e
throw e.tag, e.value
end

def run_fiber(f)
if use_fiber_resume?
f.resume
else
f.transfer
end
nil
end

def join_queues(previous_queue, next_queue)
if @nonblocking
Fiber.scheduler.run
next_queue.select!(&:alive?)
def spawn_fiber
fiber_vars = get_fiber_variables
parent_fiber = use_fiber_resume? ? nil : Fiber.current
Fiber.new(blocking: !@nonblocking) {
set_fiber_variables(fiber_vars)
Thread.current[:parent_fiber] = parent_fiber
yield
# With `.transfer`, you have to explicitly pass back to the parent --
# if the fiber is allowed to terminate normally, control is passed to the main fiber instead.
if parent_fiber
parent_fiber.transfer(true)
else
true
end
}
end


def get_fiber_state
fiber_locals = {}

Thread.current.keys.each do |fiber_var_key|
# This variable should be fresh in each new fiber
if fiber_var_key != :__graphql_runtime_info
fiber_locals[fiber_var_key] = Thread.current[fiber_var_key]
end
end
previous_queue.concat(next_queue)

fiber_locals
end

def set_fiber_state(state)
state.each { |k, v| Thread.current[k] = v }
end

private

# If there are pending sources, return a fiber for running them.
# Otherwise, return `nil`.
#
# @return [Fiber, nil]
def create_source_fiber
def join_queues(prev_queue, new_queue)
@nonblocking && Fiber.scheduler.run
prev_queue.concat(new_queue)
new_queue.clear
end

def use_fiber_resume?
Fiber.respond_to?(:scheduler) &&
(
(defined?(::DummyScheduler) && Fiber.scheduler.is_a?(::DummyScheduler)) ||
(defined?(::Evt) && ::Evt::Scheduler.singleton_class::BACKENDS.any? { |be| Fiber.scheduler.is_a?(be) }) ||
(defined?(::Libev) && Fiber.scheduler.is_a?(::Libev::Scheduler))
)
end

def spawn_job_fiber
if @pending_jobs.any?
spawn_fiber do
while job = @pending_jobs.shift
job.call
end
end
end
end

def spawn_source_fiber
pending_sources = nil
@source_cache.each_value do |source_by_batch_params|
source_by_batch_params.each_value do |source|
@@ -292,48 +298,10 @@ def create_source_fiber
end

if pending_sources
# By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops.
# For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then
# the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually
# the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`.
# That method is short-circuited since it isn't pending any more, but it's still a waste.
#
# This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers,
# similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber.
source_fiber = spawn_fiber do
spawn_fiber do
pending_sources.each(&:run_pending_keys)
end
end

source_fiber
end

def resume(fiber)
fiber.resume
rescue UncaughtThrowError => e
throw e.tag, e.value
end

# Copies the thread local vars into the fiber thread local vars. Many
# gems (such as RequestStore, MiniRacer, etc.) rely on thread local vars
# to keep track of execution context, and without this they do not
# behave as expected.
#
# @see https://github.com/rmosolgo/graphql-ruby/issues/3449
def spawn_fiber
fiber_vars = get_fiber_variables

if @nonblocking
Fiber.new(blocking: false) do
set_fiber_variables(fiber_vars)
yield
end
else
Fiber.new do
set_fiber_variables(fiber_vars)
yield
end
end
end
end
end
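To make the new control flow concrete, here is a self-contained sketch of the manager/worker handoff that `run`, `run_fiber`, and `spawn_fiber` implement. The names (`MiniManager`, `wait`) are hypothetical stand-ins, and it runs a single worker rather than the real job and source queues:

class MiniManager
  def initialize
    @jobs = []
  end

  def enqueue(&job)
    @jobs << job
  end

  # Called from inside a job: transfer control back to the manager fiber,
  # like Dataloader#yield does in its non-resume branch.
  def wait
    Thread.current[:parent_fiber].transfer
  end

  def run
    parent = Fiber.current
    worker = Fiber.new do
      # Thread.current[...] is fiber-local, so this assignment is visible to the jobs below.
      Thread.current[:parent_fiber] = parent
      @jobs.shift.call while @jobs.any?
      # Explicit handoff back to the manager; letting the fiber terminate normally
      # would pass control to the main fiber instead, as the spawn_fiber comment warns.
      parent.transfer(true)
    end
    finished = worker.transfer            # returns when the worker waits or finishes
    finished = worker.transfer until finished
    nil
  end
end

manager = MiniManager.new
manager.enqueue do
  puts "fetch requested"
  manager.wait                            # pretend to wait for batched data
  puts "fetch fulfilled"
end
manager.run                               # prints both lines, in order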
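A note on `get_fiber_state` / `set_fiber_state` and the variable copying in `spawn_fiber`: `Thread.current[:key]` is fiber-local in Ruby, so a fresh fiber starts empty and gems such as RequestStore would lose their context (see issue #3449). A standalone demonstration of why the snapshot-and-restore step exists:

Thread.current[:request_id] = "abc123"

# A new fiber does not inherit fiber-local variables:
Fiber.new { p Thread.current[:request_id] }.resume    # => nil

# Snapshot in the parent, restore in the child -- the same move spawn_fiber makes:
vars = { request_id: Thread.current[:request_id] }
Fiber.new {
  vars.each { |k, v| Thread.current[k] = v }
  p Thread.current[:request_id]                       # => "abc123"
}.resume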
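The comment in `spawn_source_fiber` suggests a follow-up design: a `@pending_sources` queue shared between fibers. Purely as an illustration of that idea (this commit does not implement it), such a queue might look like:

# Hypothetical sketch; `pending?` and `run_pending_keys` are the real Source
# methods referenced in the diff above.
class PendingSourceQueue
  def initialize
    @sources = []
  end

  def enqueue(source)
    @sources << source
  end

  # Each source fiber drains the shared queue, so a source fulfilled by one
  # fiber is never re-dispatched to another.
  def drain
    while (source = @sources.shift)
      source.run_pending_keys if source.pending?
    end
  end
end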
17 changes: 8 additions & 9 deletions spec/graphql/dataloader/async_dataloader_spec.rb
@@ -85,11 +85,10 @@ def wait_for(tag:, wait:)
end

def with_scheduler
prev_scheduler = Fiber.scheduler
Fiber.set_scheduler(scheduler_class.new)
yield
ensure
Fiber.set_scheduler(prev_scheduler)
Fiber.set_scheduler(nil)
end
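For context, the updated helper clears the scheduler rather than restoring the previous one. A standalone sketch of the same pattern, assuming the `evt` gem from the spec's evt block below:

require "evt"

def with_scheduler
  Fiber.set_scheduler(Evt::Scheduler.new)
  yield
ensure
  # Unsetting the scheduler closes the previous one, which should drain any
  # still-pending nonblocking fibers before the method returns.
  Fiber.set_scheduler(nil)
end

with_scheduler do
  Fiber.schedule { sleep(0.01) }  # nonblocking under the scheduler
end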

module AsyncDataloaderAssertions
@@ -240,13 +239,13 @@ def self.included(child_class)
include AsyncDataloaderAssertions
end

# if RUBY_ENGINE == "ruby" && !ENV["GITHUB_ACTIONS"]
# describe "With libev_scheduler" do
# require "libev_scheduler"
# let(:scheduler_class) { Libev::Scheduler }
# include AsyncDataloaderAssertions
# end
# end
if RUBY_ENGINE == "ruby" && !ENV["GITHUB_ACTIONS"]
describe "With libev_scheduler" do
require "libev_scheduler"
let(:scheduler_class) { Libev::Scheduler }
include AsyncDataloaderAssertions
end
end

describe "with evt" do
require "evt"