From 53bf18b428e34be4e0a692997d7e0f6154634b33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 11:30:03 +0200 Subject: [PATCH 01/13] Changed setting readonly servers 'OFFLINE' due to replication lag behavior in favor of general server 'SHUNNING' --- include/MySQL_HostGroups_Manager.h | 1 + lib/MySQL_HostGroups_Manager.cpp | 97 ++++++++++++++++++++++++++++-- lib/MySQL_Monitor.cpp | 28 ++++++--- 3 files changed, 113 insertions(+), 13 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 6f74e142e8..9b728543a8 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -558,6 +558,7 @@ class MySQL_HostGroups_Manager { void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); void converge_group_replication_config(int _writer_hostgroup); + void group_replication_lag_action(int _hid, char *address, unsigned int port, bool read_only, bool enable); void update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error, bool soft=false); void update_galera_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 30d44f6cb2..50468e62f4 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -16,6 +16,7 @@ #include #include "prometheus_helpers.h" +#include "proxysql_utils.h" #define char_malloc (char *)malloc #define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); } @@ -3371,6 +3372,93 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u GloAdmin->mysql_servers_wrunlock(); } +void MySQL_HostGroups_Manager::group_replication_lag_action( + int _hid, char *address, unsigned int port, bool 
read_only, bool enable +) { + GloAdmin->mysql_servers_wrlock(); + wrlock(); + int i,j; + + int reader_hostgroup = 0; + bool writer_is_also_reader = false; + + // Get the reader_hostgroup for the supplied writter hostgroup + std::string t_reader_hostgroup_query { + "SELECT reader_hostgroup,writer_is_also_reader FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" + }; + std::string reader_hostgroup_query {}; + string_format(t_reader_hostgroup_query, reader_hostgroup_query, _hid); + + int cols=0; + char *error=NULL; + int affected_rows=0; + SQLite3_result* rhid_res=NULL; + SQLite3_row* rhid_row=nullptr; + + mydb->execute_statement( + reader_hostgroup_query.c_str(), &error , &cols , &affected_rows , &rhid_res + ); + + // If the is now reader hostgroup configured for 'mysql_group_replication_hostgroups' + // an invalid configuration was somehow inserted. + if (rhid_res->rows.empty() || rhid_res->rows[0]->get_size() == 0) { + goto __exit_replication_lag_action; + } + + rhid_row = rhid_res->rows[0]; + reader_hostgroup = atoi(rhid_row->fields[0]); + writer_is_also_reader = atoi(rhid_row->fields[1]); + + for (i=0; i<(int)MyHostGroups->len; i++) { + MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + + if (read_only) { + if (_hid >= 0 && reader_hostgroup != (int)myhgc->hid) { + continue; + } + } else { + if (writer_is_also_reader) { + if (_hid >= 0 && _hid != (int)myhgc->hid && reader_hostgroup != (int)myhgc->hid) { + continue; + } + } + } + + int servers_found = 0; + + for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) { + MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); + if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { + // First server found + servers_found += 1; + + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && enable == false) { + proxy_warning("Shunning server %s:%d from HG %u with replication lag, count number: '%d'\n", address, port, myhgc->hid, mysrvc->cur_replication_lag_count); + 
mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; + } else { + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG && enable == true) { + mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; + proxy_warning("Re-enabling server %s:%d from HG %u with replication lag\n", address, port, myhgc->hid); + } + } + + if (!writer_is_also_reader) { + goto __exit_replication_lag_action; + } else { + if (servers_found == 2) { + goto __exit_replication_lag_action; + } + } + } + } + } + +__exit_replication_lag_action: + + wrunlock(); + GloAdmin->mysql_servers_wrunlock(); +} + void MySQL_HostGroups_Manager::drop_all_idle_connections() { // NOTE: the caller should hold wrlock int i, j; @@ -4675,6 +4763,7 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna bool found_reader=false; int read_HG=-1; bool need_converge=false; + int status=0; if (resultset) { // let's get info about this cluster pthread_mutex_lock(&Group_Replication_Info_mutex); @@ -4695,7 +4784,7 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna SQLite3_row *r=*it; int hostgroup=atoi(r->fields[0]); if (hostgroup==_writer_hostgroup) { - int status = atoi(r->fields[1]); + status = atoi(r->fields[1]); if (status == 0) { found_writer=true; } @@ -4738,10 +4827,10 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); sprintf(query,q,_hostname,_port,_writer_hostgroup); mydb->execute(query); - //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; + // NOTE: The status should be preserved in case of being SHUNNED + q=(char *)"UPDATE mysql_servers_incoming SET status=%d WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + sprintf(query,q,status,_hostname,_port,_writer_hostgroup); 
mydb->execute(query); //free(query); if (writer_is_also_reader && read_HG>=0) { diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index acbfe39e30..00a88c5f64 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -1546,10 +1546,12 @@ void * monitor_group_replication_thread(void *arg) { mmsd->hostname, mmsd->port, num_timeouts, mmsd->max_transactions_behind_count); } } - int lag_counts = 0; - if (read_only) { - lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind); - } + // NOTE: Previously 'lag_counts' was only updated for 'read_only' + // because 'writers' were never selected for being set 'OFFLINE' due to + // replication lag. Since the change of this behavior to 'SHUNNING' + // with replication lag, no matter it's 'read_only' value, 'lag_counts' + // is computed everytime. + int lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind); pthread_mutex_unlock(&GloMyMon->group_replication_mutex); // NOTE: we update MyHGM outside the mutex group_replication_mutex @@ -1573,16 +1575,24 @@ void * monitor_group_replication_thread(void *arg) { MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO"); } else { if (read_only==true) { - if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) { - MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); - } else { - MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); - } + MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); } else { // the node is a writer // TODO: for now we don't care about the number of writers MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); } + + // NOTE: Replication lag action should takes place 
**after** the + // servers have been placed in the correct hostgroups, otherwise + // during the reconfiguration of the servers due to 'update_group_replication_set_writer' + // there would be a small window in which the 'SHUNNED' server + // will be treat as 'ONLINE' letting some new connections to + // take places, before it becomes 'SHUNNED' again. + bool enable = true; + if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) { + enable = false; + } + MyHGM->group_replication_lag_action(mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, read_only, enable); } } From 33aa80c265d32db6be30ea56370282d27f8d2c72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 11:31:36 +0200 Subject: [PATCH 02/13] Prevent servers that have been placed as 'OFFLINE_SOFT' from becoming writers --- lib/MySQL_HostGroups_Manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 50468e62f4..943f839ba6 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4748,7 +4748,7 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna char *query=NULL; char *q=NULL; char *error=NULL; - q=(char *)"SELECT hostgroup_id, status FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; + q=(char *)"SELECT hostgroup_id, status FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3 AND status <>2"; query=(char *)malloc(strlen(q)+strlen(_hostname)+32); sprintf(query,q,_hostname,_port);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); From 8a0b8728c08f54a849647dfa1e7850cece3ed14b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 11:33:01 +0200 Subject: [PATCH 03/13] Introduced a simple way of performing manual testing for 'GROUPREP' via 'SQLite3' server --- src/SQLite3_Server.cpp | 121 ++++++++++++++++++++++++++++++++--------- 1 file changed, 95 insertions(+), 26 deletions(-) diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index 7723d2cacc..de5d902c7d 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -233,7 +233,26 @@ class sqlite3server_main_loop_listeners { static sqlite3server_main_loop_listeners S_amll; +/** + * @brief TODO: This function is a duplicate of the one found in 'ProxySQL_RESTAPI_Server.cpp'. + * Function should be extracted from the another file and placed in a common + * resource file for utilities. + */ +std::string replace_str_(const std::string& str, const std::string& match, const std::string& repl) { + if(match.empty()) { + return str; + } + + std::string result = str; + size_t start_pos = 0; + while((start_pos = result.find(match, start_pos)) != std::string::npos) { + result.replace(start_pos, match.length(), repl); + start_pos += repl.length(); + } + + return result; +} void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { @@ -555,8 +574,22 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p #ifdef TEST_GROUPREP if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) { pthread_mutex_lock(&GloSQLite3Server->grouprep_mutex); - GloSQLite3Server->populate_grouprep_table(sess, testLag); - if (testLag > 0) testLag--; + GloSQLite3Server->populate_grouprep_table(sess, 0); + + // Rewrite SELECT queries for matching the table which values + // should be used for the extracted 'server_id'. 
This allows + // simple manual configuration of the servers for testing + // purposes. + string myip = string(sess->client_myds->proxy_addr.addr); + string server_id = myip.substr(8,1); + string new_query = replace_str_1( + query, "GR_MEMBER_ROUTING_CANDIDATE_STATUS", + "GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + ); + + free(query); + query = static_cast(malloc(new_query.length() + 1)); + strcpy(query, new_query.c_str()); } #endif // TEST_GROUPREP if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) { @@ -595,20 +628,26 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p #ifdef TEST_GROUPREP if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) { pthread_mutex_unlock(&GloSQLite3Server->grouprep_mutex); - if (resultset->rows_count == 0) { - PROXY_TRACE(); - } - if (strncmp("127.2.1.2", sess->client_myds->proxy_addr.addr,9) == 0) { - if (testTimeoutSequence[testIndex--]) - sleep(2); - if (testIndex < 0) - testIndex = 7; - } - else { - if (rand() % 20 == 0) - sleep(2); - } + // NOTE: Old IMPL can be enabled for manual testing + // **************************************************************** + + // if (resultset->rows_count == 0) { + // PROXY_TRACE(); + // } + + // if (strncmp("127.2.1.2", sess->client_myds->proxy_addr.addr,9) == 0) { + // if (testTimeoutSequence[testIndex--]) + // sleep(2); + // if (testIndex < 0) + // testIndex = 7; + // } + // else { + // if (rand() % 20 == 0) + // sleep(2); + // } + + // **************************************************************** } #endif // TEST_GROUPREP if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) { @@ -1085,18 +1124,41 @@ void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) { #endif // TEST_AURORA #ifdef TEST_GROUPREP +/** + * @brief Populates the 'grouprep' table if it's found empty with the default + * values for the three testing servers. 
+ * + * NOTE: This function needs to be called with lock on mutex galera_mutex already acquired + * + * @param sess The current session performing a query. + * @param txs_behind Unused parameter. + */ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind) { - // this function needs to be called with lock on mutex galera_mutex already acquired - // - sessdb->execute("DELETE FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS"); + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + string myip = string(sess->client_myds->proxy_addr.addr); string server_id = myip.substr(8,1); - if (server_id == "1") - sessdb->execute("INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'NO', 0)"); - else { - std::stringstream ss; - ss << "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'YES', " << txs_behind << ")"; - sessdb->execute(ss.str().c_str()); + string query { "SELECT * FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + " LIMIT 1" }; + sessdb->execute_statement(query.c_str(), &error , &cols , &affected_rows , &resultset); + + if (resultset->rows_count==0) { + if (server_id == "1") { + std::string insert_query { + "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + + " (viable_candidate, read_only, transactions_behind) VALUES ('YES', 'NO', 0)" + }; + sessdb->execute(insert_query.c_str()); + } else { + std::string insert_query { + "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + + " (viable_candidate, read_only, transactions_behind) VALUES" + " ('YES', 'YES', 0)" + }; + sessdb->execute(insert_query.c_str()); + } } } #endif // TEST_GALERA @@ -1169,8 +1231,15 @@ bool SQLite3_Server::init() { #ifdef TEST_GROUPREP tables_defs_grouprep = new std::vector; insert_into_tables_defs(tables_defs_grouprep, - (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS", - (const char*)"CREATE 
TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); + (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS1", + (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS1 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); + insert_into_tables_defs(tables_defs_grouprep, + (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS2", + (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS2 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); + insert_into_tables_defs(tables_defs_grouprep, + (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS3", + (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS3 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); + check_and_build_standard_tables(sessdb, tables_defs_grouprep); GloAdmin->enable_grouprep_testing(); #endif // TEST_GALERA From 0850c4d596a7c42f390f3a2259a8961a4eb1cd12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 13:27:08 +0200 Subject: [PATCH 04/13] Improved the documentation for 'group_replication_lag_action' --- include/MySQL_HostGroups_Manager.h | 17 +++++++++++++++++ lib/MySQL_HostGroups_Manager.cpp | 5 +++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 9b728543a8..2cbc9d9635 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -558,6 +558,23 @@ class MySQL_HostGroups_Manager { void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); void converge_group_replication_config(int _writer_hostgroup); + /** + * @brief Set the supplied server as 
SHUNNED, this function shall be called + * to 'SHUNNED' those servers which replication lag is bigger than: + * - `mysql_thread___monitor_groupreplication_max_transactions_behind_count` + * + * @details The function automatically handles if the supplies server is a + * writer, and the 'writer_is_also_reader' flag is present in that + * hostgroup. In that case, it also sets as 'SHUNNED' the corresponding + * server that is present in the 'reader_hostgroup'. + * + * @param _hid The writer hostgroup. + * @param address The server address. + * @param port The server port. + * @param read_only Boolean specifying the read_only flag value of the server. + * @param enable Boolean specifying if the server needs to be disabled / enabled, + * 'true' for enabling the server if it's 'SHUNNED', 'false' for disabling it. + */ void group_replication_lag_action(int _hid, char *address, unsigned int port, bool read_only, bool enable); void update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error, bool soft=false); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 943f839ba6..29d16ef80a 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3399,8 +3399,7 @@ void MySQL_HostGroups_Manager::group_replication_lag_action( reader_hostgroup_query.c_str(), &error , &cols , &affected_rows , &rhid_res ); - // If the is now reader hostgroup configured for 'mysql_group_replication_hostgroups' - // an invalid configuration was somehow inserted. + // If the server isn't present in the supplied hostgroup, there is nothing to do. if (rhid_res->rows.empty() || rhid_res->rows[0]->get_size() == 0) { goto __exit_replication_lag_action; } @@ -3417,6 +3416,8 @@ void MySQL_HostGroups_Manager::group_replication_lag_action( continue; } } else { + // In case of 'writer_is_also_reader' the server can be present + // in both, the 'reader_hostgroup' and the 'writer_hostgroup'. 
if (writer_is_also_reader) { if (_hid >= 0 && _hid != (int)myhgc->hid && reader_hostgroup != (int)myhgc->hid) { continue; From b929758a87ddf4da027db2ed4cc008c80df0c345 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 18:50:44 +0200 Subject: [PATCH 05/13] Several improvements to 'group_replication_lag_action' 1. Introduced new global variable: 'monitor_groupreplication_max_transactions_behind_for_read_only', that modifies the behavior of 'group_replication_lag_action'. 2. Improved logic making use of 'MyHGC_find' instead of directly searching 'MyHostGroups' structure. 3. Improved 'group_replication_lag_action' documentation with new implementation updates. 4. Introduced changes to 'update_group_replication_set_writer' preserving writers placed in 'OFFLINE_SOFT' state. --- include/MySQL_HostGroups_Manager.h | 16 +++-- include/MySQL_Thread.h | 1 + include/proxysql_structs.h | 2 + lib/MySQL_HostGroups_Manager.cpp | 104 ++++++++++++++++------------- lib/MySQL_Monitor.cpp | 4 +- lib/MySQL_Thread.cpp | 4 ++ 6 files changed, 79 insertions(+), 52 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 2cbc9d9635..26da94cbad 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -563,19 +563,25 @@ class MySQL_HostGroups_Manager { * to 'SHUNNED' those servers which replication lag is bigger than: * - `mysql_thread___monitor_groupreplication_max_transactions_behind_count` * - * @details The function automatically handles if the supplies server is a - * writer, and the 'writer_is_also_reader' flag is present in that - * hostgroup. In that case, it also sets as 'SHUNNED' the corresponding - * server that is present in the 'reader_hostgroup'.
+ * @details The function automatically handles the appropriate operation to + * perform on the supplied server, based on the supplied 'enable' flag and + * in 'monitor_groupreplication_max_transaction_behind_for_read_only' + * variable. In case the value of the variable is: + * + * * '0' or '2': It's required to search the writer hostgroup for + * finding the supplied server. + * * '1' or '2': It's required to search the reader hostgroup for + * finding the supplied server. * * @param _hid The writer hostgroup. * @param address The server address. * @param port The server port. + * @param lag_counts The computed lag for the sever. * @param read_only Boolean specifying the read_only flag value of the server. * @param enable Boolean specifying if the server needs to be disabled / enabled, * 'true' for enabling the server if it's 'SHUNNED', 'false' for disabling it. */ - void group_replication_lag_action(int _hid, char *address, unsigned int port, bool read_only, bool enable); + void group_replication_lag_action(int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable); void update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error, bool soft=false); void update_galera_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index a3dd6aef71..1a6d25bede 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -410,6 +410,7 @@ class MySQL_Threads_Handler int monitor_groupreplication_healthcheck_timeout; int monitor_groupreplication_healthcheck_max_timeout_count; int monitor_groupreplication_max_transactions_behind_count; + int monitor_groupreplication_max_transactions_behind_for_read_only; int monitor_galera_healthcheck_interval; int monitor_galera_healthcheck_timeout; int monitor_galera_healthcheck_max_timeout_count; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index ca53c1319b..3ca15113f6 
100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -838,6 +838,7 @@ __thread int mysql_thread___monitor_groupreplication_healthcheck_interval; __thread int mysql_thread___monitor_groupreplication_healthcheck_timeout; __thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count; __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count; +__thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only; __thread int mysql_thread___monitor_galera_healthcheck_interval; __thread int mysql_thread___monitor_galera_healthcheck_timeout; __thread int mysql_thread___monitor_galera_healthcheck_max_timeout_count; @@ -988,6 +989,7 @@ extern __thread int mysql_thread___monitor_replication_lag_count; extern __thread int mysql_thread___monitor_groupreplication_healthcheck_interval; extern __thread int mysql_thread___monitor_groupreplication_healthcheck_timeout; extern __thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count; +extern __thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only; extern __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count; extern __thread int mysql_thread___monitor_galera_healthcheck_interval; extern __thread int mysql_thread___monitor_galera_healthcheck_timeout; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 29d16ef80a..5d5720c057 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3372,12 +3372,46 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u GloAdmin->mysql_servers_wrunlock(); } +/** + * @brief Finds the supplied server in the provided 'MyHGC' and sets the status + * either to 'MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG' if 'enable' is + * 'false' or 'MYSQL_SERVER_STATUS_ONLINE' if 'true'. 
+ * + * @param myhgc The MySQL Hostgroup Container in which to perform the server + * search. + * @param address The server address. + * @param port The server port. + * @param lag_count The lag count, computed by 'get_lag_behind_count'. + * @param enable Boolean specifying if the server should be enabled or not. + */ +void lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) { + for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) { + MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); + if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && enable == false) { + proxy_warning( + "Shunning server %s:%d from HG %u with replication lag, count number: '%d'\n", + address, port, myhgc->hid, lag_count + ); + mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; + } else { + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG && enable == true) { + mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; + proxy_warning( + "Re-enabling server %s:%d from HG %u with replication lag\n", + address, port, myhgc->hid, lag_count + ); + } + } + } + } +} + void MySQL_HostGroups_Manager::group_replication_lag_action( - int _hid, char *address, unsigned int port, bool read_only, bool enable + int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable ) { GloAdmin->mysql_servers_wrlock(); wrlock(); - int i,j; int reader_hostgroup = 0; bool writer_is_also_reader = false; @@ -3408,49 +3442,27 @@ void MySQL_HostGroups_Manager::group_replication_lag_action( reader_hostgroup = atoi(rhid_row->fields[0]); writer_is_also_reader = atoi(rhid_row->fields[1]); - for (i=0; i<(int)MyHostGroups->len; i++) { - MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + { + MyHGC* myhgc = nullptr; - if (read_only) { - if (_hid >= 0 && reader_hostgroup != (int)myhgc->hid) { - continue; - } - } else { - // In case of 'writer_is_also_reader' the server can be present - // in 
both, the 'reader_hostgroup' and the 'writer_hostgroup'. - if (writer_is_also_reader) { - if (_hid >= 0 && _hid != (int)myhgc->hid && reader_hostgroup != (int)myhgc->hid) { - continue; - } + if ( + mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 0 || + mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 || + enable + ) { + if (read_only == false) { + myhgc = MyHGM->MyHGC_find(_hid); + lag_action_set_server_status(myhgc, address, port, lag_counts, enable); } } - int servers_found = 0; - - for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) { - MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); - if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { - // First server found - servers_found += 1; - - if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && enable == false) { - proxy_warning("Shunning server %s:%d from HG %u with replication lag, count number: '%d'\n", address, port, myhgc->hid, mysrvc->cur_replication_lag_count); - mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; - } else { - if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG && enable == true) { - mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; - proxy_warning("Re-enabling server %s:%d from HG %u with replication lag\n", address, port, myhgc->hid); - } - } - - if (!writer_is_also_reader) { - goto __exit_replication_lag_action; - } else { - if (servers_found == 2) { - goto __exit_replication_lag_action; - } - } - } + if ( + mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 1 || + mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 || + enable + ) { + myhgc = MyHGM->MyHGC_find(reader_hostgroup); + lag_action_set_server_status(myhgc, address, port, lag_counts, enable); } } @@ -4749,7 +4761,7 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna char *query=NULL; char *q=NULL; char *error=NULL; - q=(char *)"SELECT hostgroup_id, status 
FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3 AND status <>2"; + q=(char *)"SELECT hostgroup_id, status FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; query=(char *)malloc(strlen(q)+strlen(_hostname)+32); sprintf(query,q,_hostname,_port); mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); @@ -4786,7 +4798,7 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna int hostgroup=atoi(r->fields[0]); if (hostgroup==_writer_hostgroup) { status = atoi(r->fields[1]); - if (status == 0) { + if (status == 0 || status == 2) { found_writer=true; } } @@ -4828,10 +4840,10 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); sprintf(query,q,_hostname,_port,_writer_hostgroup); mydb->execute(query); - // NOTE: The status should be preserved in case of being SHUNNED q=(char *)"UPDATE mysql_servers_incoming SET status=%d WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,status,_hostname,_port,_writer_hostgroup); + // NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise + // we set the server as 'ONLINE'. + sprintf(query, q, (status == 2 ? 
2 : 0 ), _hostname, _port, _writer_hostgroup); mydb->execute(query); //free(query); if (writer_is_also_reader && read_HG>=0) { diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 00a88c5f64..0e13c8ce10 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -1592,7 +1592,9 @@ void * monitor_group_replication_thread(void *arg) { if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) { enable = false; } - MyHGM->group_replication_lag_action(mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, read_only, enable); + MyHGM->group_replication_lag_action( + mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, lag_counts, read_only, enable + ); } } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 1c29278ba9..7b083ab51c 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -456,6 +456,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_groupreplication_healthcheck_timeout", (char *)"monitor_groupreplication_healthcheck_max_timeout_count", (char *)"monitor_groupreplication_max_transactions_behind_count", + (char *)"monitor_groupreplication_max_transactions_behind_for_read_only", (char *)"monitor_galera_healthcheck_interval", (char *)"monitor_galera_healthcheck_timeout", (char *)"monitor_galera_healthcheck_max_timeout_count", @@ -1042,6 +1043,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_groupreplication_healthcheck_timeout=800; variables.monitor_groupreplication_healthcheck_max_timeout_count=3; variables.monitor_groupreplication_max_transactions_behind_count=3; + variables.monitor_groupreplication_max_transactions_behind_for_read_only=1; variables.monitor_galera_healthcheck_interval=5000; variables.monitor_galera_healthcheck_timeout=800; variables.monitor_galera_healthcheck_max_timeout_count=3; @@ -2030,6 +2032,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_groupreplication_healthcheck_timeout"] = 
make_tuple(&variables.monitor_groupreplication_healthcheck_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_groupreplication_healthcheck_max_timeout_count"] = make_tuple(&variables.monitor_groupreplication_healthcheck_max_timeout_count, 1, 10, false); VariablesPointers_int["monitor_groupreplication_max_transactions_behind_count"] = make_tuple(&variables.monitor_groupreplication_max_transactions_behind_count, 1, 10, false); + VariablesPointers_int["monitor_groupreplication_max_transactions_behind_for_read_only"] = make_tuple(&variables.monitor_groupreplication_max_transactions_behind_for_read_only, 0, 2, false); VariablesPointers_int["monitor_galera_healthcheck_interval"] = make_tuple(&variables.monitor_galera_healthcheck_interval, 50, 7*24*3600*1000, false); VariablesPointers_int["monitor_galera_healthcheck_timeout"] = make_tuple(&variables.monitor_galera_healthcheck_timeout, 50, 600*1000, false); @@ -3565,6 +3568,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_groupreplication_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_timeout"); mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_max_timeout_count"); mysql_thread___monitor_groupreplication_max_transactions_behind_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_max_transactions_behind_count"); + mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only=GloMTH->get_variable_int((char *)"monitor_groupreplication_max_transactions_behind_for_read_only"); mysql_thread___monitor_galera_healthcheck_interval=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_interval"); mysql_thread___monitor_galera_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_timeout"); mysql_thread___monitor_galera_healthcheck_max_timeout_count=GloMTH->get_variable_int((char 
*)"monitor_galera_healthcheck_max_timeout_count"); From 3873f0b937106c4b39512aa272fac0feb113fe68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 19:19:25 +0200 Subject: [PATCH 06/13] Fixed compilation with invalid call to renamed function --- src/SQLite3_Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index de5d902c7d..7712a8de9d 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -582,7 +582,7 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p // purposes. string myip = string(sess->client_myds->proxy_addr.addr); string server_id = myip.substr(8,1); - string new_query = replace_str_1( + string new_query = replace_str_( query, "GR_MEMBER_ROUTING_CANDIDATE_STATUS", "GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id ); From edc631ba6c63f21f5423ca7dc1eccea21afb8b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 22 Jul 2021 19:44:04 +0200 Subject: [PATCH 07/13] Added missing parameter 'lag_count' to 'proxy_warning' from 'lag_action_set_server_status' --- lib/MySQL_HostGroups_Manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 5d5720c057..adfeb840a0 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3398,7 +3398,7 @@ void lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG && enable == true) { mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; proxy_warning( - "Re-enabling server %s:%d from HG %u with replication lag\n", + "Re-enabling server %s:%d from HG %u with replication lag, count number: '%d'\n", address, port, myhgc->hid, lag_count ); } From 459a3f12c272541454cd6abffb13dd87ccc5bc09 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Tue, 27 Jul 2021 14:41:25 +0200 Subject: [PATCH 08/13] Added nullity checks for params for 'lag_action_set_server_status' --- lib/MySQL_HostGroups_Manager.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index adfeb840a0..b0ca4dc457 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3375,7 +3375,9 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u /** * @brief Finds the supplied server in the provided 'MyHGC' and sets the status * either to 'MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG' if 'enable' is - * 'false' or 'MYSQL_SERVER_STATUS_ONLINE' if 'true'. + * 'false' or 'MYSQL_SERVER_STATUS_ONLINE' if 'true'. If either of the + * 'myhgc' or 'address' params are 'NULL' the function performs no action, + * and returns immediately. * * @param myhgc The MySQL Hostgroup Container in which to perform the server * search. @@ -3385,6 +3387,8 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u * @param enable Boolean specifying if the server should be enabled or not. 
*/ void lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) { + if (myhgc == NULL || address == NULL) return; + for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) { MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { From a6c22462763def09a2c97c7182abea97abdd8ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Tue, 27 Jul 2021 17:21:11 +0200 Subject: [PATCH 09/13] Replaced 'TEST_GROUPREP' impl to better match approach followed for 'READONLY' --- include/SQLite3_Server.h | 7 ++ lib/MySQL_Monitor.cpp | 6 +- src/SQLite3_Server.cpp | 259 ++++++++++++++++++++++++++++----------- 3 files changed, 198 insertions(+), 74 deletions(-) diff --git a/include/SQLite3_Server.h b/include/SQLite3_Server.h index 288d011567..1c26d5f337 100644 --- a/include/SQLite3_Server.h +++ b/include/SQLite3_Server.h @@ -13,6 +13,10 @@ class SQLite3_Session { ~SQLite3_Session(); }; +#ifdef TEST_GROUPREP +using group_rep_status = std::tuple; +#endif + class SQLite3_Server { private: volatile int main_shutdown; @@ -43,6 +47,7 @@ class SQLite3_Server { std::vector *tables_defs_galera; #endif // TEST_GALERA #ifdef TEST_GROUPREP + std::unordered_map grouprep_map; std::vector *tables_defs_grouprep; #endif // TEST_GROUPREP #if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) @@ -69,9 +74,11 @@ class SQLite3_Server { void init_galera_ifaces_string(std::string& s); #endif // TEST_GALERA #ifdef TEST_GROUPREP + unsigned int max_num_grouprep_servers; pthread_mutex_t grouprep_mutex; void populate_grouprep_table(MySQL_Session *sess, int txs_behind = 0); void init_grouprep_ifaces_string(std::string& s); + group_rep_status grouprep_test_value(const std::string& srv_addr); #endif // TEST_GROUPREP SQLite3_Server(); ~SQLite3_Server(); diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 0e13c8ce10..a9edb5f81e 100644 --- 
a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -1405,7 +1405,11 @@ void * monitor_group_replication_thread(void *arg) { //mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql); mmsd->interr=0; // reset the value #ifdef TEST_GROUPREP - mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS"); + { + std::string s { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" }; + s += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port); + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,s.c_str()); + } #else mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT viable_candidate,read_only,transactions_behind FROM sys.gr_member_routing_candidate_status"); #endif diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index 7712a8de9d..7511901dc7 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -8,6 +8,7 @@ #include "MySQL_Logger.hpp" #include "MySQL_Data_Stream.h" +#include "proxysql_utils.h" #include "query_processor.h" #include "SQLite3_Server.h" @@ -234,24 +235,78 @@ class sqlite3server_main_loop_listeners { static sqlite3server_main_loop_listeners S_amll; /** - * @brief TODO: This function is a duplicate of the one found in 'ProxySQL_RESTAPI_Server.cpp'. - * Function should be extracted from the another file and placed in a common - * resource file for utilities. + * @brief Helper function that checks if the supplied string + * is a number. + * @param s The string to check. + * @return True if the supplied string is just composed of + * digits, false otherwise. 
*/ -std::string replace_str_(const std::string& str, const std::string& match, const std::string& repl) { - if(match.empty()) { - return str; +bool is_number(const std::string& s) { + if (s.empty()) { return false; } + + for (const auto& d : s) { + if (std::isdigit(d) == false) { + return false; + } + } + + return true; +} + +/** + * @brief Checks if the query matches a specified 'monitor_query' of the + * following format: + * + * "$MONITOR_QUERY" + " hostname:port" + * + * If the query matches, 'true' is returned, false otherwise. + * + * @param monitor_query Query that should be matched against the current + * supplied 'query'. + * @param query Current query, to be matched against the supplied + * 'monitor_query'. + * @return 'true' if the query matches, false otherwise. + */ +bool match_monitor_query(const std::string& monitor_query, const std::string& query) { + if (query.rfind(monitor_query, 0) != 0) { + return false; } - std::string result = str; - size_t start_pos = 0; + std::string srv_address { + query.substr(monitor_query.size()) + }; - while((start_pos = result.find(match, start_pos)) != std::string::npos) { - result.replace(start_pos, match.length(), repl); - start_pos += repl.length(); + // Check that what is beyond this point, is just the server's address, + // written as an identifier 'n.n.n.n:n'. 
+ std::size_t cur_mark_pos = 0; + for (int i = 0; i < 3; i++) { + std::size_t next_mark_pos = srv_address.find('.', cur_mark_pos); + if (next_mark_pos == std::string::npos) { + return false; + } else { + std::string number { + srv_address.substr(cur_mark_pos, next_mark_pos - cur_mark_pos) + }; + + if (is_number(number)) { + cur_mark_pos = next_mark_pos + 1; + } else { + return false; + } + } } - return result; + // Check last part is also a valid number + cur_mark_pos = srv_address.find(':', cur_mark_pos); + if (cur_mark_pos == std::string::npos) { + return false; + } else { + std::string number { + srv_address.substr(cur_mark_pos + 1) + }; + + return is_number(number); + } } void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { @@ -575,21 +630,40 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) { pthread_mutex_lock(&GloSQLite3Server->grouprep_mutex); GloSQLite3Server->populate_grouprep_table(sess, 0); - - // Rewrite SELECT queries for matching the table which values - // should be used for the extracted 'server_id'. This allows - // simple manual configuration of the servers for testing - // purposes. - string myip = string(sess->client_myds->proxy_addr.addr); - string server_id = myip.substr(8,1); - string new_query = replace_str_( - query, "GR_MEMBER_ROUTING_CANDIDATE_STATUS", - "GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id - ); - - free(query); - query = static_cast(malloc(new_query.length() + 1)); - strcpy(query, new_query.c_str()); + // NOTE: This query should be in one place that can be reused by + // 'ProxySQL_Monitor' module. 
+ const std::string grouprep_monitor_test_query_start { + "SELECT viable_candidate,read_only,transactions_behind " + "FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS " + }; + + // If the query matches 'grouprep_monitor_test_query_start', it + // means that the query has been issued by `ProxySQL_Monitor` and + // we need to fetch for the proper values and replace the query + // with one holding the values from `grouprep_map`. + if (match_monitor_query(grouprep_monitor_test_query_start, query_no_space)) { + std::string srv_addr { + query_no_space + grouprep_monitor_test_query_start.size() + }; + + const group_rep_status& gr_srv_status = + GloSQLite3Server->grouprep_test_value(srv_addr); + free(query); + + std::string t_select_as_query { + "SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind" + }; + std::string select_as_query {}; + string_format( + t_select_as_query, select_as_query, + std::get<0>(gr_srv_status) ? "YES" : "NO", + std::get<1>(gr_srv_status) ? "YES" : "NO", + std::get<2>(gr_srv_status) + ); + + query = static_cast(malloc(select_as_query.length() + 1)); + strcpy(query, select_as_query.c_str()); + } } #endif // TEST_GROUPREP if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) { @@ -629,25 +703,11 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) { pthread_mutex_unlock(&GloSQLite3Server->grouprep_mutex); - // NOTE: Old IMPL can be enabled for manual testing - // **************************************************************** - - // if (resultset->rows_count == 0) { - // PROXY_TRACE(); - // } - - // if (strncmp("127.2.1.2", sess->client_myds->proxy_addr.addr,9) == 0) { - // if (testTimeoutSequence[testIndex--]) - // sleep(2); - // if (testIndex < 0) - // testIndex = 7; - // } - // else { - // if (rand() % 20 == 0) - // sleep(2); - // } - - // **************************************************************** + // NOTE: Enable 
this just in case of manual testing + // if (rand() % 100 == 0) { + // // randomly add some latency on 1% of the traffic + // sleep(2); + // } } #endif // TEST_GROUPREP if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) { @@ -670,6 +730,18 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p l_free(query_length,query); } +#ifdef TEST_GROUPREP +group_rep_status SQLite3_Server::grouprep_test_value(const std::string& srv_addr) { + group_rep_status cur_srv_st { "YES", "YES", 0 }; + + auto it = grouprep_map.find(srv_addr); + if (it != grouprep_map.end()) { + cur_srv_st = it->second; + } + + return cur_srv_st; +} +#endif SQLite3_Session::SQLite3_Session() { sessdb=new SQLite3DB(); @@ -942,7 +1014,16 @@ void SQLite3_Server::init_grouprep_ifaces_string(std::string& s) { pthread_mutex_init(&grouprep_mutex,NULL); if (!s.empty()) s += ";"; - s += "127.2.1.1:3306;127.2.1.2:3306;127.2.1.3:3306"; + + // Maximum number of servers to simulate. + max_num_grouprep_servers = 50; + for (unsigned int i=0; i < max_num_grouprep_servers; i++) { + s += "127.2.1." + std::to_string(i) + ":3306"; + + if (i != max_num_grouprep_servers) { + s += ";"; + } + } } #endif // TEST_GROUPREP @@ -1128,38 +1209,73 @@ void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) { * @brief Populates the 'grouprep' table if it's found empty with the default * values for the three testing servers. * - * NOTE: This function needs to be called with lock on mutex galera_mutex already acquired + * NOTE: This function needs to be called with lock on grouprep_mutex already acquired * * @param sess The current session performing a query. * @param txs_behind Unused parameter. 
*/ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind) { + GloAdmin->mysql_servers_wrlock(); + // We are going to repopulate the map + this->grouprep_map.clear(); + char *error=NULL; int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - string myip = string(sess->client_myds->proxy_addr.addr); - string server_id = myip.substr(8,1); - string query { "SELECT * FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + " LIMIT 1" }; - sessdb->execute_statement(query.c_str(), &error , &cols , &affected_rows , &resultset); - - if (resultset->rows_count==0) { - if (server_id == "1") { - std::string insert_query { - "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + - " (viable_candidate, read_only, transactions_behind) VALUES ('YES', 'NO', 0)" + string query { "SELECT * FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" }; + sessdb->execute_statement(query.c_str(), &error, &cols, &affected_rows, &resultset); + if (resultset) { + for (const SQLite3_row* r : resultset->rows) { + std::string srv_addr { std::string(r->fields[0]) + ":" + std::string(r->fields[1]) }; + const group_rep_status srv_status { + std::string { r->fields[2] } == "YES" ? true : false, + std::string { r->fields[3] } == "YES" ? true : false, + atoi(r->fields[4]) }; - sessdb->execute(insert_query.c_str()); - } else { - std::string insert_query { - "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" + server_id + - " (viable_candidate, read_only, transactions_behind) VALUES" - " ('YES', 'YES', 0)" + + this->grouprep_map[srv_addr] = srv_status; + } + } + delete resultset; + + // Insert some default servers for manual testing. + // + // NOTE: This logic can be improved in the future, for now it only populates + // the 'monitoring' data for the default servers. If more servers are placed + // as the default ones, more servers will be placed in their appropriate + // hostgroups with the same pattern as first ones. 
+ if (this->grouprep_map.size() == 0) { + GloAdmin->admindb->execute_statement( + (char*)"SELECT DISTINCT hostname, port, hostgroup_id FROM mysql_servers" + " WHERE hostgroup_id BETWEEN 2700 AND 4200", + &error, &cols , &affected_rows , &resultset + ); + + for (const SQLite3_row* r : resultset->rows) { + std::string hostname { r->fields[0] }; + int port = atoi(r->fields[1]); + int hostgroup_id = atoi(r->fields[1]); + const std::string t_insert_query { + "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" + " (hostname, port, viable_candidate, read_only, transactions_behind) VALUES" + " ('%s', %d, '%s', '%s', 0)" }; - sessdb->execute(insert_query.c_str()); + std::string insert_query {}; + + if (hostgroup_id % 4 == 0) { + string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO"); + sessdb->execute(insert_query.c_str()); + } else { + string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES"); + sessdb->execute(insert_query.c_str()); + } } + delete resultset; } + + GloAdmin->mysql_servers_wrunlock(); } #endif // TEST_GALERA @@ -1231,14 +1347,11 @@ bool SQLite3_Server::init() { #ifdef TEST_GROUPREP tables_defs_grouprep = new std::vector; insert_into_tables_defs(tables_defs_grouprep, - (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS1", - (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS1 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); - insert_into_tables_defs(tables_defs_grouprep, - (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS2", - (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS2 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); - insert_into_tables_defs(tables_defs_grouprep, - (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS3", - (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS3 (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)"); 
+ (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS", + (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS (" + "hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, PRIMARY KEY (hostname, port)" + ")" + ); check_and_build_standard_tables(sessdb, tables_defs_grouprep); GloAdmin->enable_grouprep_testing(); From 1fac83d0a4c12474867240b1ea9916cf74100b6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Wed, 28 Jul 2021 17:32:01 +0200 Subject: [PATCH 10/13] Fixed 'hostgroup_id' index selection in 'populate_grouprep_table' --- src/SQLite3_Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index 7511901dc7..65a1fd0842 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -1256,7 +1256,7 @@ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind for (const SQLite3_row* r : resultset->rows) { std::string hostname { r->fields[0] }; int port = atoi(r->fields[1]); - int hostgroup_id = atoi(r->fields[1]); + int hostgroup_id = atoi(r->fields[2]); const std::string t_insert_query { "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" " (hostname, port, viable_candidate, read_only, transactions_behind) VALUES" From 9f2c8836bc721c5664859b7cbc09a58b4455711b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Fri, 20 Aug 2021 00:07:58 +0200 Subject: [PATCH 11/13] Fixed removal of servers not belonging to cluster hostgroups by 'group_replication' actions 'set_read_only/set_offline/set_writer' --- lib/MySQL_HostGroups_Manager.cpp | 56 ++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index b0ca4dc457..b67409ca92 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ 
-4628,14 +4628,22 @@ void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostn GloAdmin->mysql_servers_wrlock(); mydb->execute("DELETE FROM mysql_servers_incoming"); mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + // NOTE: Only updated the servers that have belong to the same cluster. + q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" + " SELECT %d UNION ALL" + " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" + " SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" + ")"; query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + // NOTE: Only delete the servers that have belong to the same cluster. 
+ q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" + " SELECT %d UNION ALL" + " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" + " SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" + ")"; + sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); //free(query); q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; @@ -4704,14 +4712,22 @@ void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hos GloAdmin->mysql_servers_wrlock(); mydb->execute("DELETE FROM mysql_servers_incoming"); mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + // NOTE: Only updated the servers that have belong to the same cluster. 
+ q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" + " SELECT %d UNION ALL" + " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" + " SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" + ")"; query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + // NOTE: Only delete the servers that have belong to the same cluster. 
+ q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" + " SELECT %d UNION ALL" + " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" + " SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" + ")"; + sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); //free(query); q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; @@ -4779,6 +4795,8 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna bool found_writer=false; bool found_reader=false; int read_HG=-1; + int offline_HG=-1; + int backup_writer_HG=-1; bool need_converge=false; int status=0; if (resultset) { @@ -4791,6 +4809,8 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna info=it2->second; writer_is_also_reader=info->writer_is_also_reader; read_HG=info->reader_hostgroup; + offline_HG=info->offline_hostgroup; + backup_writer_HG=info->backup_writer_hostgroup; need_converge=info->need_converge; info->need_converge=false; } @@ -4835,14 +4855,14 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna GloAdmin->mysql_servers_wrlock(); mydb->execute("DELETE FROM mysql_servers_incoming"); mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d"; + // NOTE: Only updated the servers that have belong to the same cluster. 
+ q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)"; query=(char *)malloc(strlen(q)+strlen(_hostname)+256); - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + sprintf(query,q,_writer_hostgroup,_hostname,_port,backup_writer_HG,read_HG,offline_HG); mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + // NOTE: Only delete the servers that have belong to the same cluster. + q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)"; + sprintf(query,q,_hostname,_port,backup_writer_HG,read_HG,offline_HG); mydb->execute(query); q=(char *)"UPDATE mysql_servers_incoming SET status=%d WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; // NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. 
Otherwise From fce6cfb8a0a6d4206b6309a399079c4d479fd52f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Fri, 20 Aug 2021 15:11:02 +0200 Subject: [PATCH 12/13] Improved preservation of 'OFFLINE_SOFT' server state during 'group_replication' update actions --- lib/MySQL_HostGroups_Manager.cpp | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index b67409ca92..9bd7670866 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4646,9 +4646,13 @@ void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostn sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + // q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + // sprintf(query,q,_hostname,_port,_writer_hostgroup); + q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE " + " (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND" + " hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)" + " WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup); mydb->execute(query); //free(query); 
converge_group_replication_config(_writer_hostgroup); @@ -4730,9 +4734,12 @@ void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hos sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); mydb->execute(query); //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); + // NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise we set the server as 'ONLINE'. + q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE " + " (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND" + " hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)" + " WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup); mydb->execute(query); //free(query); converge_group_replication_config(_writer_hostgroup); @@ -4799,6 +4806,8 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna int backup_writer_HG=-1; bool need_converge=false; int status=0; + bool offline_soft_found=false; + if (resultset) { // let's get info about this cluster pthread_mutex_lock(&Group_Replication_Info_mutex); @@ -4820,6 +4829,8 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; int hostgroup=atoi(r->fields[0]); + offline_soft_found = atoi(r->fields[1]) == 2 ? 
true : false; + if (hostgroup==_writer_hostgroup) { status = atoi(r->fields[1]); if (status == 0 || status == 2) { @@ -4833,6 +4844,13 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna } } } + // NOTE: In case of a writer not being found but a 'OFFLINE_SOFT' status + // is found in a hostgroup, 'OFFLINE_SOFT' status should be preserved. + if (found_writer == false) { + if (offline_soft_found) { + status = 2; + } + } if (need_converge==false) { if (found_writer) { // maybe no-op if ( From dd71fcd81fe20b3549fefdb5db606d8df0da71dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 25 Aug 2021 23:56:41 +0200 Subject: [PATCH 13/13] Shun "soft" and "hard" in group replication lag When shunning a node due to replication lag in a group replication cluster, we first shun the node as MYSQL_SERVER_STATUS_SHUNNED, then we shun it as MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG. This way we avoid (for a short time) killing connections on that backend. This backing off from that server can give the server enough time to sync up.
See discussion in comments in https://github.com/sysown/proxysql/pull/3533 --- include/MySQL_HostGroups_Manager.h | 3 ++- lib/MySQL_HostGroups_Manager.cpp | 34 +++++++++++++++++------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 26da94cbad..cc88dfcc87 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -417,6 +417,8 @@ class MySQL_HostGroups_Manager { void p_update_connection_pool_update_counter(std::string& endpoint_id, std::map labels, std::map& m_map, unsigned long long value, p_hg_dyn_counter::metric idx); void p_update_connection_pool_update_gauge(std::string& endpoint_id, std::map labels, std::map& m_map, unsigned long long value, p_hg_dyn_gauge::metric idx); + void group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable); + public: std::mutex galera_set_writer_mutex; pthread_rwlock_t gtid_rwlock; @@ -582,7 +584,6 @@ class MySQL_HostGroups_Manager { * 'true' for enabling the server if it's 'SHUNNED', 'false' for disabling it. */ void group_replication_lag_action(int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable); - void update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error, bool soft=false); void update_galera_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_galera_set_writer(char *_hostname, int _port, int _writer_hostgroup); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 9bd7670866..28ea741afd 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3386,25 +3386,29 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u * @param lag_count The lag count, computed by 'get_lag_behind_count'. 
* @param enable Boolean specifying if the server should be enabled or not. */ -void lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) { +void MySQL_HostGroups_Manager::group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) { if (myhgc == NULL || address == NULL) return; for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) { MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { - if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && enable == false) { - proxy_warning( - "Shunning server %s:%d from HG %u with replication lag, count number: '%d'\n", - address, port, myhgc->hid, lag_count - ); - mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; - } else { - if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG && enable == true) { + + if (enable == true) { + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG || mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) { mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; - proxy_warning( - "Re-enabling server %s:%d from HG %u with replication lag, count number: '%d'\n", - address, port, myhgc->hid, lag_count - ); + proxy_info("Re-enabling server %u:%s:%d from replication lag\n", myhgc->hid, address, port); + } + } else { + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { + proxy_warning("Shunning 'soft' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count); + mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED; + } else { + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) { + if (lag_count >= ( mysql_thread___monitor_groupreplication_max_transactions_behind_count * 2 )) { + proxy_warning("Shunning 'hard' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count); + mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; + } + } } } } @@ -3456,7 +3460,7 @@ void 
MySQL_HostGroups_Manager::group_replication_lag_action( ) { if (read_only == false) { myhgc = MyHGM->MyHGC_find(_hid); - lag_action_set_server_status(myhgc, address, port, lag_counts, enable); + group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable); } } @@ -3466,7 +3470,7 @@ void MySQL_HostGroups_Manager::group_replication_lag_action( enable ) { myhgc = MyHGM->MyHGC_find(reader_hostgroup); - lag_action_set_server_status(myhgc, address, port, lag_counts, enable); + group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable); } }