From 4cdaedd90d7ab9d5dd06c80bd21a94ca002f4597 Mon Sep 17 00:00:00 2001
From: Alexey Maykov
Date: Fri, 19 Dec 2014 23:45:32 -0800
Subject: [PATCH] Implemented the background thread for periodic syncing

Summary:
Starting a new thread which is used to periodically sync writes.
Changed sysvars to directly mimic WriteOptions. Introduced a separate
global sysvar to control the background sync thread.
This diff is for this issue: https://github.com/MariaDB/webscalesql-5.6/issues/15

Test Plan:
Ran the test. The number of syncs doesn't go up in the background mode.
It goes up after a one-second sleep.
Checked in gdb that correct write_options are set for truncate table and
drop table.

Reviewers: jonahcohen, sergey.petrunya.9, MarkCallaghan

Reviewed By: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D31221
---
 storage/rocksdb/ha_rocksdb.cc | 202 ++++++++++++++++++++++------------
 1 file changed, 137 insertions(+), 65 deletions(-)

diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 863e90375a57..a33cb24f54b6 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -49,6 +49,7 @@ static handler *rocksdb_create_handler(handlerton *hton,
                                        MEM_ROOT *mem_root);
 void key_copy(uchar *to_key, uchar *from_record, KEY *key_info,
               uint key_length);
+void* background_thread(void*);
 
 ///////////////////////////////////////////////////////////
 // Parameters and settings
@@ -83,6 +84,10 @@ static HASH rocksdb_open_tables;
 
 /* The mutex used to init the hash; variable for example share methods */
 mysql_mutex_t rocksdb_mutex;
+mysql_mutex_t background_mutex;
+mysql_mutex_t stop_cond_mutex;
+mysql_cond_t stop_cond;
+bool stop_background_thread;
 
 
 //////////////////////////////////////////////////////////////////////////////
@@ -118,8 +123,7 @@ static long long rocksdb_block_cache_size;
 static uint64_t rocksdb_info_log_level;
 static char * rocksdb_wal_dir;
 static uint64_t rocksdb_index_type;
