From 213f008c7fbbb3e0b0a0637cee354bfc9267775b Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 21 Nov 2022 10:20:06 +0100 Subject: [PATCH] Lock on the current Fiber rather than current Thread Applications using fiber are able to do concurrent queries from the same thread. --- ext/mysql2/client.c | 36 ++++++++++++++++++------------------ ext/mysql2/client.h | 3 +-- ext/mysql2/statement.c | 6 +++--- spec/mysql2/client_spec.rb | 9 ++++++--- spec/spec_helper.rb | 2 ++ 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/ext/mysql2/client.c b/ext/mysql2/client.c index 5b699e924..c200b3649 100644 --- a/ext/mysql2/client.c +++ b/ext/mysql2/client.c @@ -193,7 +193,7 @@ static void rb_mysql_client_mark(void * wrapper) { mysql_client_wrapper * w = wrapper; if (w) { rb_gc_mark(w->encoding); - rb_gc_mark(w->active_thread); + rb_gc_mark(w->active_fiber); } } @@ -297,7 +297,7 @@ static void *nogvl_close(void *ptr) { mysql_close(wrapper->client); wrapper->closed = 1; wrapper->reconnect_enabled = 0; - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; } return NULL; @@ -342,7 +342,7 @@ static VALUE allocate(VALUE klass) { mysql_client_wrapper * wrapper; obj = Data_Make_Struct(klass, mysql_client_wrapper, rb_mysql_client_mark, rb_mysql_client_free, wrapper); wrapper->encoding = Qnil; - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; wrapper->automatic_close = 1; wrapper->server_version = 0; wrapper->reconnect_enabled = 0; @@ -543,7 +543,7 @@ static VALUE do_send_query(VALUE args) { mysql_client_wrapper *wrapper = query_args->wrapper; if ((VALUE)rb_thread_call_without_gvl(nogvl_send_query, query_args, RUBY_UBF_IO, 0) == Qfalse) { /* an error occurred, we're not active anymore */ - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; rb_raise_mysql2_error(wrapper); } return Qnil; @@ -573,7 +573,7 @@ static void *nogvl_do_result(void *ptr, char use_result) { /* once our result is stored off, this connection is ready for another command to be issued */ - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; return result; } @@ -599,13 +599,13 @@ static VALUE rb_mysql_client_async_result(VALUE self) { GET_CLIENT(self); /* if we're not waiting on a result, do nothing */ - if (NIL_P(wrapper->active_thread)) + if (NIL_P(wrapper->active_fiber)) return Qnil; REQUIRE_CONNECTED(wrapper); if ((VALUE)rb_thread_call_without_gvl(nogvl_read_query_result, wrapper->client, RUBY_UBF_IO, 0) == Qfalse) { /* an error occurred, mark this connection inactive */ - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; rb_raise_mysql2_error(wrapper); } @@ -618,7 +618,7 @@ static VALUE rb_mysql_client_async_result(VALUE self) { if (result == NULL) { if (mysql_errno(wrapper->client) != 0) { - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; rb_raise_mysql2_error(wrapper); } /* no data and no error, so query was not a SELECT */ @@ -645,7 +645,7 @@ struct async_query_args { static VALUE disconnect_and_raise(VALUE self, VALUE error) { GET_CLIENT(self); - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; /* Invalidate the MySQL socket to prevent further communication. * The GC will come along later and call mysql_close to free it. @@ -710,7 +710,7 @@ static VALUE disconnect_and_mark_inactive(VALUE self) { GET_CLIENT(self); /* Check if execution terminated while result was still being read. */ - if (!NIL_P(wrapper->active_thread)) { + if (!NIL_P(wrapper->active_fiber)) { if (CONNECTED(wrapper)) { /* Invalidate the MySQL socket to prevent further communication. */ #ifndef _WIN32 @@ -725,24 +725,24 @@ static VALUE disconnect_and_mark_inactive(VALUE self) { } /* Skip mysql client check performed before command execution. */ wrapper->client->status = MYSQL_STATUS_READY; - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; } return Qnil; } -void rb_mysql_client_set_active_thread(VALUE self) { - VALUE thread_current = rb_thread_current(); +static void rb_mysql_client_set_active_fiber(VALUE self) { + VALUE fiber_current = rb_fiber_current(); GET_CLIENT(self); // see if this connection is still waiting on a result from a previous query - if (NIL_P(wrapper->active_thread)) { + if (NIL_P(wrapper->active_fiber)) { // mark this connection active - wrapper->active_thread = thread_current; - } else if (wrapper->active_thread == thread_current) { + wrapper->active_fiber = fiber_current; + } else if (wrapper->active_fiber == fiber_current) { rb_raise(cMysql2Error, "This connection is still waiting for a result, try again once you have the result"); } else { - VALUE inspect = rb_inspect(wrapper->active_thread); + VALUE inspect = rb_inspect(wrapper->active_fiber); const char *thr = StringValueCStr(inspect); rb_raise(cMysql2Error, "This connection is in use by: %s", thr); @@ -806,7 +806,7 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) { args.sql_len = RSTRING_LEN(args.sql); args.wrapper = wrapper; - rb_mysql_client_set_active_thread(self); + rb_mysql_client_set_active_fiber(self); #ifndef _WIN32 rb_rescue2(do_send_query, (VALUE)&args, disconnect_and_raise, self, rb_eException, (VALUE)0); diff --git a/ext/mysql2/client.h b/ext/mysql2/client.h index 5e0ebe3f0..0a9faaf94 100644 --- a/ext/mysql2/client.h +++ b/ext/mysql2/client.h @@ -3,7 +3,7 @@ typedef struct { VALUE encoding; - VALUE active_thread; /* rb_thread_current() or Qnil */ + VALUE active_fiber; /* rb_fiber_current() or Qnil */ long server_version; int reconnect_enabled; unsigned int connect_timeout; @@ -15,7 +15,6 @@ typedef struct { MYSQL *client; } mysql_client_wrapper; -void rb_mysql_client_set_active_thread(VALUE self); void rb_mysql_set_server_query_flags(MYSQL *client, VALUE result); #define GET_CLIENT(self) \ diff --git a/ext/mysql2/statement.c b/ext/mysql2/statement.c index b31efeb27..fd506de0f 100644 --- a/ext/mysql2/statement.c +++ b/ext/mysql2/statement.c @@ -448,7 +448,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) { if (metadata == NULL) { if (mysql_stmt_errno(stmt) != 0) { // either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal. - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; rb_raise_mysql2_stmt_error(stmt_wrapper); } // no data and no error, so query was not a SELECT @@ -461,7 +461,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) { mysql_free_result(metadata); rb_raise_mysql2_stmt_error(stmt_wrapper); } - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; } resultObj = rb_mysql_result_to_obj(stmt_wrapper->client, wrapper->encoding, current, metadata, self); @@ -502,7 +502,7 @@ static VALUE rb_mysql_stmt_fields(VALUE self) { if (metadata == NULL) { if (mysql_stmt_errno(stmt) != 0) { // either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal. - wrapper->active_thread = Qnil; + wrapper->active_fiber = Qnil; rb_raise_mysql2_stmt_error(stmt_wrapper); } // no data and no error, so query was not a SELECT diff --git a/spec/mysql2/client_spec.rb b/spec/mysql2/client_spec.rb index 2f757815e..ede316b76 100644 --- a/spec/mysql2/client_spec.rb +++ b/spec/mysql2/client_spec.rb @@ -624,10 +624,13 @@ def run_gc end it "should describe the thread holding the active query" do - thr = Thread.new { @client.query("SELECT 1", async: true) } + thr = Thread.new do + @client.query("SELECT 1", async: true) + Fiber.current + end - thr.join - expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(thr.inspect))) + fiber = thr.value + expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(fiber.inspect))) end it "should timeout if we wait longer than :read_timeout" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 594e7d339..edfac4d64 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,8 @@ require 'mysql2' require 'timeout' require 'yaml' +require 'fiber' + DatabaseCredentials = YAML.load_file('spec/configuration.yml') if GC.respond_to?(:verify_compaction_references)