From 77afe17e68d1a9ac4c78a2957bbfdcebc43dba2c Mon Sep 17 00:00:00 2001 From: Mark Hale Date: Tue, 17 Oct 2023 14:01:55 +0100 Subject: [PATCH] test --- .../msd/gin/halyard/common/TableKeyspace.java | 1 + .../halyard/tools/HalyardSparkBulkLoad.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/common/src/main/java/com/msd/gin/halyard/common/TableKeyspace.java b/common/src/main/java/com/msd/gin/halyard/common/TableKeyspace.java index 27afc9435..fe3c41fef 100644 --- a/common/src/main/java/com/msd/gin/halyard/common/TableKeyspace.java +++ b/common/src/main/java/com/msd/gin/halyard/common/TableKeyspace.java @@ -75,6 +75,7 @@ public void initMapperJob(List scans, Class> ma @Override public void close() throws IOException { if (isOwner && conn != null && !conn.isClosed()) { + System.out.println("!!!!!!CLOSING!!"); conn.close(); } } diff --git a/tools/src/main/java/com/msd/gin/halyard/tools/HalyardSparkBulkLoad.java b/tools/src/main/java/com/msd/gin/halyard/tools/HalyardSparkBulkLoad.java index 521d0cd2c..2e75ae1e9 100644 --- a/tools/src/main/java/com/msd/gin/halyard/tools/HalyardSparkBulkLoad.java +++ b/tools/src/main/java/com/msd/gin/halyard/tools/HalyardSparkBulkLoad.java @@ -3,6 +3,8 @@ import com.google.common.collect.Iterators; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -29,10 +31,13 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.eclipse.rdf4j.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; public class HalyardSparkBulkLoad extends HalyardBulkLoad { + private static final Logger logger= LoggerFactory.getLogger("HalyardSparkBulkLoad"); private static final String TOOL_NAME = "bulkload"; HalyardSparkBulkLoad() { @@ -53,6 +58,7 @@ public class HalyardSparkBulkLoad extends HalyardBulkLoad { @Override protected int run(Job job, TableDescriptor tableDesc) throws Exception { Configuration conf = job.getConfiguration(); + print("Driver", conf); Path workDir = FileOutputFormat.getOutputPath(job); SparkConf sparkConf = new SparkConf().setAppName(TOOL_NAME + " " + tableDesc.getTableName().getNameAsString()); try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) { @@ -71,11 +77,23 @@ protected int run(Job job, TableDescriptor tableDesc) throws Exception { private static Iterator> toKeyValues(Tuple2,Connection> partitionConn) throws IOException { Iterator partition = partitionConn._1(); Connection conn = partitionConn._2(); + //print("Worker", conn.getConfiguration()); RDFMapper mapper = new RDFMapper(); mapper.init(conn.getConfiguration(), conn); return Iterators.transform(partition, mapper::apply); } + private static void print(String title, Configuration c) throws IOException { + StringWriter s = new StringWriter(); + PrintWriter w = new PrintWriter(s); + w.println(title); + for (Map.Entry e : c.getPropsWithPrefix("hbase").entrySet()) { + String g = e.getValue(); + w.println(e.getKey()+": "+g.substring(0, Math.min(g.length(), 25))); + } + logger.info(s.toString()); + } + private static Pair toRow(KeyValue kv) { ByteArrayWrapper rowKey = new ByteArrayWrapper(Bytes.copy(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();