Skip to content

Commit

Permalink
Fixes #584 - Allow asterisk as node label filter (#585)
Browse files Browse the repository at this point in the history
* also introduces key:null properties for virtual nodes if the property is not present at the group members
  • Loading branch information
s1ck authored and jexp committed Sep 11, 2017
1 parent 37b1ae1 commit 091e8c0
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 93 deletions.
209 changes: 130 additions & 79 deletions src/main/java/apoc/nodes/Grouping.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,51 +30,28 @@
public class Grouping {

private static final int BATCHSIZE = 10000;

private static final String ASTERISK = "*";

@Context
public GraphDatabaseAPI db;
@Context
public Log log;

static class Key {
private final int hash;
private final String label;
private final Map<String,Object> values;

public Key(String label, Map<String, Object> values) {
this.label = label;
this.values = values;
hash = 31 * label.hashCode() + values.hashCode();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Key key = (Key) o;
return label.equals(key.label) && values.equals(key.values);
}

@Override
public int hashCode() {
return hash;
}
}

@Procedure
@Description("Group all nodes and their relationships by given keys, create virtual nodes and relationships for the summary information, you can provide an aggregations map [{kids:'sum',age:['min','max','avg'],gender:'collect'},{`*`,'count'}]")
public Stream<GraphResult> group(@Name("labels") List<String> labels, @Name("groupByProperties") List<String> groupByProperties,
@Name(value = "aggregations",defaultValue = "[{\"*\":\"count\"},{\"*\":\"count\"}]") List<Map<String,Object>> aggregations) {
@Name(value = "aggregations", defaultValue = "[{\"*\":\"count\"},{\"*\":\"count\"}]") List<Map<String, Object>> aggregations) {
String[] keys = groupByProperties.toArray(new String[groupByProperties.size()]);
Map<String,List<String>> nodeAggNames = (aggregations.size()>0) ? toStringListMap(aggregations.get(0)) : emptyMap();
String[] nodeAggKeys = keyArray(nodeAggNames, "*");
Map<String, List<String>> nodeAggNames = (aggregations.size() > 0) ? toStringListMap(aggregations.get(0)) : emptyMap();
String[] nodeAggKeys = keyArray(nodeAggNames, ASTERISK);

Map<String,List<String>> relAggNames = (aggregations.size()>1) ? toStringListMap(aggregations.get(1)) : emptyMap();
String[] relAggKeys = keyArray(relAggNames, "*");;
Map<String, List<String>> relAggNames = (aggregations.size() > 1) ? toStringListMap(aggregations.get(1)) : emptyMap();
String[] relAggKeys = keyArray(relAggNames, ASTERISK);

Map<Key,Set<Node>> grouped = new ConcurrentHashMap<>();
Map<Key,Node> virtual = new ConcurrentHashMap<>();
Map<RelKey,Relationship> virtualRels = new ConcurrentHashMap<>();
Map<NodeKey, Set<Node>> grouped = new ConcurrentHashMap<>();
Map<NodeKey, Node> virtualNodes = new ConcurrentHashMap<>();
Map<RelKey, Relationship> virtualRels = new ConcurrentHashMap<>();

List<Future> futures = new ArrayList<>(1000);

Expand All @@ -83,27 +60,32 @@ public Stream<GraphResult> group(@Name("labels") List<String> labels, @Name("gro
Label label = Label.label(labelName);
Label[] singleLabel = {label};

try (ResourceIterator<Node> nodes = db.findNodes(label)) {
try (ResourceIterator<Node> nodes = (labelName.equals(ASTERISK)) ? db.getAllNodes().iterator() : db.findNodes(label)) {
while (nodes.hasNext()) {
List<Node> batch = Util.take(nodes, BATCHSIZE);
futures.add(Util.inTxFuture(pool, db, () -> {
try {
for (Node node : batch) {
Key key = keyFor(node, labelName, keys);
grouped.compute(key, (k, v) -> {if (v == null) v = new HashSet<>(); v.add(node); return v;});
virtual.compute(key, (k, v) -> {
if (v == null) {
v = new VirtualNode(singleLabel,node.getProperties(keys),db);
}
Node vn = v;
if (!nodeAggNames.isEmpty()) {
aggregate(vn, nodeAggNames, nodeAggKeys.length > 0 ? node.getProperties(nodeAggKeys) : Collections.emptyMap());
}
return vn;}
NodeKey key = keyFor(node, labelName, keys);
grouped.compute(key, (k, v) -> {
if (v == null) v = new HashSet<>();
v.add(node);
return v;
});
virtualNodes.compute(key, (k, v) -> {
if (v == null) {
v = new VirtualNode(singleLabel, propertiesFor(node, keys), db);
}
Node vn = v;
if (!nodeAggNames.isEmpty()) {
aggregate(vn, nodeAggNames, nodeAggKeys.length > 0 ? node.getProperties(nodeAggKeys) : Collections.emptyMap());
}
return vn;
}
);
}
} catch(Exception e) {
log.debug("Error grouping nodes",e);
} catch (Exception e) {
log.debug("Error grouping nodes", e);
}
return null;
}));
Expand All @@ -113,27 +95,27 @@ public Stream<GraphResult> group(@Name("labels") List<String> labels, @Name("gro
}
Util.waitForFutures(futures);
futures.clear();
Iterator<Map.Entry<Key, Set<Node>>> entries = grouped.entrySet().iterator();
Iterator<Map.Entry<NodeKey, Set<Node>>> entries = grouped.entrySet().iterator();
int size = 0;
List<Map.Entry<Key, Set<Node>>> batch = new ArrayList<>();
List<Map.Entry<NodeKey, Set<Node>>> batch = new ArrayList<>();
while (entries.hasNext()) {
Map.Entry<Key, Set<Node>> outerEntry = entries.next();
Map.Entry<NodeKey, Set<Node>> outerEntry = entries.next();
batch.add(outerEntry);
size += outerEntry.getValue().size();
if (size > BATCHSIZE || !entries.hasNext()) {
ArrayList<Map.Entry<Key, Set<Node>>> submitted = new ArrayList<>(batch);
ArrayList<Map.Entry<NodeKey, Set<Node>>> submitted = new ArrayList<>(batch);
batch.clear();
size = 0;
futures.add(Util.inTxFuture(pool, db, () -> {
try {
for (Map.Entry<Key, Set<Node>> entry : submitted) {
for (Map.Entry<NodeKey, Set<Node>> entry : submitted) {
for (Node node : entry.getValue()) {
Key startKey = entry.getKey();
Node v1 = virtual.get(startKey);
NodeKey startKey = entry.getKey();
Node v1 = virtualNodes.get(startKey);
for (Relationship rel : node.getRelationships(Direction.OUTGOING)) {
Node endNode = rel.getEndNode();
for (Key endKey : keysFor(endNode, labels, keys)) {
Node v2 = virtual.get(endKey);
for (NodeKey endKey : keysFor(endNode, labels, keys)) {
Node v2 = virtualNodes.get(endKey);
if (v2 == null) continue;
virtualRels.compute(new RelKey(startKey, endKey, rel), (rk, vRel) -> {
if (vRel == null) vRel = v1.createRelationshipTo(v2, rel.getType());
Expand All @@ -146,31 +128,31 @@ public Stream<GraphResult> group(@Name("labels") List<String> labels, @Name("gro
}
}
}
} catch(Exception e) {
log.debug("Error grouping relationships",e);
} catch (Exception e) {
log.debug("Error grouping relationships", e);
}
return null;
}));
Util.removeFinished(futures);
}
}
Util.waitForFutures(futures);
return fixAggregates(virtual.values()).stream().map( n -> new GraphResult(singletonList(n), fixAggregates(Iterables.asList(n.getRelationships()))));
return fixAggregates(virtualNodes.values()).stream().map(n -> new GraphResult(singletonList(n), fixAggregates(Iterables.asList(n.getRelationships()))));
}

public Map<String, List<String>> toStringListMap(Map<String, Object> input) {
private Map<String, List<String>> toStringListMap(Map<String, Object> input) {
Map<String, List<String>> nodeAggNames = new LinkedHashMap<>(input.size());
input.forEach((k, v) -> nodeAggNames.put(k, v instanceof List ? ((List<Object>) v).stream().map(Object::toString).collect(Collectors.toList()) : singletonList(v.toString())));
return nodeAggNames;
}

public String[] keyArray(Map<String, ?> map, String... removeKeys) {
private String[] keyArray(Map<String, ?> map, String... removeKeys) {
List<String> keys = new ArrayList<>(map.keySet());
for (String key : removeKeys) keys.remove(key);
return keys.toArray(new String[keys.size()]);
}

private <C extends Collection<T>,T extends PropertyContainer> C fixAggregates(C pcs) {
private <C extends Collection<T>, T extends PropertyContainer> C fixAggregates(C pcs) {
for (PropertyContainer pc : pcs) {
pc.getAllProperties().entrySet().forEach((entry) -> {
Object v = entry.getValue();
Expand All @@ -181,18 +163,18 @@ private <C extends Collection<T>,T extends PropertyContainer> C fixAggregates(C
}
if (k.matches("^avg_.+") && v instanceof double[]) {
double[] values = (double[]) v;
entry.setValue(values[1] == 0 ? 0 : values[0]/ values[1]);
entry.setValue(values[1] == 0 ? 0 : values[0] / values[1]);
}
if (k.matches("^collect_.+") && v instanceof Collection) {
entry.setValue(((Collection)v).toArray());
entry.setValue(((Collection) v).toArray());
}
});
}
return pcs;
}

public void aggregate(PropertyContainer pc, Map<String, List<String>> aggregations, Map<String, Object> properties) {
aggregations.forEach((k2,aggNames) -> {
private void aggregate(PropertyContainer pc, Map<String, List<String>> aggregations, Map<String, Object> properties) {
aggregations.forEach((k2, aggNames) -> {
for (String aggName : aggNames) {
String key = aggName + "_" + k2;
if ("count_*".equals(key)) {
Expand Down Expand Up @@ -232,29 +214,98 @@ public void aggregate(PropertyContainer pc, Map<String, List<String>> aggregatio
});
}

public Key keyFor(Node node, String label, String[] keys) {
Map<String, Object> props = node.getProperties(keys);
return new Key(label, props);
/**
* Returns the properties for the given node according to the specified keys. If a node does not have a property
* assigned to given key, the value is set to {@code null}.
*
* @param node node
* @param keys property keys
* @return node properties for keys
*/
private Map<String, Object> propertiesFor(Node node, String[] keys) {
Map<String, Object> props = new HashMap<>(keys.length);

for (String key : keys) {
props.put(key, node.getProperty(key, null));
}

return props;
}

public Collection<Key> keysFor(Node node, List<String> labels, String[] keys) {
Map<String, Object> props = node.getProperties(keys);
List<Key> result=new ArrayList<>(labels.size());
for (Label label : node.getLabels()) {
if (labels.contains(label.name())) {
result.add(new Key(label.name(), props));
/**
* Creates a grouping key for the given node using its label and grouping properties.
*
* @param node node
* @param label node label
* @param keys property keys
* @return grouping key
*/
private NodeKey keyFor(Node node, String label, String[] keys) {
return new NodeKey(label, propertiesFor(node, keys));
}

/**
* Creates a grouping key for each specified label.
*
* @param node node
* @param labels node labels
* @param keys property keys
* @return grouping keys
*/
private Collection<NodeKey> keysFor(Node node, List<String> labels, String[] keys) {
Map<String, Object> props = propertiesFor(node, keys);
List<NodeKey> result = new ArrayList<>(labels.size());
if (labels.contains(ASTERISK)) {
result.add(new NodeKey(ASTERISK, props));
} else {
for (Label label : node.getLabels()) {
if (labels.contains(label.name())) {
result.add(new NodeKey(label.name(), props));
}
}
}
return result;
}

/**
* Represents a grouping key for nodes.
*/
static class NodeKey {
private final int hash;
private final String label;
private final Map<String, Object> values;

NodeKey(String label, Map<String, Object> values) {
this.label = label;
this.values = values;
hash = 31 * label.hashCode() + values.hashCode();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

NodeKey key = (NodeKey) o;
return label.equals(key.label) && values.equals(key.values);
}

@Override
public int hashCode() {
return hash;
}
}

/**
* Represents a grouping key for relationships.
*/
private static class RelKey {
private final int hash;
private final Key startKey;
private final Key endKey;
private final NodeKey startKey;
private final NodeKey endKey;
private final String type;

public RelKey(Key startKey, Key endKey, Relationship rel) {
RelKey(NodeKey startKey, NodeKey endKey, Relationship rel) {
this.startKey = startKey;
this.endKey = endKey;
this.type = rel.getType().name();
Expand Down
Loading

0 comments on commit 091e8c0

Please sign in to comment.