diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java index 52d856663b59..353a6b2476ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java @@ -308,63 +308,70 @@ private void tryMigrate(FileSystem fs) throws IOException { } LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir); WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery); - store.start(numThreads); - store.recoverLease(); - MutableLong maxProcIdSet = new MutableLong(-1); - MutableLong maxProcIdFromProcs = new MutableLong(-1); - store.load(new ProcedureLoader() { - - @Override - public void setMaxProcId(long maxProcId) { - maxProcIdSet.setValue(maxProcId); - } + try { + store.start(numThreads); + store.recoverLease(); + MutableLong maxProcIdSet = new MutableLong(-1); + MutableLong maxProcIdFromProcs = new MutableLong(-1); + store.load(new ProcedureLoader() { + + @Override + public void setMaxProcId(long maxProcId) { + maxProcIdSet.setValue(maxProcId); + } - @Override - public void load(ProcedureIterator procIter) throws IOException { - long procCount = 0; - while (procIter.hasNext()) { - Procedure proc = procIter.next(); - update(proc); - procCount++; - if (proc.getProcId() > maxProcIdFromProcs.longValue()) { - maxProcIdFromProcs.setValue(proc.getProcId()); + @Override + public void load(ProcedureIterator procIter) throws IOException { + long procCount = 0; + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + update(proc); + procCount++; + if (proc.getProcId() > maxProcIdFromProcs.longValue()) { + maxProcIdFromProcs.setValue(proc.getProcId()); + } } + LOG.info("Migrated {} procedures", procCount); } - LOG.info("Migrated {} procedures", procCount); - } - @Override - public void handleCorrupted(ProcedureIterator procIter) throws IOException { - long corruptedCount = 0; - while (procIter.hasNext()) { - LOG.error("Corrupted procedure {}", procIter.next()); - corruptedCount++; + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + long corruptedCount = 0; + while (procIter.hasNext()) { + LOG.error("Corrupted procedure {}", procIter.next()); + corruptedCount++; + } + if (corruptedCount > 0) { + throw new IOException("There are " + corruptedCount + " corrupted procedures when" + + " migrating from the old WAL based store to the new region based store, please" + + " fix them before upgrading again."); + } } - if (corruptedCount > 0) { - throw new IOException("There are " + corruptedCount + " corrupted procedures when" + - " migrating from the old WAL based store to the new region based store, please" + - " fix them before upgrading again."); + }); + LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", + maxProcIdSet.longValue(), maxProcIdFromProcs.longValue()); + // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but + // anyway, let's do a check here. + if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) { + if (maxProcIdSet.longValue() > 0) { + // let's add a fake row to retain the max proc id + region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY, + PROC_QUALIFIER, EMPTY_BYTE_ARRAY)); } + } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) { + LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures"); } - }); - LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", - maxProcIdSet.longValue(), maxProcIdFromProcs.longValue()); - // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but - // anyway, let's do a check here. - if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) { - if (maxProcIdSet.longValue() > 0) { - // let's add a fake row to retain the max proc id - region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY, - PROC_QUALIFIER, EMPTY_BYTE_ARRAY)); + if (!fs.delete(procWALDir, true)) { + throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " + + procWALDir); } - } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) { - LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures"); - } - if (!fs.delete(procWALDir, true)) { - throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " + - procWALDir); + LOG.info("Migration of WALProcedureStore finished"); + } catch (IOException ioe) { + store.stop(true); + throw ioe; + } finally { + store.stop(false); } - LOG.info("Migration of WALProcedureStore finished"); } @Override