diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestFileBasedSFTBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestFileBasedSFTBulkLoad.java new file mode 100644 index 000000000000..93e51fff8aab --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestFileBasedSFTBulkLoad.java @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Bulk Load and MR on a distributed cluster. + * With FileBased StorefileTracker enabled. + * It starts an MR job that creates linked chains + * + * The format of rows is like this: + * Row Key -> Long + * + * L:<< Chain Id >> -> Row Key of the next link in the chain + * S:<< Chain Id >> -> The step in the chain that his link is. + * D:<< Chain Id >> -> Random Data. + * + * All chains start on row 0. + * All rk's are > 0. + * + * After creating the linked lists they are walked over using a TableMapper based Mapreduce Job. + * + * There are a few options exposed: + * + * hbase.IntegrationTestBulkLoad.chainLength + * The number of rows that will be part of each and every chain. + * + * hbase.IntegrationTestBulkLoad.numMaps + * The number of mappers that will be run. Each mapper creates on linked list chain. + * + * hbase.IntegrationTestBulkLoad.numImportRounds + * How many jobs will be run to create linked lists. + * + * hbase.IntegrationTestBulkLoad.tableName + * The name of the table. + * + * hbase.IntegrationTestBulkLoad.replicaCount + * How many region replicas to configure for the table under test. + */ +@Category(IntegrationTests.class) +public class IntegrationTestFileBasedSFTBulkLoad extends IntegrationTestBulkLoad { + + private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestFileBasedSFTBulkLoad.class); + + private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps"; + private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds"; + private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount"; + private static int NUM_REPLICA_COUNT_DEFAULT = 1; + + @Test + public void testFileBasedSFTBulkLoad() throws Exception { + super.testBulkLoad(); + } + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(getConf()); + util.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + util.initializeCluster(1); + int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); + if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) { + LOG.debug("Region Replicas enabled: " + replicaCount); + } + + // Scale this up on a real cluster + if (util.isDistributedCluster()) { + util.getConfiguration().setIfUnset(NUM_MAPS_KEY, + Integer.toString(util.getAdmin().getRegionServers().size() * 10)); + util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5"); + } else { + util.startMiniMapReduceCluster(); + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), args); + System.exit(status); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6745e5a7ad85..a17fbc84be60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6847,7 +6847,7 @@ public interface BulkLoadListener { * @return final path to be used for actual loading * @throws IOException */ - String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile) + String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String customStaging) throws IOException; /** @@ -6969,12 +6969,21 @@ public Map> bulkLoadHFiles(Collection> f familyWithFinalPath.put(familyName, new ArrayList<>()); } List> lst = familyWithFinalPath.get(familyName); + String finalPath = path; try { - String finalPath = path; + boolean reqTmp = store.storeEngine.requireWritingToTmpDirFirst(); if (bulkLoadListener != null) { - finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile); + finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile, + reqTmp ? null : regionDir.toString()); + } + Pair pair = null; + if (reqTmp) { + pair = store.preBulkLoadHFile(finalPath, seqId); + } + else { + Path livePath = new Path(finalPath); + pair = new Pair<>(livePath, livePath); } - Pair pair = store.preBulkLoadHFile(finalPath, seqId); lst.add(pair); } catch (IOException ioe) { // A failure here can cause an atomicity violation that we currently @@ -6984,7 +6993,7 @@ public Map> bulkLoadHFiles(Collection> f " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe); if (bulkLoadListener != null) { try { - bulkLoadListener.failedBulkLoad(familyName, path); + bulkLoadListener.failedBulkLoad(familyName, finalPath); } catch (Exception ex) { LOG.error("Error while calling failedBulkLoad for family " + Bytes.toString(familyName) + " with path " + path, ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 8920471a86ee..3ed7b60b82b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -508,6 +508,10 @@ private Path preCommitStoreFile(final String familyName, final Path buildPath, * @throws IOException */ Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException { + // rename is not necessary in case of direct-insert stores + if(buildPath.equals(dstPath)){ + return dstPath; + } // buildPath exists, therefore not doing an exists() check. if (!rename(buildPath, dstPath)) { throw new IOException("Failed rename of " + buildPath + " to " + dstPath); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index f74e2f89be3e..dbc5f72e37b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -342,7 +343,8 @@ private User getActiveUser() throws IOException { return user; } - private static class SecureBulkLoadListener implements BulkLoadListener { + //package-private for test purpose only + static class SecureBulkLoadListener implements BulkLoadListener { // Target filesystem private final FileSystem fs; private final String stagingDir; @@ -350,19 +352,28 @@ private static class SecureBulkLoadListener implements BulkLoadListener { // Source filesystem private FileSystem srcFs = null; private Map origPermissions = null; + private Map origSources = null; public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) { this.fs = fs; this.stagingDir = stagingDir; this.conf = conf; this.origPermissions = new HashMap<>(); + this.origSources = new HashMap<>(); } @Override - public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile) - throws IOException { + public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile, + String customStaging ) throws IOException { Path p = new Path(srcPath); - Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + + //store customStaging for failedBulkLoad + String currentStaging = stagingDir; + if(StringUtils.isNotEmpty(customStaging)){ + currentStaging = customStaging; + } + + Path stageP = new Path(currentStaging, new Path(Bytes.toString(family), p.getName())); // In case of Replication for bulk load files, hfiles are already copied in staging directory if (p.equals(stageP)) { @@ -391,11 +402,16 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean LOG.debug("Moving " + p + " to " + stageP); FileStatus origFileStatus = fs.getFileStatus(p); origPermissions.put(srcPath, origFileStatus.getPermission()); + origSources.put(stageP.toString(), srcPath); if(!fs.rename(p, stageP)) { throw new IOException("Failed to move HFile: " + p + " to " + stageP); } } - fs.setPermission(stageP, PERM_ALL_ACCESS); + + if(StringUtils.isNotEmpty(customStaging)) { + fs.setPermission(stageP, PERM_ALL_ACCESS); + } + return stageP.toString(); } @@ -413,35 +429,37 @@ private void closeSrcFs() throws IOException { } @Override - public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { + public void failedBulkLoad(final byte[] family, final String stagedPath) throws IOException { try { - Path p = new Path(srcPath); - if (srcFs == null) { - srcFs = FileSystem.newInstance(p.toUri(), conf); - } - if (!FSUtils.isSameHdfs(conf, srcFs, fs)) { - // files are copied so no need to move them back + String src = origSources.get(stagedPath); + if(StringUtils.isEmpty(src)){ + LOG.debug(stagedPath + " was not moved to staging. No need to move back"); return; } - Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); - // In case of Replication for bulk load files, hfiles are not renamed by end point during - // prepare stage, so no need of rename here again - if (p.equals(stageP)) { - LOG.debug(p.getName() + " is already available in source directory. Skipping rename."); + Path stageP = new Path(stagedPath); + if (!fs.exists(stageP)) { + throw new IOException( + "Missing HFile: " + stageP + ", can't be moved back to it's original place"); + } + + //we should not move back files if the original exists + Path srcPath = new Path(src); + if(srcFs.exists(srcPath)) { + LOG.debug(src + " is already at it's original place. No need to move."); return; } - LOG.debug("Moving " + stageP + " back to " + p); - if (!fs.rename(stageP, p)) { - throw new IOException("Failed to move HFile: " + stageP + " to " + p); + LOG.debug("Moving " + stageP + " back to " + srcPath); + if (!fs.rename(stageP, srcPath)) { + throw new IOException("Failed to move HFile: " + stageP + " to " + srcPath); } // restore original permission - if (origPermissions.containsKey(srcPath)) { - fs.setPermission(p, origPermissions.get(srcPath)); + if (origPermissions.containsKey(stagedPath)) { + fs.setPermission(srcPath, origPermissions.get(src)); } else { - LOG.warn("Can't find previous permission for path=" + srcPath); + LOG.warn("Can't find previous permission for path=" + stagedPath); } } finally { closeSrcFs(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 3a934b749358..e7f168f1e02e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -54,6 +54,10 @@ public class TestBulkLoad extends TestBulkloadBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBulkLoad.class); + public TestBulkLoad(boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Test public void verifyBulkLoadEvent() throws IOException { TableName tableName = TableName.valueOf("test", "test"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java index 0db15e750d61..86e41ede8294 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java @@ -27,7 +27,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; @@ -56,10 +59,12 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class TestBulkloadBase { @ClassRule public static TemporaryFolder testFolder = new TemporaryFolder(); @@ -71,12 +76,31 @@ public class TestBulkloadBase { protected final byte[] family2 = Bytes.toBytes("family2"); protected final byte[] family3 = Bytes.toBytes("family3"); + protected Boolean useFileBasedSFT; + @Rule public TestName name = new TestName(); + public TestBulkloadBase(boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = {false, true}; + return Arrays.asList(data); + } + @Before public void before() throws IOException { Bytes.random(randomBytes); + if(useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + else { + conf.unset(StoreFileTrackerFactory.TRACKER_IMPL); + } } protected Pair withMissingHFileForFamily(byte[] family) { @@ -111,7 +135,7 @@ protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableNam } protected HRegion testRegionWithFamilies(byte[]... families) throws IOException { - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = TableName.valueOf(name.getMethodName().substring(0, name.getMethodName().indexOf("["))); return testRegionWithFamiliesAndSpecifiedTableName(tableName, families); } @@ -130,7 +154,7 @@ protected List> withFamilyPathsFor(byte[]... families) thro private String createHFileForFamilies(byte[] family) throws IOException { HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); // TODO We need a way to do this without creating files - File hFileLocation = testFolder.newFile(); + File hFileLocation = testFolder.newFile(generateUniqueName(null)); FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); try { hFileFactory.withOutputStream(out); @@ -149,6 +173,12 @@ private String createHFileForFamilies(byte[] family) throws IOException { return hFileLocation.getAbsoluteFile().getAbsolutePath(); } + private static String generateUniqueName(final String suffix) { + String name = UUID.randomUUID().toString().replaceAll("-", ""); + if (suffix != null) name += suffix; + return name; + } + protected static Matcher bulkLogWalEditType(byte[] typeBytes) { return new WalMatcher(typeBytes); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java index b17995a591e4..70b81a3bc7df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -61,6 +61,10 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { private final RegionServerServices regionServerServices = mock(RegionServerServices.class); public static AtomicInteger called = new AtomicInteger(0); + public TestCompactionAfterBulkLoad(boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Override protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, byte[]... families) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java index b8ca951c355f..5ba95c3fb3d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java @@ -19,12 +19,13 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; import java.util.Deque; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -46,23 +47,26 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; - +@RunWith(Parameterized.class) @Category({RegionServerTests.class, MediumTests.class}) public class TestSecureBulkLoadManager { @@ -87,15 +91,34 @@ public class TestSecureBulkLoadManager { private Thread laterBulkload; protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility(); + protected Boolean useFileBasedSFT; + private static Configuration conf = testUtil.getConfiguration(); - @BeforeClass - public static void setUp() throws Exception { + public TestSecureBulkLoadManager(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = {false, true}; + return Arrays.asList(data); + } + + @Before + public void setUp() throws Exception { + if (useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + else{ + conf.unset(StoreFileTrackerFactory.TRACKER_IMPL); + } testUtil.startMiniCluster(); } - @AfterClass - public static void tearDown() throws Exception { + @After + public void tearDown() throws Exception { testUtil.shutdownMiniCluster(); testUtil.cleanupTestDir(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkloadListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkloadListener.java new file mode 100644 index 000000000000..bf3732a821da --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkloadListener.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +/** + * Tests for failedBulkLoad logic to make sure staged files are returned to their original location + * if the bulkload have failed. + */ +@Category({MiscTests.class, LargeTests.class}) +public class TestSecureBulkloadListener { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSecureBulkloadListener.class); + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + private Configuration conf; + private MiniDFSCluster cluster; + private HBaseTestingUtility htu; + private DistributedFileSystem dfs; + private final Random random = new Random(); + private final byte[] randomBytes = new byte[100]; + private static final String host1 = "host1"; + private static final String host2 = "host2"; + private static final String host3 = "host3"; + private static byte[] FAMILY = Bytes.toBytes("family"); + private static final String STAGING_DIR = "staging"; + private static final String CUSTOM_STAGING_DIR = "customStaging"; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws Exception { + random.nextBytes(randomBytes); + htu = new HBaseTestingUtility(); + htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks + htu.getConfiguration().setInt("dfs.replication", 3); + htu.startMiniDFSCluster(3, + new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3}); + + conf = htu.getConfiguration(); + cluster = htu.getDFSCluster(); + dfs = (DistributedFileSystem) FileSystem.get(conf); + } + + @After + public void tearDownAfterClass() throws Exception { + htu.shutdownMiniCluster(); + } + + @Test + public void testMovingStagedFile() throws Exception { + Path stagingDirPath = + new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); + if (!dfs.exists(stagingDirPath)) { + dfs.mkdirs(stagingDirPath); + } + SecureBulkLoadManager.SecureBulkLoadListener listener = + new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); + + //creating file to load + String srcFile = createHFileForFamilies(FAMILY); + Path srcPath = new Path(srcFile); + Assert.assertTrue(dfs.exists(srcPath)); + + Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); + if (!dfs.exists(stagedFamily)) { + dfs.mkdirs(stagedFamily); + } + + //moving file to staging + String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null); + Path stagedPath = new Path(stagedFile); + Assert.assertTrue(dfs.exists(stagedPath)); + Assert.assertFalse(dfs.exists(srcPath)); + + //moving files back to original location after a failed bulkload + listener.failedBulkLoad(FAMILY, stagedFile); + Assert.assertFalse(dfs.exists(stagedPath)); + Assert.assertTrue(dfs.exists(srcPath)); + } + + @Test + public void testMovingStagedFileWithCustomStageDir() throws Exception { + Path stagingDirPath = + new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); + if (!dfs.exists(stagingDirPath)) { + dfs.mkdirs(stagingDirPath); + } + SecureBulkLoadManager.SecureBulkLoadListener listener = + new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); + + //creating file to load + String srcFile = createHFileForFamilies(FAMILY); + Path srcPath = new Path(srcFile); + Assert.assertTrue(dfs.exists(srcPath)); + + Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); + if (!dfs.exists(stagedFamily)) { + dfs.mkdirs(stagedFamily); + } + + Path customStagingDirPath = + new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), CUSTOM_STAGING_DIR)); + Path customStagedFamily = new Path(customStagingDirPath, new Path(Bytes.toString(FAMILY))); + if (!dfs.exists(customStagedFamily)) { + dfs.mkdirs(customStagedFamily); + } + + //moving file to staging using a custom staging dir + String stagedFile = + listener.prepareBulkLoad(FAMILY, srcFile, false, customStagingDirPath.toString()); + Path stagedPath = new Path(stagedFile); + Assert.assertTrue(dfs.exists(stagedPath)); + Assert.assertFalse(dfs.exists(srcPath)); + + //moving files back to original location after a failed bulkload + listener.failedBulkLoad(FAMILY, stagedFile); + Assert.assertFalse(dfs.exists(stagedPath)); + Assert.assertTrue(dfs.exists(srcPath)); + } + + @Test + public void testCopiedStagedFile() throws Exception { + Path stagingDirPath = + new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); + if (!dfs.exists(stagingDirPath)) { + dfs.mkdirs(stagingDirPath); + } + SecureBulkLoadManager.SecureBulkLoadListener listener = + new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); + + //creating file to load + String srcFile = createHFileForFamilies(FAMILY); + Path srcPath = new Path(srcFile); + Assert.assertTrue(dfs.exists(srcPath)); + + Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); + if (!dfs.exists(stagedFamily)) { + dfs.mkdirs(stagedFamily); + } + + //copying file to staging + String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, true, null); + Path stagedPath = new Path(stagedFile); + Assert.assertTrue(dfs.exists(stagedPath)); + Assert.assertTrue(dfs.exists(srcPath)); + + //should do nothing because the original file was copied to staging + listener.failedBulkLoad(FAMILY, stagedFile); + Assert.assertTrue(dfs.exists(stagedPath)); + Assert.assertTrue(dfs.exists(srcPath)); + } + + @Test(expected = IOException.class) + public void testDeletedStagedFile() throws Exception { + Path stagingDirPath = + new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); + if (!dfs.exists(stagingDirPath)) { + dfs.mkdirs(stagingDirPath); + } + SecureBulkLoadManager.SecureBulkLoadListener listener = + new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); + + //creating file to load + String srcFile = createHFileForFamilies(FAMILY); + Path srcPath = new Path(srcFile); + Assert.assertTrue(dfs.exists(srcPath)); + + Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); + if (!dfs.exists(stagedFamily)) { + dfs.mkdirs(stagedFamily); + } + + //moving file to staging + String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null); + Path stagedPath = new Path(stagedFile); + Assert.assertTrue(dfs.exists(stagedPath)); + Assert.assertFalse(dfs.exists(srcPath)); + + dfs.delete(stagedPath, false); + + //moving files back to original location after a failed bulkload + listener.failedBulkLoad(FAMILY, stagedFile); + } + + private String createHFileForFamilies(byte[] family) throws IOException { + HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); + Path testDir = new Path(dfs.getWorkingDirectory() , new Path(name.getMethodName(), Bytes.toString(family))); + if(!dfs.exists(testDir)){ + dfs.mkdirs(testDir); + } + Path hfilePath = new Path(testDir, generateUniqueName(null)); + FSDataOutputStream out = dfs.createFile(hfilePath).build(); + try { + hFileFactory.withOutputStream(out); + hFileFactory.withFileContext(new HFileContextBuilder().build()); + HFile.Writer writer = hFileFactory.create(); + try { + writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L) + .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build())); + } finally { + writer.close(); + } + } finally { + out.close(); + } + return hfilePath.toString(); + } + + private static String generateUniqueName(final String suffix) { + String name = UUID.randomUUID().toString().replaceAll("-", ""); + if (suffix != null) name += suffix; + return name; + } + +}