Skip to content

Commit

Permalink
HBASE-27904: A random data generator tool leveraging hbase bulk load (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Himanshu-g81 authored Jul 27, 2023
1 parent cfa3f13 commit f664552
Show file tree
Hide file tree
Showing 6 changed files with 724 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Synthetic {@link InputFormat} that does not read any real input. It creates one
 * {@link BulkDataGeneratorInputSplit} per configured mapper task and pairs every split with a
 * {@link BulkDataGeneratorRecordReader}.
 */
public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable> {

  /** Configuration key holding the number of mapper tasks, i.e. the number of splits to create. */
  public static final String MAPPER_TASK_COUNT_KEY =
    BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count";

  /**
   * Creates as many input splits as there are configured mapper tasks.
   * @param job job context whose configuration carries {@link #MAPPER_TASK_COUNT_KEY}
   * @return one {@link BulkDataGeneratorInputSplit} per mapper task
   * @throws IllegalArgumentException if the mapper task count is missing or not positive
   */
  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    // Get the number of mapper tasks configured; -1 marks "not configured"
    int mapperCount = job.getConfiguration().getInt(MAPPER_TASK_COUNT_KEY, -1);
    // A single mapper task is a valid configuration, so only reject non-positive values
    Preconditions.checkArgument(mapperCount > 0,
      MAPPER_TASK_COUNT_KEY + " must be set to a positive value, but was " + mapperCount);

    // Create a number of input splits equal to the number of mapper tasks
    List<InputSplit> splits = new ArrayList<>(mapperCount);
    for (int i = 0; i < mapperCount; ++i) {
      splits.add(new BulkDataGeneratorInputSplit());
    }
    return splits;
  }

  /**
   * Creates a {@link BulkDataGeneratorRecordReader} and initializes it with the given split and
   * context before returning it.
   * @param split   the split the reader is created for
   * @param context task attempt context carrying the job configuration
   * @return an initialized record reader
   */
  @Override
  public RecordReader<Text, NullWritable> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader =
      new BulkDataGeneratorRecordReader();
    bulkDataGeneratorRecordReader.initialize(split, context);
    return bulkDataGeneratorRecordReader;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

/**
 * Placeholder input split consumed by {@link BulkDataGeneratorRecordReader}. The split holds no
 * state: serialization is a no-op, its length is zero and it reports no locations.
 */
public class BulkDataGeneratorInputSplit extends InputSplit implements Writable {

  /** Returns zero, as the split does not describe any data. */
  @Override
  public long getLength() throws IOException, InterruptedException {
    return 0L;
  }

  /** Returns an empty array, as the split is not tied to any location. */
  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    return new String[] {};
  }

  /** Writes nothing, as the split has no fields. */
  @Override
  public void write(DataOutput arg0) throws IOException {
    // Nothing to serialize.
  }

  /** Reads nothing, as the split has no fields. */
  @Override
  public void readFields(DataInput arg0) throws IOException {
    // Nothing to deserialize.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

public class BulkDataGeneratorMapper
extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {

/** Counter enumeration to count number of rows generated. */
public static enum Counters {
ROWS_GENERATED
}

public static final String SPLIT_COUNT_KEY =
BulkDataGeneratorMapper.class.getName() + "split.count";

private static final String ORG_ID = "00D000000000062";
private static final int MAX_EVENT_ID = Integer.MAX_VALUE;
private static final int MAX_VEHICLE_ID = 100;
private static final int MAX_SPEED_KPH = 140;
private static final int NUM_LOCATIONS = 10;
private static int splitCount = 1;
private static final Random random = new Random(System.currentTimeMillis());
private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);
static {
LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));
LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92)));
LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
LOCATIONS.put("Porto Velho",
new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
LOCATIONS.put("Rio de Janeiro",
new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
LOCATION_KEYS.addAll(LOCATIONS.keySet());
}

final static byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes();

/** {@inheritDoc} */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration c = context.getConfiguration();
splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
}

