Skip to content

Commit

Permalink
more enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
rda3mon committed Aug 27, 2022
1 parent 286c8d0 commit d7a1930
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
} else {
tableNameBytes = Bytes.toBytes(writeTableNames);
}
String tableName = Bytes.toString(tableNameBytes);
Path tableRelPath = getTableRelativePath(tableNameBytes);
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

Expand Down Expand Up @@ -294,7 +295,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;

String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
try (
Connection connection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,22 @@ public void setup(Context context) throws IOException {
* A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer}
*/
static class WALCellMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
private byte[] table;
private Set<String> tableSet = new HashSet<>();
private boolean multiTableSupport = false;

@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
try {
// skip all other tables
if (Bytes.equals(table, key.getTableName().getName())) {
TableName table = key.getTableName();
if (tableSet.contains(table.getNameAsString())) {
for (Cell cell : value.getCells()) {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
new MapReduceExtendedCell(cell));
byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
}
}
} catch (InterruptedException e) {
Expand All @@ -165,13 +168,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {

@Override
public void setup(Context context) throws IOException {
// only a single table is supported when HFiles are generated with HFileOutputFormat
String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
if (tables == null || tables.length != 1) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("Exactly one table must be specified for bulk HFile case.");
Configuration conf = context.getConfiguration();
String[] tables = conf.getStrings(TABLES_KEY);
this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
for (String table : tables) {
tableSet.add(table);
}
table = Bytes.toBytes(tables[0]);
}
}

Expand Down Expand Up @@ -363,7 +365,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(KeyValue.class);
try (Connection conn = ConnectionFactory.createConnection(conf);) {
List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
List<TableInfo> tableInfoList = new ArrayList<>();
for (TableName tableName : tableNames) {
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName);
Expand Down

0 comments on commit d7a1930

Please sign in to comment.