Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
byroot committed Nov 21, 2022
1 parent ba0b516 commit 13b38da
Showing 1 changed file with 57 additions and 41 deletions.
98 changes: 57 additions & 41 deletions ext/mysql2/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,36 @@ struct nogvl_select_db_args {
char *db;
};

static void rb_mysql_client_set_active_fiber(VALUE self) {
VALUE fiber_current = rb_fiber_current();
GET_CLIENT(self);

// see if this connection is still waiting on a result from a previous query
if (NIL_P(wrapper->active_fiber)) {
// mark this connection active
wrapper->active_fiber = fiber_current;
} else if (wrapper->active_fiber == fiber_current) {
rb_raise(cMysql2Error, "This connection is still waiting for a result, try again once you have the result");
} else {
VALUE inspect = rb_inspect(wrapper->active_fiber);
const char *thr = StringValueCStr(inspect);

rb_raise(cMysql2Error, "This connection is in use by: %s", thr);
}
}

static void rb_mysql_client_release_active_fiber(VALUE self) {
GET_CLIENT(self);

if (NIL_P(wrapper->active_fiber)) {
rb_raise(cMysql2Error, "[BUG] Tried to release a connection that was free");
} else if (wrapper->active_fiber != rb_fiber_current()) {
rb_raise(cMysql2Error, "[BUG] Tried to release a connection owned by another fiber");
} else {
wrapper->active_fiber = Qnil;
}
}

