Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27649 WALPlayer does not properly dedupe overridden cell versions #5047

Merged
merged 8 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOExcep
public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo, TableName table)
  throws IOException {
  // Look up every ancestor of the backup, then let the static helper narrow the list to the
  // requested table.
  return filterAncestorsForTable(getAncestors(backupInfo), table);
}

public static ArrayList<BackupImage> filterAncestorsForTable(ArrayList<BackupImage> ancestors,
TableName table) {
ArrayList<BackupImage> tableAncestors = new ArrayList<>();
for (BackupImage image : ancestors) {
if (image.hasTable(table)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -40,17 +42,26 @@
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
* Incremental backup implementation. See the {@link #execute() execute} method.
*/
Expand Down Expand Up @@ -276,10 +287,48 @@ public void execute() throws IOException {

// case INCREMENTAL_COPY:
try {
// todo: need to add an abstraction to encapsulate and DRY this up
ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
Map<TableName, List<RegionInfo>> regionsByTable = new HashMap<>();
List<ImmutableBytesWritable> splits = new ArrayList<>();
for (TableName table : backupInfo.getTables()) {
ArrayList<BackupImage> ancestorsForTable =
BackupManager.filterAncestorsForTable(ancestors, table);

BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1);
if (backupImage.getType() != BackupType.FULL) {
throw new RuntimeException("No full backup found in ancestors for table " + table);
}

String lastFullBackupId = backupImage.getBackupId();
Path backupRootDir = new Path(backupInfo.getBackupRootDir());

FileSystem backupFs = backupRootDir.getFileSystem(conf);
Path tableInfoPath =
BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table);
SnapshotProtos.SnapshotDescription snapshotDesc =
SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath);
SnapshotManifest manifest =
SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc);
List<RegionInfo> regionInfos = new ArrayList<>(manifest.getRegionManifests().size());
for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) {
HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo();
RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo);
// scanning meta doesn't return mob regions, so skip them here too so we keep parity
if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) {
continue;
}

regionInfos.add(regionInfoObj);
splits.add(new ImmutableBytesWritable(HFileOutputFormat2
.combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey())));
}
regionsByTable.put(table, regionInfos);
}
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf);
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles();
convertWALsToHFiles(splits);
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());
} catch (Exception e) {
Expand Down Expand Up @@ -359,7 +408,7 @@ protected void deleteBulkLoadDirectory() throws IOException {
}
}

protected void convertWALsToHFiles() throws IOException {
protected void convertWALsToHFiles(List<ImmutableBytesWritable> splits) throws IOException {
// get incremental backup file list and prepare parameters for DistCp
List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
// Get list of tables in incremental backup set
Expand All @@ -375,7 +424,7 @@ protected void convertWALsToHFiles() throws IOException {
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
}
}
walToHFiles(incrBackupFileList, tableList);
walToHFiles(incrBackupFileList, tableList, splits);

}

Expand All @@ -385,8 +434,9 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti
}
}

protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
Tool player = new WALPlayer();
protected void walToHFiles(List<String> dirPaths, List<String> tableList,
List<ImmutableBytesWritable> splits) throws IOException {
WALPlayer player = new WALPlayer();

// Player reads all files in arbitrary directory structure and creates
// a Map task for each file. We use ';' as separator
Expand All @@ -401,6 +451,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
conf.set(JOB_NAME_CONF_KEY, jobname);
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

player.setSplits(splits);
try {
player.setConf(conf);
int result = player.run(playerArgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
Expand All @@ -54,6 +53,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -122,7 +122,8 @@ private BackupUtils() {
* @param conf configuration
* @throws IOException exception
*/
public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
Map<TableName, List<RegionInfo>> lastFullBackupForTable, Configuration conf)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
Expand All @@ -147,20 +148,56 @@ public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, C
LOG.debug("Attempting to copy table info for:" + table + " target: " + target
+ " descriptor: " + orig);
LOG.debug("Finished copying tableinfo.");
List<RegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table);
// For each region, write the region info to disk
LOG.debug("Starting to write region info for table " + table);
for (RegionInfo regionInfo : regions) {
Path regionDir = FSUtils
.getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
}
copyTableRegionInfosFromParent(table, targetFs, backupInfo,
lastFullBackupForTable.get(table), conf);
LOG.debug("Finished writing region info for table " + table);
}
}
}

/**
 * Writes a region info file under the table's backup directory for every region in the supplied
 * list.
 * @param table                  table whose backup directory receives the region info files
 * @param targetFs               file system the region info files are written to
 * @param backupInfo             backup descriptor used to resolve the table backup directory
 * @param lastFullBackupForTable regions to write out; must not be null
 * @param conf                   configuration handed to the region info writer
 * @throws IOException declared by the method; presumably raised when a write fails
 */
