Skip to content

Commit

Permalink
Fix reverse-conn / tranlog / lock-inversion
Browse files Browse the repository at this point in the history
Compare performance in phys-rep-perf

Signed-off-by: Mark Hannum <[email protected]>
  • Loading branch information
markhannum committed Dec 2, 2024
1 parent 7be2427 commit f859d56
Show file tree
Hide file tree
Showing 16 changed files with 883 additions and 252 deletions.
13 changes: 9 additions & 4 deletions bdb/phys_rep_lsn.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,13 @@ uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int apply_log(DB_ENV *dbenv, unsigned int file, unsigned int offset, int64_t rectype, void *blob, int blob_len)
int apply_log(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len)
{
return dbenv->apply_log(dbenv, file, offset, rectype, blob, blob_len);
BDB_READLOCK("apply_log");
int rc = bdb_state->dbenv->apply_log(bdb_state->dbenv, file, offset, rectype, blob, blob_len);
BDB_RELLOCK();
return rc;
}

int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, uint32_t flags)
Expand Down Expand Up @@ -506,7 +510,8 @@ LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db, LOG_INFO star
physrep_logmsg(LOGMSG_FATAL, "Require elect-highest-committed-gen on source- exiting\n");
exit(1);
}
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source\n");
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source {%d:%d}\n", info.file,
info.offset);
} else {
info.gen = *gen;
}
Expand Down Expand Up @@ -564,7 +569,7 @@ int physrep_bdb_wait_for_seqnum(bdb_state_type *bdb_state, DB_LSN *lsn, void *da
return 0;
}

seqnum_type seqnum;
seqnum_type seqnum = {0};
seqnum.lsn.file = lsn->file;
seqnum.lsn.offset = lsn->offset;
// seqnum.issue_time = ?
Expand Down
4 changes: 2 additions & 2 deletions bdb/phys_rep_lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ int physrep_list_ignored_tables(void);
LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int apply_log(struct __db_env *, unsigned int file, unsigned int offset,
int64_t rectype, void *blob, int blob_len);
int apply_log(struct bdb_state_tag *, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len);
int truncate_log_lock(struct bdb_state_tag *, unsigned int file,
unsigned int offset, uint32_t flags);
int find_log_timestamp(struct bdb_state_tag *, time_t time, unsigned int *file,
Expand Down
6 changes: 3 additions & 3 deletions berkdb/rep/rep_record.c
Original file line number Diff line number Diff line change
Expand Up @@ -8408,9 +8408,8 @@ __rep_verify_match(dbenv, rp, savetime, online)
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}

/* Masters must run full recovery */
int i_am_master = F_ISSET(rep, REP_F_MASTER);
if (gbl_rep_skip_recovery && !i_am_master && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
if (gbl_rep_skip_recovery && !i_am_master && dbenv->prev_commit_lsn.file > 0 && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
DB_TXNREGION *region;
region = ((DB_TXNMGR *)dbenv->tx_handle)->reginfo.primary;
dbenv->wrlock_recovery_lock(dbenv, __func__, __LINE__);
Expand Down Expand Up @@ -8467,7 +8466,7 @@ __rep_verify_match(dbenv, rp, savetime, online)

/* Recovery cleanup */
if (dbenv->rep_recovery_cleanup)
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master /* 0 */);
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master);

dbenv->unlock_recovery_lock(dbenv, __func__, __LINE__);