static VALUE rb_set_ssl_mode_option(VALUE self, VALUE setting) {
unsigned long version = mysql_get_client_version();

Expand Down Expand Up @@ -297,7 +327,6 @@ static void *nogvl_close(void *ptr) {
mysql_close(wrapper->client);
wrapper->closed = 1;
wrapper->reconnect_enabled = 0;
wrapper->active_fiber = Qnil;
}

return NULL;
Expand Down Expand Up @@ -507,9 +536,11 @@ static VALUE rb_mysql_connect(VALUE self, VALUE user, VALUE pass, VALUE host, VA
static VALUE rb_mysql_client_close(VALUE self) {
GET_CLIENT(self);

rb_mysql_client_set_active_fiber(self);
if (wrapper->client) {
rb_thread_call_without_gvl(nogvl_close, wrapper, RUBY_UBF_IO, 0);
}
rb_mysql_client_release_active_fiber(self);

return Qnil;
}
Expand All @@ -531,22 +562,14 @@ static VALUE rb_mysql_client_closed(VALUE self) {
*/
static void *nogvl_send_query(void *ptr) {
struct nogvl_send_query_args *args = ptr;
int rv;

rv = mysql_send_query(args->mysql, args->sql_ptr, args->sql_len);

return (void*)(rv == 0 ? Qtrue : Qfalse);
return (void *)(mysql_send_query(args->mysql, args->sql_ptr, args->sql_len) ? Qtrue : Qfalse);
}

static VALUE do_send_query(VALUE args) {
static int do_send_query(VALUE args) {
struct nogvl_send_query_args *query_args = (void *)args;
mysql_client_wrapper *wrapper = query_args->wrapper;
if ((VALUE)rb_thread_call_without_gvl(nogvl_send_query, query_args, RUBY_UBF_IO, 0) == Qfalse) {
/* an error occurred, we're not active anymore */
wrapper->active_fiber = Qnil;
rb_raise_mysql2_error(wrapper);
}
return Qnil;
return RTEST((VALUE)rb_thread_call_without_gvl(nogvl_send_query, query_args, RUBY_UBF_IO, 0));
}

/*
Expand All @@ -571,10 +594,6 @@ static void *nogvl_do_result(void *ptr, char use_result) {
result = mysql_store_result(wrapper->client);
}

/* once our result is stored off, this connection is
ready for another command to be issued */
wrapper->active_fiber = Qnil;

return result;
}

Expand Down Expand Up @@ -605,7 +624,7 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
REQUIRE_CONNECTED(wrapper);
if ((VALUE)rb_thread_call_without_gvl(nogvl_read_query_result, wrapper->client, RUBY_UBF_IO, 0) == Qfalse) {
/* an error occurred, mark this connection inactive */
wrapper->active_fiber = Qnil;
rb_mysql_client_release_active_fiber(self);
rb_raise_mysql2_error(wrapper);
}

Expand All @@ -616,9 +635,12 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
result = (MYSQL_RES *)rb_thread_call_without_gvl(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
}

/* once our result is stored off, this connection is
ready for another command to be issued */
rb_mysql_client_release_active_fiber(self);

if (result == NULL) {
if (mysql_errno(wrapper->client) != 0) {
wrapper->active_fiber = Qnil;
rb_raise_mysql2_error(wrapper);
}
/* no data and no error, so query was not a SELECT */
Expand All @@ -645,7 +667,7 @@ struct async_query_args {
static VALUE disconnect_and_raise(VALUE self, VALUE error) {
GET_CLIENT(self);

wrapper->active_fiber = Qnil;
rb_mysql_client_release_active_fiber(self);

/* Invalidate the MySQL socket to prevent further communication.
* The GC will come along later and call mysql_close to free it.
Expand Down Expand Up @@ -725,30 +747,12 @@ static VALUE disconnect_and_mark_inactive(VALUE self) {
}
/* Skip mysql client check performed before command execution. */
wrapper->client->status = MYSQL_STATUS_READY;
wrapper->active_fiber = Qnil;
rb_mysql_client_release_active_fiber(self);
}

return Qnil;
}

static void rb_mysql_client_set_active_fiber(VALUE self) {
VALUE fiber_current = rb_fiber_current();
GET_CLIENT(self);

// see if this connection is still waiting on a result from a previous query
if (NIL_P(wrapper->active_fiber)) {
// mark this connection active
wrapper->active_fiber = fiber_current;
} else if (wrapper->active_fiber == fiber_current) {
rb_raise(cMysql2Error, "This connection is still waiting for a result, try again once you have the result");
} else {
VALUE inspect = rb_inspect(wrapper->active_fiber);
const char *thr = StringValueCStr(inspect);

rb_raise(cMysql2Error, "This connection is in use by: %s", thr);
}
}

/* call-seq:
* client.abandon_results!
*
Expand Down Expand Up @@ -797,6 +801,8 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) {

(void)RB_GC_GUARD(current);
Check_Type(current, T_HASH);

rb_mysql_client_set_active_fiber(self);
rb_ivar_set(self, intern_current_query_options, current);

Check_Type(sql, T_STRING);
Expand All @@ -806,13 +812,18 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) {
args.sql_len = RSTRING_LEN(args.sql);
args.wrapper = wrapper;

rb_mysql_client_set_active_fiber(self);

#ifndef _WIN32
rb_rescue2(do_send_query, (VALUE)&args, disconnect_and_raise, self, rb_eException, (VALUE)0);
if (do_send_query((VALUE)&args)) {
/* an error occurred, we're not active anymore */
rb_mysql_client_release_active_fiber(self);
rb_raise_mysql2_error(wrapper);
}

(void)RB_GC_GUARD(sql);

if (rb_hash_aref(current, sym_async) == Qtrue) {
rb_mysql_client_release_active_fiber(self);
return Qnil;
} else {
async_args.fd = wrapper->client->net.fd;
Expand All @@ -823,7 +834,12 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) {
return rb_ensure(rb_mysql_client_async_result, self, disconnect_and_mark_inactive, self);
}
#else
do_send_query((VALUE)&args);
if (do_send_query((VALUE)&args)) {
/* an error occurred, we're not active anymore */
rb_mysql_client_release_active_fiber(self);
rb_raise_mysql2_error(wrapper);
}

(void)RB_GC_GUARD(sql);

/* this will just block until the result is ready */
Expand Down

0 comments on commit 13b38da

Please sign in to comment.