Skip to content

Commit

Permalink
Merge branch 'master' into achit/LOCO-Test-Bug-Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tovbinm authored Nov 14, 2019
2 parents 3a9369f + ccc1501 commit 92c1e43
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 243 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
name: CI

on: [push, pull_request]
on:
push:
branches:
- master
pull_request:
branches:
- master

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
Expand All @@ -16,4 +20,4 @@ jobs:
- name: Build
run: ./gradlew scalaStyle reportScoverage
- name: Build Helloworld
run: cd helloworld && ./gradlew scalaStyle test
run: cd helloworld && ./gradlew scalaStyle test
2 changes: 1 addition & 1 deletion ROADMAP.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# The TransmogrifAI Roadmap
# TransmogrifAI Roadmap

## Short Term

Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/com/salesforce/op/OpWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
val (fittedStages, newResultFeatures) =
if (stages.exists(_.isInstanceOf[Estimator[_]])) {
val rawData = generateRawData()

// Update features with fitted stages
val fittedStgs = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages)
val newResultFtrs = resultFeatures.map(_.copyWithNewStages(fittedStgs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,21 @@ trait RecordInsightsLOCOParams extends Params {
def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)
def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))

final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy",
doc = "Aggregate text/date vector by " +
"1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or " +
"2. Avg strategy - calculate the loco for each column of the vector and then average all the locos."
)
def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type =
set(vectorAggregationStrategy, strategy.entryName)
def getVectorAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName(
$(vectorAggregationStrategy))


setDefault(
topK -> 20,
topKStrategy -> TopKStrategy.Abs.entryName
topKStrategy -> TopKStrategy.Abs.entryName,
vectorAggregationStrategy -> VectorAggregationStrategy.Avg.entryName
)
}

Expand Down Expand Up @@ -104,30 +116,33 @@ class RecordInsightsLOCO[T <: Model[T]]
/**
* These are the name of the types we want to perform an aggregation of the LOCO results over derived features
*/
private val textTypes =
Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], FeatureType.typeName[TextList])
private val textMapTypes =
Set(FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap])
private val dateTypes =
Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime])
private val dateMapTypes =
Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])

// Indices of features derived from hashed Text(Map)Vectorizer
private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes,
h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty)

// Indices of features derived from unit Date(Map)Vectorizer
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined)
private val textTypes = Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea],
FeatureType.typeName[TextList], FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap])
private val dateTypes = Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime],
FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])

// Map of RawFeatureName to the size of its derived features that needs to be aggregated
// for the above textTypes and dateTypes.
private lazy val aggFeaturesSize: Map[String, Int] = histories
.filter(h => isTextFeature(h) || isDateFeature(h))
.groupBy { h => getRawFeatureName(h).get }
.mapValues(_.length)

/**
* Return whether this feature derived from hashed Text(Map)Vectorizer
* @return Boolean
*/
private def isTextFeature(h: OpVectorColumnHistory): Boolean = {
h.parentFeatureType.exists(textTypes.contains) && h.indicatorValue.isEmpty && h.descriptorValue.isEmpty
}

/**
* Return the indices of features derived from given types.
* @return Seq[Int]
* Return whether this feature derived from unit circle Date(Map)Vectorizer
* @return Boolean
*/
private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] =
histories.collect {
case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index
}.distinct.sorted
private def isDateFeature(h: OpVectorColumnHistory): Boolean = {
h.parentFeatureType.exists(dateTypes.contains) && h.descriptorValue.isDefined
}

