Skip to content

Commit

Permalink
Tweaks to OpBinScoreEvaluator (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
shaeselix authored and leahmcguire committed Apr 3, 2019
1 parent df42f37 commit 6bb63ba
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.salesforce.op.UID
import com.twitter.algebird.Operators._
import com.twitter.algebird.Tuple4Semigroup
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType
Expand All @@ -46,8 +47,8 @@ import org.slf4j.LoggerFactory
* This evaluator creates the specified number of bins and computes the statistics for each bin
* and returns [[BinaryClassificationBinMetrics]].
*
* @param numOfBins number of bins to produce
* @param uid uid for instance
* @param numOfBins number of bins to produce
* @param uid uid for instance
*/
private[op] class OpBinScoreEvaluator
(
Expand All @@ -66,60 +67,69 @@ private[op] class OpBinScoreEvaluator
val labelColumnName = getLabelCol
val dataToUse = makeDataToUse(data, labelColumnName)
.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd
val scoreAndLabels = dataToUse.map {
case Row(prob: Vector, label: Double) => (prob(1), label)
case Row(prob: Double, label: Double) => (prob, label)
}
evaluateScoreAndLabels(scoreAndLabels)
}

if (dataToUse.isEmpty()) {
log.warn("The dataset is empty. Returning empty metrics.")
BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq())
} else {
val scoreAndLabels = dataToUse.map {
case Row(prob: Vector, label: Double) => (prob(1), label)
case Row(prob: Double, label: Double) => (prob, label)
}
def evaluateScoreAndLabels(scoreAndLabels: RDD[(Double, Double)]): BinaryClassificationBinMetrics = {

val (maxScore, minScore) = scoreAndLabels.map {
case (score, _) => (score, score)
}.fold(1.0, 0.0) {
case ((maxVal, minVal), (scoreMax, scoreMin)) =>
(math.max(maxVal, scoreMax), math.min(minVal, scoreMin))
}
val (maxScore, minScore) = scoreAndLabels.map {
case (score, _) => (score, score)
}.fold(1.0, 0.0) {
case ((maxVal, minVal), (scoreMax, scoreMin)) =>
(math.max(maxVal, scoreMax), math.min(minVal, scoreMin))
}

// Finding stats per bin -> avg score, avg conv rate,
// total num of data points and overall brier score.
implicit val sg = new Tuple4Semigroup[Double, Double, Long, Double]()
val stats = scoreAndLabels.map {
case (score, label) =>
(getBinIndex(score, minScore, maxScore), (score, label, 1L, math.pow(score - label, 2)))
}.reduceByKey(_ + _).map {
case (bin, (scoreSum, labelSum, count, squaredError)) =>
(bin, scoreSum / count, labelSum / count, count, squaredError)
}.collect()

val zero = (new Array[Double](numOfBins), new Array[Double](numOfBins), new Array[Long](numOfBins), 0.0, 0L)
val (averageScore, averageConversionRate, numberOfDataPoints, brierScoreSum, numberOfPoints) =
stats.foldLeft(zero) {
case ((score, convRate, dataPoints, brierScoreSum, totalPoints),
(binIndex, avgScore, avgConvRate, counts, squaredError)) =>
score(binIndex) = avgScore
convRate(binIndex) = avgConvRate
dataPoints(binIndex) = counts
(score, convRate, dataPoints, brierScoreSum + squaredError, totalPoints + counts)
}

// binCenters is the center point in each bin.
// e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75].
val diff = maxScore - minScore
val binCenters = for {i <- 0 until numOfBins} yield minScore + ((diff * i) / numOfBins) + (diff / (2 * numOfBins))

val metrics = BinaryClassificationBinMetrics(
BrierScore = brierScoreSum / numberOfPoints,
binCenters = binCenters,
numberOfDataPoints = numberOfDataPoints,
averageScore = averageScore,
averageConversionRate = averageConversionRate
)

log.info("Evaluated metrics: {}", metrics.toString)
metrics
// Finding stats per bin -> avg score, avg conv rate,
// total num of data points and overall brier score.
implicit val sg = new Tuple4Semigroup[Double, Long, Long, Double]()
val stats = scoreAndLabels.map {
case (score, label) =>
(getBinIndex(score, minScore, maxScore),
(score, if (label > 0.0) 1L else 0L, 1L, math.pow(score - label, 2)))
}.reduceByKey(_ + _).map {
case (bin, (scoreSum, positiveCount, count, squaredError)) =>
(bin, scoreSum, positiveCount, count, squaredError)
}.collect()

stats.toList match {
case Nil => BinaryClassificationBinMetrics.empty
case _ => {
val zero = (new Array[Double](numOfBins), new Array[Double](numOfBins),
new Array[Long](numOfBins), new Array[Long](numOfBins), 0.0, 0L)
val (averageScore, averageConversionRate, numberOfDataPoints, positiveLabels, brierScoreSum, numberOfPoints) =
stats.foldLeft(zero) {
case ((score, convRate, dataPoints, positiveLabels, brierScoreSum, totalPoints),
(binIndex, scoreSum, positiveCount, counts, squaredError)) =>
score(binIndex) = scoreSum / counts
convRate(binIndex) = positiveCount.toDouble / counts
dataPoints(binIndex) = counts
positiveLabels(binIndex) = positiveCount
(score, convRate, dataPoints, positiveLabels, brierScoreSum + squaredError, totalPoints + counts)
}

// binCenters is the center point in each bin.
// e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75].
val diff = maxScore - minScore
val binCenters = for {i <- 0 until numOfBins}
yield minScore + ((diff * i) / numOfBins) + (diff / (2 * numOfBins))

val metrics = BinaryClassificationBinMetrics(
BrierScore = brierScoreSum / numberOfPoints,
binSize = diff / numOfBins,
binCenters = binCenters,
numberOfDataPoints = numberOfDataPoints,
numberOfPositiveLabels = positiveLabels,
averageScore = averageScore,
averageConversionRate = averageConversionRate
)

log.info("Evaluated metrics: {}", metrics.toString)
metrics
}
}
}

Expand All @@ -133,21 +143,31 @@ private[op] class OpBinScoreEvaluator
/**
* Metrics of BinaryClassificationBinMetrics
*
* @param BrierScore brier score for overall dataset
* @param binCenters center of each bin
* @param numberOfDataPoints total number of data points in each bin
* @param averageScore average score in each bin
* @param averageConversionRate average conversion rate in each bin
* @param BrierScore brier score for overall dataset
* @param binSize size of each bin
* @param binCenters center of each bin
* @param numberOfDataPoints total number of data points in each bin
* @param numberOfPositiveLabels count of labels > 0 in each bin
* @param averageScore average score in each bin
* @param averageConversionRate average conversion rate in each bin
*/
case class BinaryClassificationBinMetrics
(
BrierScore: Double,
binSize: Double,
@JsonDeserialize(contentAs = classOf[java.lang.Double])
binCenters: Seq[Double],
@JsonDeserialize(contentAs = classOf[java.lang.Long])
numberOfDataPoints: Seq[Long],
@JsonDeserialize(contentAs = classOf[java.lang.Double])
numberOfPositiveLabels: Seq[Long],
@JsonDeserialize(contentAs = classOf[java.lang.Double])
averageScore: Seq[Double],
@JsonDeserialize(contentAs = classOf[java.lang.Double])
averageConversionRate: Seq[Double]
) extends EvaluationMetrics

object BinaryClassificationBinMetrics {
def empty: BinaryClassificationBinMetrics =
BinaryClassificationBinMetrics(0.0, 0.0, Seq(), Seq(), Seq(), Seq(), Seq())
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ import org.slf4j.LoggerFactory
* The metrics are AUROC, AUPR, Precision, Recall, F1 and Error Rate
* Default evaluation returns AUROC
*
* @param name name of default metric
* @param name name of default metric
* @param isLargerBetter is metric better if larger
* @param uid uid for instance
* @param uid uid for instance
*/

private[op] class OpBinaryClassificationEvaluator
Expand Down Expand Up @@ -123,7 +123,8 @@ private[op] class OpBinaryClassificationEvaluator
}
}

