Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Hale committed Oct 16, 2023
1 parent 846b24a commit a977bfb
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.msd.gin.halyard.common.StatementIndex.Name;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -69,8 +70,8 @@
public final class HalyardTableUtils {

static final byte[] CF_NAME = Bytes.toBytes("e");
static final byte[] CONFIG_ROW_KEY = new byte[] {(byte) 0xff};
static final byte[] CONFIG_COL = Bytes.toBytes("config");
private static final byte[] CONFIG_ROW_KEY = new byte[] {(byte) 0xff};
private static final byte[] CONFIG_COL = Bytes.toBytes("config");

static final int DEFAULT_MAX_VERSIONS = 1;
static final int READ_VERSIONS = 1;
Expand Down Expand Up @@ -147,6 +148,24 @@ static void writeConfig(Table table, HalyardTableConfiguration halyardConfig) th
table.put(configPut);
}

/**
 * Reads the table configuration stored in the config row of a keyspace.
 *
 * <p>Issues a {@link Get} for {@code CONFIG_ROW_KEY}, restricted to the
 * {@code CF_NAME}:{@code CONFIG_COL} column, and registers the value bytes of
 * the first returned cell as a resource (named "Table config") of a new
 * {@link Configuration} created with {@code loadDefaults == false}.
 *
 * @param conn connection used to execute the lookup
 * @return a configuration backed by the stored config cell value
 * @throws IOException if the lookup fails, or if it yields no result or no cells
 */
public static Configuration readConfig(KeyspaceConnection conn) throws IOException {
    Get configLookup = new Get(CONFIG_ROW_KEY);
    configLookup.addColumn(CF_NAME, CONFIG_COL);

    Result lookupResult = conn.get(configLookup);
    // A missing result and an empty result are reported identically.
    Cell[] configCells = (lookupResult != null) ? lookupResult.rawCells() : null;
    if (configCells == null || configCells.length == 0) {
        throw new IOException("No config found");
    }

    // Wrap the cell's backing array in place (offset/length) rather than copying the value.
    Cell configCell = configCells[0];
    ByteArrayInputStream valueStream = new ByteArrayInputStream(
            configCell.getValueArray(),
            configCell.getValueOffset(),
            configCell.getValueLength());

    Configuration tableConf = new Configuration(false);
    tableConf.addResource(valueStream, "Table config");
    return tableConf;
}

public static Connection getConnection(Configuration config) throws IOException {
Configuration cfg = HBaseConfiguration.create(config);
cfg.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 3600000l);
Expand Down
19 changes: 1 addition & 18 deletions common/src/main/java/com/msd/gin/halyard/common/RDFFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
Expand All @@ -14,9 +13,6 @@
import javax.annotation.concurrent.ThreadSafe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
Expand Down Expand Up @@ -79,20 +75,7 @@ public static RDFFactory create(Configuration config) {
}

public static RDFFactory create(KeyspaceConnection conn) throws IOException {
Get getConfig = new Get(HalyardTableUtils.CONFIG_ROW_KEY)
.addColumn(HalyardTableUtils.CF_NAME, HalyardTableUtils.CONFIG_COL);
Result res = conn.get(getConfig);
if (res == null) {
throw new IOException("No config found");
}
Cell[] cells = res.rawCells();
if (cells == null || cells.length == 0) {
throw new IOException("No config found");
}
Cell cell = cells[0];
Configuration halyardConf = new Configuration(false);
ByteArrayInputStream bin = new ByteArrayInputStream(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
halyardConf.addResource(bin);
Configuration halyardConf = HalyardTableUtils.readConfig(conn);

HalyardTableConfiguration config;
// migrate old config
Expand Down
29 changes: 7 additions & 22 deletions tools/src/main/java/com/msd/gin/halyard/tools/HalyardBulkLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -451,27 +451,7 @@ public ParserPump(CombineFileSplit split, TaskAttemptContext context) throws IOE
this.size = split.getLength();
Configuration conf = context.getConfiguration();
this.queue = new LinkedBlockingQueue<>(conf.getInt(PARSER_QUEUE_SIZE_PROPERTY, DEFAULT_PARSER_QUEUE_SIZE));
RDFFactory rdfFactory;
String target = conf.get(TARGET_TABLE_PROPERTY);
if (target != null) {
// load - table exists
Keyspace keyspace = HalyardTableUtils.getKeyspace(conf, target, null);
try {
try (KeyspaceConnection ksConn = keyspace.getConnection()) {
rdfFactory = RDFFactory.create(ksConn);
}
} finally {
try {
keyspace.close();
} finally {
keyspace.destroy();
}
}
} else {
// presplit - table not yet created
rdfFactory = RDFFactory.create(conf);
}
this.idValueFactory = new IdValueFactory(rdfFactory);
this.idValueFactory = new IdValueFactory(RDFFactory.create(conf));
this.valueFactory = new CachingValueFactory(idValueFactory, conf.getInt(VALUE_CACHE_SIZE_PROPERTY, DEFAULT_VALUE_CACHE_SIZE));
this.allowInvalidIris = conf.getBoolean(ALLOW_INVALID_IRIS_PROPERTY, false);
this.skipInvalidLines = conf.getBoolean(SKIP_INVALID_LINES_PROPERTY, false);
Expand Down Expand Up @@ -730,11 +710,16 @@ protected int run(CommandLine cmd) throws Exception {
TableMapReduceUtil.initCredentials(job);
TableDescriptor tableDesc;
try (Connection conn = HalyardTableUtils.getConnection(getConf())) {
try (Table hTable = HalyardTableUtils.getTable(conn, target, true, getConf().getInt(SPLIT_BITS_PROPERTY, DEFAULT_SPLIT_BITS))) {
try (Table hTable = HalyardTableUtils.getTable(conn, target, true, getConf().getInt(SPLIT_BITS_PROPERTY, DEFAULT_SPLIT_BITS))) {
tableDesc = hTable.getDescriptor();
RegionLocator regionLocator = conn.getRegionLocator(tableDesc.getTableName());
HFileOutputFormat2.configureIncrementalLoad(job, tableDesc, regionLocator);
}
try (Keyspace keyspace = HalyardTableUtils.getKeyspace(getConf(), conn, tableDesc.getTableName(), null, null)) {
try (KeyspaceConnection ksConn = keyspace.getConnection()) {
job.getConfiguration().addResource(HalyardTableUtils.readConfig(ksConn));
}
}
}
int rc = run(job, tableDesc);
if (rc == 0) {
Expand Down

0 comments on commit a977bfb

Please sign in to comment.