diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index fedb4487968c..ed1755ad5021 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -352,11 +352,6 @@ public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOExcep
   public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo, TableName table)
     throws IOException {
     ArrayList<BackupImage> ancestors = getAncestors(backupInfo);
-    return filterAncestorsForTable(ancestors, table);
-  }
-
-  public static ArrayList<BackupImage> filterAncestorsForTable(ArrayList<BackupImage> ancestors,
-    TableName table) {
     ArrayList<BackupImage> tableAncestors = new ArrayList<>();
     for (BackupImage image : ancestors) {
       if (image.hasTable(table)) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index d99aef200176..211e9f96c89c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,13 +18,11 @@
 package org.apache.hadoop.hbase.backup.impl;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,26 +40,17 @@
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-
 /**
  * Incremental backup implementation. See the {@link #execute() execute} method.
  */
@@ -287,48 +276,10 @@ public void execute() throws IOException {
 
     // case INCREMENTAL_COPY:
     try {
-      // todo: need to add an abstraction to encapsulate and DRY this up
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
-      Map<TableName, List<RegionInfo>> regionsByTable = new HashMap<>();
-      List<ImmutableBytesWritable> splits = new ArrayList<>();
-      for (TableName table : backupInfo.getTables()) {
-        ArrayList<BackupImage> ancestorsForTable =
-          BackupManager.filterAncestorsForTable(ancestors, table);
-
-        BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1);
-        if (backupImage.getType() != BackupType.FULL) {
-          throw new RuntimeException("No full backup found in ancestors for table " + table);
-        }
-
-        String lastFullBackupId = backupImage.getBackupId();
-        Path backupRootDir = new Path(backupInfo.getBackupRootDir());
-
-        FileSystem backupFs = backupRootDir.getFileSystem(conf);
-        Path tableInfoPath =
-          BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table);
-        SnapshotProtos.SnapshotDescription snapshotDesc =
-          SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath);
-        SnapshotManifest manifest =
-          SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc);
-        List<RegionInfo> regionInfos = new ArrayList<>(manifest.getRegionManifests().size());
-        for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) {
-          HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo();
-          RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo);
-          // scanning meta doesnt return mob regions, so skip them here too so we keep parity
-          if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) {
-            continue;
-          }
-
-          regionInfos.add(regionInfoObj);
-          splits.add(new ImmutableBytesWritable(HFileOutputFormat2
-            .combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey())));
-        }
-        regionsByTable.put(table, regionInfos);
-      }
       // copy out the table and region info files for each table
-      BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf);
+      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(splits);
+      convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
         backupInfo.getBackupRootDir());
     } catch (Exception e) {
@@ -408,7 +359,7 @@ protected void deleteBulkLoadDirectory() throws IOException {
     }
   }
 
-  protected void convertWALsToHFiles(List<ImmutableBytesWritable> splits) throws IOException {
+  protected void convertWALsToHFiles() throws IOException {
     // get incremental backup file list and prepare parameters for DistCp
     List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
     // Get list of tables in incremental backup set
@@ -424,7 +375,7 @@
         LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
       }
     }
-    walToHFiles(incrBackupFileList, tableList, splits);
+    walToHFiles(incrBackupFileList, tableList);
   }
 
@@ -434,9 +385,8 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti
     }
   }
 
-  protected void walToHFiles(List<String> dirPaths, List<String> tableList,
-    List<ImmutableBytesWritable> splits) throws IOException {
-    WALPlayer player = new WALPlayer();
+  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
+    Tool player = new WALPlayer();
 
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file. We use ';' as separator
@@ -451,7 +401,6 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList,
     conf.set(JOB_NAME_CONF_KEY, jobname);
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
-    player.setSplits(splits);
    try {
       player.setConf(conf);
       int result = player.run(playerArgs);
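For context, after this change the incremental backup client drives WALPlayer only through the generic Hadoop Tool contract plus the public BULK_OUTPUT_CONF_KEY switch, with no split pre-configuration. A minimal sketch of that invocation pattern, assuming made-up WAL directories, output path, and table names:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WalToHFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ask WALPlayer to emit HFiles instead of live mutations; the backup client
    // points this at its temporary bulk-output directory under the backup root.
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/backup-bulk-output"); // placeholder path
    Tool player = new WALPlayer();
    // First argument: ';'-separated WAL directories; second: comma-separated table names.
    int rc = ToolRunner.run(conf, player, new String[] { "/wals/dir1;/wals/dir2", "ns:t1,ns:t2" });
    System.exit(rc);
  }
}
```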
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index c401a109379e..d4e849f610ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -53,7 +54,6 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -122,8 +122,7 @@ private BackupUtils() {
    * @param conf configuration
    * @throws IOException exception
    */
-  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
-    Map<TableName, List<RegionInfo>> lastFullBackupForTable, Configuration conf)
+  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
     throws IOException {
     Path rootDir = CommonFSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
@@ -148,56 +147,20 @@ public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
         LOG.debug("Attempting to copy table info for:" + table + " target: " + target
           + " descriptor: " + orig);
         LOG.debug("Finished copying tableinfo.");
-        copyTableRegionInfosFromParent(table, targetFs, backupInfo,
-          lastFullBackupForTable.get(table), conf);
+        List<RegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table);
+        // For each region, write the region info to disk
+        LOG.debug("Starting to write region info for table " + table);
+        for (RegionInfo regionInfo : regions) {
+          Path regionDir = FSUtils
+            .getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+          regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+          writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
+        }
         LOG.debug("Finished writing region info for table " + table);
       }
     }
   }
 
-  private static void copyTableRegionInfosFromParent(TableName table, FileSystem targetFs,
-    BackupInfo backupInfo, List<RegionInfo> lastFullBackupForTable, Configuration conf)
-    throws IOException {
-    for (RegionInfo regionInfo : lastFullBackupForTable) {
-      Path regionDir =
-        FSUtils.getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
-      regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
-      writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
-    }
-  }
-
-  /**
-   * Returns value represent path for:
-   * ""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/
-   * snapshot_1396650097621_namespace_table" this path contains .snapshotinfo, .tabledesc (0.96 and
-   * 0.98) this path contains .snapshotinfo, .data.manifest (trunk)
-   * @param tableName table name
-   * @return path to table info
-   * @throws IOException exception
-   */
-  public static Path getTableInfoPath(FileSystem fs, Path backupRootPath, String backupId,
-    TableName tableName) throws IOException {
-    Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
-    Path tableInfoPath = null;
-
-    // can't build the path directly as the timestamp values are different
-    FileStatus[] snapshots = fs.listStatus(tableSnapShotPath,
-      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
-    for (FileStatus snapshot : snapshots) {
-      tableInfoPath = snapshot.getPath();
-      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
-      if (tableInfoPath.getName().endsWith("data.manifest")) {
-        break;
-      }
-    }
-    return tableInfoPath;
-  }
-
-  static Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) {
-    return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
-      HConstants.SNAPSHOT_DIR_NAME);
-  }
-
   /**
    * Write the .regioninfo file on-disk.
    */
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index de84f27ecbfb..8ca80d1301f6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -145,13 +145,13 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException {
    * the future
    * @param conn HBase connection
    * @param tableBackupPath backup path
-   * @param hfileDirs incremental backup folders, which contains hfiles to bulkload
-   * @param tableNames source tableNames(table names were backuped)
-   * @param newTableNames target tableNames(table names to be restored to)
+   * @param logDirs : incremental backup folders, which contains WAL
+   * @param tableNames : source tableNames(table names were backuped)
+   * @param newTableNames : target tableNames(table names to be restored to)
    * @param incrBackupId incremental backup Id
    * @throws IOException exception
    */
-  public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] hfileDirs,
+  public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
     TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
     try (Admin admin = conn.getAdmin()) {
       if (tableNames.length != newTableNames.length) {
@@ -202,7 +202,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[
       }
 
       RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
-      restoreService.run(hfileDirs, tableNames, restoreRootDir, newTableNames, false);
+      restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false);
     }
   }
 
@@ -225,14 +225,39 @@ Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backu
       HConstants.SNAPSHOT_DIR_NAME);
   }
 
+  /**
+   * Returns value represent path for:
+   * ""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/
+   * snapshot_1396650097621_namespace_table" this path contains .snapshotinfo, .tabledesc (0.96 and
+   * 0.98) this path contains .snapshotinfo, .data.manifest (trunk)
+   * @param tableName table name
+   * @return path to table info
+   * @throws IOException exception
+   */
+  Path getTableInfoPath(TableName tableName) throws IOException {
+    Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+    Path tableInfoPath = null;
+
+    // can't build the path directly as the timestamp values are different
+    FileStatus[] snapshots = fs.listStatus(tableSnapShotPath,
+      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
+    for (FileStatus snapshot : snapshots) {
+      tableInfoPath = snapshot.getPath();
+      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+      if (tableInfoPath.getName().endsWith("data.manifest")) {
+        break;
+      }
+    }
+    return tableInfoPath;
+  }
+
   /**
    * Get table descriptor
    * @param tableName is the table backed up
    * @return {@link TableDescriptor} saved in backup image of the table
    */
   TableDescriptor getTableDesc(TableName tableName) throws IOException {
-    Path tableInfoPath = BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName);
-    ;
+    Path tableInfoPath = this.getTableInfoPath(tableName);
     SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
     SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
     TableDescriptor tableDescriptor = manifest.getTableDescriptor();
@@ -282,8 +307,7 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableNa
       tableDescriptor = manifest.getTableDescriptor();
     } else {
       tableDescriptor = getTableDesc(tableName);
-      snapshotMap.put(tableName,
-        BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName));
+      snapshotMap.put(tableName, getTableInfoPath(tableName));
     }
     if (tableDescriptor == null) {
       LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
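With getTableInfoPath() moved back into RestoreTool and copyTableRegionInfo() reverted above, the backup again records the regions that hbase:meta reports at backup time rather than the region layout of the last full backup. A small sketch of that lookup, with the connection settings and table name as placeholders:

```java
import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionListingSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
      // Same call the restored copyTableRegionInfo() uses to enumerate regions.
      List<RegionInfo> regions = MetaTableAccessor.getTableRegions(conn, TableName.valueOf("ns:tbl"));
      for (RegionInfo region : regions) {
        System.out.println(region.getEncodedName() + " ["
          + Bytes.toStringBinary(region.getStartKey()) + ", "
          + Bytes.toStringBinary(region.getEndKey()) + ")");
      }
    }
  }
}
```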
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index d327fed3e4b6..7b5095a897e2 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
 import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
@@ -53,20 +52,14 @@
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
-import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,10 +71,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-
 /**
  * This class is only a base for other integration-level backup tests. Do not add tests here.
  * TestBackupSmallTests is where tests that don't require bring machines up/down should go All other
@@ -139,50 +128,10 @@ public void execute() throws IOException {
       LOG.debug("For incremental backup, current table set is "
         + backupManager.getIncrementalBackupTableSet());
       newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
-
-      // todo: need to add an abstraction to encapsulate and DRY this up
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
-      Map<TableName, List<RegionInfo>> regionsByTable = new HashMap<>();
-      List<ImmutableBytesWritable> splits = new ArrayList<>();
-      for (TableName table : backupInfo.getTables()) {
-        ArrayList<BackupImage> ancestorsForTable =
-          BackupManager.filterAncestorsForTable(ancestors, table);
-
-        BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1);
-        if (backupImage.getType() != BackupType.FULL) {
-          throw new RuntimeException("No full backup found in ancestors for table " + table);
-        }
-
-        String lastFullBackupId = backupImage.getBackupId();
-        Path backupRootDir = new Path(backupInfo.getBackupRootDir());
-
-        FileSystem backupFs = backupRootDir.getFileSystem(conf);
-        Path tableInfoPath =
-          BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table);
-        SnapshotProtos.SnapshotDescription snapshotDesc =
-          SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath);
-        SnapshotManifest manifest =
-          SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc);
-        List<RegionInfo> regionInfos = new ArrayList<>(manifest.getRegionManifests().size());
-        for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest
-          .getRegionManifests()) {
-          HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo();
-          RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo);
-          // scanning meta doesnt return mob regions, so skip them here too so we keep parity
-          if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) {
-            continue;
-          }
-
-          regionInfos.add(regionInfoObj);
-          splits.add(new ImmutableBytesWritable(HFileOutputFormat2
-            .combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey())));
-        }
-        regionsByTable.put(table, regionInfos);
-      }
       // copy out the table and region info files for each table
-      BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf);
+      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(splits);
+      convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
         backupInfo.getBackupRootDir());
       failStageIf(Stage.stage_2);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 2cb4ac9756c6..90fbba2bf0ae 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -130,8 +130,6 @@ public void TestIncBackupRestore() throws Exception {
       byte[] name = regions.get(0).getRegionInfo().getRegionName();
       long startSplitTime = EnvironmentEdgeManager.currentTime();
       try {
-        // todo: this fails, and itd be nice if we could really add a split so we can prove
-        // that our new splits passthrough works (expect split to disappear once we restore)
         admin.splitRegionAsync(name).get();
       } catch (Exception e) {
         // although split fail, this may not affect following check in current API,
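The next two files remove the HBASE-27649 machinery (ExtendedCellSerialization and the sequenceId handling in WALPlayer/HFileOutputFormat2). The behaviour that machinery protected is a general HBase semantic: when two writes carry the same explicit timestamp, the later write (higher sequenceId) still wins on read. A client-level illustration of that semantic, assuming a made-up table and values:

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SameTimestampSketch {
  public static void main(String[] args) throws Exception {
    byte[] fam = Bytes.toBytes("f");
    byte[] qual = Bytes.toBytes("q");
    byte[] row = Bytes.toBytes("row");
    long ts = 1700000000000L;
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Table table = conn.getTable(TableName.valueOf("demo"))) {
      table.put(new Put(row).addColumn(fam, qual, ts, Bytes.toBytes("v1")));
      table.put(new Put(row).addColumn(fam, qual, ts, Bytes.toBytes("v2")));
      Result r = table.get(new Get(row));
      // Prints "v2": the timestamp tie is broken by sequenceId, which is what the removed
      // serialization preserved across the WALPlayer -> CellSortReducer -> HFile path.
      System.out.println(Bytes.toString(r.getValue(fam, qual)));
    }
  }
}
```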
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java
deleted file mode 100644
index c784b2561881..000000000000
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.hadoop.hbase.ExtendedCell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary
- * so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations
- * are not compatible -- data serialized by CellSerialization cannot be deserialized with
- * ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the
- * serialization is not actually used for the actual written HFiles, just intermediate data (between
- * mapper and reducer of a single job).
- */
-@InterfaceAudience.Private
-public class ExtendedCellSerialization implements Serialization<ExtendedCell> {
-  @Override
-  public boolean accept(Class<?> c) {
-    return ExtendedCell.class.isAssignableFrom(c);
-  }
-
-  @Override
-  public ExtendedCellDeserializer getDeserializer(Class<ExtendedCell> t) {
-    return new ExtendedCellDeserializer();
-  }
-
-  @Override
-  public ExtendedCellSerializer getSerializer(Class<ExtendedCell> c) {
-    return new ExtendedCellSerializer();
-  }
-
-  public static class ExtendedCellDeserializer implements Deserializer<ExtendedCell> {
-    private DataInputStream dis;
-
-    @Override
-    public void close() throws IOException {
-      this.dis.close();
-    }
-
-    @Override
-    public KeyValue deserialize(ExtendedCell ignore) throws IOException {
-      KeyValue kv = KeyValueUtil.create(this.dis);
-      PrivateCellUtil.setSequenceId(kv, this.dis.readLong());
-      return kv;
-    }
-
-    @Override
-    public void open(InputStream is) throws IOException {
-      this.dis = new DataInputStream(is);
-    }
-  }
-
-  public static class ExtendedCellSerializer implements Serializer<ExtendedCell> {
-    private DataOutputStream dos;
-
-    @Override
-    public void close() throws IOException {
-      this.dos.close();
-    }
-
-    @Override
-    public void open(OutputStream os) throws IOException {
-      this.dos = new DataOutputStream(os);
-    }
-
-    @Override
-    public void serialize(ExtendedCell kv) throws IOException {
-      dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
-      PrivateCellUtil.writeCell(kv, dos, true);
-      dos.writeLong(kv.getSequenceId());
-    }
-  }
-}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 98f86d8e8c30..2bd5330a62f8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -30,7 +30,6 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -125,7 +124,7 @@ public RegionLocator getRegionLocator() {
 
   protected static final byte[] tableSeparator = Bytes.toBytes(";");
 
-  public static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
+  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
     return Bytes.add(tableName, tableSeparator, suffix);
   }
 
@@ -160,15 +159,6 @@ public static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
   static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
     "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
-  /**
-   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
-   * package-private for internal usage for jobs like WALPlayer which need to use features of
-   * ExtendedCell.
-   */
-  static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
-    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
-  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
-
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -629,7 +619,9 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
 
-    mergeSerializations(conf);
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+      CellSerialization.class.getName());
 
     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
       LOG.info("bulkload locality sensitive enabled");
@@ -678,33 +670,6 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
   }
 
-  private static void mergeSerializations(Configuration conf) {
-    List<String> serializations = new ArrayList<>();
-
-    // add any existing values that have been set
-    String[] existing = conf.getStrings("io.serializations");
-    if (existing != null) {
-      Collections.addAll(serializations, existing);
-    }
-
-    serializations.add(MutationSerialization.class.getName());
-    serializations.add(ResultSerialization.class.getName());
-
-    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
-    // SerializationFactory runs through serializations in the order they are registered.
-    // We want to register ExtendedCellSerialization before CellSerialization because both
-    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
-    if (
-      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
-        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
-    ) {
-      serializations.add(ExtendedCellSerialization.class.getName());
-    }
-    serializations.add(CellSerialization.class.getName());
-
-    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
-  }
-
   public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
     throws IOException {
     Configuration conf = job.getConfiguration();
@@ -881,16 +846,9 @@ private static Map<byte[], String> createFamilyConfValueMap(Configuration conf,
    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
    * <code>splitPoints</code>. Cleans up the partitions file after job exists.
    */
-  public static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
     boolean writeMultipleTables) throws IOException {
     Configuration conf = job.getConfiguration();
-    // todo: need to think if there's a better way
-    if (conf.get(job.getJobName() + ".wrotePartitions") != null) {
-      LOG.info("Already configured partitions, skipping... {}", splitPoints);
-      return;
-    }
-    LOG.info("Configuring partitions {}", splitPoints);
-    conf.set(job.getJobName() + ".wrotePartitions", "true");
     // create the partitions file
     FileSystem fs = FileSystem.get(conf);
     String hbaseTmpFsDir =
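With mergeSerializations() gone, configureIncrementalLoad() registers exactly three HBase serializations after Hadoop's defaults. A sketch of what that wiring amounts to and how Hadoop resolves a serializer from it; the factory lookup for KeyValue is shown only as an illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.mapreduce.CellSerialization;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;

public class SerializationWiringSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Mirrors the setStrings() call above: keep the existing value, then append the
    // HBase serializations used for map output values.
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    // Hadoop walks the list in order and picks the first Serialization that accepts the class.
    SerializationFactory factory = new SerializationFactory(conf);
    Serializer<KeyValue> serializer = factory.getSerializer(KeyValue.class);
    System.out.println(serializer.getClass().getName()); // expected: CellSerialization's serializer
  }
}
```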
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 1e25476a2df5..56c6bebdf261 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -80,7 +79,6 @@ public class WALPlayer extends Configured implements Tool {
   protected static final String tableSeparator = ";";
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-  private List<ImmutableBytesWritable> splits;
 
   public WALPlayer() {
   }
@@ -89,10 +87,6 @@ protected WALPlayer(final Configuration c) {
     super(c);
   }
 
-  public void setSplits(List<ImmutableBytesWritable> splits) {
-    this.splits = splits;
-  }
-
   /**
    * A mapper that just writes out KeyValues. This one can be used together with
    * {@link CellSortReducer}
@@ -111,13 +105,6 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
           if (WALEdit.isMetaEditFamily(cell)) {
             continue;
           }
-
-          // Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId
-          // on WALKey is the same value that was on the cells in the WALEdit. This enables
-          // CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps.
-          // See HBASE-27649
-          PrivateCellUtil.setSequenceId(cell, key.getSequenceId());
-
           byte[] outKey = multiTableSupport
             ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
             : CellUtil.cloneRow(cell);
@@ -321,15 +308,6 @@ public Job createSubmittableJob(String[] args) throws IOException {
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
 
-      // WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when
-      // sorting cells in CellSortReducer
-      job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
-        true);
-
-      if (splits != null) {
-        HFileOutputFormat2.configurePartitioner(job, splits, true);
-      }
-
       // the bulk HFile case
       List<TableName> tableNames = getTableNameList(tables);
 
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c98..9b0d5ec52a34 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -33,7 +29,6 @@
 import java.io.File;
 import java.io.PrintStream;
 import java.util.ArrayList;
-import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,10 +50,8 @@
 import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -138,80 +131,6 @@ public void testPlayingRecoveredEdit() throws Exception {
     assertTrue(TEST_UTIL.countRows(tn) > 0);
   }
 
-  /**
-   * Tests that when you write multiple cells with the same timestamp they are properly sorted by
-   * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from
-   * the resulting bulkloaded HFiles. See HBASE-27649
-   */
-  @Test
-  public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName() + "1");
-    final byte[] family = Bytes.toBytes("family");
-    final byte[] column1 = Bytes.toBytes("c1");
-    final byte[] column2 = Bytes.toBytes("c2");
-    final byte[] row = Bytes.toBytes("row");
-    Table table = TEST_UTIL.createTable(tableName, family);
-
-    long now = EnvironmentEdgeManager.currentTime();
-    // put a row into the first table
-    Put p = new Put(row);
-    p.addColumn(family, column1, now, column1);
-    p.addColumn(family, column2, now, column2);
-
-    table.put(p);
-
-    byte[] lastVal = null;
-
-    for (int i = 0; i < 50; i++) {
-      lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
-      p = new Put(row);
-      p.addColumn(family, column1, now, lastVal);
-
-      table.put(p);
-
-      // wal rolling is necessary to trigger the bug. otherwise no sorting
-      // needs to occur in the reducer because it's all sorted and coming from a single file.
-      if (i % 10 == 0) {
-        WAL log = cluster.getRegionServer(0).getWAL(null);
-        log.rollWriter();
-      }
-    }
-
-    WAL log = cluster.getRegionServer(0).getWAL(null);
-    log.rollWriter();
-    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
-      HConstants.HREGION_LOGDIR_NAME).toString();
-
-    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
-    String outPath = "/tmp/" + name.getMethodName();
-    configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
-    configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
-
-    WALPlayer player = new WALPlayer(configuration);
-    assertEquals(0, ToolRunner.run(configuration, player,
-      new String[] { walInputDir, tableName.getNameAsString() }));
-
-    Get g = new Get(row);
-    Result result = table.get(g);
-    byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
-
-    table = TEST_UTIL.truncateTable(tableName);
-    g = new Get(row);
-    result = table.get(g);
-    assertThat(result.listCells(), nullValue());
-
-    BulkLoadHFiles.create(configuration).bulkLoad(tableName,
-      new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
-
-    g = new Get(row);
-    result = table.get(g);
-    value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
-
-    assertThat(result.listCells(), notNullValue());
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
-  }
-
   /**
    * Simple end-to-end test
    */
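The test removed above exercised WALPlayer's bulk-output mode end to end. The public-API core of that flow still works after this revert; a sketch follows, where the paths and table name are placeholders and single-table output is assumed so the HFiles land directly under the output directory:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.ToolRunner;

public class WalPlayerBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String outPath = "/tmp/walplayer-out"; // placeholder
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
    // Replay the WALs under /hbase/oldWALs for one table into HFiles.
    int rc = ToolRunner.run(conf, new WALPlayer(), new String[] { "/hbase/oldWALs", "demo" });
    if (rc == 0) {
      // Load the generated HFiles into the (possibly truncated or freshly created) table.
      BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("demo"), new Path(outPath));
    }
  }
}
```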