final protected def getBinaryEvaluatorMetric(
final protected def getBinaryEvaluatorMetric
(
metricName: ClassificationEvalMetric,
dataset: Dataset[_],
default: => Double
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest {
pretty should not include m.modelName
}
}
pretty should include("area under precision-recall | 1.0")
pretty should include regex raw"area under precision-recall\s+|\s+1.0"
pretty should include("Model Evaluation Metrics")
pretty should include("Top Model Insights")
pretty should include("Top Positive Correlations")
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {

val prettySummary = fittedWorkflow.summaryPretty()
log.info(prettySummary)
prettySummary should include regex raw"area under precision-recall\s+|\s+1.0\s+|\s+0.0"
prettySummary should include("Selected Model - OpLogisticRegression")
prettySummary should include("area under precision-recall | 1.0 | 0.0")
prettySummary should include("Model Evaluation Metrics")
prettySummary should include("Top Model Insights")
prettySummary should include("Top Positive Correlations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext {

metrics shouldBe BinaryClassificationBinMetrics(
0.09800605366,
0.25,
Seq(0.125, 0.375, 0.625, 0.875),
Seq(2, 0, 1, 2),
Seq(0, 0, 0, 2),
Seq(0.003205, 0.0, 0.7, 0.99999),
Seq(0.0, 0.0, 0.0, 1.0)
)
Expand All @@ -89,8 +91,10 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext {

metrics shouldBe BinaryClassificationBinMetrics(
40.999986666733335,
3.2499975,
Seq(0.62500875, 3.87500625, 7.125003749999999, 10.37500125),
Seq(2, 0, 0, 1),
Seq(1, 0, 0, 1),
Seq(0.49999999999999994, 0.0, 0.0, 12.0),
Seq(0.5, 0.0, 0.0, 1.0)
)
Expand All @@ -107,7 +111,7 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext {
val metrics = new OpBinScoreEvaluator(numOfBins = 10)
.setLabelCol(labelEmptyData.name).setPredictionCol(predictionEmptyData.name).evaluateAll(emptyData)

metrics shouldBe BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq())
metrics shouldBe BinaryClassificationBinMetrics(0.0, 0.0, Seq(), Seq(), Seq(), Seq(), Seq())
}

it should "evaluate bin metrics for skewed data" in {
Expand All @@ -116,8 +120,10 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext {

metrics shouldBe BinaryClassificationBinMetrics(
7.294225500000013E-4,
0.2,
Seq(0.1, 0.30000000000000004, 0.5, 0.7, 0.9),
Seq(0, 0, 0, 0, 4),
Seq(0, 0, 0, 0, 4),
Seq(0.0, 0.0, 0.0, 0.0, 0.98617),
Seq(0.0, 0.0, 0.0, 0.0, 1.0)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ package com.salesforce.op.evaluators
import com.salesforce.op.evaluators.BinaryClassEvalMetrics._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSelector, OpLogisticRegression}
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType
import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.evaluation._
Expand Down Expand Up @@ -167,7 +166,7 @@ class OpBinaryClassificationEvaluatorTest extends FlatSpec with TestSparkContext

val (tp, tn, fp, fn, precision, recall, f1) = getPosNegValues(
transformedData2.select(prediction.name, test_label.name).rdd
.map( r => Row(r.getMap[String, Double](0).toMap.toPrediction.prediction, r.getDouble(1)) )
.map(r => Row(r.getMap[String, Double](0).toMap.toPrediction.prediction, r.getDouble(1)))
)

tp.toDouble shouldBe metrics.TP
Expand Down Expand Up @@ -197,7 +196,6 @@ class OpBinaryClassificationEvaluatorTest extends FlatSpec with TestSparkContext
}



it should "evaluate the metrics on dataset with only the label and prediction 1" in {
val transformedDataOne = model.setInput(one_label, one_features).transform(one_ds)

Expand Down

0 comments on commit 6bb63ba

Please sign in to comment.