Skip to content

Commit

Permalink
Improve count(*) performance and fix index merge bug
Browse files Browse the repository at this point in the history
Summary:
Today in `SELECT count(*)` MyRocks would still decode every single column due to this check, despite the readset being empty:

```
 // bitmap is cleared on index merge, but it still needs to decode columns
    bool field_requested =
        decode_all_fields || m_verify_row_debug_checksums ||
        bitmap_is_set(field_map, m_table->field[i]->field_index);
```
As a result MyRocks is significantly slower than InnoDB in this particular scenario.

Turns out in index merge, when it tries to reset, it calls ha_index_init with an empty column_bitmap, so our field decoders didn't know it needs to decode anything, so the entire query would return nothing. This is discussed in [this commit](facebook@70f2bcd), and [issue 624](facebook#624) and [PR 626](facebook#626). So the workaround we had at that time is to simply treat empty map as implicitly everything, and the side effect is massively slowed down count(*).

We have a few options to address this:
1. Fix index merge optimizer - looking at the code in QUICK_RANGE_SELECT::init_ror_merged_scan, it actually fixes up the column_bitmap properly, but after init/reset, so the fix would simply be moving the bitmap set code up. For secondary keys, prepare_for_position will automatically call `mark_columns_used_by_index_no_reset(s->primary_key, read_set)` if HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is set (true for both InnoDB and MyRocks), so we would know correctly that we need to unpack PK when walking SK during index merge.
2. Overriding `column_bitmaps_signal` and setup decoders whenever the bitmap changes - however this doesn't work by itself. Because no storage engine today actually use handler::column_bitmaps_signal this path haven't been tested properly in index merge. In this case, QUICK_RANGE_SELECT::init_ror_merged_scan should call set_column_bitmaps_no_signal to avoid resetting the correct read/write set of head since head is used as first handler (reuses_handler=true) and subsequent place holders for read/write set updates (reuse_handler=false).
3. Follow InnoDB's solution - InnoDB delays it actually initialize its template again in index_read for the 2nd time (relying on `prebuilt->sql_stat_start`), and during index_read `QUICK_RANGE_SELECT::column_bitmap` is already fixed up and the table read/write set is switched to it, so the new template would be built correctly.

In order to make it easier to maintain and port, after discussing with Manuel, I'm going with a simplified version of #3 that delays decoder creation until the first read operation (index_*, rnd_*, range_read_*, multi_range_read_*), and setting the delay flag in index_init / rnd_init / multi_range_read_init.

Also, I ran into a bug with truncation_partition where Rdb_converter's tbl_def is stale (we only update ha_rocksdb::m_tbl_def), but it is fine because it is not being used after table open. But my change moves the lookup_bitmap initialization into Rdb_converter which takes a dependency on Rdb_converter::m_tbl_def so now we need to reset it properly.

Reference Patch: facebook@44d6a8d

---------
Porting Note: Due to 8.0's new counting infra (handler::record & handler::record_with_index), this only helps PK counting. Will send out a better fix that works better with 8.0 new counting infra.

Reviewed By: Pushapgl

Differential Revision: D26265470
  • Loading branch information
yizhang82 authored and inikep committed Jul 19, 2024
1 parent 32190a3 commit 4bce350
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 38 deletions.
72 changes: 50 additions & 22 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7324,7 +7324,8 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
mrr_enabled_keyread(false),
mrr_used_cpk(false),
m_in_rpl_delete_rows(false),
m_in_rpl_update_rows(false) {}
m_in_rpl_update_rows(false),
m_need_build_decoder(false) {}