-static uint64_t rocksdb_write_sync;
-static rocksdb::WriteOptions rocksdb_write_options;
+static char rocksdb_background_sync;
 
 static rocksdb::DBOptions init_db_options() {
   rocksdb::DBOptions o;
@@ -159,27 +163,6 @@ static TYPELIB index_type_typelib = {
   nullptr
 };
 
-enum write_sync_options {
-  WRITE_SYNC_OFF,
-  WRITE_SYNC_ON_COMMIT,
-  WRITE_SYNC_BACKGROUND
-};
-
-static const char* write_sync_names[] = {
-  "off",
-  "on_commit",
-  "background",
-  NullS
-};
-
-static TYPELIB write_sync_typelib = {
-  array_elements(write_sync_names) - 1,
-  "write_sync_typelib",
-  write_sync_names,
-  nullptr
-};
-
-
 //TODO: 0 means don't wait at all, and we don't support it yet?
 static MYSQL_THDVAR_ULONG(lock_wait_timeout, PLUGIN_VAR_RQCMDARG,
   "Number of seconds to wait for lock",
@@ -478,31 +461,32 @@ static MYSQL_SYSVAR_STR(cf_options_file, rocksdb_cf_options_file,
   rocksdb_cf_options_file_validate,
   rocksdb_cf_options_file_update, "");
 
-static MYSQL_SYSVAR_ENUM(write_sync,
-  rocksdb_write_sync,
+static MYSQL_SYSVAR_BOOL(background_sync,
+  rocksdb_background_sync,
   PLUGIN_VAR_RQCMDARG,
-  "WriteOptions::write_sync for RocksDB",
-  NULL, NULL, WRITE_SYNC_OFF, &write_sync_typelib);
+  "turns on background syncs for RocksDB",
+  NULL, NULL, FALSE);
 
-static MYSQL_SYSVAR_BOOL(write_disable_wal,
-  *reinterpret_cast<my_bool*>(&rocksdb_write_options.disableWAL),
+static MYSQL_THDVAR_BOOL(write_sync,
+  PLUGIN_VAR_RQCMDARG,
+  "WriteOptions::sync for RocksDB",
+  NULL, NULL, rocksdb::WriteOptions().sync);
+
+static MYSQL_THDVAR_BOOL(write_disable_wal,
   PLUGIN_VAR_RQCMDARG,
   "WriteOptions::disableWAL for RocksDB",
-  NULL, NULL, rocksdb_write_options.disableWAL);
+  NULL, NULL, rocksdb::WriteOptions().disableWAL);
 
-static MYSQL_SYSVAR_ULONG(write_timeout_hint_us,
-  rocksdb_write_options.timeout_hint_us,
+static MYSQL_THDVAR_ULONG(write_timeout_hint_us,
   PLUGIN_VAR_RQCMDARG,
   "WriteOptions::timeout_hint_us for RocksDB",
-  NULL, NULL, rocksdb_write_options.timeout_hint_us,
+  NULL, NULL, rocksdb::WriteOptions().timeout_hint_us,
   /* min */ 0L, /* max */ LONG_MAX, 0);
 
-static MYSQL_SYSVAR_BOOL(write_ignore_missing_column_families,
-  *reinterpret_cast<my_bool*>(
-    &rocksdb_write_options.ignore_missing_column_families),
+static MYSQL_THDVAR_BOOL(write_ignore_missing_column_families,
   PLUGIN_VAR_RQCMDARG,
   "WriteOptions::ignore_missing_column_families for RocksDB",
-  NULL, NULL, rocksdb_write_options.ignore_missing_column_families);
+  NULL, NULL, rocksdb::WriteOptions().ignore_missing_column_families);
 
 const longlong ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT=4194304;
 
@@ -559,6 +543,8 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= {
   MYSQL_SYSVAR(default_cf_options),
   MYSQL_SYSVAR(cf_options_file),
 
+  MYSQL_SYSVAR(background_sync),
+
   MYSQL_SYSVAR(write_sync),
   MYSQL_SYSVAR(write_disable_wal),
   MYSQL_SYSVAR(write_timeout_hint_us),
@@ -567,20 +553,15 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= {
   NULL
 };
 
-static rocksdb::WriteOptions get_write_options() {
-  rocksdb::WriteOptions opt(rocksdb_write_options);
-  switch (rocksdb_write_sync) {
-    case WRITE_SYNC_OFF:
-      opt.sync = false;
-      break;
-    case WRITE_SYNC_ON_COMMIT:
-      opt.sync = true;
-      break;
-    case WRITE_SYNC_BACKGROUND:
-      // this option is not implemented yet
-      DBUG_ASSERT(0);
-      break;
-  }
+rocksdb::WriteOptions get_write_options(THD* thd)
+{
+  rocksdb::WriteOptions opt;
+
+  opt.sync = THDVAR(thd, write_sync);
+  opt.disableWAL = THDVAR(thd, write_disable_wal);
+  opt.timeout_hint_us = THDVAR(thd, write_timeout_hint_us);
+  opt.ignore_missing_column_families =
+    THDVAR(thd, write_ignore_missing_column_families);
   return opt;
 }
 
@@ -605,18 +586,35 @@ static uchar* rocksdb_get_key(ROCKSDB_SHARE *share, size_t *length,
 PSI_stage_info stage_waiting_on_row_lock= { 0, "Waiting for row lock", 0};
 
 #ifdef HAVE_PSI_INTERFACE
+static PSI_thread_key key_thread_background;
+
 static PSI_stage_info *all_rocksdb_stages[]=
 {
   & stage_waiting_on_row_lock
 };
 
 
-static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex;
+static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex,
+  key_mutex_background, key_mutex_stop_background;
 
 static PSI_mutex_info all_rocksdb_mutexes[]=
 {
   { &ex_key_mutex_example, "rocksdb", PSI_FLAG_GLOBAL},
-  { &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0}
+  { &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0},
+  { &key_mutex_background, "background", PSI_FLAG_GLOBAL},
+  { &key_mutex_stop_background, "stop background", PSI_FLAG_GLOBAL},
+};
+
+PSI_cond_key key_cond_stop;
+
+static PSI_cond_info all_rocksdb_conds[]=
+{
+  { &key_cond_stop, "cond_stop", PSI_FLAG_GLOBAL}
+};
+
+static PSI_thread_info all_rocksdb_threads[]=
+{
+  { &key_thread_background, "background", PSI_FLAG_GLOBAL},
 };
 
 static void init_rocksdb_psi_keys()
@@ -630,12 +628,17 @@ static void init_rocksdb_psi_keys()
   count= array_elements(all_rocksdb_mutexes);
   PSI_server->register_mutex(category, all_rocksdb_mutexes, count);
 
+  count= array_elements(all_rocksdb_conds);
+  PSI_server->register_cond(category, all_rocksdb_conds, count);
+
   count= array_elements(all_rocksdb_stages);
   mysql_stage_register(category, all_rocksdb_stages, count);
+
+  count= array_elements(all_rocksdb_threads);
+  mysql_thread_register(category, all_rocksdb_threads, count);
 }
 #endif
 
-
 static int bytewise_compare(const rocksdb::Slice& a, const rocksdb::Slice& b)
 {
   size_t a_size= a.size();
@@ -785,10 +788,10 @@ class Rdb_transaction
     trx_locks.clear();
   }
 
-  bool commit()
+  bool commit(THD* thd)
   {
     bool res= false;
-    flush_batch();
+    flush_batch(thd);
     /* rollback() will delete snapshot, batch and locks */
     rollback();
     return res;
@@ -796,7 +799,7 @@ class Rdb_transaction
 
 private:
 
-  int flush_batch_intern()
+  int flush_batch_intern(THD* thd)
   {
     bool res= false;
     rocksdb::WriteBatch batch;
@@ -820,15 +823,15 @@ class Rdb_transaction
         }
       }
     }
-    rocksdb::Status s= rdb->Write(get_write_options(), &batch);
+    rocksdb::Status s= rdb->Write(get_write_options(thd), &batch);
     res= !s.ok(); // we return true when something failed
     return res;
   }
 
 public:
-  int flush_batch()
+  int flush_batch(THD* thd)
   {
-    bool bres= flush_batch_intern();
+    bool bres= flush_batch_intern(thd);
     changes.reinit();
     n_writes= 0;
     return bres;
@@ -923,7 +926,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_trx, bool)
   {
     Rdb_transaction*& trx= get_trx_from_thd(thd);
     if (trx)
-      trx->commit(); // todo: return error code.
+      trx->commit(thd); // todo: return error code.
   }
   else
   {
@@ -1020,6 +1023,10 @@ static int rocksdb_init_func(void *p)
 
   rocksdb_hton= (handlerton *)p;
   mysql_mutex_init(ex_key_mutex_example, &rocksdb_mutex, MY_MUTEX_INIT_FAST);
+  mysql_mutex_init(key_mutex_background, &background_mutex, MY_MUTEX_INIT_FAST);
+  mysql_mutex_init(key_mutex_stop_background, &stop_cond_mutex,
+                   MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_cond_stop, &stop_cond, NULL);
   (void) my_hash_init(&rocksdb_open_tables,system_charset_info,32,0,0,
                       (my_hash_get_key) rocksdb_get_key,0,0);
 
@@ -1143,6 +1150,19 @@ static int rocksdb_init_func(void *p)
   if (ddl_manager.init(rdb))
     DBUG_RETURN(1);
 
+  stop_background_thread = false;
+  pthread_t thread_handle;
+  auto err = mysql_thread_create(
+    key_thread_background, &thread_handle,
+    nullptr,
+    background_thread, NULL
+  );
+  if (err != 0) {
+    sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)",
+                    err);
+    DBUG_RETURN(1);
+  }
+
   sql_print_information("RocksDB instance opened");
   DBUG_RETURN(0);
 }
@@ -1151,6 +1171,17 @@ static int rocksdb_done_func(void *p)
 {
   int error= 0;
   DBUG_ENTER("rocksdb_done_func");
+
+  // signal the background thread to stop
+  mysql_mutex_lock(&stop_cond_mutex);
+  stop_background_thread = true;
+  mysql_cond_signal(&stop_cond);
+  mysql_mutex_unlock(&stop_cond_mutex);
+
+  // wait for the background thread to finish
+  mysql_mutex_lock(&background_mutex);
+  mysql_mutex_unlock(&background_mutex);
+
   if (rocksdb_open_tables.records)
     error= 1;
   my_hash_free(&rocksdb_open_tables);
@@ -1299,7 +1330,9 @@ static handler* rocksdb_create_handler(handlerton *hton,
 
 
 ha_rocksdb::ha_rocksdb(handlerton *hton, TABLE_SHARE *table_arg)
-  : handler(hton, table_arg), scan_it(NULL), pk_descr(NULL),
+  : handler(hton, table_arg), scan_it(NULL),
+    tbl_def(NULL),
+    pk_descr(NULL),
     key_descr(NULL),
     pk_can_be_decoded(false),
     pk_tuple(NULL), pk_packed_tuple(NULL),
@@ -2861,7 +2894,7 @@ int ha_rocksdb::write_row(uchar *buf)
   if (!do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size))
   {
     /* Bulk insert mode: flush the write batch if it became too big */
-    if (trx->flush_batch())
+    if (trx->flush_batch(table->in_use))
       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
   }
 
@@ -3334,7 +3367,7 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type)
       will be no-ops.
     */
     Rdb_transaction* trx= get_or_create_trx(thd);
-    res= trx->commit();
+    res= trx->commit(thd);
     delete scan_it;
     scan_it= NULL;
   }