private static void copyTableRegionInfosFromParent(TableName table, FileSystem targetFs,
  BackupInfo backupInfo, List<RegionInfo> lastFullBackupForTable, Configuration conf)
  throws IOException {
  for (RegionInfo parentRegion : lastFullBackupForTable) {
    // Only the directory name produced by FSUtils is kept; it is re-rooted under the table's
    // backup directory before the region info is written.
    Path derivedDir = FSUtils.getRegionDirFromTableDir(
      new Path(backupInfo.getTableBackupDir(table)), parentRegion);
    Path destination = new Path(backupInfo.getTableBackupDir(table), derivedDir.getName());
    writeRegioninfoOnFilesystem(conf, targetFs, destination, parentRegion);
  }
}

/**
 * Locates the table info path of a table inside a backup image. The path cannot be built
 * directly because snapshot names embed a timestamp, so the table's snapshot directory is listed
 * instead. The original documentation gives the expected location as
 * {@code /$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/snapshot_1396650097621_namespace_table};
 * that path contains .snapshotinfo and .tabledesc (0.96 and 0.98) or .snapshotinfo and
 * .data.manifest (trunk).
 * @param fs             file system holding the backup
 * @param backupRootPath root path of the backup destination
 * @param backupId       id of the backup image to look in
 * @param tableName      table name
 * @return the first listed path whose name ends with "data.manifest", otherwise the last listed
 *         path, or {@code null} when the listing is empty
 * @throws IOException exception
 */
public static Path getTableInfoPath(FileSystem fs, Path backupRootPath, String backupId,
  TableName tableName) throws IOException {
  Path snapshotRoot = getTableSnapshotPath(backupRootPath, tableName, backupId);

  // can't build the path directly as the timestamp values are different
  FileStatus[] completed = fs.listStatus(snapshotRoot,
    new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));

  Path lastSeen = null;
  for (FileStatus candidate : completed) {
    lastSeen = candidate.getPath();
    // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
    if (lastSeen.getName().endsWith("data.manifest")) {
      return lastSeen;
    }
  }
  // No manifest-named entry: fall back to the last entry listed (null if there were none).
  return lastSeen;
}

/**
 * Resolves the snapshot directory ({@link HConstants#SNAPSHOT_DIR_NAME}) beneath the backup path
 * of the given table.
 */
static Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) {
  Path tableBackupPath = HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId);
  return new Path(tableBackupPath, HConstants.SNAPSHOT_DIR_NAME);
}

/**
* Write the .regioninfo file on-disk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException {
* the future
* @param conn HBase connection
* @param tableBackupPath backup path
* @param logDirs : incremental backup folders, which contains WAL
* @param tableNames : source tableNames(table names were backuped)
* @param newTableNames : target tableNames(table names to be restored to)
* @param hfileDirs incremental backup folders, which contain the HFiles to bulk load
* @param tableNames source table names (the tables that were backed up)
* @param newTableNames target table names (the tables to restore to)
* @param incrBackupId incremental backup Id
* @throws IOException exception
*/
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] hfileDirs,
TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
try (Admin admin = conn.getAdmin()) {
if (tableNames.length != newTableNames.length) {
Expand Down Expand Up @@ -202,7 +202,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[
}
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false);
restoreService.run(hfileDirs, tableNames, restoreRootDir, newTableNames, false);
}
}

Expand All @@ -225,39 +225,14 @@ Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backu
HConstants.SNAPSHOT_DIR_NAME);
}

/**
 * Finds the table info path for a table inside this backup by listing the table's snapshot
 * directory. The original documentation gives the expected location as
 * {@code /$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/snapshot_1396650097621_namespace_table};
 * that path contains .snapshotinfo and .tabledesc (0.96 and 0.98) or .snapshotinfo and
 * .data.manifest (trunk).
 * @param tableName table name
 * @return the first listed path whose name ends with "data.manifest", otherwise the last listed
 *         path, or null when the listing is empty
 * @throws IOException exception
 */
Path getTableInfoPath(TableName tableName) throws IOException {
Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
// Stays null when the listing below yields no entries.
Path tableInfoPath = null;

// can't build the path directly as the timestamp values are different
FileStatus[] snapshots = fs.listStatus(tableSnapShotPath,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
for (FileStatus snapshot : snapshots) {
// Remember every entry so the last one is returned if no manifest-named entry is found.
tableInfoPath = snapshot.getPath();
// SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
if (tableInfoPath.getName().endsWith("data.manifest")) {
break;
}
}
return tableInfoPath;
}

/**
* Get table descriptor
* @param tableName is the table backed up
* @return {@link TableDescriptor} saved in backup image of the table
*/
TableDescriptor getTableDesc(TableName tableName) throws IOException {
Path tableInfoPath = this.getTableInfoPath(tableName);
Path tableInfoPath = BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName);
;
SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
TableDescriptor tableDescriptor = manifest.getTableDescriptor();
Expand Down Expand Up @@ -307,7 +282,8 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableNa
tableDescriptor = manifest.getTableDescriptor();
} else {
tableDescriptor = getTableDesc(tableName);
snapshotMap.put(tableName, getTableInfoPath(tableName));
snapshotMap.put(tableName,
BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName));
}
if (tableDescriptor == null) {
LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
Expand Down
Loading