Skip to content

Commit

Permalink
HBASE-23696 Stop WALProcedureStore after migration finishes
Browse files Browse the repository at this point in the history
  • Loading branch information
saintstack committed Jan 15, 2020
1 parent ae95b1f commit 7b28524
Showing 1 changed file with 55 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,63 +308,70 @@ private void tryMigrate(FileSystem fs) throws IOException {
}
LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
store.start(numThreads);
store.recoverLease();
MutableLong maxProcIdSet = new MutableLong(-1);
MutableLong maxProcIdFromProcs = new MutableLong(-1);
store.load(new ProcedureLoader() {

@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
try {
store.start(numThreads);
store.recoverLease();
MutableLong maxProcIdSet = new MutableLong(-1);
MutableLong maxProcIdFromProcs = new MutableLong(-1);
store.load(new ProcedureLoader() {

@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}

@Override
public void load(ProcedureIterator procIter) throws IOException {
long procCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc = procIter.next();
update(proc);
procCount++;
if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
maxProcIdFromProcs.setValue(proc.getProcId());
@Override
public void load(ProcedureIterator procIter) throws IOException {
long procCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc = procIter.next();
update(proc);
procCount++;
if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
maxProcIdFromProcs.setValue(proc.getProcId());
}
}
LOG.info("Migrated {} procedures", procCount);
}
LOG.info("Migrated {} procedures", procCount);
}

@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
long corruptedCount = 0;
while (procIter.hasNext()) {
LOG.error("Corrupted procedure {}", procIter.next());
corruptedCount++;
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
long corruptedCount = 0;
while (procIter.hasNext()) {
LOG.error("Corrupted procedure {}", procIter.next());
corruptedCount++;
}
if (corruptedCount > 0) {
throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
" migrating from the old WAL based store to the new region based store, please" +
" fix them before upgrading again.");
}
}
if (corruptedCount > 0) {
throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
" migrating from the old WAL based store to the new region based store, please" +
" fix them before upgrading again.");
});
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
});
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
LOG.info("Migration of WALProcedureStore finished");
} catch (IOException ioe) {
store.stop(true);
throw ioe;
} finally {
store.stop(false);
}
LOG.info("Migration of WALProcedureStore finished");
}

@Override
Expand Down

0 comments on commit 7b28524

Please sign in to comment.