@@ -3422,6 +3455,8 @@ int ha_rocksdb::delete_table(const char *tablename)
 
 void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl)
 {
+  rocksdb::WriteOptions wo = get_write_options(ha_thd());
+
   rocksdb::ReadOptions opts;
   opts.total_order_seek= true;
   rocksdb::Iterator* it= rdb->NewIterator(opts);
@@ -3444,7 +3479,7 @@ void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl)
       rocksdb::Slice key= it->key();
       if (!tbl->key_descr[i]->covers_key(key.data(), key.size()))
         break;
-      rdb->Delete(get_write_options(), key);
+      rdb->Delete(wo, key);
       it->Next();
     }
   }
@@ -3797,3 +3832,40 @@ ulong Primary_key_comparator::get_hashnr(const char *key, size_t key_len)
                    &nr, &nr2);
   return((ulong) nr);
 }
+
+/*
+  Background sync thread. About once a second, if rocksdb_background_sync
+  is set, it issues an empty write with WriteOptions::sync = true.
+  It holds background_mutex for its whole lifetime and stop_cond_mutex
+  whenever it is not waiting on stop_cond; it exits once
+  stop_background_thread is set.
+*/
+void* background_thread(void*)
+{
+  mysql_mutex_lock(&background_mutex);
+  mysql_mutex_lock(&stop_cond_mutex);
+
+  rocksdb::WriteBatch wb;
+  while (!stop_background_thread) {
+    timespec ts;
+    clock_gettime(CLOCK_REALTIME, &ts);
+    ts.tv_sec++;
+    // wait for up to 1 second, or until shutdown is signalled
+    mysql_cond_timedwait(&stop_cond, &stop_cond_mutex, &ts);
+    // Test the flag, not the wait result: a timeout can race with the
+    // shutdown signal, which is sent only once and must not be missed.
+    if (stop_background_thread)
+      break;
+
+    if (rdb && rocksdb_background_sync) {
+      auto wo = rocksdb::WriteOptions();
+      wo.sync = true;
+      rdb->Write(wo, &wb);
+    }
+  }
+
+  mysql_mutex_unlock(&stop_cond_mutex);
+  mysql_mutex_unlock(&background_mutex);
+
+  return nullptr;
+}