diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index da8ef7fda8..19068f8345 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,12 +1,16 @@ name: CI -on: [push, pull_request] +on: + push: + branches: + - master + pull_request: + branches: + - master jobs: build: - runs-on: ubuntu-latest - steps: - uses: actions/checkout@v1 - name: Set up JDK 1.8 @@ -16,4 +20,4 @@ jobs: - name: Build run: ./gradlew scalaStyle reportScoverage - name: Build Helloworld - run: cd helloworld && ./gradlew scalaStyle test + run: cd helloworld && ./gradlew scalaStyle test diff --git a/ROADMAP.md b/ROADMAP.md index 3e1913519a..0f5c88e54e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,4 +1,4 @@ -# The TransmogrifAI Roadmap +# TransmogrifAI Roadmap ## Short Term diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index 78f58e7be9..d1ebb22ddf 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -335,7 +335,6 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { val (fittedStages, newResultFeatures) = if (stages.exists(_.isInstanceOf[Estimator[_]])) { val rawData = generateRawData() - // Update features with fitted stages val fittedStgs = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages) val newResultFtrs = resultFeatures.map(_.copyWithNewStages(fittedStgs)) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 7bcf02f969..76241757c7 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -64,9 +64,21 @@ trait RecordInsightsLOCOParams extends Params { def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy)) + final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy", + doc = "Aggregate text/date vector by " + + "1. LeaveOutVector strategy - calculate the loco by leaving out the entire vector or " + + "2. Avg strategy - calculate the loco for each column of the vector and then average all the locos." 
+ ) + def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type = + set(vectorAggregationStrategy, strategy.entryName) + def getVectorAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName( + $(vectorAggregationStrategy)) + + setDefault( topK -> 20, - topKStrategy -> TopKStrategy.Abs.entryName + topKStrategy -> TopKStrategy.Abs.entryName, + vectorAggregationStrategy -> VectorAggregationStrategy.Avg.entryName ) } @@ -104,30 +116,33 @@ class RecordInsightsLOCO[T <: Model[T]] /** * These are the name of the types we want to perform an aggregation of the LOCO results over derived features */ - private val textTypes = - Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], FeatureType.typeName[TextList]) - private val textMapTypes = - Set(FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap]) - private val dateTypes = - Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime]) - private val dateMapTypes = - Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) - - // Indices of features derived from hashed Text(Map)Vectorizer - private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes, - h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty) - - // Indices of features derived from unit Date(Map)Vectorizer - private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined) + private val textTypes = Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], + FeatureType.typeName[TextList], FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap]) + private val dateTypes = Set(FeatureType.typeName[Date], FeatureType.typeName[DateTime], + FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap]) + + // Map of RawFeatureName to the size of its derived features that needs to be aggregated + // for the above textTypes and dateTypes. + private lazy val aggFeaturesSize: Map[String, Int] = histories + .filter(h => isTextFeature(h) || isDateFeature(h)) + .groupBy { h => getRawFeatureName(h).get } + .mapValues(_.length) + + /** + * Return whether this feature derived from hashed Text(Map)Vectorizer + * @return Boolean + */ + private def isTextFeature(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists(textTypes.contains) && h.indicatorValue.isEmpty && h.descriptorValue.isEmpty + } /** - * Return the indices of features derived from given types. 
- * @return Seq[Int] + * Return whether this feature derived from unit circle Date(Map)Vectorizer + * @return Boolean */ - private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] = - histories.collect { - case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index - }.distinct.sorted + private def isDateFeature(h: OpVectorColumnHistory): Boolean = { + h.parentFeatureType.exists(dateTypes.contains) && h.descriptorValue.isDefined + } private def computeDiff ( @@ -159,7 +174,7 @@ class RecordInsightsLOCO[T <: Model[T]] // TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the // feature history after multiple transformations has been fixed name.map { n => - val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) { + val timePeriodName = if (dateTypes.exists(history.parentFeatureType.contains)) { history.descriptorValue .flatMap(convertToTimePeriod) .map(p => "_" + p.entryName) @@ -168,82 +183,72 @@ class RecordInsightsLOCO[T <: Model[T]] } } + private def aggregateDiffs( + featureSparse: SparseVector, + aggIndices: Array[(Int, Int)], + strategy: VectorAggregationStrategy, + baseScore: Array[Double], + featureSize: Int + ): Array[Double] = { + strategy match { + case VectorAggregationStrategy.Avg => + aggIndices + .map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) } + .foldLeft(Array.empty[Double])(sumArrays) + .map( _ / featureSize) + + case VectorAggregationStrategy.LeaveOutVector => + val copyFeatureSparse = featureSparse.copy + aggIndices.foreach {case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0)} + computeDiff(copyFeatureSparse, baseScore) + } + } + private def returnTopPosNeg ( featureSparse: SparseVector, - zeroCountByFeature: Map[String, Int], - featureSize: Int, baseScore: Array[Double], k: Int, indexToExamine: Int ): Seq[LOCOValue] = { val minMaxHeap = new MinMaxHeap(k) - val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])] - - agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap, - baseScore) - - // Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features - // Adding LOCO results from aggregation map into heaps - for {(name, (indices, ar)) <- aggregationMap} { - // The index here is arbitrary - val (i, n) = (indices.head, indices.length) - val zeroCounts = zeroCountByFeature.get(name).getOrElse(0) - val diffToExamine = ar.map(_ / (n + zeroCounts)) - minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) - } - minMaxHeap.dequeueAll - } + // Map[FeatureName, (Array[SparseVectorIndices], Array[ActualIndices]) + val aggActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]] - private def agggregateDiffs( - featureVec: SparseVector, - indexToExamine: Int, - minMaxHeap: MinMaxHeap, - aggregationMap: mutable.Map[String, (Array[Int], Array[Double])], - baseScore: Array[Double] - ): Unit = { - computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) => + (0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) => val history = histories(oldInd) history match { - // If indicator value and descriptor value of a derived text feature are empty, then it is - // a hashing tf output. We aggregate such features for each (rawFeatureName). 
- case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) => + case h if isTextFeature(h) || isDateFeature(h) => { for {name <- getRawFeatureName(h)} { - val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double])) - aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine))) + val indices = aggActiveIndices.getOrElse(name, (Array.empty[(Int, Int)])) + aggActiveIndices.update(name, indices :+ (i, oldInd)) } - case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) + } + case _ => { + val diffToExamine = computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) + minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) + } } } - } - private def computeDiffs( - featureVec: SparseVector, - baseScore: Array[Double] - ) = { - (0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) => - (i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore)) + // Aggregate active indices of each text feature and date feature based on vector aggregate strategy. + aggActiveIndices.foreach { + case (name, aggIndices) => + val diffToExamine = aggregateDiffs(featureSparse, aggIndices, + getVectorAggregationStrategy, baseScore, aggFeaturesSize.get(name).get) + minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine) } + + minMaxHeap.dequeueAll } override def transformFn: OPVector => TextMap = features => { val baseResult = modelApply(labelDummy, features) val baseScore = baseResult.score - val featureSize = features.value.size // TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros val featuresSparse = features.value.toSparse - val featureIndexSet = featuresSparse.indices.toSet - - // Besides non 0 values, we want to check the text/date features as well - val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices) - .filterNot(featureIndexSet.contains) - - // Count zeros by feature name - val zeroCountByFeature = zeroValIndices - .groupBy(i => getRawFeatureName(histories(i)).get) - .mapValues(_.length).view.toMap val k = $(topK) // Index where to examine the difference in the prediction vector @@ -254,14 +259,14 @@ class RecordInsightsLOCO[T <: Model[T]] // For MultiClassification, the value is from the predicted class(i.e. 
the class having the highest probability) case n if n > 2 => baseResult.prediction.toInt } - val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine) + val topPosNeg = returnTopPosNeg(featuresSparse, baseScore, k, indexToExamine) val top = getTopKStrategy match { case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k) // Take top K positive and top K negative LOCOs, hence 2 * K case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k) } - val allIndices = featuresSparse.indices ++ zeroValIndices + val allIndices = featuresSparse.indices top.map { case LOCOValue(i, _, diffs) => RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs) }.toMap.toTextMap @@ -329,3 +334,14 @@ object TopKStrategy extends Enum[TopKStrategy] { case object Abs extends TopKStrategy("abs") case object PositiveNegative extends TopKStrategy("positive and negative") } + + +sealed abstract class VectorAggregationStrategy(val name: String) extends EnumEntry with Serializable + +object VectorAggregationStrategy extends Enum[VectorAggregationStrategy] { + val values = findValues + case object LeaveOutVector extends + VectorAggregationStrategy("calculate the loco by leaving out the entire vector") + case object Avg extends + VectorAggregationStrategy("calculate the loco for each column of the vector and then average all the locos") +} diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala index 5970b4cc70..ff59746623 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala @@ -353,22 +353,6 @@ trait DataBalancerParams extends Params { private[op] def getUpSampleFraction: Double = $(upSampleFraction) - /** - * Fraction to sample majority data - * Value should be in ]0.0, 1.0] - * - * @group param - */ - private[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction", - "fraction to sample majority data", ParamValidators.inRange( - lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true - ) - ) - - private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value) - - private[op] def getDownSampleFraction: Double = $(downSampleFraction) - /** * Whether or not positive data is in minority * Value should be in true or false diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala index 5a84fedd53..5218255625 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala @@ -34,7 +34,7 @@ import com.salesforce.op.UID import com.salesforce.op.stages.impl.selector.ModelSelectorNames import org.apache.spark.ml.attribute.{MetadataHelper, NominalAttribute} import org.apache.spark.ml.param._ -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{Metadata, MetadataBuilder} @@ -56,10 +56,12 @@ case object DataCutter { seed: Long = SplitterParamsDefault.seedDefault, reserveTestFraction: Double = SplitterParamsDefault.ReserveTestFractionDefault, maxLabelCategories: Int = 
SplitterParamsDefault.MaxLabelCategoriesDefault, - minLabelFraction: Double = SplitterParamsDefault.MinLabelFractionDefault + minLabelFraction: Double = SplitterParamsDefault.MinLabelFractionDefault, + maxTrainingSample: Int = SplitterParamsDefault.MaxTrainingSampleDefault ): DataCutter = { new DataCutter() .setSeed(seed) + .setMaxTrainingSample(maxTrainingSample) .setReserveTestFraction(reserveTestFraction) .setMaxLabelCategories(maxLabelCategories) .setMinLabelFraction(minLabelFraction) @@ -83,6 +85,7 @@ class DataCutter(uid: String = UID[DataCutter]) extends Splitter(uid = uid) with * @return Parameters set in examining data */ override def preValidationPrepare(data: DataFrame): PrevalidationVal = { + val labelColName = if (isSet(labelColumnName)) { getLabelColumnName } else { @@ -123,7 +126,15 @@ class DataCutter(uid: String = UID[DataCutter]) extends Splitter(uid = uid) with .filter(r => labelSet.contains(r.getDouble(labelColIdx))) .withColumn(labelColName, data(labelColName).as(labelColName, metadataNA.toMetadata)) + // calculate the down sample fraction + val dataSetSize = data.count() + val sampleF = getMaxTrainingSample / dataSetSize.toDouble + val downSampleFraction = math.min(sampleF, SplitterParamsDefault.DownSampleFractionDefault) + setDownSampleFraction(downSampleFraction) + summary = Option(DataCutterSummary( + preSplitterDataCount = dataSetSize, + downSamplingFraction = getDownSampleFraction, labelsKept = getLabelsToKeep, labelsDropped = getLabelsToDrop, labelsDroppedTotal = getLabelsDroppedTotal @@ -131,6 +142,25 @@ class DataCutter(uid: String = UID[DataCutter]) extends Splitter(uid = uid) with PrevalidationVal(summary, Option(dataPrep)) } + /** + * Rebalance the training data within the validation step + * + * @param data to prepare for model training. first column must be the label as a double + * @return balanced training set and a test set + */ + override def validationPrepare(data: Dataset[Row]): Dataset[Row] = { + + val dataPrep = super.validationPrepare(data) + + // check if down sampling is needed + val balanced: DataFrame = if (getDownSampleFraction < 1.0) { + dataPrep.sample( false, getDownSampleFraction, getSeed) + } else { + dataPrep + } + balanced.persist() + } + def getLabelsFromMetadata(data: DataFrame): Array[String] = { val labelSF = data.schema.head @@ -203,7 +233,11 @@ class DataCutter(uid: String = UID[DataCutter]) extends Splitter(uid = uid) with s" minLabelFraction = $minLabelFract, maxLabelCategories = $maxLabels. 
\n" + s"Label counts were: ${labelCounts.collect().toSeq}") } - DataCutterSummary(labelsKept.toSeq, labelsDropped.toSeq, labelsDroppedTotal.toLong) + DataCutterSummary( + labelsKept = labelsKept.toSeq, + labelsDropped = labelsDropped.toSeq, + labelsDroppedTotal = labelsDroppedTotal.toLong + ) } override def copy(extra: ParamMap): DataCutter = { @@ -273,6 +307,8 @@ private[impl] trait DataCutterParams extends SplitterParams { */ case class DataCutterSummary ( + preSplitterDataCount: Long = 0L, + downSamplingFraction: Double = SplitterParamsDefault.DownSampleFractionDefault, labelsKept: Seq[Double], labelsDropped: Seq[Double], labelsDroppedTotal: Long @@ -288,6 +324,8 @@ case class DataCutterSummary def toMetadata(skipUnsupported: Boolean): Metadata = { new MetadataBuilder() .putString(SplitterSummary.ClassName, this.getClass.getName) + .putLong(ModelSelectorNames.PreSplitterDataCount, preSplitterDataCount) + .putDouble(ModelSelectorNames.DownSample, downSamplingFraction) .putDoubleArray(ModelSelectorNames.LabelsKept, labelsKept.toArray) .putDoubleArray(ModelSelectorNames.LabelsDropped, labelsDropped.toArray) .putLong(ModelSelectorNames.LabelsDroppedTotal, labelsDroppedTotal) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala index cbcfc3e9c0..87f7449e76 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala @@ -62,7 +62,7 @@ case object DataSplitter { * * @param uid */ -class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) with DataSplitterParams { +class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) with SplitterParams { /** * Function to set the down sampling fraction and parameters before passing into the validation step diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala index 900fb60a36..3e004c2b99 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala @@ -126,7 +126,7 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]] label = label, features = features, splitter = splitter ) ).getOrElse{ - splitter.map(s => (s.validationPrepare(training), s.validationPrepare(validation))) + splitter.map(s => (s.validationPrepare(training), validation)) .getOrElse((training, validation)) } getSummary(modelInfo = modelInfo, label = label, features = features, train = newTrain, test = newTest) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala index 338056fcdf..577574cf09 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala @@ -80,7 +80,7 @@ private[op] class OpTrainValidationSplit[M <: Model[_], E <: Estimator[_]] features = features, splitter = splitter )).getOrElse { - splitter.map(s => (s.validationPrepare(trainingDataset), s.validationPrepare(validationDataset))) + splitter.map(s => (s.validationPrepare(trainingDataset), validationDataset)) .getOrElse((trainingDataset, 
validationDataset)) } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala index ec83367f46..18aecd4735 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala @@ -129,6 +129,23 @@ trait SplitterParams extends Params { def setReserveTestFraction(value: Double): this.type = set(reserveTestFraction, value) def getReserveTestFraction: Double = $(reserveTestFraction) + /** + * Fraction to sample majority data + * Value should be in (0.0, 1.0] + * + * @group param + */ + private[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction", + "fraction to sample majority data", ParamValidators.inRange( + lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true + ) + ) + setDefault(downSampleFraction, SplitterParamsDefault.DownSampleFractionDefault) + + private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value) + + private[op] def getDownSampleFraction: Double = $(downSampleFraction) + /** * Maximum size of dataset want to train on. * Value should be > 0. @@ -183,6 +200,8 @@ private[op] object SplitterSummary { downSamplingFraction = metadata.getDouble(ModelSelectorNames.DownSample) ) case s if s == classOf[DataCutterSummary].getName => DataCutterSummary( + preSplitterDataCount = metadata.getLong(ModelSelectorNames.PreSplitterDataCount), + downSamplingFraction = metadata.getDouble(ModelSelectorNames.DownSample), labelsKept = metadata.getDoubleArray(ModelSelectorNames.LabelsKept), labelsDropped = metadata.getDoubleArray(ModelSelectorNames.LabelsDropped), labelsDroppedTotal = metadata.getLong(ModelSelectorNames.LabelsDroppedTotal) diff --git a/core/src/main/scala/com/salesforce/op/utils/stages/FitStagesUtil.scala b/core/src/main/scala/com/salesforce/op/utils/stages/FitStagesUtil.scala index 968efdd161..4a157a6af9 100644 --- a/core/src/main/scala/com/salesforce/op/utils/stages/FitStagesUtil.scala +++ b/core/src/main/scala/com/salesforce/op/utils/stages/FitStagesUtil.scala @@ -220,7 +220,6 @@ private[op] case object FitStagesUtil { fittedTransformers: Seq[OPStage] = Seq.empty )(implicit spark: SparkSession): FittedDAG = { val alreadyFitted: ListBuffer[OPStage] = ListBuffer(fittedTransformers: _*) - val (newTrain, newTest) = dag.foldLeft(train -> test) { case ((currTrain, currTest), stagesLayer) => val index = stagesLayer.head._2 diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala index 55c2a0ad17..a01a1c0c57 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala @@ -100,6 +100,8 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex .drop("txtLabel") .select("label", "features") + val datacount = data.count().toDouble + val stageNames = Array("label_prediction", "label_rawPrediction", "label_probability") val (label, Array(features: Feature[OPVector]@unchecked)) = FeatureBuilder.fromDataFrame[RealNN]( @@ -135,6 +137,39 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex 
modelSelector.models.exists(_._1.getClass.getSimpleName == MTT.OpXGBoostClassifier.entryName) shouldBe true } + it should "set the data splitting params correctly" in { + val modelSelector = MultiClassificationModelSelector() + modelSelector.splitter.get.setReserveTestFraction(0.1).setSeed(11L).setMaxTrainingSample(1000) + + modelSelector.splitter.get.getSeed shouldBe 11L + modelSelector.splitter.get.getReserveTestFraction shouldBe 0.1 + modelSelector.splitter.get.getMaxTrainingSample shouldBe 1000 + } + + it should "down-sample when the training set is greater than the maxTrainingSample" in { + + implicit val vectorEncoder: org.apache.spark.sql.Encoder[Vector] = ExpressionEncoder() + implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder) + val dataCount = data.count().toDouble + val maxTrainingSample = (dataCount / 1.2).toInt + val sampleF = maxTrainingSample / dataCount + val downSampleFraction = math.min(sampleF, 1.0) + val dataCutter = DataCutter(seed = 42, maxTrainingSample = maxTrainingSample) + val modelSelector = + MultiClassificationModelSelector + .withTrainValidationSplit( + modelTypesToUse = Seq(MTT.OpLogisticRegression), + splitter = Option(dataCutter), + seed = 10L + ) + val model = modelSelector.setInput(label, features).fit(data) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction" ) + + modelDownSampleFraction shouldBe downSampleFraction + } + + it should "split into training and test" in { implicit val vectorEncoder: org.apache.spark.sql.Encoder[Vector] = ExpressionEncoder() implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 2d72dd3166..3ac59bab28 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -44,8 +44,7 @@ import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomT import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata} import com.salesforce.op.{FeatureHistory, OpWorkflow, _} -import org.apache.spark.ml.PredictionModel -import org.apache.spark.ml.classification.LogisticRegressionModel +import org.apache.spark.ml.Model import org.apache.spark.ml.linalg._ import org.apache.spark.ml.regression.LinearRegressionModel import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -54,7 +53,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Encoder, Row} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, FunSpec, Suite} +import org.scalatest.{FunSpec, Suite} @RunWith(classOf[JUnitRunner]) @@ -293,101 +292,60 @@ class RecordInsightsLOCOTest extends FunSpec with TestSparkContext with RecordIn } } - it("should aggregate values for text and textMap derived features") { - val testData = generateTestTextData - withClue("TextArea can have two null indicator values") { - testData.actualRecordInsights.map(p => assert(p.size == 7 || p.size == 8)) - } - withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + - "aggregation required for LOCO on this field.") 
{ - testData.actualRecordInsights.foreach { p => - assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)) - } - } + for {strategy <- VectorAggregationStrategy.values} { + it (s"aggregate values for text and textMap derived features when strategy=$strategy") { + val (df, featureVector, label) = generateTestTextData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy) - assertLOCOSum(testData.actualRecordInsights) - assertAggregatedText(textFeatureName) - assertAggregatedTextMap(textMapFeatureName, "k0") - assertAggregatedTextMap(textMapFeatureName, "k1") - assertAggregatedText(textAreaFeatureName) - assertAggregatedTextMap(textAreaMapFeatureName, "k0") - assertAggregatedTextMap(textAreaMapFeatureName, "k1") - - - /** - * Compare the aggregation made by RecordInsightsLOCO on a text field to one made manually - * - * @param textFeatureName Text Field Name - */ - def assertAggregatedText(textFeatureName: String): Unit = { - withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && - history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData) + withClue("TextArea can have two null indicator values") { + actualInsights.map(p => assert(p.size == 7 || p.size == 8)) } - } - - /** - * Compare the aggregation made by RecordInsightsLOCO to one made manually - * - * @param textMapFeatureName Text Map Field Name - */ - def assertAggregatedTextMap(textMapFeatureName: String, keyName: String): Unit = { - withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && - history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty - assertAggregatedWithPredicate(predicate, testData) + withClue("SmartTextVectorizer detects country feature as a PickList, hence no " + + "aggregation required for LOCO on this field.") { + actualInsights.foreach { p => + assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) + && r.indicatorValue.isDefined)) + } } + + assertLOCOSum(actualInsights) + assertAggregatedText(textFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedText(textAreaFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedTextMap(textAreaMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } } - it("should aggregate values for date, datetime, dateMap and dateTimeMap derived features") { - val testData = generateTestDateData - - assertLOCOSum(testData.actualRecordInsights) - assertAggregatedDate(dateFeatureName) - assertAggregatedDate(dateTimeFeatureName) - assertAggregatedDateMap(dateMapFeatureName, "k0") - assertAggregatedDateMap(dateMapFeatureName, "k1") - 
assertAggregatedDateMap(dateTimeMapFeatureName, "k0") - assertAggregatedDateMap(dateTimeMapFeatureName, "k1") - - /** - * Compare the aggregation made by RecordInsightsLOCO on a Date/DateTime field to one made manually - * - * @param dateFeatureName Date/DateTime Field - */ - def assertAggregatedDate(dateFeatureName: String): Unit = { - for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { - withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawFeature - $dateFeatureName.") { - val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && - history.descriptorValue.isDefined && - history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData) - } - } - } - /** - * Compare the aggregation made by RecordInsightsLOCO on a DateMap/DateTimeMap field to one made manually - * - * @param dateMapFeatureName DateMap/DateTimeMap Field - */ - def assertAggregatedDateMap(dateMapFeatureName: String, keyName: String): Unit = { - for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { - withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawMapFeature - $dateMapFeatureName " + - s"with key as $keyName.") { - val predicate = (history: OpVectorColumnHistory) => - history.parentFeatureOrigins == Seq(dateMapFeatureName) && - history.grouping == Option(keyName) && history.descriptorValue.isDefined && - history.descriptorValue.get.split("_").last == timePeriod.entryName - assertAggregatedWithPredicate(predicate, testData) - } - } + for {strategy <- VectorAggregationStrategy.values} { + it (s"aggregate values for date, datetime, dateMap and dateTimeMap derived features when strategy=$strategy") { + val (df, featureVector, label) = generateTestDateData + val model = new OpLogisticRegression().setInput(label, featureVector).fit(df) + val actualInsights = generateRecordInsights(model, df, featureVector, strategy, topK = 40) + + assertLOCOSum(actualInsights) + assertAggregatedDate(dateFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDate(dateTimeFeatureName, strategy, model, df, featureVector, label, actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k0", strategy, model, df, featureVector, label, + actualInsights) + assertAggregatedDateMap(dateTimeMapFeatureName, "k1", strategy, model, df, featureVector, label, + actualInsights) } } } + private def addMetaData(df: DataFrame, fieldName: String, size: Int): DataFrame = { val columns = (0 until size).map(_.toString).map(i => new OpVectorColumnMetadata(Seq(i), Seq(i), Some(i), Some(i))) val hist = (0 until size).map(_.toString).map(i => i -> FeatureHistory(Seq(s"a_$i"), Seq(s"b_$i"))).toMap @@ -405,6 +363,93 @@ class RecordInsightsLOCOTest extends FunSpec with TestSparkContext with RecordIn } } + /** + * Compare the aggregation made by RecordInsightsLOCO on a text field to one made manually + * + * @param textFeatureName Text Field Name + */ + def assertAggregatedText(textFeatureName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + 
withClue(s"Aggregate all the derived hashing tf features of rawFeature - $textFeatureName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textFeatureName) && + history.indicatorValue.isEmpty && history.descriptorValue.isEmpty + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } + } + + /** + * Compare the aggregation made by RecordInsightsLOCO to one made manually + * + * @param textMapFeatureName Text Map Field Name + */ + def assertAggregatedTextMap(textMapFeatureName: String, keyName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + withClue(s"Aggregate all the derived hashing tf of rawMapFeature - $textMapFeatureName for key - $keyName") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(textMapFeatureName) && + history.grouping == Option(keyName) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } + } + + /** + * Compare the aggregation made by RecordInsightsLOCO on a Date/DateTime field to one made manually + * + * @param dateFeatureName Date/DateTime Field + */ + def assertAggregatedDate(dateFeatureName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { + withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawFeature - $dateFeatureName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateFeatureName) && + history.descriptorValue.isDefined && + history.descriptorValue.get.split("_").last == timePeriod.entryName + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } + } + } + + /** + * Compare the aggregation made by RecordInsightsLOCO on a DateMap/DateTimeMap field to one made manually + * + * @param dateMapFeatureName DateMap/DateTimeMap Field + */ + def assertAggregatedDateMap(dateMapFeatureName: String, keyName: String, + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualInsights: Array[Map[OpVectorColumnHistory, Insights]] + ): Unit = { + for {timePeriod <- TransmogrifierDefaults.CircularDateRepresentations} { + withClue(s"Aggregate x_$timePeriod and y_$timePeriod of rawMapFeature - $dateMapFeatureName " + + s"with key as $keyName.") { + val predicate = (history: OpVectorColumnHistory) => history.parentFeatureOrigins == Seq(dateMapFeatureName) && + history.grouping == Option(keyName) && history.descriptorValue.isDefined && + history.descriptorValue.get.split("_").last == timePeriod.entryName + assertAggregatedWithPredicate(predicate, strategy, model, df, featureVector, label, actualInsights) + } + } + } + /** * Compare the aggregation made by RecordInsightsLOCO to one made manually * @@ -412,34 +457,46 @@ class RecordInsightsLOCOTest extends FunSpec with TestSparkContext with RecordIn */ private def assertAggregatedWithPredicate( predicate: 
OpVectorColumnHistory => Boolean, - testData: RecordInsightsTestData[LogisticRegressionModel] + strategy: VectorAggregationStrategy, + model: OpPredictorWrapperModel[_], + df: DataFrame, + featureVector: FeatureLike[OPVector], + label: FeatureLike[RealNN], + actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]] ): Unit = { implicit val enc: Encoder[(Array[Double], Long)] = ExpressionEncoder() implicit val enc2: Encoder[Seq[Double]] = ExpressionEncoder() - val meta = OpVectorMetadata.apply(testData.featureTransformedDF.schema(testData.featureVector.name)) + val meta = OpVectorMetadata.apply(df.schema(featureVector.name)) val indices = meta.getColumnHistory() .filter(predicate) .map(_.index) - val expectedLocos = testData.featureTransformedDF.select(testData.label, testData.featureVector).map { + val expectedLocos = df.select(label, featureVector).map { case Row(l: Double, v: Vector) => - val featureArray = v.toArray - val locos = indices.map { i => - val oldVal = v(i) - val baseScore = testData.sparkModel.transformFn(l.toRealNN, v.toOPVector).score.toSeq - featureArray.update(i, 0.0) - val newScore = testData.sparkModel.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq - featureArray.update(i, oldVal) - baseScore.zip(newScore).map { case (b, n) => b - n } + val featureArray = v.copy.toArray + val baseScore = model.transformFn(l.toRealNN, v.toOPVector).score.toSeq + strategy match { + case VectorAggregationStrategy.Avg => + val locos = indices.map { i => + val oldVal = v(i) + featureArray.update(i, 0.0) + val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq + featureArray.update(i, oldVal) + baseScore.zip(newScore).map { case (b, n) => b - n } + } + val sumLOCOs = locos.reduce((a1, a2) => a1.zip(a2).map { case (l, r) => l + r }) + sumLOCOs.map(_ / indices.length) + case VectorAggregationStrategy.LeaveOutVector => + indices.foreach { i => featureArray.update(i, 0.0) } + val newScore = model.transformFn(l.toRealNN, featureArray.toOPVector).score.toSeq + baseScore.zip(newScore).map { case (b, n) => b - n } } - val sumLOCOs = locos.reduce((a1, a2) => a1.zip(a2).map { case (l, r) => l + r }) - sumLOCOs.map(_ / indices.length) } val expected = expectedLocos.collect().toSeq.filter(_.head != 0.0) - val actual = testData.actualRecordInsights + val actual = actualRecordInsights .flatMap(_.find { case (history, _) => predicate(history) }) .map(_._2.map(_._2)).toSeq val zip = actual.zip(expected) @@ -449,8 +506,22 @@ class RecordInsightsLOCOTest extends FunSpec with TestSparkContext with RecordIn } } } + + private def generateRecordInsights[T <: Model[T]]( + model: T, + df: DataFrame, + featureVector: FeatureLike[OPVector], + strategy: VectorAggregationStrategy, + topK: Int = 20 + ): Array[Map[OpVectorColumnHistory, Insights]] = { + val transformer = new RecordInsightsLOCO(model).setInput(featureVector).setTopK(topK) + .setVectorAggregationStrategy(strategy) + val insights = transformer.transform(df) + insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) + } } + trait RecordInsightsTestDataGenerator extends TestSparkContext { self: Suite => @@ -471,7 +542,7 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val textAreaFeatureName = "textArea" val textAreaMapFeatureName = "textAreaMap" - def generateTestDateData: RecordInsightsTestData[LogisticRegressionModel] = { + def generateTestDateData: (DataFrame, FeatureLike[OPVector], FeatureLike[RealNN]) = { val refDate = 
TransmogrifierDefaults.ReferenceDate.minusMillis(1) val minStep = 1000000 @@ -515,19 +586,10 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { val featureVector = Seq(dateVector, datetimeVector, dateMapVector, datetimeMapVector).combine() val featureTransformedDF = new OpWorkflow().setResultFeatures(featureVector, label).transform(rawData) - // Train a model - val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(featureTransformedDF) - - // RecordInsightsLOCO - val locoTransformer = new RecordInsightsLOCO(sparkModel).setInput(featureVector).setTopK(40) - val locoInsights = locoTransformer.transform(featureTransformedDF) - val parsedInsights = locoInsights.collect(locoTransformer.getOutput()).map(i => - RecordInsightsParser.parseInsights(i)) - - RecordInsightsTestData(rawData, featureTransformedDF, featureVector, label, sparkModel, parsedInsights) + (featureTransformedDF, featureVector, label) } - def generateTestTextData: RecordInsightsTestData[LogisticRegressionModel] = { + def generateTestTextData: (DataFrame, FeatureLike[OPVector], FeatureLike[RealNN]) = { // Random Text Data val textData: Seq[Text] = RandomText.strings(5, 10).withProbabilityOfEmpty(0.3).take(numRows).toList @@ -605,29 +667,10 @@ trait RecordInsightsTestDataGenerator extends TestSparkContext { // Sanity Checker val checker = new SanityChecker().setInput(label, featureVector) - val checked = checker.fit(vectorized).transform(vectorized) + val checkedDf = checker.fit(vectorized).transform(vectorized) val checkedFeatureVector = checker.getOutput() - // RecordInsightsLOCO - val sparkModel = new OpLogisticRegression().setInput(label, checkedFeatureVector).fit(checked) - - val transformer = new RecordInsightsLOCO(sparkModel).setInput(checkedFeatureVector) - - val insights = transformer.transform(checked) - - val parsed = insights.collect(transformer.getOutput()).map(i => RecordInsightsParser.parseInsights(i)) - - RecordInsightsTestData(testData, checked, checkedFeatureVector, label, sparkModel, parsed) + (checkedDf, checkedFeatureVector, label) } } - -case class RecordInsightsTestData[M <: PredictionModel[Vector, M]] -( - rawDF: DataFrame, - featureTransformedDF: DataFrame, - featureVector: FeatureLike[OPVector], - label: FeatureLike[RealNN], - sparkModel: OpPredictorWrapperModel[M], - actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]] -) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala index 54c66868ed..d7d625b9bc 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala @@ -38,19 +38,22 @@ import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => RMT} import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorSummary} import com.salesforce.op.stages.impl.tuning.{BestEstimator, DataSplitter} -import com.salesforce.op.test.TestSparkContext +import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} +import com.salesforce.op.testkit.{RandomReal, RandomVector} import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichMetadata._ import ml.dmlc.xgboost4j.scala.spark.OpXGBoostQuietLogging import 
org.apache.spark.SparkException import org.apache.spark.ml.linalg.{Vector, Vectors} -import org.apache.spark.ml.param.ParamPair +import org.apache.spark.ml.param.{ParamMap, ParamPair} import org.apache.spark.ml.tuning.ParamGridBuilder import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.junit.runner.RunWith +import org.scalacheck.Gen import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import org.scalatest.prop.Checkers import scala.concurrent.TimeoutException import scala.concurrent.duration.Duration @@ -59,12 +62,13 @@ import scala.util.Random @RunWith(classOf[JUnitRunner]) class RegressionModelSelectorTest extends FlatSpec with TestSparkContext - with CompareParamGrid with OpXGBoostQuietLogging { + with CompareParamGrid with OpXGBoostQuietLogging with Checkers { val seed = 1234L val stageNames = "label_prediction" val dataCount = 200 import spark.implicits._ + val rand = new Random(seed) val rawData: Seq[(Double, Vector)] = List.range(-100, 100, 1).map(i => @@ -144,7 +148,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext val model = modelSelector.setInput(label, features).fit(data) val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) - val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction" ) + val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction") modelDownSampleFraction shouldBe downSampleFraction } @@ -437,7 +441,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext val fitted = modelSelector.fit(data) fitted.modelStageIn.parent.extractParamMap().toSeq - .collect{ case p: ParamPair[_] if p.param.name == "cacheNodeIds" => p.value }.head shouldBe myParam + .collect { case p: ParamPair[_] if p.param.name == "cacheNodeIds" => p.value }.head shouldBe myParam val meta = ModelSelectorSummary.fromMetadata(fitted.getMetadata().getSummaryMetadata()) meta.bestModelName shouldBe myEstimatorName @@ -448,4 +452,78 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext .map { case (score: Prediction, label: RealNN) => math.abs(score.prediction - label.v.get) }.sum assert(res <= scores.length, "prediction failed") } + + + import org.scalacheck.Prop + + val rowInteger = Gen.choose(500, 1000) + val columnInteger = Gen.choose(10, 50) + + val selector = RegressionModelSelector.withCrossValidation(modelTypesToUse = + Seq(RegressionModelsToTry.OpGeneralizedLinearRegression, + RegressionModelsToTry.OpRandomForestRegressor, + RegressionModelsToTry.OpLinearRegression), + numFolds = 5 + ) + // Property based tests where the dataset is randomly generated + ignore should "pick Linear Regression or GLM when the response a linear combination of predictors" in { + check(Prop.forAllNoShrink(rowInteger) { n => + Prop.forAllNoShrink(columnInteger) { p => + Prop.collect(n, p) { + + val vectors = RandomVector.dense(RandomReal.exponential(mean = 1.0), p).take(n).toSeq + val labels = vectors.map(_.value.toArray.sum.toRealNN) + val (data, features, label) = TestFeatureBuilder("features", "response", vectors.zip(labels)) + val response = label.copy(isResponse = true) + + val modelSelector = selector.copy(ParamMap.empty).setInput(response, features) + val model = modelSelector.fit(data) + val modelName = model.getSparkMlStage().get.toString() + modelName.contains("linReg") || modelName.contains("glm") + } + } + }) + } + + ignore should "pick RandomForest when the response a decision stump of a 
predictor" in { + check(Prop.forAllNoShrink(rowInteger) { n => + Prop.forAllNoShrink(columnInteger) { p => + Prop.collect(n, p) { + + val vectors = RandomVector.dense(RandomReal.exponential(mean = 1.0), p).take(n).toSeq + val labels = vectors.map(v => { + val head = v.value(0) + if (head > 1.0) 1000.0 else 500.0 + }.toRealNN) + val (data, features, label) = TestFeatureBuilder("features", "response", vectors.zip(labels)) + val response = label.copy(isResponse = true) + + + val modelSelector = selector.copy(ParamMap.empty).setInput(response, features) + val model = modelSelector.fit(data) + val modelName = model.getSparkMlStage().get.toString() + modelName.contains("rfr") + } + } + }) + } + + ignore should "pick GLM when the response a the exponential of a linear combination of the predictors" in { + check(Prop.forAllNoShrink(rowInteger) { n => + Prop.forAllNoShrink(columnInteger) { p => + Prop.collect(n, p) { + + val vectors = RandomVector.dense(RandomReal.exponential(mean = 1.0), p).take(n).toSeq + val labels = vectors.map(v => math.exp(v.value.toArray.sum).toRealNN) + val (data, features, label) = TestFeatureBuilder("features", "response", vectors.zip(labels)) + val response = label.copy(isResponse = true) + + val modelSelector = selector.copy(ParamMap.empty).setInput(response, features) + val model = modelSelector.fit(data) + val modelName = model.getSparkMlStage().get.toString() + modelName.contains("glm") + } + } + }) + } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala index bcd804585c..b6268a05cc 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala @@ -32,6 +32,8 @@ package com.salesforce.op.stages.impl.tuning import com.salesforce.op.test.TestSparkContext import com.salesforce.op.testkit.{RandomIntegral, RandomReal, RandomVector} +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.sql.Dataset import org.junit.runner.RunWith import org.scalatest.FlatSpec @@ -49,6 +51,7 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary } val vectors = RandomVector.sparse(RandomReal.poisson(2), 10).limit(100000) + val trainingLimitDefault = 1E6.toLong val data = labels.zip(vectors).zip(labelsBiased) val dataSize = data.size val randDF = sc.makeRDD(data.map { case ((l, v), b) => (l.toDouble.get, v.value, b.toString) }).toDF() @@ -65,6 +68,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary s.labelsKept.length shouldBe 1000 s.labelsDropped.length shouldBe 0 s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc1.getLabelsToKeep, dc1.getLabelsToDrop, dc1.getLabelsDroppedTotal @@ -80,6 +85,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary s.labelsKept.length shouldBe 1000 s.labelsDropped.length shouldBe 0 s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc2.getLabelsToKeep, dc2.getLabelsToDrop, dc2.getLabelsDroppedTotal @@ -87,6 +94,43 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary } } + it should "set and get all data cutter params" in { + val maxRows = dataSize / 2 + val downSampleFraction = maxRows / dataSize.toDouble + + val dataCutter = DataCutter() + .setSeed(seed) + 
.setReserveTestFraction(0.0) + .setMaxLabelCategories(100000) + .setMinLabelFraction(0.0) + .setMaxTrainingSample(maxRows) + .setDownSampleFraction(downSampleFraction) + + dataCutter.getSeed shouldBe seed + dataCutter.getReserveTestFraction shouldBe 0.0 + dataCutter.getMaxLabelCategories shouldBe 100000 + dataCutter.getMinLabelFraction shouldBe 0.0 + dataCutter.getMaxTrainingSample shouldBe maxRows + dataCutter.getDownSampleFraction shouldBe downSampleFraction + } + + it should "down-sample when the data count is above the default training limit" in { + val numRows = trainingLimitDefault * 2 + val dataCutter = DataCutter() + val data = + RandomRDDs.normalVectorRDD(sc, numRows, 3, seed = seed) + .map(v => (1.0, Vectors.dense(v.toArray), "A")).toDF() + + dataCutter.preValidationPrepare(data) + val dataBalanced = dataCutter.validationPrepare(data) + // validationPrepare calls the data sample method that samples the data to a target ratio but there is an epsilon + // to how precise this function is which is why we need to check around that epsilon + val samplingErrorEpsilon = (0.1 * trainingLimitDefault).toLong + + dataCutter.getDownSampleFraction shouldBe 0.5 + dataBalanced.count() shouldBe trainingLimitDefault +- samplingErrorEpsilon + } + it should "throw an error when all the data is filtered out" in { val dataCutter = DataCutter(seed = seed, minLabelFraction = 0.4) assertThrows[RuntimeException](dataCutter.preValidationPrepare(randDF)) @@ -109,6 +153,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary s.labelsDropped.length shouldBe 10 s.labelsDroppedTotal shouldBe 900 s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc1.getLabelsToKeep, dc1.getLabelsToDrop, dc1.getLabelsDroppedTotal @@ -125,6 +171,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary s.labelsDropped.length shouldBe 10 s.labelsDroppedTotal shouldBe 997 s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc2.getLabelsToKeep, dc2.getLabelsToDrop, dc2.getLabelsDroppedTotal @@ -144,6 +192,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary assertDataCutterSummary(s1.summaryOpt) { s => s.labelsKept.length + s.labelsDroppedTotal shouldBe distinct s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc1.getLabelsToKeep, dc1.getLabelsToDrop, dc1.getLabelsDroppedTotal @@ -159,6 +209,8 @@ class DataCutterTest extends FlatSpec with TestSparkContext with SplitterSummary s.labelsDroppedTotal shouldBe 997 s.labelsDropped.length shouldBe 10 s shouldBe DataCutterSummary( + preSplitterDataCount = dataSize, + downSamplingFraction = 1.0, dc2.getLabelsToKeep, dc2.getLabelsToDrop, dc2.getLabelsDroppedTotal diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/SplitterSummaryAsserts.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/SplitterSummaryAsserts.scala index e828c7ba22..6d15a1a5e0 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/SplitterSummaryAsserts.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/SplitterSummaryAsserts.scala @@ -63,6 +63,8 @@ trait SplitterSummaryAsserts { meta.getString(SplitterSummary.ClassName) shouldBe classOf[DataCutterSummary].getName meta.getDoubleArray(ModelSelectorNames.LabelsKept).foreach(_ should be >= 0.0) meta.getDoubleArray(ModelSelectorNames.LabelsDropped).foreach(_ should be >= 0.0) + 
meta.getDouble(ModelSelectorNames.DownSample) should be >= 0.0 + meta.getLong(ModelSelectorNames.PreSplitterDataCount) should be >= 0L meta.getLong(ModelSelectorNames.LabelsDroppedTotal) should be >= 0L assert(s) case x =>
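
---

For reviewers, a minimal usage sketch of the two knobs this patch introduces. It is not part of the diff: the helper name `topInsights` and the values `model`, `featureVector`, and `scoredDF` are illustrative placeholders for an already-fitted TransmogrifAI workflow; only the `RecordInsightsLOCO` setters, `VectorAggregationStrategy`, and the `DataCutter(maxTrainingSample = ...)` behaviour are taken from the changes above.

```scala
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.features.types.OPVector
import com.salesforce.op.stages.impl.insights.{RecordInsightsLOCO, RecordInsightsParser, VectorAggregationStrategy}
import com.salesforce.op.stages.impl.tuning.DataCutter
import com.salesforce.op.utils.spark.RichDataset._
import org.apache.spark.ml.Model
import org.apache.spark.sql.DataFrame

// `model`, `featureVector` and `scoredDF` are placeholders for an already-fitted workflow.
def topInsights[T <: Model[T]](model: T, featureVector: FeatureLike[OPVector], scoredDF: DataFrame) = {
  val loco = new RecordInsightsLOCO(model)
    .setInput(featureVector)
    .setTopK(20)
    // Avg (the default) computes the LOCO of each column derived from a text/date feature
    // and averages them; LeaveOutVector zeroes out the whole derived vector in one shot.
    .setVectorAggregationStrategy(VectorAggregationStrategy.LeaveOutVector)
  loco.transform(scoredDF)
    .collect(loco.getOutput())
    .map(RecordInsightsParser.parseInsights)
}

// Splitter side: when the training set holds more than maxTrainingSample rows,
// validationPrepare down-samples it by min(maxTrainingSample / rowCount, 1.0).
val cutter = DataCutter(seed = 42L, maxTrainingSample = 100000)
```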