[Spark-19535][ML] RecommendForAllUsers RecommendForAllItems for ALS on Dataframe #17090
Conversation
 * the top `num` K2 items based on the given Ordering.
 */
private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
We may want to put this somewhere more general so it can be reused?
(It'd need its own unit tests, though not sure if we'll get everything in for 2.2)
    extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], Array[(K2, V)]] {

  override def zero: BoundedPriorityQueue[(K2, V)] = new BoundedPriorityQueue[(K2, V)](num)(ord)

  override def reduce(
I think you need to throw some spaces and braces in here to make it a bit more readable?
👍
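For illustration, a version of the `reduce`/`merge` overrides with the suggested braces and spacing might look like the sketch below (not copied from the PR; the exact signatures there may differ):

```scala
override def reduce(
    q: BoundedPriorityQueue[(K2, V)],
    a: (K1, K2, V)): BoundedPriorityQueue[(K2, V)] = {
  // Keep only the (K2, V) part; the bounded queue evicts the lowest-scoring entries.
  q += ((a._2, a._3))
}

override def merge(
    q1: BoundedPriorityQueue[(K2, V)],
    q2: BoundedPriorityQueue[(K2, V)]): BoundedPriorityQueue[(K2, V)] = {
  q1 ++= q2
}
```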
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
  @Since("1.3.0")
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
    if (userFeatures != null && itemFeatures != null) {
      blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
I wonder how the overhead of converting to an array compares with the efficiency of calling sdot -- could it be faster to just do the Seqs by hand? Is it possible to operate on something besides Seq?
Good point! But since I copy-pasted this block in this PR, maybe it's okay to try it out in another PR? At least with what we have here we know it's not a regression. Want to make sure we get some version of ALS recommendForAll* in 2.2.
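For reference, the hand-rolled alternative being discussed might look like the sketch below (not from this PR and not benchmarked; Seqs produced by Spark SQL are typically array-backed, so indexed access should be O(1)):

```scala
// Hypothetical candidate for the follow-up JIRA: a dot product computed directly on the
// Seqs, avoiding the two toArray copies made before calling blas.sdot.
def dotNoCopy(rank: Int, a: Seq[Float], b: Seq[Float]): Float = {
  var sum = 0f
  var i = 0
  while (i < rank) {
    sum += a(i) * b(i)
    i += 1
  }
  sum
}
```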
Test build #73540 has started for PR 17090 at commit
Looks good, just small comments.
Could you please make a JIRA for the sdot vs toArray issue which Sean brought up?
@@ -285,6 +285,43 @@ class ALSModel private[ml] (
  @Since("1.6.0")
  override def write: MLWriter = new ALSModel.ALSModelWriter(this)

  @Since("2.2.0")
  def recommendForAllUsers(num: Int): DataFrame = {
Maybe rename to numItems (and numUsers in the other method)
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
  @Since("1.3.0")
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
You could rename userFeatures, itemFeatures to be featuresA, featuresB or something to make it clear that there is no ordering here.
I think it's actually clearer to keep the names as it "instantiates" the usage to the reader.
Fair enough, though the inputs are switched in some uses
They are switched in the case of recommendForAllItems, so is that not less clear?
 * @param srcOutputColumn name of the column for the source in the output DataFrame
 * @param num number of recommendations for each record
 * @return a DataFrame of (srcOutputColumn: Int, recommendations), where recommendations are
 *         stored as an array of (dstId: Int, ratingL: Double) tuples.
ratingL -> rating
      predict(srcFactors("features"), dstFactors("features")).as($(predictionCol)))
    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
    ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
It'd be nice to specify field names for dstId and rating and to document the schema in the recommend methods. That will help users extract recommendations.
I'm not sure what a good way to do this is :-/ Ways I can think of but haven't succeeded in:
1/ change the schema of the entire DataFrame
2/ map over the rows in the DataFrame {
map over the items in the array {
convert from tuple (really a Row) to a Row with a different schema
}
}
I tried using RowEncoder in either case, but the types haven't quite worked out. Any ideas?
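One approach that might work is to cast the nested array-of-struct column to a struct type with the desired field names, rather than mapping row by row. This is only a sketch (not taken from the PR; the output column names are illustrative):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, FloatType, IntegerType, StructType}

// `topK` is assumed to be the Dataset[(Int, Array[(Int, Float)])] produced by the
// groupByKey(...).agg(...) call above; the cast renames the tuple fields by position.
val recSchema = ArrayType(
  new StructType()
    .add("dstId", IntegerType)
    .add("rating", FloatType)
)
val recs = topK.toDF("id", "recommendations")
  .select(col("id"), col("recommendations").cast(recSchema).as("recommendations"))
```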
    r.toArray.sorted(ord.reverse)

  override def bufferEncoder: Encoder[BoundedPriorityQueue[(K2, V)]] =
    Encoders.kryo[BoundedPriorityQueue[(K2, V)]]

  override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]
IntelliJ style complaint: include "()" at end
/**
 * Returns top `num` items recommended for each user, for all users.
 * @param num number of recommendations for each user
number -> max number
Test build #73543 has finished for PR 17090 at commit
Test build #73553 has finished for PR 17090 at commit
cc @MLnick
The same as #12574?
#12574 is a comprehensive solution that also intends to support cross-validation as well as recommending for a subset (or any arbitrary set) of users/items, so it solves SPARK-13857 and SPARK-10802 at the same time. That PR is in a fully working state. I'm a little surprised to see work done on this rather than deciding on the input/output schema stuff...
By the way, I have been doing performance testing in support of #12574 and the results are pretty much ready.
I'd been following the long discussions about a transform-based solution, but those did not seem to have converged on a clear design. If you feel they have in your PR, then I'll spend some time tomorrow going through your PR to catch up. (Keep in mind that the branch cut is scheduled to happen in a day or two: http://spark.apache.org/versioning-policy.html )
For performance tests, I've been using the MovieLens So it's not enormous but "recommend all" does a lot of work - generating Some quick tests for the existing
As part of my performance testing I've tried a few approaches roughly similar to this PR, but using At first I thought this PR was really good:
But then I tried this:
Not sure why
  private def checkRecommendationOrdering(topK: DataFrame, k: Int): Unit = {
    assert(topK.columns.contains("recommendations"))
    topK.select("recommendations").collect().foreach(
Purely a style suggestion but you can get rid of one set of parens:
...foreach { row =>
...
}
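Applied to the helper above, that might look roughly like the following (only a sketch; the body elided after `foreach(` is guessed here, not copied from the PR):

```scala
private def checkRecommendationOrdering(topK: DataFrame, k: Int): Unit = {
  assert(topK.columns.contains("recommendations"))
  topK.select("recommendations").collect().foreach { row =>
    val recs = row.getSeq[Row](0)
    assert(recs.size <= k)
    // Ratings (the second field of each nested struct) should be in descending order.
    val ratings = recs.map(_.getFloat(1))
    assert(ratings === ratings.sorted.reverse)
  }
}
```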
@jkbradley do we propose to add further methods to support recommending for all users (or items) in an input DF? like
@MLnick Thanks for showing those comparison numbers. If your implementation is faster, then I'm happy going with it. I do wonder if we might hit scalability issues with RDDs which we would not hit with DataFrames, so it'd be worth revisiting a DF-based implementation later on. In terms of the API, my main worry about #12574 is that I haven't seen a full design of how ALS would be plugged into cross validator. I still don't see how CV could handle ALS unless we specialized it for recommendation. It was this uncertainty which made me comment on https://issues.apache.org/jira/browse/SPARK-13857 to recommend we go ahead and merge basic recommendAll methods, while continuing to figure out a good design for tuning. Feel free to push back, but I would really like to see a sketch of how ALS could plug into tuning. I haven't spent the time to do a literature review on how tuning is generally done for recommendation, especially on the best ways to split the data into folds.
I do think this sounds useful, but I'm focused on feature parity w.r.t. the RDD-based API right now. It'd be nice to add later, though that could be via your proposed transform-based API.
Fitting into the CV / evaluator is actually fairly straightforward. It's just that the semantics of It was discussed on the JIRA here and here. I haven't had a chance to refine it yet but I have a view on the best approach now (basically to fit in with the design of SPARK-14409 and in particular the basic version of #16618). I think that design / schema is more "DataFrame-like". In any event - I'm not against having the convenience methods for recommend-all here. I support it. Ultimately the
The performance of #12574 is not better than the existing
I should note that I've found the performance of "recommend all" to be very dependent on number of partitions since it controls the memory consumption per task (which can easily explode in the blocked For example, the default I actually just realised that the defaults for (Side note - it's not ideal actually that the num blocks drives the recommend-all partitions - because the optimal settings for training ALS are unlikely to be optimal for batch recommend-all prediction). Anyway some results of quick tests on my setup: Firstly, to match
So this PR is 30% slower - which is actually pretty decent given it's not using blocked BLAS operators. Note I didn't use netlib native BLAS, which could make a large difference when using level 3 BLAS in the blocked Secondly, to match
In this case, the Finally, middle of the road case:
So a few % slower. Again pretty good actually considering it's not a blocked implementation. Still room to be optimized. After running these I tested against a blocked version using So the performance here is not too bad. The positive is it should avoid completely exploding. As I mentioned above I tried a similar DataFrame-based version using
Finally, I've done some work related to SPARK-11968 and have a potential solution that seems to be pretty good. In this case it should be more than 3x faster than the best runtime here.
 * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
 * the top `num` K2 items based on the given Ordering.
 */
private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
I'd think we should have at least some basic tests for this - see MLPairRDDFunctionsSuite, for example.
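A minimal test along those lines might look like this sketch (assuming the suite has a SparkSession's implicits in scope via `testImplicits`; the values are illustrative and include a key with fewer than k entries, per the comment further down):

```scala
test("TopByKeyAggregator returns the top k values per key in descending order") {
  import testImplicits._
  val agg = new TopByKeyAggregator[Int, Int, Float](2, Ordering.by(_._2))
  val topK = Seq(
    (0, 3, 54f), (0, 4, 44f), (0, 5, 42f),
    (1, 3, 39f) // a key with only one value
  ).toDS()
    .groupByKey(_._1)
    .agg(agg.toColumn)
    .collect()
    .toMap
  assert(topK(0).toSeq === Seq((3, 54f), (4, 44f)))
  assert(topK(1).toSeq === Seq((3, 39f)))
}
```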
    val numRecs = 5
    val (training, test) = genExplicitTestData(numUsers, numItems, rank = 2, noiseStd = 0.01)
    val topItems =
      testALS(training, test, maxIter = 4, rank = 2, regParam = 0.01, targetRMSE = 0.03)
Seems wasteful to compute and check a model here, and it doesn't really test that the predictions are what we expect them to be. We can construct an ALSModel with known factors and check the predictions are as expected (it's just a matrix multiply). See MatrixFactorizationModelSuite and the tests in #12574 (based on those) for example.
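A sketch of that idea, assuming the private[ml] ALSModel constructor taking (uid, rank, userFactors, itemFactors) and factor DataFrames of (id: Int, features: Array[Float]) - not code from the PR:

```scala
import spark.implicits._  // assuming a test suite with a SparkSession named `spark` in scope

val userFactors = Seq(
  (0, Array(1.0f, 0.0f)),
  (1, Array(0.0f, 1.0f))
).toDF("id", "features")
val itemFactors = Seq(
  (10, Array(2.0f, 3.0f)),
  (11, Array(4.0f, 5.0f))
).toDF("id", "features")

val model = new ALSModel("als-known-factors", 2, userFactors, itemFactors)
  .setUserCol("user")
  .setItemCol("item")

// Scores are plain dot products, so the expected top-1 item for both users is item 11
// (user 0: 1*4 + 0*5 = 4.0; user 1: 0*4 + 1*5 = 5.0).
val top1 = model.recommendForAllUsers(1)
```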
@MLnick Thanks a lot for the detailed tests! I really appreciate it. In this case, are you OK with the approach in the current PR (pending reviews)? One thing we should confirm is what you think of the input/output schema for these methods. It'd be nice to have the schema match what you think is best for the transform-based recommendAll. Re: design: I saw the discussions in https://issues.apache.org/jira/browse/SPARK-13857 but I didn't see a clear statement of pros & cons leading to a design decision. I'm sorry about not having time to get involved with the discussions, but I'd like to help out where useful in the coming months.
Test build #73623 has finished for PR 17090 at commit
Test build #73624 has finished for PR 17090 at commit
Test build #73628 has finished for PR 17090 at commit
Isn't deciding on the output schema for these methods essentially the same as deciding on the transform semantics in #12574 (apart from the issue of how, or if, to have transform generate the "ground truth" items)? So in a way I'm not certain this "circumvents" the discussion around transform semantics / fitting into cross-validation & pipelines, but rather makes an implicit decision on it. If this is set here, it would be rather awkward to have any future My default choice in #12574 was to match the form of the existing It's not necessarily the "best" choice - I go into the other option in detail on #12574 and the related JIRA. Ideally it should match up with the expected input schema for a new My concern here is that we make a quick decision implicitly due to time constraints and are stuck with it down the line as it is exposed in the public API.
It could also be that I'm overthinking things - and we can mould the In which case it doesn't really matter what call we make here, as long as we're consistent with it later.
It's a good point about making an implicit decision. We could deprecate these methods in favor of transform-based ones in the future (we have done this in the past), but it does push the long-term decision in a clear direction. My hesitation about transform is not just about the schema. It's also because I'm still unclear how we could plug ALS into tuning without having tuning specifically understand ALS (knowing about users, items, etc.). I'll add my thoughts on the JIRA.
      expected: Map[Int, Array[Row]],
      dstColName: String): Unit = {
    assert(topK.columns.contains("recommendations"))
    topK.collect().foreach { row =>
It's a little strange to have all the Row stuff in these tests. You can do `topK.as[(Int, Seq[(Int, Float)])].collect.foreach { case (id, recs) => ...` and then adjust `expected` accordingly.
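Adjusted that way, the helper might look roughly like this (a sketch; `expected` then holds tuples instead of Rows, and the suite's implicits are assumed to be in scope for the `as[...]` conversion):

```scala
private def checkRecommendations(
    topK: DataFrame,
    expected: Map[Int, Seq[(Int, Float)]]): Unit = {
  assert(topK.columns.contains("recommendations"))
  topK.as[(Int, Seq[(Int, Float)])].collect().foreach { case (id, recs) =>
    assert(recs === expected(id))
  }
}
```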
    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](k, Ordering.by(_._2))
    Seq(
      (0, 3, 54f),
Maybe a good idea to have a varying number of values - like maybe one key with only 1, etc.
  private def checkTopK(
      topK: Dataset[(Int, Array[(Int, Float)])],
      expected: Map[Int, Array[(Int, Float)]]): Unit = {
    topK.collect().foreach { record =>
Slightly prefer `foreach { case (id, recs) => ...`
      val recs = row.getAs[WrappedArray[Row]]("recommendations")
      assert(recs === expected(id))
      assert(recs(0).fieldIndex(dstColName) == 0)
      assert(recs(0).fieldIndex("rating") == 1)
Should we have this requirement in the output (vs. the unnamed tuples)? It does cost more to support named fields (we have to call .rdd on the resulting dataframe in the process of imposing a new schema) and if we decide to remove it later it'll break any usage of the named fields.
Actually, never mind. Either way is committing to an incompatible API, so the named one seems preferable.
Test build #73787 has finished for PR 17090 at commit
Test build #73866 has finished for PR 17090 at commit
LGTM
@jkbradley I've put my updated proposal for ranking evaluation in the SPARK-14409 comments. It's different from this schema (and from the I would have preferred a more explicit discussion & decision on the schema before merging this, but now that it's merged we should at least consider how (or if) to deal with the schema difference, assuming the above proposal proceeds.
@MLnick OK I think I misunderstood some of your comments above then. I see the proposal in SPARK-14409 differs from this PR, so I agree it'd be nice to resolve it. We can make changes to this PR's schema as long as it happens soon. Here are the pros of each as I see them:
I'm not sure if there's a performance difference in the formats when stored in Tungsten Rows. I think not, but that'd be good to know.
I commented further on the JIRA. Sorry if my other comments here and on JIRA were unclear. But the proposed schema for input to Schema 1
You will notice that
Note for reference, the input to the current Schema 2
(So actually neither of the above schemas are easily compatible with the return schema here - but I think it is not really necessary to match the ALS cross-validationMy proposal for fitting ALS into cross-validation is the So what I am saying is that ALS itself (not the evaluator) must know how to return the correct DataFrame output from Concretely: val als = new ALS().setRecommendFor("user").setK(10)
val validator = new TrainValidationSplit()
.setEvaluator(new RankingEvaluator().setK(10))
.setEstimator(als)
.setEstimatorParamMaps(...)
val bestModel = validator.fit(ratings) So while it is complex under the hood - to users it's simply a case of setting 2 params and the rest is as normal. Now, we have the best model selected by cross-validation. We can make recommendations using these convenience methods (I think it will need a cast): val recommendations = bestModel.asInstanceOf[ALSModel].recommendItemsforUsers(10) Alternatively, the val usersDF = ...
+------+
|userId|
+------+
| 1|
| 2|
| 3|
+------+
val recommendations = bestModel.transform(usersDF) So the questions:
The options are: (1) the schema in this PR, and (2) the schema below, which is basically an "exploded" version of option (1).
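(An illustrative sketch of how the two layouts relate - not from the PR, and the column names here are assumptions: option (2) is just option (1) with the nested array exploded into one row per recommendation.)

```scala
import org.apache.spark.sql.functions.{col, explode}

val nested = model.recommendForAllUsers(10)   // option (1): (userId, recommendations: Array[(itemId, rating)])
val exploded = nested                         // option (2): one (userId, itemId, rating) row per recommendation
  .select(col("userId"), explode(col("recommendations")).as("rec"))
  .select(col("userId"), col("rec.*"))
```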
Pros: matches more closely with the cross-validation / evaluator input format. Perhaps slightly more "dataframe-like". Anyway, sorry for hijacking this PR discussion - but as I think you can see, the evaluator / ALS transform interplay is a bit subtle and requires some thought to get the right approach.
Thanks @MLnick for the explanation. This is what I'd understood from your similar description on the JIRA, but definitely more in-depth. (It might be good to copy it to JIRA, or even a design doc at this point.) As I'd said, I haven't done a literature review on this, so it's hard for me to judge what schema evaluators should be given. I see some implicit decisions, such as evaluators using implicit ratings (using rows missing either a label or a prediction) and us not computing predictions for all (user, item) pairs with labels. However, assuming the schema you've selected is best for evaluation, I think this highlights 2 distinct needs for top K: (a) a user-friendly API (this PR) and (b) an evaluator-friendly API (your design). For (a), many users have requested recommendForAll* methods matching the RDD-based equivalents, and this schema provides top K recommendations in an analogous and friendly schema. If the evaluator needs a less user-friendly schema, that's OK, but then I think it should be considered an internal/dev schema which can differ from the user-friendly version. What do you think?
Good point about copying some detail to JIRA, will do that. Also I responded on JIRA to your comment above (the part about evaluation) - just to avoid confusing this PR with further stuff.
As for the API - I'm OK with having the "user-facing" version differ from the
What changes were proposed in this pull request?
This is a simple implementation of RecommendForAllUsers & RecommendForAllItems for the DataFrame version of ALS. It uses DataFrame operations (not a wrapper on the RDD implementation). Haven't benchmarked against a wrapper, but the unit test examples do work.
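A minimal end-to-end usage sketch of the two new methods (assuming the API as proposed in this PR; the data and parameter values are illustrative):

```scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

object RecommendForAllExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("recommend-for-all-sketch")
      .getOrCreate()
    import spark.implicits._

    // Tiny illustrative ratings set.
    val ratings = Seq(
      (0, 0, 4.0f), (0, 1, 2.0f),
      (1, 0, 3.0f), (1, 2, 5.0f),
      (2, 1, 1.0f), (2, 2, 4.0f)
    ).toDF("user", "item", "rating")

    val model = new ALS()
      .setUserCol("user")
      .setItemCol("item")
      .setRatingCol("rating")
      .setRank(2)
      .setMaxIter(5)
      .fit(ratings)

    // New in this PR: top-k recommendations for every user and for every item.
    model.recommendForAllUsers(2).show(truncate = false)
    model.recommendForAllItems(2).show(truncate = false)

    spark.stop()
  }
}
```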
How was this patch tested?
Unit tests