Added more connected components.
dcrankshaw committed Apr 30, 2014
1 parent cfd209b commit 03f5d76
Showing 1 changed file with 65 additions and 5 deletions.
@@ -57,6 +57,7 @@ object DataflowPagerank extends Logging {
case "naive" => naiveVersion(sc, fname, partitions, iters)
case "optimized" => optimizedSpark(sc, fname, partitions, iters)
case "cc" => connectedComponents(sc, fname, partitions)
case "ccOpt" => ccSlightlyOpt(sc, fname, partitions)
case _ => throw new UnsupportedOperationException
}

@@ -147,19 +148,20 @@ object DataflowPagerank extends Logging {


// initialize ccIDs to IDs
- var ccs: RDD[(Long, Long)] = edges.map{ case (src: Long, (dst: Long, _: Int)) => (src, src)}
- .union(edges.map{ case (src: Long, (dst: Long, _: Int)) => (dst, dst)})
+ var ccs: RDD[(Long, Long)] = edges.map{ case (src: Long, dst: Long) => (src, src)}
+ .union(edges.map{ case (src: Long, dst: Long) => (dst, dst)})
.distinct()
var numUpdates = Long.MaxValue

logWarning("Starting CC iterations")
var i = 0
while (numUpdates > 0) {

val newCCs = edges
// get src property
.join(ccs)
// rekey by dst
- .map {case (src: Long, (dst: Long, srcCC: Long)) => (dst, (src, srcCC)}
+ .map {case (src: Long, (dst: Long, srcCC: Long)) => (dst, (src, srcCC))}
// get dst property
.join(ccs)
// emit min ccId to src and dst
@@ -173,14 +175,72 @@
// check for convergence
numUpdates = newCCs.join(ccs)
.filter{case (vid, (newCC, oldCC)) => newCC != oldCC }.count()
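// note: this check re-joins the full vertex set every iteration; the
// ccSlightlyOpt variant below avoids that by counting proposals instead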
- ccs = newCCs


logWarning(s"CC iter $i with $numUpdates updates")
+ ccs = newCCs
i += 1

}
val numCCs = ccs.map{ case(_, id) => id}.distinct().count()
logWarning(s"Num connected components: $numCCs")

}

def ccSlightlyOpt(sc: SparkContext, fname: String, partitions: Int) {
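// Same min-label propagation as connectedComponents above, but "slightly
// optimized": proposals are folded into ccs with a min, and convergence is
// detected by counting proposals instead of re-joining new and old ccIDs.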

val lines = sc.textFile(fname).repartition(partitions)
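// one whitespace-separated "src dst" pair per input line; cache the parsed
// edges since they are re-joined against ccs on every iteration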
val edges: RDD[(Long, Long)] = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0).toLong, parts(1).toLong)
}.cache()
logWarning("CC started")


// initialize ccIDs to IDs
var ccs: RDD[(Long, Long)] = edges.map{ case (src: Long, dst: Long) => (src, src)}
.union(edges.map{ case (src: Long, dst: Long) => (dst, dst)})
.distinct()
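// sentinel value so the while loop below runs at least once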
var numUpdates = Long.MaxValue

logWarning("Starting CC iterations")
var i = 0
while (numUpdates > 0) {

val newCCs = edges
// get src property
.join(ccs)
// rekey by dst
.map {case (src: Long, (dst: Long, srcCC: Long)) => (dst, (src, srcCC))}
// get dst property
.join(ccs)
// emit the min ccId to whichever endpoint has the larger one
.flatMap { case (dst: Long, ((src: Long, srcCC: Long), dstCC)) =>
if (srcCC < dstCC) {
// dst adopts the smaller src ccID (emit a (vertex, ccID) pair)
Iterator((dst, srcCC))
} else if (dstCC < srcCC) {
Iterator((src, dstCC))
} else {
Iterator.empty
}
}
.reduceByKey(min(_, _))
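// keep only the smallest proposed ccID per vertex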


// fold the proposals into ccs; leftOuterJoin keeps vertices that received
// no proposal (a plain join would silently drop them from ccs)
ccs = ccs.leftOuterJoin(newCCs)
.map{ case (vid, (oldCC, newCCOpt)) => (vid, min(oldCC, newCCOpt.getOrElse(oldCC))) }
.cache()

// check for convergence: newCCs holds only strict improvements,
// so an empty proposal set means we have reached a fixed point
numUpdates = newCCs.count()

logWarning(s"CC iter $i with $numUpdates updates")
i += 1
}
val numCCs = ccs.map{ case(_, id) => id}.distinct().count()
logWarning(s"Num connected components: $numCCs")
}
}
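For reference, a minimal self-contained sketch of one min-propagation round, written for spark-shell (which provides sc); the toy edge list and the names edges, ccs, and proposals are illustrative assumptions, not part of this commit:

import scala.math.min
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// toy graph: vertices {1, 2, 3} form one component, {4, 5} another
val edges: RDD[(Long, Long)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (4L, 5L)))

// every vertex starts in its own component
var ccs: RDD[(Long, Long)] = edges.flatMap { case (s, d) => Seq((s, s), (d, d)) }.distinct()

// one round: each edge pushes the smaller endpoint ccID to the other endpoint
val proposals = edges
  .join(ccs)                                   // attach src ccID
  .map { case (s, (d, sCC)) => (d, (s, sCC)) } // rekey by dst
  .join(ccs)                                   // attach dst ccID
  .flatMap { case (d, ((s, sCC), dCC)) =>
    if (sCC < dCC) Iterator((d, sCC))
    else if (dCC < sCC) Iterator((s, dCC))
    else Iterator.empty
  }
  .reduceByKey(min(_, _))

// merge, keeping vertices that received no proposal
ccs = ccs.leftOuterJoin(proposals)
  .map { case (v, (old, p)) => (v, min(old, p.getOrElse(old))) }

Iterating these two steps until proposals.count() == 0 gives the same result as ccSlightlyOpt above.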
