Skip to content

Commit

Permalink
[dbs-leipzig#1570] add degree range operator
Browse files Browse the repository at this point in the history
  • Loading branch information
alwba committed Aug 9, 2022
1 parent 5f1b5d5 commit 9691f8f
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;
import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce;

import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeMap;

/**
* Operator that calculates the degree range evolution of a temporal graph for the
* whole lifetime of the graph.
*/
public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> absoluteDegreeTrees = graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree());

DataSet<Tuple1<Long>> timePoints = absoluteDegreeTrees
// 5) extract all timestamps where degree of any vertex changes
.reduceGroup(new ExtractAllTimePointsReduce())
.distinct();

return absoluteDegreeTrees
// join with interval degree mappings
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE, timePoints));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.gradoop.temporal.model.impl.operators.metric.functions;

/**
* Enum for defining an aggregate type.
*/
public enum AggregateType {
/**
* Minimum aggregation.
*/
MIN,
/**
* Maximum aggregation.
*/
MAX,
/**
* Average aggregation.
*/
AVG,
/**
* Degree Range aggregation.
*/
RANGE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.*;
import java.util.stream.Stream;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToAggregateDegrees
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Integer>> {

/**
* The aggregate type to use (min,max,avg).
*/
private final AggregateType aggregateType;
/**
* The timestamps where at least one vertex degree changes.
*/
private final SortedSet<Long> timePoints;

/**
* Creates an instance of this group reduce function.
*
* @param aggregateType the aggregate type to use (min,max,avg).
*/
public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet<Tuple1<Long>> timePoints) {
this.aggregateType = aggregateType;

List<Tuple1<Long>> tuples;
try {
tuples = timePoints.collect();
this.timePoints = new TreeSet<>();

for (int i = 0; i < timePoints.count(); i = i + 1) {
this.timePoints.add(tuples.get(i).getField(0));
}
} catch (Exception e) {
throw new RuntimeException(e);
}

}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Integer>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
}

int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
Optional<Integer> opt;
Optional<Integer> opt2;
switch (aggregateType) {
case MIN:
opt = vertexDegrees.values().stream().reduce(Math::min);
opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer)));
break;
case MAX:
opt = vertexDegrees.values().stream().reduce(Math::max);
opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer)));
break;
case AVG:
opt = vertexDegrees.values().stream().reduce(Math::addExact);
opt.ifPresent(integer -> collector.collect(
new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices))));
break;
case RANGE:
opt = vertexDegrees.values().stream().reduce(Math::max);
opt2 = vertexDegrees.values().stream().reduce(Math::min);
opt.flatMap(max -> opt2.map(min -> max - min));
opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer)));
break;
default:
throw new IllegalArgumentException("Aggregate type not specified.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.util.TemporalGradoopTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase {
/**
* The expected in-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_IN_DEGREES = new ArrayList<>();
/**
* The expected out-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_OUT_DEGREES = new ArrayList<>();
/**
* The expected degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_BOTH_DEGREES = new ArrayList<>();

static {
// IN DEGREES
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1));

// OUT DEGREES
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1));

// DEGREES
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); //4,3
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); //5,1
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1));
}

/**
* The degree type to test.
*/
@Parameterized.Parameter(0)
public VertexDegree degreeType;

/**
* The expected degree range evolution for the given type.
*/
@Parameterized.Parameter(1)
public List<Tuple2<Long, Integer>> expectedDegrees;

/**
* The temporal graph to test the operator.
*/
TemporalGraph testGraph;

/**
* The parameters to test the operator.
*
* @return three different vertex degree types with its corresponding expected degree evolution.
*/
@Parameterized.Parameters(name = "Test degree type {0}.")
public static Iterable<Object[]> parameters() {
return Arrays.asList(
new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES},
new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES},
new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES});
}

/**
* Set up the test graph and create the id-label mapping.
*
* @throws Exception in case of an error
*/
@Before
public void setUp() throws Exception {
testGraph = getTestGraphWithValues();
Collection<Tuple2<GradoopId, String>> idLabelCollection = new HashSet<>();
testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel()))
.returns(new TypeHint<Tuple2<GradoopId, String>>() {
}).output(new LocalCollectionOutputFormat<>(idLabelCollection));
getExecutionEnvironment().execute();
}

/**
* Test the degree range evolution operator.
*
* @throws Exception in case of an error.
*/
@Test
public void testDegreeVariance() throws Exception {
Collection<Tuple2<Long, Integer>> resultCollection = new ArrayList<>();

final DataSet<Tuple2<Long, Integer>> resultDataSet = testGraph
.callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME));

resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));
getExecutionEnvironment().execute();

System.out.println(resultCollection);

assertTrue(resultCollection.containsAll(expectedDegrees));
assertTrue(expectedDegrees.containsAll(resultCollection));
}
}

0 comments on commit 9691f8f

Please sign in to comment.