Expand All @@ -8485,6 +8484,7 @@ __rep_verify_match(dbenv, rp, savetime, online)
logmsg(LOGMSG_WARN, "skip-recovery cannot skip, prev-commit=[%d:%d] trunc-lsn=[%d:%d]\n",
dbenv->prev_commit_lsn.file, dbenv->prev_commit_lsn.offset, rp->lsn.file, rp->lsn.offset);
}
ZERO_LSN(dbenv->prev_commit_lsn);
if ((ret = __rep_dorecovery(dbenv, &rp->lsn, &trunclsn, online,
&undid_schema_change)) != 0) {
Pthread_mutex_unlock(&apply_lk);
Expand Down
414 changes: 191 additions & 223 deletions db/db_metrics.c

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ extern int gbl_physrep_keepalive_freq_sec;
extern int gbl_physrep_max_candidates;
extern int gbl_physrep_max_pending_replicants;
extern int gbl_physrep_reconnect_penalty;
extern int gbl_physrep_register_interval;
extern int gbl_physrep_reconnect_interval;
extern int gbl_physrep_shuffle_host_list;
extern int gbl_physrep_ignore_queues;
extern int gbl_physrep_max_rollback;
Expand Down
10 changes: 4 additions & 6 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1871,10 +1871,8 @@ REGISTER_TUNABLE("match_on_ckp",
NULL, NULL, NULL, NULL);

/* physical replication */
REGISTER_TUNABLE("blocking_physrep",
"Physical replicant blocks on select. (Default: false)",
TUNABLE_BOOLEAN, &gbl_blocking_physrep, 0, NULL, NULL, NULL,
NULL);
REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: off)", TUNABLE_BOOLEAN,
&gbl_blocking_physrep, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("tranlog_default_timeout", "Default timeout for tranlog queries. (Default: 30)", TUNABLE_INTEGER,
&gbl_tranlog_default_timeout, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoherent tranlog. (Default: 10)",
Expand Down Expand Up @@ -1931,8 +1929,8 @@ REGISTER_TUNABLE("physrep_reconnect_penalty",
"Physrep wait seconds before retry to the same node. (Default: 5)",
TUNABLE_INTEGER, &gbl_physrep_reconnect_penalty, 0, NULL, NULL,
NULL, NULL);
REGISTER_TUNABLE("physrep_register_interval", "Interval for physical replicant re-registration. (Default: 600)",
TUNABLE_INTEGER, &gbl_physrep_register_interval, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_reconnect_interval", "Reconnect interval for physical replicants (Default: 600)",
TUNABLE_INTEGER, &gbl_physrep_reconnect_interval, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_shuffle_host_list",
"Shuffle the host list returned by register_replicant() "
"before connecting to the hosts. (Default: OFF)",
Expand Down
22 changes: 13 additions & 9 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typedef struct DB_Connection {
} while (0)

int gbl_physrep_debug = 0;
int gbl_physrep_register_interval = 600; // force re-registration every 10 mins
int gbl_physrep_reconnect_interval = 600; // force a reconnect (and re-registration) every 10 mins
int gbl_physrep_reconnect_penalty = 0;
int gbl_blocking_physrep = 0;
int gbl_physrep_fanout = 8;
Expand Down Expand Up @@ -102,6 +102,7 @@ static size_t repl_dbs_sz;
reverse_conn_handle_tp *rev_conn_hndl = NULL;

static int last_register;
static int repl_db_connect_time;

static int add_replicant_host(char *hostname, char *dbname);
static void dump_replicant_hosts(void);
Expand Down Expand Up @@ -191,6 +192,8 @@ static void set_repl_db_connected(char *dbname, char *host)
gbl_physrep_repl_name = dbname;
gbl_physrep_repl_host = host;
repl_db_connected = 1;
repl_db_connect_time = comdb2_time_epoch();
physrep_logmsg(LOGMSG_USER, "Physical replicant is now replicating from %s@%s\n", dbname, host);
}

static void set_repl_db_disconnected()
Expand Down Expand Up @@ -635,8 +638,7 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
if (stop_physrep_worker == 0) {
/* check if we need to call new file flag */
if (prev_info.file < file) {
rc = apply_log(thedb->bdb_env->dbenv, prev_info.file,
get_next_offset(thedb->bdb_env->dbenv, prev_info),
rc = apply_log(thedb->bdb_env, prev_info.file, get_next_offset(thedb->bdb_env->dbenv, prev_info),
REP_NEWFILE, NULL, 0);
if (rc != 0) {
physrep_logmsg(LOGMSG_FATAL, "%s:%d: Something went wrong with applying the logs (rc: %d)\n",
Expand All @@ -645,8 +647,7 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
}
}

rc = apply_log(thedb->bdb_env->dbenv, file, offset, REP_LOG, blob,
blob_len);
rc = apply_log(thedb->bdb_env, file, offset, REP_LOG, blob, blob_len);

if (is_commit((u_int32_t)*rectype)) {
if (gbl_physrep_debug) {
Expand Down Expand Up @@ -1271,7 +1272,7 @@ static void *physrep_worker(void *args)
}

if (repl_db_connected &&
(force_registration() || (((now = time(NULL)) - last_register) > gbl_physrep_register_interval))) {
(force_registration() || (((now = time(NULL)) - repl_db_connect_time) > gbl_physrep_reconnect_interval))) {
close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__);
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s:%d: Forcing re-registration\n", __func__, __LINE__);
Expand Down Expand Up @@ -1394,9 +1395,6 @@ static void *physrep_worker(void *args)
cdb2_close(repl_metadb);
}

physrep_logmsg(LOGMSG_USER, "Physical replicant is now replicating from %s@%s\n",
repl_db_cnct->dbname, repl_db_cnct->hostname);

if (do_truncate && repl_db) {
info = get_last_lsn(thedb->bdb_env);
prev_info = handle_truncation(repl_db, info);
Expand Down Expand Up @@ -1481,6 +1479,12 @@ static void *physrep_worker(void *args)
goto repl_loop;
}

/* Check reconnect */
if (force_registration() || ((now = time(NULL)) - repl_db_connect_time) > gbl_physrep_reconnect_interval) {
close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__);
goto repl_loop;
}

int update_regck = gbl_physrep_update_registry_interval;
if (repl_db_connected && update_regck > 0 && (comdb2_time_epoch() - last_update_registry) > update_regck) {
update_registry_periodic(repl_db_cnct->dbname, repl_db_cnct->hostname);
Expand Down
23 changes: 23 additions & 0 deletions db/request_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string.h>
#include <strings.h>
#include <stddef.h>
#include <comdb2_atomic.h>

#include "request_stats.h"

Expand All @@ -34,6 +35,8 @@ extern void __berkdb_register_fsync_callback(void (*callback)(int fd));

/* Release the heap buffer handed in through `st`.
 * NOTE(review): presumably installed as the destructor for the per-thread
 * request-stats key -- confirm at the pthread_key_create() call site. */
static void user_request_done(void *st)
{
    free(st);
}

static struct global_stats global = {0};

void user_request_begin(enum request_type type, int flags)
{
struct per_request_stats *st;
Expand Down Expand Up @@ -67,6 +70,8 @@ void user_request_fsync_callback(int fd)
if (st) {
st->nfsyncs++;
}

ATOMIC_ADD64(global.fsyncs, 1);
}

void user_request_read_callback(int bytes)
Expand All @@ -81,6 +86,9 @@ void user_request_read_callback(int bytes)
st->nreads++;
st->readbytes += bytes;
}

ATOMIC_ADD64(global.page_reads, 1);
ATOMIC_ADD64(global.page_bytes_read, bytes);
}

void user_request_write_callback(int bytes)
Expand All @@ -95,6 +103,9 @@ void user_request_write_callback(int bytes)
st->nwrites++;
st->writebytes += bytes;
}

ATOMIC_ADD64(global.page_writes, 1);
ATOMIC_ADD64(global.page_bytes_written, bytes);
}

