From 200bef0c18bc1309c83366be94009eadd460befc Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 12 Mar 2014 14:06:05 -0700
Subject: [PATCH] optimize computeYtY and updateBlock

---
 .../spark/mllib/recommendation/ALS.scala      | 80 +++++++++++--------
 1 file changed, 45 insertions(+), 35 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 8958040e36640..19e8f60c20cf3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -210,21 +210,47 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    */
   def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
     if (implicitPrefs) {
-      Option(
-        factors.flatMapValues { case factorArray =>
-          factorArray.view.map { vector =>
-            val x = new DoubleMatrix(vector)
-            x.mmul(x.transpose())
-          }
-        }.reduceByKeyLocally((a, b) => a.addi(b))
-        .values
-        .reduce((a, b) => a.addi(b))
-      )
+      val n = rank * (rank + 1) / 2
+      val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
+        Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+        L
+      }, combOp = (L1, L2) => {
+        L1.addi(L2)
+      })
+      val YtY = new DoubleMatrix(rank, rank)
+      fillFullMatrix(LYtY, YtY)
+      Option(YtY)
     } else {
       None
     }
   }
 
+  /**
+   * Adding x * x.t to a matrix, the same as BLAS's DSPR.
+   *
+   * @param x a vector of length n
+   * @param L the lower triangular part of the matrix packed in an array (row major)
+   */
+  private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = {
+    val n = x.length
+    var i = 0
+    var j = 0
+    var idx = 0
+    var axi = 0.0
+    val xd = x.data
+    val Ld = L.data
+    while (i < n) {
+      axi = alpha * xd(i)
+      j = 0
+      while (j <= i) {
+        Ld(idx) += axi * xd(j)
+        j += 1
+        idx += 1
+      }
+      i += 1
+    }
+  }
+
   /**
    * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
    */
@@ -376,18 +402,21 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     for (productBlock <- 0 until numBlocks) {
       for (p <- 0 until blockFactors(productBlock).length) {
         val x = new DoubleMatrix(blockFactors(productBlock)(p))
-        fillXtX(x, tempXtX)
+        tempXtX.fill(0.0)
+        dspr(1.0, x, tempXtX)
         val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
         for (i <- 0 until us.length) {
           implicitPrefs match {
             case false =>
               userXtX(us(i)).addi(tempXtX)
+              // dspr(1.0, x, userXtX(us(i)))
               SimpleBlas.axpy(rs(i), x, userXy(us(i)))
             case true =>
               // Extension to the original paper to handle rs(i) < 0. confidence is a function
               // of |rs(i)| instead so that it is never negative:
               val confidence = 1 + alpha * abs(rs(i))
-              userXtX(us(i)).addi(tempXtX.mul(confidence - 1))
+              SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
+              // dspr(confidence - 1.0, x, userXtX(us(i)))
               // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
               // means we try to reconstruct 0. We add terms only where P = 1, so, term below
               // is now only added for rs(i) > 0:
@@ -400,38 +429,19 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     }
 
     // Solve the least-squares problem for each user and return the new feature vectors
-    userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
+    userXtX.zip(userXy).map { case (triangularXtX, rhs) =>
       // Compute the full XtX matrix from the lower-triangular part we got above
       fillFullMatrix(triangularXtX, fullXtX)
       // Add regularization
      (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
       // Solve the resulting matrix, which is symmetric and positive-definite
       implicitPrefs match {
-        case false => Solve.solvePositive(fullXtX, userXy(index)).data
-        case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data
+        case false => Solve.solvePositive(fullXtX, rhs).data
+        case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), rhs).data
       }
     }
   }
 
-  /**
-   * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
-   * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
-   * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order.
-   */
-  private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
-    var i = 0
-    var pos = 0
-    while (i < x.length) {
-      var j = 0
-      while (j <= i) {
-        xtxDest.data(pos) = x.data(i) * x.data(j)
-        pos += 1
-        j += 1
-      }
-      i += 1
-    }
-  }
-
   /**
    * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
    * matrix that it represents, storing it into destMatrix.
@@ -455,7 +465,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
 
 
 /**
- * Top-level methods for calling Alternating Least Squares (ALS) matrix factorizaton.
+ * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
  */
 object ALS {
   /**
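
Note: the following is a minimal standalone sketch, not part of the patch, of the packed lower-triangular accumulation that the new dspr helper and the fillFullMatrix call implement. It uses plain Scala arrays instead of Spark RDDs and jblas DoubleMatrix, and the names PackedYtYSketch, fillFull, and main are made up for illustration. Each factor vector y contributes a rank-1 update y * y^T, but only the lower triangle is stored, as rank*(rank+1)/2 doubles in the order (0,0), (1,0), (1,1), (2,0), ...; the triangle is expanded to a full symmetric matrix only once at the end, which is what lets the patch avoid allocating a rank x rank matrix per vector.

object PackedYtYSketch {

  // DSPR-style update: L += alpha * x * x^T, lower triangle only, packed row-major.
  def dspr(alpha: Double, x: Array[Double], L: Array[Double]): Unit = {
    var i = 0
    var idx = 0
    while (i < x.length) {
      val axi = alpha * x(i)
      var j = 0
      while (j <= i) {
        L(idx) += axi * x(j)
        j += 1
        idx += 1
      }
      i += 1
    }
  }

  // Expand the packed lower triangle into a full symmetric rank x rank matrix (row-major).
  def fillFull(rank: Int, packed: Array[Double]): Array[Double] = {
    val full = new Array[Double](rank * rank)
    var idx = 0
    var i = 0
    while (i < rank) {
      var j = 0
      while (j <= i) {
        full(i * rank + j) = packed(idx)
        full(j * rank + i) = packed(idx)
        idx += 1
        j += 1
      }
      i += 1
    }
    full
  }

  def main(args: Array[String]): Unit = {
    val rank = 3
    val factors = Seq(Array(1.0, 2.0, 3.0), Array(0.5, -1.0, 2.0))
    // Accumulate the packed lower triangle of Y^T Y, as the aggregate's seqOp does per vector.
    val packed = new Array[Double](rank * (rank + 1) / 2)
    factors.foreach(y => dspr(1.0, y, packed))
    val ytY = fillFull(rank, packed)
    println(ytY.grouped(rank).map(_.mkString(" ")).mkString("\n"))
  }
}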