diff --git a/core/src/main/scala/com/salesforce/op/ModelInsights.scala b/core/src/main/scala/com/salesforce/op/ModelInsights.scala index ce897cfd30..81005d37ea 100644 --- a/core/src/main/scala/com/salesforce/op/ModelInsights.scala +++ b/core/src/main/scala/com/salesforce/op/ModelInsights.scala @@ -33,33 +33,28 @@ package com.salesforce.op import com.salesforce.op.evaluators._ import com.salesforce.op.features._ import com.salesforce.op.features.types._ -import com.salesforce.op.stages.impl.ModelsToTry -import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry +import com.salesforce.op.stages._ import com.salesforce.op.stages.impl.feature.TransmogrifierDefaults import com.salesforce.op.stages.impl.preparators._ -import com.salesforce.op.stages.impl.regression.RegressionModelsToTry -import com.salesforce.op.stages.impl.selector.ModelSelectorBaseNames._ import com.salesforce.op.stages.impl.selector._ -import com.salesforce.op.stages._ -import com.salesforce.op.utils.json.JsonUtils +import com.salesforce.op.stages.impl.tuning.{DataBalancerSummary, DataCutterSummary, DataSplitterSummary} +import com.salesforce.op.utils.json.EnumEntrySerializer import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} import com.salesforce.op.utils.table.Alignment._ import com.salesforce.op.utils.table.Table -import enumeratum._ import org.apache.spark.ml.classification._ import org.apache.spark.ml.regression._ import org.apache.spark.ml.{Model, PipelineStage, Transformer} import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel -import org.apache.spark.sql.types.Metadata import org.json4s._ -import org.json4s.jackson.JsonMethods._ -import org.json4s.jackson.Serialization.{write, writePretty} +import org.json4s.jackson.Serialization +import org.json4s.jackson.Serialization._ +import com.salesforce.op.utils.json.SpecialDoubleSerializer import org.slf4j.LoggerFactory import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag -import scala.util.{Success, Failure, Try} +import scala.util.Try /** * Summary of all model insights @@ -75,111 +70,11 @@ case class ModelInsights ( label: LabelSummary, features: Seq[FeatureInsights], - selectedModelInfo: Map[String, Any], + selectedModelInfo: Option[ModelSelectorSummary], trainingParams: OpParams, stageInfo: Map[String, Any] ) { - /** - * Selected model UID - */ - def selectedModelUID: String = selectedModelInfo(BestModelUid).toString - - /** - * Selected model name - */ - def selectedModelName: String = selectedModelInfo(BestModelName).toString - - /** - * Selected model type, i.e. LogisticRegression, RandomForest etc. 
- */ - def selectedModelType: ModelsToTry = modelType(selectedModelName).get - - /** - * Selected model validation results computed during Cross Validation or Train Validation Split - */ - def selectedModelValidationResults: Map[String, String] = validationResults(selectedModelName) - - /** - * Train set evaluation metrics for selected model - */ - def selectedModelTrainEvalMetrics: EvaluationMetrics = evaluationMetrics(TrainingEval) - - /** - * Test set evaluation metrics (if any) for selected model - */ - def selectedModelTestEvalMetrics: Option[EvaluationMetrics] = { - selectedModelInfo.get(HoldOutEval).map(_ => evaluationMetrics(HoldOutEval)) - } - - /** - * Validation results for all models computed during Cross Validation or Train Validation Split - * - * @return validation results keyed by model name - */ - def validationResults: Map[String, Map[String, String]] = { - val res = for { - results <- getMap[String, Any](selectedModelInfo, TrainValSplitResults).recoverWith { - case e => getMap[String, Any](selectedModelInfo, CrossValResults) - } - } yield results.keys.map(k => k -> getMap[String, String](results, k).getOrElse(Map.empty)) - res match { - case Failure(e) => throw new RuntimeException(s"Failed to extract validation results", e) - case Success(ok) => ok.toMap - } - } - - /** - * Validation results for a specified model type computed during Cross Validation or Train Validation Split - * - * @return validation results keyed by model name - */ - def validationResults(mType: ModelsToTry): Map[String, Map[String, String]] = { - validationResults.filter { case (modelName, _) => modelType(modelName).toOption.contains(mType) } - } - - /** - * All validated model types - */ - def validatedModelTypes: Set[ModelsToTry] = - validationResults.keys.flatMap(modelName => modelType(modelName).toOption).toSet - - /** - * Validation type, i.e TrainValidationSplit, CrossValidation - */ - def validationType: ValidationType = { - if (getMap[String, Any](selectedModelInfo, TrainValSplitResults).isSuccess) ValidationType.TrainValidationSplit - else if (getMap[String, Any](selectedModelInfo, CrossValResults).isSuccess) ValidationType.CrossValidation - else throw new RuntimeException(s"Failed to determine validation type") - } - - /** - * Evaluation metric type, i.e. AuPR, AuROC, F1 etc. - */ - def evaluationMetricType: EnumEntry with EvalMetric = { - val knownEvalMetrics = { - (BinaryClassEvalMetrics.values ++ MultiClassEvalMetrics.values ++ RegressionEvalMetrics.values) - .map(m => m.humanFriendlyName -> m).toMap - } - val evalMetrics = validationResults.flatMap(_._2.keys).flatMap(knownEvalMetrics.get).toSet.toList - evalMetrics match { - case evalMetric :: Nil => evalMetric - case Nil => throw new RuntimeException("Unable to determine evaluation metric type: no metrics were found") - case metrics => throw new RuntimeException( - s"Unable to determine evaluation metric type since: multiple metrics were found - " + metrics.mkString(",")) - } - } - - /** - * Problem type, i.e. 
Binary Classification, Multi Classification or Regression - */ - def problemType: ProblemType = selectedModelTrainEvalMetrics match { - case _: BinaryClassificationMetrics => ProblemType.BinaryClassification - case _: MultiClassificationMetrics => ProblemType.MultiClassification - case _: RegressionMetrics => ProblemType.Regression - case _ => ProblemType.Unknown - } - /** * Serialize to json string * @@ -187,10 +82,11 @@ case class ModelInsights * @return json string */ def toJson(pretty: Boolean = true): String = { - implicit val formats = DefaultFormats + implicit val formats = ModelInsights.SerializationFormats if (pretty) writePretty(this) else write(this) } + /** * High level model summary in a compact print friendly format containing: * selected model info, model evaluation results and feature correlations/contributions/cramersV values. @@ -209,23 +105,34 @@ case class ModelInsights res.mkString("\n") } + private def validatedModelTypes = selectedModelInfo.map(_.validationResults.map(_.modelType).toList.distinct) + .getOrElse(List.empty) + private def evaluationMetric = selectedModelInfo.map(_.evaluationMetric.humanFriendlyName) + private def validationResults(modelType: String) = selectedModelInfo + .map(_.validationResults.filter(_.modelType == modelType).toList).getOrElse(List.empty) + private def prettyValidationResults: Seq[String] = { val evalSummary = { val vModelTypes = validatedModelTypes - "Evaluated %s model%s using %s and %s metric.".format( - vModelTypes.mkString(", "), - if (vModelTypes.size > 1) "s" else "", - validationType.humanFriendlyName, // TODO add number of folds or train/split ratio if possible - evaluationMetricType.humanFriendlyName - ) - } + for { + ev <- selectedModelInfo.map(_.validationType.humanFriendlyName) + met <- evaluationMetric + } yield { + "Evaluated %s model%s using %s and %s metric.".format( + vModelTypes.mkString(", "), + if (vModelTypes.size > 1) "s" else "", + ev, // TODO add number of folds or train/split ratio + met + ) + } + }.getOrElse("No model selector found") val modelEvalRes = for { modelType <- validatedModelTypes modelValidationResults = validationResults(modelType) - evalMetric = evaluationMetricType.humanFriendlyName + evalMetric <- evaluationMetric } yield { - val evalMetricValues = modelValidationResults.flatMap { case (_, metrics) => - metrics.get(evalMetric).flatMap(v => Try(v.toDouble).toOption) + val evalMetricValues = modelValidationResults.map { eval => + eval.metricValues.asInstanceOf[SingleMetric].value } val minMetricValue = evalMetricValues.reduceOption[Double](math.min).getOrElse(Double.NaN) val maxMetricValue = evalMetricValues.reduceOption[Double](math.max).getOrElse(Double.NaN) @@ -243,32 +150,37 @@ case class ModelInsights } private def prettySelectedModelInfo: String = { - val bestModelType = selectedModelType - val name = s"Selected Model - $bestModelType" - val validationResults = selectedModelValidationResults.toSeq ++ Seq( - "name" -> selectedModelName, - "uid" -> selectedModelUID, - "modelType" -> selectedModelType - ) - val table = Table(name = name, columns = Seq("Model Param", "Value"), rows = validationResults.sortBy(_._1)) + val name = selectedModelInfo.map( sm => s"Selected Model - ${sm.bestModelType} ${sm.bestModelName}" ).getOrElse("") + val validationResults: Seq[(String, Any)] = selectedModelInfo.map( + _.validationResults.flatMap(e => Seq("name" -> e.modelName, "uid" -> e.modelUID, "modelType" -> e.modelType) ++ + e.modelParameters.toSeq) + ).getOrElse(Seq.empty) + val table = Table(name = 
name, columns = Seq("Model Param", "Value"), rows = validationResults) table.prettyString() } private def modelEvaluationMetrics: String = { val name = "Model Evaluation Metrics" - val trainEvalMetrics = selectedModelTrainEvalMetrics - val testEvalMetrics = selectedModelTestEvalMetrics + val niceMetricsNames = (BinaryClassEvalMetrics.values ++ MultiClassEvalMetrics.values ++ + RegressionEvalMetrics.values ++ OpEvaluatorNames.values) + .map(m => m.entryName -> m.humanFriendlyName).toMap + def niceName(nm: String): String = nm.split("_").lastOption.flatMap(n => niceMetricsNames.get(n)).getOrElse(nm) + val trainEvalMetrics = selectedModelInfo.map(_.trainEvaluation) + val testEvalMetrics = selectedModelInfo.flatMap(_.holdoutEvaluation) val (metricNameCol, holdOutCol, trainingCol) = ("Metric Name", "Hold Out Set Value", "Training Set Value") - val trainMetrics = trainEvalMetrics.toMap.collect { case (k, v: Double) => k -> v.toString }.toSeq.sortBy(_._1) - val table = testEvalMetrics match { - case Some(testMetrics) => + (trainEvalMetrics, testEvalMetrics) match { + case (Some(trainMetrics), Some(testMetrics)) => + val trainMetricsMap = trainMetrics.toMap.collect { case (k, v: Double) => k -> v.toString }.toSeq.sortBy(_._1) val testMetricsMap = testMetrics.toMap - val rows = trainMetrics.map { case (k, v) => (k, v, testMetricsMap(k).toString) } - Table(name = name, columns = Seq(metricNameCol, trainingCol, holdOutCol), rows = rows) - case None => - Table(name = name, columns = Seq(metricNameCol, trainingCol), rows = trainMetrics) + val rows = trainMetricsMap + .map { case (k, v) => (niceName(k), v, testMetricsMap(k).toString) } + Table(name = name, columns = Seq(metricNameCol, trainingCol, holdOutCol), rows = rows).prettyString() + case (Some(trainMetrics), None) => + val trainMetricsMap = trainMetrics.toMap.collect { case (k, v: Double) => + niceName(k) -> v.toString }.toSeq.sortBy(_._1) + Table(name = name, columns = Seq(metricNameCol, trainingCol), rows = trainMetricsMap).prettyString() + case (None, _) => "No metrics found" } - table.prettyString() } private def topKInsights(s: Seq[(FeatureInsights, Insights, Double)], topK: Int): Seq[(String, Double)] = { @@ -345,79 +257,6 @@ case class ModelInsights private def numericalTable(columns: Seq[String], rows: Seq[(String, Double)]): Option[String] = if (rows.isEmpty) None else Some(Table(columns, rows).prettyString(columnAlignments = Map(columns.last -> Right))) - private def modelType(modelName: String): Try[ModelsToTry] = Try { - classificationModelType.orElse(regressionModelType).lift(modelName).getOrElse( - throw new RuntimeException(s"Unsupported model type for best model '$modelName'")) - } - - private def classificationModelType: PartialFunction[String, ClassificationModelsToTry] = { - case v if v.startsWith("logreg") => ClassificationModelsToTry.LogisticRegression - case v if v.startsWith("rfc") => ClassificationModelsToTry.RandomForest - case v if v.startsWith("dtc") => ClassificationModelsToTry.DecisionTree - case v if v.startsWith("nb") => ClassificationModelsToTry.NaiveBayes - } - - private def regressionModelType: PartialFunction[String, RegressionModelsToTry] = { - case v if v.startsWith("linReg") => RegressionModelsToTry.LinearRegression - case v if v.startsWith("rfr") => RegressionModelsToTry.RandomForestRegression - case v if v.startsWith("dtr") => RegressionModelsToTry.DecisionTreeRegression - case v if v.startsWith("gbtr") => RegressionModelsToTry.GBTRegression - } - - private def evaluationMetrics(metricsName: String): 
EvaluationMetrics = { - val res = for { - metricsMap <- getMap[String, Any](selectedModelInfo, metricsName) - evalMetrics <- Try(toEvaluationMetrics(metricsMap)) - } yield evalMetrics - res match { - case Failure(e) => throw new RuntimeException(s"Failed to extract '$metricsName' metrics", e) - case Success(ok) => ok - } - } - - private def getMap[K, V](m: Map[String, Any], name: String): Try[Map[K, V]] = Try { - m(name) match { - case m: Map[String, Any]@unchecked => m("map").asInstanceOf[Map[K, V]] - case m: Metadata => m.underlyingMap.asInstanceOf[Map[K, V]] - } - } - - private val MetricName = "\\((.*)\\)\\_(.*)".r - - private def toEvaluationMetrics(metrics: Map[String, Any]): EvaluationMetrics = { - import OpEvaluatorNames._ - val metricsType = metrics.keys.headOption match { - case Some(MetricName(t, _)) if Set(binary, multi, regression).contains(t) => t - case v => throw new RuntimeException(s"Invalid model metric '$v'") - } - def parse[T <: EvaluationMetrics : ClassTag] = { - val vals = metrics.map { case (MetricName(_, name), value) => name -> value } - val valsJson = JsonUtils.toJsonString(vals) - JsonUtils.fromString[T](valsJson).get - } - metricsType match { - case `binary` => parse[BinaryClassificationMetrics] - case `multi` => parse[MultiClassificationMetrics] - case `regression` => parse[RegressionMetrics] - case t => throw new RuntimeException(s"Unsupported metrics type '$t'") - } - } -} - -sealed trait ProblemType extends EnumEntry with Serializable - object ProblemType extends Enum[ProblemType] { - val values = findValues - case object BinaryClassification extends ProblemType - case object MultiClassification extends ProblemType - case object Regression extends ProblemType - case object Unknown extends ProblemType -} - -sealed abstract class ValidationType(val humanFriendlyName: String) extends EnumEntry with Serializable -object ValidationType extends Enum[ValidationType] { - val values = findValues - case object CrossValidation extends ValidationType("Cross Validation") - case object TrainValidationSplit extends ValidationType("Train Validation Split") } /** @@ -446,6 +285,7 @@ case class LabelSummary */ trait LabelInfo + /** * Summary of label distribution for continuous label * @@ -464,6 +304,7 @@ case class Continuous(min: Double, max: Double, mean: Double, variance: Double) */ case class Discrete(domain: Seq[String], prob: Seq[Double]) extends LabelInfo + /** * Summary of feature insights for all features derived from a given input (raw) feature * @@ -521,15 +362,34 @@ case class Insights case object ModelInsights { @transient protected lazy val log = LoggerFactory.getLogger(this.getClass) + val SerializationFormats: Formats = { + val typeHints = FullTypeHints(List( + classOf[Continuous], classOf[Discrete], + classOf[DataBalancerSummary], classOf[DataCutterSummary], classOf[DataSplitterSummary], + classOf[SingleMetric], classOf[MultiMetrics], classOf[BinaryClassificationMetrics], classOf[ThresholdMetrics], + classOf[MultiClassificationMetrics], classOf[RegressionMetrics] + )) + val evalMetricsSerializer = new CustomSerializer[EvalMetric](_ => + ( { case JString(s) => EvalMetric.withNameInsensitive(s) }, + { case x: EvalMetric => JString(x.entryName) } + ) + ) + Serialization.formats(typeHints) + + EnumEntrySerializer.json4s[ValidationType](ValidationType) + + EnumEntrySerializer.json4s[ProblemType](ProblemType) + + new SpecialDoubleSerializer + + evalMetricsSerializer + } + /** * Read ModelInsights from a json * * @param json model insights in json * @return 
Try[ModelInsights] */ - def fromJson(json: String): Try[ModelInsights] = Try { - implicit val formats = DefaultFormats - parse(json).extract[ModelInsights] + def fromJson(json: String): Try[ModelInsights] = { + implicit val formats: Formats = SerializationFormats + Try { read[ModelInsights](json) } } /** @@ -758,17 +618,18 @@ case object ModelInsights { }.getOrElse(Seq.empty[Seq[Double]]) } - private def getModelInfo(model: Option[SelectedModel]): Map[String, Any] = { - model.map(_.getMetadata().getSummaryMetadata().wrapped.underlyingMap) - .getOrElse(Map.empty) + private def getModelInfo(model: Option[SelectedModel]): Option[ModelSelectorSummary] = { + model.map(m => ModelSelectorSummary.fromMetadata(m.getMetadata().getSummaryMetadata())) } private def getStageInfo(stages: Array[OPStage]): Map[String, Any] = { - def getParams(stage: PipelineStage): Map[String, Any] = + def getParams(stage: PipelineStage): Map[String, String] = stage.extractParamMap().toSeq .collect{ + case p if p.param.name == OpPipelineStageParamsNames.InputFeatures => + p.param.name -> p.value.asInstanceOf[Array[TransientFeature]].map(_.toJsonString()).mkString(", ") case p if p.param.name != OpPipelineStageParamsNames.OutputMetadata && - p.param.name != OpPipelineStageParamsNames.InputSchema => p.param.name -> p.value + p.param.name != OpPipelineStageParamsNames.InputSchema => p.param.name -> p.value.toString }.toMap stages.map { s => diff --git a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala index c9949b8912..77fb9d0752 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala @@ -36,6 +36,8 @@ import com.salesforce.op.utils.json.JsonUtils import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.sql.Dataset +import scala.util.Try + /** * Just a handy factory for evaluators */ @@ -57,7 +59,7 @@ object Evaluators { */ def auROC(): OpBinaryClassificationEvaluator = new OpBinaryClassificationEvaluator( - name = BinaryClassEvalMetrics.AuROC.humanFriendlyName, isLargerBetter = true) { + name = BinaryClassEvalMetrics.AuROC, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getBinaryEvaluatorMetric(BinaryClassEvalMetrics.AuROC, dataset) } @@ -66,7 +68,7 @@ object Evaluators { * Area under Precision/Recall curve */ def auPR(): OpBinaryClassificationEvaluator = - new OpBinaryClassificationEvaluator(name = BinaryClassEvalMetrics.AuPR.humanFriendlyName, isLargerBetter = true) { + new OpBinaryClassificationEvaluator(name = BinaryClassEvalMetrics.AuPR, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getBinaryEvaluatorMetric(BinaryClassEvalMetrics.AuPR, dataset) } @@ -76,7 +78,7 @@ object Evaluators { */ def precision(): OpBinaryClassificationEvaluator = new OpBinaryClassificationEvaluator( - name = MultiClassEvalMetrics.Precision.humanFriendlyName, isLargerBetter = true) { + name = MultiClassEvalMetrics.Precision, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = { import dataset.sparkSession.implicits._ new MulticlassMetrics(dataset.select(getPredictionCol, getLabelCol).as[(Double, Double)].rdd).precision(1.0) @@ -88,7 +90,7 @@ object Evaluators { */ def recall(): OpBinaryClassificationEvaluator = new OpBinaryClassificationEvaluator( - name = MultiClassEvalMetrics.Recall.humanFriendlyName, isLargerBetter = true) { + name = 
MultiClassEvalMetrics.Recall, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = { import dataset.sparkSession.implicits._ new MulticlassMetrics(dataset.select(getPredictionCol, getLabelCol).as[(Double, Double)].rdd).recall(1.0) @@ -99,7 +101,7 @@ object Evaluators { * F1 score */ def f1(): OpBinaryClassificationEvaluator = - new OpBinaryClassificationEvaluator(name = MultiClassEvalMetrics.F1.humanFriendlyName, isLargerBetter = true) { + new OpBinaryClassificationEvaluator(name = MultiClassEvalMetrics.F1, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = { import dataset.sparkSession.implicits._ new MulticlassMetrics( @@ -112,7 +114,7 @@ object Evaluators { */ def error(): OpBinaryClassificationEvaluator = new OpBinaryClassificationEvaluator( - name = MultiClassEvalMetrics.Error.humanFriendlyName, isLargerBetter = false) { + name = MultiClassEvalMetrics.Error, isLargerBetter = false) { override def evaluate(dataset: Dataset[_]): Double = 1.0 - getMultiEvaluatorMetric(MultiClassEvalMetrics.Error, dataset) } @@ -135,7 +137,7 @@ object Evaluators { new OpBinaryClassificationEvaluatorBase[SingleMetric]( uid = UID[OpBinaryClassificationEvaluatorBase[SingleMetric]] ) { - override val name: String = metricName + override val name: EvalMetric = OpEvaluatorNames.Custom(metricName, metricName) override val isLargerBetter: Boolean = islbt override def getDefaultMetric: SingleMetric => Double = _.value override def evaluateAll(dataset: Dataset[_]): SingleMetric = { @@ -143,7 +145,7 @@ object Evaluators { val ds = dataset.select(getLabelCol, getRawPredictionCol, getProbabilityCol, getPredictionCol) .as[(Double, OPVector#Value, OPVector#Value, Double)] val metric = evaluateFn(ds) - SingleMetric(name, metric) + SingleMetric(name.humanFriendlyName, metric) } } } @@ -166,7 +168,7 @@ object Evaluators { */ def precision(): OpMultiClassificationEvaluator = new OpMultiClassificationEvaluator( - name = MultiClassEvalMetrics.Precision.humanFriendlyName, isLargerBetter = true) { + name = MultiClassEvalMetrics.Precision, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getMultiEvaluatorMetric(MultiClassEvalMetrics.Precision, dataset) } @@ -175,7 +177,7 @@ object Evaluators { * Weighted Recall */ def recall(): OpMultiClassificationEvaluator = - new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.Recall.humanFriendlyName, isLargerBetter = true) { + new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.Recall, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getMultiEvaluatorMetric(MultiClassEvalMetrics.Recall, dataset) } @@ -184,7 +186,7 @@ object Evaluators { * F1 Score */ def f1(): OpMultiClassificationEvaluator = - new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.F1.humanFriendlyName, isLargerBetter = true) { + new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.F1, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getMultiEvaluatorMetric(MultiClassEvalMetrics.F1, dataset) } @@ -193,7 +195,7 @@ object Evaluators { * Prediction Error */ def error(): OpMultiClassificationEvaluator = - new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.Error.humanFriendlyName, isLargerBetter = false) { + new OpMultiClassificationEvaluator(name = MultiClassEvalMetrics.Error, isLargerBetter = false) { override def evaluate(dataset: Dataset[_]): Double = 1.0 - getMultiEvaluatorMetric(MultiClassEvalMetrics.Error, dataset) } @@ -216,7 
+218,7 @@ object Evaluators { new OpMultiClassificationEvaluatorBase[SingleMetric]( uid = UID[OpMultiClassificationEvaluatorBase[SingleMetric]] ) { - override val name: String = metricName + override val name: EvalMetric = OpEvaluatorNames.Custom(metricName, metricName) override val isLargerBetter: Boolean = islbt override def getDefaultMetric: SingleMetric => Double = _.value @@ -227,7 +229,7 @@ object Evaluators { .as[(Double, OPVector#Value, OPVector#Value, Double)] try { val metric = evaluateFn(ds) - SingleMetric(name, metric) + SingleMetric(name.humanFriendlyName, metric) } catch { case iae: IllegalArgumentException => val size = dataset.count @@ -257,7 +259,7 @@ object Evaluators { */ def mse(): OpRegressionEvaluator = new OpRegressionEvaluator( - name = RegressionEvalMetrics.MeanSquaredError.humanFriendlyName, isLargerBetter = false) { + name = RegressionEvalMetrics.MeanSquaredError, isLargerBetter = false) { override def evaluate(dataset: Dataset[_]): Double = getRegEvaluatorMetric(RegressionEvalMetrics.MeanSquaredError, dataset) } @@ -267,7 +269,7 @@ object Evaluators { */ def mae(): OpRegressionEvaluator = new OpRegressionEvaluator( - name = RegressionEvalMetrics.MeanAbsoluteError.humanFriendlyName, isLargerBetter = false) { + name = RegressionEvalMetrics.MeanAbsoluteError, isLargerBetter = false) { override def evaluate(dataset: Dataset[_]): Double = getRegEvaluatorMetric(RegressionEvalMetrics.MeanAbsoluteError, dataset) } @@ -276,7 +278,7 @@ object Evaluators { * R2 */ def r2(): OpRegressionEvaluator = - new OpRegressionEvaluator(name = RegressionEvalMetrics.R2.humanFriendlyName, isLargerBetter = true) { + new OpRegressionEvaluator(name = RegressionEvalMetrics.R2, isLargerBetter = true) { override def evaluate(dataset: Dataset[_]): Double = getRegEvaluatorMetric(RegressionEvalMetrics.R2, dataset) } @@ -286,7 +288,7 @@ object Evaluators { */ def rmse(): OpRegressionEvaluator = new OpRegressionEvaluator( - name = RegressionEvalMetrics.RootMeanSquaredError.humanFriendlyName, isLargerBetter = false) { + name = RegressionEvalMetrics.RootMeanSquaredError, isLargerBetter = false) { override def evaluate(dataset: Dataset[_]): Double = getRegEvaluatorMetric(RegressionEvalMetrics.RootMeanSquaredError, dataset) } @@ -309,7 +311,7 @@ object Evaluators { new OpRegressionEvaluatorBase[SingleMetric]( uid = UID[OpRegressionEvaluatorBase[SingleMetric]] ) { - override val name: String = metricName + override val name: EvalMetric = OpEvaluatorNames.Custom(metricName, metricName) override val isLargerBetter: Boolean = islbt override def getDefaultMetric: SingleMetric => Double = _.value @@ -318,7 +320,7 @@ object Evaluators { import dataset.sparkSession.implicits._ val ds = dataset.select(getLabelCol, getPredictionCol).as[(Double, Double)] val metric = evaluateFn(ds) - SingleMetric(name, metric) + SingleMetric(name.humanFriendlyName, metric) } } } @@ -349,4 +351,3 @@ case class MultiMetrics(metrics: Map[String, EvaluationMetrics]) extends Evaluat } override def toString: String = JsonUtils.toJsonString(this.toMap, pretty = true) } - diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassificationEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassificationEvaluator.scala index 9791d0d7d7..e79f39d3f6 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassificationEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassificationEvaluator.scala @@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory private[op] 
class OpBinaryClassificationEvaluator ( - override val name: String = OpEvaluatorNames.binary, + override val name: EvalMetric = OpEvaluatorNames.Binary, override val isLargerBetter: Boolean = true, override val uid: String = UID[OpBinaryClassificationEvaluator], val numBins: Int = 100 @@ -179,7 +179,6 @@ case class BinaryClassificationMetrics @JsonDeserialize(contentAs = classOf[java.lang.Double]) falsePositiveRateByThreshold: Seq[Double] ) extends EvaluationMetrics { - def rocCurve: Seq[(Double, Double)] = recallByThreshold.zip(falsePositiveRateByThreshold) def prCurve: Seq[(Double, Double)] = precisionByThreshold.zip(recallByThreshold) } diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala index ccdd890536..9a56c23e87 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala @@ -42,6 +42,8 @@ import org.apache.spark.ml.param._ import org.apache.spark.sql.Dataset import org.apache.spark.sql.types.Metadata +import scala.util.Try + /** * Trait for labelCol param @@ -123,7 +125,6 @@ trait EvaluationMetrics extends JsonLike { * @return a map from metric name to metric value */ def toMap: Map[String, Any] = JsonUtils.toMap(JsonUtils.toJsonTree(this)) - /** * Convert metrics into metadata for saving * @return metadata @@ -131,7 +132,6 @@ trait EvaluationMetrics extends JsonLike { def toMetadata: Metadata = this.toMap.toMetadata } - /** * Base Interface for OpEvaluator to be used in Evaluator creation. Can be used for both OP and spark * eval (so with workflows and cross validation). @@ -143,7 +143,7 @@ abstract class OpEvaluatorBase[T <: EvaluationMetrics] extends Evaluator /** * Name of evaluator */ - val name: String = "Eval" + val name: EvalMetric /** * Evaluate function that returns a class or value with the calculated metric value(s). 
@@ -271,7 +271,7 @@ abstract class OpRegressionEvaluatorBase[T <: EvaluationMetrics] /** * Eval metric */ -trait EvalMetric extends Serializable { +trait EvalMetric extends EnumEntry with Serializable { /** * Spark metric name */ @@ -281,6 +281,21 @@ trait EvalMetric extends Serializable { * Human friendly metric name */ def humanFriendlyName: String + +} + +/** + * Eval metric companion object + */ +object EvalMetric { + + def withNameInsensitive(name: String): EvalMetric = { + BinaryClassEvalMetrics.withNameInsensitiveOption(name) + .orElse(MultiClassEvalMetrics.withNameInsensitiveOption(name)) + .orElse(RegressionEvalMetrics.withNameInsensitiveOption(name)) + .orElse(OpEvaluatorNames.withNameInsensitiveOption(name)) + .getOrElse(OpEvaluatorNames.Custom(name, name)) + } } /** @@ -290,7 +305,7 @@ sealed abstract class ClassificationEvalMetric ( val sparkEntryName: String, val humanFriendlyName: String -) extends EnumEntry with EvalMetric +) extends EvalMetric /** * Binary Classification Metrics @@ -302,7 +317,11 @@ object BinaryClassEvalMetrics extends Enum[ClassificationEvalMetric] { case object F1 extends ClassificationEvalMetric("f1", "f1") case object Error extends ClassificationEvalMetric("accuracy", "error") case object AuROC extends ClassificationEvalMetric("areaUnderROC", "area under ROC") - case object AuPR extends ClassificationEvalMetric("areaUnderPR", "area under PR") + case object AuPR extends ClassificationEvalMetric("areaUnderPR", "area under precision-recall") + case object TP extends ClassificationEvalMetric("TP", "true positive") + case object TN extends ClassificationEvalMetric("TN", "true negative") + case object FP extends ClassificationEvalMetric("FP", "false positive") + case object FN extends ClassificationEvalMetric("FN", "false negative") } /** @@ -325,8 +344,11 @@ sealed abstract class RegressionEvalMetric ( val sparkEntryName: String, val humanFriendlyName: String -) extends EnumEntry with EvalMetric +) extends EvalMetric +/** + * Regression Metrics + */ object RegressionEvalMetrics extends Enum[RegressionEvalMetric] { val values: Seq[RegressionEvalMetric] = findValues case object RootMeanSquaredError extends RegressionEvalMetric("rmse", "root mean square error") @@ -335,11 +357,34 @@ object RegressionEvalMetrics extends Enum[RegressionEvalMetric] { case object MeanAbsoluteError extends RegressionEvalMetric("mae", "mean absolute error") } + +/** + * General evaluator names + */ +sealed abstract class OpEvaluatorNames ( + val sparkEntryName: String, + val humanFriendlyName: String +) extends EvalMetric + /** * Contains evaluator names used in logging */ -case object OpEvaluatorNames { - val binary = "binEval" - val multi = "multiEval" - val regression = "regEval" +object OpEvaluatorNames extends Enum[OpEvaluatorNames] { + val values: Seq[OpEvaluatorNames] = findValues + + case object Binary extends OpEvaluatorNames("binEval", "binary evaluation metrics") + + case object Multi extends OpEvaluatorNames("multiEval", "multiclass evaluation metrics") + + case object Regression extends OpEvaluatorNames("regEval", "regression evaluation metrics") + + case class Custom(name: String, humanName: String) extends OpEvaluatorNames(name, humanName) { + override def entryName: String = name.toLowerCase + } + + override def withName(name: String): OpEvaluatorNames = Try(super.withName(name)).getOrElse(Custom(name, name)) + + override def withNameInsensitive(name: String): OpEvaluatorNames = super.withNameInsensitiveOption(name) + .getOrElse(Custom(name, name)) } diff --git 
a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala index bd347e8384..98adebd1c7 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala @@ -59,7 +59,7 @@ import scala.collection.mutable */ private[op] class OpMultiClassificationEvaluator ( - override val name: String = OpEvaluatorNames.multi, + override val name: EvalMetric = OpEvaluatorNames.Multi, override val isLargerBetter: Boolean = true, override val uid: String = UID[OpMultiClassificationEvaluator] ) extends OpMultiClassificationEvaluatorBase[MultiClassificationMetrics](uid) { @@ -307,3 +307,4 @@ case class ThresholdMetrics @JsonDeserialize(keyAs = classOf[java.lang.Integer]) noPredictionCounts: Map[Int, Seq[Long]] ) extends EvaluationMetrics + diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpRegressionEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpRegressionEvaluator.scala index 480d473954..ef34e803af 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpRegressionEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpRegressionEvaluator.scala @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory private[op] class OpRegressionEvaluator ( - override val name: String = OpEvaluatorNames.regression, + override val name: EvalMetric = OpEvaluatorNames.Regression, override val isLargerBetter: Boolean = false, override val uid: String = UID[OpRegressionEvaluator] ) extends OpRegressionEvaluatorBase[RegressionMetrics](uid) { diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/package.scala b/core/src/main/scala/com/salesforce/op/stages/impl/package.scala index 592ac96e19..70230f2f9c 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/package.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/package.scala @@ -30,7 +30,9 @@ package com.salesforce.op.stages +import com.salesforce.op.utils.json.JsonLike import enumeratum.EnumEntry +import org.apache.spark.sql.types.Metadata package object impl { @@ -39,4 +41,7 @@ package object impl { */ trait ModelsToTry extends EnumEntry with Serializable + trait MetadataLike { + def toMetadata(): Metadata + } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerMetadata.scala b/core/src/main/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerMetadata.scala index 69577a0900..fbd2235174 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerMetadata.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerMetadata.scala @@ -30,6 +30,8 @@ package com.salesforce.op.stages.impl.preparators +import com.salesforce.op.stages.impl.MetadataLike + import scala.util.{Failure, Success, Try} import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.stats.OpStatistics @@ -87,7 +89,7 @@ case class SanityCheckerSummary featuresStatistics: SummaryStatistics, names: Seq[String], categoricalStats: Array[CategoricalGroupStats] -) { +) extends MetadataLike { private[op] def this( stats: Array[ColumnStatistics], @@ -146,7 +148,7 @@ case class SummaryStatistics min: Seq[Double], mean: Seq[Double], variance: Seq[Double] -) { +) extends MetadataLike { private[op] def this(colStats: MultivariateStatisticalSummary, sample: Double) = this( count = 
colStats.count, @@ -199,7 +201,7 @@ case class CategoricalGroupStats mutualInfo: Double, maxRuleConfidences: Array[Double], supports: Array[Double] -) { +) extends MetadataLike { /** * @return metadata of this specific categorical group */ @@ -238,7 +240,7 @@ case class CategoricalStats pointwiseMutualInfos: LabelWiseValues.Type = LabelWiseValues.empty, mutualInfos: Array[Double] = Array.empty, counts: LabelWiseValues.Type = LabelWiseValues.empty -) { +) extends MetadataLike { // TODO: Build the metadata here instead of by treating Cramer's V and mutual info as correlations def toMetadata(): Metadata = { val meta = new MetadataBuilder() @@ -268,7 +270,7 @@ case class Correlations values: Seq[Double], nanCorrs: Seq[String], corrType: CorrelationType -) { +) extends MetadataLike { assert(featuresIn.length == values.length, "Feature names and correlation values arrays must have the same length") def this(corrs: Seq[(String, Double)], nans: Seq[String], corrType: CorrelationType) = this( diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala index 0fda214f84..9e3ddccbb3 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala @@ -40,11 +40,11 @@ import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.impl.tuning._ import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichMetadata._ +import com.salesforce.op.utils.spark.RichParamMap._ import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.spark.ml.param._ import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.sql.{Dataset, SparkSession} import scala.reflect.runtime.universe._ @@ -144,10 +144,10 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] val ModelData(trainData, met) = splitter match { case Some(spltr) => spltr.prepare(datasetWithID) - case None => new ModelData(datasetWithID, new MetadataBuilder()) + case None => ModelData(datasetWithID, None) } - val BestEstimator(name, estimator, meta) = bestEstimator.getOrElse{ + val BestEstimator(name, estimator, summary) = bestEstimator.getOrElse{ setInputSchema(dataset.schema).transformSchema(dataset.schema) val best = validator .validate(modelInfo = modelsUse, dataset = trainData, label = in1.name, features = in2.name) @@ -155,42 +155,43 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] best } - val bestModel = new BestModel( - name = name, - model = estimator.fit(trainData).asInstanceOf[M], - metadata = Option(meta) - ) - bestModel.metadata.foreach(meta => setMetadata(meta.build)) - val bestEst = bestModel.model.parent + val bestModel = estimator.fit(trainData).asInstanceOf[M] + val bestEst = bestModel.parent log.info(s"Selected model : ${bestEst.getClass.getSimpleName}") log.info(s"With parameters : ${bestEst.extractParamMap()}") // set input and output params - outputsColNamesMap.foreach { case (pname, pvalue) => bestModel.model.set(bestModel.model.getParam(pname), pvalue) } - - val builder = new MetadataBuilder().withMetadata(getMetadata()) // get cross val metrics - builder.putString(ModelSelectorBaseNames.BestModelUid, bestModel.model.uid) // log bestModel uid (ie model type) - 
builder.putString(ModelSelectorBaseNames.BestModelName, bestModel.name) // log bestModel name - splitter.collect { - case _: DataBalancer => builder.putMetadata(ModelSelectorBaseNames.ResampleValues, met) - case _: DataCutter => builder.putMetadata(ModelSelectorBaseNames.CuttValues, met) - } + outputsColNamesMap.foreach { case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) } + + // get eval results for metadata + val trainingEval = evaluate(bestModel.transform(trainData)) + + val metadataSummary = ModelSelectorSummary( + validationType = ValidationType.fromValidator(validator), + validationParameters = validator.getParams(), + dataPrepParameters = splitter.map(_.extractParamMap().getAsMap()).getOrElse(Map()), + dataPrepResults = met, + evaluationMetric = validator.evaluator.name, + problemType = ProblemType.fromEvalMetrics(trainingEval), + bestModelUID = estimator.uid, + bestModelName = name, + bestModelType = estimator.getClass.getSimpleName, + validationResults = summary, + trainEvaluation = trainingEval, + holdoutEvaluation = None + ) - // add eval results to metadata - val transformed = bestModel.model.transform(trainData) - builder.putMetadata(ModelSelectorBaseNames.TrainingEval, evaluate(transformed).toMetadata) - val allMetadata = builder.build().toSummaryMetadata() - setMetadata(allMetadata) + setMetadata(metadataSummary.toMetadata().toSummaryMetadata()) new SelectedBestModel( - bestModel.model.asInstanceOf[ModelSelectorBaseNames.ModelType], + bestModel.asInstanceOf[ModelSelectorBaseNames.ModelType], outputsColNamesMap, uid, operationName ) .setInput(in1.asFeatureLike[RealNN], in2.asFeatureLike[OPVector]) .setParent(this) - .setMetadata(allMetadata) + .setMetadata(getMetadata()) .setOutputFeatureName(getOutputFeatureName) .setEvaluators(evaluators) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorBase.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorBase.scala index 60442f25b6..b5c8f418d4 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorBase.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorBase.scala @@ -30,9 +30,7 @@ package com.salesforce.op.stages.impl.selector -import com.salesforce.op.utils.stages.FitStagesUtil._ import com.salesforce.op.UID -import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.evaluators.{EvaluationMetrics, _} import com.salesforce.op.features.TransientFeature import com.salesforce.op.features.types._ @@ -42,15 +40,16 @@ import com.salesforce.op.stages.base.binary.OpTransformer2 import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.impl.tuning._ import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams +import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichMetadata._ +import com.salesforce.op.utils.spark.RichParamMap._ +import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.spark.ml.param._ import org.apache.spark.ml.{Estimator, Model, Transformer} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{MetadataBuilder, StructType} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import scala.reflect.runtime.universe._ -import scala.util.Try case object ModelSelectorBaseNames { @@ -106,12 +105,12 @@ private[op] trait HasEval { predictionColName.foreach(evaluator.setPredictionCol) rawPredictionColName.foreach(evaluator.setRawPredictionCol) 
probabilityColName.foreach(evaluator.setProbabilityCol) - evaluator.name -> evaluator.evaluateAll(data) + evaluator.name.humanFriendlyName -> evaluator.evaluateAll(data) case evaluator: OpRegressionEvaluatorBase[_] => evaluator.setLabelCol(labelColName) fullPredictionColName.foreach(evaluator.setFullPredictionCol) predictionColName.foreach(evaluator.setPredictionCol) - evaluator.name -> evaluator.evaluateAll(data) + evaluator.name.humanFriendlyName -> evaluator.evaluateAll(data) case evaluator => throw new RuntimeException(s"Evaluator $evaluator is not supported") }.toMap data.unpersist() @@ -136,9 +135,9 @@ private[op] trait HasTestEval extends HasEval { private[op] def evaluateModel(data: Dataset[_]): DataFrame = { val scored = transform(data) val metrics = evaluate(scored) - val builder = new MetadataBuilder().withMetadata(getMetadata().getSummaryMetadata()) - builder.putMetadata(ModelSelectorBaseNames.HoldOutEval, metrics.toMetadata) - setMetadata(builder.build().toSummaryMetadata()) + val metadata = ModelSelectorSummary.fromMetadata(getMetadata().getSummaryMetadata()) + .copy(holdoutEvaluation = Option(metrics)) + setMetadata(metadata.toMetadata().toSummaryMetadata()) scored } } @@ -253,7 +252,6 @@ private[op] abstract class ModelSelectorBase[M <: Model[_], E <: Estimator[_]] final override def fit(dataset: Dataset[_]): SelectedModel = { implicit val spark = dataset.sparkSession - import spark.implicits._ val datasetWithID = if (dataset.columns.contains(DataFrameFieldNames.KeyFieldName)) { @@ -266,10 +264,10 @@ private[op] abstract class ModelSelectorBase[M <: Model[_], E <: Estimator[_]] val ModelData(trainData, met) = splitter match { case Some(spltr) => spltr.prepare(datasetWithID) - case None => new ModelData(datasetWithID, new MetadataBuilder()) + case None => new ModelData(datasetWithID, None) } - val BestEstimator(name, estimator, meta) = bestEstimator.getOrElse{ + val BestEstimator(name, estimator, summary) = bestEstimator.getOrElse{ setInputSchema(dataset.schema).transformSchema(dataset.schema) val best = validator .validate(modelInfo = getUsedModels, dataset = trainData, label = in1.name, features = in2.name) @@ -277,38 +275,38 @@ private[op] abstract class ModelSelectorBase[M <: Model[_], E <: Estimator[_]] best } - val bestModel = new BestModel( - name = name, - model = estimator.fit(trainData).asInstanceOf[M], - metadata = Option(meta) - ) - bestModel.metadata.foreach(meta => setMetadata(meta.build)) - val bestClassifier = bestModel.model.parent - log.info(s"Selected model : ${bestClassifier.getClass.getSimpleName}") - log.info(s"With parameters : ${bestClassifier.extractParamMap()}") + val bestModel = estimator.fit(trainData).asInstanceOf[M] + val bestEst = bestModel.parent + log.info(s"Selected model : ${bestEst.getClass.getSimpleName}") + log.info(s"With parameters : ${bestEst.extractParamMap()}") // set input and output params - outputsColNamesMap.foreach { case (pname, pvalue) => bestModel.model.set(bestModel.model.getParam(pname), pvalue) } - - val builder = new MetadataBuilder().withMetadata(getMetadata()) // get cross val metrics - builder.putString(ModelSelectorBaseNames.BestModelUid, bestModel.model.uid) // log bestModel uid (ie model type) - builder.putString(ModelSelectorBaseNames.BestModelName, bestModel.name) // log bestModel name - splitter.collect { - case _: DataBalancer => builder.putMetadata(ModelSelectorBaseNames.ResampleValues, met) - case _: DataCutter => builder.putMetadata(ModelSelectorBaseNames.CuttValues, met) - } - + outputsColNamesMap.foreach { 
case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) } + + // get eval results for metadata + val trainingEval = evaluate(bestModel.transform(trainData)) + + val metadataSummary = ModelSelectorSummary( + validationType = ValidationType.fromValidator(validator), + validationParameters = validator.getParams(), + dataPrepParameters = splitter.map(_.extractParamMap().getAsMap()).getOrElse(Map()), + dataPrepResults = met, + evaluationMetric = validator.evaluator.name, + problemType = ProblemType.fromEvalMetrics(trainingEval), + bestModelUID = estimator.uid, + bestModelName = name, + bestModelType = estimator.getClass.getSimpleName, + validationResults = summary, + trainEvaluation = trainingEval, + holdoutEvaluation = None + ) - // add eval results to metadata - val transformed = bestModel.model.transform(trainData) - builder.putMetadata(ModelSelectorBaseNames.TrainingEval, evaluate(transformed).toMetadata) - val allMetadata = builder.build().toSummaryMetadata() - setMetadata(allMetadata) + setMetadata(metadataSummary.toMetadata().toSummaryMetadata()) - new SelectedModel(bestModel.model.asInstanceOf[Model[_ <: Model[_]]], outputsColNamesMap, uid) + new SelectedModel(bestModel.asInstanceOf[Model[_ <: Model[_]]], outputsColNamesMap, uid) .setParent(this) .setInput(in1.asFeatureLike[RealNN], in2.asFeatureLike[OPVector]) - .setMetadata(allMetadata) + .setMetadata(getMetadata()) .setEvaluators(evaluators) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala new file mode 100644 index 0000000000..f524e6e5be --- /dev/null +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +package com.salesforce.op.stages.impl.selector + +import com.salesforce.op.evaluators._ +import com.salesforce.op.stages.impl.MetadataLike +import com.salesforce.op.stages.impl.tuning.{OpCrossValidation, OpTrainValidationSplit, OpValidator, SplitterSummary} +import com.salesforce.op.utils.spark.RichMetadata._ +import enumeratum._ +import org.apache.spark.sql.types.{Metadata, MetadataBuilder} +import com.salesforce.op.stages.impl.selector.ModelSelectorSummary._ +import com.salesforce.op.utils.json.JsonUtils +import com.salesforce.op.utils.reflection.ReflectionUtils +import scala.reflect.ClassTag + +/** + * This is used to store all information about fitting and model selection generated by the model selector class + * @param validationType type of validation performed to select hyper parameters + * @param validationParameters parameters on validation + * @param dataPrepParameters parameters on data preparation before hyper parameter tuning + * @param dataPrepResults changes made to the data in data preparation + * @param evaluationMetric metric used to select hyper parameters and model + * @param problemType type of modeling (e.g. binary classification, regression, etc) + * @param bestModelUID best model UID + * @param bestModelName best model unique name + * @param bestModelType best model type + * @param holdoutEvaluation winning model performance on holdout data set + * @param trainEvaluation winning model performance on training data set + * @param validationResults parameters and metrics for all models evaluated + */ +case class ModelSelectorSummary +( + validationType: ValidationType, + validationParameters: Map[String, Any], + dataPrepParameters: Map[String, Any], + dataPrepResults: Option[SplitterSummary], + evaluationMetric: EvalMetric, + problemType: ProblemType, + bestModelUID: String, + bestModelName: String, + bestModelType: String, + validationResults: Seq[ModelEvaluation], + trainEvaluation: EvaluationMetrics, + holdoutEvaluation: Option[EvaluationMetrics] +) extends MetadataLike { + + /** + * Convert to metadata instance + * + * @return + */ + def toMetadata(): Metadata = { + val meta = new MetadataBuilder() + .putString(ValidationTypeName, validationType.entryName) + .putMetadata(ValidationParameters, validationParameters.toMetadata) + .putMetadata(DataPrepParameters, dataPrepParameters.toMetadata) + .putString(EvaluationMetric, evaluationMetric.entryName) + .putString(ProblemTypeName, problemType.entryName) + .putString(BestModelUID, bestModelUID) + .putString(BestModelName, bestModelName) + .putString(BestModelType, bestModelType) + .putMetadataArray(ValidationResults, validationResults.map(_.toMetadata()).toArray) + .putStringArray(TrainEvaluation, + Array(trainEvaluation.getClass.getName, trainEvaluation.toJson(pretty = false))) + + dataPrepResults.map(dp => meta.putMetadata(DataPrepResults, dp.toMetadata())) + holdoutEvaluation.map(he => meta.putStringArray(HoldoutEvaluation, + Array(he.getClass.getName, he.toJson(pretty = false)))) + meta.build() + } + +} + +/** + * Evaluation summary of model + * @param modelUID uid for winning model + * @param modelName unique name for model run + * @param modelType simple name of type of model + * @param metricValues evaluation metrics for model + * @param modelParameters parameter settings for model + */ +case class ModelEvaluation +( + modelUID: String, + modelName: String, + modelType: String, + metricValues: EvaluationMetrics, + modelParameters: Map[String, Any] +) extends MetadataLike { + + /** + * Convert to 
metadata instance + * + * @return + */ + def toMetadata(): Metadata = { + new MetadataBuilder() + .putString(ModelUID, modelUID) + .putString(ModelName, modelName) + .putString(ModelTypeName, modelType) + .putStringArray(MetricValues, Array(metricValues.getClass.getName, metricValues.toJson(pretty = false))) + .putMetadata(ModelParameters, modelParameters.toMetadata) + .build() + } +} + + +case object ModelSelectorSummary { + + val ValidationTypeName: String = "ValidationType" + val ValidationParameters: String = "ValidationParameters" + val DataPrepParameters: String = "DataPrepParameters" + val DataPrepResults: String = "DataPrepResults" + val EvaluationMetric: String = "EvaluationMetric" + val ProblemTypeName: String = "ProblemType" + val BestModelUID: String = "BestModelUID" + val BestModelName: String = "BestModelName" + val BestModelType: String = "BestModelType" + val ValidationResults: String = "ValidationResults" + val TrainEvaluation: String = "TrainEvaluation" + val HoldoutEvaluation: String = "HoldoutEvaluation" + + val ModelUID: String = "ModelUID" + val ModelName: String = "ModelName" + val ModelTypeName: String = "ModelType" + val MetricValues: String = "MetricValues" + val ModelParameters: String = "ModelParameters" + + /** + * Create case class from the metadata stored version of this class + * @param meta metadata for this class + * @return ModelSelectorSummary + */ + def fromMetadata(meta: Metadata): ModelSelectorSummary = { + + def modelEvalFromMetadata(meta: Metadata): ModelEvaluation = { + val wrapped = meta.wrapped + val modelUID: String = wrapped.get[String](ModelUID) + val modelName: String = wrapped.get[String](ModelName) + val modelType: String = wrapped.get[String](ModelTypeName) + val Array(metName, metJson) = wrapped.get[Array[String]](MetricValues) + val metricValues: EvaluationMetrics = evalMetFromJson(metName, metJson) + val modelParameters: Map[String, Any] = wrapped.get[Metadata](ModelParameters).wrapped.underlyingMap + + ModelEvaluation( + modelUID = modelUID, + modelName = modelName, + modelType = modelType, + metricValues = metricValues, + modelParameters = modelParameters + ) + } + + + val wrapped = meta.wrapped + + val validationType: ValidationType = ValidationType.withName(wrapped.get[String](ValidationTypeName)) + val validationParameters: Map[String, Any] = wrapped.get[Metadata](ValidationParameters) + .wrapped.underlyingMap + val dataPrepParameters: Map[String, Any] = wrapped.get[Metadata](DataPrepParameters) + .wrapped.underlyingMap + val dataPrepResults: Option[SplitterSummary] = + if (wrapped.contains(DataPrepResults)) { + SplitterSummary.fromMetadata(wrapped.get[Metadata](DataPrepResults)).toOption + } else None + val evaluationMetric: EvalMetric = EvalMetric.withNameInsensitive(wrapped.get[String](EvaluationMetric)) + val problemType: ProblemType = ProblemType.withName(wrapped.get[String](ProblemTypeName)) + val bestModelUID: String = wrapped.get[String](BestModelUID) + val bestModelName: String = wrapped.get[String](BestModelName) + val bestModelType: String = wrapped.get[String](BestModelType) + val validationResults: Seq[ModelEvaluation] = wrapped.get[Array[Metadata]](ValidationResults) + .map(modelEvalFromMetadata) + val Array(metName, metJson) = wrapped.get[Array[String]](TrainEvaluation) + val trainEvaluation: EvaluationMetrics = evalMetFromJson(metName, metJson) + val holdoutEvaluation: Option[EvaluationMetrics] = + if (wrapped.contains(HoldoutEvaluation)) { + val Array(metNameHold, metJsonHold) = 
wrapped.get[Array[String]](HoldoutEvaluation) + Option(evalMetFromJson(metNameHold, metJsonHold)) + } else None + + ModelSelectorSummary( + validationType = validationType, + validationParameters = validationParameters, + dataPrepParameters = dataPrepParameters, + dataPrepResults = dataPrepResults, + evaluationMetric = evaluationMetric, + problemType = problemType, + bestModelUID = bestModelUID, + bestModelName = bestModelName, + bestModelType = bestModelType, + validationResults = validationResults, + trainEvaluation = trainEvaluation, + holdoutEvaluation = holdoutEvaluation) + + } + + /** + * Decode metric values from JSON string + * + * @param json encoded metrics + */ + private def evalMetFromJson(className: String, json: String): EvaluationMetrics = { + def error(c: Class[_]) = throw new IllegalArgumentException( + s"Could not extract metrics of type $c from ${json.mkString(",")}" + ) + val classZZ = ReflectionUtils.classForName(className) + classZZ match { + case n if n == classOf[MultiMetrics] => + JsonUtils.fromString[Map[String, Map[String, Any]]](json).map{ d => + val asMetrics = d.flatMap{ case (_, values) => values.map{ + case (nm: String, mp: Map[String, Any]@unchecked) => + val valsJson = JsonUtils.toJsonString(mp) // gross but it works TODO try to find a better way + nm match { + case OpEvaluatorNames.Binary.humanFriendlyName => + nm -> JsonUtils.fromString[BinaryClassificationMetrics](valsJson).get + case OpEvaluatorNames.Multi.humanFriendlyName => + nm -> JsonUtils.fromString[MultiClassificationMetrics](valsJson).get + case OpEvaluatorNames.Regression.humanFriendlyName => + nm -> JsonUtils.fromString[RegressionMetrics](valsJson).get + case _ => nm -> JsonUtils.fromString[SingleMetric](valsJson).get + }} + } + MultiMetrics(asMetrics) + }.getOrElse(error(classOf[MultiMetrics])) + case n => JsonUtils.fromString(json)(ClassTag(n)).getOrElse(error(n)) + } + } + + +} + + +sealed trait ProblemType extends EnumEntry with Serializable + +object ProblemType extends Enum[ProblemType] { + val values = findValues + case object BinaryClassification extends ProblemType + case object MultiClassification extends ProblemType + case object Regression extends ProblemType + case object Unknown extends ProblemType + + def fromEvalMetrics(eval: EvaluationMetrics): ProblemType = { + eval match { + case _: BinaryClassificationMetrics => ProblemType.BinaryClassification + case _: MultiClassificationMetrics => ProblemType.MultiClassification + case _: RegressionMetrics => ProblemType.Regression + case m: MultiMetrics => + val keys = m.metrics.keySet + if (keys.exists(_.contains(OpEvaluatorNames.Binary.humanFriendlyName))) ProblemType.BinaryClassification + else if (keys.exists(_.contains(OpEvaluatorNames.Multi.humanFriendlyName))) ProblemType.MultiClassification + else if (keys.exists(_.contains(OpEvaluatorNames.Regression.humanFriendlyName))) ProblemType.Regression + else ProblemType.Unknown + case _ => ProblemType.Unknown + } + } +} + +sealed abstract class ValidationType(val humanFriendlyName: String) extends EnumEntry with Serializable +object ValidationType extends Enum[ValidationType] { + val values = findValues + case object CrossValidation extends ValidationType("Cross Validation") + case object TrainValidationSplit extends ValidationType("Train Validation Split") + + def fromValidator(validator: OpValidator[_, _]): ValidationType = { + validator match { + case _: OpCrossValidation[_, _] => CrossValidation + case _: OpTrainValidationSplit[_, _] => TrainValidationSplit + case _ => throw new 
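ProblemType.fromEvalMetrics (defined below) infers the problem type from the metrics class, falling back to a scan of MultiMetrics keys for the evaluator names. A minimal sketch of that behaviour, not part of the patch; the map key is illustrative, only the `contains` check matters:

```scala
// Minimal sketch (not part of the patch) of ProblemType.fromEvalMetrics behaviour.
import com.salesforce.op.evaluators.{MultiMetrics, OpEvaluatorNames, SingleMetric}
import com.salesforce.op.stages.impl.selector.ProblemType

val single = SingleMetric("logLoss", 0.42)
ProblemType.fromEvalMetrics(single) // ProblemType.Unknown: a bare single metric carries no type information

// For MultiMetrics the keys are scanned for the evaluator names
val multi = MultiMetrics(Map(s"(${OpEvaluatorNames.Binary.humanFriendlyName})_eval" -> single))
ProblemType.fromEvalMetrics(multi)  // ProblemType.BinaryClassification
```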
IllegalArgumentException(s"Unknown validator type $validator") + } + } +} + + diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala index 2c35507d4f..236e7e0402 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala @@ -34,7 +34,7 @@ import com.salesforce.op.UID import com.salesforce.op.stages.impl.selector.ModelSelectorBaseNames import org.apache.spark.ml.param._ import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.types.{Metadata, MetadataBuilder} import org.slf4j.LoggerFactory case object DataBalancer { @@ -72,7 +72,7 @@ case object DataBalancer { class DataBalancer(uid: String = UID[DataBalancer]) extends Splitter(uid = uid) with DataBalancerParams { @transient private lazy val log = LoggerFactory.getLogger(this.getClass) - @transient private[op] val metadataBuilder = new MetadataBuilder() + @transient private[op] var summary: Option[DataBalancerSummary] = None /** * Computes the upSample and downSample proportions. @@ -164,11 +164,6 @@ class DataBalancer(uid: String = UID[DataBalancer]) extends Splitter(uid = uid) val negativeCount = negativeData.count() val totalCount = positiveCount + negativeCount val sampleF = getSampleFraction - - // feed metadata with counts and sample fraction - metadataBuilder.putLong(ModelSelectorBaseNames.Positive, positiveCount) - metadataBuilder.putLong(ModelSelectorBaseNames.Negative, negativeCount) - metadataBuilder.putDouble(ModelSelectorBaseNames.Desired, sampleF) log.info(s"Data has $positiveCount positive and $negativeCount negative.") val (smallCount, bigCount) = { @@ -186,12 +181,15 @@ class DataBalancer(uid: String = UID[DataBalancer]) extends Splitter(uid = uid) if (smallCount.toDouble / totalCount.toDouble >= sampleF) { log.info( s"Not resampling data: $smallCount small count and $bigCount big count is greater than" + - s" requested ${sampleF}" + s" requested $sampleF" ) // if data is too big downsample val fraction = if (maxTrainSample < totalCount) maxTrainSample / totalCount.toDouble else 1.0 setAlreadyBalancedFraction(fraction) + summary = Option(DataBalancerSummary(positiveLabels = positiveCount, negativeLabels = negativeCount, + desiredFraction = sampleF, upSamplingFraction = 0.0, downSamplingFraction = fraction)) + } else { log.info(s"Sampling data to get $sampleF split versus $smallCount small and $bigCount big") val (downSample, upSample) = getProportions(smallCount, bigCount, sampleF, maxTrainSample) @@ -199,9 +197,9 @@ class DataBalancer(uid: String = UID[DataBalancer]) extends Splitter(uid = uid) setDownSampleFraction(downSample) setUpSampleFraction(upSample) - // feed metadata with upsample and downsample - metadataBuilder.putDouble(ModelSelectorBaseNames.UpSample, upSample) - metadataBuilder.putDouble(ModelSelectorBaseNames.DownSample, downSample) + // feed metadata with summary + summary = Option(DataBalancerSummary(positiveLabels = positiveCount, negativeLabels = negativeCount, + desiredFraction = sampleF, upSamplingFraction = upSample, downSamplingFraction = downSample)) val (posFraction, negFraction) = if (positiveCount < negativeCount) (upSample, downSample) @@ -250,15 +248,15 @@ class DataBalancer(uid: String = UID[DataBalancer]) extends Splitter(uid = uid) // downSample and which class is in minority if 
(isSet(isPositiveSmall) && isSet(downSampleFraction) && isSet(upSampleFraction)) { val (down, up) = ($(downSampleFraction), $(upSampleFraction)) - log.info(s"Sample fractions: downSample of ${down}, upSample of ${up}") + log.info(s"Sample fractions: downSample of $down, upSample of $up") val (smallData, bigData) = if ($(isPositiveSmall)) (positiveData, negativeData) else (negativeData, positiveData) - new ModelData(rebalance(smallData, up, bigData, down, seed).toDF(), metadataBuilder) + new ModelData(rebalance(smallData, up, bigData, down, seed).toDF().persist(), summary) } else { // Data is already balanced, but need to be sampled val fraction = $(alreadyBalancedFraction) log.info(s"Data is already balanced, yet it will be sampled by a fraction of $fraction") val balanced = sampleBalancedData(fraction = fraction, seed = seed, data = data, positiveData = positiveData, negativeData = negativeData).toDF() - new ModelData(balanced, metadataBuilder) + new ModelData(balanced.persist(), summary) } } @@ -427,3 +425,31 @@ trait DataBalancerParams extends Params { private[op] def getAlreadyBalancedFraction: Double = $(alreadyBalancedFraction) } + +/** + * Summary for data balancer run for storage in metadata + * @param positiveLabels count of positive labels + * @param negativeLabels count of negative labels + * @param desiredFraction desired min fraction of smaller label count + * @param upSamplingFraction up/down sampling for smaller class of label + * @param downSamplingFraction down sampling for larger class of label + */ +case class DataBalancerSummary +( + positiveLabels: Long, + negativeLabels: Long, + desiredFraction: Double, + upSamplingFraction: Double, + downSamplingFraction: Double +) extends SplitterSummary { + override def toMetadata(): Metadata = { + new MetadataBuilder() + .putString(SplitterSummary.ClassName, this.getClass.getName) + .putLong(ModelSelectorBaseNames.Positive, positiveLabels) + .putLong(ModelSelectorBaseNames.Negative, negativeLabels) + .putDouble(ModelSelectorBaseNames.Desired, desiredFraction) + .putDouble(ModelSelectorBaseNames.UpSample, upSamplingFraction) + .putDouble(ModelSelectorBaseNames.DownSample, downSamplingFraction) + .build() + } +} diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala index 1ba3210fe5..58ded25a12 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataCutter.scala @@ -97,11 +97,8 @@ class DataCutter(uid: String = UID[DataCutter]) extends Splitter(uid = uid) with val dataUse = data.filter(r => keep.contains(r.getDouble(0))) - val labelsMeta = new MetadataBuilder() - .putDoubleArray(ModelSelectorBaseNames.LabelsKept, getLabelsToKeep) - .putDoubleArray(ModelSelectorBaseNames.LabelsDropped, getLabelsToDrop) - - new ModelData(dataUse, labelsMeta) + val labelsMeta = DataCutterSummary(labelsKept = getLabelsToKeep, labelsDropped = getLabelsToDrop) + new ModelData(dataUse, Option(labelsMeta)) } /** @@ -184,3 +181,23 @@ private[impl] trait DataCutterParams extends Params { private[op] def getLabelsToDrop: Array[Double] = $(labelsToDrop) } + +/** + * Summary of results for data cutter + * @param labelsKept labels retained + * @param labelsDropped labels dropped by datacutter + */ +case class DataCutterSummary +( + labelsKept: Array[Double], + labelsDropped: Array[Double] +) extends SplitterSummary { + override def toMetadata(): Metadata = { + 
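DataBalancerSummary (and the other splitter summaries introduced in this patch) replace the ad-hoc MetadataBuilder plumbing with typed case classes that round-trip through metadata. A minimal sketch with made-up counts and fractions, assuming the code lives under the com.salesforce.op package since SplitterSummary.fromMetadata (added to Splitter.scala further down) is private[op]:

```scala
// Minimal sketch (not part of the patch); counts and fractions are illustrative only.
import com.salesforce.op.stages.impl.tuning.{DataBalancerSummary, SplitterSummary}

val balancerSummary = DataBalancerSummary(
  positiveLabels = 100L,
  negativeLabels = 900L,
  desiredFraction = 0.5,
  upSamplingFraction = 4.5,
  downSamplingFraction = 0.9
)
// Stored under the summary metadata and recovered by dispatching on the stored class name
val restored = SplitterSummary.fromMetadata(balancerSummary.toMetadata())
// restored == Success(DataBalancerSummary(100, 900, 0.5, 4.5, 0.9))
```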
new MetadataBuilder() + .putString(SplitterSummary.ClassName, this.getClass.getName) + .putDoubleArray(ModelSelectorBaseNames.LabelsKept, labelsKept) + .putDoubleArray(ModelSelectorBaseNames.LabelsDropped, labelsDropped) + .build() + } +} + diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala index 1f4da45d2a..32a043c57c 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala @@ -33,7 +33,7 @@ package com.salesforce.op.stages.impl.tuning import com.salesforce.op.UID import org.apache.spark.ml.param._ import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.types.{Metadata, MetadataBuilder} case object DataSplitter { @@ -69,10 +69,20 @@ class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) * @return Training set test set */ def prepare(data: Dataset[Row]): ModelData = - new ModelData(data, new MetadataBuilder()) + new ModelData(data, Option(DataSplitterSummary())) override def copy(extra: ParamMap): DataSplitter = { val copy = new DataSplitter(uid) copyValues(copy, extra) } } + +/** + * Empty class because no summary information for a datasplitter + */ +case class DataSplitterSummary() extends SplitterSummary { + override def toMetadata(): Metadata = new MetadataBuilder() + .putString(SplitterSummary.ClassName, this.getClass.getName) + .build() +} + diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala index 85296599a3..93581245f8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala @@ -50,6 +50,9 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]] val validationName: String = ModelSelectorBaseNames.CrossValResults private val blas = BLAS.getInstance() + override def getParams(): Map[String, Any] = Map("numFolds" -> numFolds, "seed" -> seed, + "evaluator" -> evaluator.name.humanFriendlyName, "stratify" -> stratify, "parallelism" -> parallelism) + private def findBestModel( folds: Seq[ValidatedModel[E]] ): ValidatedModel[E] = { diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala index 04d1d1852d..1801a2008b 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala @@ -42,6 +42,9 @@ private[op] class OpTrainValidationSplit[M <: Model[_], E <: Estimator[_]] val validationName: String = ModelSelectorBaseNames.TrainValSplitResults + override def getParams(): Map[String, Any] = Map("trainRatio" -> trainRatio, "seed" -> seed, + "evaluator" -> evaluator.name.humanFriendlyName, "stratify" -> stratify, "parallelism" -> parallelism) + private[op] override def validate[T]( modelInfo: Seq[(E, Array[ParamMap])], dataset: Dataset[T], diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala index 80763a4c58..954a56bed4 100644 --- 
a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala @@ -30,13 +30,13 @@ package com.salesforce.op.stages.impl.tuning -import com.salesforce.op.evaluators.{OpBinaryClassificationEvaluatorBase, OpEvaluatorBase, OpMultiClassificationEvaluatorBase} -import com.salesforce.op.features.{Feature, FeatureBuilder, FeatureLike} +import com.salesforce.op.evaluators.{OpBinaryClassificationEvaluatorBase, OpEvaluatorBase, OpMultiClassificationEvaluatorBase, SingleMetric} import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} +import com.salesforce.op.features.{Feature, FeatureBuilder} import com.salesforce.op.readers.DataFrameFieldNames import com.salesforce.op.stages.OpPipelineStage2 -import com.salesforce.op.stages.base.binary.OpTransformer2 -import com.salesforce.op.stages.impl.selector.{ModelInfo, ModelSelectorBaseNames, StageParamNames} +import com.salesforce.op.stages.impl.selector.{ModelSelectorBaseNames, StageParamNames, _} +import com.salesforce.op.utils.spark.RichParamMap._ import com.salesforce.op.utils.stages.FitStagesUtil import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.log4j.{Level, LogManager} @@ -44,35 +44,23 @@ import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions.monotonically_increasing_id -import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.sql.{Dataset, Row, SparkSession, functions} import org.apache.spark.util.SparkThreadUtils import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -import scala.reflect.runtime.universe._ -/** - * Best Model container - * - * @param name the name of the best model - * @param model best trained model - * @param metadata optional metadata - * @tparam M model type - */ -case class BestModel[M <: Model[_]](name: String, model: M, metadata: Option[MetadataBuilder] = None) - /** * Best Estimator container * * @param name the name of the best model * @param estimator best estimator - * @param metadata optional metadata + * @param summary optional metadata * @tparam E model type */ -case class BestEstimator[E <: Estimator[_]](name: String, estimator: E, metadata: MetadataBuilder = new MetadataBuilder) +case class BestEstimator[E <: Estimator[_]](name: String, estimator: E, summary: Seq[ModelEvaluation]) /** * Validated Model container @@ -104,7 +92,7 @@ private[tuning] case class ValidatedModel[E <: Estimator[_]] * Abstract class for Validator: Cross Validation or Train Validation Split * The model type should be the output of the estimator type but specifying that kills the scala compiler */ -private[impl] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializable { +private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializable { @transient protected lazy val log: Logger = LoggerFactory.getLogger(this.getClass) @@ -124,6 +112,7 @@ private[impl] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serial case _ => false } + def getParams(): Map[String, Any] /** * Function that performs the model selection @@ -166,12 +155,12 @@ private[impl] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serial "Model Selection over {} with {} with {} and the {} metric", modelsFit.map(_.model.getClass.getSimpleName).mkString(","), validationName, splitInfo, evaluator.name ) - 
val meta = new MetadataBuilder() - val cvFittedModels = modelsFit.map(v => updateBestModelMetadata(meta, v) -> v.bestMetric) - val newMeta = new MetadataBuilder().putMetadata(validationName, meta.build()) - val (bestModelName, _) = if (evaluator.isLargerBetter) cvFittedModels.maxBy(_._2) else cvFittedModels.minBy(_._2) + val modelSummaries = modelsFit.flatMap(v => makeModelSummary(v)) + val bestModelName = + if (evaluator.isLargerBetter) modelSummaries.maxBy(_.metricValues.asInstanceOf[SingleMetric].value).modelName + else modelSummaries.minBy(_.metricValues.asInstanceOf[SingleMetric].value).modelName - BestEstimator(name = bestModelName, estimator = bestEstimator, metadata = newMeta) + BestEstimator(name = bestModelName, estimator = bestEstimator, summary = modelSummaries) } /** @@ -179,22 +168,22 @@ private[impl] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serial * * @return best model name */ - private[op] def updateBestModelMetadata(metadataBuilder: MetadataBuilder, v: ValidatedModel[E]): String = { - val ValidatedModel(model, bestIndex, metrics, grids) = v - val modelParams = model.extractParamMap() + private[op] def makeModelSummary(v: ValidatedModel[E]): Seq[ModelEvaluation] = { + val ValidatedModel(model, _, metrics, grids) = v + val modelParams = model.extractParamMap().getAsMap() def makeModelName(index: Int) = s"${model.uid}_$index" - for {((paramGrid, met), ind) <- grids.zip(metrics).zipWithIndex} { - val paramMetBuilder = new MetadataBuilder() - paramMetBuilder.putString(evaluator.name, met.toString) - // put in all model params from the winner - modelParams.toSeq.foreach(p => paramMetBuilder.putString(p.param.name, p.value.toString)) - // override with param map values if they exists - paramGrid.toSeq.foreach(p => paramMetBuilder.putString(p.param.name, p.value.toString)) - metadataBuilder.putMetadata(makeModelName(ind), paramMetBuilder.build()) + for {((paramGrid, met), ind) <- grids.zip(metrics).zipWithIndex} yield { + val updatedParams = modelParams ++ paramGrid.getAsMap() + ModelEvaluation( + modelUID = model.uid, + modelName = makeModelName(ind), + modelType = model.getClass.getSimpleName, + metricValues = SingleMetric(evaluator.name.humanFriendlyName, met), + modelParameters = updatedParams + ) } - makeModelName(bestIndex) } @@ -296,7 +285,7 @@ private[impl] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serial case e: OpPipelineStage2[RealNN, OPVector, Prediction]@unchecked => val (labelFeat, Array(featuresFeat: Feature[OPVector]@unchecked, _)) = FeatureBuilder.fromDataFrame[RealNN](train.toDF(), response = label, - nonNullable = Set(features, DataFrameFieldNames.KeyFieldName)) + nonNullable = Set(features, ModelSelectorBaseNames.idColName)) e.setInput(labelFeat, featuresFeat) evaluator.setFullPredictionCol(e.getOutput()) case _ => // otherwise it is a spark estimator diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala index b971679da3..a0fc2108d8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala @@ -32,19 +32,22 @@ package com.salesforce.op.stages.impl.tuning import org.apache.spark.ml.param._ import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.types.{Metadata, MetadataBuilder} +import com.salesforce.op.stages.impl.MetadataLike +import com.salesforce.op.stages.impl.selector.ModelSelectorBaseNames +import 
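With the MetadataBuilder gone, the validator ranks candidates directly by their SingleMetric values and keeps the full set of ModelEvaluation entries on the BestEstimator. A minimal sketch of that selection step, not part of the patch, with made-up candidates and parameters:

```scala
// Minimal sketch (not part of the patch) of selecting the winner from per-candidate evaluations.
import com.salesforce.op.evaluators.SingleMetric
import com.salesforce.op.stages.impl.selector.ModelEvaluation

val candidates = Seq(
  ModelEvaluation("logreg_uid", "logreg_uid_0", "LogisticRegression",
    SingleMetric("area under PR", 0.78), Map("regParam" -> 0.1)),
  ModelEvaluation("rf_uid", "rf_uid_0", "RandomForestClassifier",
    SingleMetric("area under PR", 0.81), Map("numTrees" -> 50))
)
val isLargerBetter = true // taken from the evaluator
val best =
  if (isLargerBetter) candidates.maxBy(_.metricValues.asInstanceOf[SingleMetric].value)
  else candidates.minBy(_.metricValues.asInstanceOf[SingleMetric].value)
// best.modelName == "rf_uid_0"
```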
com.salesforce.op.utils.spark.RichMetadata._ +import org.apache.spark.sql.types.Metadata + +import scala.util.Try + /** * Case class for Training & test sets * * @param train training set is persisted at construction - * @param metadata metadata built at construction + * @param summary summary for building metadata */ -case class ModelData private(train: Dataset[Row], metadata: Metadata) { - def this(train: Dataset[Row], metadata: MetadataBuilder) = - this(train.persist(), metadata.build()) -} +case class ModelData(train: Dataset[Row], summary: Option[SplitterSummary]) /** * Abstract class that will carry on the creation of training set + test set @@ -112,3 +115,27 @@ object SplitterParamsDefault { val MaxLabelCategoriesDefault = 100 val MinLabelFractionDefault = 0.0 } + +trait SplitterSummary extends MetadataLike + +private[op] object SplitterSummary { + val ClassName: String = "className" + def fromMetadata(metadata: Metadata): Try[SplitterSummary] = Try { + val map = metadata.wrapped.underlyingMap + map(ClassName) match { + case s if s == classOf[DataSplitterSummary].getCanonicalName => DataSplitterSummary() + case s if s == classOf[DataBalancerSummary].getCanonicalName => DataBalancerSummary( + positiveLabels = map(ModelSelectorBaseNames.Positive).asInstanceOf[Long], + negativeLabels = map(ModelSelectorBaseNames.Negative).asInstanceOf[Long], + desiredFraction = map(ModelSelectorBaseNames.Desired).asInstanceOf[Double], + upSamplingFraction = map(ModelSelectorBaseNames.UpSample).asInstanceOf[Double], + downSamplingFraction = map(ModelSelectorBaseNames.DownSample).asInstanceOf[Double] + ) + case s if s == classOf[DataCutterSummary].getCanonicalName => DataCutterSummary( + labelsKept = map(ModelSelectorBaseNames.LabelsKept).asInstanceOf[Array[Double]], + labelsDropped = map(ModelSelectorBaseNames.LabelsDropped).asInstanceOf[Array[Double]] + ) + } + } +} + diff --git a/core/src/main/scala/com/salesforce/op/utils/spark/RichParamMap.scala b/core/src/main/scala/com/salesforce/op/utils/spark/RichParamMap.scala new file mode 100644 index 0000000000..3d8912723f --- /dev/null +++ b/core/src/main/scala/com/salesforce/op/utils/spark/RichParamMap.scala @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.utils.spark + +import com.salesforce.op.features.TransientFeature +import org.apache.spark.ml.PipelineStage +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.sql.types.StructType + +object RichParamMap { + + /** + * Enrichment functions for ParamMap + * + * @param params Metadata + */ + implicit class RichParamMap(val params: ParamMap) extends AnyVal { + + /** + * Extract param names and values from param map + * @return map of names to values + */ + def getAsMap(): Map[String, Any] = + params.toSeq.map(pp => pp.param.name -> pp.value).toMap.map{ + case (k, v: Array[_]) => + if (v.headOption.exists(_.isInstanceOf[TransientFeature])) { + k -> v.map(_.asInstanceOf[TransientFeature].toJsonString()) + } else k -> v + case (k, v: StructType) => k -> v.toString() + case (k, v: PipelineStage) => k -> v.getClass.getName + case (k, v: Option[_]) => + if (v.exists(_.isInstanceOf[PipelineStage])) { + k -> v.getClass.getName + } else k -> v + case (k, v) => k -> v + } + + } + +} diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala index 92d06edb08..17578c2e5a 100644 --- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala +++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala @@ -30,19 +30,21 @@ package com.salesforce.op -import com.salesforce.op.evaluators.{BinaryClassEvalMetrics, BinaryClassificationMetrics} import com.salesforce.op.features.Feature -import com.salesforce.op.features.types.{PickList, Real, RealNN} +import com.salesforce.op.features.types.{PickList, Prediction, Real, RealNN} import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry.{LogisticRegression, NaiveBayes} import com.salesforce.op.stages.impl.preparators._ import com.salesforce.op.stages.impl.regression.RegressionModelSelector import com.salesforce.op.stages.impl.regression.RegressionModelsToTry.LinearRegression import com.salesforce.op.stages.impl.selector.SelectedModel +import com.salesforce.op.stages.impl.selector.ValidationType._ import com.salesforce.op.stages.impl.tuning.DataSplitter import com.salesforce.op.test.PassengerSparkFixtureTest +import com.salesforce.op.utils.json.JsonUtils import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} import org.junit.runner.RunWith +import org.scalactic.Equality import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -51,6 +53,22 @@ import scala.util.{Failure, Success} @RunWith(classOf[JUnitRunner]) class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { + implicit val doubleEquality = new Equality[Double] { + def areEqual(a: Double, b: Any): Boolean = b match { + case s: Double => (a.isNaN && s.isNaN) || (a == b) + case _ => false + } + } + + implicit val doubleOptEquality = new Equality[Option[Double]] { + def areEqual(a: 
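RichParamMap.getAsMap flattens a Spark ParamMap into the plain Map[String, Any] that ends up in ModelEvaluation.modelParameters and ultimately in ModelSelectorSummary. A minimal sketch (not part of the patch) for a simple param map that hits none of the special cases:

```scala
// Minimal sketch (not part of the patch): flattening a ParamMap to a name -> value map.
import com.salesforce.op.utils.spark.RichParamMap._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr = new LogisticRegression()
val params = ParamMap(lr.regParam -> 0.01, lr.maxIter -> 50)
val asMap: Map[String, Any] = params.getAsMap()
// asMap == Map("regParam" -> 0.01, "maxIter" -> 50)
```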
Option[Double], b: Any): Boolean = b match { + case None => a.isEmpty + case s: Option[Double] => (a.exists(_.isNaN) && s.exists(_.isNaN)) || + (a.nonEmpty && a.toSeq.zip(s.toSeq).forall{ case (n, m) => n == m }) + case _ => false + } + } + private val density = weight / height private val generVec = genderPL.vectorize(topK = 10, minSupport = 1, cleanText = true) private val descrVec = description.vectorize(10, false, 1, true) @@ -199,7 +217,7 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { f.variance.nonEmpty shouldBe true f.cramersV.isEmpty shouldBe true } - insights.selectedModelInfo.contains("crossValidationResults") shouldBe true + insights.selectedModelInfo.get.validationType shouldBe CrossValidation insights.trainingParams shouldEqual params insights.stageInfo.keys.size shouldEqual 13 } @@ -230,7 +248,7 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { f.variance.isEmpty shouldBe true f.cramersV.isEmpty shouldBe true } - insights.selectedModelInfo.contains("trainValidationSplitResults") shouldBe true + insights.selectedModelInfo.get.validationType shouldBe TrainValidationSplit insights.trainingParams shouldEqual params insights.stageInfo.keys.size shouldEqual 10 } @@ -249,42 +267,11 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { lin.head.size shouldBe OpVectorMetadata("", checked.originStage.getMetadata()).columns.length } - it should "return best model information" in { - val insights = workflowModel.modelInsights(prob) - insights.selectedModelUID should startWith("logreg_") - insights.selectedModelName should startWith("logreg_") - insights.selectedModelType shouldBe LogisticRegression - val bestModelValidationResults = insights.selectedModelValidationResults - bestModelValidationResults.size shouldBe 15 - bestModelValidationResults.get(BinaryClassEvalMetrics.AuPR.humanFriendlyName) shouldBe Some("0.0") - val validationResults = insights.validationResults - validationResults.size shouldBe 2 - validationResults.get(insights.selectedModelName) shouldBe Some(bestModelValidationResults) - insights.validationResults(LogisticRegression) shouldBe validationResults - insights.validationResults(NaiveBayes) shouldBe Map.empty - } - - it should "return test/train evaluation metrics" in { - val insights = workflowModel.modelInsights(prob) - insights.evaluationMetricType shouldBe BinaryClassEvalMetrics.AuPR - insights.validationType shouldBe ValidationType.CrossValidation - insights.validatedModelTypes shouldBe Set(LogisticRegression) - - insights.problemType shouldBe ProblemType.BinaryClassification - insights.selectedModelTrainEvalMetrics shouldBe - BinaryClassificationMetrics(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, - Seq(0.0), Seq(0.0), Seq(0.0), Seq(1.0)) - insights.selectedModelTestEvalMetrics shouldBe Some( - BinaryClassificationMetrics(0.0, 0.0, 0.0, 0.5, 0.75, 0.5, 0.0, 1.0, 0.0, 1.0, - Seq(0.0), Seq(0.5), Seq(1.0), Seq(1.0)) - ) - } - it should "pretty print" in { val insights = workflowModel.modelInsights(prob) val pretty = insights.prettyPrint() pretty should include(s"Selected Model - $LogisticRegression") - pretty should include("| area under PR | 0.0") + pretty should include("area under precision-recall | 0.0") pretty should include("Model Evaluation Metrics") pretty should include("Top Model Insights") pretty should include("Top Positive Correlations") @@ -296,9 +283,14 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { 
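The custom Equality instances above exist because derived-feature correlations can be NaN after a serialization round trip, and NaN is never equal to itself under the default equality. A minimal standalone sketch, not part of the patch, of the behaviour being worked around:

```scala
// Minimal sketch (not part of the patch): default Double comparison fails on NaN.
import org.scalactic.Equality

Double.NaN == Double.NaN // false, so `corr shouldEqual corr` would fail for NaN correlations

// A NaN-tolerant Equality, like the ones defined above, treats two NaNs as equal
implicit val nanTolerant: Equality[Double] = new Equality[Double] {
  def areEqual(a: Double, b: Any): Boolean = b match {
    case d: Double => (a.isNaN && d.isNaN) || a == d
    case _ => false
  }
}
```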
ModelInsights.fromJson(insights.toJson()) match { case Failure(e) => fail(e) case Success(deser) => - insights.label.labelName shouldEqual deser.label.labelName - insights.features.length shouldEqual deser.features.length - insights.selectedModelInfo.keys shouldEqual deser.selectedModelInfo.keys + insights.label shouldEqual deser.label + insights.features.zip(deser.features).foreach{ + case (i, o) => + i.featureName shouldEqual o.featureName + i.featureType shouldEqual o.featureType + i.derivedFeatures.zip(o.derivedFeatures).foreach{ case (ii, io) => ii.corr shouldEqual io.corr } + } + insights.selectedModelInfo shouldEqual deser.selectedModelInfo insights.trainingParams.toJson() shouldEqual deser.trainingParams.toJson() insights.stageInfo.keys shouldEqual deser.stageInfo.keys } diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala index 6594b731e9..dfce4637cb 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala @@ -117,8 +117,8 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val summary = model1.summary() summary should include (classOf[SanityChecker].getSimpleName) - summary should include (ModelSelectorBaseNames.HoldOutEval) - summary should include (ModelSelectorBaseNames.TrainingEval) + summary should include (""""HoldoutEvaluation" : [ "com.salesforce.op.evaluators.MultiMetrics"""") + summary should include ("TrainEvaluation") } it should "return a multi classification model that runs ts at the workflow level" in new PassenserCSVforCV { @@ -167,8 +167,8 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val summary = model1.summary() log.info(summary) summary should include (classOf[SanityChecker].getSimpleName) - summary should include (ModelSelectorBaseNames.HoldOutEval) - summary should include (ModelSelectorBaseNames.TrainingEval) + summary should include (""""HoldoutEvaluation" : [ "com.salesforce.op.evaluators.MultiMetrics"""") + summary should include ("TrainEvaluation") } it should "return a regression model that runs cv at the workflow level" in new PassenserCSVforCV { @@ -210,7 +210,7 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val summary = model1.summary() log.info(summary) summary should include (classOf[SanityChecker].getSimpleName) - summary should include (ModelSelectorBaseNames.TrainingEval) + summary should include ("TrainEvaluation") } it should "return a regression model that runs ts at the workflow level" in new PassenserCSVforCV { @@ -252,8 +252,8 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val summary = model1.summary() log.info(summary) summary should include (classOf[SanityChecker].getSimpleName) - summary should include (ModelSelectorBaseNames.HoldOutEval) - summary should include (ModelSelectorBaseNames.TrainingEval) + summary should include (""""HoldoutEvaluation" : [ "com.salesforce.op.evaluators.MultiMetrics"""") + summary should include ("TrainEvaluation") } it should "avoid adding label leakage when feature engineering would introduce it" in new PassenserCSVforCV { @@ -293,15 +293,15 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val model2 = wf2.setReader(simplePassengerForCV).train() val data2 = model2.score(keepRawFeatures = false, keepIntermediateFeatures = true) - val summary1 = model1.summary() + val summary1 = 
model1.modelInsights(pred1) log.info("model1.summary: \n{}", summary1) - val summary2 = model2.summary() + val summary2 = model2.modelInsights(pred2) log.info("model2.summary: \n{}", summary2) - // CV - summary1 should include ("area under PR\" : \"0.802") - summary1 should not include ("area under PR\" : \"0.81") - summary2 should include ("area under PR\" : \"0.81") + summary1.selectedModelInfo.get.validationResults + .forall(_.metricValues.asInstanceOf[SingleMetric].value < 0.81) shouldBe true + summary2.selectedModelInfo.get.validationResults + .forall(_.metricValues.asInstanceOf[SingleMetric].value < 0.81) shouldBe false } def compare( @@ -333,6 +333,6 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { class Leaker(uid: String = UID[BinaryTransformer[_, _, _]]) extends BinaryTransformer[Real, RealNN, RealNN](operationName = "makeLeaker", uid = uid) { override def transformFn: (Real, RealNN) => RealNN = - (f: Real, l: RealNN) => if (l.v.exists(_ > 0)) 1.0.toRealNN else 0.0.toRealNN + (f: Real, l: RealNN) => if (l.v.exists(_ > 0)) 1.0.toRealNN else 0.0.toRealNN override def outputIsResponse: Boolean = false } diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala index 4d0228437a..8a9f9450d3 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala @@ -391,15 +391,15 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { log.info(summary) summary should include(classOf[SanityChecker].getSimpleName) summary should include("logreg") - summary should include(""""regParam" : "0.1"""") - summary should include(""""regParam" : "0.01"""") - summary should include(ModelSelectorBaseNames.HoldOutEval) - summary should include(ModelSelectorBaseNames.TrainingEval) + summary should include(""" "regParam" : 0.1,""") + summary should include(""" "regParam" : 0.01,""") + summary should include("ValidationResults") + summary should include("HoldoutEvaluation") val prettySummary = fittedWorkflow.summaryPretty() log.info(prettySummary) prettySummary should include(s"Selected Model - $LogisticRegression") - prettySummary should include("| area under PR | 0.25") + prettySummary should include("area under precision-recall | 1.0 | 0.0") prettySummary should include("Model Evaluation Metrics") prettySummary should include("Top Model Insights") prettySummary should include("Top Positive Correlations") diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala index 2a01239dee..7a9aad927e 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala @@ -37,7 +37,7 @@ import com.salesforce.op.stages.impl.CompareParamGrid import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry._ import com.salesforce.op.stages.impl.classification.FunctionalityForClassificationTests._ import com.salesforce.op.stages.impl.classification.ProbabilisticClassifierType._ -import com.salesforce.op.stages.impl.selector.ModelSelectorBaseNames +import com.salesforce.op.stages.impl.selector.{ModelEvaluation, ModelSelectorBaseNames, ModelSelectorSummary} import 
com.salesforce.op.stages.impl.tuning._ import com.salesforce.op.stages.sparkwrappers.generic.{SwQuaternaryTransformer, SwTernaryTransformer} import com.salesforce.op.test.TestSparkContext @@ -291,18 +291,19 @@ class BinaryClassificationModelSelectorTest extends FlatSpec with TestSparkConte log.info(model.getMetadata().toString) // Evaluation from train data should be there - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) BinaryClassEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + .holdoutEvaluation BinaryClassEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) val transformedData = model.transform(data) @@ -336,18 +337,18 @@ class BinaryClassificationModelSelectorTest extends FlatSpec with TestSparkConte log.info(model.getMetadata().toString) // evaluation metrics from test set should be in metadata - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) BinaryClassEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).holdoutEvaluation BinaryClassEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) val transformedData = model.transform(data) @@ -453,23 +454,23 @@ class BinaryClassificationModelSelectorTest extends FlatSpec with TestSparkConte assert(testEstimator.evaluators.contains(crossEntropy), "Cross entropy evaluator not present in estimator") // checking trainingEval & holdOutEval metrics - val metaData = model.getMetadata().getSummaryMetadata() - val trainMetaData = 
metaData.getMetadata(ModelSelectorBaseNames.TrainingEval) - val holdOutMetaData = metaData.getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + val trainMetaData = metaData.trainEvaluation + val holdOutMetaData = metaData.holdoutEvaluation.get testEstimator.evaluators.foreach { case evaluator: OpBinaryClassificationEvaluator => { BinaryClassEvalMetrics.values.foreach(metric => Seq(trainMetaData, holdOutMetaData).foreach( - metadata => assert(metadata.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metadata.json) + metadata => assert(metadata.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metadata) ) ) } case evaluator: OpBinaryClassificationEvaluatorBase[_] => { Seq(trainMetaData, holdOutMetaData).foreach(metadata => - assert(metadata.contains(s"(${evaluator.name})_${evaluator.name}"), - s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata.json) + assert(metadata.toJson(false).contains(s"${evaluator.name.humanFriendlyName}"), + s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata) ) } } @@ -480,19 +481,19 @@ class BinaryClassificationModelSelectorTest extends FlatSpec with TestSparkConte val myParam = 42 val myMetaName = "myMeta" val myMetaValue = 348954389534875.432423 - val myMetadata = new MetadataBuilder().putDouble(myMetaName, myMetaValue) + val myMetadata = ModelEvaluation(myMetaName, myMetaName, myMetaName, SingleMetric(myMetaName, myMetaValue), + Map.empty) val myEstimatorName = "myEstimator" val myEstimator = new SparkLR().setMaxIter(myParam) - val bestEstimator = new BestEstimator[ProbClassifier](myEstimatorName, myEstimator, myMetadata) + val bestEstimator = new BestEstimator[ProbClassifier](myEstimatorName, myEstimator, Seq(myMetadata)) modelSelector.stage1.bestEstimator = Option(bestEstimator) val fitted = modelSelector.fit(data) fitted.getParams.get(myEstimator.maxIter).get shouldBe myParam - val meta = fitted.stage1.getMetadata().getMetadata("summary") - meta.getDouble(myMetaName) shouldBe myMetaValue - meta.getString("bestModelName") shouldBe myEstimatorName + val meta = ModelSelectorSummary.fromMetadata(fitted.stage1.getMetadata().getSummaryMetadata()) + meta.validationResults.head shouldBe myMetadata } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala index 090c5f2fd0..07b82f8407 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala @@ -37,7 +37,7 @@ import com.salesforce.op.stages.impl.CompareParamGrid import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry._ import com.salesforce.op.stages.impl.classification.FunctionalityForClassificationTests._ import com.salesforce.op.stages.impl.classification.ProbabilisticClassifierType._ -import com.salesforce.op.stages.impl.selector.ModelSelectorBaseNames +import com.salesforce.op.stages.impl.selector.{ModelEvaluation, ModelSelectorBaseNames, ModelSelectorSummary} import com.salesforce.op.stages.impl.tuning._ import 
com.salesforce.op.stages.sparkwrappers.generic.{SwQuaternaryTransformer, SwTernaryTransformer} import com.salesforce.op.test.TestSparkContext @@ -281,18 +281,18 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex log.info(model.getMetadata().prettyJson) // evaluation metrics from test set should be in metadata - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) MultiClassEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.multi})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData.trainEvaluation) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaData2 = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) MultiClassEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.multi})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData2.holdoutEvaluation.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData2.holdoutEvaluation) ) val transformedData = model.transform(data) @@ -371,23 +371,23 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex assert(testEstimator.evaluators.contains(crossEntropy), "Cross entropy evaluator not present in estimator") // checking trainingEval & holdOutEval metrics - val metaData = model.getMetadata().getSummaryMetadata() - val trainMetaData = metaData.getMetadata(ModelSelectorBaseNames.TrainingEval) - val holdOutMetaData = metaData.getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + val trainMetaData = metaData.trainEvaluation.toJson(false) + val holdOutMetaData = metaData.holdoutEvaluation.get.toJson(false) testEstimator.evaluators.foreach { case evaluator: OpMultiClassificationEvaluator => { MultiClassEvalMetrics.values.foreach(metric => Seq(trainMetaData, holdOutMetaData).foreach( - metadata => assert(metadata.contains(s"(${OpEvaluatorNames.multi})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metadata.json) + metadata => assert(metadata.contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metadata) ) ) } case evaluator: OpMultiClassificationEvaluatorBase[_] => { Seq(trainMetaData, holdOutMetaData).foreach( - metadata => assert(metadata.contains(s"(${evaluator.name})_${evaluator.name}"), - s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata.json) + metadata => assert(metadata.contains(s"${evaluator.name.humanFriendlyName}"), + s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata) ) } } @@ -397,19 +397,19 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex val modelSelector: MultiClassificationModelSelector = MultiClassificationModelSelector().setInput(label, features) val myParam = "entropy" 
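These test updates all follow the same new access pattern: decode the typed summary once from the stage's summary metadata and read metrics from its fields, instead of digging raw keys out of nested Metadata. A hedged sketch of that pattern, where `fittedSelector` is a placeholder for any fitted model selector stage (hypothetical variable, not from the patch):

```scala
// Minimal sketch (not part of the patch); `fittedSelector` stands in for a fitted model selector stage.
import com.salesforce.op.stages.impl.selector.ModelSelectorSummary
import com.salesforce.op.utils.spark.RichMetadata._

val summary = ModelSelectorSummary.fromMetadata(fittedSelector.getMetadata().getSummaryMetadata())
val trainMetrics = summary.trainEvaluation     // EvaluationMetrics from the training split
val holdoutMetrics = summary.holdoutEvaluation // Option[EvaluationMetrics], populated after evaluateModel
val winnerName = summary.bestModelName
```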
val myMetaName = "myMeta" - val myMetaValue = "This is a string metadata" - val myMetadata = new MetadataBuilder().putString(myMetaName, myMetaValue) + val myMetaValue = 0.5 + val myMetadata = ModelEvaluation(myMetaName, myMetaName, myMetaName, SingleMetric(myMetaName, myMetaValue), + Map.empty) val myEstimatorName = "myEstimatorIsAwesome" val myEstimator = new DecisionTreeClassifier().setImpurity(myParam) - val bestEstimator = new BestEstimator[ProbClassifier](myEstimatorName, myEstimator, myMetadata) + val bestEstimator = new BestEstimator[ProbClassifier](myEstimatorName, myEstimator, Seq(myMetadata)) modelSelector.stage1.bestEstimator = Option(bestEstimator) val fitted = modelSelector.fit(data) fitted.getParams.get(myEstimator.impurity).get shouldBe myParam - val meta = fitted.stage1.getMetadata().getMetadata("summary") - meta.getString(myMetaName) shouldBe myMetaValue - meta.getString("bestModelName") shouldBe myEstimatorName + val meta = ModelSelectorSummary.fromMetadata(fitted.stage1.getMetadata().getSummaryMetadata()) + meta.validationResults.head shouldBe myMetadata } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala index c887adc5a7..4668562c60 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala @@ -36,7 +36,7 @@ import com.salesforce.op.features.{Feature, FeatureBuilder} import com.salesforce.op.stages.impl.CompareParamGrid import com.salesforce.op.stages.impl.regression.RegressionModelsToTry._ import com.salesforce.op.stages.impl.regression.RegressorType._ -import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorBaseNames} +import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorBaseNames, ModelSelectorSummary} import com.salesforce.op.stages.impl.tuning.BestEstimator import com.salesforce.op.test.TestSparkContext import com.salesforce.op.utils.spark.RichDataset._ @@ -228,17 +228,17 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext with Co val pred = model.getOutput() // evaluation metrics from train set should be in metadata - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) RegressionEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.regression})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from train set should be in metadata - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).holdoutEvaluation RegressionEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.regression})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not 
present in metadata: " + metaData) ) val transformedData = model.transform(data) @@ -355,24 +355,24 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext with Co // checking trainingEval & holdOutEval metrics model.evaluateModel(data) - val metaData = model.getMetadata().getSummaryMetadata() - val trainMetaData = metaData.getMetadata(ModelSelectorBaseNames.TrainingEval) - val holdOutMetaData = metaData.getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + val trainMetaData = metaData.trainEvaluation + val holdOutMetaData = metaData.holdoutEvaluation.get testEstimator.evaluators.foreach { case evaluator: OpRegressionEvaluator => { RegressionEvalMetrics.values.foreach(metric => Seq(trainMetaData, holdOutMetaData).foreach( - metadata => assert(metadata.contains(s"(${OpEvaluatorNames.regression})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metadata.json) + metadata => assert(metadata.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metadata) ) ) } case evaluator: OpRegressionEvaluatorBase[_] => { Seq(trainMetaData, holdOutMetaData).foreach( metadata => - assert(metadata.contains(s"(${evaluator.name})_${evaluator.name}"), - s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata.json) + assert(metadata.toJson(false).contains(s"${evaluator.name.humanFriendlyName}"), + s"Single Metric evaluator ${evaluator.name} is not present in metadata: " + metadata) ) } } @@ -384,14 +384,14 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext with Co val myEstimatorName = "myEstimatorIsAwesome" val myEstimator = new GBTRegressor().setCacheNodeIds(myParam) - val bestEstimator = new BestEstimator[Regressor](myEstimatorName, myEstimator.asInstanceOf[Regressor]) + val bestEstimator = new BestEstimator[Regressor](myEstimatorName, myEstimator.asInstanceOf[Regressor], Seq.empty) modelSelector.bestEstimator = Option(bestEstimator) val fitted = modelSelector.fit(data) fitted.getSparkMlStage().get.extractParamMap().get(myEstimator.cacheNodeIds).get shouldBe myParam - val meta = fitted.getMetadata().getMetadata("summary") - meta.getString("bestModelName") shouldBe myEstimatorName + val meta = ModelSelectorSummary.fromMetadata(fitted.getMetadata().getSummaryMetadata()) + meta.bestModelName shouldBe myEstimatorName } private def assertScores(scores: Array[RealNN], labels: Array[RealNN]) = { diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummaryTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummaryTest.scala new file mode 100644 index 0000000000..1bd9595542 --- /dev/null +++ b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummaryTest.scala @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. 
+ * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.stages.impl.selector + +import com.salesforce.op.evaluators._ +import com.salesforce.op.stages.impl.tuning.DataBalancerSummary +import com.salesforce.op.test.TestSparkContext +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class ModelSelectorSummaryTest extends FlatSpec with TestSparkContext { + + Spec[ModelSelectorSummary] should "be correctly converted to and from metadata" in { + val summary = ModelSelectorSummary( + validationType = ValidationType.CrossValidation, + validationParameters = Map("testA" -> 5, "otherB" -> Array(1, 2)), + dataPrepParameters = Map("testB" -> "5", "otherB" -> Seq("1", "2")), + dataPrepResults = Option(DataBalancerSummary(100L, 300L, 0.1, 2.0, 0.5)), + evaluationMetric = BinaryClassEvalMetrics.AuROC, + problemType = ProblemType.BinaryClassification, + bestModelUID = "test1", + bestModelName = "test2", + bestModelType = "test3", + validationResults = Seq(ModelEvaluation("test4", "test5", "test6", SingleMetric("test7", 0.1), Map.empty)), + trainEvaluation = BinaryClassificationMetrics(Precision = 0.1, Recall = 0.2, F1 = 0.3, AuROC = 0.4, + AuPR = 0.5, Error = 0.6, TP = 0.7, TN = 0.8, FP = 0.9, FN = 1.0, thresholds = Seq(1.1), + precisionByThreshold = Seq(1.2), recallByThreshold = Seq(1.3), falsePositiveRateByThreshold = Seq(1.4)), + holdoutEvaluation = Option(RegressionMetrics(RootMeanSquaredError = 1.3, MeanSquaredError = 1.4, R2 = 1.5, + MeanAbsoluteError = 1.6)) + ) + + val meta = summary.toMetadata() + val decoded = ModelSelectorSummary.fromMetadata(meta) + decoded.validationType shouldEqual summary.validationType + decoded.validationParameters.keySet shouldEqual summary.validationParameters.keySet + decoded.dataPrepParameters.keySet should contain theSameElementsAs summary.dataPrepParameters.keySet + decoded.dataPrepResults shouldEqual summary.dataPrepResults + decoded.evaluationMetric.entryName shouldEqual summary.evaluationMetric.entryName + decoded.problemType shouldEqual summary.problemType + decoded.bestModelUID shouldEqual summary.bestModelUID + decoded.bestModelName shouldEqual summary.bestModelName + decoded.bestModelType shouldEqual summary.bestModelType + decoded.validationResults shouldEqual summary.validationResults + decoded.trainEvaluation shouldEqual summary.trainEvaluation + decoded.holdoutEvaluation shouldEqual summary.holdoutEvaluation + } + + it should "be correctly converted to and from metadata even when fields are missing or 
empty" in { + + val summary = ModelSelectorSummary( + validationType = ValidationType.TrainValidationSplit, + validationParameters = Map.empty, + dataPrepParameters = Map.empty, + dataPrepResults = None, + evaluationMetric = MultiClassEvalMetrics.Error, + problemType = ProblemType.Regression, + bestModelUID = "test1", + bestModelName = "test2", + bestModelType = "test3", + validationResults = Seq.empty, + trainEvaluation = MultiClassificationMetrics(Precision = 0.1, Recall = 0.2, F1 = 0.3, Error = 0.4, + ThresholdMetrics = ThresholdMetrics(topNs = Seq(1, 2), thresholds = Seq(1.1, 1.2), + correctCounts = Map(1 -> Seq(100L)), incorrectCounts = Map(2 -> Seq(200L)), + noPredictionCounts = Map(3 -> Seq(300L)))), + holdoutEvaluation = None + ) + + val meta = summary.toMetadata() + val decoded = ModelSelectorSummary.fromMetadata(meta) + decoded.validationType shouldEqual summary.validationType + decoded.validationParameters.keySet shouldEqual summary.validationParameters.keySet + decoded.dataPrepParameters.keySet should contain theSameElementsAs summary.dataPrepParameters.keySet + decoded.dataPrepResults shouldEqual summary.dataPrepResults + decoded.evaluationMetric.entryName shouldEqual summary.evaluationMetric.entryName + decoded.problemType shouldEqual summary.problemType + decoded.bestModelUID shouldEqual summary.bestModelUID + decoded.bestModelName shouldEqual summary.bestModelName + decoded.bestModelType shouldEqual summary.bestModelType + decoded.validationResults shouldEqual summary.validationResults + decoded.trainEvaluation shouldEqual summary.trainEvaluation + decoded.holdoutEvaluation shouldEqual summary.holdoutEvaluation + } + +} diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala index c3f495b7bd..8a40523cb4 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala @@ -38,7 +38,6 @@ import com.salesforce.op.stages.OpPipelineStage2 import com.salesforce.op.stages.base.binary.{BinaryEstimator, BinaryModel, OpTransformer2} import com.salesforce.op.stages.impl.CompareParamGrid import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpLogisticRegressionModel, OpRandomForestClassifier} -import com.salesforce.op.stages.impl.feature.JaccardSimilarity import com.salesforce.op.stages.impl.regression.{OpLinearRegression, OpLinearRegressionModel, OpRandomForestRegressor} import com.salesforce.op.stages.impl.tuning._ import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapper, OpPredictorWrapperModel} @@ -51,7 +50,6 @@ import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.mllib.random.RandomRDDs._ -import org.apache.spark.network.util.TransportFrameDecoder.Interceptor import org.apache.spark.sql.Dataset import org.junit.runner.RunWith import org.scalatest.FlatSpec @@ -125,18 +123,18 @@ class ModelSelectorTest extends FlatSpec with TestSparkContext with CompareParam log.info(model.getMetadata().toString) // Evaluation from train data should be there - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).trainEvaluation BinaryClassEvalMetrics.values.foreach(metric 
=> - assert(metaData.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).holdoutEvaluation BinaryClassEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) val transformedData = model.transform(data) @@ -166,18 +164,18 @@ class ModelSelectorTest extends FlatSpec with TestSparkContext with CompareParam log.info(model.getMetadata().toString) // Evaluation from train data should be there - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).trainEvaluation RegressionEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.regression})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).holdoutEvaluation RegressionEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.regression})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) val transformedData = model.transform(data) @@ -211,18 +209,18 @@ class ModelSelectorTest extends FlatSpec with TestSparkContext with CompareParam log.info(model.getMetadata().toString) // Evaluation from train data should be there - val metaData = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.TrainingEval) + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).trainEvaluation BinaryClassEvalMetrics.values.foreach(metric => - assert(metaData.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaData.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) // evaluation metrics from test set should be in metadata after eval run model.evaluateModel(data) - val metaDataHoldOut = model.getMetadata().getSummaryMetadata().getMetadata(ModelSelectorBaseNames.HoldOutEval) + val metaDataHoldOut = 
ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()).holdoutEvaluation BinaryClassEvalMetrics.values.foreach(metric => - assert(metaDataHoldOut.contains(s"(${OpEvaluatorNames.binary})_${metric.entryName}"), - s"Metric ${metric.entryName} is not present in metadata: " + metaData.json) + assert(metaDataHoldOut.get.toJson(true).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) ) val transformedData = model.transform(data) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataBalancerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataBalancerTest.scala index 0d47af7464..365bdbd65b 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataBalancerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataBalancerTest.scala @@ -108,10 +108,10 @@ class DataBalancerTest extends FlatSpec with TestSparkContext { // Rerun balancer with set params - val metadata = balancer.metadataBuilder + val metadata = balancer.summary val ModelData(expected2, _) = balancer.prepare(data) withClue("Data balancer should no update the metadata"){ - balancer.metadataBuilder shouldBe metadata + balancer.summary shouldBe metadata } expected.collect() shouldBe expected2.collect() } @@ -129,10 +129,10 @@ class DataBalancerTest extends FlatSpec with TestSparkContext { balancer.getAlreadyBalancedFraction shouldBe 1.0 // Rerun balancer with set params - val metadata = balancer.metadataBuilder + val metadata = balancer.summary val ModelData(expected2, _) = balancer.prepare(data) withClue("Data balancer should no update the metadata"){ - balancer.metadataBuilder shouldBe metadata + balancer.summary shouldBe metadata } expected.collect() shouldBe expected2.collect() @@ -152,10 +152,10 @@ class DataBalancerTest extends FlatSpec with TestSparkContext { balancer.getAlreadyBalancedFraction shouldBe maxSize.toDouble / (smallCount + bigCount) // Rerun balancer with set params - val metadata = balancer.metadataBuilder + val metadata = balancer.summary val ModelData(expected2, _) = balancer.prepare(data) withClue("Data balancer should no update the metadata"){ - balancer.metadataBuilder shouldBe metadata + balancer.summary shouldBe metadata } expected.collect() shouldBe expected2.collect() diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala index 445f40e9a7..6eb50e3992 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataCutterTest.scala @@ -61,10 +61,10 @@ class DataCutterTest extends FlatSpec with TestSparkContext { val dataCutter = DataCutter(seed = seed).setMinLabelFraction(0.0).setMaxLabelCategories(100000) val split = dataCutter.prepare(randDF) split.train.count() shouldBe dataSize - val keptMeta = split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept) + val keptMeta = split.summary.get.asInstanceOf[DataCutterSummary].labelsKept keptMeta.length shouldBe 1000 keptMeta should contain theSameElementsAs dataCutter.getLabelsToKeep - val dropMeta = split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped) + val dropMeta = split.summary.get.asInstanceOf[DataCutterSummary].labelsDropped dropMeta.length shouldBe 0 dropMeta should contain theSameElementsAs dataCutter.getLabelsToDrop @@ -73,8 +73,8 @@ class DataCutterTest extends FlatSpec with 
TestSparkContext { .setMaxLabelCategories(100000) .prepare(biasDF) split2.train.count() shouldBe dataSize - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept).length shouldBe 1000 - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped).length shouldBe 0 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsKept.length shouldBe 1000 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsDropped.length shouldBe 0 } it should "throw an error when all the data is filtered out" in { @@ -93,13 +93,13 @@ class DataCutterTest extends FlatSpec with TestSparkContext { .prepare(randDF) findDistinct(split.train).count() shouldBe 100 - split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept).length shouldBe 100 - split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped).length shouldBe 900 + split.summary.get.asInstanceOf[DataCutterSummary].labelsKept.length shouldBe 100 + split.summary.get.asInstanceOf[DataCutterSummary].labelsDropped.length shouldBe 900 val split2 = DataCutter(seed = seed).setMaxLabelCategories(3).prepare(biasDF) findDistinct(split2.train).collect().toSet shouldEqual Set(0.0, 1.0, 2.0) - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept).length shouldBe 3 - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped).length shouldBe 997 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsKept.length shouldBe 3 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsDropped.length shouldBe 997 } it should "filter out anything that does not have at least the specified data fraction" in { @@ -113,13 +113,13 @@ class DataCutterTest extends FlatSpec with TestSparkContext { val distTrain = findDistinct(split.train) distTrain.count() < distinct shouldBe true distTrain.count() > 0 shouldBe true - split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept).length + - split.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped).length shouldBe distinct + split.summary.get.asInstanceOf[DataCutterSummary].labelsKept.length + + split.summary.get.asInstanceOf[DataCutterSummary].labelsDropped.length shouldBe distinct val split2 = DataCutter(seed = seed).setMinLabelFraction(0.20).setReserveTestFraction(0.5).prepare(biasDF) findDistinct(split2.train).count() shouldBe 3 - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsKept).length shouldBe 3 - split2.metadata.getDoubleArray(ModelSelectorBaseNames.LabelsDropped).length shouldBe 997 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsKept.length shouldBe 3 + split2.summary.get.asInstanceOf[DataCutterSummary].labelsDropped.length shouldBe 997 } it should "filter out using the var labelsToKeep" in { diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/OpValidatorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/OpValidatorTest.scala index 8b1cf851b3..f35cfe4967 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/OpValidatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/OpValidatorTest.scala @@ -88,7 +88,7 @@ class OpValidatorTest extends FlatSpec with TestSparkContext { assertFractions(Array(1 - p, p), train) assertFractions(Array(1 - p, p), validate) } - balancer.get.metadataBuilder.build() should not be new MetadataBuilder().build() + balancer.get.summary.get.toMetadata() should not be new MetadataBuilder().build() } it should "stratify multi class data" in { @@ -107,7 +107,7 @@ class OpValidatorTest extends FlatSpec with TestSparkContext { 
assertFractions(Array(1 - p, p), train) assertFractions(Array(1 - p, p), validate) } - balancer.get.metadataBuilder.build() should not be new MetadataBuilder().build() + balancer.get.summary.get.toMetadata() should not be new MetadataBuilder().build() } it should "stratify multi class data" in { diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageParams.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageParams.scala index 2983ade99f..a28e4dfa94 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageParams.scala +++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageParams.scala @@ -186,4 +186,5 @@ trait OpPipelineStageParams extends InputParams { object OpPipelineStageParamsNames { val OutputMetadata: String = "outputMetadata" val InputSchema: String = "inputSchema" + val InputFeatures: String = "inputFeatures" } diff --git a/utils/src/main/scala/com/salesforce/op/utils/spark/RichMetadata.scala b/utils/src/main/scala/com/salesforce/op/utils/spark/RichMetadata.scala index b37c53cba5..1fd0b99cde 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/spark/RichMetadata.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/spark/RichMetadata.scala @@ -30,6 +30,7 @@ package com.salesforce.op.utils.spark +import org.apache.spark.ml.PipelineStage import org.apache.spark.sql.types._ import scala.collection.mutable.{Map => MMap} @@ -194,24 +195,35 @@ object RichMetadata { def toMetadata: Metadata = { val builder = new MetadataBuilder() - def unsupported(k: String) = throw new RuntimeException(s"Key '$k' has unsupported value type") + def unsupported(k: String, v: Any) = + throw new RuntimeException(s"Key '$k' has unsupported value type $v of type ${v.getClass.getName}") def putCollection(key: String, seq: Seq[Any]): MetadataBuilder = seq match { case booleanSeq(v) => builder.putBooleanArray(key, v.toArray) case intSeq(v) => builder.putLongArray(key, v.map(_.toLong).toArray) case longSeq(v) => builder.putLongArray(key, v.toArray) case doubleSeq(v) => builder.putDoubleArray(key, v.toArray) case stringSeq(v) => builder.putStringArray(key, v.toArray) - case _ => unsupported(key) + case _ => unsupported(key, seq) } theMap.foldLeft(builder) { case (m, (k, v: Boolean)) => m.putBoolean(k, v) case (m, (k, v: Double)) => m.putDouble(k, v) case (m, (k, v: Long)) => m.putLong(k, v) + case (m, (k, v: Int)) => m.putLong(k, v.toLong) case (m, (k, v: String)) => m.putString(k, v) + case (m, (k, v: PipelineStage)) => m.putString(k, v.getClass.getName) + case (m, (k, v: Metadata)) => m.putMetadata(k, v) case (m, (k, v: Seq[_])) => putCollection(k, v) case (m, (k, v: Array[_])) => putCollection(k, v) case (m, (k, v: Map[_, _])) => m.putMetadata(k, v.map { case (k, v) => k.toString -> v }.toMetadata) - case (_, (k, _)) => unsupported(k) + case (m, (k, v: Option[_])) => if (v.nonEmpty) { v.get match { + case vt: Boolean => m.putBoolean(k, vt) + case vt: Double => m.putDouble(k, vt) + case vt: Long => m.putLong(k, vt) + case vt: Int => m.putLong(k, vt.toLong) + case vt: String => m.putString(k, vt) + }} else m + case (_, (k, v)) => unsupported(k, v) }.build() } } diff --git a/utils/src/test/scala/com/salesforce/op/utils/spark/RichMetadataTest.scala b/utils/src/test/scala/com/salesforce/op/utils/spark/RichMetadataTest.scala index cd087d1f9c..4af69df785 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/spark/RichMetadataTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/spark/RichMetadataTest.scala @@ -56,7 +56,8 @@ 
class RichMetadataTest extends FlatSpec with TestCommon { } it should "throw an error on unsupported type in a map" in { - the[RuntimeException] thrownBy Map("a" -> Map("b" -> 1)).toMetadata + the[RuntimeException] thrownBy Map("a" -> TestClass("test")).toMetadata } } +case class TestClass(name: String)
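
The RichMetadata change above is what forces the test rewrite at the end of the diff: Map("a" -> Map("b" -> 1)) no longer throws once Int values are coerced to Long, so an arbitrary case class is used to hit the unsupported-type path instead. A minimal sketch of the conversion behaviour this patch adds, using the same Map[String, Any].toMetadata enrichment the test exercises (keys and values here are illustrative only):

    import com.salesforce.op.utils.spark.RichMetadata._

    // Value shapes newly accepted by Map[String, Any].toMetadata in this patch
    val meta = Map(
      "anInt"  -> 5,                // Int is now stored via putLong
      "anOpt"  -> Option(0.25),     // non-empty Options of primitives are unwrapped; None keys are skipped
      "nested" -> Map("k" -> "v")   // nested maps still become nested Metadata, as before
    ).toMetadata

    // PipelineStage values are recorded by class name, and unsupported values still throw,
    // but the error now reports the offending value and its class, e.g.:
    //   Map("bad" -> TestClass("test")).toMetadata
    //   => RuntimeException: Key 'bad' has unsupported value type TestClass(test) of type ...TestClass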
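More broadly, every selector test in this diff migrates from string-keyed summary metadata (ModelSelectorBaseNames.TrainingEval, HoldOutEval, the "bestModelName" key, and "(evaluator)_metric" entries) to the typed ModelSelectorSummary. A minimal sketch of the access pattern those tests now exercise, assuming a fitted model selector stage named model (the variable name is an assumption; the calls are the ones used in the diff):

    import com.salesforce.op.stages.impl.selector.ModelSelectorSummary
    import com.salesforce.op.utils.spark.RichMetadata._  // enrichment supplying getSummaryMetadata()

    // Decode the typed summary the selector wrote into its stage metadata
    val summary = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata())

    val bestName     = summary.bestModelName       // winning model, formerly the "bestModelName" string key
    val trainMetrics = summary.trainEvaluation     // train evaluation, formerly "(evaluator)_metric" keys
    val holdout      = summary.holdoutEvaluation   // Option[_]; populated after model.evaluateModel(data)
    val trainJson    = trainMetrics.toJson(false)  // the JSON the assertions above search for metric names in

    // The summary round-trips through Spark Metadata, which ModelSelectorSummaryTest checks field by field
    val decoded = ModelSelectorSummary.fromMetadata(summary.toMetadata())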