Skip to content

Commit

Permalink
more enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
rda3mon committed Aug 27, 2022
1 parent 286c8d0 commit a2f11de
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -343,6 +344,11 @@ protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
Path dest =
new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

// Delete all *data* files in dest
if (!deleteData(fs, dest)) {
throw new IOException("Could not delete " + dest);
}

FileStatus[] fsts = fs.listStatus(bulkOutputPath);
for (FileStatus fst : fsts) {
if (fst.isDirectory()) {
Expand All @@ -361,6 +367,41 @@ protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
}
}

/**
* Deletes only data files and keeps all META
* @param fs file system instance
* @param dest destination location
* @return true, if success, false - otherwise
* @throws FileNotFoundException exception
* @throws IOException exception
*/
private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException {
RemoteIterator<LocatedFileStatus> it = fs.listFiles(dest, true);
List<Path> toDelete = new ArrayList<Path>();
while (it.hasNext()) {
Path p = it.next().getPath();
if (fs.isDirectory(p)) {
continue;
}
// Keep meta
String fileName = p.toString();
if (
fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
|| fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
) {
continue;
}
toDelete.add(p);
}
for (Path p : toDelete) {
boolean result = fs.delete(p, false);
if (!result) {
return false;
}
}
return true;
}

protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
Set<TableName> allSet = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
} else {
tableNameBytes = Bytes.toBytes(writeTableNames);
}
String tableName = Bytes.toString(tableNameBytes);
Path tableRelPath = getTableRelativePath(tableNameBytes);
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

Expand Down Expand Up @@ -294,7 +295,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;

String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
try (
Connection connection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,22 @@ public void setup(Context context) throws IOException {
* A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer}
*/
static class WALCellMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
private byte[] table;
private Set<String> tableSet = new HashSet<>();
private boolean multiTableSupport = false;

@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
try {
// skip all other tables
if (Bytes.equals(table, key.getTableName().getName())) {
TableName table = key.getTableName();
if (tableSet.contains(table.getNameAsString())) {
for (Cell cell : value.getCells()) {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
new MapReduceExtendedCell(cell));
byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
}
}
} catch (InterruptedException e) {
Expand All @@ -165,13 +168,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {

@Override
public void setup(Context context) throws IOException {
// only a single table is supported when HFiles are generated with HFileOutputFormat
String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
if (tables == null || tables.length != 1) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("Exactly one table must be specified for bulk HFile case.");
Configuration conf = context.getConfiguration();
String[] tables = conf.getStrings(TABLES_KEY);
this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
for (String table : tables) {
tableSet.add(table);
}
table = Bytes.toBytes(tables[0]);
}
}

Expand Down Expand Up @@ -363,7 +365,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(KeyValue.class);
try (Connection conn = ConnectionFactory.createConnection(conf);) {
List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
List<TableInfo> tableInfoList = new ArrayList<>();
for (TableName tableName : tableNames) {
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName);
Expand Down

0 comments on commit a2f11de

Please sign in to comment.