Skip to content

Commit

Permalink
Merge pull request #1284 from Shopify/active-fiber
Browse files Browse the repository at this point in the history
Lock on the current Fiber rather than current Thread
  • Loading branch information
sodabrew authored Dec 17, 2022
2 parents ba4d465 + 213f008 commit 1ac6978
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 26 deletions.
36 changes: 18 additions & 18 deletions ext/mysql2/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static void rb_mysql_client_mark(void * wrapper) {
mysql_client_wrapper * w = wrapper;
if (w) {
rb_gc_mark(w->encoding);
rb_gc_mark(w->active_thread);
rb_gc_mark(w->active_fiber);
}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ static void *nogvl_close(void *ptr) {
mysql_close(wrapper->client);
wrapper->closed = 1;
wrapper->reconnect_enabled = 0;
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
}

return NULL;
Expand Down Expand Up @@ -342,7 +342,7 @@ static VALUE allocate(VALUE klass) {
mysql_client_wrapper * wrapper;
obj = Data_Make_Struct(klass, mysql_client_wrapper, rb_mysql_client_mark, rb_mysql_client_free, wrapper);
wrapper->encoding = Qnil;
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
wrapper->automatic_close = 1;
wrapper->server_version = 0;
wrapper->reconnect_enabled = 0;
Expand Down Expand Up @@ -543,7 +543,7 @@ static VALUE do_send_query(VALUE args) {
mysql_client_wrapper *wrapper = query_args->wrapper;
if ((VALUE)rb_thread_call_without_gvl(nogvl_send_query, query_args, RUBY_UBF_IO, 0) == Qfalse) {
/* an error occurred, we're not active anymore */
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
rb_raise_mysql2_error(wrapper);
}
return Qnil;
Expand Down Expand Up @@ -573,7 +573,7 @@ static void *nogvl_do_result(void *ptr, char use_result) {

/* once our result is stored off, this connection is
ready for another command to be issued */
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;

return result;
}
Expand All @@ -599,13 +599,13 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
GET_CLIENT(self);

/* if we're not waiting on a result, do nothing */
if (NIL_P(wrapper->active_thread))
if (NIL_P(wrapper->active_fiber))
return Qnil;

REQUIRE_CONNECTED(wrapper);
if ((VALUE)rb_thread_call_without_gvl(nogvl_read_query_result, wrapper->client, RUBY_UBF_IO, 0) == Qfalse) {
/* an error occurred, mark this connection inactive */
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
rb_raise_mysql2_error(wrapper);
}

Expand All @@ -618,7 +618,7 @@ static VALUE rb_mysql_client_async_result(VALUE self) {

if (result == NULL) {
if (mysql_errno(wrapper->client) != 0) {
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
rb_raise_mysql2_error(wrapper);
}
/* no data and no error, so query was not a SELECT */
Expand All @@ -645,7 +645,7 @@ struct async_query_args {
static VALUE disconnect_and_raise(VALUE self, VALUE error) {
GET_CLIENT(self);

wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;

/* Invalidate the MySQL socket to prevent further communication.
* The GC will come along later and call mysql_close to free it.
Expand Down Expand Up @@ -710,7 +710,7 @@ static VALUE disconnect_and_mark_inactive(VALUE self) {
GET_CLIENT(self);

/* Check if execution terminated while result was still being read. */
if (!NIL_P(wrapper->active_thread)) {
if (!NIL_P(wrapper->active_fiber)) {
if (CONNECTED(wrapper)) {
/* Invalidate the MySQL socket to prevent further communication. */
#ifndef _WIN32
Expand All @@ -725,24 +725,24 @@ static VALUE disconnect_and_mark_inactive(VALUE self) {
}
/* Skip mysql client check performed before command execution. */
wrapper->client->status = MYSQL_STATUS_READY;
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
}

return Qnil;
}

void rb_mysql_client_set_active_thread(VALUE self) {
VALUE thread_current = rb_thread_current();
static void rb_mysql_client_set_active_fiber(VALUE self) {
VALUE fiber_current = rb_fiber_current();
GET_CLIENT(self);

// see if this connection is still waiting on a result from a previous query
if (NIL_P(wrapper->active_thread)) {
if (NIL_P(wrapper->active_fiber)) {
// mark this connection active
wrapper->active_thread = thread_current;
} else if (wrapper->active_thread == thread_current) {
wrapper->active_fiber = fiber_current;
} else if (wrapper->active_fiber == fiber_current) {
rb_raise(cMysql2Error, "This connection is still waiting for a result, try again once you have the result");
} else {
VALUE inspect = rb_inspect(wrapper->active_thread);
VALUE inspect = rb_inspect(wrapper->active_fiber);
const char *thr = StringValueCStr(inspect);

rb_raise(cMysql2Error, "This connection is in use by: %s", thr);
Expand Down Expand Up @@ -806,7 +806,7 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) {
args.sql_len = RSTRING_LEN(args.sql);
args.wrapper = wrapper;

rb_mysql_client_set_active_thread(self);
rb_mysql_client_set_active_fiber(self);

#ifndef _WIN32
rb_rescue2(do_send_query, (VALUE)&args, disconnect_and_raise, self, rb_eException, (VALUE)0);
Expand Down
3 changes: 1 addition & 2 deletions ext/mysql2/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

typedef struct {
VALUE encoding;
VALUE active_thread; /* rb_thread_current() or Qnil */
VALUE active_fiber; /* rb_fiber_current() or Qnil */
long server_version;
int reconnect_enabled;
unsigned int connect_timeout;
Expand All @@ -15,7 +15,6 @@ typedef struct {
MYSQL *client;
} mysql_client_wrapper;

void rb_mysql_client_set_active_thread(VALUE self);
void rb_mysql_set_server_query_flags(MYSQL *client, VALUE result);

#define GET_CLIENT(self) \
Expand Down
6 changes: 3 additions & 3 deletions ext/mysql2/statement.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) {
if (metadata == NULL) {
if (mysql_stmt_errno(stmt) != 0) {
// either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal.
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
rb_raise_mysql2_stmt_error(stmt_wrapper);
}
// no data and no error, so query was not a SELECT
Expand All @@ -461,7 +461,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) {
mysql_free_result(metadata);
rb_raise_mysql2_stmt_error(stmt_wrapper);
}
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
}

resultObj = rb_mysql_result_to_obj(stmt_wrapper->client, wrapper->encoding, current, metadata, self);
Expand Down Expand Up @@ -502,7 +502,7 @@ static VALUE rb_mysql_stmt_fields(VALUE self) {
if (metadata == NULL) {
if (mysql_stmt_errno(stmt) != 0) {
// either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal.
wrapper->active_thread = Qnil;
wrapper->active_fiber = Qnil;
rb_raise_mysql2_stmt_error(stmt_wrapper);
}
// no data and no error, so query was not a SELECT
Expand Down
9 changes: 6 additions & 3 deletions spec/mysql2/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,13 @@ def run_gc
end

it "should describe the thread holding the active query" do
thr = Thread.new { @client.query("SELECT 1", async: true) }
thr = Thread.new do
@client.query("SELECT 1", async: true)
Fiber.current
end

thr.join
expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(thr.inspect)))
fiber = thr.value
expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(fiber.inspect)))
end

it "should timeout if we wait longer than :read_timeout" do
Expand Down
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
require 'mysql2'
require 'timeout'
require 'yaml'
require 'fiber'

DatabaseCredentials = YAML.load_file('spec/configuration.yml')

if GC.respond_to?(:verify_compaction_references)
Expand Down

0 comments on commit 1ac6978

Please sign in to comment.