From f372eeb9fb795b664a711b49b90bcce829a2f4d3 Mon Sep 17 00:00:00 2001 From: Jeremy Evans Date: Mon, 6 May 2024 12:02:34 -0700 Subject: [PATCH] Add temporarily_release_connection Database extension for multithreaded transactional testing This allows one thread to start a transaction, and then release the connection back for usage by the connection pool, so that other threads can operate on the connection object safely inside the transaction. This requires the connection pool be limited to a single connection, to ensure that the released connection can be reacquired. It's not perfect, because if the connection is disconnected and removed from the pool while temporarily released, there is no way to handle that situation correctly. --- CHANGELOG | 4 + .../temporarily_release_connection.rb | 178 ++++++++++++++++++ spec/adapters/sqlite_spec.rb | 26 +++ .../temporarily_release_connection_spec.rb | 110 +++++++++++ www/pages/plugins.html.erb | 4 + 5 files changed, 322 insertions(+) create mode 100644 lib/sequel/extensions/temporarily_release_connection.rb create mode 100644 spec/extensions/temporarily_release_connection_spec.rb diff --git a/CHANGELOG b/CHANGELOG index f41cfbf36..5c31c2b5c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +=== master + +* Add temporarily_release_connection Database extension for multithreaded transactional testing (jeremyevans) + === 5.80.0 (2024-05-01) * Support Dataset#skip_locked on MariaDB 10.6+ (simi) (#2150) diff --git a/lib/sequel/extensions/temporarily_release_connection.rb b/lib/sequel/extensions/temporarily_release_connection.rb new file mode 100644 index 000000000..715de2b62 --- /dev/null +++ b/lib/sequel/extensions/temporarily_release_connection.rb @@ -0,0 +1,178 @@ +# frozen-string-literal: true +# +# The temporarily_release_connection extension adds support for temporarily +# releasing a checked out connection back to the connection pool. It is +# designed for use in multithreaded transactional integration tests, allowing +# a connection to start a transaction in one thread, but be temporarily +# released back to the connection pool, so it can be operated on safely +# by multiple threads inside a block. For example, the main thread could be +# running tests that send web requests, and a separate thread running a web +# server that is responding to those requests, and the same connection and +# transaction would be used for both. +# +# To load the extension into the database: +# +# DB.extension :temporarily_release_connection +# +# After the extension is loaded, call the +temporarily_release_connection+ +# method with the connection object to temporarily release the connection +# back to the pool. Example: +# +# DB.transaction(rollback: :always, auto_savepoint: true) do |conn| +# DB.temporarily_release_connection(conn) do +# # Other threads can operate on connection safely inside the transaction +# yield +# end +# end +# +# For sharded connection pools, the second argument to +temporarily_release_connection+ +# is respected, and specifies the server on which to temporarily release the connection. +# +# The temporarily_release_connection extension is only supported with the +# threaded and timed_queue connection pools that ship with Sequel (and the sharded +# versions of each). To make sure that same connection object can be reacquired, it +# is only supported if the maximum connection pool size is 1, so set the Database +# :max_connections option to 1 if you plan to use this extension. +# +# If the +temporarily_release_connection+ method cannot reacquire the same connection +# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError +# exception. This should only happen if the connection has been disconnected +# while it was temporarily released. If this error is raised, Database#transaction +# will not rollback the transaction, since the connection object is likely no longer +# valid, and on poorly written database drivers, that could cause the process to crash. +# +# Related modules: Sequel::TemporarilyReleaseConnection, +# Sequel::UnableToReacquireConnectionError + +# +module Sequel + # Error class raised if the connection pool does not provide the same connection + # object when checking a temporarily released connection out. + class UnableToReacquireConnectionError < Error + end + + module TemporarilyReleaseConnection + module DatabaseMethods + # Temporarily release the connection back to the connection pool for the + # duration of the block. + def temporarily_release_connection(conn, server=:default, &block) + pool.temporarily_release_connection(conn, server, &block) + end + + private + + # Do nothing if UnableToReacquireConnectionError is raised, as it is + # likely the connection is not in a usable state. + def rollback_transaction(conn, opts) + return if UnableToReacquireConnectionError === $! + super + end + end + + module PoolMethods + # Temporarily release a currently checked out connection, then yield to the block. Reacquire the same + # connection upon the exit of the block. + def temporarily_release_connection(conn, server) + t = Sequel.current + raise Error, "connection not currently checked out" unless conn.equal?(trc_owned_connection(t, server)) + + begin + trc_release(t, conn, server) + yield + ensure + c = trc_acquire(t, server) + unless conn.equal?(c) + raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection" + end + end + end + end + + module TimedQueue + private + + def trc_owned_connection(t, server) + owned_connection(t) + end + + def trc_release(t, conn, server) + release(t) + end + + def trc_acquire(t, server) + acquire(t) + end + end + + module ShardedTimedQueue + # Normalize the server name for sharded connection pools + def temporarily_release_connection(conn, server) + server = pick_server(server) + super + end + + private + + def trc_owned_connection(t, server) + owned_connection(t, server) + end + + def trc_release(t, conn, server) + release(t, conn, server) + end + + def trc_acquire(t, server) + acquire(t, server) + end + end + + module ThreadedBase + private + + def trc_release(t, conn, server) + sync{super} + end + end + + module Threaded + include TimedQueue + include ThreadedBase + end + + module ShardedThreaded + include ShardedTimedQueue + include ThreadedBase + end + end + + trc = TemporarilyReleaseConnection + trc_map = { + :threaded => trc::Threaded, + :sharded_threaded => trc::ShardedThreaded, + :timed_queue => trc::TimedQueue, + :sharded_timed_queue => trc::ShardedTimedQueue, + }.freeze + + Database.register_extension(:temporarily_release_connection) do |db| + unless pool_mod = trc_map[db.pool.pool_type] + raise(Error, "temporarily_release_connection extension not supported for connection pool type #{db.pool.pool_type}") + end + + case db.pool.pool_type + when :threaded, :sharded_threaded + if db.opts[:connection_handling] == :disconnect + raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option" + end + end + + unless db.pool.max_size == 1 + raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1" + end + + db.extend(trc::DatabaseMethods) + db.pool.extend(trc::PoolMethods) + db.pool.extend(pool_mod) + end + + private_constant :TemporarilyReleaseConnection +end diff --git a/spec/adapters/sqlite_spec.rb b/spec/adapters/sqlite_spec.rb index becf70764..808d4e22c 100644 --- a/spec/adapters/sqlite_spec.rb +++ b/spec/adapters/sqlite_spec.rb @@ -1058,3 +1058,29 @@ def setup_db(opts) db[:names].where(name: /^J/).select_order_map(:name).must_equal %w[Jane John] end if RUBY_VERSION >= '3.3' end if DB.adapter_scheme == :sqlite + +# Force a separate Database object for these tests, so temporarily_release_connection +# extension is always tested if testing the sqlite adapter. +describe 'temporarily_release_connection plugin' do + it "should temporarily release a connection" do + db = Sequel.sqlite + db.extension :temporarily_release_connection + + db.create_table(:i){Integer :i} + + db.transaction(:rollback=>:always) do |c| + db.temporarily_release_connection(c) do + 4.times.map do |i| + Thread.new do + db.synchronize do |conn| + _(conn).must_be_same_as c + end + db[:i].insert(i) + end + end.map(&:join) + end + db[:i].count.must_equal 4 + end + db[:i].count.must_equal 0 + end +end if DB.adapter_scheme == :sqlite diff --git a/spec/extensions/temporarily_release_connection_spec.rb b/spec/extensions/temporarily_release_connection_spec.rb new file mode 100644 index 000000000..9e8916325 --- /dev/null +++ b/spec/extensions/temporarily_release_connection_spec.rb @@ -0,0 +1,110 @@ +require_relative "spec_helper" + +pool_types = [ :threaded, :sharded_threaded] +pool_types += [ :timed_queue, :sharded_timed_queue] if RUBY_VERSION >= '3.2' + +pool_types.each do |pool_type| + describe "temporarily_release_connection extension with pool class #{pool_type}" do + before do + opts = {:max_connections=>1, :pool_class=>pool_type} + if pool_type.to_s.start_with?('sharded') + opts[:servers] = {:foo=>{}, :bar=>{}} + end + @db = Sequel.mock(opts).extension(:temporarily_release_connection) + end + + it "should temporarily release connection during block so it can be acquired by other threads" do + conns = [] + @db.transaction(:rollback=>:always) do |c| + @db.temporarily_release_connection(c) do + 4.times.map do |i| + Thread.new do + @db.synchronize do |conn| + conns << conn + end + end + end.map(&:join) + end + end + + c = @db.synchronize{|conn| conn} + conns.size.must_equal 4 + conns.each do |conn| + conn.must_be_same_as c + end + + @db.sqls.must_equal ['BEGIN', 'ROLLBACK'] + end + + it "should temporarily release connection for specific shard during block so it can be acquired by other threads" do + conns = [] + @db.transaction(:rollback=>:always, :server=>:foo) do |c| + @db.temporarily_release_connection(c, :foo) do + @db.transaction(:rollback=>:always, :server=>:bar) do |c2| + @db.temporarily_release_connection(c2, :bar) do + 4.times.map do |i| + Thread.new do + @db.synchronize(:foo) do |conn| + @db.synchronize(:bar) do |conn2| + conns << [conn, conn2] + end + end + end + end.map(&:join) + end + end + end + end + + c = @db.synchronize(:foo){|conn| conn} + c2 = @db.synchronize(:bar){|conn| conn} + conns.size.must_equal 4 + conns.each do |conn, conn2| + conn.must_be_same_as c + conn2.must_be_same_as c2 + end + + @db.sqls.must_equal ["BEGIN -- foo", "BEGIN -- bar", "ROLLBACK -- bar", "ROLLBACK -- foo"] + end if pool_type.to_s.start_with?('sharded') + + it "should raise UnableToReacquireConnectionError if unable to reacquire the same connection it released" do + proc do + @db.transaction(rollback: :always) do |conn| + @db.temporarily_release_connection(conn) do + @db.disconnect + end + end + end.must_raise Sequel::UnableToReacquireConnectionError + @db.sqls.must_equal ['BEGIN'] + end + + it "should raise if provided a connection that is not checked out" do + proc do + @db.temporarily_release_connection(@db.synchronize{|conn| conn}) + end.must_raise Sequel::Error + end + + it "should raise if pool max_size is not 1" do + db = Sequel.mock(:pool_type=>pool_type) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end + end +end + +describe "temporarily_release_connection extension" do + it "should raise if pool uses connection_handling: :disconnect option" do + db = Sequel.mock(:connection_handling=>:disconnect) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end + + it "should raise if pool uses unsupported pool type" do + db = Sequel.mock(:pool_class=>:single) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end +end diff --git a/www/pages/plugins.html.erb b/www/pages/plugins.html.erb index 00238b592..8788507ca 100644 --- a/www/pages/plugins.html.erb +++ b/www/pages/plugins.html.erb @@ -771,6 +771,10 @@ Normalizes SQL before logging, helpful for analytics and sensitive data.
  • +temporarily_release_connection +Allows for multithreaded transactional testing by temporarily releasing checked-out connections back to the pool. +
  • +
  • transaction_connection_validator Handle disconnect failures detected when starting a new transaction using a new connection transparently.