Skip to content

Commit

Permalink
no longer needing to incur serialization costs
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Mar 3, 2017
1 parent b0680db commit 6a7e3d1
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,14 @@ class ALSModel private[ml] (
// We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
.toDF(srcOutputColumn, "recommendations")

// There is some performance hit from converting the (Int, Float) tuples to
// (dstOutputColumn: Int, rating: Float) structs using .rdd. Need SPARK-16483 for a fix.
val schema = new StructType()
.add(srcOutputColumn, IntegerType)
.add("recommendations",
ArrayType(
StructType(
StructField(dstOutputColumn, IntegerType, nullable = false) ::
StructField("rating", FloatType, nullable = false) ::
Nil
)))
recs.sparkSession.createDataFrame(recs.rdd, schema)
.toDF("id", "recommendations")

val arrayType = ArrayType(
new StructType()
.add(dstOutputColumn, IntegerType)
.add("rating", FloatType)
)
recs.select($"id" as srcOutputColumn, $"recommendations" cast arrayType)
}
}

Expand Down

0 comments on commit 6a7e3d1

Please sign in to comment.