Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mysql-auto_increment_delay_multiplex_timeout_ms - Closes #3923 #3946

Merged
merged 7 commits into from
Sep 14, 2022
17 changes: 16 additions & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#ifndef __CLASS_MYSQL_SESSION_H
#define __CLASS_MYSQL_SESSION_H

#include <functional>
#include <vector>

#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Variables.h"
Expand Down Expand Up @@ -155,7 +159,11 @@ class MySQL_Session
void init();
void reset();
void add_ldap_comment_to_pkt(PtrSize_t *);

/**
* @brief Performs the required housekeeping operations over the session and its connections before
* performing any processing on received client packets.
*/
void housekeeping_before_pkts();
int get_pkts_from_client(bool&, PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t&);
Expand Down Expand Up @@ -215,6 +223,12 @@ class MySQL_Session
PtrArray *mybes;
MySQL_Data_Stream *client_myds;
MySQL_Data_Stream *server_myds;
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
* hostgroups in housekeeping operations, before client packet processing. Currently 'housekeeping_before_pkts'.
*/
std::vector<int32_t> hgs_expired_conns {};
char * default_schema;
char * user_attributes;

Expand Down Expand Up @@ -319,6 +333,7 @@ class MySQL_Session
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);
void update_expired_conns(const std::vector<std::function<bool(MySQL_Connection*)>>&);
/**
* @brief Performs the final operations after current query has finished to be executed. It updates the session
* 'transaction_persistent_hostgroup', and updates the 'MySQL_Data_Stream' and 'MySQL_Connection' before
Expand Down
2 changes: 1 addition & 1 deletion include/mysql_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class MySQL_Connection {
bool IsServerOffline();
bool IsAutoCommit();
bool AutocommitFalse_AndSavepoint();
bool MultiplexDisabled();
bool MultiplexDisabled(bool check_delay_token = true);
bool IsKeepMultiplexEnabledVariables(char *query_digest_text);
void ProcessQueryAndSetStatusFlags(char *query_digest_text);
void optimize();
Expand Down
1 change: 1 addition & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2726,6 +2726,7 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo
MySrvC *mysrvc=NULL;
if (_lock)
wrlock();
c->auto_increment_delay_token = 0;
status.myconnpoll_push++;
mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
Expand Down
84 changes: 81 additions & 3 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

#define EXPMARIA

using std::function;
using std::vector;

static inline char is_digit(char c) {
if(c >= '0' && c <= '9')
Expand Down Expand Up @@ -686,6 +688,29 @@ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) {
return NULL; // NULL = backend not found
};

void MySQL_Session::update_expired_conns(const vector<function<bool(MySQL_Connection*)>>& checks) {
for (uint32_t i = 0; i < mybes->len; i++) {
MySQL_Backend* mybe = static_cast<MySQL_Backend*>(mybes->index(i));
MySQL_Data_Stream* myds = mybe != nullptr ? mybe->server_myds : nullptr;
MySQL_Connection* myconn = myds != nullptr ? myds->myconn : nullptr;

if (myconn != nullptr) {
const bool is_active_transaction = myconn->IsActiveTransaction();
const bool multiplex_disabled = myconn->MultiplexDisabled(false);
const bool is_idle = myconn->async_state_machine == ASYNC_IDLE;

// Make sure the connection is reusable before performing any check
if (myconn->reusable==true && is_active_transaction==false && multiplex_disabled==false && is_idle) {
for (const function<bool(MySQL_Connection*)>& check : checks) {
if (check(myconn)) {
this->hgs_expired_conns.push_back(mybe->hostgroup_id);
break;
}
}
}
}
}
}

MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) {
MySQL_Backend *_mybe=new MySQL_Backend();
Expand Down Expand Up @@ -4352,6 +4377,9 @@ int MySQL_Session::RunQuery(MySQL_Data_Stream *myds, MySQL_Connection *myconn) {

// this function was inline
void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
// NOTE: Maintenance of 'multiplex_delayed' has been moved to 'housekeeping_before_pkts'. The previous impl
// is left below as an example of how to perform a more passive maintenance over session connections.
/*
if (mybes) {
MySQL_Backend *_mybe;
unsigned int i;
Expand All @@ -4372,6 +4400,34 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
}
}
}
*/
}

void MySQL_Session::housekeeping_before_pkts() {
if (mysql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
MySQL_Backend* mybe = find_backend(hg_id);

if (mybe != nullptr) {
MySQL_Data_Stream* myds = mybe->server_myds;

if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) {
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
} else {
myds->return_MySQL_Connection_To_Pool();
}
}
}
// We are required to perform a cleanup after consuming the elements, thus preventing any subsequent
// 'handler' call to perform recomputing of the already processed elements.
if (hgs_expired_conns.empty() == false) {
hgs_expired_conns.clear();
}
}
}

// this function was inline
Expand Down Expand Up @@ -4426,6 +4482,7 @@ int MySQL_Session::handler() {
}
}