ha_rocksdb::~ha_rocksdb() {
int err MY_ATTRIBUTE((__unused__));
Expand Down Expand Up @@ -8798,8 +8799,7 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
*/
int ha_rocksdb::truncate_table(Rdb_tbl_def *tbl_def_arg,
const std::string &actual_user_table_name,
const TABLE *table_arg,
ulonglong auto_increment_value,
TABLE *table_arg, ulonglong auto_increment_value,
dd::Table *table_def) {
DBUG_ENTER_FUNC();

Expand Down Expand Up @@ -8887,6 +8887,7 @@ int ha_rocksdb::truncate_table(Rdb_tbl_def *tbl_def_arg,

/* Update the local m_tbl_def reference */
m_tbl_def = ddl_manager.find(orig_tablename);
m_converter.reset(new Rdb_converter(ha_thd(), m_tbl_def, table_arg));
DBUG_RETURN(err);
}

Expand Down Expand Up @@ -9182,8 +9183,9 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf,
#endif
DBUG_EXECUTE_IF("dbug.rocksdb.HA_EXTRA_KEYREAD", { m_keyread_only = true; });

bool covered_lookup = (m_keyread_only && kd.can_cover_lookup()) ||
kd.covers_lookup(&value, &m_lookup_bitmap);
bool covered_lookup =
(m_keyread_only && kd.can_cover_lookup()) ||
kd.covers_lookup(&value, m_converter->get_lookup_bitmap());

#ifndef NDEBUG
m_keyread_only = save_keyread_only;
Expand Down Expand Up @@ -9313,7 +9315,8 @@ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) {
rocksdb::Slice value = m_scan_it->value();
bool covered_lookup =
(m_keyread_only && m_key_descr_arr[keyno]->can_cover_lookup()) ||
m_key_descr_arr[keyno]->covers_lookup(&value, &m_lookup_bitmap);
m_key_descr_arr[keyno]->covers_lookup(
&value, m_converter->get_lookup_bitmap());
if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) {
rc = m_key_descr_arr[keyno]->unpack_record(
table, buf, &key, &value,
Expand Down Expand Up @@ -9358,6 +9361,8 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key,
enum ha_rkey_function find_flag) {
DBUG_ENTER_FUNC();

check_build_decoder();

DBUG_EXECUTE_IF(
"myrocks_busy_loop_on_row_read", volatile int debug_i = 0;
while (1) { int temp = debug_i; temp++; debug_i = temp; });
Expand Down Expand Up @@ -10001,6 +10006,8 @@ int ha_rocksdb::records_from_index(ha_rows *num_rows, uint index) {
int ha_rocksdb::index_next(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

bool moves_forward = true;
ha_statistic_increment(&System_status_var::ha_read_next_count);
if (m_key_descr_arr[active_index]->m_is_reverse_cf) {
Expand All @@ -10021,6 +10028,8 @@ int ha_rocksdb::index_next(uchar *const buf) {
int ha_rocksdb::index_prev(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

bool moves_forward = false;
ha_statistic_increment(&System_status_var::ha_read_prev_count);
if (m_key_descr_arr[active_index]->m_is_reverse_cf) {
Expand Down Expand Up @@ -10081,6 +10090,8 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) {
int ha_rocksdb::index_first(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

m_sk_match_prefix = nullptr;
ha_statistic_increment(&System_status_var::ha_read_first_count);
int rc = index_read_intern(buf, true /* first */);
Expand All @@ -10097,6 +10108,8 @@ int ha_rocksdb::index_first(uchar *const buf) {
int ha_rocksdb::index_last(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

m_sk_match_prefix = nullptr;
ha_statistic_increment(&System_status_var::ha_read_last_count);
int rc = index_read_intern(buf, false /* first */);
Expand Down Expand Up @@ -11467,17 +11480,16 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() {
int ha_rocksdb::rnd_init(bool scan) {
DBUG_ENTER_FUNC();

m_need_build_decoder = true;
active_index = table->s->primary_key;

THD *thd = ha_thd();
if (thd && thd->killed) {
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
}

Rdb_transaction *const tx = get_or_create_tx(table->in_use);

// when this table is being updated, decode all fields
m_converter->setup_field_decoders(table->read_set,
m_lock_rows == RDB_LOCK_WRITE);

if (scan) {
m_rnd_scan_is_new_snapshot = !tx->has_snapshot();
setup_iterator_for_rnd_scan();
Expand All @@ -11501,6 +11513,8 @@ int ha_rocksdb::rnd_init(bool scan) {
int ha_rocksdb::rnd_next(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

int rc;
ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
for (;;) {
Expand Down Expand Up @@ -11646,11 +11660,26 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) {
int ha_rocksdb::rnd_end() {
DBUG_ENTER_FUNC();

m_need_build_decoder = false;

release_scan_iterator();

DBUG_RETURN(HA_EXIT_SUCCESS);
}

void ha_rocksdb::build_decoder() {
m_converter->setup_field_decoders(table->read_set, active_index,
m_keyread_only,
m_lock_rows == RDB_LOCK_WRITE);
}

void ha_rocksdb::check_build_decoder() {
if (m_need_build_decoder) {
build_decoder();
m_need_build_decoder = false;
}
}

/**
@return
HA_EXIT_SUCCESS OK
Expand All @@ -11659,6 +11688,9 @@ int ha_rocksdb::rnd_end() {
int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) {
DBUG_ENTER_FUNC();

m_need_build_decoder = true;
active_index = idx;

THD *thd = ha_thd();
if (thd && thd->killed) {
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
Expand All @@ -11667,21 +11699,11 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) {
Rdb_transaction *const tx = get_or_create_tx(table->in_use);
assert(tx != nullptr);

// when this table is being updated, decode all fields
m_converter->setup_field_decoders(table->read_set,
m_lock_rows == RDB_LOCK_WRITE);

if (!m_keyread_only) {
m_key_descr_arr[idx]->get_lookup_bitmap(table, &m_lookup_bitmap);
}

// If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update
// when accessing the index, so don't acquire the snapshot right away.
// Otherwise acquire the snapshot immediately.
tx->acquire_snapshot(m_lock_rows == RDB_LOCK_NONE);

active_index = idx;

DBUG_RETURN(HA_EXIT_SUCCESS);
}

Expand All @@ -11692,9 +11714,9 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) {
int ha_rocksdb::index_end() {
DBUG_ENTER_FUNC();

release_scan_iterator();
m_need_build_decoder = false;

bitmap_free(&m_lookup_bitmap);
release_scan_iterator();

active_index = MAX_KEY;
in_range_check_pushed_down = false;
Expand Down Expand Up @@ -12047,6 +12069,8 @@ void ha_rocksdb::position(const uchar *const record) {
int ha_rocksdb::rnd_pos(uchar *const buf, uchar *const pos) {
DBUG_ENTER_FUNC();

check_build_decoder();

int rc;
size_t len;

Expand Down Expand Up @@ -16860,6 +16884,8 @@ class Mrr_sec_key_rowid_source : public Mrr_rowid_source {
int ha_rocksdb::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
uint n_ranges, uint mode,
HANDLER_BUFFER *buf) {
m_need_build_decoder = true;

int res;

if (!current_thd->optimizer_switch_flag(OPTIMIZER_SWITCH_MRR) ||
Expand Down Expand Up @@ -17106,6 +17132,8 @@ void ha_rocksdb::mrr_free_rows() {
}

int ha_rocksdb::multi_range_read_next(char **range_info) {
check_build_decoder();

if (mrr_uses_default_impl) {
return handler::multi_range_read_next(range_info);
}
Expand Down
15 changes: 7 additions & 8 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,6 @@ class ha_rocksdb : public my_core::handler {

void set_last_rowkey(const uchar *const old_data);

/*
For the active index, indicates which columns must be covered for the
current lookup to be covered. If the bitmap field is null, that means this
index does not cover the current lookup for any record.
*/
MY_BITMAP m_lookup_bitmap;

int alloc_key_buffers(const TABLE *const table_arg,
const Rdb_tbl_def *const tbl_def_arg,
bool alloc_alter_buffers = false)
Expand Down Expand Up @@ -956,7 +949,7 @@ class ha_rocksdb : public my_core::handler {
dd::Table *table_def);
int truncate_table(Rdb_tbl_def *tbl_def,
const std::string &actual_user_table_name,
const TABLE *table_arg, ulonglong auto_increment_value,
TABLE *table_arg, ulonglong auto_increment_value,
dd::Table *table_def);
bool check_if_incompatible_data(HA_CREATE_INFO *const info,
uint table_changes) override
Expand Down Expand Up @@ -1012,6 +1005,9 @@ class ha_rocksdb : public my_core::handler {
void update_row_read(ulonglong count);
static void inc_covered_sk_lookup();

void build_decoder();
void check_build_decoder();

protected:
int records(ha_rows *num_rows) override;
int records_from_index(ha_rows *num_rows, uint index) override;
Expand Down Expand Up @@ -1048,6 +1044,9 @@ class ha_rocksdb : public my_core::handler {
/* Flags tracking if we are inside different replication operation */
bool m_in_rpl_delete_rows;
bool m_in_rpl_update_rows;

/* Need to build decoder on next read operation */
bool m_need_build_decoder;
};

/*
Expand Down
3 changes: 2 additions & 1 deletion storage/rocksdb/nosql_access.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,8 @@ void INLINE_ATTR select_exec::scan_value() {
m_key_def->get_lookup_bitmap(m_table, &m_lookup_bitmap);
}

m_converter->setup_field_decoders(m_table->read_set, m_index);
m_converter->setup_field_decoders(m_table->read_set, m_index,
false /* keyread_only */);
}

bool INLINE_ATTR select_exec::run() {
Expand Down
20 changes: 15 additions & 5 deletions storage/rocksdb/rdb_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ Rdb_converter::~Rdb_converter() {
m_encoder_arr = nullptr;
// These are needed to suppress valgrind errors in rocksdb.partition
m_storage_record.mem_free();
bitmap_free(&m_lookup_bitmap);
}

/*
Expand All @@ -337,31 +338,30 @@ void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
Setup which fields will be unpacked when reading rows
@detail
Three special cases when we still unpack all fields:
Two special cases when we still unpack all fields:
- When client requires decode_all_fields, such as this table is being
updated (m_lock_rows==RDB_LOCK_WRITE).
- When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need
to read all fields to find whether there is a row checksum at the end. We
could skip the fields instead of decoding them, but currently we do
decoding.)
- On index merge as bitmap is cleared during that operation
@seealso
Rdb_converter::setup_field_encoders()
Rdb_converter::convert_record_from_storage_format()
*/
void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
uint active_index, bool keyread_only,
bool decode_all_fields) {
m_key_requested = false;
m_decoders_vect.clear();
bitmap_free(&m_lookup_bitmap);
int last_useful = 0;
int skip_size = 0;

for (uint i = 0; i < m_table->s->fields; i++) {
// bitmap is cleared on index merge, but it still needs to decode columns
bool field_requested =
decode_all_fields || m_verify_row_debug_checksums ||
bitmap_is_clear_all(field_map) ||
bitmap_is_set(field_map, m_table->field[i]->field_index());

// We only need the decoder if the whole record is stored.
Expand Down Expand Up @@ -397,6 +397,11 @@ void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
// skipping. Remove them.
m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
m_decoders_vect.end());

if (!keyread_only && active_index != m_table->s->primary_key) {
m_tbl_def->m_key_descr_arr[active_index]->get_lookup_bitmap(
m_table, &m_lookup_bitmap);
}
}

void Rdb_converter::setup_field_encoders() {
Expand Down Expand Up @@ -584,6 +589,11 @@ int Rdb_converter::convert_record_from_storage_format(
const rocksdb::Slice *const key_slice,
const rocksdb::Slice *const value_slice, uchar *const dst,
bool decode_value = true) {
bool skip_value = !decode_value || get_decode_fields()->size() == 0;
if (!m_key_requested && skip_value) {
return HA_EXIT_SUCCESS;
}

int err = HA_EXIT_SUCCESS;

Rdb_string_reader value_slice_reader(value_slice);
Expand All @@ -605,7 +615,7 @@ int Rdb_converter::convert_record_from_storage_format(
}
}

if (!decode_value || get_decode_fields()->size() == 0) {
if (skip_value) {
// We are done
return HA_EXIT_SUCCESS;
}
Expand Down
12 changes: 10 additions & 2 deletions storage/rocksdb/rdb_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ class Rdb_converter {
Rdb_converter &operator=(const Rdb_converter &decoder) = delete;
~Rdb_converter();

void setup_field_decoders(const MY_BITMAP *field_map,
bool decode_all_fields = false);
void setup_field_decoders(const MY_BITMAP *field_map, uint active_index,
bool keyread_only, bool decode_all_fields = false);

int decode(const std::shared_ptr<Rdb_key_def> &key_def, uchar *dst,
const rocksdb::Slice *key_slice, const rocksdb::Slice *value_slice,
Expand Down Expand Up @@ -172,6 +172,8 @@ class Rdb_converter {
return &m_decoders_vect;
}

const MY_BITMAP *get_lookup_bitmap() { return &m_lookup_bitmap; }

int decode_value_header_for_pk(Rdb_string_reader *reader,
const std::shared_ptr<Rdb_key_def> &pk_def,
rocksdb::Slice *unpack_slice);
Expand Down Expand Up @@ -241,5 +243,11 @@ class Rdb_converter {
my_core::ha_rows m_row_checksums_checked;
// buffer to hold data during encode_value_slice
String m_storage_record;
/*
For the active index, indicates which columns must be covered for the
current lookup to be covered. If the bitmap field is null, that means this
index does not cover the current lookup for any record.
*/
MY_BITMAP m_lookup_bitmap;
};
} // namespace myrocks

0 comments on commit 4bce350

Please sign in to comment.