Skip to content

Commit

Permalink
Fixed kcore. Now works.
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Apr 29, 2014
1 parent 1e924df commit 76a6a54
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.graphx.lib

import org.apache.spark._
import scala.math._
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.SparkContext._
Expand Down Expand Up @@ -163,8 +164,10 @@ object Analytics extends Logging {
minEdgePartitions = numEPart).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

logWarning("Starting kcore")
val result = KCore.run(graph, kmax, kmin)
println("Size of cores: " + result.vertices.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))

logWarning("Size of cores: " + result.vertices.map { case (vid,data) => (min(data, kmax), 1) }.reduceByKey((_+_)).collect().mkString(", "))
sc.stop()

case "triangles" =>
Expand Down
10 changes: 7 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ object KCore extends Logging {

// Graph[(Int, Boolean), ED] - the Boolean indicates whether the vertex is still active
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
var degrees = graph.degrees
println("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
val degrees = graph.degrees
val numVertices = degrees.count
// logWarning(s"Numvertices: $numVertices")
// logWarning(s"degree sample: ${degrees.take(10).mkString(", ")}")
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).take(10).mkString(", "))
var curK = kmin
while (curK <= kmax) {
g = computeCurrentKCore(g, curK).cache
Expand All @@ -50,6 +54,7 @@ object KCore extends Logging {
}

def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
logWarning(s"Computing kcore for k=$k")
def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
if (!et.srcAttr._2 || !et.dstAttr._2) {
// if either vertex has already been turned off we do nothing
Expand Down Expand Up @@ -86,7 +91,6 @@ object KCore extends Logging {
}

// Note that the initial message should have no effect
logWarning("kcore starting pregel")
Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
}
}

0 comments on commit 76a6a54

Please sign in to comment.