-
Notifications
You must be signed in to change notification settings - Fork 397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid creating SparseVectors for LOCO #377
Changes from all commits
66a5e99
77b97ee
35b8b5c
cb2dc05
a494129
1c383e5
a1d7d81
f95f4bf
a8ff84f
709594f
96b2941
ee76053
2248d49
c3f743d
dc39cb3
25a6ac5
3b7b63a
b271b74
087b1d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,24 +30,22 @@ | |
|
||
package com.salesforce.op.stages.impl.insights | ||
|
||
import com.salesforce.op.{FeatureInsights, UID} | ||
import com.salesforce.op.UID | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.stages.base.unary.UnaryTransformer | ||
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod} | ||
import com.salesforce.op.stages.impl.feature.TimePeriod | ||
import com.salesforce.op.stages.impl.selector.SelectedModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ | ||
import com.salesforce.op.utils.spark.RichVector.RichSparseVector | ||
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata} | ||
import enumeratum.{Enum, EnumEntry} | ||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.ml.Model | ||
import org.apache.spark.ml.linalg.Vectors | ||
import org.apache.spark.ml.linalg.SparseVector | ||
import org.apache.spark.ml.param.{IntParam, Param, Params} | ||
|
||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import scala.reflect.runtime.universe._ | ||
|
||
|
||
trait RecordInsightsLOCOParams extends Params { | ||
|
@@ -115,35 +113,29 @@ class RecordInsightsLOCO[T <: Model[T]] | |
private val dateMapTypes = | ||
Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) | ||
|
||
// Indices of features derived from Text(Map)Vectorizer | ||
private lazy val textFeatureIndices = getIndicesOfFeatureType(textTypes ++ textMapTypes) | ||
// Indices of features derived from hashed Text(Map)Vectorizer | ||
private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes, | ||
h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty) | ||
|
||
// Indices of features derived from Date(Map)Vectorizer | ||
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes) | ||
// Indices of features derived from unit Date(Map)Vectorizer | ||
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined) | ||
|
||
/** | ||
* Return the indices of features derived from given types. | ||
* @return Seq[Int] | ||
*/ | ||
private def getIndicesOfFeatureType (types: Set[String]): Seq[Int] = histories | ||
.filter(_.parentFeatureType.exists(types.contains)) | ||
.map(_.index) | ||
.distinct.sorted | ||
private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] = | ||
histories.collect { | ||
case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index | ||
}.distinct.sorted | ||
|
||
private def computeDiffs | ||
private def computeDiff | ||
( | ||
i: Int, | ||
oldInd: Int, | ||
oldVal: Double, | ||
featureArray: Array[(Int, Double)], | ||
featureSize: Int, | ||
featureSparse: SparseVector, | ||
baseScore: Array[Double] | ||
): Array[Double] = { | ||
featureArray.update(i, (oldInd, 0.0)) | ||
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score | ||
val diffs = baseScore.zip(score).map { case (b, s) => b - s } | ||
featureArray.update(i, (oldInd, oldVal)) | ||
diffs | ||
val score = modelApply(labelDummy, featureSparse.toOPVector).score | ||
(baseScore, score).zipped.map { case (b, s) => b - s } | ||
} | ||
|
||
private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = { | ||
|
@@ -158,70 +150,100 @@ class RecordInsightsLOCO[T <: Model[T]] | |
private def convertToTimePeriod(descriptorValue: String): Option[TimePeriod] = | ||
descriptorValue.split("_").lastOption.flatMap(TimePeriod.withNameInsensitiveOption) | ||
|
||
private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = history.grouping match { | ||
case Some(grouping) => history.parentFeatureOrigins.headOption.map(_ + "_" + grouping) | ||
case None => history.parentFeatureOrigins.headOption | ||
private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = { | ||
val groupSuffix = history.grouping.map("_" + _).getOrElse("") | ||
val name = history.parentFeatureOrigins.headOption.map(_ + groupSuffix) | ||
|
||
// If the descriptor value of a derived feature exists, then we check if it is | ||
// from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is true now - but may not always be true. If you want this to apply only for date unit circles should also check that one of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is not consistent : Unit Circle Transformation in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or I can check the parentType instead There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this change is explicitly to deal with date features that are transformed to unit circle then the check needs to be explicitly for that. Otherwise this is also applied to lat lon values (and anything else that we add later) and if we just check the type of the parent it assumes that we will always have unit circle transformation of dates - which could change at some point... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, but as I said above checking the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DateMapVectorizer does days between reference date and the date. The only two that do unit vector are DateToUnitCircleTransformer and DateToUnitCircleVectorizer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then there must be a bug in the shortcut : when
Those features both use the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. blarg! you are right there is a bug in the feature history that means we lose info if the same feature undergoes multiple transformations :-( https://github.com/salesforce/TransmogrifAI/blob/master/features/src/main/scala/com/salesforce/op/utils/spark/OpVectorMetadata.scala#L53 Can you put a todo to update once the bug is fixed |
||
// TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the | ||
// feature history after multiple transformations has been fixed | ||
name.map { n => | ||
val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) { | ||
history.descriptorValue | ||
.flatMap(convertToTimePeriod) | ||
.map(p => "_" + p.entryName) | ||
} else None | ||
n + timePeriodName.getOrElse("") | ||
} | ||
} | ||
|
||
private def returnTopPosNeg | ||
( | ||
featureArray: Array[(Int, Double)], | ||
featureSparse: SparseVector, | ||
zeroCountByFeature: Map[String, Int], | ||
featureSize: Int, | ||
baseScore: Array[Double], | ||
k: Int, | ||
indexToExamine: Int | ||
): Seq[LOCOValue] = { | ||
|
||
val minMaxHeap = new MinMaxHeap(k) | ||
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])] | ||
for {i <- featureArray.indices} { | ||
val (oldInd, oldVal) = featureArray(i) | ||
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore) | ||
val history = histories(oldInd) | ||
|
||
agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap, | ||
baseScore) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So the sparse features you just put in a value of 0? Can't we just skip adding them to the heap? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I had the same idea but in one of the iterations I ran into test failures and deferred it to later. I'll recheck now that I have everything green. @michaelweilsalesforce any thoughts? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What kind of failures have you encountered? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. it may be that we were doing an unnecessary calculation and that just happened to be captured in the test... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @michaelweilsalesforce you can reproduce it by commenting out the line 171-172.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @leahmcguire @gerashegalov The reason for tracking zero values is whenever we want to average LOCOs of the same raw text feature we are also including the zero values. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let me write a fix that will not go over the zeros |
||
|
||
// Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features | ||
// Adding LOCO results from aggregation map into heaps | ||
for {(name, (indices, ar)) <- aggregationMap} { | ||
// The index here is arbitrary | ||
val (i, n) = (indices.head, indices.length) | ||
val zeroCounts = zeroCountByFeature.get(name).getOrElse(0) | ||
val diffToExamine = ar.map(_ / (n + zeroCounts)) | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. wait so we are aggregating everything into a map and then putting it into a heap and then just taking it out of the heap? doesn't that defeat the whole purpose of the heap? Shouldn't we be putting each value into the heap as we calculate it rather than aggregating the whole thing? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are only aggregating TF and Date features There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ah ok - can you add a comment to that effect |
||
} | ||
|
||
minMaxHeap.dequeueAll | ||
} | ||
|
||
private def agggregateDiffs( | ||
featureVec: SparseVector, | ||
indexToExamine: Int, | ||
minMaxHeap: MinMaxHeap, | ||
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])], | ||
baseScore: Array[Double] | ||
): Unit = { | ||
computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) => | ||
val history = histories(oldInd) | ||
history match { | ||
// If indicator value and descriptor value of a derived text feature are empty, then it is likely | ||
// to be a hashing tf output. We aggregate such features for each (rawFeatureName). | ||
case h if h.indicatorValue.isEmpty && h.descriptorValue.isEmpty && textFeatureIndices.contains(oldInd) => | ||
// If indicator value and descriptor value of a derived text feature are empty, then it is | ||
// a hashing tf output. We aggregate such features for each (rawFeatureName). | ||
case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) => | ||
for {name <- getRawFeatureName(h)} { | ||
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double])) | ||
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine))) | ||
} | ||
// If the descriptor value of a derived date feature exists, then it is likely to be | ||
// from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod). | ||
case h if h.descriptorValue.isDefined && dateFeatureIndices.contains(oldInd) => | ||
for {name <- getRawFeatureName(h)} { | ||
val key = name + h.descriptorValue.flatMap(convertToTimePeriod).map(p => "_" + p.entryName).getOrElse("") | ||
val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double])) | ||
aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine))) | ||
} | ||
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
} | ||
} | ||
} | ||
|
||
// Adding LOCO results from aggregation map into heaps | ||
for {(indices, ar) <- aggregationMap.values} { | ||
// The index here is arbitrary | ||
val (i, n) = (indices.head, indices.length) | ||
val diffToExamine = ar.map(_ / n) | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
private def computeDiffs( | ||
featureVec: SparseVector, | ||
baseScore: Array[Double] | ||
) = { | ||
(0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) => | ||
(i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore)) | ||
} | ||
|
||
minMaxHeap.dequeueAll | ||
} | ||
|
||
override def transformFn: OPVector => TextMap = features => { | ||
val baseResult = modelApply(labelDummy, features) | ||
val baseScore = baseResult.score | ||
val featureSize = features.value.size | ||
|
||
// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros | ||
val featuresSparse = features.value.toSparse | ||
val res = ArrayBuffer.empty[(Int, Double)] | ||
featuresSparse.foreachActive((i, v) => res += i -> v) | ||
val featureIndexSet = featuresSparse.indices.toSet | ||
|
||
// Besides non 0 values, we want to check the text/date features as well | ||
(textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0) | ||
val featureArray = res.toArray | ||
val featureSize = featuresSparse.size | ||
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices) | ||
.filterNot(featureIndexSet.contains) | ||
|
||
// Count zeros by feature name | ||
val zeroCountByFeature = zeroValIndices | ||
.groupBy(i => getRawFeatureName(histories(i)).get) | ||
.mapValues(_.length).view.toMap | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What’s the point of .view here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to force map materialization after toMap in 2.11 |
||
|
||
val k = $(topK) | ||
// Index where to examine the difference in the prediction vector | ||
|
@@ -232,15 +254,16 @@ class RecordInsightsLOCO[T <: Model[T]] | |
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability) | ||
case n if n > 2 => baseResult.prediction.toInt | ||
} | ||
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine) | ||
val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine) | ||
val top = getTopKStrategy match { | ||
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k) | ||
// Take top K positive and top K negative LOCOs, hence 2 * K | ||
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k) | ||
} | ||
|
||
val allIndices = featuresSparse.indices ++ zeroValIndices | ||
top.map { case LOCOValue(i, _, diffs) => | ||
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs) | ||
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs) | ||
}.toMap.toTextMap | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe update comment to indicate only getting hashed text values