private def computeDiff
(
Expand Down Expand Up @@ -159,7 +174,7 @@ class RecordInsightsLOCO[T <: Model[T]]
// TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the
// feature history after multiple transformations has been fixed
name.map { n =>
val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) {
val timePeriodName = if (dateTypes.exists(history.parentFeatureType.contains)) {
history.descriptorValue
.flatMap(convertToTimePeriod)
.map(p => "_" + p.entryName)
Expand All @@ -168,82 +183,72 @@ class RecordInsightsLOCO[T <: Model[T]]
}
}

private def aggregateDiffs(
featureSparse: SparseVector,
aggIndices: Array[(Int, Int)],
strategy: VectorAggregationStrategy,
baseScore: Array[Double],
featureSize: Int
): Array[Double] = {
strategy match {
case VectorAggregationStrategy.Avg =>
aggIndices
.map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) }
.foldLeft(Array.empty[Double])(sumArrays)
.map( _ / featureSize)

case VectorAggregationStrategy.LeaveOutVector =>
val copyFeatureSparse = featureSparse.copy
aggIndices.foreach {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)}
computeDiff(copyFeatureSparse, baseScore)
}
}

private def returnTopPosNeg
(
featureSparse: SparseVector,
zeroCountByFeature: Map[String, Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {
val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]

agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap,
baseScore)

// Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features
// Adding LOCO results from aggregation map into heaps
for {(name, (indices, ar)) <- aggregationMap} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val zeroCounts = zeroCountByFeature.get(name).getOrElse(0)
val diffToExamine = ar.map(_ / (n + zeroCounts))
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}
// Map[FeatureName, (Array[SparseVectorIndices], Array[ActualIndices])
val aggActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]]

private def agggregateDiffs(
featureVec: SparseVector,
indexToExamine: Int,
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) =>
(0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) =>
val history = histories(oldInd)
history match {
// If indicator value and descriptor value of a derived text feature are empty, then it is
// a hashing tf output. We aggregate such features for each (rawFeatureName).
case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) =>
case h if isTextFeature(h) || isDateFeature(h) => {
for {name <- getRawFeatureName(h)} {
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
val indices = aggActiveIndices.getOrElse(name, (Array.empty[(Int, Int)]))
aggActiveIndices.update(name, indices :+ (i, oldInd))
}
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
case _ => {
val diffToExamine = computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}
}
}

private def computeDiffs(
featureVec: SparseVector,
baseScore: Array[Double]
) = {
(0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) =>
(i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore))
// Aggregate active indices of each text feature and date feature based on vector aggregate strategy.
aggActiveIndices.foreach {
case (name, aggIndices) =>
val diffToExamine = aggregateDiffs(featureSparse, aggIndices,
getVectorAggregationStrategy, baseScore, aggFeaturesSize.get(name).get)
minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
.filterNot(featureIndexSet.contains)

// Count zeros by feature name
val zeroCountByFeature = zeroValIndices
.groupBy(i => getRawFeatureName(histories(i)).get)
.mapValues(_.length).view.toMap

val k = $(topK)
// Index where to examine the difference in the prediction vector
Expand All @@ -254,14 +259,14 @@ class RecordInsightsLOCO[T <: Model[T]]
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
val allIndices = featuresSparse.indices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
Expand Down Expand Up @@ -329,3 +334,14 @@ object TopKStrategy extends Enum[TopKStrategy] {
case object Abs extends TopKStrategy("abs")
case object PositiveNegative extends TopKStrategy("positive and negative")
}


sealed abstract class VectorAggregationStrategy(val name: String) extends EnumEntry with Serializable

object VectorAggregationStrategy extends Enum[VectorAggregationStrategy] {
val values = findValues
case object LeaveOutVector extends
VectorAggregationStrategy("calculate the loco by leaving out the entire vector")
case object Avg extends
VectorAggregationStrategy("calculate the loco for each column of the vector and then average all the locos")
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,22 +353,6 @@ trait DataBalancerParams extends Params {
private[op] def getUpSampleFraction: Double = $(upSampleFraction)


/**
* Fraction to sample majority data
* Value should be in ]0.0, 1.0]
*
* @group param
*/
private[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction",
"fraction to sample majority data", ParamValidators.inRange(
lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true
)
)

private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)

private[op] def getDownSampleFraction: Double = $(downSampleFraction)

/**
* Whether or not positive data is in minority
* Value should be in true or false
Expand Down
Loading

0 comments on commit 92c1e43

Please sign in to comment.