diff --git a/sql/binlog.cc b/sql/binlog.cc index b44a120e0268..fd4e326d5f08 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -11887,6 +11887,20 @@ bool set_read_only(THD* thd, ulonglong readonly) DBUG_RETURN(result); } +int trim_logged_gtid(const std::vector& trimmed_gtids) +{ + if (trimmed_gtids.empty()) + return 0; + + global_sid_lock->rdlock(); + + int error = gtid_state->remove_logged_gtid_on_trim(trimmed_gtids); + + global_sid_lock->unlock(); + + return error; +} + #endif /* !defined(MYSQL_CLIENT) */ struct st_mysql_storage_engine binlog_storage_engine= diff --git a/sql/binlog.h b/sql/binlog.h index f98127e4785a..3fd43aedd041 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -1242,6 +1242,7 @@ void reset_semi_sync_last_acked(); bool set_read_only(THD *thd, ulonglong read_only); bool before_set_read_only(THD *thd, ulonglong read_only); void print_read_only_change(THD *thd); +int trim_logged_gtid(const std::vector& trimmed_gtids); #endif #endif /* BINLOG_H_INCLUDED */ diff --git a/sql/replication.h b/sql/replication.h index 862ad455793b..775ffdb9ac39 100644 --- a/sql/replication.h +++ b/sql/replication.h @@ -685,22 +685,24 @@ int get_user_var_str(const char *name, char *value, unsigned long len, unsigned int precision, int *null_value); -/* Type of callbakc that raft pluginh wants to invoke in the server */ +/* Type of callback that raft plugin wants to invoke in the server */ enum class RaftListenerCallbackType { SET_READ_ONLY= 1, UNSET_READ_ONLY= 2, RAFT_LISTENER_THREADS_EXIT= 3, + TRIM_LOGGED_GTIDS= 4, }; -/* Callbak argument, each type needs specialization to pass arguments. - * Setting read only does not need any arguments, so we donot have any - * specialization (yet) */ -class ICallbackArg +/* Callback argument, each type would just populate the fields needed for its + * callback */ +class RaftListenerCallbackArg { public: - explicit ICallbackArg() {} - virtual ~ICallbackArg() {} + explicit RaftListenerCallbackArg() {} + ~RaftListenerCallbackArg() {} + + std::vector trim_gtids= {}; }; class RaftListenerQueue @@ -726,7 +728,7 @@ class RaftListenerQueue struct QueueElement { RaftListenerCallbackType type; - ICallbackArg arg; + RaftListenerCallbackArg arg; }; /* Add an element to the queue. This will signal any listening threads diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index b94de79f3abb..f549f2488940 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -2245,7 +2245,16 @@ class Gtid_state @param thd Thread for which owned groups are updated. */ void update_on_rollback(THD *thd); + #endif // ifndef MYSQL_CLIENT + + /* Remove gtid from logged_gtid when binlog gets trimmed. + + @param trimmed_gtids The gtids to remove from logged_gtids + */ + enum_return_status remove_logged_gtid_on_trim( + const std::vector& trimmed_gtids); + /** Allocates a GNO for an automatically numbered group. diff --git a/sql/rpl_gtid_state.cc b/sql/rpl_gtid_state.cc index 0b8c843b3ca7..5ab5f27aa603 100644 --- a/sql/rpl_gtid_state.cc +++ b/sql/rpl_gtid_state.cc @@ -439,3 +439,46 @@ int Gtid_state::init() DBUG_RETURN(0); } + +enum_return_status Gtid_state::remove_logged_gtid_on_trim( + const std::vector& trimmed_gtids) +{ + DBUG_ENTER("Gtid_state::remove_logged_gtid_on_trim"); + global_sid_lock->assert_some_lock(); + + + if (trimmed_gtids.empty()) + RETURN_OK; + + const auto& first_gtid= trimmed_gtids.front(); + + Gtid gtid; + gtid.parse(global_sid_map, first_gtid.c_str()); + rpl_sidno first_sidno= gtid.sidno; + sid_locks.lock(first_sidno); + + for (const auto& trimmed_gtid : trimmed_gtids) + { + gtid.parse(global_sid_map, trimmed_gtid.c_str()); + DBUG_ASSERT(first_sidno == gtid.sidno); + if (gtid.sidno > 0) + { + /* Remove Gtid from logged_gtid set. */ + DBUG_PRINT("info", ("Removing gtid(sidno:%d, gno:%lld) from logged gtids", + gtid.sidno, gtid.gno)); + if (logged_gtids._remove_gtid(gtid) != RETURN_STATUS_OK) + { + // NO_LINT_DEBUG + sql_print_error("Failed to remove gtid(sidno:%d, gno: %lld) from " + "logged gtids. ", gtid.sidno, gtid.gno); + sid_locks.unlock(first_sidno); + RETURN_REPORTED_ERROR; + + } + } + } + + sid_locks.unlock(first_sidno); + RETURN_OK; +} + diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 861fbe6894d1..330bf8141318 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -35,7 +35,7 @@ Raft_replication_delegate *raft_replication_delegate; RaftListenerQueue raft_listener_queue; extern bool set_read_only(THD *thd, ulonglong read_only); - +extern int trim_logged_gtid(const std::vector& trimmed_gtids); /* structure to save transaction log filename and position */ @@ -719,6 +719,9 @@ pthread_handler_t process_raft_queue(void *arg) raft_listener_queue.deinit(); exit= true; break; + case RaftListenerCallbackType::TRIM_LOGGED_GTIDS: + trim_logged_gtid(element.arg.trim_gtids); + break; default: break; }