diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java index 9a65ed929d7f..e582bdedd296 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -343,6 +344,11 @@ protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, Path dest = new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName)); + // Delete all *data* files in dest + if (!deleteData(fs, dest)) { + throw new IOException("Could not delete " + dest); + } + FileStatus[] fsts = fs.listStatus(bulkOutputPath); for (FileStatus fst : fsts) { if (fst.isDirectory()) { @@ -361,6 +367,41 @@ protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, } } + /** + * Deletes only data files and keeps all META + * @param fs file system instance + * @param dest destination location + * @return true, if success, false - otherwise + * @throws FileNotFoundException exception + * @throws IOException exception + */ + private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException { + RemoteIterator it = fs.listFiles(dest, true); + List toDelete = new ArrayList(); + while (it.hasNext()) { + Path p = it.next().getPath(); + if (fs.isDirectory(p)) { + continue; + } + // Keep meta + String fileName = p.toString(); + if ( + fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 + || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0 + ) { + continue; + } + toDelete.add(p); + } + for (Path p : toDelete) { + boolean result = fs.delete(p, false); + if (!result) { + return false; + } + } + return true; + } + protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException { Set allSet = new HashSet<>(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 13c7cbe2a764..600aed1fb63b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -264,6 +264,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { } else { tableNameBytes = Bytes.toBytes(writeTableNames); } + String tableName = Bytes.toString(tableNameBytes); Path tableRelPath = getTableRelativePath(tableNameBytes); byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); @@ -294,7 +295,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { HRegionLocation loc = null; - String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { try ( Connection connection = diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 687e80568b2f..65b77d17a903 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -143,19 +143,22 @@ public void setup(Context context) throws IOException { * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer} */ static class WALCellMapper extends Mapper { - private byte[] table; + private Set tableSet = new HashSet<>(); + private boolean multiTableSupport = false; @Override public void map(WALKey key, WALEdit value, Context context) throws IOException { try { - // skip all other tables - if (Bytes.equals(table, key.getTableName().getName())) { + TableName table = key.getTableName(); + if (tableSet.contains(table.getNameAsString())) { for (Cell cell : value.getCells()) { if (WALEdit.isMetaEditFamily(cell)) { continue; } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), - new MapReduceExtendedCell(cell)); + byte[] outKey = multiTableSupport + ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell)) + : CellUtil.cloneRow(cell); + context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell)); } } } catch (InterruptedException e) { @@ -165,13 +168,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException { @Override public void setup(Context context) throws IOException { - // only a single table is supported when HFiles are generated with HFileOutputFormat - String[] tables = context.getConfiguration().getStrings(TABLES_KEY); - if (tables == null || tables.length != 1) { - // this can only happen when WALMapper is used directly by a class other than WALPlayer - throw new IOException("Exactly one table must be specified for bulk HFile case."); + Configuration conf = context.getConfiguration(); + String[] tables = conf.getStrings(TABLES_KEY); + this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false); + for (String table : tables) { + tableSet.add(table); } - table = Bytes.toBytes(tables[0]); } } @@ -363,7 +365,7 @@ public Job createSubmittableJob(String[] args) throws IOException { FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(KeyValue.class); try (Connection conn = ConnectionFactory.createConnection(conf);) { - List tableInfoList = new ArrayList(); + List tableInfoList = new ArrayList<>(); for (TableName tableName : tableNames) { Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName);