Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to calculate LOCO for dates/texts by Leaving Out Entire Vector. #418

Merged
merged 21 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,21 @@ trait RecordInsightsLOCOParams extends Params {
/**
 * Sets the strategy used to select the top K insights.
 * @param strategy top K strategy to use
 * @return this stage
 */
def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)
/** @return the currently configured top K strategy */
def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))

// Param controlling how the LOCOs of the derived columns of a text/date vector are
// combined into a single insight for the raw feature.
final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy",
doc = "Aggregate text/date vector by " +
"1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or " +
"2. Avg strategy - calculate the loco for each column of the vector and then average all the locos."
)
/**
 * Sets the strategy used to aggregate the LOCOs of a text/date feature vector.
 * @param strategy vector aggregation strategy (Avg or LeaveOutVector)
 * @return this stage
 */
def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type =
set(vectorAggregationStrategy, strategy.entryName)
/** @return the currently configured vector aggregation strategy */
def getVectorAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName(
$(vectorAggregationStrategy))


// Defaults: top 20 insights, ranked by absolute value; text/date vectors aggregated by averaging.
// (The scrape kept both the pre- and post-change `topKStrategy` lines; only the merged
// version with the trailing comma is valid.)
setDefault(
  topK -> 20,
  topKStrategy -> TopKStrategy.Abs.entryName,
  vectorAggregationStrategy -> VectorAggregationStrategy.Avg.entryName
)
}

Expand Down Expand Up @@ -104,30 +116,33 @@ class RecordInsightsLOCO[T <: Model[T]]
/**
 * These are the name of the types we want to perform an aggregation of the LOCO results over derived features
 */
private val textTypes = Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea],
  FeatureType.typeName[TextList], FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap])
private val dateTypes = Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime],
  FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])

// Map of RawFeatureName to the size of its derived features that needs to be aggregated
// for the above textTypes and dateTypes.
// Computed lazily once per stage from the vector metadata (histories), so per-record
// transformFn calls do not need to re-count indices.
// NOTE(review): assumes getRawFeatureName returns Some for every text/date column — the
// unguarded .get will throw otherwise; confirm against getRawFeatureName's contract.
private lazy val aggFeaturesSize: Map[String, Int] = histories
.filter(h => isTextFeature(h) || isDateFeature(h))
.groupBy { h => getRawFeatureName(h).get }
.mapValues(_.length)

/**
 * Checks whether this derived column came from a hashed Text(Map)Vectorizer output:
 * its parent is a text type and it carries neither an indicator nor a descriptor value.
 * @return true if the column is a hashing-tf text output
 */
private def isTextFeature(h: OpVectorColumnHistory): Boolean =
  h.parentFeatureType.exists(textTypes.contains) &&
    h.indicatorValue.isEmpty &&
    h.descriptorValue.isEmpty

/**
 * Checks whether this derived column came from the unit-circle Date(Map)Vectorizer:
 * its parent is a date type and it carries a descriptor value.
 * @return true if the column is a unit-circle date output
 */
private def isDateFeature(h: OpVectorColumnHistory): Boolean = {
  h.parentFeatureType.exists(dateTypes.contains) && h.descriptorValue.isDefined
}

private def computeDiff
(
Expand Down Expand Up @@ -159,7 +174,7 @@ class RecordInsightsLOCO[T <: Model[T]]
// TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the
// feature history after multiple transformations has been fixed
name.map { n =>
val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) {
val timePeriodName = if (dateTypes.exists(history.parentFeatureType.contains)) {
history.descriptorValue
.flatMap(convertToTimePeriod)
.map(p => "_" + p.entryName)
Expand All @@ -168,82 +183,72 @@ class RecordInsightsLOCO[T <: Model[T]]
}
}

/**
 * Aggregates LOCO diffs over all derived columns of a single text/date feature.
 *
 * Cost note (from the PR discussion): computeDiff is O(n). With the Avg strategy it runs
 * once per derived column, i.e. O(n * m); with LeaveOutVector it runs exactly once, i.e.
 * O(n) — where n is the size of the entire feature vector and m is the number of derived
 * columns of the individual text/date feature being aggregated.
 *
 * @param featureSparse sparse feature vector of the current record
 * @param aggIndices    (sparse position, vector index) pairs of the columns to leave out
 * @param strategy      how to aggregate: Avg or LeaveOutVector
 * @param baseScore     model score with nothing left out
 * @param featureSize   total number of derived columns for this raw feature
 * @return aggregated score diffs for this raw feature
 */
private def aggregateDiffs(
  featureSparse: SparseVector,
  aggIndices: Array[(Int, Int)],
  strategy: VectorAggregationStrategy,
  baseScore: Array[Double],
  featureSize: Int
): Array[Double] = {
  strategy match {
    case VectorAggregationStrategy.Avg =>
      // Zero out each derived column individually, sum the diffs, then average
      aggIndices
        .map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) }
        .foldLeft(Array.empty[Double])(sumArrays)
        .map( _ / featureSize)

    case VectorAggregationStrategy.LeaveOutVector =>
      // Zero out every derived column of the feature at once and compute a single diff
      val copyFeatureSparse = featureSparse.copy
      aggIndices.foreach {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)}
      computeDiff(copyFeatureSparse, baseScore)
  }
}

