Skip to content

Commit

Permalink
fixes #600 - add a configuration option to mergeNodes that allows to …
Browse files Browse the repository at this point in the history
…combine relationships of the same type and direction
  • Loading branch information
AngeloBusato committed Jul 20, 2018
1 parent f0d772b commit 6a0bcd9
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 29 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 8 additions & 1 deletion docs/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,18 @@ CALL apoc.periodic.rock_n_roll('match (p:Person) return id(p) as id_p', 'MATCH (
| call apoc.refactor.categorize(node, propertyKey, type, outgoing, label) | turn each unique propertyKey into a category node and connect to it
|===

On mergeRelationship with config properties you can choose from 3 different management:
On mergeRelationship and mergeNodes with config properties you can choose from 3 different management:
* "overwrite" : if there is the same property in more relationship, in the new one will have the last relationship's property value
* "discard" : if there is the same property in more relationship, the new one will have the first relationship's property value
* "combine" : if there is the same property in more relationship, the new one a value's array with all relationships' values

On mergeNodes procedure there is also a config parameter to merge relationships of the merged nodes:
* "mergeRels: true/false" : give the possibility to merge relationships with same type and direction.

If relationships have same start and end nodes will be merged into one, and properties managed by the properties config.
If relationships have different start/end nodes (related to direction), relationships will be maintained and properties will be combine in all relationship.


TODO:

* merge nodes by label + property
Expand Down
88 changes: 88 additions & 0 deletions docs/refactor.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,94 @@ This config option also works for `apoc.refactor.mergeRelationships([rels],{conf
| combine | if there is only one property in list, it will be set / kept as single property otherwise create an array, tries to coerce values
|===

For mergeNodes you can Merge relationships with same type and direction, you can spacify this with property mergeRels.
Relationships properties are combined.

.example1 - Relationships with same start and end nodes

First of all we have to create nodes and relationships

---
Create (n1:Person {name:'Tom'}),
(n2:Person {name:'John'}),
(n3:Company {name:'Company1'}),
(n5:Car {brand:'Ferrari'}),
(n6:Animal:Cat {name:'Derby'}),
(n7:City {name:'London'}),
(n1)-[:WORKS_FOR{since:2015}]->(n3),
(n2)-[:WORKS_FOR{since:2018}]->(n3),
(n3)-[:HAS_HQ{since:2004}]->(n7),
(n1)-[:DRIVE{since:2017}]->(n5),
(n2)-[:HAS{since:2013}]->(n6)
return *;
---

image::{img}/apoc.refactor.mergeNodes.createDataSetFirstExample.png[]

Next step is calling the apoc to merge nodes :Person

---
MATCH (a1:Person{name:'John'}), (a2:Person {name:'Tom'})
WITH head(collect([a1,a2])) as nodes
CALL apoc.refactor.mergeNodes(nodes,{properties:"combine", mergeRels:true}) yield node
MATCH (n)-[r:WORKS_FOR]->(c) return *
---

and the result is:

image::{img}/apoc.refactor.mergeNodes.resultFirstExample.png[]

In this case we have relationships with same start and end nodes so relationships are merged into one and properties are combined.


.example2 - Relationships with different start or end nodes

---
Create (n1:Person {name:'Tom'}),
(n2:Person {name:'John'}),
(n3:Company {name:'Company1'}),
(n4:Company {name:'Company2'}),
(n5:Car {brand:'Ferrari'}),
(n6:Animal:Cat {name:'Derby'}),
(n7:City {name:'London'}),
(n8:City {name:'Liverpool'}),
(n1)-[:WORKS_FOR{since:2015}]->(n3),
(n2)-[:WORKS_FOR{since:2018}]->(n4),
(n3)-[:HAS_HQ{since:2004}]->(n7),
(n4)-[:HAS_HQ{since:2007}]->(n8),
(n1)-[:DRIVE{since:2017}]->(n5),
(n2)-[:HAS{since:2013}]->(n6)
return *;
---

image::{img}/apoc.refactor.mergeNodes.createDataSetSecondExample.png[]

Next step is calling the apoc to merge nodes :Person

---
MATCH (a1:Person{name:'John'}), (a2:Person {name:'Tom'})
WITH head(collect([a1,a2])) as nodes
CALL apoc.refactor.mergeNodes(nodes,{properties:"combine", mergeRels:true}) yield node
MATCH (n)-[r:WORKS_FOR]->(c) return n.name,r.since,c.name;
---

and the result is:

image::{img}/apoc.refactor.mergeNodes.resultSecondExample.png[]

image::{img}/apoc.refactor.mergeNodes.resultSecondExampleData.png[]

In this case we have relationships with different end nodes so all relationships are maintained and properties are combined into all relationships.


---
MATCH (p:Person)
WITH p ORDER BY p.created DESC // newest one first
WITH p.email, collect(p) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'combine', mergeRels: true}) YIELD node
RETURN node
---


== Normalize boolean properties

Expand Down
86 changes: 58 additions & 28 deletions src/main/java/apoc/refactor/GraphRefactoring.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ private Stream<NodeRefactorResult> doCloneNodes(@Name("nodes") List<Node> nodes,
Node newNode = copyLabels(node, db.createNode());

Map<String, Object> properties = node.getAllProperties();
if (skipProperties!=null && !skipProperties.isEmpty())
if (skipProperties != null && !skipProperties.isEmpty())
for (String skip : skipProperties) properties.remove(skip);

Node copy = copyProperties(properties, newNode);
if (withRelationships) {
copyRelationships(node, copy,false);
copyRelationships(node, copy, false);
}
return result.withOther(copy);
} catch (Exception e) {
Expand All @@ -52,8 +52,8 @@ public Stream<NodeRefactorResult> extractNode(@Name("relationships") Object rels
NodeRefactorResult result = new NodeRefactorResult(rel.getId());
try {
Node copy = copyProperties(rel, db.createNode(Util.labels(labels)));
copy.createRelationshipTo(rel.getEndNode(),RelationshipType.withName(outType));
rel.getStartNode().createRelationshipTo(copy,RelationshipType.withName(inType));
copy.createRelationshipTo(rel.getEndNode(), RelationshipType.withName(outType));
rel.getStartNode().createRelationshipTo(copy, RelationshipType.withName(inType));
rel.delete();
return result.withOther(copy);
} catch (Exception e) {
Expand All @@ -70,7 +70,7 @@ public Stream<RelationshipRefactorResult> collapseNode(@Name("nodes") Object nod
try {
Iterable<Relationship> outRels = node.getRelationships(Direction.OUTGOING);
Iterable<Relationship> inRels = node.getRelationships(Direction.INCOMING);
if (node.getDegree(Direction.OUTGOING) == 1 && node.getDegree(Direction.INCOMING) == 1) {
if (node.getDegree(Direction.OUTGOING) == 1 && node.getDegree(Direction.INCOMING) == 1) {
Relationship outRel = outRels.iterator().next();
Relationship inRel = inRels.iterator().next();
Relationship newRel = inRel.getStartNode().createRelationshipTo(outRel.getEndNode(), RelationshipType.withName(type));
Expand All @@ -82,7 +82,7 @@ public Stream<RelationshipRefactorResult> collapseNode(@Name("nodes") Object nod

return result.withOther(newRel);
} else {
return result.withError(String.format("Node %d has more that 1 outgoing %d or incoming %d relationships",node.getId(),node.getDegree(Direction.OUTGOING),node.getDegree(Direction.INCOMING)));
return result.withError(String.format("Node %d has more that 1 outgoing %d or incoming %d relationships", node.getId(), node.getDegree(Direction.OUTGOING), node.getDegree(Direction.INCOMING)));
}
} catch (Exception e) {
return result.withError(e);
Expand All @@ -96,9 +96,9 @@ public Stream<RelationshipRefactorResult> collapseNode(@Name("nodes") Object nod
@Procedure(mode = Mode.WRITE)
@Description("apoc.refactor.cloneNodes([node1,node2,...]) clone nodes with their labels and properties")
public Stream<NodeRefactorResult> cloneNodes(@Name("nodes") List<Node> nodes,
@Name(value = "withRelationships",defaultValue = "false") boolean withRelationships,
@Name(value = "skipProperties",defaultValue = "[]") List<String> skipProperties) {
return doCloneNodes(nodes,withRelationships,skipProperties);
@Name(value = "withRelationships", defaultValue = "false") boolean withRelationships,
@Name(value = "skipProperties", defaultValue = "[]") List<String> skipProperties) {
return doCloneNodes(nodes, withRelationships, skipProperties);
}

/**
Expand All @@ -108,7 +108,7 @@ public Stream<NodeRefactorResult> cloneNodes(@Name("nodes") List<Node> nodes,
@Deprecated
@Description("apoc.refactor.cloneNodesWithRelationships([node1,node2,...]) clone nodes with their labels, properties and relationships")
public Stream<NodeRefactorResult> cloneNodesWithRelationships(@Name("nodes") List<Node> nodes) {
return doCloneNodes(nodes,true, Collections.emptyList());
return doCloneNodes(nodes, true, Collections.emptyList());
}

/**
Expand All @@ -117,15 +117,17 @@ public Stream<NodeRefactorResult> cloneNodesWithRelationships(@Name("nodes") Lis
*/
@Procedure(mode = Mode.WRITE)
@Description("apoc.refactor.mergeNodes([node1,node2],[{properties:'override' or 'discard' or 'combine'}]) merge nodes onto first in list")
public Stream<NodeResult> mergeNodes(@Name("nodes") List<Node> nodes, @Name(value= "config", defaultValue = "") Map<String, Object> config) {
public Stream<NodeResult> mergeNodes(@Name("nodes") List<Node> nodes, @Name(value = "config", defaultValue = "") Map<String, Object> config) {
if (nodes == null || nodes.isEmpty()) return Stream.empty();
RefactorConfig conf = new RefactorConfig(config);
// grab write locks upfront consistently ordered
try (Transaction tx=db.beginTx()) {
nodes.stream().distinct().sorted(Comparator.comparingLong(Node::getId)).forEach( tx::acquireWriteLock );
try (Transaction tx = db.beginTx()) {
nodes.stream().distinct().sorted(Comparator.comparingLong(Node::getId)).forEach(tx::acquireWriteLock);
tx.success();
}
RefactorConfig conf = new RefactorConfig(config);

final Node first = nodes.get(0);

nodes.stream().skip(1).distinct().forEach(node -> mergeNodes(node, first, true, conf));
return Stream.of(new NodeResult(first));
}
Expand All @@ -136,14 +138,14 @@ public Stream<NodeResult> mergeNodes(@Name("nodes") List<Node> nodes, @Name(valu
*/
@Procedure(mode = Mode.WRITE)
@Description("apoc.refactor.mergeRelationships([rel1,rel2]) merge relationships onto first in list")
public Stream<RelationshipResult> mergeRelationships(@Name("rels") List<Relationship> relationships, @Name(value= "config", defaultValue = "") Map<String, Object> config) {
public Stream<RelationshipResult> mergeRelationships(@Name("rels") List<Relationship> relationships, @Name(value = "config", defaultValue = "") Map<String, Object> config) {
if (relationships == null || relationships.isEmpty()) return Stream.empty();
RefactorConfig conf = new RefactorConfig(config);
Iterator<Relationship> it = relationships.iterator();
Relationship first = it.next();
while (it.hasNext()) {
Relationship other = it.next();
if(first.getStartNode().equals(other.getStartNode()) && first.getEndNode().equals(other.getEndNode()))
if (first.getStartNode().equals(other.getStartNode()) && first.getEndNode().equals(other.getEndNode()))
mergeRels(other, first, true, conf);
else
throw new RuntimeException("All Relationships must have the same start and end nodes.");
Expand Down Expand Up @@ -180,7 +182,7 @@ public Stream<RelationshipRefactorResult> to(@Name("relationship") Relationship
RelationshipRefactorResult result = new RelationshipRefactorResult(rel.getId());
try {
Relationship newRel = rel.getStartNode().createRelationshipTo(newNode, rel.getType());
copyProperties(rel,newRel);
copyProperties(rel, newRel);
rel.delete();
return Stream.of(result.withOther(newRel));
} catch (Exception e) {
Expand All @@ -195,7 +197,7 @@ public Stream<RelationshipRefactorResult> invert(@Name("relationship") Relations
RelationshipRefactorResult result = new RelationshipRefactorResult(rel.getId());
try {
Relationship newRel = rel.getEndNode().createRelationshipTo(rel.getStartNode(), rel.getType());
copyProperties(rel,newRel);
copyProperties(rel, newRel);
rel.delete();
return Stream.of(result.withOther(newRel));
} catch (Exception e) {
Expand All @@ -213,7 +215,7 @@ public Stream<RelationshipRefactorResult> from(@Name("relationship") Relationshi
RelationshipRefactorResult result = new RelationshipRefactorResult(rel.getId());
try {
Relationship newRel = newNode.createRelationshipTo(rel.getEndNode(), rel.getType());
copyProperties(rel,newRel);
copyProperties(rel, newRel);
rel.delete();
return Stream.of(result.withOther(newRel));
} catch (Exception e) {
Expand All @@ -235,13 +237,13 @@ public void normalizeAsBoolean(
PropertyContainer pc = (PropertyContainer) entity;
Object value = pc.getProperty(propertyKey, null);
if (value != null) {
boolean isTrue = trueValues.contains(value);
boolean isTrue = trueValues.contains(value);
boolean isFalse = falseValues.contains(value);
if (isTrue && !isFalse) {
pc.setProperty(propertyKey, true );
pc.setProperty(propertyKey, true);
}
if (!isTrue && isFalse) {
pc.setProperty(propertyKey, false );
pc.setProperty(propertyKey, false);
}
if (!isTrue && !isFalse) {
pc.removeProperty(propertyKey);
Expand Down Expand Up @@ -276,19 +278,19 @@ public void categorize(
// Create batches of nodes
List<Node> batch = null;
List<Future<Void>> futures = new ArrayList<>();
try(Transaction tx = db.beginTx()) {
try (Transaction tx = db.beginTx()) {
for (Node node : db.getAllNodes()) {
if (batch == null) {
batch = new ArrayList<>((int) batchSize);
}
batch.add(node);
if (batch.size() == batchSize) {
futures.add( categorizeNodes(batch, sourceKey, relationshipType, outgoing, label, targetKey, copiedKeys) );
futures.add(categorizeNodes(batch, sourceKey, relationshipType, outgoing, label, targetKey, copiedKeys));
batch = null;
}
}
if (batch != null) {
futures.add( categorizeNodes(batch, sourceKey, relationshipType, outgoing, label, targetKey, copiedKeys) );
futures.add(categorizeNodes(batch, sourceKey, relationshipType, outgoing, label, targetKey, copiedKeys));
}

// Await processing of node batches
Expand Down Expand Up @@ -341,6 +343,12 @@ private Node mergeNodes(Node source, Node target, boolean delete, RefactorConfig
copyRelationships(source, copyLabels(source, target), delete);
if (delete) source.delete();
PropertiesManager.mergeProperties(properties, target, conf);

if (conf.getMergeRelsAllowed()) {
Map<String, Object> map = Collections.singletonMap("properties", "combine");
mergeRelsWithSameTypeAndDirectionInMergeNodes(target, new RefactorConfig(map), Direction.OUTGOING);
mergeRelsWithSameTypeAndDirectionInMergeNodes(target, new RefactorConfig(map), Direction.INCOMING);
}
} catch (NotFoundException e) {
log.warn("skipping a node for merging: " + e.getCause().getMessage());
}
Expand All @@ -365,17 +373,17 @@ private Node copyRelationships(Node source, Node target, boolean delete) {
private Node copyLabels(Node source, Node target) {
for (Label label : source.getLabels()) {
if (!target.hasLabel(label)) {
target.addLabel(label);
target.addLabel(label);
}
}
return target;
}

private <T extends PropertyContainer> T copyProperties(PropertyContainer source, T target) {
return copyProperties(source.getAllProperties(),target);
return copyProperties(source.getAllProperties(), target);
}

private <T extends PropertyContainer> T copyProperties(Map<String,Object> source, T target) {
private <T extends PropertyContainer> T copyProperties(Map<String, Object> source, T target) {
for (Map.Entry<String, Object> prop : source.entrySet())
target.setProperty(prop.getKey(), prop.getValue());
return target;
Expand All @@ -396,4 +404,26 @@ private Relationship copyRelationship(Relationship rel, Node source, Node target
Relationship newrel = startNode.createRelationshipTo(endNode, rel.getType());
return copyProperties(rel, newrel);
}


public void mergeRelsWithSameTypeAndDirectionInMergeNodes(Node node, RefactorConfig config, Direction dir) {

int i = 1;
for (Iterator<Relationship> it = node.getRelationships(dir).iterator(); it.hasNext(); ) {
Relationship relSource = it.next();
StreamSupport.stream(node.getRelationships(dir, relSource.getType()).spliterator(), false)
.skip(i)
.distinct()
.forEach(relTarget -> {
if (relSource.getStartNode().equals(relTarget.getStartNode()) && relSource.getEndNode().equals(relTarget.getEndNode()))
mergeRels(relSource, relTarget, true, config);
else {
mergeRels(relSource, relTarget, false, config);
mergeRels(relTarget, relSource, false, config);
}
});
i++;
}

}
}
8 changes: 8 additions & 0 deletions src/main/java/apoc/refactor/util/RefactorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ public class RefactorConfig {

private Map<String,String> propertiesManagement = Collections.singletonMap(MATCH_ALL, OVERWRITE);

private Object mergeRelsAllowed;

public RefactorConfig(Map<String,Object> config) {
Object value = config.get("properties");
if (value instanceof String) {
this.propertiesManagement = Collections.singletonMap(MATCH_ALL, value.toString());
} else if (value instanceof Map) {
this.propertiesManagement = (Map<String,String>)value;
}

this.mergeRelsAllowed = config.get("mergeRels");
}

public String getMergeMode(String name){
Expand All @@ -37,4 +41,8 @@ public String getMergeMode(String name){

}

public boolean getMergeRelsAllowed(){
return mergeRelsAllowed == null ? false : (boolean) mergeRelsAllowed;
}

}
Loading

0 comments on commit 6a0bcd9

Please sign in to comment.