From ab521f8984b0c6a028de23a3f6591125838abc35 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Wed, 31 Oct 2018 17:20:44 -0700 Subject: [PATCH] Improve test coverage for VectorsCombiner and make vector aggregator efficient (#168) --- .../DecisionTreeNumericMapBucketizer.scala | 3 +- .../OPCollectionHashingVectorizer.scala | 5 +- .../impl/feature/SmartTextMapVectorizer.scala | 3 +- .../impl/feature/SmartTextVectorizer.scala | 2 +- .../stages/impl/feature/VectorsCombiner.scala | 41 +------ .../feature/SmartTextVectorizerTest.scala | 4 +- .../impl/feature/VectorsCombinerTest.scala | 56 +++++---- .../MonoidAggregatorDefaults.scala | 2 +- .../salesforce/op/aggregators/OPVector.scala | 18 ++- .../op/features/types/OPVector.scala | 39 +++++++ .../op/utils/spark/RichVector.scala | 50 ++++++++- .../MonoidAggregatorDefaultsTest.scala | 12 +- .../op/features/types/OPVectorTest.scala | 106 ++++++++++++++++++ .../op/utils/spark/RichVectorTest.scala | 57 +++++++++- 14 files changed, 318 insertions(+), 80 deletions(-) create mode 100644 features/src/test/scala/com/salesforce/op/features/types/OPVectorTest.scala diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizer.scala index 2666ed9b40..3548994aa7 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizer.scala @@ -36,6 +36,7 @@ import com.salesforce.op.stages.AllowLabelAsInput import com.salesforce.op.stages.base.binary.{BinaryEstimator, BinaryModel} import com.salesforce.op.utils.spark.OpVectorColumnMetadata import com.salesforce.op.utils.spark.RichDataset._ +import com.salesforce.op.utils.spark.RichVector._ import org.apache.spark.sql.Dataset import org.apache.spark.sql.types.Metadata @@ -163,7 +164,7 @@ final class DecisionTreeNumericMapBucketizerModel[I2 <: OPMap[_]] private[op] input = cleanedInputMap.get(k) ) } - VectorsCombiner.combine(vectors).toOPVector + combine(vectors).toOPVector } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/OPCollectionHashingVectorizer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/OPCollectionHashingVectorizer.scala index 18083c84fe..fe28748f60 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/OPCollectionHashingVectorizer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/OPCollectionHashingVectorizer.scala @@ -36,6 +36,7 @@ import com.salesforce.op.features.types._ import com.salesforce.op.stages.OpPipelineStageBase import com.salesforce.op.stages.base.sequence.SequenceTransformer import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} +import com.salesforce.op.utils.spark.RichVector._ import org.apache.spark.ml.linalg.{DenseVector, SparseVector} import org.apache.spark.ml.param._ import org.apache.spark.mllib.feature.HashingTF @@ -265,7 +266,7 @@ private[op] trait HashingFun { fNameHashesWithInputs.map { case (featureNameHash, el) => hasher.transform(prepare[T](el, params.hashWithIndex, params.prependFeatureName, featureNameHash)).asML } - VectorsCombiner.combine(hashedVecs).toOPVector + combine(hashedVecs).toOPVector } } } @@ -379,7 +380,7 @@ private[op] trait MapHashingFun extends HashingFun { prepare[TextList](el, params.hashWithIndex, params.prependFeatureName, featureNameHash) ).asML }) - VectorsCombiner.combine(hashedVecs.flatten).toOPVector + combine(hashedVecs.flatten).toOPVector } } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextMapVectorizer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextMapVectorizer.scala index b2e4615bea..cbdd13992e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextMapVectorizer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextMapVectorizer.scala @@ -264,7 +264,8 @@ final class SmartTextMapVectorizerModel[T <: OPMap[String]] private[op] val textVector = hash(rowTextTokenized, keysText, args.hashingParams) val textNullIndicatorsVector = if (args.shouldTrackNulls) Seq(getNullIndicatorsVector(keysText, rowTextTokenized)) else Nil - VectorsCombiner.combineOP(Seq(categoricalVector, textVector) ++ textNullIndicatorsVector) + + categoricalVector.combine(textVector, textNullIndicatorsVector: _*) } private def getNullIndicatorsVector(keysSeq: Seq[Seq[String]], inputs: Seq[Map[String, TextList]]): OPVector = { diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizer.scala index a5f0157d50..329e8ddd6a 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizer.scala @@ -216,7 +216,7 @@ final class SmartTextVectorizerModel[T <: Text] private[op] val textVector: OPVector = hash[TextList](textTokens, getTextTransientFeatures, args.hashingParams) val textNullIndicatorsVector = if (args.shouldTrackNulls) Seq(getNullIndicatorsVector(textTokens)) else Seq.empty - VectorsCombiner.combineOP(Seq(categoricalVector, textVector) ++ textNullIndicatorsVector) + categoricalVector.combine(textVector, textNullIndicatorsVector: _*) } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/VectorsCombiner.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/VectorsCombiner.scala index da6d2c08ea..09c4aa816a 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/VectorsCombiner.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/VectorsCombiner.scala @@ -81,42 +81,9 @@ class VectorsCombiner(uid: String = UID[VectorsCombiner]) final class VectorsCombinerModel private[op] (operationName: String, uid: String) extends SequenceModel[OPVector, OPVector](operationName = operationName, uid = uid) { - def transformFn: Seq[OPVector] => OPVector = VectorsCombiner.combineOP -} - -case object VectorsCombiner { - - /** - * Combine multiple OP vectors into one - * - * @param vectors input vectors - * @return result vector - */ - def combineOP(vectors: Seq[OPVector]): OPVector = { - new OPVector(combine(vectors.view.map(_.value))) - } - - /** - * Combine multiple vectors into one - * - * @param vectors input vectors - * @return result vector - */ - def combine(vectors: Seq[Vector]): Vector = { - val indices = ArrayBuffer.empty[Int] - val values = ArrayBuffer.empty[Double] - - val size = vectors.foldLeft(0)((size, vector) => { - vector.foreachActive { case (i, v) => - if (v != 0.0) { - indices += size + i - values += v - } - } - size + vector.size - }) - Vectors.sparse(size, indices.toArray, values.toArray).compressed + def transformFn: Seq[OPVector] => OPVector = s => s.toList match { + case v1 :: v2 :: tail => v1.combine(v2, tail: _*) + case v :: Nil => v + case Nil => OPVector.empty } - } - diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizerTest.scala index 2bd0e421f1..e77711a7cd 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/SmartTextVectorizerTest.scala @@ -91,7 +91,7 @@ class SmartTextVectorizerTest val textRes = transformed.collect(textVectorized) assertNominal(fieldText, Array.fill(textRes.head.value.size)(false), textRes) val (smart, expected) = result.map { case (smartVector, categoricalVector, textVector, nullVector) => - val combined = VectorsCombiner.combineOP(Seq(categoricalVector, textVector, nullVector)) + val combined = categoricalVector.combine(textVector, nullVector) smartVector -> combined }.unzip @@ -139,7 +139,7 @@ class SmartTextVectorizerTest val textRes = transformed.collect(textVectorized) assertNominal(fieldText, Array.fill(textRes.head.value.size)(false), textRes) val (smart, expected) = result.map { case (smartVector, textVector, nullVector) => - val combined = VectorsCombiner.combineOP(Seq(textVector, nullVector)) + val combined = textVector.combine(nullVector) smartVector -> combined }.unzip diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/VectorsCombinerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/VectorsCombinerTest.scala index 0fe018c71e..5fba9855ff 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/VectorsCombinerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/VectorsCombinerTest.scala @@ -31,31 +31,42 @@ package com.salesforce.op.stages.impl.feature import com.salesforce.op._ -import com.salesforce.op.features.types.Text -import com.salesforce.op.features.{FeatureLike, TransientFeature} -import com.salesforce.op.test.PassengerSparkFixtureTest +import com.salesforce.op.features.TransientFeature +import com.salesforce.op.features.types.{Text, _} +import com.salesforce.op.stages.base.sequence.SequenceModel +import com.salesforce.op.test.{OpEstimatorSpec, PassengerSparkFixtureTest, TestFeatureBuilder} +import com.salesforce.op.testkit.{RandomReal, RandomVector} import com.salesforce.op.utils.spark.OpVectorMetadata +import com.salesforce.op.utils.spark.RichMetadata._ import org.apache.spark.ml.linalg.Vectors import org.junit.runner.RunWith -import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import com.salesforce.op.utils.spark.RichMetadata._ + @RunWith(classOf[JUnitRunner]) -class VectorsCombinerTest extends FlatSpec with PassengerSparkFixtureTest { +class VectorsCombinerTest + extends OpEstimatorSpec[OPVector, SequenceModel[OPVector, OPVector], VectorsCombiner] + with PassengerSparkFixtureTest { - val vectors = Seq( - Vectors.sparse(4, Array(0, 3), Array(1.0, 1.0)), - Vectors.dense(Array(2.0, 3.0, 4.0)), - Vectors.sparse(4, Array(1), Array(777.0)) - ) - val expected = Vectors.sparse(11, Array(0, 3, 4, 5, 6, 8), Array(1.0, 1.0, 2.0, 3.0, 4.0, 777.0)) + override def specName: String = classOf[VectorsCombiner].getSimpleName - Spec[VectorsCombiner] should "combine vectors correctly" in { - val combined = VectorsCombiner.combine(vectors) - assert(combined.compressed == combined, "combined is expected to be compressed") - combined shouldBe expected - } + val (inputData, f1, f2) = TestFeatureBuilder(Seq( + Vectors.sparse(4, Array(0, 3), Array(1.0, 1.0)).toOPVector -> + Vectors.sparse(4, Array(0, 3), Array(2.0, 3.0)).toOPVector, + Vectors.dense(Array(2.0, 3.0, 4.0)).toOPVector -> + Vectors.dense(Array(12.0, 13.0, 14.0)).toOPVector, + // Purposely added some very large sparse vectors to verify the efficiency + Vectors.sparse(100000000, Array(1), Array(777.0)).toOPVector -> + Vectors.sparse(500000000, Array(0), Array(888.0)).toOPVector + )) + + val estimator = new VectorsCombiner().setInput(f1, f2) + + val expectedResult = Seq( + Vectors.sparse(8, Array(0, 3, 4, 7), Array(1.0, 1.0, 2.0, 3.0)).toOPVector, + Vectors.dense(Array(2.0, 3.0, 4.0, 12.0, 13.0, 14.0)).toOPVector, + Vectors.sparse(600000000, Array(1, 100000000), Array(777.0, 888.0)).toOPVector + ) it should "combine metadata correctly" in { val vector = Seq(height, description, stringMap).transmogrify() @@ -69,12 +80,11 @@ class VectorsCombinerTest extends FlatSpec with PassengerSparkFixtureTest { } it should "create metadata correctly" in { - val descVect = description.map[Text]{ - t => - Text(t.value match { - case Some(text) => "this is dumb " + text - case None => "some STUFF to tokenize" - }) + val descVect = description.map[Text] { t => + Text(t.value match { + case Some(text) => "this is dumb " + text + case None => "some STUFF to tokenize" + }) }.tokenize().tf(numTerms = 5) val vector = Seq(height, stringMap, descVect).transmogrify() val Seq(inputs1, inputs2, inputs3) = vector.parents diff --git a/features/src/main/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaults.scala b/features/src/main/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaults.scala index ffccd97fdc..abec01873f 100644 --- a/features/src/main/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaults.scala +++ b/features/src/main/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaults.scala @@ -53,7 +53,7 @@ object MonoidAggregatorDefaults { val aggregator = weakTypeOf[O] match { // Vector - case wt if wt =:= weakTypeOf[OPVector] => UnionVector + case wt if wt =:= weakTypeOf[OPVector] => CombineVector // Lists case wt if wt =:= weakTypeOf[TextList] => ConcatTextList diff --git a/features/src/main/scala/com/salesforce/op/aggregators/OPVector.scala b/features/src/main/scala/com/salesforce/op/aggregators/OPVector.scala index 0fdabefb69..37c2efd17e 100644 --- a/features/src/main/scala/com/salesforce/op/aggregators/OPVector.scala +++ b/features/src/main/scala/com/salesforce/op/aggregators/OPVector.scala @@ -32,6 +32,7 @@ package com.salesforce.op.aggregators import com.salesforce.op.features.types._ import com.twitter.algebird._ +import com.salesforce.op.utils.spark.RichVector._ import org.apache.spark.ml.linalg.{Vector, Vectors} import scala.reflect.runtime.universe._ @@ -39,12 +40,21 @@ import scala.reflect.runtime.universe._ /** * Aggregator that gives the union of Vector data */ -case object UnionVector +case object CombineVector extends MonoidAggregator[Event[OPVector], Vector, OPVector] with AggregatorDefaults[OPVector] { implicit val ttag = weakTypeTag[OPVector] val ftFactory = FeatureTypeFactory[OPVector]() - val monoid: Monoid[Vector] = Monoid.from(Vectors.zeros(0))((v1: Vector, v2: Vector) => - Vectors.dense(v1.toArray ++ v2.toArray) - ) + val monoid: Monoid[Vector] = Monoid.from(Vectors.zeros(0))(_ combine _) +} + +/** +* Aggregator that gives the sum of Vector data +*/ +case object SumVector + extends MonoidAggregator[Event[OPVector], Vector, OPVector] + with AggregatorDefaults[OPVector] { + implicit val ttag = weakTypeTag[OPVector] + val ftFactory = FeatureTypeFactory[OPVector]() + val monoid: Monoid[Vector] = Monoid.from(Vectors.zeros(0))(_ + _) } diff --git a/features/src/main/scala/com/salesforce/op/features/types/OPVector.scala b/features/src/main/scala/com/salesforce/op/features/types/OPVector.scala index 3df131e3dd..8a73c09e50 100644 --- a/features/src/main/scala/com/salesforce/op/features/types/OPVector.scala +++ b/features/src/main/scala/com/salesforce/op/features/types/OPVector.scala @@ -30,6 +30,7 @@ package com.salesforce.op.features.types +import com.salesforce.op.utils.spark.RichVector._ import org.apache.spark.ml.linalg._ /** @@ -39,8 +40,46 @@ import org.apache.spark.ml.linalg._ */ class OPVector(val value: Vector) extends OPCollection { type Value = Vector + final def isEmpty: Boolean = value.size == 0 + + /** + * Add vectors + * + * @param that another vector + * @throws IllegalArgumentException if the vectors have different sizes + * @return vector addition + */ + def +(that: OPVector): OPVector = (value + that.value).toOPVector + + /** + * Subtract vectors + * + * @param that another vector + * @throws IllegalArgumentException if the vectors have different sizes + * @return vector subtraction + */ + def -(that: OPVector): OPVector = (value - that.value).toOPVector + + /** + * Dot product between vectors + * + * @param that another vector + * @throws IllegalArgumentException if the vectors have different sizes + * @return dot product + */ + def dot(that: OPVector): Double = value dot that.value + + /** + * Combine multiple vectors into one + * + * @param that another vector + * @param other other vectors + * @return result vector + */ + def combine(that: OPVector, other: OPVector*): OPVector = value.combine(that.value, other.map(_.value): _*).toOPVector } + object OPVector { def apply(value: Vector): OPVector = new OPVector(value) def empty: OPVector = FeatureTypeDefaults.OPVector diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala index e51670621a..8adfb2dc50 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala @@ -31,7 +31,9 @@ package com.salesforce.op.utils.spark import breeze.linalg.{DenseVector => BreezeDenseVector, SparseVector => BreezeSparseVector, Vector => BreezeVector} -import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector} +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} + +import scala.collection.mutable.ArrayBuffer /** * [[org.apache.spark.ml.linalg.Vector]] enrichment functions @@ -64,6 +66,30 @@ object RichVector { toSpark(res) } + /** + * Dot product between vectors + * + * @param that another vector + * @throws IllegalArgumentException if the vectors have different sizes + * @return dot product + */ + def dot(that: Vector): Double = { + require(v.size == that.size, + s"Vectors must have the same length: a.length == b.length (${v.size} != ${that.size})" + ) + v.toBreeze dot that.toBreeze + } + + /** + * Combine multiple vectors into one + * + * @param that another vector + * @param other other vectors + * @return result vector + */ + def combine(that: Vector, other: Vector*): Vector = + com.salesforce.op.utils.spark.RichVector.combine(v +: that +: other) + /** * Convert to [[breeze.linalg.Vector]] * @@ -85,4 +111,26 @@ object RichVector { } + /** + * Combine multiple vectors into one + * + * @param vectors input vectors + * @return result vector + */ + def combine(vectors: Seq[Vector]): Vector = { + val indices = ArrayBuffer.empty[Int] + val values = ArrayBuffer.empty[Double] + + val size = vectors.foldLeft(0)((size, vector) => { + vector.foreachActive { case (i, v) => + if (v != 0.0) { + indices += size + i + values += v + } + } + size + vector.size + }) + Vectors.sparse(size, indices.toArray, values.toArray).compressed + } + } diff --git a/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala b/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala index 9563d87b6b..3c6a1b4b34 100644 --- a/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala +++ b/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala @@ -519,10 +519,20 @@ class MonoidAggregatorDefaultsTest extends FlatSpec with TestCommon { assertDefaultAggr(multiPickListMapTestSeq, expectedRes) } - Spec(UnionVector.getClass) should "work" in { + Spec(CombineVector.getClass) should "work" in { assertDefaultAggr(vectorTestSeq, Vectors.dense(Array(0.1, 0.2, 1.0, 0.2))) } + Spec(SumVector.getClass) should "work" in { + val vectors = Seq(Array(0.1, 0.2), Array(1.0, -1.5), Array(0.2, 0.0)).map(Vectors.dense(_).toOPVector) + assertAggr(SumVector, vectors, Vectors.dense(Array(1.3, -1.3))) + } + it should "error on vectors of invalid sizes" in { + val vectors = Seq(Array(0.1, 0.2), Array(1.0)).map(Vectors.dense(_).toOPVector) + intercept[IllegalArgumentException](assertAggr(SumVector, vectors, Vectors.zeros(0))).getMessage shouldBe + "requirement failed: Vectors must have same length: x.length == y.length (1 != 2)" + } + Spec[CustomMonoidAggregator[_]] should "work" in { val customAgg = new CustomMonoidAggregator[Real](zero = None, associativeFn = (r1, r2) => (r1 -> r2).map(_ + _)) assertAggr(customAgg, realTestSeq, Option(doubleBase.flatten.sum)) diff --git a/features/src/test/scala/com/salesforce/op/features/types/OPVectorTest.scala b/features/src/test/scala/com/salesforce/op/features/types/OPVectorTest.scala new file mode 100644 index 0000000000..1c4cc9a90f --- /dev/null +++ b/features/src/test/scala/com/salesforce/op/features/types/OPVectorTest.scala @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.features.types + +import com.salesforce.op.test.TestCommon +import com.salesforce.op.utils.spark.RichVector._ +import org.apache.spark.ml.linalg.Vectors +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class OPVectorTest extends FlatSpec with TestCommon { + + val vectors = Seq( + Vectors.sparse(4, Array(0, 3), Array(1.0, 1.0)).toOPVector, + Vectors.dense(Array(2.0, 3.0, 4.0)).toOPVector, + // Purposely added a very large sparse vector to verify the efficiency + Vectors.sparse(100000000, Array(1), Array(777.0)).toOPVector + ) + + Spec[OPVector] should "be empty" in { + val zero = Vectors.zeros(0) + new OPVector(zero).isEmpty shouldBe true + new OPVector(zero).nonEmpty shouldBe false + zero.toOPVector shouldBe a[OPVector] + } + + it should "error on size mismatch" in { + val ones = Array.fill(vectors.size)(Vectors.sparse(1, Array(0), Array(1.0)).toOPVector) + for { + (v1, v2) <- vectors.zip(ones) + res <- Seq(() => v1 + v2, () => v1 - v2, () => v1 dot v2) + } intercept[IllegalArgumentException](res()).getMessage should { + startWith("requirement failed: Vectors must") and include("same length") + } + } + + it should "compare values" in { + val zero = Vectors.zeros(0) + new OPVector(zero) shouldBe new OPVector(zero) + new OPVector(zero).value shouldBe zero + + Vectors.dense(Array(1.0, 2.0)).toOPVector shouldBe Vectors.dense(Array(1.0, 2.0)).toOPVector + Vectors.sparse(5, Array(3, 4), Array(1.0, 2.0)).toOPVector shouldBe + Vectors.sparse(5, Array(3, 4), Array(1.0, 2.0)).toOPVector + Vectors.dense(Array(1.0, 2.0)).toOPVector should not be Vectors.dense(Array(2.0, 2.0)).toOPVector + new OPVector(Vectors.dense(Array(1.0, 2.0))) should not be Vectors.dense(Array(2.0, 2.0)).toOPVector + OPVector.empty shouldBe new OPVector(zero) + } + + it should "'+' add" in { + for {(v1, v2) <- vectors.zip(vectors)} { + (v1 + v2) shouldBe (v1.value + v2.value).toOPVector + } + } + + it should "'-' subtract" in { + for {(v1, v2) <- vectors.zip(vectors)} { + (v1 - v2) shouldBe (v1.value - v2.value).toOPVector + } + } + + it should "compute dot product" in { + for {(v1, v2) <- vectors.zip(vectors)} { + (v1 dot v2) shouldBe (v1.value dot v2.value) + } + } + + it should "combine" in { + for {(v1, v2) <- vectors.zip(vectors)} { + v1.combine(v2) shouldBe v1.value.combine(v2.value).toOPVector + v1.combine(v2, v2, v1) shouldBe v1.value.combine(v2.value, v2.value, v1.value).toOPVector + } + } + +} diff --git a/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala b/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala index 1528bba3ee..a6e161f729 100644 --- a/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala +++ b/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala @@ -62,10 +62,13 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext res <- Seq( () => sparse + wrongSize, () => sparse - wrongSize, + () => sparse dot wrongSize, () => dense + wrongSize, () => dense - wrongSize, + () => dense dot wrongSize, () => dense + wrongSize.toDense, - () => dense - wrongSize.toDense + () => dense - wrongSize.toDense, + () => dense dot wrongSize.toDense ) } { intercept[IllegalArgumentException](res()).getMessage should { @@ -75,10 +78,10 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext } } - property("Vectors should '+' add correctly") { + property("Vectors should '+' add") { forAll(sparseVectorGen) { sparse: SparseVector => - val expected = sparse.toArray.map(_ * 2) val dense = sparse.toDense + val expected = dense.values.map(_ * 2) for {res <- Seq(sparse + sparse, dense + sparse, sparse + dense, dense + dense)} { res.size shouldBe sparse.size res.toArray should contain theSameElementsAs expected @@ -86,7 +89,7 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext } } - property("Vectors should '-' subtract correctly") { + property("Vectors should '-' subtract") { forAll(sparseVectorGen) { sparse: SparseVector => val dense = sparse.toDense for {res <- Seq(sparse - sparse, dense - sparse, sparse - dense, dense - dense)} { @@ -96,7 +99,31 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext } } - property("Vectors convert to breeze vectors correctly") { + property("Vectors should compute dot product") { + forAll(sparseVectorGen) { sparse: SparseVector => + val dense = sparse.toDense + val expected = dense.values.zip(dense.values).map { case (v1, v2) => v1 * v2 }.sum + for {res <- Seq(sparse dot sparse, dense dot sparse, sparse dot dense, dense dot dense)} { + res shouldBe expected +- 1e-4 + } + } + } + + property("Vectors should combine") { + forAll(sparseVectorGen) { sparse: SparseVector => + val dense = sparse.toDense + val expected = dense.values ++ dense.values + for {res <- Seq(sparse.combine(sparse), dense.combine(sparse), sparse.combine(dense), dense.combine(dense))} { + res.size shouldBe 2 * sparse.size + res.toArray should contain theSameElementsAs expected + } + val res = sparse.combine(dense, dense, sparse) + res.size shouldBe 4 * sparse.size + res.toArray should contain theSameElementsAs (expected ++ expected) + } + } + + property("Vectors convert to breeze vectors") { forAll(sparseVectorGen) { sparse: SparseVector => val dense = sparse.toDense sparse.toBreeze.toArray should contain theSameElementsAs dense.toBreeze.toArray @@ -119,7 +146,25 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext ) } - property("Vectors add in reduce") { + property("Sparse vectors combine efficiently") { + val sparseSize = 100000000 + val sparse = new SparseVector(sparseSize, Array(0, 1, sparseSize - 1), Array(-1.0, 1.0, 3.0)) + val expected = new SparseVector(sparseSize * 2, + Array(0, 1, sparseSize - 1, sparseSize, sparseSize + 1, 2 * sparseSize - 1), + Array(-1.0, 1.0, 3.0, -1.0, 1.0, 3.0) + ) + forAllConcurrentCheck[SparseVector]( + numThreads = 10, numInvocationsPerThread = 50000, atMost = 10.seconds, + table = Table[SparseVector]("sparseVectors", sparse), + functionCheck = sparse => { + val res = sparse.combine(sparse) + res shouldBe a[SparseVector] + res shouldEqual expected + } + ) + } + + property("Vectors '+' add in reduce") { forAll(sparseVevtorsRDDGen) { rdd: RDD[Vector] => if (!rdd.isEmpty()) { val tolerance = 1e-9 // we are loosing precision here, hence the tolerance