From 7e22443940be8c3b3a7b26da5f47adb099dc7fe0 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 11 Jul 2019 09:11:10 -0700 Subject: [PATCH 1/2] Replace mapValues with immutable Map where applicable --- .../OpMultiClassificationEvaluator.scala | 8 +- .../op/filters/PreparedFeatures.scala | 4 +- .../impl/feature/PhoneNumberParser.scala | 2 +- .../feature/TimePeriodMapTransformer.scala | 2 +- .../op/filters/PreparedFeaturesTest.scala | 78 ++++++++++++------- .../op/aggregators/ExtendedMultiset.scala | 2 +- 6 files changed, 59 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala index 0f5ff11f11..372db9b43e 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala @@ -232,10 +232,10 @@ private[op] class OpMultiClassificationEvaluator ThresholdMetrics( topNs = topNs, thresholds = thresholds, - correctCounts = agg.mapValues { case (cor, _) => cor.toSeq }, - incorrectCounts = agg.mapValues { case (_, incor) => incor.toSeq }, - noPredictionCounts = agg.mapValues { case (cor, incor) => - (Array.fill(nThresholds)(nRows) + cor.map(-_) + incor.map(-_)).toSeq + correctCounts = agg.map { case (k, (cor, _)) => k -> cor.toSeq }, + incorrectCounts = agg.map { case (k, (_, incor)) => k -> incor.toSeq }, + noPredictionCounts = agg.map { case (k, (cor, incor)) => + k -> (Array.fill(nThresholds)(nRows) + cor.map(-_) + incor.map(-_)).toSeq } ) } diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 0c725cdc2f..7d15633f4d 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -57,12 +57,12 @@ private[filters] case class PreparedFeatures * @return pair consisting of response and predictor summaries (in this order) */ def summaries: (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) = - responses.mapValues(Summary(_)) -> predictors.mapValues(Summary(_)) + responses.map { case (k, s) => k -> Summary(s) } -> predictors.map { case (k, s) => k -> Summary(s) } /** * Computes vector of size responseKeys.length + predictorKeys.length. The first responses.length * values are the actual response values (nulls replaced with 0.0). Its (i + responses.length)th value - * is 1 iff. the predictor associated to ith feature key is null, for i >= 0. + * is 1 iff. the predictor associated to ith f eature key is null, for i >= 0. 
* * @param responseKeys response feature keys * @param predictorKeys set of all predictor keys needed for constructing binary vector diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/PhoneNumberParser.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/PhoneNumberParser.scala index 537d0ad3c2..3f5cba75d8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/PhoneNumberParser.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/PhoneNumberParser.scala @@ -247,7 +247,7 @@ class IsValidPhoneMapDefaultCountry(uid: String = UID[IsValidPhoneMapDefaultCoun phoneNumberMap.value .mapValues(p => PhoneNumberParser.validate(p.toPhone, region, isStrict)) - .collect{ case(k, v) if !v.isEmpty => k -> v.value.get }.toBinaryMap + .collect { case (k, SomeValue(Some(b))) => k -> b }.toBinaryMap } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/TimePeriodMapTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/TimePeriodMapTransformer.scala index ba6d3c346b..8e7c5cc65e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/TimePeriodMapTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/TimePeriodMapTransformer.scala @@ -53,5 +53,5 @@ class TimePeriodMapTransformer[I <: DateMap] ) extends UnaryTransformer[I, IntegralMap](operationName = "dateMapToTimePeriod", uid = uid) { override def transformFn: I => IntegralMap = - (i: I) => i.value.mapValues(t => period.extractIntFromMillis(t).toLong).toIntegralMap + (i: I) => i.value.map { case (k, t) => k -> period.extractIntFromMillis(t).toLong }.toIntegralMap } diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 38689d7615..48caeedcce 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -32,6 +32,7 @@ package com.salesforce.op.filters import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} +import com.salesforce.op.filters.Summary._ import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest} @@ -42,38 +43,13 @@ import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import com.salesforce.op.filters.Summary._ + +import scala.util.{Failure, Success, Try} @RunWith(classOf[JUnitRunner]) class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { - val responseKey1: FeatureKey = "Response1" -> None - val responseKey2: FeatureKey = "Response2" -> None - val predictorKey1: FeatureKey = "Predictor1" -> None - val predictorKey2A: FeatureKey = "Predictor2" -> Option("A") - val predictorKey2B: FeatureKey = "Predictor2" -> Option("B") - - val preparedFeatures1 = PreparedFeatures( - responses = Map(responseKey1 -> Right(Seq(1.0)), responseKey2 -> Right(Seq(0.5))), - predictors = Map( - predictorKey1 -> Right(Seq(0.0, 0.0)), - predictorKey2A -> Left(Seq("i", "ii")), - predictorKey2B -> Left(Seq("iii")))) - val preparedFeatures2 = PreparedFeatures( - responses = Map(responseKey1 -> Right(Seq(0.0))), - predictors = Map(predictorKey1 -> Right(Seq(0.4, 0.5)))) - val preparedFeatures3 = PreparedFeatures( - 
responses = Map(responseKey2 -> Right(Seq(-0.5))), - predictors = Map(predictorKey2A -> Left(Seq("iv")))) - val allPreparedFeatures = Seq(preparedFeatures1, preparedFeatures2, preparedFeatures3) - implicit val sgTuple2 = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]() - val (allResponseSummaries, allPredictorSummaries) = allPreparedFeatures.map(_.summaries).reduce(_ + _) - - val allResponseKeys1 = Array(responseKey1, responseKey2) - val allResponseKeys2 = Array(responseKey1) - val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B) - val allPredictorKeys2 = Array(predictorKey1) - + import PreparedFeaturesTestData._ Spec[PreparedFeatures] should "produce correct summaries" in { val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries @@ -100,6 +76,15 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { predictorKey2B -> Summary(1.0, 1.0, 1.0, 1)) } + it should "produce summaries that are serializable" in { + Try(spark.sparkContext.makeRDD(allPreparedFeatures).map(_.summaries).reduce(_ + _)) match { + case Failure(error) => fail(error) + case Success((responses, predictors)) => + responses shouldBe allResponseSummaries + predictors shouldBe allPredictorSummaries + } + } + it should "produce correct null-label leakage vector with single response" in { preparedFeatures1.getNullLabelLeakageVector(allResponseKeys2, allPredictorKeys1).toArray shouldEqual Array(1.0, 0.0, 0.0, 0.0) @@ -218,3 +203,40 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { }.toSeq should contain theSameElementsInOrderAs expectedResult } } + +object PreparedFeaturesTestData { + + val responseKey1: FeatureKey = "Response1" -> None + val responseKey2: FeatureKey = "Response2" -> None + val predictorKey1: FeatureKey = "Predictor1" -> None + val predictorKey2A: FeatureKey = "Predictor2" -> Option("A") + val predictorKey2B: FeatureKey = "Predictor2" -> Option("B") + + val preparedFeatures1 = PreparedFeatures( + responses = Map(responseKey1 -> Right(Seq(1.0)), responseKey2 -> Right(Seq(0.5))), + predictors = Map( + predictorKey1 -> Right(Seq(0.0, 0.0)), + predictorKey2A -> Left(Seq("i", "ii")), + predictorKey2B -> Left(Seq("iii"))) + ) + + val preparedFeatures2 = PreparedFeatures( + responses = Map(responseKey1 -> Right(Seq(0.0))), + predictors = Map(predictorKey1 -> Right(Seq(0.4, 0.5))) + ) + + val preparedFeatures3 = PreparedFeatures( + responses = Map(responseKey2 -> Right(Seq(-0.5))), + predictors = Map(predictorKey2A -> Left(Seq("iv"))) + ) + + val allPreparedFeatures = Seq(preparedFeatures1, preparedFeatures2, preparedFeatures3) + implicit val sgTuple2 = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]() + val (allResponseSummaries, allPredictorSummaries) = allPreparedFeatures.map(_.summaries).reduce(_ + _) + + val allResponseKeys1 = Array(responseKey1, responseKey2) + val allResponseKeys2 = Array(responseKey1) + val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B) + val allPredictorKeys2 = Array(predictorKey1) + +} diff --git a/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala b/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala index 660fb74e23..407555b3cc 100644 --- a/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala +++ b/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala @@ -41,7 +41,7 @@ import com.twitter.algebird._ * However, order does not 
matter, so {a, a, b} and {a, b, a} are the same multiset. */ trait ExtendedMultiset extends MapMonoid[String, Long] with Group[Map[String, Long]] { - override def negate(kv: Map[String, Long]): Map[String, Long] = kv.mapValues { v => -v } + override def negate(kv: Map[String, Long]): Map[String, Long] = kv.map { case (k, v) => k -> -v } override def minus(x: Map[String, Long], y: Map[String, Long]): Map[String, Long] = { val keys = x.keySet ++ y.keySet From a639d36fb0228da09aac15bb75803a08340878fb Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 11 Jul 2019 09:14:16 -0700 Subject: [PATCH 2/2] comment --- .../main/scala/com/salesforce/op/filters/PreparedFeatures.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 7d15633f4d..2907000258 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -62,7 +62,7 @@ private[filters] case class PreparedFeatures /** * Computes vector of size responseKeys.length + predictorKeys.length. The first responses.length * values are the actual response values (nulls replaced with 0.0). Its (i + responses.length)th value - * is 1 iff. the predictor associated to ith f eature key is null, for i >= 0. + * is 1 iff. the predictor associated to ith feature key is null, for i >= 0. * * @param responseKeys response feature keys * @param predictorKeys set of all predictor keys needed for constructing binary vector
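
Notes (rationale plus an illustrative sketch; the map contents and the names counts, lazyNegated and strictNegated below are hypothetical, not taken from the codebase):

On the Scala versions targeted here (2.11/2.12), Map.mapValues returns a lazy wrapper that re-applies its function on every lookup and is not Java-serializable, which is why Spark jobs that ship such a map to executors can fail with a NotSerializableException. Mapping over the entries instead materializes a strict immutable Map, which is the substitution these patches apply wherever the result is kept around as a Map (mirroring, for example, the negate change in ExtendedMultiset.scala):

    // A minimal sketch of the difference, assuming Scala 2.11/2.12 collection semantics.
    val counts: Map[String, Long] = Map("a" -> 1L, "b" -> 2L)

    // Lazy view: -v is recomputed on every access and the wrapper is not Serializable.
    val lazyNegated = counts.mapValues(v => -v)

    // Strict immutable Map: evaluated once, safe to send to Spark executors.
    val strictNegated = counts.map { case (k, v) => k -> -v }

The new "produce summaries that are serializable" test in PreparedFeaturesTest exercises exactly this path: it computes the summaries inside an RDD and reduces them across partitions, so a lazy mapValues wrapper returned by PreparedFeatures.summaries would surface as a serialization failure.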