Skip to content

Commit

Permalink
Fix reverse-conn / tranlog / lock-inversion
Browse files Browse the repository at this point in the history
Compare performance in phys-rep-perf
  • Loading branch information
markhannum committed Dec 4, 2024
1 parent 4b20468 commit 8d8b1ce
Show file tree
Hide file tree
Showing 18 changed files with 1,019 additions and 259 deletions.
13 changes: 9 additions & 4 deletions bdb/phys_rep_lsn.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,13 @@ uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int apply_log(DB_ENV *dbenv, unsigned int file, unsigned int offset, int64_t rectype, void *blob, int blob_len)
int apply_log(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len)
{
return dbenv->apply_log(dbenv, file, offset, rectype, blob, blob_len);
BDB_READLOCK("apply_log");
int rc = bdb_state->dbenv->apply_log(bdb_state->dbenv, file, offset, rectype, blob, blob_len);
BDB_RELLOCK();
return rc;
}

int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, uint32_t flags)
Expand Down Expand Up @@ -506,7 +510,8 @@ LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db, LOG_INFO star
physrep_logmsg(LOGMSG_FATAL, "Require elect-highest-committed-gen on source- exiting\n");
exit(1);
}
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source\n");
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source {%d:%d}\n", info.file,
info.offset);
} else {
info.gen = *gen;
}
Expand Down Expand Up @@ -564,7 +569,7 @@ int physrep_bdb_wait_for_seqnum(bdb_state_type *bdb_state, DB_LSN *lsn, void *da
return 0;
}

seqnum_type seqnum;
seqnum_type seqnum = {0};
seqnum.lsn.file = lsn->file;
seqnum.lsn.offset = lsn->offset;
// seqnum.issue_time = ?
Expand Down
4 changes: 2 additions & 2 deletions bdb/phys_rep_lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ int physrep_list_ignored_tables(void);
LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int apply_log(struct __db_env *, unsigned int file, unsigned int offset,
int64_t rectype, void *blob, int blob_len);
int apply_log(struct bdb_state_tag *, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len);
int truncate_log_lock(struct bdb_state_tag *, unsigned int file,
unsigned int offset, uint32_t flags);
int find_log_timestamp(struct bdb_state_tag *, time_t time, unsigned int *file,
Expand Down
22 changes: 22 additions & 0 deletions berkdb/os/os_rw.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ static long long *__berkdb_num_read_ios = 0;
static long long *__berkdb_num_write_ios = 0;
static void (*read_callback) (int bytes) = 0;
static void (*write_callback) (int bytes) = 0;
static void (*failed_read_callback) (int bytes) = 0;
static void (*failed_write_callback) (int bytes) = 0;

int __slow_read_ns = 0;
int __slow_write_ns = 0;
Expand Down Expand Up @@ -540,6 +542,10 @@ __os_io(dbenv, op, fhp, pgno, pagesize, buf, niop)
if (*niop == (size_t) pagesize)
return (0);
logmsg(LOGMSG_DEBUG, "%s: failed %s io: expected %zd got %zd fd:%d name:%s\n", __func__, op == DB_IO_READ ? "read" : "write", pagesize, *niop, fhp->fd, fhp->name);
if (op == DB_IO_READ && failed_read_callback)
failed_read_callback(pagesize);
if (op == DB_IO_WRITE && failed_write_callback)
failed_write_callback(pagesize);
// try to do a seek + read/write
slow:
#endif
Expand Down Expand Up @@ -1014,6 +1020,10 @@ __os_iov(dbenv, op, fhp, pgno, pagesize, bufs, nobufs, niop)
if (*niop == (size_t)(pagesize * nobufs))
return (0);
logmsg(LOGMSG_DEBUG, "%s: failed %s io: expected %zd got %zd\n", __func__, op == DB_IO_READ ? "read" : "write", pagesize * nobufs, *niop);
if (op == DB_IO_READ && failed_read_callback)
failed_read_callback(pagesize);
if (op == DB_IO_WRITE && failed_write_callback)
failed_write_callback(pagesize);
// iov - we failed to write the pages as a unit, fall through and try them individually
slow:
#endif
Expand Down Expand Up @@ -1297,6 +1307,18 @@ __berkdb_register_read_callback(void (*callback) (int bytes))
read_callback = callback;
}

void
__berkdb_register_failed_read_callback(void (*callback) (int bytes))
{
failed_read_callback = callback;
}

void
__berkdb_register_failed_write_callback(void (*callback) (int bytes))
{
failed_write_callback = callback;
}

void
__berkdb_register_write_callback(void (*callback) (int bytes))
{
Expand Down
6 changes: 3 additions & 3 deletions berkdb/rep/rep_record.c
Original file line number Diff line number Diff line change
Expand Up @@ -8408,9 +8408,8 @@ __rep_verify_match(dbenv, rp, savetime, online)
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}

/* Masters must run full recovery */
int i_am_master = F_ISSET(rep, REP_F_MASTER);
if (gbl_rep_skip_recovery && !i_am_master && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
if (gbl_rep_skip_recovery && !i_am_master && dbenv->prev_commit_lsn.file > 0 && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
DB_TXNREGION *region;
region = ((DB_TXNMGR *)dbenv->tx_handle)->reginfo.primary;
dbenv->wrlock_recovery_lock(dbenv, __func__, __LINE__);
Expand Down Expand Up @@ -8467,7 +8466,7 @@ __rep_verify_match(dbenv, rp, savetime, online)

/* Recovery cleanup */
if (dbenv->rep_recovery_cleanup)
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master /* 0 */);
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master);

dbenv->unlock_recovery_lock(dbenv, __func__, __LINE__);

Expand All @@ -8485,6 +8484,7 @@ __rep_verify_match(dbenv, rp, savetime, online)
logmsg(LOGMSG_WARN, "skip-recovery cannot skip, prev-commit=[%d:%d] trunc-lsn=[%d:%d]\n",
dbenv->prev_commit_lsn.file, dbenv->prev_commit_lsn.offset, rp->lsn.file, rp->lsn.offset);
}
ZERO_LSN(dbenv->prev_commit_lsn);
if ((ret = __rep_dorecovery(dbenv, &rp->lsn, &trunclsn, online,
&undid_schema_change)) != 0) {
Pthread_mutex_unlock(&apply_lk);
Expand Down
Loading

0 comments on commit 8d8b1ce

Please sign in to comment.