
Commit

RDFSplitter improvements.
Mark Hale committed Sep 13, 2024
1 parent 1bb367f commit 132244b
Showing 1 changed file with 76 additions and 38 deletions.
114 changes: 76 additions & 38 deletions tools/src/main/java/com/msd/gin/halyard/tools/RDFSplitter.java
@@ -2,7 +2,11 @@
 
 import com.google.common.io.CountingInputStream;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -16,7 +20,9 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.*;
+import java.util.function.Consumer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
@@ -25,7 +31,12 @@
 import org.eclipse.rdf4j.model.Namespace;
 import org.eclipse.rdf4j.model.Statement;
 import org.eclipse.rdf4j.model.impl.SimpleNamespace;
-import org.eclipse.rdf4j.rio.*;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.Rio;
 import org.eclipse.rdf4j.rio.helpers.BasicParserSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,9 +123,10 @@ private static RDFFormat getWriterFormatForName(String fileName) {
         return Rio.getWriterFormatForFileName(fileName).orElseThrow(Rio.unsupportedFormat(fileName));
     }
 
-    private static final int BUFFER_SIZE = 128*1024;
-    private static final int QUEUE_CAPACITY = 1;
-    private static final int MAX_BATCH_SIZE = 10000;
+    private static final int BUFFER_SIZE = Integer.getInteger("rdfSplitter.bufferSize", 128*1024);
+    private static final int QUEUE_CAPACITY = Integer.getInteger("rdfSplitter.queueCapacity", 1);
+    private static final int MIN_BATCH_SIZE = Integer.getInteger("rdfSplitter.minBatchSize", 1000);
+    private static final int MAX_BATCH_SIZE = Integer.getInteger("rdfSplitter.maxBatchSize", 10000);
     private static final Object START = new Object();
     private static final Object END = new Object();
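
The constants above are now read with Integer.getInteger, so each value can be overridden per run with a JVM system property instead of a recompile; the old hard-coded values remain the defaults. A minimal sketch of the mechanism (the property names come from the diff; the class name and command line are illustrative):

public class TuningDemo {
    // Integer.getInteger(name, default) returns the default when the property
    // is unset or cannot be parsed as an integer.
    static final int MIN_BATCH_SIZE = Integer.getInteger("rdfSplitter.minBatchSize", 1000);
    static final int MAX_BATCH_SIZE = Integer.getInteger("rdfSplitter.maxBatchSize", 10000);

    public static void main(String[] args) {
        // e.g. java -DrdfSplitter.minBatchSize=5000 -DrdfSplitter.maxBatchSize=50000 TuningDemo
        System.out.println(MIN_BATCH_SIZE + ".." + MAX_BATCH_SIZE);
    }
}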

@@ -128,6 +140,7 @@ private static RDFFormat getWriterFormatForName(String fileName) {
     private OutputTask.StatementBatcher[] batchers;
     private OutputTask.StatementBatcher bnodeBatcher;
     private final OutputTask[] tasks;
+    private Consumer<Statement> stmtConsumer;
     private long totalStmtReadCount;
     private long inputByteSize;
     private CountingInputStream inCounter;
Expand Down Expand Up @@ -168,6 +181,44 @@ public Long call() throws Exception {
return totalStmts;
}

private Consumer<Statement> createStatementConsumer(OutputTask.StatementBatcher[] batchers, OutputTask.StatementBatcher bnodeBatcher) {
if (tasks.length == 1) {
if (batchers.length > 1) {
// if there is only one output task then split the input file into sequential parts
// NB: not bnode safe
return st -> {
long currentBytesRead = inCounter.getCount() - 1;
int idx = (int) (currentBytesRead*batchers.length/inputByteSize);
batchers[idx].handleStatement(st);
};
} else {
return batchers[0]::handleStatement;
}
} else {
if (format.supportsContexts()) {
return st -> {
if (st.getSubject().isBNode() || st.getObject().isBNode() || (st.getContext() != null && st.getContext().isBNode())) {
bnodeBatcher.handleStatement(st);
} else {
// group by subject for more efficient turtle encoding
int idx = Math.floorMod(st.getSubject().hashCode(), batchers.length);
batchers[idx].handleStatement(st);
}
};
} else {
return st -> {
if (st.getSubject().isBNode() || st.getObject().isBNode()) {
bnodeBatcher.handleStatement(st);
} else {
// group by subject for more efficient turtle encoding
int idx = Math.floorMod(st.getSubject().hashCode(), batchers.length);
batchers[idx].handleStatement(st);
}
};
}
}
}

long split(RDFParser parser, InputStream in, RDFHandler[] handlers, RDFHandler bnodeHandler) throws Exception {
batchers = new OutputTask.StatementBatcher[handlers.length];
for (int i=0; i<handlers.length; i++) {
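
The new consumer routes each bnode-free statement to a batcher chosen by hashing its subject, so all statements about one subject land in the same output part (which, as the in-code comment notes, helps Turtle encoding). Math.floorMod is used rather than the % operator because hashCode() can be negative; a standalone sketch of the difference (hypothetical class, illustrative values):

public class FloorModDemo {
    public static void main(String[] args) {
        int hash = -7;    // hashCode() values can legitimately be negative
        int buckets = 4;  // stands in for batchers.length
        System.out.println(hash % buckets);               // -3: unusable as an array index
        System.out.println(Math.floorMod(hash, buckets)); // 1: always in [0, buckets)
    }
}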
@@ -177,6 +228,8 @@ long split(RDFParser parser, InputStream in, RDFHandler[] handlers, RDFHandler bnodeHandler) throws Exception {
             bnodeBatcher = tasks[tasks.length-1].createBatcher(bnodeHandler);
         }
 
+        stmtConsumer = createStatementConsumer(batchers, bnodeBatcher);
+
         for (OutputTask task : tasks) {
             completionService.submit(task);
         }
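
The split method submits every OutputTask to a completion service, decoupling the single parser thread from the writer threads and letting results (per-task statement counts) be collected as tasks finish. A minimal sketch of that pattern, not the tool's actual wiring (class name and values are illustrative):

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Long> completionService = new ExecutorCompletionService<>(pool);
        for (long part = 0; part < 2; part++) {
            final long statements = (part + 1) * 1000;
            completionService.submit(() -> statements); // stand-in for an OutputTask returning its count
        }
        for (int i = 0; i < 2; i++) {
            System.out.println("task done: " + completionService.take().get()); // in completion order
        }
        pool.shutdown();
    }
}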
@@ -253,25 +306,7 @@ public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
             public void handleStatement(Statement st) throws RDFHandlerException {
                 logStatus();
                 totalStmtReadCount++;
-                if (tasks.length == 1) {
-                    int idx;
-                    if (batchers.length > 1) {
-                        // if there is only one output task then split the input file into sequential parts
-                        long currentBytesRead = inCounter.getCount() - 1;
-                        idx = (int) (currentBytesRead*batchers.length/inputByteSize);
-                    } else {
-                        idx = 0;
-                    }
-                    batchers[idx].handleStatement(st);
-                } else {
-                    if (st.getSubject().isBNode() || st.getObject().isBNode() || (st.getContext() != null && st.getContext().isBNode())) {
-                        bnodeBatcher.handleStatement(st);
-                    } else {
-                        // group by subject for more efficient turtle encoding
-                        int idx = Math.floorMod(st.getSubject().hashCode(), batchers.length);
-                        batchers[idx].handleStatement(st);
-                    }
-                }
+                stmtConsumer.accept(st);
             }
 
             @Override
@@ -317,18 +352,19 @@ public StatementBatcher createBatcher(RDFHandler handler) {
         }
 
         private boolean addToQueue(RDFHandler handler, List<Object> batch, boolean flush) throws RDFHandlerException {
-            boolean enqueued;
+            boolean enqueued = false;
             final int batchSize = batch.size();
-            if (batchSize >= MAX_BATCH_SIZE || flush) {
-                enqueued = false;
-                try {
-                    queue.put(new StatementBatch(handler, batch));
-                    enqueued = true;
-                } catch (InterruptedException e) {
-                    throw new RDFHandlerException(e);
+            if (batchSize >= MIN_BATCH_SIZE || flush) {
+                if (batchSize >= MAX_BATCH_SIZE || flush) {
+                    try {
+                        queue.put(new StatementBatch(handler, batch));
+                        enqueued = true;
+                    } catch (InterruptedException e) {
+                        throw new RDFHandlerException(e);
+                    }
+                } else {
+                    enqueued = queue.offer(new StatementBatch(handler, batch));
                 }
-            } else {
-                enqueued = queue.offer(new StatementBatch(handler, batch));
             }
             return enqueued;
         }
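
addToQueue now implements a two-tier policy instead of a single threshold: batches under MIN_BATCH_SIZE are always held back, batches between MIN and MAX are handed over only when a writer can take them immediately (non-blocking offer), and at MAX_BATCH_SIZE or on flush the producer blocks (put) so the batch is never dropped and memory stays bounded. A condensed sketch of just that decision logic (illustrative class; the real method wraps the batch in a StatementBatch):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchPolicySketch {
    static final int MIN_BATCH_SIZE = 1000;
    static final int MAX_BATCH_SIZE = 10000;
    static final BlockingQueue<List<Object>> QUEUE = new LinkedBlockingQueue<>(1); // cf. QUEUE_CAPACITY

    // Returns true if the batch was handed over and the caller should start a fresh one.
    static boolean addToQueue(List<Object> batch, boolean flush) throws InterruptedException {
        int size = batch.size();
        if (size < MIN_BATCH_SIZE && !flush) {
            return false;              // too small: keep accumulating
        }
        if (size >= MAX_BATCH_SIZE || flush) {
            QUEUE.put(batch);          // must hand over: block until a writer frees a slot
            return true;
        }
        return QUEUE.offer(batch);     // opportunistic: enqueue only if a slot is free now
    }

    public static void main(String[] args) throws InterruptedException {
        List<Object> batch = Collections.nCopies(1500, (Object) "statement");
        System.out.println(addToQueue(batch, false)); // true here: the queue slot was free
    }
}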
@@ -354,7 +390,8 @@ public Long call() throws IOException, InterruptedException {
 
         final class StatementBatcher implements RDFHandler {
             private final RDFHandler handler;
-            private List<Object> batch = new ArrayList<>();
+            private List<Object> batch = new ArrayList<>(MIN_BATCH_SIZE);
+            private int batchCount;
             private long stmtCount;
 
             StatementBatcher(RDFHandler handler) {
@@ -363,8 +400,9 @@ final class StatementBatcher implements RDFHandler {
 
             public void add(Object o, boolean flush) throws RDFHandlerException {
                 batch.add(o);
-                final int batchSize = batch.size();
                 if (addToQueue(handler, batch, flush)) {
+                    batchCount++;
+                    int batchSize = batch.size();
                     batch = new ArrayList<>(batchSize);
                 }
             }
@@ -381,7 +419,7 @@ public void startRDF() throws RDFHandlerException {
             @Override
             public void endRDF() throws RDFHandlerException {
                 add(END, true);
-                LOGGER.info("Batcher for {} consumed {} statements", handler.toString(), stmtCount);
+                LOGGER.info("Batcher for {} consumed {} statements over {} batches", handler.toString(), stmtCount, batchCount);
             }
 
             @Override