housekeeping_before_pkts();
handler_ret = get_pkts_from_client(wrong_pass, pkt);
if (handler_ret != 0) {
return handler_ret;
Expand Down Expand Up @@ -7257,6 +7314,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL);
new_sess->status = RESETTING_CONNECTION;
mc->async_state_machine = ASYNC_IDLE; // may not be true, but is used to correctly perform error handling
mc->auto_increment_delay_token = 0;
new_myds->DSS = STATE_MARIADB_QUERY;
thread->register_session_connection_handler(new_sess,true);
if (new_myds->mypolls==NULL) {
Expand Down Expand Up @@ -7377,9 +7435,29 @@ void MySQL_Session::finishQuery(MySQL_Data_Stream *myds, MySQL_Connection *mycon
myds->myconn->set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX);
}
}
if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
if (mysql_thread___connection_delay_multiplex_ms && mirror==false) {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;

const bool is_active_transaction = myds->myconn->IsActiveTransaction();
const bool multiplex_disabled_by_status = myds->myconn->MultiplexDisabled(false);

const bool multiplex_delayed = myds->myconn->auto_increment_delay_token > 0;
const bool multiplex_delayed_with_timeout =
!multiplex_disabled_by_status && multiplex_delayed && mysql_thread___auto_increment_delay_multiplex_timeout_ms > 0;

const bool multiplex_disabled = !multiplex_disabled_by_status && (!multiplex_delayed || multiplex_delayed_with_timeout);
const bool conn_is_reusable = myds->myconn->reusable == true && !is_active_transaction && multiplex_disabled;

if (mysql_thread___multiplexing && conn_is_reusable) {
if ((mysql_thread___connection_delay_multiplex_ms || multiplex_delayed_with_timeout) && mirror==false) {
if (multiplex_delayed_with_timeout) {
uint64_t delay_multiplex_us = mysql_thread___connection_delay_multiplex_ms * 1000;
uint64_t auto_increment_delay_us = mysql_thread___auto_increment_delay_multiplex_timeout_ms * 1000;
uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us;

myds->wait_until=thread->curtime + delay_us;
} else {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;
}

myconn->async_state_machine=ASYNC_IDLE;
myconn->multiplex_delayed=true;
myds->DSS=STATE_MARIADB_GENERIC;
Expand Down
40 changes: 35 additions & 5 deletions lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//#define __CLASS_STANDARD_MYSQL_THREAD_H

#include <functional>
#include <vector>

#include "MySQL_HostGroups_Manager.h"
#include "prometheus_helpers.h"
#define MYSQL_THREAD_IMPLEMENTATION
Expand All @@ -17,6 +21,9 @@
#include "MySQL_PreparedStatement.h"
#include "MySQL_Logger.hpp"

using std::vector;
using std::function;

#ifdef DEBUG
MySQL_Session *sess_stopat;
#endif
Expand Down Expand Up @@ -1641,6 +1648,14 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
// integer variable ?
std::unordered_map<std::string, std::tuple<int *, int, int, bool>>::const_iterator it = VariablesPointers_int.find(nameS);
if (it != VariablesPointers_int.end()) {
// Log warnings for variables with possibly wrong values
if (nameS == "auto_increment_delay_multiplex_timeout_ms") {
int intv = atoi(value);
if (intv <= 60) {
proxy_warning("'mysql-auto_increment_delay_multiplex_timeout_ms' is set to a low value: %ums. Remember value is in 'ms'\n", intv);
}
}

bool special_variable = std::get<3>(it->second); // if special_variable is true, min and max values are ignored, and more input validation is needed
if (special_variable == false) {
int intv=atoi(value);
Expand Down Expand Up @@ -3733,12 +3748,27 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig
}
}

if (sess->mybe && sess->mybe->server_myds && sess->mybe->server_myds->myconn) {
MySQL_Connection* myconn = sess->mybe->server_myds->myconn;
// Perform the maintenance for expired connections on the session
if (mysql_thread___multiplexing) {
const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const uint64_t multiplex_timeout_ms = mysql_thread___auto_increment_delay_multiplex_timeout_ms;
const bool multiplex_delayed_enabled = multiplex_timeout_ms != 0 && myconn->auto_increment_delay_token > 0;
const bool timeout_expired = multiplex_delayed_enabled && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};

if (mysql_thread___auto_increment_delay_multiplex_timeout_ms != 0 && (sess_time/1000 > (unsigned long long)mysql_thread___auto_increment_delay_multiplex_timeout_ms)) {
myconn->auto_increment_delay_token = 0;
}
const auto conn_delay_multiplex = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const bool multiplex_delayed = mysql_thread___connection_delay_multiplex_ms != 0 && myconn->multiplex_delayed == true;
const bool timeout_expired = multiplex_delayed && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};

const vector<function<bool(MySQL_Connection*)>> expire_conn_checks {
auto_incr_delay_multiplex_check,
conn_delay_multiplex
};

sess->update_expired_conns(expire_conn_checks);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/mysql_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2321,14 +2321,14 @@ bool MySQL_Connection::IsAutoCommit() {
return ret;
}

bool MySQL_Connection::MultiplexDisabled() {
bool MySQL_Connection::MultiplexDisabled(bool check_delay_token) {
// status_flags stores information about the status of the connection
// can be used to determine if multiplexing can be enabled or not
bool ret=false;
if (status_flags & (STATUS_MYSQL_CONNECTION_TRANSACTION|STATUS_MYSQL_CONNECTION_USER_VARIABLE|STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT|STATUS_MYSQL_CONNECTION_LOCK_TABLES|STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE|STATUS_MYSQL_CONNECTION_GET_LOCK|STATUS_MYSQL_CONNECTION_NO_MULTIPLEX|STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0|STATUS_MYSQL_CONNECTION_FOUND_ROWS|STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) ) {
ret=true;
}
if (auto_increment_delay_token) return true;
if (check_delay_token && auto_increment_delay_token) return true;
return ret;
}

Expand Down
Loading