Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reverse-conn / tranlog #4858

Open
wants to merge 1 commit into
base: 8.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions bdb/phys_rep_lsn.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,13 @@ uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int apply_log(DB_ENV *dbenv, unsigned int file, unsigned int offset, int64_t rectype, void *blob, int blob_len)
int apply_log(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len)
{
return dbenv->apply_log(dbenv, file, offset, rectype, blob, blob_len);
BDB_READLOCK("apply_log");
int rc = bdb_state->dbenv->apply_log(bdb_state->dbenv, file, offset, rectype, blob, blob_len);
BDB_RELLOCK();
return rc;
}

int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, uint32_t flags)
Expand Down Expand Up @@ -506,7 +510,8 @@ LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db, LOG_INFO star
physrep_logmsg(LOGMSG_FATAL, "Require elect-highest-committed-gen on source- exiting\n");
exit(1);
}
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source\n");
physrep_logmsg(LOGMSG_ERROR, "Require elect-highest-committed-gen source {%d:%d}\n", info.file,
info.offset);
} else {
info.gen = *gen;
}
Expand Down Expand Up @@ -564,7 +569,7 @@ int physrep_bdb_wait_for_seqnum(bdb_state_type *bdb_state, DB_LSN *lsn, void *da
return 0;
}

seqnum_type seqnum;
seqnum_type seqnum = {0};
seqnum.lsn.file = lsn->file;
seqnum.lsn.offset = lsn->offset;
// seqnum.issue_time = ?
Expand Down
4 changes: 2 additions & 2 deletions bdb/phys_rep_lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ int physrep_list_ignored_tables(void);
LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int apply_log(struct __db_env *, unsigned int file, unsigned int offset,
int64_t rectype, void *blob, int blob_len);
int apply_log(struct bdb_state_tag *, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len);
int truncate_log_lock(struct bdb_state_tag *, unsigned int file,
unsigned int offset, uint32_t flags);
int find_log_timestamp(struct bdb_state_tag *, time_t time, unsigned int *file,
Expand Down
22 changes: 22 additions & 0 deletions berkdb/os/os_rw.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ static long long *__berkdb_num_read_ios = 0;
static long long *__berkdb_num_write_ios = 0;
static void (*read_callback) (int bytes) = 0;
static void (*write_callback) (int bytes) = 0;
static void (*failed_read_callback) (int bytes) = 0;
static void (*failed_write_callback) (int bytes) = 0;

int __slow_read_ns = 0;
int __slow_write_ns = 0;
Expand Down Expand Up @@ -540,6 +542,10 @@ __os_io(dbenv, op, fhp, pgno, pagesize, buf, niop)
if (*niop == (size_t) pagesize)
return (0);
logmsg(LOGMSG_DEBUG, "%s: failed %s io: expected %zd got %zd fd:%d name:%s\n", __func__, op == DB_IO_READ ? "read" : "write", pagesize, *niop, fhp->fd, fhp->name);
if (op == DB_IO_READ && failed_read_callback)
failed_read_callback(pagesize);
if (op == DB_IO_WRITE && failed_write_callback)
failed_write_callback(pagesize);
// try to do a seek + read/write
slow:
#endif
Expand Down Expand Up @@ -1014,6 +1020,10 @@ __os_iov(dbenv, op, fhp, pgno, pagesize, bufs, nobufs, niop)
if (*niop == (size_t)(pagesize * nobufs))
return (0);
logmsg(LOGMSG_DEBUG, "%s: failed %s io: expected %zd got %zd\n", __func__, op == DB_IO_READ ? "read" : "write", pagesize * nobufs, *niop);
if (op == DB_IO_READ && failed_read_callback)
failed_read_callback(pagesize);
if (op == DB_IO_WRITE && failed_write_callback)
failed_write_callback(pagesize);
// iov - we failed to write the pages as a unit, fall through and try them individually
slow:
#endif
Expand Down Expand Up @@ -1297,6 +1307,18 @@ __berkdb_register_read_callback(void (*callback) (int bytes))
read_callback = callback;
}

void
__berkdb_register_failed_read_callback(void (*callback) (int bytes))
{
failed_read_callback = callback;
}

void
__berkdb_register_failed_write_callback(void (*callback) (int bytes))
{
failed_write_callback = callback;
}

void
__berkdb_register_write_callback(void (*callback) (int bytes))
{
Expand Down
6 changes: 3 additions & 3 deletions berkdb/rep/rep_record.c
Original file line number Diff line number Diff line change
Expand Up @@ -8408,9 +8408,8 @@ __rep_verify_match(dbenv, rp, savetime, online)
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}

/* Masters must run full recovery */
int i_am_master = F_ISSET(rep, REP_F_MASTER);
if (gbl_rep_skip_recovery && !i_am_master && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
if (gbl_rep_skip_recovery && !i_am_master && dbenv->prev_commit_lsn.file > 0 && log_compare(&dbenv->prev_commit_lsn, &rp->lsn) <= 0) {
DB_TXNREGION *region;
region = ((DB_TXNMGR *)dbenv->tx_handle)->reginfo.primary;
dbenv->wrlock_recovery_lock(dbenv, __func__, __LINE__);
Expand Down Expand Up @@ -8467,7 +8466,7 @@ __rep_verify_match(dbenv, rp, savetime, online)

/* Recovery cleanup */
if (dbenv->rep_recovery_cleanup)
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master /* 0 */);
dbenv->rep_recovery_cleanup(dbenv, &trunclsn, i_am_master);

dbenv->unlock_recovery_lock(dbenv, __func__, __LINE__);

Expand All @@ -8485,6 +8484,7 @@ __rep_verify_match(dbenv, rp, savetime, online)
logmsg(LOGMSG_WARN, "skip-recovery cannot skip, prev-commit=[%d:%d] trunc-lsn=[%d:%d]\n",
dbenv->prev_commit_lsn.file, dbenv->prev_commit_lsn.offset, rp->lsn.file, rp->lsn.offset);
}
ZERO_LSN(dbenv->prev_commit_lsn);
if ((ret = __rep_dorecovery(dbenv, &rp->lsn, &trunclsn, online,
&undid_schema_change)) != 0) {
Pthread_mutex_unlock(&apply_lk);
Expand Down
Loading