diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index eb32998d8fb9..36422b6e9f4a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -20,7 +20,12 @@ import java.io.IOException; import java.net.URI; import java.util.Arrays; +import java.util.List; import java.util.UUID; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -32,7 +37,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -49,6 +53,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; @@ -58,12 +63,12 @@ import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -87,6 +92,11 @@ public class VerifyReplication extends Configured implements Tool { public final static String NAME = "verifyrep"; private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; + private static ThreadPoolExecutor reCompareExecutor = null; + int reCompareTries = 0; + int reCompareBackoffExponent = 0; + int reCompareThreads = 0; + int sleepMsBeforeReCompare = 0; long startTime = 0; long endTime = Long.MAX_VALUE; int batch = -1; @@ -97,7 +107,6 @@ public class VerifyReplication extends Configured implements Tool { String peerId = null; String peerQuorumAddress = null; String rowPrefixes = null; - int sleepMsBeforeReCompare = 0; boolean verbose = false; boolean includeDeletedCells = false; // Source table snapshot name @@ -127,7 +136,12 @@ public enum Counters { BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, - CONTENT_DIFFERENT_ROWS + CONTENT_DIFFERENT_ROWS, + RECOMPARES, + MAIN_THREAD_RECOMPARES, + SOURCE_ROW_CHANGED, + PEER_ROW_CHANGED, + FAILED_RECOMPARE } private Connection sourceConnection; @@ -136,6 +150,9 @@ public enum Counters { private Table replicatedTable; private ResultScanner replicatedScanner; private Result currentCompareRowInPeerTable; + private Scan tableScan; + private int reCompareTries; + private int reCompareBackoffExponent; private int sleepMsBeforeReCompare; private String delimiter = ""; private boolean verbose = false; @@ -153,7 +170,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) throws IOException { if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); + reCompareTries = conf.getInt(NAME + ".recompareTries", 0); + reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1); sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); + if (sleepMsBeforeReCompare > 0) { + reCompareTries = Math.max(reCompareTries, 1); + } delimiter = conf.get(NAME + ".delimiter", ""); verbose = conf.getBoolean(NAME + ".verbose", false); batch = conf.getInt(NAME + ".batch", -1); @@ -182,9 +204,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) if (versions >= 0) { scan.readVersions(versions); } + int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0); + reCompareExecutor = buildReCompareExecutor(reCompareThreads, context); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); sourceConnection = ConnectionFactory.createConnection(conf); sourceTable = sourceConnection.getTable(tableName); + tableScan = scan; final InputSplit tableSplit = context.getInputSplit(); @@ -234,7 +259,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) while (true) { if (currentCompareRowInPeerTable == null) { // reach the region end of peer table, row only in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); break; } int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); @@ -248,55 +273,77 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); } } catch (Exception e) { - logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value, + currentCompareRowInPeerTable); } currentCompareRowInPeerTable = replicatedScanner.next(); break; } else if (rowCmpRet < 0) { // row only exists in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); break; } else { // row only exists in peer table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } } } - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { - if (sleepMsBeforeReCompare > 0) { - Threads.sleep(sleepMsBeforeReCompare); - try { - Result sourceResult = sourceTable.get(new Get(row.getRow())); - Result replicatedResult = replicatedTable.get(new Get(row.getRow())); - Result.compareResults(sourceResult, replicatedResult, false); - if (!sourceResult.isEmpty()) { - context.getCounter(Counters.GOODROWS).increment(1); - if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter - + Bytes.toStringBinary(row.getRow()) + delimiter); - } - } - return; - } catch (Exception e) { - LOG.error("recompare fail after sleep, rowkey=" + delimiter - + Bytes.toStringBinary(row.getRow()) + delimiter); - } + @SuppressWarnings("FutureReturnValueIgnored") + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row, + Result replicatedRow) { + byte[] rowKey = getRow(row, replicatedRow); + if (reCompareTries == 0) { + context.getCounter(counter).increment(1); + context.getCounter(Counters.BADROWS).increment(1); + LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter); + return; + } + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, + reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); + + if (reCompareExecutor == null) { + runnable.run(); + return; } - context.getCounter(counter).increment(1); - context.getCounter(Counters.BADROWS).increment(1); - LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) - + delimiter); + + reCompareExecutor.submit(runnable); } @Override protected void cleanup(Context context) { + if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) { + reCompareExecutor.shutdown(); + try { + boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); + if (!terminated) { + List queue = reCompareExecutor.shutdownNow(); + for (Runnable runnable : queue) { + ((VerifyReplicationRecompareRunnable) runnable).fail(); + } + + terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); + + if (!terminated) { + int activeCount = Math.max(1, reCompareExecutor.getActiveCount()); + LOG.warn("Found {} possible recompares still running in the executable" + + " incrementing BADROWS and FAILED_RECOMPARE", activeCount); + context.getCounter(Counters.BADROWS).increment(activeCount); + context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount); + } + } + } catch (InterruptedException e) { + throw new RuntimeException("Failed to await executor termination in cleanup", e); + } + } if (replicatedScanner != null) { try { while (currentCompareRowInPeerTable != null) { - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } @@ -444,6 +491,10 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); + conf.setInt(NAME + ".recompareTries", reCompareTries); + conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent); + conf.setInt(NAME + ".recompareThreads", reCompareThreads); + // Set Snapshot specific parameters if (peerSnapshotName != null) { conf.set(NAME + ".peerSnapshotName", peerSnapshotName); @@ -519,6 +570,15 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce return job; } + protected static byte[] getRow(Result sourceResult, Result replicatedResult) { + if (sourceResult != null) { + return sourceResult.getRow(); + } else if (replicatedResult != null) { + return replicatedResult.getRow(); + } + throw new RuntimeException("Both sourceResult and replicatedResult are null!"); + } + private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { if (rowPrefixes != null && !rowPrefixes.isEmpty()) { String[] rowPrefixArray = rowPrefixes.split(","); @@ -603,11 +663,20 @@ public boolean doCommandLine(final String[] args) { continue; } - final String sleepToReCompareKey = "--recomparesleep="; + final String deprecatedSleepToReCompareKey = "--recomparesleep="; + final String sleepToReCompareKey = "--recompareSleep="; + if (cmd.startsWith(deprecatedSleepToReCompareKey)) { + LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0." + + " Use --recompareSleep instead."); + sleepMsBeforeReCompare = + Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length())); + continue; + } if (cmd.startsWith(sleepToReCompareKey)) { sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); continue; } + final String verboseKey = "--verbose"; if (cmd.startsWith(verboseKey)) { verbose = true; @@ -656,6 +725,25 @@ public boolean doCommandLine(final String[] args) { continue; } + final String reCompareThreadArgs = "--recompareThreads="; + if (cmd.startsWith(reCompareThreadArgs)) { + reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length())); + continue; + } + + final String reCompareTriesKey = "--recompareTries="; + if (cmd.startsWith(reCompareTriesKey)) { + reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length())); + continue; + } + + final String reCompareBackoffExponentKey = "--recompareBackoffExponent="; + if (cmd.startsWith(reCompareBackoffExponentKey)) { + reCompareBackoffExponent = + Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length())); + continue; + } + if (cmd.startsWith("--")) { printUsage("Invalid argument '" + cmd + "'"); return false; @@ -735,7 +823,8 @@ private static void printUsage(final String errorMsg) { System.err.println("ERROR: " + errorMsg); } System.err.println("Usage: verifyrep [--starttime=X]" - + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] " + + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]" + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] "); @@ -751,8 +840,14 @@ private static void printUsage(final String errorMsg) { System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(" delimiter the delimiter used in display around rowkey"); - System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + System.err.println(" recompareSleep milliseconds to sleep before recompare row, " + "default value is 0 which disables the recompare."); + System.err.println(" recompareThreads number of threads to run recompares in"); + System.err.println(" recompareTries number of recompare attempts before incrementing " + + "the BADROWS counter. Defaults to 1 recompare"); + System.out.println(" recompareBackoffExponent exponential multiplier to increase " + + "recompareSleep after each recompare attempt, " + + "default value is 0 which results in a constant sleep time"); System.err.println(" verbose logs row keys of good rows"); System.err.println(" peerTableName Peer Table Name"); System.err.println(" sourceSnapshotName Source Snapshot Name"); @@ -819,6 +914,27 @@ private static void printUsage(final String errorMsg) { + "2181:/cluster-b \\\n" + " TestTable"); } + private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) { + if (maxThreads == 0) { + return null; + } + + return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), + buildRejectedReComparePolicy(context)); + } + + private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) { + return new CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.debug("Re-comparison execution rejected. Running in main thread."); + context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }; + } + @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java new file mode 100644 index 000000000000..47f5e606b846 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class VerifyReplicationRecompareRunnable implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class); + + private final Mapper.Context context; + private final VerifyReplication.Verifier.Counters originalCounter; + private final String delimiter; + private final byte[] row; + private final Scan tableScan; + private final Table sourceTable; + private final Table replicatedTable; + + private final int reCompareTries; + private final int sleepMsBeforeReCompare; + private final int reCompareBackoffExponent; + private final boolean verbose; + + private Result sourceResult; + private Result replicatedResult; + + public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult, + Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter, + Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries, + int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) { + this.context = context; + this.sourceResult = sourceResult; + this.replicatedResult = replicatedResult; + this.originalCounter = originalCounter; + this.delimiter = delimiter; + this.tableScan = tableScan; + this.sourceTable = sourceTable; + this.replicatedTable = replicatedTable; + this.reCompareTries = reCompareTries; + this.sleepMsBeforeReCompare = sleepMsBeforeReCompare; + this.reCompareBackoffExponent = reCompareBackoffExponent; + this.verbose = verbose; + this.row = VerifyReplication.getRow(sourceResult, replicatedResult); + } + + @Override + public void run() { + Get get = new Get(row); + get.setCacheBlocks(tableScan.getCacheBlocks()); + get.setFilter(tableScan.getFilter()); + + int sleepMs = sleepMsBeforeReCompare; + int tries = 0; + + while (++tries <= reCompareTries) { + context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1); + + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + LOG.warn("Sleeping interrupted, incrementing bad rows and aborting"); + incrementOriginalAndBadCounter(); + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); + Thread.currentThread().interrupt(); + return; + } + + try { + if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, null)) { + if (verbose) { + LOG.info("Good row key (with recompare): {}{}{}", delimiter, Bytes.toStringBinary(row), + delimiter); + } + context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1); + return; + } else { + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); + } + } catch (IOException e) { + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); + if (verbose) { + LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e); + } + } + + sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); + } + + LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row), + delimiter); + incrementOriginalAndBadCounter(); + } + + public void fail() { + if (LOG.isDebugEnabled()) { + LOG.debug("Called fail on row={}", Bytes.toStringBinary(row)); + } + incrementOriginalAndBadCounter(); + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); + } + + private boolean fetchLatestRows(Get get) throws IOException { + Result sourceResult = sourceTable.get(get); + Result replicatedResult = replicatedTable.get(get); + + boolean sourceMatches = matches(sourceResult, this.sourceResult, + VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED); + boolean replicatedMatches = matches(replicatedResult, this.replicatedResult, + VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED); + + this.sourceResult = sourceResult; + this.replicatedResult = replicatedResult; + return sourceMatches && replicatedMatches; + } + + private boolean matches(Result original, Result updated, + VerifyReplication.Verifier.Counters failCounter) { + try { + Result.compareResults(original, updated); + return true; + } catch (Exception e) { + if (failCounter != null) { + context.getCounter(failCounter).increment(1); + if (LOG.isDebugEnabled()) { + LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row)); + } + } + return false; + } + } + + private void incrementOriginalAndBadCounter() { + context.getCounter(originalCounter).increment(1); + context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java new file mode 100644 index 000000000000..49c52fbcc3b3 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.counters.GenericCounter; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@Category({ ReplicationTests.class, SmallTests.class }) +@RunWith(MockitoJUnitRunner.class) +public class TestVerifyReplicationRecompareRunnable { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class); + + @Mock + private Table sourceTable; + + @Mock + private Table replicatedTable; + + @Mock + private Mapper.Context context; + + static Result genResult(int cols) { + KeyValue[] kvs = new KeyValue[cols]; + + for (int i = 0; i < cols; ++i) { + kvs[i] = + new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes()); + } + + return Result.create(kvs); + } + + static byte[] genBytes() { + return Bytes.toBytes(ThreadLocalRandom.current().nextInt()); + } + + @Before + public void setUp() { + for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters + .values()) { + Counter emptyCounter = new GenericCounter(counter.name(), counter.name()); + when(context.getCounter(counter)).thenReturn(emptyCounter); + } + } + + @Test + public void itRecomparesGoodRow() throws IOException { + Result result = genResult(2); + + when(sourceTable.get(any(Get.class))).thenReturn(result); + when(replicatedTable.get(any(Get.class))).thenReturn(result); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", + new Scan(), sourceTable, replicatedTable, 3, 1, 0, true); + + runnable.run(); + + assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(0, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); + assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); + } + + @Test + public void itRecomparesBadRow() throws IOException { + Result replicatedResult = genResult(1); + when(sourceTable.get(any(Get.class))).thenReturn(genResult(5)); + when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, + "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); + + runnable.run(); + + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); + assertEquals(0, + context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); + } + + @Test + public void itHandlesExceptionOnRecompare() throws IOException { + when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!")); + when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5)); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", + new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); + + runnable.run(); + + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java index 26649226d9c8..e263076677a5 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.junit.AfterClass; import org.junit.Before; @@ -99,7 +100,7 @@ public static void setUpBeforeClass() throws Exception { htable3 = connection2.getTable(peerTableName); } - static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) + static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) throws IOException, InterruptedException, ClassNotFoundException { Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args); if (job == null) { @@ -112,6 +113,7 @@ static void runVerifyReplication(String[] args, int expectedGoodRows, int expect job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); assertEquals(expectedBadRows, job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + return job.getCounters(); } /** @@ -438,6 +440,127 @@ public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Excepti checkRestoreTmpDir(CONF2, tmpPath2, 2); } + @Test + public void testVerifyReplicationThreadedRecompares() throws Exception { + // Populate the tables with same data + runBatchCopyTest(); + + // ONLY_IN_PEER_TABLE_ROWS + Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); + put.addColumn(noRepfamName, row, row); + htable3.put(put); + + // CONTENT_DIFFERENT_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); + put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); + htable3.put(put); + + // ONLY_IN_SOURCE_TABLE_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); + put.addColumn(noRepfamName, row, row); + htable1.put(put); + + String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3", + "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(), + getClusterKey(UTIL2), tableName.getNameAsString() }; + Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), + 9); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), + 1); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), + 1); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) + .getValue(), 1); + } + + @Test + public void testFailsRemainingComparesAfterShutdown() throws Exception { + // Populate the tables with same data + runBatchCopyTest(); + + // ONLY_IN_PEER_TABLE_ROWS + Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); + put.addColumn(noRepfamName, row, row); + htable3.put(put); + + // CONTENT_DIFFERENT_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); + put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); + htable3.put(put); + + // ONLY_IN_SOURCE_TABLE_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); + put.addColumn(noRepfamName, row, row); + htable1.put(put); + + /** + * recompareSleep is set to exceed how long we wait on + * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to + * test the counter-incrementing logic if the executor still hasn't terminated after the call to + * shutdown and awaitTermination + */ + String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1", + "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(), + getClusterKey(UTIL2), tableName.getNameAsString() }; + + Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 3); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), + 3); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), + 1); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), + 1); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) + .getValue(), 1); + } + + @Test + public void testVerifyReplicationSynchronousRecompares() throws Exception { + // Populate the tables with same data + runBatchCopyTest(); + + // ONLY_IN_PEER_TABLE_ROWS + Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); + put.addColumn(noRepfamName, row, row); + htable3.put(put); + + // CONTENT_DIFFERENT_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); + put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); + htable3.put(put); + + // ONLY_IN_SOURCE_TABLE_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); + put.addColumn(noRepfamName, row, row); + htable1.put(put); + + String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1", + "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2), + tableName.getNameAsString() }; + Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), + 9); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), + 1); + assertEquals( + counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), + 1); + assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) + .getValue(), 1); + } + @AfterClass public static void tearDownAfterClass() throws Exception { htable3.close();