/**
 * Computes a LOCO diff for every active column of the record's feature vector and
 * returns the top K positive/negative insights.
 *
 * Text (hashing-tf) and unit-circle date columns are not scored individually: their
 * (sparse position, vector index) pairs are collected per raw feature and aggregated
 * with aggregateDiffs according to the configured vector aggregation strategy.
 *
 * NOTE(review): reconstructed from the merged side of the PR diff — the scrape
 * interleaved the removed and added lines; verify against the merged file.
 *
 * @param featureSparse  sparse feature vector of the current record
 * @param baseScore      model score with nothing left out
 * @param k              heap capacity for top insights
 * @param indexToExamine index in the prediction vector to rank diffs by
 * @return top K positive and top K negative LOCO values
 */
private def returnTopPosNeg
(
  featureSparse: SparseVector,
  baseScore: Array[Double],
  k: Int,
  indexToExamine: Int
): Seq[LOCOValue] = {
  val minMaxHeap = new MinMaxHeap(k)

  // Map[FeatureName, Array[(SparseVectorIndex, ActualIndex)]] of text/date columns to aggregate
  val aggActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]]

  (0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) =>
    val history = histories(oldInd)
    history match {
      // If indicator value and descriptor value of a derived text feature are empty, then it is
      // a hashing tf output. We aggregate such features for each (rawFeatureName).
      case h if isTextFeature(h) || isDateFeature(h) => {
        for {name <- getRawFeatureName(h)} {
          val indices = aggActiveIndices.getOrElse(name, (Array.empty[(Int, Int)]))
          aggActiveIndices.update(name, indices :+ (i, oldInd))
        }
      }
      // Any other column gets its own individual LOCO diff
      case _ => {
        val diffToExamine = computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore)
        minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
      }
    }
  }

  // Aggregate active indices of each text feature and date feature based on vector aggregate strategy.
  aggActiveIndices.foreach {
    case (name, aggIndices) =>
      val diffToExamine = aggregateDiffs(featureSparse, aggIndices,
        getVectorAggregationStrategy, baseScore, aggFeaturesSize.get(name).get)
      // The index here is arbitrary: any column of the aggregated feature identifies it
      minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine)
  }

  minMaxHeap.dequeueAll
}

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you manage to remove the zero val indices logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the featureSparse vector we know the count of active indices, but to calculate the average LOCO for each date/text field we also needed the zero-value indices logic. We used to compute the total count of indices for a date/text feature during each transformation of an individual record/row inside the transformFn function. There is no need to do this: we can compute the total count of indices per date/text feature just once, using OpVectorColumnHistory at the global level (i.e., outside the transformFn function), which is what I did in line 139

private lazy val textFeaturesCount: Map[String, Int] = getFeatureCount(isTextIndex)
private lazy val dateFeaturesCount: Map[String, Int] = getFeatureCount(isDateIndex) 

.filterNot(featureIndexSet.contains)

// Count zeros by feature name
val zeroCountByFeature = zeroValIndices
.groupBy(i => getRawFeatureName(histories(i)).get)
.mapValues(_.length).view.toMap

val k = $(topK)
// Index where to examine the difference in the prediction vector
Expand All @@ -254,14 +259,14 @@ class RecordInsightsLOCO[T <: Model[T]]
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
val allIndices = featuresSparse.indices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
Expand Down Expand Up @@ -329,3 +334,14 @@ object TopKStrategy extends Enum[TopKStrategy] {
case object Abs extends TopKStrategy("abs")
case object PositiveNegative extends TopKStrategy("positive and negative")
}


/**
 * Strategy for aggregating the LOCOs of the derived columns of a text/date feature
 * vector into a single insight for the raw feature.
 *
 * @param name human-readable description of the strategy
 */
sealed abstract class VectorAggregationStrategy(val name: String) extends EnumEntry with Serializable

object VectorAggregationStrategy extends Enum[VectorAggregationStrategy] {
  // Explicit result type on the public member, rather than leaking the inferred type
  val values: IndexedSeq[VectorAggregationStrategy] = findValues

  // Zero out the whole vector at once and compute one diff
  case object LeaveOutVector extends
    VectorAggregationStrategy("calculate the loco by leaving out the entire vector")

  // Compute one diff per derived column, then average them
  case object Avg extends
    VectorAggregationStrategy("calculate the loco for each column of the vector and then average all the locos")
}
Loading