Skip to content

Commit

Permalink
Pass filesystem instead of conf values
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarryd Lee committed Jan 9, 2023
1 parent 5a29f8a commit cf0984c
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -38,6 +39,6 @@ public interface RestoreJob extends Configurable {
* @param fullBackupRestore full backup restore
* @throws IOException if running the job fails
*/
void run(Path[] dirPaths, TableName[] fromTables, TableName[] toTables, boolean fullBackupRestore)
throws IOException;
void run(Path[] dirPaths, FileSystem targetFileSystem, TableName[] fromTables,
TableName[] toTables, boolean fullBackupRestore) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public String getTargetRootDir() {
return targetRootDir;
}

public RestoreRequest setTargetRootDir(String targetRootDir) {
private RestoreRequest setTargetRootDir(String targetRootDir) {
this.targetRootDir = targetRootDir;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -35,7 +34,6 @@
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
Expand All @@ -57,9 +55,10 @@ public class RestoreTablesClient {
private TableName[] sTableArray;
private TableName[] tTableArray;
private String backupRootDir;
private FileSystem targetFileSystem;
private boolean isOverwrite;

public RestoreTablesClient(Connection conn, RestoreRequest request) {
public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
this.backupRootDir = request.getBackupRootDir();
this.backupId = request.getBackupId();
this.sTableArray = request.getFromTables();
Expand All @@ -71,7 +70,10 @@ public RestoreTablesClient(Connection conn, RestoreRequest request) {
this.conn = conn;
this.conf = conn.getConfiguration();
if (request.getTargetRootDir() != null) {
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_ROOT_DIR, request.getTargetRootDir());
Path targetRootDir = new Path(request.getTargetRootDir());
targetFileSystem = targetRootDir.getFileSystem(conf);
} else {
targetFileSystem = FileSystem.get(conf);
}
}

Expand Down Expand Up @@ -135,7 +137,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa
String rootDir = image.getRootDir();
String backupId = image.getBackupId();
Path backupRoot = new Path(rootDir);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, targetFileSystem, backupId);
Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
// We need hFS only for full restore (see the code)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void run(String[] backupIds) throws IOException {
String dirs = StringUtils.join(dirPaths, ",");

Path bulkOutputPath = BackupUtils.getBulkOutputDir(
BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
BackupUtils.getFileNameCompatibleString(tableNames[i]), fs, getConf(), false);
// Delete content if exists
if (fs.exists(bulkOutputPath)) {
if (!fs.delete(bulkOutputPath, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
final static String NAME = "HFileSplitterJob";
public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
public static final String BULK_OUTPUT_ROOT_DIR = "hfile.bulk.output.root.dir";
public final static String TABLES_KEY = "hfile.input.tables";
public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
Expand Down Expand Up @@ -50,8 +51,8 @@ public MapReduceRestoreJob() {
}

@Override
public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
boolean fullBackupRestore) throws IOException {
public void run(Path[] dirPaths, FileSystem targetFileSystem, TableName[] tableNames,
TableName[] newTableNames, boolean fullBackupRestore) throws IOException {
String bulkOutputConfKey;

player = new MapReduceHFileSplitterJob();
Expand All @@ -71,8 +72,8 @@ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNam
for (int i = 0; i < tableNames.length; i++) {
LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

Path bulkOutputPath = BackupUtils
.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
Path bulkOutputPath = BackupUtils.getBulkOutputDir(
BackupUtils.getFileNameCompatibleString(newTableNames[i]), targetFileSystem, getConf());
Configuration conf = getConf();
conf.set(bulkOutputConfKey, bulkOutputPath.toString());
String[] playerArgs = { dirs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand Down Expand Up @@ -686,16 +685,8 @@ public static boolean validate(HashMap<TableName, BackupManifest> backupManifest
return isValid;
}

public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
throws IOException {
FileSystem fs;
String bulkOutputRootDir = conf.get(MapReduceHFileSplitterJob.BULK_OUTPUT_ROOT_DIR);
if (bulkOutputRootDir != null) {
Path rootDir = new Path(bulkOutputRootDir);
fs = FileSystem.get(rootDir.toUri(), conf);
} else {
fs = FileSystem.get(conf);
}
public static Path getBulkOutputDir(String tableName, FileSystem fs, Configuration conf,
boolean deleteOnExit) throws IOException {
String tmp =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
Expand All @@ -706,8 +697,9 @@ public static Path getBulkOutputDir(String tableName, Configuration conf, boolea
return path;
}

public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
return getBulkOutputDir(tableName, conf, true);
public static Path getBulkOutputDir(String tableName, FileSystem targetFileSystem,
Configuration conf) throws IOException {
return getBulkOutputDir(tableName, targetFileSystem, conf, true);
}

public static String getFileNameCompatibleString(TableName table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ public class RestoreTool {
protected Path backupRootPath;
protected String backupId;
protected FileSystem fs;
protected FileSystem targetFs;

// store table name and snapshot dir mapping
private final HashMap<TableName, Path> snapshotMap = new HashMap<>();

public RestoreTool(Configuration conf, final Path backupRootPath, final String backupId)
throws IOException {
public RestoreTool(Configuration conf, final Path backupRootPath,
final FileSystem targetFileSystem, final String backupId) throws IOException {
this.conf = conf;
this.backupRootPath = backupRootPath;
this.backupId = backupId;
this.fs = backupRootPath.getFileSystem(conf);
this.targetFs = targetFileSystem;
}

/**
Expand Down Expand Up @@ -200,7 +202,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[
}
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

restoreService.run(logDirs, tableNames, newTableNames, false);
restoreService.run(logDirs, targetFs, tableNames, newTableNames, false);
}
}

Expand Down Expand Up @@ -350,8 +352,8 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableNa
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
Path[] paths = new Path[regionPathList.size()];
regionPathList.toArray(paths);
restoreService.run(paths, new TableName[] { tableName }, new TableName[] { newTableName },
true);
restoreService.run(paths, targetFs, new TableName[] { tableName },
new TableName[] { newTableName }, true);

} catch (Exception e) {
LOG.error(e.toString(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Path run() {
@Override
public Path run() {
try {
return BackupUtils.getBulkOutputDir("test", conf, false);
return BackupUtils.getBulkOutputDir("test", FileSystem.get(conf), conf, false);
} catch (IOException ioe) {
LOG.error("Failed to get bulk output dir path", ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void run(String[] backupIds) throws IOException {
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
String dirs = StringUtils.join(dirPaths, ",");
Path bulkOutputPath = BackupUtils.getBulkOutputDir(
BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
BackupUtils.getFileNameCompatibleString(tableNames[i]), fs, getConf(), false);
// Delete content if exists
if (fs.exists(bulkOutputPath)) {
if (!fs.delete(bulkOutputPath, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -65,8 +64,9 @@ public void testFullRestoreRemote() throws Exception {
LOG.info("backup complete");
TableName[] tableset = new TableName[] { table1 };
TableName[] tablemap = new TableName[] { table1_restore };
getBackupAdmin().restore(BackupUtils.createRestoreRequest(BACKUP_REMOTE_ROOT_DIR, backupId,
false, tableset, tablemap, false));
getBackupAdmin().restore(new RestoreRequest.Builder().withBackupRootDir(BACKUP_REMOTE_ROOT_DIR)
.withTargetRootDir(BACKUP_ROOT_DIR).withBackupId(backupId).withCheck(false)
.withFromTables(tableset).withToTables(tablemap).withOvewrite(false).build());
Admin hba = TEST_UTIL.getAdmin();
assertTrue(hba.tableExists(table1_restore));
TEST_UTIL.deleteTable(table1_restore);
Expand Down

0 comments on commit cf0984c

Please sign in to comment.