Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
eclipse-rdf4j/rdf4j#1378 simplified code
Browse files Browse the repository at this point in the history
Signed-off-by: Håvard Ottestad <[email protected]>
  • Loading branch information
hmottestad committed Apr 3, 2019
1 parent 4389140 commit 89243d6
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*******************************************************************************/
package org.eclipse.rdf4j.sail.shacl.planNodes;

import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.shacl.ShaclSailConnection;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

abstract class AbstractBulkJoinPlanNode implements PlanNode {

static void runQuery(ArrayDeque<Tuple> left, ArrayDeque<Tuple> right, SailConnection connection,
ParsedQuery parsedQuery, boolean skipBasedOnPreviousConnection) {
List<BindingSet> newBindindingset = buildBindingSets(left, connection, skipBasedOnPreviousConnection);

if (!newBindindingset.isEmpty()) {
updateQuery(parsedQuery, newBindindingset);
executeQuery(right, connection, parsedQuery);
}
}

private static void executeQuery(ArrayDeque<Tuple> right, SailConnection connection, ParsedQuery parsedQuery) {

try (Stream<? extends BindingSet> stream = Iterations.stream(
connection.evaluate(parsedQuery.getTupleExpr(), parsedQuery.getDataset(), new MapBindingSet(), true))) {
stream
.map(Tuple::new)
.forEach(right::addFirst);
}

}

private static void updateQuery(ParsedQuery parsedQuery, List<BindingSet> newBindindingset) {
try {
parsedQuery.getTupleExpr()
.visitChildren(new AbstractQueryModelVisitor<Exception>() {
@Override
public void meet(BindingSetAssignment node) {
node.setBindingSets(newBindindingset);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static List<BindingSet> buildBindingSets(ArrayDeque<Tuple> left, SailConnection connection,
boolean skipBasedOnPreviousConnection) {
return left.stream()
.map(tuple -> tuple.line.get(0))
.map(v -> (Resource) v)
.filter(r -> {
if (!skipBasedOnPreviousConnection) {
return true;
}

if (connection instanceof ShaclSailConnection) {
return ((ShaclSailConnection) connection).getPreviousStateConnection()
.hasStatement(r, null, null, true);
}
return true;

})
.map(r -> new ListBindingSet(Collections.singletonList("a"), Collections.singletonList(r)))
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,15 @@

import org.apache.commons.text.StringEscapeUtils;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.QueryParserFactory;
import org.eclipse.rdf4j.query.parser.QueryParserRegistry;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.memory.MemoryStoreConnection;
import org.eclipse.rdf4j.sail.shacl.ShaclSailConnection;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.ArrayDeque;

/**
* @author Håvard Ottestad
Expand All @@ -41,7 +29,7 @@
* External means that this plan node can join the iterator from a plan node with an external source (Repository
* or SailConnection) based on a query or a predicate.
*/
public class BulkedExternalInnerJoin implements PlanNode {
public class BulkedExternalInnerJoin extends AbstractBulkJoinPlanNode {

private final SailConnection connection;
private final PlanNode leftNode;
Expand All @@ -65,9 +53,9 @@ public BulkedExternalInnerJoin(PlanNode leftNode, SailConnection connection, Str
public CloseableIteration<Tuple, SailException> iterator() {
return new CloseableIteration<Tuple, SailException>() {

LinkedList<Tuple> left = new LinkedList<>();
ArrayDeque<Tuple> left = new ArrayDeque<>();

LinkedList<Tuple> right = new LinkedList<>();
ArrayDeque<Tuple> right = new ArrayDeque<>();

CloseableIteration<Tuple, SailException> leftNodeIterator = leftNode.iterator();

Expand All @@ -85,45 +73,7 @@ private void calculateNext() {
return;
}

List<BindingSet> newBindindingset = left.stream()
.map(tuple -> tuple.line.get(0))
.map(v -> (Resource) v)
.filter(r -> {
if (!skipBasedOnPreviousConnection)
return true;

if (connection instanceof ShaclSailConnection) {
return ((ShaclSailConnection) connection).getPreviousStateConnection()
.hasStatement(r, null, null, true);
}
return true;

})
.map(r -> new ListBindingSet(Collections.singletonList("a"), Collections.singletonList(r)))
.collect(Collectors.toList());

if (!newBindindingset.isEmpty()) {

try {
parsedQuery.getTupleExpr().visitChildren(new AbstractQueryModelVisitor<Exception>() {
@Override
public void meet(BindingSetAssignment node) throws Exception {
node.setBindingSets(newBindindingset);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}

try (CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate = connection
.evaluate(parsedQuery.getTupleExpr(), parsedQuery.getDataset(), new MapBindingSet(),
true)) {
while (evaluate.hasNext()) {
BindingSet next = evaluate.next();
right.addFirst(new Tuple(next));
}
}
}
runQuery(left, right, connection, parsedQuery, skipBasedOnPreviousConnection);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.eclipse.rdf4j.sail.shacl.ShaclSailConnection;

import java.util.Collections;
import java.util.LinkedList;
import java.util.ArrayDeque;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -37,7 +37,7 @@
* External means that this plan node can join the iterator from a plan node with an external source (Repository
* or SailConnection) based on a query or a predicate.
*/
public class BulkedExternalLeftOuterJoin implements PlanNode {
public class BulkedExternalLeftOuterJoin extends AbstractBulkJoinPlanNode {

private final SailConnection connection;
private final PlanNode leftNode;
Expand All @@ -61,9 +61,9 @@ public BulkedExternalLeftOuterJoin(PlanNode leftNode, SailConnection connection,
public CloseableIteration<Tuple, SailException> iterator() {
return new CloseableIteration<Tuple, SailException>() {

LinkedList<Tuple> left = new LinkedList<>();
ArrayDeque<Tuple> left = new ArrayDeque<>();

LinkedList<Tuple> right = new LinkedList<>();
ArrayDeque<Tuple> right = new ArrayDeque<>();

CloseableIteration<Tuple, SailException> leftNodeIterator = leftNode.iterator();

Expand All @@ -73,52 +73,15 @@ private void calculateNext() {
return;
}

while (left.size() < 100 && leftNodeIterator.hasNext()) {
while (left.size() < 200 && leftNodeIterator.hasNext()) {
left.addFirst(leftNodeIterator.next());
}

if (left.isEmpty()) {
return;
}

List<BindingSet> newBindindingset = left.stream()
.map(tuple -> tuple.line.get(0))
.map(v -> (Resource) v)
.filter(r -> {
if (!skipBasedOnPreviousConnection) {
return true;
}

if (connection instanceof ShaclSailConnection) {
return ((ShaclSailConnection) connection).getPreviousStateConnection()
.hasStatement(r, null, null, true);
}
return true;

})
.map(r -> new ListBindingSet(Collections.singletonList("a"), Collections.singletonList(r)))
.collect(Collectors.toList());

if (!newBindindingset.isEmpty()) {
try {
parsedQuery.getTupleExpr().visitChildren(new AbstractQueryModelVisitor<Exception>() {
@Override
public void meet(BindingSetAssignment node) throws Exception {
node.setBindingSets(newBindindingset);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}

try (CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate = connection
.evaluate(parsedQuery.getTupleExpr(), null, new MapBindingSet(), true)) {
while (evaluate.hasNext()) {
BindingSet next = evaluate.next();
right.addFirst(new Tuple(next));
}
}
}
runQuery(left, right, connection, parsedQuery, skipBasedOnPreviousConnection);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ public class BulkedBenchmark {
private final static int SIZE = 10000;
private static final String QUERY = "?a <" + RDFS.LABEL + "> ?c";


private final SailRepository repository = new SailRepository(new MemoryStore());
private final List<Tuple> subjects;


public BulkedBenchmark(){
public BulkedBenchmark() {

repository.init();

Expand All @@ -76,10 +74,10 @@ public BulkedBenchmark(){
try (SailRepositoryConnection connection = repository.getConnection()) {
connection.begin();
ValueFactory vf = connection.getValueFactory();
for(int i = 0; i<SIZE; i++){
for (int i = 0; i < SIZE; i++) {
IRI iri = vf.createIRI("http://example.com/" + i);
connection.add(iri, RDF.TYPE, RDFS.RESOURCE);
connection.add(iri, RDFS.LABEL, vf.createLiteral("label_"+i));
connection.add(iri, RDFS.LABEL, vf.createLiteral("label_" + i));
subjects.add(iri);
}

Expand All @@ -101,18 +99,19 @@ public void setUp() {
}

@Benchmark
public int innerJoin(){
public int innerJoin() {
try (SailConnection connection = repository.getSail().getConnection()) {
PlanNode bulkedExternalInnerJoin = new BulkedExternalInnerJoin(new MockInputPlanNode(subjects), connection, QUERY, false);
PlanNode bulkedExternalInnerJoin = new BulkedExternalInnerJoin(new MockInputPlanNode(subjects), connection,
QUERY, false);
return new MockConsumePlanNode(bulkedExternalInnerJoin).asList().size();
}
}


@Benchmark
public int outerJoin(){
public int outerJoin() {
try (SailConnection connection = repository.getSail().getConnection()) {
PlanNode bulkedExternalInnerJoin = new BulkedExternalLeftOuterJoin(new MockInputPlanNode(subjects), connection, QUERY, false);
PlanNode bulkedExternalInnerJoin = new BulkedExternalLeftOuterJoin(new MockInputPlanNode(subjects),
connection, QUERY, false);
return new MockConsumePlanNode(bulkedExternalInnerJoin).asList().size();
}
}
Expand Down

0 comments on commit 89243d6

Please sign in to comment.