/**
* Generates a single record based on value set to the key by
* {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
* {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
* {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
* that records are equally distributed across all regions of the table since region boundaries
* are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
* method for region split info.
* @param key - The key having index of next record to be generated
* @param value - Value associated with the key (not used)
* @param context - Context of the mapper container
*/
@Override
protected void map(Text key, NullWritable value, Context context)
throws IOException, InterruptedException {

int recordIndex = Integer.parseInt(key.toString());

// <6-characters-region-boundary-prefix>_<15-random-chars>_<record-index-for-this-mapper-task>
final String toolEventId =
String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
+ EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
+ recordIndex;
final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID)));
final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID)));
final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH)));
final String location = LOCATION_KEYS.get(random.nextInt(NUM_LOCATIONS));
final Pair<BigDecimal, BigDecimal> coordinates = LOCATIONS.get(location);
final BigDecimal latitude = coordinates.getFirst();
final BigDecimal longitude = coordinates.getSecond();

final ImmutableBytesWritable hKey =
new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);
addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vechileId);
addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed);
addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
String.valueOf(EnvironmentEdgeManager.currentTime()));

context.getCounter(Counters.ROWS_GENERATED).increment(1);
}

private void addKeyValue(final Context context, ImmutableBytesWritable key,
final Utility.TableColumnNames columnName, final String value)
throws IOException, InterruptedException {
KeyValue kv =
new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
context.write(key, kv);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Record reader that does not read any input. It hands out the record indexes
 * {@code 1..numRecordsToCreate} as keys, paired with a {@link NullWritable} value, where the
 * record count is taken from {@link #RECORDS_PER_MAPPER_TASK_KEY}.
 */
public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritable> {

  private int numRecordsToCreate = 0;
  private int createdRecords = 0;
  private final Text key = new Text();
  private final NullWritable value = NullWritable.get();

  /** Configuration key holding the number of records each mapper task has to create. */
  public static final String RECORDS_PER_MAPPER_TASK_KEY =
    BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";

  /**
   * Reads the number of records to create from the configuration.
   * @param split   the split this reader belongs to (not used)
   * @param context task attempt context carrying the job configuration
   * @throws IllegalArgumentException if the configured record count is missing or not positive
   */
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    // Get the number of records to create from the configuration
    this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1);
    Preconditions.checkArgument(numRecordsToCreate > 0,
      "Number of records to be created by per mapper should be greater than 0.");
  }

  /**
   * Advances to the next record index.
   * @return {@code true} while there are records left to create, {@code false} afterwards
   */
  @Override
  public boolean nextKeyValue() {
    // Do not count past the configured total, so the progress never exceeds 100%
    if (createdRecords >= numRecordsToCreate) {
      return false;
    }
    createdRecords++;
    return true;
  }

  /**
   * Returns the 1-based index of the current record as text. The same {@link Text} instance is
   * reused across calls.
   */
  @Override
  public Text getCurrentKey() {
    // Set the index of record to be created
    key.set(String.valueOf(createdRecords));
    return key;
  }

  /** Returns the shared {@link NullWritable} instance; records carry no value. */
  @Override
  public NullWritable getCurrentValue() {
    return value;
  }

  /**
   * Returns the fraction of records created so far.
   * @return a value between 0.0 and 1.0; 0.0 if no record count has been configured yet
   */
  @Override
  public float getProgress() throws IOException, InterruptedException {
    // Guard against 0/0 (NaN) when called before initialize()
    if (numRecordsToCreate <= 0) {
      return 0.0f;
    }
    return Math.min(1.0f, (float) createdRecords / (float) numRecordsToCreate);
  }

  /** Nothing to release; the reader holds no resources. */
  @Override
  public void close() throws IOException {
    // No-op
  }
}
Loading

0 comments on commit f664552

Please sign in to comment.