From 62de0be44853d60060a8de2c5fd2d88740f480ef Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Fri, 6 Sep 2019 10:28:16 -0700 Subject: [PATCH 01/13] Add the ability to calculate the loco for text and date features by leaving out their entire vector. --- .../impl/insights/RecordInsightsLOCO.scala | 172 +++++++++++------- 1 file changed, 106 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 7bcf02f969..feddb351b0 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -64,9 +64,32 @@ trait RecordInsightsLOCOParams extends Params { def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy)) + final val dateAggregationStrategy = new Param[String](parent = this, name = "dateAggregationStrategy", + doc = "Whether vector for each time period - HourOfDay, DayOfWeek, etc is aggregated by " + + "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or" + + "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." + ) + def setDateAggregationStrategy(strategy: VectorAggregationStrategy): this.type = + set(dateAggregationStrategy, strategy.entryName) + def getDateAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( + $(dateAggregationStrategy)) + + final val textAggregationStrategy = new Param[String](parent = this, name = "dateAggregationStrategy", + doc = "Whether text vector is aggregated by " + + "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or" + + "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." + ) + def setTextAggregationStrategy(strategy: VectorAggregationStrategy): this.type = + set(textAggregationStrategy, strategy.entryName) + def getTextAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( + $(textAggregationStrategy)) + + setDefault( topK -> 20, - topKStrategy -> TopKStrategy.Abs.entryName + topKStrategy -> TopKStrategy.Abs.entryName, + dateAggregationStrategy -> VectorAggregationStrategy.Avg.entryName, + textAggregationStrategy -> VectorAggregationStrategy.Avg.entryName ) } @@ -113,21 +136,24 @@ class RecordInsightsLOCO[T <: Model[T]] private val dateMapTypes = Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) - // Indices of features derived from hashed Text(Map)Vectorizer - private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes, - h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty) + private lazy val textFeaturesCount: Map[String, Int] = histories + .filter(isTextIndex) + .groupBy { h => getRawFeatureName(h).get } + .mapValues(_.length).view.toMap - // Indices of features derived from unit Date(Map)Vectorizer - private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined) + private lazy val dateFeaturesCount: Map[String, Int] = histories + .filter(isDateIndex) + .groupBy { h => getRawFeatureName(h).get } + .mapValues(_.length).view.toMap - /** - * Return the indices of features derived from given types. - * @return Seq[Int] - */ - private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] = - histories.collect { - case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index - }.distinct.sorted + private def isTextIndex(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists((textTypes ++ textMapTypes).contains) && + h.indicatorValue.isEmpty && h.descriptorValue.isEmpty + } + + private def isDateIndex(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists((dateTypes ++ dateMapTypes).contains) && h.descriptorValue.isDefined + } private def computeDiff ( @@ -168,82 +194,85 @@ class RecordInsightsLOCO[T <: Model[T]] } } + private def aggregateDiffs( + featureSparse: SparseVector, + aggIndices: Array[(Int, Int)], + strategy: VectorAggregationStrategy, + baseScore: Array[Double], + featureCount: Int + ): Array[Double] = { + strategy match { + case VectorAggregationStrategy.Avg => + aggIndices + .map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) } + .foldLeft(Array.empty[Double])(sumArrays) + .map( _ / featureCount) + + case VectorAggregationStrategy.LeaveOutVector => + val copyFeatureSparse = featureSparse.copy + aggIndices.map {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)} + computeDiff(copyFeatureSparse, baseScore) + } + } + private def returnTopPosNeg ( featureSparse: SparseVector, - zeroCountByFeature: Map[String, Int], - featureSize: Int, baseScore: Array[Double], k: Int, indexToExamine: Int ): Seq[LOCOValue] = { val minMaxHeap = new MinMaxHeap(k) - val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])] - - agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap, - baseScore) - - // Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features - // Adding LOCO results from aggregation map into heaps - for {(name, (indices, ar)) <- aggregationMap} { - // The index here is arbitrary - val (i, n) = (indices.head, indices.length) - val zeroCounts = zeroCountByFeature.get(name).getOrElse(0) - val diffToExamine = ar.map(_ / (n + zeroCounts)) - minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) - } - minMaxHeap.dequeueAll - } + // Map[FeatureName, (Array[SparseVectorIndices], Array[ActualIndices]) + val textActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] + val dateActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] - private def agggregateDiffs( - featureVec: SparseVector, - indexToExamine: Int, - minMaxHeap: MinMaxHeap, - aggregationMap: mutable.Map[String, (Array[Int], Array[Double])], - baseScore: Array[Double] - ): Unit = { - computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) => + (0 until featureSparse.size, featureSparse.indices).zipped.map { case (i: Int, oldInd: Int) => val history = histories(oldInd) history match { - // If indicator value and descriptor value of a derived text feature are empty, then it is - // a hashing tf output. We aggregate such features for each (rawFeatureName). - case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) => + case h if isTextIndex(h) => { + for {name <- getRawFeatureName(h)} { + val indices = textActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) + textActiveIndices.update(name, indices :+ (i, oldInd)) + } + } + case h if isDateIndex(h) => { for {name <- getRawFeatureName(h)} { - val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double])) - aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine))) + val indices = dateActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) + dateActiveIndices.update(name, indices :+ (i, oldInd)) } - case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) + } + case _ => { + val diffToExamine = computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) + minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) + } } } - } - private def computeDiffs( - featureVec: SparseVector, - baseScore: Array[Double] - ) = { - (0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) => - (i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore)) + // Aggregate active indices of each text feature and date feature based on their respective strategy. + textActiveIndices.map { + case (name, aggIndices) => + val diffToExamine = aggregateDiffs(featureSparse, aggIndices, + getTextAggregationStrategy, baseScore, textFeaturesCount.get(name).get) + minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } + dateActiveIndices.map { + case (name, aggIndices) => + val diffToExamine = aggregateDiffs(featureSparse, aggIndices, + getDateAggregationStrategy, baseScore, dateFeaturesCount.get(name).get) + minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) + } + + minMaxHeap.dequeueAll } override def transformFn: OPVector => TextMap = features => { val baseResult = modelApply(labelDummy, features) val baseScore = baseResult.score - val featureSize = features.value.size // TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros val featuresSparse = features.value.toSparse - val featureIndexSet = featuresSparse.indices.toSet - - // Besides non 0 values, we want to check the text/date features as well - val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices) - .filterNot(featureIndexSet.contains) - - // Count zeros by feature name - val zeroCountByFeature = zeroValIndices - .groupBy(i => getRawFeatureName(histories(i)).get) - .mapValues(_.length).view.toMap val k = $(topK) // Index where to examine the difference in the prediction vector @@ -254,14 +283,14 @@ class RecordInsightsLOCO[T <: Model[T]] // For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability) case n if n > 2 => baseResult.prediction.toInt } - val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine) + val topPosNeg = returnTopPosNeg(featuresSparse, baseScore, k, indexToExamine) val top = getTopKStrategy match { case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k) // Take top K positive and top K negative LOCOs, hence 2 * K case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k) } - val allIndices = featuresSparse.indices ++ zeroValIndices + val allIndices = featuresSparse.indices top.map { case LOCOValue(i, _, diffs) => RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs) }.toMap.toTextMap @@ -329,3 +358,14 @@ object TopKStrategy extends Enum[TopKStrategy] { case object Abs extends TopKStrategy("abs") case object PositiveNegative extends TopKStrategy("positive and negative") } + + +sealed abstract class VectorAggregationStrategy(val name: String) extends EnumEntry with Serializable + +object VectorAggregationStrategy extends Enum[VectorAggregationStrategy] { + val values = findValues + case object LeaveOutVector extends + VectorAggregationStrategy("calculate the loco by leaving out the entire vector") + case object Avg extends + VectorAggregationStrategy("calculate the loco for each column of the vector and then average all the locos") +} From 93afc120adbd24210e78c37bd74a5e073aafa02c Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Fri, 6 Sep 2019 10:42:37 -0700 Subject: [PATCH 02/13] Refactoring --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index feddb351b0..20af78938b 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -136,13 +136,11 @@ class RecordInsightsLOCO[T <: Model[T]] private val dateMapTypes = Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) - private lazy val textFeaturesCount: Map[String, Int] = histories - .filter(isTextIndex) - .groupBy { h => getRawFeatureName(h).get } - .mapValues(_.length).view.toMap + private lazy val textFeaturesCount: Map[String, Int] = getFeatureCount(isTextIndex) + private lazy val dateFeaturesCount: Map[String, Int] = getFeatureCount(isDateIndex) - private lazy val dateFeaturesCount: Map[String, Int] = histories - .filter(isDateIndex) + private def getFeatureCount(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] = histories + .filter(predicate) .groupBy { h => getRawFeatureName(h).get } .mapValues(_.length).view.toMap From b117b9aabd27325332480c3564633332f5f06fde Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Sun, 8 Sep 2019 02:12:50 -0700 Subject: [PATCH 03/13] Refactoring --- .../insights/RecordInsightsLOCOTest.scala | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 0e7f8d4fea..15ecb3616e 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -44,7 +44,7 @@ import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomT import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata} import com.salesforce.op.{FeatureHistory, OpWorkflow, _} -import org.apache.spark.ml.PredictionModel +import org.apache.spark.ml.Model import org.apache.spark.ml.classification.LogisticRegressionModel import org.apache.spark.ml.linalg._ import org.apache.spark.ml.regression.LinearRegressionModel @@ -300,18 +300,20 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI it should "aggregate values for text and textMap derived features" in { val testData = generateTestTextData + val model = generateLRModel(testData) + val actualRecordInsights = generateRecordInsights(model, testData) withClue("TextArea can have two null indicator values") { - testData.actualRecordInsights.map(p => assert(p.size == 7 || p.size == 8)) + actualRecordInsights.map(p => assert(p.size == 7 || p.size == 8)) } withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + "aggregation required for LOCO on this field.") { - testData.actualRecordInsights.foreach { p => + actualRecordInsights.foreach { p => assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)) } } - assertLOCOSum(testData.actualRecordInsights) + assertLOCOSum(actualRecordInsights) assertAggregatedText(textFeatureName) assertAggregatedTextMap(textMapFeatureName, "k0") assertAggregatedTextMap(textMapFeatureName, "k1") @@ -329,7 +331,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData) + assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) } } @@ -342,7 +344,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData) + assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) } } } @@ -350,8 +352,10 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI it should "aggregate values for date, datetime, dateMap and dateTimeMap derived features" in { val testData = generateTestDateData + val model = generateLRModel(testData) + val actualRecordInsights = generateRecordInsights(model, testData) - assertLOCOSum(testData.actualRecordInsights) + assertLOCOSum(actualRecordInsights) assertAggregatedDate(dateFeatureName) assertAggregatedDate(dateTimeFeatureName) assertAggregatedDateMap(dateMapFeatureName, "k0") @@ -370,7 +374,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && history.descriptorValue.isDefined && history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData) + assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) } } } @@ -387,7 +391,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateMapFeatureName) && history.grouping == Option(keyName) && history.descriptorValue.isDefined && history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData) + assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) } } } @@ -406,7 +410,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI */ private def assertAggregatedWithPredicate( predicate: OpVectorColumnHistory => Boolean, - testData: RecordInsightsTestData[LogisticRegressionModel] + testData: RecordInsightsTestData, + model: OpPredictorWrapperModel[_], + actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]] ): Unit = { implicit val enc: Encoder[(Array[Double], Long)] = ExpressionEncoder() implicit val enc2: Encoder[Seq[Double]] = ExpressionEncoder() @@ -422,9 +428,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val featureArray = v.toArray val locos = indices.map { i => val oldVal = v(i) - val baseScore = testData.sparkModel.transformFn(l.toRealNN, v.toOPVector).score.toSeq + val baseScore = model.transformFn(l.toRealNN, v.toOPVector).score.toSeq featureArray.update(i, 0.0) - val newScore = testData.sparkModel.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq + val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq featureArray.update(i, oldVal) baseScore.zip(newScore).map { case (b, n) => b - n } } @@ -433,7 +439,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } val expected = expectedLocos.collect().toSeq.filter(_.head != 0.0) - val actual = testData.actualRecordInsights + val actual = actualRecordInsights .flatMap(_.find { case (history, _) => predicate(history) }) .map(_._2.map(_._2)).toSeq val zip = actual.zip(expected) @@ -465,7 +471,7 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val textAreaFeatureName = "textArea" val textAreaMapFeatureName = "textAreaMap" - def generateTestDateData: RecordInsightsTestData[LogisticRegressionModel] = { + def generateTestDateData: RecordInsightsTestData = { val refDate = TransmogrifierDefaults.ReferenceDate.minusMillis(1) val minStep = 1000000 @@ -518,10 +524,10 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val parsedInsights = locoInsights.collect(locoTransformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) - RecordInsightsTestData(rawData, featureTransformedDF, featureVector, label, sparkModel, parsedInsights) + RecordInsightsTestData(rawData, featureTransformedDF, featureVector, label) } - def generateTestTextData: RecordInsightsTestData[LogisticRegressionModel] = { + def generateTestTextData: RecordInsightsTestData = { // Random Text Data val textData: Seq[Text] = RandomText.strings(5, 10).withProbabilityOfEmpty(0.3).take(numRows).toList @@ -603,25 +609,26 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val checkedFeatureVector = checker.getOutput() - // RecordInsightsLOCO - val sparkModel = new OpLogisticRegression().setInput(label, checkedFeatureVector).fit(checked) - - val transformer = new RecordInsightsLOCO(sparkModel).setInput(checkedFeatureVector) - - val insights = transformer.transform(checked) + RecordInsightsTestData(testData, checked, checkedFeatureVector, label) + } - val parsed = insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) + def generateLRModel(data: RecordInsightsTestData): OpPredictorWrapperModel[LogisticRegressionModel] = { + new OpLogisticRegression().setInput(data.label, data.featureVector).fit(data.featureTransformedDF) + } - RecordInsightsTestData(testData, checked, checkedFeatureVector, label, sparkModel, parsed) + def generateRecordInsights[T <: Model[T]](model: T, + data: RecordInsightsTestData): Array[Map[OpVectorColumnHistory, Insights]] = { + val transformer = new RecordInsightsLOCO(model).setInput(data.featureVector).setTopK( + data.featureTransformedDF.columns.length) + val insights = transformer.transform(data.featureTransformedDF) + insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) } } -case class RecordInsightsTestData[M <: PredictionModel[Vector, M]] +case class RecordInsightsTestData ( rawDF: DataFrame, featureTransformedDF: DataFrame, featureVector: FeatureLike[OPVector], - label: FeatureLike[RealNN], - sparkModel: OpPredictorWrapperModel[M], - actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]] + label: FeatureLike[RealNN] ) From b7bd84fc35f68ff5b64b19670bc4703d9e712c98 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Wed, 11 Sep 2019 16:43:47 -0700 Subject: [PATCH 04/13] Refactoring --- .../insights/RecordInsightsLOCOTest.scala | 78 ++++++++----------- 1 file changed, 31 insertions(+), 47 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 15ecb3616e..049e73d9bc 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -299,9 +299,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } it should "aggregate values for text and textMap derived features" in { - val testData = generateTestTextData - val model = generateLRModel(testData) - val actualRecordInsights = generateRecordInsights(model, testData) + val (df, featureVector, label) = generateTestTextData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualRecordInsights = generateRecordInsights(model, df, featureVector) withClue("TextArea can have two null indicator values") { actualRecordInsights.map(p => assert(p.size == 7 || p.size == 8)) @@ -331,7 +331,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) + assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) } } @@ -344,16 +344,16 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) + assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) } } } it should "aggregate values for date, datetime, dateMap and dateTimeMap derived features" in { - val testData = generateTestDateData - val model = generateLRModel(testData) - val actualRecordInsights = generateRecordInsights(model, testData) + val (df, featureVector, label) = generateTestDateData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualRecordInsights = generateRecordInsights(model, df, featureVector, topK = 40) assertLOCOSum(actualRecordInsights) assertAggregatedDate(dateFeatureName) @@ -374,7 +374,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && history.descriptorValue.isDefined && history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) + assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) } } } @@ -391,7 +391,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateMapFeatureName) && history.grouping == Option(keyName) && history.descriptorValue.isDefined && history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData, model, actualRecordInsights) + assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) } } } @@ -410,20 +410,22 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI */ private def assertAggregatedWithPredicate( predicate: OpVectorColumnHistory => Boolean, - testData: RecordInsightsTestData, model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]] ): Unit = { implicit val enc: Encoder[(Array[Double], Long)] = ExpressionEncoder() implicit val enc2: Encoder[Seq[Double]] = ExpressionEncoder() - val meta = OpVectorMetadata.apply(testData.featureTransformedDF.schema(testData.featureVector.name)) + val meta = OpVectorMetadata.apply(df.schema(featureVector.name)) val indices = meta.getColumnHistory() .filter(predicate) .map(_.index) - val expectedLocos = testData.featureTransformedDF.select(testData.label, testData.featureVector).map { + val expectedLocos = df.select(label, featureVector).map { case Row(l: Double, v: Vector) => val featureArray = v.toArray val locos = indices.map { i => @@ -449,6 +451,17 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } } } + + private def generateRecordInsights[T <: Model[T]]( + model: T, + df: DataFrame, + featureVector: FeatureLike[OPVector], + topK: Int = 20 + ): Array[Map[OpVectorColumnHistory, Insights]] = { + val transformer = new RecordInsightsLOCO(model).setInput(featureVector).setTopK(topK) + val insights = transformer.transform(df) + insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) + } } trait RecordInsightsTestDataGenerator extends TestSparkContext { @@ -471,7 +484,7 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val textAreaFeatureName = "textArea" val textAreaMapFeatureName = "textAreaMap" - def generateTestDateData: RecordInsightsTestData = { + def generateTestDateData: (DataFrame, FeatureLike[OPVector], FeatureLike[RealNN]) = { val refDate = TransmogrifierDefaults.ReferenceDate.minusMillis(1) val minStep = 1000000 @@ -515,19 +528,10 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val featureVector = Seq(dateVector, datetimeVector, dateMapVector, datetimeMapVector).combine() val featureTransformedDF = new OpWorkflow().setResultFeatures(featureVector, label).transform(rawData) - // Train a model - val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(featureTransformedDF) - - // RecordInsightsLOCO - val locoTransformer = new RecordInsightsLOCO(sparkModel).setInput(featureVector).setTopK(40) - val locoInsights = locoTransformer.transform(featureTransformedDF) - val parsedInsights = locoInsights.collect(locoTransformer.getOutput()).map(i => - RecordInsightsParser.parseInsights(i)) - - RecordInsightsTestData(rawData, featureTransformedDF, featureVector, label) + (featureTransformedDF, featureVector, label) } - def generateTestTextData: RecordInsightsTestData = { + def generateTestTextData: (DataFrame, FeatureLike[OPVector], FeatureLike[RealNN]) = { // Random Text Data val textData: Seq[Text] = RandomText.strings(5, 10).withProbabilityOfEmpty(0.3).take(numRows).toList @@ -605,30 +609,10 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { // Sanity Checker val checker = new SanityChecker().setInput(label, featureVector) - val checked = checker.fit(vectorized).transform(vectorized) + val checkedDf = checker.fit(vectorized).transform(vectorized) val checkedFeatureVector = checker.getOutput() - RecordInsightsTestData(testData, checked, checkedFeatureVector, label) - } - - def generateLRModel(data: RecordInsightsTestData): OpPredictorWrapperModel[LogisticRegressionModel] = { - new OpLogisticRegression().setInput(data.label, data.featureVector).fit(data.featureTransformedDF) - } - - def generateRecordInsights[T <: Model[T]](model: T, - data: RecordInsightsTestData): Array[Map[OpVectorColumnHistory, Insights]] = { - val transformer = new RecordInsightsLOCO(model).setInput(data.featureVector).setTopK( - data.featureTransformedDF.columns.length) - val insights = transformer.transform(data.featureTransformedDF) - insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) + (checkedDf, checkedFeatureVector, label) } } - -case class RecordInsightsTestData -( - rawDF: DataFrame, - featureTransformedDF: DataFrame, - featureVector: FeatureLike[OPVector], - label: FeatureLike[RealNN] -) From 47f2ae1c80f0e941816f42efa48297488a6962b2 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Fri, 13 Sep 2019 17:05:14 -0700 Subject: [PATCH 05/13] Refactoring --- .../insights/RecordInsightsLOCOTest.scala | 246 +++++++++++------- 1 file changed, 149 insertions(+), 97 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 049e73d9bc..c12c3c3b60 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -45,7 +45,6 @@ import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata} import com.salesforce.op.{FeatureHistory, OpWorkflow, _} import org.apache.spark.ml.Model -import org.apache.spark.ml.classification.LogisticRegressionModel import org.apache.spark.ml.linalg._ import org.apache.spark.ml.regression.LinearRegressionModel import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -298,108 +297,150 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI "feature importances from Spark's RandomForest") } - it should "aggregate values for text and textMap derived features" in { - val (df, featureVector, label) = generateTestTextData - val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) - val actualRecordInsights = generateRecordInsights(model, df, featureVector) + for (strategy <- VectorAggregationStrategy.values) { + it should s"aggregate values for text and textMap derived features when " + + s"strategy=$strategy" in { + val (df, featureVector, label) = generateTestTextData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy) - withClue("TextArea can have two null indicator values") { - actualRecordInsights.map(p => assert(p.size == 7 || p.size == 8)) - } - withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + - "aggregation required for LOCO on this field.") { - actualRecordInsights.foreach { p => - assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)) + withClue("TextArea can have two null indicator values") { + actualInsights.map(p => assert(p.size == 7 || p.size == 8)) + } + withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + + "aggregation required for LOCO on this field.") { + actualInsights.foreach { p => + assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)) + } } + + assertLOCOSum(actualInsights) + assertAggregatedText(textFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedText(textAreaFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } + } - assertLOCOSum(actualRecordInsights) - assertAggregatedText(textFeatureName) - assertAggregatedTextMap(textMapFeatureName, "k0") - assertAggregatedTextMap(textMapFeatureName, "k1") - assertAggregatedText(textAreaFeatureName) - assertAggregatedTextMap(textAreaMapFeatureName, "k0") - assertAggregatedTextMap(textAreaMapFeatureName, "k1") - - - /** - * Compare the aggregation made by RecordInsightsLOCO on a text field to one made manually - * - * @param textFeatureName Text Field Name - */ - def assertAggregatedText(textFeatureName: String): Unit = { - withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && - history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) - } + + for (strategy <- VectorAggregationStrategy.values) { + it should s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when " + + s"strategy=$strategy" in { + val (df, featureVector, label) = generateTestDateData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy, topK = 40) + + assertLOCOSum(actualInsights) + assertAggregatedDate(dateFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDate(dateTimeFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } + } - /** - * Compare the aggregation made by RecordInsightsLOCO to one made manually - * - * @param textMapFeatureName Text Map Field Name - */ - def assertAggregatedTextMap(textMapFeatureName: String, keyName: String): Unit = { - withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && - history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) - } + + private def assertLOCOSum(actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]): Unit = { + withClue("LOCOs sum to 0") { + actualRecordInsights.foreach(_.values.foreach(a => assert(math.abs(a.map(_._2).sum) < 1e-10))) } } + /** + * Compare the aggregation made by RecordInsightsLOCO on a text field to one made manually + * + * @param textFeatureName Text Field Name + */ + def assertAggregatedText(textFeatureName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && + history.indicatorValue.isEmpty && history.descriptorValue.isEmpty + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } + } - it should "aggregate values for date, datetime, dateMap and dateTimeMap derived features" in { - val (df, featureVector, label) = generateTestDateData - val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) - val actualRecordInsights = generateRecordInsights(model, df, featureVector, topK = 40) - - assertLOCOSum(actualRecordInsights) - assertAggregatedDate(dateFeatureName) - assertAggregatedDate(dateTimeFeatureName) - assertAggregatedDateMap(dateMapFeatureName, "k0") - assertAggregatedDateMap(dateMapFeatureName, "k1") - assertAggregatedDateMap(dateTimeMapFeatureName, "k0") - assertAggregatedDateMap(dateTimeMapFeatureName, "k1") - - /** - * Compare the aggregation made by RecordInsightsLOCO on a Date/DateTime field to one made manually - * - * @param dateFeatureName Date/DateTime Field - */ - def assertAggregatedDate(dateFeatureName: String): Unit = { - for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { - withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawFeature - $dateFeatureName.") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && - history.descriptorValue.isDefined && - history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) - } - } + /** + * Compare the aggregation made by RecordInsightsLOCO to one made manually + * + * @param textMapFeatureName Text Map Field Name + */ + def assertAggregatedTextMap(textMapFeatureName: String, keyName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && + history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) } + } - /** - * Compare the aggregation made by RecordInsightsLOCO on a DateMap/DateTimeMap field to one made manually - * - * @param dateMapFeatureName DateMap/DateTimeMap Field - */ - def assertAggregatedDateMap(dateMapFeatureName: String, keyName: String): Unit = { - for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { - withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawMapFeature - $dateMapFeatureName " + - s"with key as $keyName.") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateMapFeatureName) && - history.grouping == Option(keyName) && history.descriptorValue.isDefined && - history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, model, df, featureVector, label, actualRecordInsights) - } + /** + * Compare the aggregation made by RecordInsightsLOCO on a Date/DateTime field to one made manually + * + * @param dateFeatureName Date/DateTime Field + */ + def assertAggregatedDate(dateFeatureName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { + withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawFeature - $dateFeatureName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && + history.descriptorValue.isDefined && + history.descriptorValue.get.split("_").last == timePeriod.entryName + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) } } } - private def assertLOCOSum(actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]): Unit = { - withClue("LOCOs sum to 0") { - actualRecordInsights.foreach(_.values.foreach(a => assert(math.abs(a.map(_._2).sum) < 1e-10))) + /** + * Compare the aggregation made by RecordInsightsLOCO on a DateMap/DateTimeMap field to one made manually + * + * @param dateMapFeatureName DateMap/DateTimeMap Field + */ + def assertAggregatedDateMap(dateMapFeatureName: String, keyName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { + withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawMapFeature - $dateMapFeatureName " + + s"with key as $keyName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateMapFeatureName) && + history.grouping == Option(keyName) && history.descriptorValue.isDefined && + history.descriptorValue.get.split("_").last == timePeriod.entryName + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } } } @@ -410,6 +451,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI */ private def assertAggregatedWithPredicate( predicate: OpVectorColumnHistory => Boolean, + strategy: VectorAggregationStrategy, model: OpPredictorWrapperModel[_], df: DataFrame, featureVector: FeatureLike[OPVector], @@ -427,17 +469,24 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val expectedLocos = df.select(label, featureVector).map { case Row(l: Double, v: Vector) => - val featureArray = v.toArray - val locos = indices.map { i => - val oldVal = v(i) - val baseScore = model.transformFn(l.toRealNN, v.toOPVector).score.toSeq - featureArray.update(i, 0.0) - val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq - featureArray.update(i, oldVal) - baseScore.zip(newScore).map { case (b, n) => b - n } + val featureArray = v.copy.toArray + val baseScore = model.transformFn(l.toRealNN, v.toOPVector).score.toSeq + strategy match { + case VectorAggregationStrategy.Avg => + val locos = indices.map { i => + val oldVal = v(i) + featureArray.update(i, 0.0) + val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq + featureArray.update(i, oldVal) + baseScore.zip(newScore).map { case (b, n) => b - n } + } + val sumLOCOs = locos.reduce((a1, a2) => a1.zip(a2).map { case (l, r) => l + r }) + sumLOCOs.map(_ / indices.length) + case VectorAggregationStrategy.LeaveOutVector => + indices.map { i => featureArray.update(i, 0.0) } + val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq + baseScore.zip(newScore).map { case (b, n) => b - n } } - val sumLOCOs = locos.reduce((a1, a2) => a1.zip(a2).map { case (l, r) => l + r }) - sumLOCOs.map(_ / indices.length) } val expected = expectedLocos.collect().toSeq.filter(_.head != 0.0) @@ -456,9 +505,12 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI model: T, df: DataFrame, featureVector: FeatureLike[OPVector], + strategy: VectorAggregationStrategy, topK: Int = 20 ): Array[Map[OpVectorColumnHistory, Insights]] = { val transformer = new RecordInsightsLOCO(model).setInput(featureVector).setTopK(topK) + .setDateAggregationStrategy(strategy) + .setTextAggregationStrategy(strategy) val insights = transformer.transform(df) insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) } From 97483a62b297a31ac77e6ae557f0d37dd3d581bc Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Sun, 15 Sep 2019 19:08:56 -0700 Subject: [PATCH 06/13] Fixing scala style errors --- .../op/stages/impl/insights/RecordInsightsLOCOTest.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index c12c3c3b60..351d64751e 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -297,7 +297,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI "feature importances from Spark's RandomForest") } - for (strategy <- VectorAggregationStrategy.values) { + for {strategy <- VectorAggregationStrategy.values} { it should s"aggregate values for text and textMap derived features when " + s"strategy=$strategy" in { val (df, featureVector, label) = generateTestTextData @@ -328,8 +328,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } } - - for (strategy <- VectorAggregationStrategy.values) { + for {strategy <- VectorAggregationStrategy.values} { it should s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when " + s"strategy=$strategy" in { val (df, featureVector, label) = generateTestDateData @@ -369,7 +368,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI featureVector: FeatureLike[OPVector], label: FeatureLike[RealNN], actualInsights: Array[Map[OpVectorColumnHistory, Insights]] - ): Unit = { + ): Unit = { withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty From 356e79ff27f3d27d5c4cc4936254e4b9ad4ba900 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 14 Oct 2019 14:12:04 -0700 Subject: [PATCH 07/13] Refractoring --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 8 ++++---- .../op/stages/impl/insights/RecordInsightsLOCOTest.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 20af78938b..59477c8e16 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -208,7 +208,7 @@ class RecordInsightsLOCO[T <: Model[T]] case VectorAggregationStrategy.LeaveOutVector => val copyFeatureSparse = featureSparse.copy - aggIndices.map {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)} + aggIndices.foreach {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)} computeDiff(copyFeatureSparse, baseScore) } } @@ -226,7 +226,7 @@ class RecordInsightsLOCO[T <: Model[T]] val textActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] val dateActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] - (0 until featureSparse.size, featureSparse.indices).zipped.map { case (i: Int, oldInd: Int) => + (0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) => val history = histories(oldInd) history match { case h if isTextIndex(h) => { @@ -249,13 +249,13 @@ class RecordInsightsLOCO[T <: Model[T]] } // Aggregate active indices of each text feature and date feature based on their respective strategy. - textActiveIndices.map { + textActiveIndices.foreach { case (name, aggIndices) => val diffToExamine = aggregateDiffs(featureSparse, aggIndices, getTextAggregationStrategy, baseScore, textFeaturesCount.get(name).get) minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } - dateActiveIndices.map { + dateActiveIndices.foreach { case (name, aggIndices) => val diffToExamine = aggregateDiffs(featureSparse, aggIndices, getDateAggregationStrategy, baseScore, dateFeaturesCount.get(name).get) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 351d64751e..ec83422a2e 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -482,7 +482,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI val sumLOCOs = locos.reduce((a1, a2) => a1.zip(a2).map { case (l, r) => l + r }) sumLOCOs.map(_ / indices.length) case VectorAggregationStrategy.LeaveOutVector => - indices.map { i => featureArray.update(i, 0.0) } + indices.foreach { i => featureArray.update(i, 0.0) } val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq baseScore.zip(newScore).map { case (b, n) => b - n } } From 9346c68efd4654a1e4151827408852abf741c10d Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 21 Oct 2019 11:40:34 -0700 Subject: [PATCH 08/13] Adding only one paramter to control aggregation of all vector features --- .../impl/insights/RecordInsightsLOCO.scala | 29 ++++++------------- .../insights/RecordInsightsLOCOTest.scala | 3 +- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 59477c8e16..0ff9ce4319 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -64,32 +64,21 @@ trait RecordInsightsLOCOParams extends Params { def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy)) - final val dateAggregationStrategy = new Param[String](parent = this, name = "dateAggregationStrategy", - doc = "Whether vector for each time period - HourOfDay, DayOfWeek, etc is aggregated by " + + final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy", + doc = "Aggregate text/date vector by " + "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or" + "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." ) - def setDateAggregationStrategy(strategy: VectorAggregationStrategy): this.type = - set(dateAggregationStrategy, strategy.entryName) - def getDateAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( - $(dateAggregationStrategy)) - - final val textAggregationStrategy = new Param[String](parent = this, name = "dateAggregationStrategy", - doc = "Whether text vector is aggregated by " + - "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or" + - "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." - ) - def setTextAggregationStrategy(strategy: VectorAggregationStrategy): this.type = - set(textAggregationStrategy, strategy.entryName) - def getTextAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( - $(textAggregationStrategy)) + def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type = + set(vectorAggregationStrategy, strategy.entryName) + def getVectorAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( + $(vectorAggregationStrategy)) setDefault( topK -> 20, topKStrategy -> TopKStrategy.Abs.entryName, - dateAggregationStrategy -> VectorAggregationStrategy.Avg.entryName, - textAggregationStrategy -> VectorAggregationStrategy.Avg.entryName + vectorAggregationStrategy -> VectorAggregationStrategy.Avg.entryName ) } @@ -252,13 +241,13 @@ class RecordInsightsLOCO[T <: Model[T]] textActiveIndices.foreach { case (name, aggIndices) => val diffToExamine = aggregateDiffs(featureSparse, aggIndices, - getTextAggregationStrategy, baseScore, textFeaturesCount.get(name).get) + getVectorAggregationStrategy, baseScore, textFeaturesCount.get(name).get) minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } dateActiveIndices.foreach { case (name, aggIndices) => val diffToExamine = aggregateDiffs(featureSparse, aggIndices, - getDateAggregationStrategy, baseScore, dateFeaturesCount.get(name).get) + getVectorAggregationStrategy, baseScore, dateFeaturesCount.get(name).get) minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index ec83422a2e..dcaa57e018 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -508,8 +508,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI topK: Int = 20 ): Array[Map[OpVectorColumnHistory, Insights]] = { val transformer = new RecordInsightsLOCO(model).setInput(featureVector).setTopK(topK) - .setDateAggregationStrategy(strategy) - .setTextAggregationStrategy(strategy) + .setVectorAggregationStrategy(strategy) val insights = transformer.transform(df) insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) } From de23c9c4e939b6aa6cb11d0a7e1eb9774b902e60 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 21 Oct 2019 12:01:31 -0700 Subject: [PATCH 09/13] Refactoring --- .../impl/insights/RecordInsightsLOCO.scala | 37 ++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 0ff9ce4319..1e2aafaaed 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -125,10 +125,10 @@ class RecordInsightsLOCO[T <: Model[T]] private val dateMapTypes = Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) - private lazy val textFeaturesCount: Map[String, Int] = getFeatureCount(isTextIndex) - private lazy val dateFeaturesCount: Map[String, Int] = getFeatureCount(isDateIndex) + // Map[FeatureName(Date/Text), VectorSize] + private lazy val aggFeaturesSize: Map[String, Int] = getFeaturesSize(isTextIndex) ++ getFeaturesSize(isDateIndex) - private def getFeatureCount(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] = histories + private def getFeaturesSize(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] = histories .filter(predicate) .groupBy { h => getRawFeatureName(h).get } .mapValues(_.length).view.toMap @@ -186,14 +186,14 @@ class RecordInsightsLOCO[T <: Model[T]] aggIndices: Array[(Int, Int)], strategy: VectorAggregationStrategy, baseScore: Array[Double], - featureCount: Int + featureSize: Int ): Array[Double] = { strategy match { case VectorAggregationStrategy.Avg => aggIndices .map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) } .foldLeft(Array.empty[Double])(sumArrays) - .map( _ / featureCount) + .map( _ / featureSize) case VectorAggregationStrategy.LeaveOutVector => val copyFeatureSparse = featureSparse.copy @@ -212,22 +212,15 @@ class RecordInsightsLOCO[T <: Model[T]] val minMaxHeap = new MinMaxHeap(k) // Map[FeatureName, (Array[SparseVectorIndices], Array[ActualIndices]) - val textActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] - val dateActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] + val aggActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] (0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) => val history = histories(oldInd) history match { - case h if isTextIndex(h) => { + case h if isTextIndex(h) || isDateIndex(h) => { for {name <- getRawFeatureName(h)} { - val indices = textActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) - textActiveIndices.update(name, indices :+ (i, oldInd)) - } - } - case h if isDateIndex(h) => { - for {name <- getRawFeatureName(h)} { - val indices = dateActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) - dateActiveIndices.update(name, indices :+ (i, oldInd)) + val indices = aggActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) + aggActiveIndices.update(name, indices :+ (i, oldInd)) } } case _ => { @@ -237,17 +230,11 @@ class RecordInsightsLOCO[T <: Model[T]] } } - // Aggregate active indices of each text feature and date feature based on their respective strategy. - textActiveIndices.foreach { - case (name, aggIndices) => - val diffToExamine = aggregateDiffs(featureSparse, aggIndices, - getVectorAggregationStrategy, baseScore, textFeaturesCount.get(name).get) - minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) - } - dateActiveIndices.foreach { + // Aggregate active indices of each text feature and date feature based on vector aggregate strategy. + aggActiveIndices.foreach { case (name, aggIndices) => val diffToExamine = aggregateDiffs(featureSparse, aggIndices, - getVectorAggregationStrategy, baseScore, dateFeaturesCount.get(name).get) + getVectorAggregationStrategy, baseScore, aggFeaturesSize.get(name).get) minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } From 2fcb05e104a0ab3ea607e284d69bd516561f707e Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 21 Oct 2019 12:06:10 -0700 Subject: [PATCH 10/13] Fixing scala style --- .../op/stages/impl/insights/RecordInsightsLOCOTest.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index dcaa57e018..7e26e4d48c 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -298,8 +298,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } for {strategy <- VectorAggregationStrategy.values} { - it should s"aggregate values for text and textMap derived features when " + - s"strategy=$strategy" in { + it should s"aggregate values for text and textMap derived features when strategy=$strategy" in { val (df, featureVector, label) = generateTestTextData val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) val actualInsights = generateRecordInsights(model, df, featureVector, strategy) @@ -329,7 +328,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI } for {strategy <- VectorAggregationStrategy.values} { - it should s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when " + + it should "aggregate values for date, datetime, dateMap and dateTimeMap derived features when " + s"strategy=$strategy" in { val (df, featureVector, label) = generateTestDateData val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) From 75934d1cf3e6da6149040207b7b3ba305c3018a5 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 28 Oct 2019 17:21:29 -0700 Subject: [PATCH 11/13] Refactoring --- .../insights/RecordInsightsLOCOTest.scala | 93 +++++++++---------- 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 830d3d37ff..c6178bb584 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -292,57 +292,56 @@ class RecordInsightsLOCOTest extends FunSpec with TestSparkContext with RecordIn } } - describe("with text data ") { - for {strategy <- VectorAggregationStrategy.values} { - it (s"aggregate values for text and textMap derived features when strategy=$strategy") { - val (df, featureVector, label) = generateTestTextData - val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) - val actualInsights = generateRecordInsights(model, df, featureVector, strategy) - - withClue("TextArea can have two null indicator values") { - actualInsights.map(p => assert(p.size == 7 || p.size == 8)) - } - withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + - "aggregation required for LOCO on this field.") { - actualInsights.foreach { p => - assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) - && r.indicatorValue.isDefined)) - } - } - assertLOCOSum(actualInsights) - assertAggregatedText(textFeatureName, strategy, model, df, featureVector, label, actualInsights) - assertAggregatedText(textAreaFeatureName, strategy, model, df, featureVector, label, actualInsights) - assertAggregatedTextMap(textMapFeatureName, "k0", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedTextMap(textMapFeatureName, "k1", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedTextMap(textAreaMapFeatureName, "k0", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedTextMap(textAreaMapFeatureName, "k1", strategy, model, df, featureVector, label, - actualInsights) + for {strategy <- VectorAggregationStrategy.values} { + it (s"aggregate values for text and textMap derived features when strategy=$strategy") { + val (df, featureVector, label) = generateTestTextData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy) + + withClue("TextArea can have two null indicator values") { + actualInsights.map(p => assert(p.size == 7 || p.size == 8)) + } + withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + + "aggregation required for LOCO on this field.") { + actualInsights.foreach { p => + assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) + && r.indicatorValue.isDefined)) + } } + + assertLOCOSum(actualInsights) + assertAggregatedText(textFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedText(textAreaFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } } - describe("with date data ") { - for {strategy <- VectorAggregationStrategy.values} { - it (s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when strategy=$strategy") { - val (df, featureVector, label) = generateTestDateData - val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) - val actualInsights = generateRecordInsights(model, df, featureVector, strategy, topK = 40) - - assertLOCOSum(actualInsights) - assertAggregatedDate(dateFeatureName, strategy, model, df, featureVector, label, actualInsights) - assertAggregatedDate(dateTimeFeatureName, strategy, model, df, featureVector, label, actualInsights) - assertAggregatedDateMap(dateMapFeatureName, "k0", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedDateMap(dateMapFeatureName, "k1", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedDateMap(dateTimeMapFeatureName, "k0", strategy, model, df, featureVector, label, - actualInsights) - assertAggregatedDateMap(dateTimeMapFeatureName, "k1", strategy, model, df, featureVector, label, - actualInsights) - } + + + for {strategy <- VectorAggregationStrategy.values} { + it (s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when strategy=$strategy") { + val (df, featureVector, label) = generateTestDateData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy, topK = 40) + + assertLOCOSum(actualInsights) + assertAggregatedDate(dateFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDate(dateTimeFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } } } From 3ebda0f33c382b0ad11d31fa70c544a298ebe314 Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 4 Nov 2019 13:24:32 -0800 Subject: [PATCH 12/13] Addressed Mathew's and Gera's PR review comments --- .../impl/insights/RecordInsightsLOCO.scala | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 1e2aafaaed..7a10fc53fc 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -66,7 +66,7 @@ trait RecordInsightsLOCOParams extends Params { final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy", doc = "Aggregate text/date vector by " + - "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or" + + "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or " + "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." ) def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type = @@ -116,30 +116,33 @@ class RecordInsightsLOCO[T <: Model[T]] /** * These are the name of the types we want to perform an aggregation of the LOCO results over derived features */ - private val textTypes = - Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], FeatureType.typeName[TextList]) - private val textMapTypes = - Set(FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap]) - private val dateTypes = - Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime]) - private val dateMapTypes = - Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) - - // Map[FeatureName(Date/Text), VectorSize] - private lazy val aggFeaturesSize: Map[String, Int] = getFeaturesSize(isTextIndex) ++ getFeaturesSize(isDateIndex) - - private def getFeaturesSize(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] = histories - .filter(predicate) + private val textTypes = Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], + FeatureType.typeName[TextList], FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap]) + private val dateTypes = Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime], + FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) + + // Map of RawFeatureName to the size of its derived features that needs to be aggregated + // for the above textTypes and dateTypes. + private lazy val aggFeaturesSize: Map[String, Int] = histories + .filter(h => isTextFeature(h) || isDateFeature(h)) .groupBy { h => getRawFeatureName(h).get } - .mapValues(_.length).view.toMap + .mapValues(_.length) - private def isTextIndex(h: OpVectorColumnHistory): Boolean = { - h.parentFeatureType.exists((textTypes ++ textMapTypes).contains) && + /** + * Return whether this feature derived from hashed Text(Map)Vectorizer + * @return Boolean + */ + private def isTextFeature(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists(textTypes.contains) && h.indicatorValue.isEmpty && h.descriptorValue.isEmpty } - private def isDateIndex(h: OpVectorColumnHistory): Boolean = { - h.parentFeatureType.exists((dateTypes ++ dateMapTypes).contains) && h.descriptorValue.isDefined + /** + * Return whether this feature derived from unit circle Date(Map)Vectorizer + * @return Boolean + */ + private def isDateFeature(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists(dateTypes.contains) && h.descriptorValue.isDefined } private def computeDiff @@ -172,7 +175,7 @@ class RecordInsightsLOCO[T <: Model[T]] // TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the // feature history after multiple transformations has been fixed name.map { n => - val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) { + val timePeriodName = if (dateTypes.exists(history.parentFeatureType.contains)) { history.descriptorValue .flatMap(convertToTimePeriod) .map(p => "_" + p.entryName) @@ -217,7 +220,7 @@ class RecordInsightsLOCO[T <: Model[T]] (0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) => val history = histories(oldInd) history match { - case h if isTextIndex(h) || isDateIndex(h) => { + case h if isTextFeature(h) || isDateFeature(h) => { for {name <- getRawFeatureName(h)} { val indices = aggActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) aggActiveIndices.update(name, indices :+ (i, oldInd)) From 9776e790ff6176169140f7fef44abef61091704b Mon Sep 17 00:00:00 2001 From: "sanmitra.ijeri" Date: Mon, 4 Nov 2019 13:27:08 -0800 Subject: [PATCH 13/13] Refactoring --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 7a10fc53fc..76241757c7 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -133,8 +133,7 @@ class RecordInsightsLOCO[T <: Model[T]] * @return Boolean */ private def isTextFeature(h: OpVectorColumnHistory): Boolean = { - h.parentFeatureType.exists(textTypes.contains) && - h.indicatorValue.isEmpty && h.descriptorValue.isEmpty + h.parentFeatureType.exists(textTypes.contains) && h.indicatorValue.isEmpty && h.descriptorValue.isEmpty } /**