diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java index 22e05132bf91..a13b75b0dcc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -39,7 +40,9 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto @Override public List load() throws IOException { - return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString()); + List files = + ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString()); + return files != null ? files : Collections.emptyList(); } @Override @@ -57,4 +60,9 @@ protected void doAddCompactionResults(Collection compactedFiles, Collection newFiles) throws IOException { // NOOP } + + @Override + void set(List files) { + // NOOP + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java index de28b0eb9996..c370b87c1154 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java @@ -48,7 +48,7 @@ * storages. */ @InterfaceAudience.Private -public class FileBasedStoreFileTracker extends StoreFileTrackerBase { +class FileBasedStoreFileTracker extends StoreFileTrackerBase { private final StoreFileListFile backedFile; @@ -139,4 +139,17 @@ protected void doAddCompactionResults(Collection compactedFiles, } } } + + @Override + void set(List files) throws IOException { + synchronized (storefiles) { + storefiles.clear(); + StoreFileList.Builder builder = StoreFileList.newBuilder(); + for (StoreFileInfo info : files) { + storefiles.put(info.getPath().getName(), info); + builder.addStoreFile(toStoreFileEntry(info)); + } + backedFile.update(builder); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java new file mode 100644 index 000000000000..e486e6d563af --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * A store file tracker used for migrating between store file tracker implementations. + */ +@InterfaceAudience.Private +class MigrationStoreFileTracker extends StoreFileTrackerBase { + + public static final String SRC_IMPL = "hbase.store.file-tracker.migration.src.impl"; + + public static final String DST_IMPL = "hbase.store.file-tracker.migration.dst.impl"; + + private final StoreFileTrackerBase src; + + private final StoreFileTrackerBase dst; + + public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { + super(conf, isPrimaryReplica, ctx); + this.src = StoreFileTrackerFactory.create(conf, SRC_IMPL, isPrimaryReplica, ctx); + this.dst = StoreFileTrackerFactory.create(conf, DST_IMPL, isPrimaryReplica, ctx); + Preconditions.checkArgument(!src.getClass().equals(dst.getClass()), + "src and dst is the same: %s", src.getClass()); + } + + @Override + public List load() throws IOException { + List files = src.load(); + dst.set(files); + return files; + } + + @Override + protected boolean requireWritingToTmpDirFirst() { + // Returns true if either of the two StoreFileTracker returns true. + // For example, if we want to migrate from a tracker implementation which can ignore the broken + // files under data directory to a tracker implementation which can not, if we still allow + // writing in tmp directory directly, we may have some broken files under the data directory and + // then after we finally change the implementation which can not ignore the broken files, we + // will be in trouble. + return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst(); + } + + @Override + protected void doAddNewStoreFiles(Collection newFiles) throws IOException { + src.doAddNewStoreFiles(newFiles); + dst.doAddNewStoreFiles(newFiles); + } + + @Override + protected void doAddCompactionResults(Collection compactedFiles, + Collection newFiles) throws IOException { + src.doAddCompactionResults(compactedFiles, newFiles); + dst.doAddCompactionResults(compactedFiles, newFiles); + } + + @Override + void set(List files) { + throw new UnsupportedOperationException( + "Should not call this method on " + getClass().getSimpleName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java index c778bfc51deb..ffb3647e6259 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; @@ -121,7 +120,10 @@ StoreFileList load() throws IOException { * We will set the timestamp in this method so just pass the builder in */ void update(StoreFileList.Builder builder) throws IOException { - Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update"); + if (nextTrackFile < 0) { + // we need to call load first to load the prevTimestamp and also the next file + load(); + } FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index 92c699278c2e..d860f8e3812d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -95,8 +96,7 @@ private HFileContext createFileContext(Compression.Algorithm compression, } @Override - public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) - throws IOException { + public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException { if (!isPrimaryReplica) { throw new IllegalStateException("Should not call create writer on secondary replicas"); } @@ -170,4 +170,12 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) protected abstract void doAddCompactionResults(Collection compactedFiles, Collection newFiles) throws IOException; + + /** + * used to mirror the store file list after loading when migration. + *

+ * Do not add this method to the {@link StoreFileTracker} interface since we do not need this + * method in upper layer. + */ + abstract void set(List files) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java index c446d5ae9a31..2c2b71db6f8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + /** * Factory method for creating store file tracker. */ @@ -39,7 +41,7 @@ public final class StoreFileTrackerFactory { private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class); public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, - StoreContext ctx) { + StoreContext ctx) { Class tracker = conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class); LOG.info("instantiating StoreFileTracker impl {}", tracker.getName()); @@ -47,22 +49,31 @@ public static StoreFileTracker create(Configuration conf, boolean isPrimaryRepli } public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family, - HRegionFileSystem regionFs) { + HRegionFileSystem regionFs) { ColumnFamilyDescriptorBuilder fDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)); - StoreContext ctx = StoreContext.getBuilder(). - withColumnFamilyDescriptor(fDescBuilder.build()). - withRegionFileSystem(regionFs). - build(); - return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx); + StoreContext ctx = StoreContext.getBuilder().withColumnFamilyDescriptor(fDescBuilder.build()) + .withRegionFileSystem(regionFs).build(); + return StoreFileTrackerFactory.create(conf, TRACK_IMPL, isPrimaryReplica, ctx); + } + + public static Configuration mergeConfigurations(Configuration global, TableDescriptor table, + ColumnFamilyDescriptor family) { + return new CompoundConfiguration().add(global).addBytesMap(table.getValues()) + .addStringMap(family.getConfiguration()).addBytesMap(family.getValues()); } - public static Configuration mergeConfigurations(Configuration global, - TableDescriptor table, ColumnFamilyDescriptor family) { - return new CompoundConfiguration() - .add(global) - .addBytesMap(table.getValues()) - .addStringMap(family.getConfiguration()) - .addBytesMap(family.getValues()); + static StoreFileTrackerBase create(Configuration conf, String configName, + boolean isPrimaryReplica, StoreContext ctx) { + String className = + Preconditions.checkNotNull(conf.get(configName), "config %s is not set", configName); + Class tracker; + try { + tracker = Class.forName(className).asSubclass(StoreFileTrackerBase.class); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + LOG.info("instantiating StoreFileTracker impl {} as {}", tracker.getName(), configName); + return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestMigrationStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestMigrationStoreFileTracker.java new file mode 100644 index 000000000000..e9ebb4c792e4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestMigrationStoreFileTracker.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestMigrationStoreFileTracker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrationStoreFileTracker.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final TableDescriptor TD = + TableDescriptorBuilder.newBuilder(TableName.valueOf("file_based_tracker")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(); + + private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TD.getTableName()).build(); + + @Rule + public TestName name = new TestName(); + + @Parameter(0) + public Class srcImplClass; + + @Parameter(1) + public Class dstImplClass; + + private HRegion region; + + private Path rootDir; + + private WAL wal; + + @Parameters(name = "{index}: src={0}, dst={1}") + public static List params() { + List> impls = + Arrays.asList(DefaultStoreFileTracker.class, FileBasedStoreFileTracker.class); + List params = new ArrayList<>(); + for (Class src : impls) { + for (Class dst : impls) { + if (src.equals(dst)) { + continue; + } + params.add(new Object[] { src, dst }); + } + } + return params; + } + + @BeforeClass + public static void setUpBeforeClass() { + ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + } + + @Before + public void setUp() throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.setClass(MigrationStoreFileTracker.SRC_IMPL, srcImplClass, StoreFileTrackerBase.class); + conf.setClass(MigrationStoreFileTracker.DST_IMPL, dstImplClass, StoreFileTrackerBase.class); + rootDir = UTIL.getDataTestDir(name.getMethodName().replaceAll("[=:\\[ ]", "_")); + wal = HBaseTestingUtil.createWal(conf, rootDir, RI); + } + + @After + public void tearDown() throws IOException { + if (region != null) { + region.close(); + } + Closeables.close(wal, true); + UTIL.cleanupTestDir(); + } + + private List getStoreFiles() { + return Iterables.getOnlyElement(region.getStores()).getStorefiles().stream() + .map(s -> s.getFileInfo().getPath().getName()).collect(Collectors.toList()); + } + + private HRegion createRegion(Class trackerImplClass) + throws IOException { + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, trackerImplClass, StoreFileTracker.class); + return HRegion.createHRegion(RI, rootDir, conf, TD, wal, true); + } + + private void reopenRegion(Class trackerImplClass) + throws IOException { + region.flush(true); + List before = getStoreFiles(); + region.close(); + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, trackerImplClass, StoreFileTracker.class); + region = HRegion.openHRegion(rootDir, RI, TD, wal, conf); + List after = getStoreFiles(); + assertEquals(before.size(), after.size()); + assertThat(after, hasItems(before.toArray(new String[0]))); + } + + private void putData(int start, int end) throws IOException { + for (int i = start; i < end; i++) { + region.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + if (i % 30 == 0) { + region.flush(true); + } + } + } + + private void verifyData(int start, int end) throws IOException { + for (int i = start; i < end; i++) { + Result result = region.get(new Get(Bytes.toBytes(i))); + assertEquals(i, Bytes.toInt(result.getValue(CF, CQ))); + } + } + + @Test + public void testMigration() throws IOException { + region = createRegion(srcImplClass); + putData(0, 100); + verifyData(0, 100); + reopenRegion(MigrationStoreFileTracker.class); + verifyData(0, 100); + region.compact(true); + putData(100, 200); + reopenRegion(dstImplClass); + verifyData(0, 200); + } +}