void user_request_memp_callback(void)
Expand All @@ -107,6 +118,18 @@ void user_request_memp_callback(void)
st = pthread_getspecific(key);
if (st)
st->mempgets++;

ATOMIC_ADD64(global.mempgets, 1);
}

void global_request_stats(struct global_stats *stats)
{
stats->page_reads = ATOMIC_LOAD64(global.page_reads);
stats->page_writes = ATOMIC_LOAD64(global.page_writes);
stats->fsyncs = ATOMIC_LOAD64(global.fsyncs);
stats->mempgets = ATOMIC_LOAD64(global.mempgets);
stats->page_bytes_read = ATOMIC_LOAD64(global.page_bytes_read);
stats->page_bytes_written = ATOMIC_LOAD64(global.page_bytes_written);
}

void user_request_init(void)
Expand Down
10 changes: 10 additions & 0 deletions db/request_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,23 @@ struct per_request_stats {
long long writebytes;
};

/* Cumulative I/O counters. request_stats.c keeps one file-scope instance,
 * updates it from the user_request_*_callback() hooks with ATOMIC_ADD64, and
 * copies it out through global_request_stats(). */
struct global_stats {
/* incremented by 1 in user_request_read_callback() */
int64_t page_reads;
/* incremented by 1 in user_request_write_callback() */
int64_t page_writes;
/* incremented by 1 in user_request_fsync_callback() */
int64_t fsyncs;
/* incremented by 1 in user_request_memp_callback() */
int64_t mempgets;
/* running sum of the `bytes` argument given to user_request_read_callback() */
int64_t page_bytes_read;
/* running sum of the `bytes` argument given to user_request_write_callback() */
int64_t page_bytes_written;
};

void user_request_begin(enum request_type type, int flags);
struct per_request_stats *user_request_get_stats(void);
void user_request_end(void);
void user_request_read_callback(int);
void user_request_write_callback(int);
void user_request_memp_callback(void);
void user_request_init(void);
void global_request_stats(struct global_stats *stats);

void user_request_on(void);
void user_request_off(void);
Expand Down
2 changes: 2 additions & 0 deletions sqlite/ext/comdb2/tranlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ static int tranlogColumn(
if (pCur->data.data)
LOGCOPY_32(&rectype, pCur->data.data);

normalize_rectype(&rectype);

if (rectype == DB___txn_regop_gen){
generation = get_generation_from_regop_gen_record(pCur->data.data);
}
Expand Down
10 changes: 10 additions & 0 deletions tests/phys_rep_perf.test/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Pull in the shared testcase.mk: from the parent directory when TESTSROOTDIR
# is unset or empty, otherwise from $(TESTSROOTDIR).
ifeq ($(TESTSROOTDIR),)
include ../testcase.mk
else
include $(TESTSROOTDIR)/testcase.mk
endif
# NOTE(review): presumably tells the harness to skip its end-of-test database
# check -- confirm against testcase.mk.
export CHECK_DB_AT_FINISH=0
# Default TEST_TIMEOUT to 15m unless the caller already provided a value.
ifeq ($(TEST_TIMEOUT),)
export TEST_TIMEOUT=15m
endif

1 change: 1 addition & 0 deletions tests/phys_rep_perf.test/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This test compares the I/O and memory utilization (and, in the future, possibly other metrics) of a normal database against those of a physrep database and a reverse-connection physrep database.
Loading

0 comments on commit f859d56

Please sign in to comment.