From d7a1930ad327acea8cf8024ca4da54491d7d41cc Mon Sep 17 00:00:00 2001 From: Mallikarjun Date: Sat, 27 Aug 2022 15:28:52 +0530 Subject: [PATCH] more enhancements --- .../hbase/mapreduce/HFileOutputFormat2.java | 2 +- .../hadoop/hbase/mapreduce/WALPlayer.java | 26 ++++++++++--------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 13c7cbe2a764..600aed1fb63b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -264,6 +264,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { } else { tableNameBytes = Bytes.toBytes(writeTableNames); } + String tableName = Bytes.toString(tableNameBytes); Path tableRelPath = getTableRelativePath(tableNameBytes); byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); @@ -294,7 +295,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { HRegionLocation loc = null; - String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { try ( Connection connection = diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 687e80568b2f..65b77d17a903 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -143,19 +143,22 @@ public void setup(Context context) throws IOException { * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer} */ static class WALCellMapper extends Mapper { - private byte[] table; + private Set tableSet = new HashSet<>(); + private boolean multiTableSupport = false; @Override public void map(WALKey key, WALEdit value, Context context) throws IOException { try { - // skip all other tables - if (Bytes.equals(table, key.getTableName().getName())) { + TableName table = key.getTableName(); + if (tableSet.contains(table.getNameAsString())) { for (Cell cell : value.getCells()) { if (WALEdit.isMetaEditFamily(cell)) { continue; } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), - new MapReduceExtendedCell(cell)); + byte[] outKey = multiTableSupport + ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell)) + : CellUtil.cloneRow(cell); + context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell)); } } } catch (InterruptedException e) { @@ -165,13 +168,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException { @Override public void setup(Context context) throws IOException { - // only a single table is supported when HFiles are generated with HFileOutputFormat - String[] tables = context.getConfiguration().getStrings(TABLES_KEY); - if (tables == null || tables.length != 1) { - // this can only happen when WALMapper is used directly by a class other than WALPlayer - throw new IOException("Exactly one table must be specified for bulk HFile case."); + Configuration conf = context.getConfiguration(); + String[] tables = conf.getStrings(TABLES_KEY); + this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false); + for (String table : tables) { + tableSet.add(table); } - table = Bytes.toBytes(tables[0]); } } @@ -363,7 +365,7 @@ public Job createSubmittableJob(String[] args) throws IOException { FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(KeyValue.class); try (Connection conn = ConnectionFactory.createConnection(conf);) { - List tableInfoList = new ArrayList(); + List tableInfoList = new ArrayList<>(); for (TableName tableName : tableNames) { Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName);