diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputFormat.java new file mode 100644 index 000000000000..1344703a9117 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +public class BulkDataGeneratorInputFormat extends InputFormat { + + public static final String MAPPER_TASK_COUNT_KEY = + BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count"; + + @Override + public List getSplits(JobContext job) throws IOException { + // Get the number of mapper tasks configured + int mapperCount = job.getConfiguration().getInt(MAPPER_TASK_COUNT_KEY, -1); + Preconditions.checkArgument(mapperCount > 1, MAPPER_TASK_COUNT_KEY + " is not set."); + + // Create a number of input splits equal to the number of mapper tasks + ArrayList splits = new ArrayList(); + for (int i = 0; i < mapperCount; ++i) { + splits.add(new BulkDataGeneratorInputSplit()); + } + return splits; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader = + new BulkDataGeneratorRecordReader(); + bulkDataGeneratorRecordReader.initialize(split, context); + return bulkDataGeneratorRecordReader; + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputSplit.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputSplit.java new file mode 100644 index 000000000000..7f8c216a92b7 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorInputSplit.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Dummy input split to be used by {@link BulkDataGeneratorRecordReader} + */ +public class BulkDataGeneratorInputSplit extends InputSplit implements Writable { + + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorMapper.java new file mode 100644 index 000000000000..35f8b9c471e5 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorMapper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.commons.math3.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; + +public class BulkDataGeneratorMapper + extends Mapper { + + /** Counter enumeration to count number of rows generated. */ + public static enum Counters { + ROWS_GENERATED + } + + public static final String SPLIT_COUNT_KEY = + BulkDataGeneratorMapper.class.getName() + "split.count"; + + private static final String ORG_ID = "00D000000000062"; + private static final int MAX_EVENT_ID = Integer.MAX_VALUE; + private static final int MAX_VEHICLE_ID = 100; + private static final int MAX_SPEED_KPH = 140; + private static final int NUM_LOCATIONS = 10; + private static int splitCount = 1; + private static final Random random = new Random(System.currentTimeMillis()); + private static final Map> LOCATIONS = + Maps.newHashMapWithExpectedSize(NUM_LOCATIONS); + private static final List LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS); + static { + LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48))); + LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92))); + LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05))); + LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42))); + LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00))); + LOCATIONS.put("Porto Velho", + new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90))); + LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88))); + LOCATIONS.put("Rio de Janeiro", + new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23))); + LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68))); + LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62))); + LOCATION_KEYS.addAll(LOCATIONS.keySet()); + } + + final static byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes(); + + /** {@inheritDoc} */ + @Override + protected void setup(Context context) throws IOException, InterruptedException { + Configuration c = context.getConfiguration(); + splitCount = c.getInt(SPLIT_COUNT_KEY, 1); + } + + /** + * Generates a single record based on value set to the key by + * {@link BulkDataGeneratorRecordReader#getCurrentKey()}. + * {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first + * {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures + * that records are equally distributed across all regions of the table since region boundaries + * are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)} + * method for region split info. + * @param key - The key having index of next record to be generated + * @param value - Value associated with the key (not used) + * @param context - Context of the mapper container + */ + @Override + protected void map(Text key, NullWritable value, Context context) + throws IOException, InterruptedException { + + int recordIndex = Integer.parseInt(key.toString()); + + // <6-characters-region-boundary-prefix>_<15-random-chars>_ + final String toolEventId = + String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_" + + EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_" + + recordIndex; + final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID))); + final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID))); + final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH))); + final String location = LOCATION_KEYS.get(random.nextInt(NUM_LOCATIONS)); + final Pair coordinates = LOCATIONS.get(location); + final BigDecimal latitude = coordinates.getFirst(); + final BigDecimal longitude = coordinates.getSecond(); + + final ImmutableBytesWritable hKey = + new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes()); + addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID); + addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId); + addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId); + addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vechileId); + addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed); + addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString()); + addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString()); + addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location); + addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP, + String.valueOf(EnvironmentEdgeManager.currentTime())); + + context.getCounter(Counters.ROWS_GENERATED).increment(1); + } + + private void addKeyValue(final Context context, ImmutableBytesWritable key, + final Utility.TableColumnNames columnName, final String value) + throws IOException, InterruptedException { + KeyValue kv = + new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes()); + context.write(key, kv); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorRecordReader.java new file mode 100644 index 000000000000..f4ecc659e51b --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorRecordReader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.IOException; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +public class BulkDataGeneratorRecordReader extends RecordReader { + + private int numRecordsToCreate = 0; + private int createdRecords = 0; + private Text key = new Text(); + private NullWritable value = NullWritable.get(); + + public static final String RECORDS_PER_MAPPER_TASK_KEY = + BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task"; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // Get the number of records to create from the configuration + this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1); + Preconditions.checkArgument(numRecordsToCreate > 0, + "Number of records to be created by per mapper should be greater than 0."); + } + + @Override + public boolean nextKeyValue() { + createdRecords++; + return createdRecords <= numRecordsToCreate; + } + + @Override + public Text getCurrentKey() { + // Set the index of record to be created + key.set(String.valueOf(createdRecords)); + return key; + } + + @Override + public NullWritable getCurrentValue() { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return (float) createdRecords / (float) numRecordsToCreate; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorTool.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorTool.java new file mode 100644 index 000000000000..befa1486dec4 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/BulkDataGeneratorTool.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; +import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser; + +/** + * A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random + * data, equally distributed among all regions. + */ +public class BulkDataGeneratorTool { + + private static final Logger logger = LoggerFactory.getLogger(BulkDataGeneratorTool.class); + + /** + * Prefix for the generated HFiles directory + */ + private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/"; + + /** + * Number of mapper container to be launched for generating of HFiles + */ + private int mapperCount; + + /** + * Number of rows to be generated by each mapper + */ + private long rowsPerMapper; + + /** + * Table for which random data needs to be generated + */ + private String table; + + /** + * Number of splits for the {@link #table}. Number of regions for the table will be + * ({@link #splitCount} + 1). + */ + private int splitCount; + + /** + * Flag to delete the table (before creating) if it already exists + */ + private boolean deleteTableIfExist; + + /** + * Additional HBase meta-data options to be set for the table + */ + private final Map tableOptions = new HashMap<>(); + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + BulkDataGeneratorTool bulkDataGeneratorTool = new BulkDataGeneratorTool(); + bulkDataGeneratorTool.run(conf, args); + } + + public boolean run(Configuration conf, String[] args) throws IOException { + // Read CLI arguments + CommandLine line = null; + try { + Parser parser = new GnuParser(); + line = parser.parse(getOptions(), args); + readCommandLineParameters(conf, line); + } catch (ParseException | IOException exception) { + logger.error("Error while parsing CLI arguments.", exception); + printUsage(); + return false; + } + + if (line.hasOption("-h")) { + printUsage(); + return true; + } + + Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name must not be empty"); + Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0"); + Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT), + "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT); + Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0"); + + Path outputDirectory = generateOutputDirectory(); + logger.info("HFiles will be generated at " + outputDirectory.toString()); + + try (Connection connection = ConnectionFactory.createConnection(conf)) { + final Admin admin = connection.getAdmin(); + final TableName tableName = TableName.valueOf(table); + if (admin.tableExists(tableName)) { + if (deleteTableIfExist) { + logger.info( + "Deleting the table since it already exist and delete-if-exist flag is set to true"); + Utility.deleteTable(admin, table); + } else { + logger.info("Table already exists, cannot generate HFiles for existing table."); + return false; + } + } + + // Creating the pre-split table + Utility.createTable(admin, table, splitCount, tableOptions); + logger.info(table + " created successfully"); + + Job job = createSubmittableJob(conf); + + Table hbaseTable = connection.getTable(tableName); + + // Auto configure partitioner and reducer + HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseTable.getRegionLocator()); + + FileOutputFormat.setOutputPath(job, outputDirectory); + + boolean result = job.waitForCompletion(true); + + if (result) { + logger.info("HFiles generated successfully. Starting bulk load to " + table); + BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf); + Map bulkLoadedHFiles = + bulkLoadHFilesTool.bulkLoad(tableName, outputDirectory); + boolean status = !bulkLoadedHFiles.isEmpty(); + logger.info("BulkLoadHFiles finished successfully with status " + status); + return status; + } else { + logger.info("Failed to generate HFiles."); + return false; + } + } catch (Exception e) { + logger.error("Failed to generate data", e); + return false; + } finally { + FileSystem.get(conf).deleteOnExit(outputDirectory); + } + } + + protected Job createSubmittableJob(Configuration conf) throws IOException { + + conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount); + conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount); + conf.setLong(BulkDataGeneratorRecordReader.RECORDS_PER_MAPPER_TASK_KEY, rowsPerMapper); + + Job job = new Job(conf, BulkDataGeneratorTool.class.getSimpleName() + " - " + table); + + job.setJarByClass(BulkDataGeneratorMapper.class); + job.setInputFormatClass(BulkDataGeneratorInputFormat.class); + + HBaseConfiguration.addHbaseResources(conf); + + job.setMapperClass(BulkDataGeneratorMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + + return job; + } + + /** Returns Random output directory path where HFiles will be generated */ + protected Path generateOutputDirectory() { + final String outputDirectory = + OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis(); + return new Path(outputDirectory); + } + + /** + * This method parses the command line parameters into instance variables + */ + protected void readCommandLineParameters(Configuration conf, CommandLine line) + throws ParseException, IOException { + final List genericParameters = new ArrayList(); + + // Parse the generic options + for (Map.Entry entry : line.getOptionProperties("D").entrySet()) { + genericParameters.add("-D"); + genericParameters.add(entry.getKey() + "=" + entry.getValue()); + } + + logger.info( + "Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0]))); + + new GenericOptionsParser(conf, genericParameters.toArray(new String[0])); + + table = line.getOptionValue("table"); + + if (line.hasOption("mapper-count")) { + mapperCount = Integer.parseInt(line.getOptionValue("mapper-count")); + } + if (line.hasOption("split-count")) { + splitCount = Integer.parseInt(line.getOptionValue("split-count")); + } + if (line.hasOption("rows-per-mapper")) { + rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper")); + } + + deleteTableIfExist = line.hasOption("delete-if-exist"); + + parseTableOptions(line); + } + + private void parseTableOptions(final CommandLine line) { + final String tableOptionsAsString = line.getOptionValue("table-options"); + if (!StringUtils.isEmpty(tableOptionsAsString)) { + for (String tableOption : tableOptionsAsString.split(",")) { + final String[] keyValueSplit = tableOption.split("="); + final String key = keyValueSplit[0]; + final String value = keyValueSplit[1]; + tableOptions.put(key, value); + } + } + } + + /** Returns the command line option for {@link BulkDataGeneratorTool} */ + protected Options getOptions() { + final Options options = new Options(); + Option option = + new Option("t", "table", true, "The table name for which data need to be generated."); + options.addOption(option); + + option = new Option("d", "delete-if-exist", false, + "If it's set, the table will be deleted if already exist."); + options.addOption(option); + + option = + new Option("mc", "mapper-count", true, "The number of mapper containers to be launched."); + options.addOption(option); + + option = new Option("sc", "split-count", true, + "The number of regions/pre-splits to be created for the table."); + options.addOption(option); + + option = + new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper."); + options.addOption(option); + + option = + new Option("o", "table-options", true, "Table options to be set while creating the table."); + options.addOption(option); + + option = new Option("h", "help", false, "Show help message for the tool"); + options.addOption(option); + + return options; + } + + protected void printUsage() { + final HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(120); + final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName(); + final String commandSyntax = helpMessageCommand + " [-D]*"; + final String helpMessageSuffix = "Examples:\n" + helpMessageCommand + + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand + + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"BACKUP=false,NORMALIZATION_ENABLED=false\"\n" + + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192\n"; + helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/Utility.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/Utility.java new file mode 100644 index 000000000000..3db75239a646 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/bulkdatagenerator/Utility.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.bulkdatagenerator; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +public final class Utility { + + /** + * Schema for HBase table to be generated by generated and populated by + * {@link BulkDataGeneratorTool} + */ + public enum TableColumnNames { + ORG_ID("orgId".getBytes()), + TOOL_EVENT_ID("toolEventId".getBytes()), + EVENT_ID("eventId".getBytes()), + VEHICLE_ID("vehicleId".getBytes()), + SPEED("speed".getBytes()), + LATITUDE("latitude".getBytes()), + LONGITUDE("longitude".getBytes()), + LOCATION("location".getBytes()), + TIMESTAMP("timestamp".getBytes()); + + private final byte[] columnName; + + TableColumnNames(byte[] column) { + this.columnName = column; + } + + public byte[] getColumnName() { + return this.columnName; + } + } + + public static final String COLUMN_FAMILY = "cf"; + + public static final int SPLIT_PREFIX_LENGTH = 6; + + public static final int MAX_SPLIT_COUNT = (int) Math.pow(10, SPLIT_PREFIX_LENGTH); + + /** + * Private Constructor + */ + private Utility() { + + } + + public static void deleteTable(Admin admin, String tableName) throws IOException { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + } + + /** + * Creates a pre-splitted HBase Table having single column family ({@link #COLUMN_FAMILY}) and + * sequential splits with {@link #SPLIT_PREFIX_LENGTH} length character prefix. Example: If a + * table (TEST_TABLE_1) need to be generated with splitCount as 10, table would be created with + * (10+1) regions with boundaries end-keys as (000000-000001, 000001-000002, 000002-000003, ...., + * 0000010-) + * @param admin - Admin object associated with HBase connection + * @param tableName - Name of table to be created + * @param splitCount - Number of splits for the table (Number of regions will be splitCount + 1) + * @param tableOptions - Additional HBase metadata properties to be set for the table + */ + public static void createTable(Admin admin, String tableName, int splitCount, + Map tableOptions) throws IOException { + Preconditions.checkArgument(splitCount > 0, "Split count must be greater than 0"); + TableDescriptorBuilder tableDescriptorBuilder = + TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); + tableOptions.forEach(tableDescriptorBuilder::setValue); + TableDescriptor tableDescriptor = tableDescriptorBuilder + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)).build(); + // Pre-splitting table based on splitCount + byte[][] splitKeys = new byte[splitCount][]; + for (int i = 0; i < splitCount; i++) { + splitKeys[i] = String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", i + 1).getBytes(); + } + admin.createTable(tableDescriptor, splitKeys); + } +}