Skip to content

Commit

Permalink
Pass filesystem instead of conf values
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarryd Lee committed Jan 9, 2023
1 parent 5a29f8a commit f01b36c
Show file tree
Hide file tree
Showing 20 changed files with 74 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ public BackupInfo() {
backupTableInfoMap = new HashMap<>();
}

public BackupInfo(String backupId, BackupType type, TableName[] tables, String targetRootDir) {
public BackupInfo(String backupId, BackupType type, TableName[] tables, String restoreRootDir) {
this();
this.backupId = backupId;
this.type = type;
this.backupRootDir = targetRootDir;
this.backupRootDir = restoreRootDir;
this.addTables(tables);
if (type == BackupType.INCREMENTAL) {
setHLogTargetDir(BackupUtils.getLogBackupDir(targetRootDir, backupId));
setHLogTargetDir(BackupUtils.getLogBackupDir(restoreRootDir, backupId));
}
this.startTs = 0;
this.completeTs = 0;
Expand Down Expand Up @@ -213,8 +213,8 @@ public void setType(BackupType type) {
this.type = type;
}

public void setBackupRootDir(String targetRootDir) {
this.backupRootDir = targetRootDir;
public void setBackupRootDir(String restoreRootDir) {
this.backupRootDir = restoreRootDir;
}

public void setTotalBytesCopied(long totalBytesCopied) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public Builder withTableList(List<TableName> tables) {
return this;
}

public Builder withTargetRootDir(String backupDir) {
request.setTargetRootDir(backupDir);
public Builder withRestoreRootDir(String backupDir) {
request.setRestoreRootDir(backupDir);
return this;
}

Expand Down Expand Up @@ -78,7 +78,7 @@ public BackupRequest build() {

private BackupType type;
private List<TableName> tableList;
private String targetRootDir;
private String restoreRootDir;
private int totalTasks = -1;
private long bandwidth = -1L;
private String backupSetName;
Expand All @@ -105,13 +105,13 @@ public List<TableName> getTableList() {
return this.tableList;
}

private BackupRequest setTargetRootDir(String targetRootDir) {
this.targetRootDir = targetRootDir;
private BackupRequest setRestoreRootDir(String restoreRootDir) {
this.restoreRootDir = restoreRootDir;
return this;
}

public String getTargetRootDir() {
return this.targetRootDir;
public String getRestoreRootDir() {
return this.restoreRootDir;
}

private BackupRequest setTotalTasks(int totalTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class BackupTableInfo {
public BackupTableInfo() {
}

public BackupTableInfo(TableName table, String targetRootDir, String backupId) {
public BackupTableInfo(TableName table, String restoreRootDir, String backupId) {
this.table = table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -34,10 +35,11 @@ public interface RestoreJob extends Configurable {
* Run restore operation
* @param dirPaths path array of WAL log directories
* @param fromTables from tables
* @param restoreFileSystem output file system
* @param toTables to tables
* @param fullBackupRestore full backup restore
* @throws IOException if running the job fails
*/
void run(Path[] dirPaths, TableName[] fromTables, TableName[] toTables, boolean fullBackupRestore)
throws IOException;
void run(Path[] dirPaths, TableName[] fromTables, FileSystem restoreFileSystem,
TableName[] toTables, boolean fullBackupRestore) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public Builder withBackupRootDir(String backupRootDir) {
return this;
}

public Builder withTargetRootDir(String targetRootDir) {
request.setTargetRootDir(targetRootDir);
public Builder withRestoreRootDir(String restoreRootDir) {
request.setRestoreRootDir(restoreRootDir);
return this;
}

Expand Down Expand Up @@ -73,7 +73,7 @@ public RestoreRequest build() {
}

private String backupRootDir;
private String targetRootDir;
private String restoreRootDir;
private String backupId;
private boolean check = false;
private TableName[] fromTables;
Expand All @@ -92,12 +92,12 @@ private RestoreRequest setBackupRootDir(String backupRootDir) {
return this;
}

public String getTargetRootDir() {
return targetRootDir;
public String getRestoreRootDir() {
return restoreRootDir;
}

public RestoreRequest setTargetRootDir(String targetRootDir) {
this.targetRootDir = targetRootDir;
private RestoreRequest setRestoreRootDir(String restoreRootDir) {
this.restoreRootDir = restoreRootDir;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,14 @@ public void restore(RestoreRequest request) throws IOException {
@Override
public String backupTables(BackupRequest request) throws IOException {
BackupType type = request.getBackupType();
String targetRootDir = request.getTargetRootDir();
String restoreRootDir = request.getRestoreRootDir();
List<TableName> tableList = request.getTableList();

String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
Set<TableName> incrTableSet;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
incrTableSet = table.getIncrementalBackupTableSet(restoreRootDir);
}

if (incrTableSet.isEmpty()) {
Expand All @@ -552,7 +552,7 @@ public String backupTables(BackupRequest request) throws IOException {
if (tableList != null && !tableList.isEmpty()) {
for (TableName table : tableList) {
String targetTableBackupDir =
HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table);
HBackupFileSystem.getTableBackupDir(restoreRootDir, backupId, table);
Path targetTableBackupDirPath = new Path(targetTableBackupDir);
FileSystem outputFs =
FileSystem.get(targetTableBackupDirPath.toUri(), conn.getConfiguration());
Expand Down Expand Up @@ -588,7 +588,7 @@ public String backupTables(BackupRequest request) throws IOException {
// update table list
BackupRequest.Builder builder = new BackupRequest.Builder();
request = builder.withBackupType(request.getBackupType()).withTableList(tableList)
.withTargetRootDir(request.getTargetRootDir()).withBackupSetName(request.getBackupSetName())
.withRestoreRootDir(request.getRestoreRootDir()).withBackupSetName(request.getBackupSetName())
.withTotalTasks(request.getTotalTasks()).withBandwidthPerTasks((int) request.getBandwidth())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void execute() throws IOException {
BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
.withTableList(
tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withRestoreRootDir(targetBackupDir).withTotalTasks(workers)
.withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
String backupId = admin.backupTables(request);
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ public void close() {

/**
* Creates a backup info based on input backup request.
* @param backupId backup id
* @param type type
* @param tableList table list
* @param targetRootDir root dir
* @param workers number of parallel workers
* @param bandwidth bandwidth per worker in MB per sec
* @param backupId backup id
* @param type type
* @param tableList table list
* @param restoreRootDir root dir
* @param workers number of parallel workers
* @param bandwidth bandwidth per worker in MB per sec
* @throws BackupException exception
*/
public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
String targetRootDir, int workers, long bandwidth) throws BackupException {
if (targetRootDir == null) {
String restoreRootDir, int workers, long bandwidth) throws BackupException {
if (restoreRootDir == null) {
throw new BackupException("Wrong backup request parameter: target backup root directory");
}

Expand Down Expand Up @@ -230,7 +230,7 @@ public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableN

// there are one or more tables in the table list
backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
targetRootDir);
restoreRootDir);
backupInfo.setBandwidth(bandwidth);
backupInfo.setWorkers(workers);
return backupInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
Expand All @@ -57,9 +56,10 @@ public class RestoreTablesClient {
private TableName[] sTableArray;
private TableName[] tTableArray;
private String backupRootDir;
private FileSystem restoreFileSystem;
private boolean isOverwrite;

public RestoreTablesClient(Connection conn, RestoreRequest request) {
public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
this.backupRootDir = request.getBackupRootDir();
this.backupId = request.getBackupId();
this.sTableArray = request.getFromTables();
Expand All @@ -70,8 +70,11 @@ public RestoreTablesClient(Connection conn, RestoreRequest request) {
this.isOverwrite = request.isOverwrite();
this.conn = conn;
this.conf = conn.getConfiguration();
if (request.getTargetRootDir() != null) {
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_ROOT_DIR, request.getTargetRootDir());
if (request.getRestoreRootDir() != null) {
Path restoreRootDir = new Path(request.getRestoreRootDir());
restoreFileSystem = restoreRootDir.getFileSystem(conf);
} else {
restoreFileSystem = FileSystem.get(conf);
}
}

Expand Down Expand Up @@ -135,7 +138,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa
String rootDir = image.getRootDir();
String backupId = image.getBackupId();
Path backupRoot = new Path(rootDir);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, restoreFileSystem, backupId);
Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
// We need hFS only for full restore (see the code)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void init(final Connection conn, final String backupId, BackupRequest req
this.conf = conn.getConfiguration();
this.fs = CommonFSUtils.getCurrentFileSystem(conf);
backupInfo = backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
request.getRestoreRootDir(), request.getTotalTasks(), request.getBandwidth());
if (tableList == null || tableList.isEmpty()) {
this.tableList = new ArrayList<>(backupInfo.getTables());
}
Expand Down Expand Up @@ -330,7 +330,7 @@ protected String obtainBackupMetaDataStr(BackupInfo backupInfo) {
if (sb.lastIndexOf(";") > 0) {
sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
}
sb.append(",targetRootDir=" + backupInfo.getBackupRootDir());
sb.append(",restoreRootDir=" + backupInfo.getBackupRootDir());

return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void run(String[] backupIds) throws IOException {
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
String dirs = StringUtils.join(dirPaths, ",");

Path bulkOutputPath = BackupUtils.getBulkOutputDir(
Path bulkOutputPath = BackupUtils.getBulkOutputDir(fs,
BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
// Delete content if exists
if (fs.exists(bulkOutputPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
final static String NAME = "HFileSplitterJob";
public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
public static final String BULK_OUTPUT_ROOT_DIR = "hfile.bulk.output.root.dir";
public final static String TABLES_KEY = "hfile.input.tables";
public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
Expand Down Expand Up @@ -50,8 +51,8 @@ public MapReduceRestoreJob() {
}

@Override
public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
boolean fullBackupRestore) throws IOException {
public void run(Path[] dirPaths, TableName[] tableNames, FileSystem restoreFileSystem,
TableName[] newTableNames, boolean fullBackupRestore) throws IOException {
String bulkOutputConfKey;

player = new MapReduceHFileSplitterJob();
Expand All @@ -71,8 +72,8 @@ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNam
for (int i = 0; i < tableNames.length; i++) {
LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

Path bulkOutputPath = BackupUtils
.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
Path bulkOutputPath = BackupUtils.getBulkOutputDir(restoreFileSystem,
BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
Configuration conf = getConf();
conf.set(bulkOutputConfKey, bulkOutputPath.toString());
String[] playerArgs = { dirs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand Down Expand Up @@ -686,16 +685,8 @@ public static boolean validate(HashMap<TableName, BackupManifest> backupManifest
return isValid;
}

public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
throws IOException {
FileSystem fs;
String bulkOutputRootDir = conf.get(MapReduceHFileSplitterJob.BULK_OUTPUT_ROOT_DIR);
if (bulkOutputRootDir != null) {
Path rootDir = new Path(bulkOutputRootDir);
fs = FileSystem.get(rootDir.toUri(), conf);
} else {
fs = FileSystem.get(conf);
}
public static Path getBulkOutputDir(FileSystem fs, String tableName, Configuration conf,
boolean deleteOnExit) throws IOException {
String tmp =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
Expand All @@ -706,8 +697,9 @@ public static Path getBulkOutputDir(String tableName, Configuration conf, boolea
return path;
}

public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
return getBulkOutputDir(tableName, conf, true);
public static Path getBulkOutputDir(FileSystem restoreFileSystem, String tableName,
Configuration conf) throws IOException {
return getBulkOutputDir(restoreFileSystem, tableName, conf, true);
}

public static String getFileNameCompatibleString(TableName table) {
Expand Down
Loading

0 comments on commit f01b36c

Please sign in to comment.