From a9825c449ace07b0d40b83ac3bb4b64f24153727 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Victor=20Del=C3=A9pine?=
Date: Fri, 2 Jul 2021 14:30:00 -0400
Subject: [PATCH 1/4] Rename Sparkey hash* methods to avoid shadowing regular methods

---
 .../LargeHashSCollectionFunctions.scala       |  2 +-
 .../PairLargeHashSCollectionFunctions.scala   | 22 +++++++++++-----------
 ...airLargeHashSCollectionFunctionsTest.scala | 12 ++++++------
 .../scio/extra/sparkey/SparkeyTest.scala      | 21 +++++++++++++++++++++
 4 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
index 0f72c84f4e..330f65024f 100644
--- a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
+++ b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
@@ -30,6 +30,6 @@ class LargeHashSCollectionFunctions[T](private val self: SCollection[T]) {
    */
   def hashFilter(sideInput: SideInput[SparkeySet[T]]): SCollection[T] = {
     implicit val coder = self.coder
-    self.map((_, ())).hashIntersectByKey(sideInput).keys
+    self.map((_, ())).largeHashIntersectByKey(sideInput).keys
   }
 }
diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
index 781a0ce62e..a951f2bdb9 100644
--- a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
+++ b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
@@ -19,7 +19,7 @@ package com.spotify.scio.extra.sparkey
 
 import com.spotify.scio.coders.Coder
 import com.spotify.scio.extra.sparkey.instances.{SparkeyMap, SparkeySet}
-import com.spotify.scio.values.{SCollection, SideInput}
+import com.spotify.scio.values.{PairHashSCollectionFunctions, SCollection, SideInput}
 import com.spotify.sparkey.CompressionType
 
 /**
@@ -52,7 +52,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, (V, W))] = {
     implicit val wCoder: Coder[W] = rhs.valueCoder
-    hashJoin(rhs.asLargeMultiMapSideInput(numShards, compressionType, compressionBlockSize))
+    largeHashJoin(rhs.asLargeMultiMapSideInput(numShards, compressionType, compressionBlockSize))
   }
 
   /**
@@ -69,7 +69,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
    *
    * @group join
    */
-  def hashJoin[W: Coder](
+  def largeHashJoin[W: Coder](
     sideInput: SideInput[SparkeyMap[K, Iterable[W]]]
   ): SCollection[(K, (V, W))] =
     self.transform { in =>
@@ -102,7 +102,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, (V, Option[W]))] = {
     implicit val wCoder: Coder[W] = rhs.valueCoder
-    hashLeftOuterJoin(
+    largeHashLeftOuterJoin(
       rhs.asLargeMultiMapSideInput(numShards, compressionType, compressionBlockSize)
     )
   }
@@ -118,7 +118,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
    * }}}
    * @group join
    */
-  def hashLeftOuterJoin[W: Coder](
+  def largeHashLeftOuterJoin[W: Coder](
     sideInput: SideInput[SparkeyMap[K, Iterable[W]]]
   ): SCollection[(K, (V, Option[W]))] = {
     self.transform { in =>
@@ -146,7 +146,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, (Option[V], Option[W]))] = {
     implicit val wCoder = rhs.valueCoder
-    hashFullOuterJoin(
+    largeHashFullOuterJoin(
       rhs.asLargeMultiMapSideInput(numShards, compressionType, compressionBlockSize)
     )
   }
@@ -163,7 +163,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
    *
    * @group join
    */
-  def hashFullOuterJoin[W: Coder](
+  def largeHashFullOuterJoin[W: Coder](
     sideInput: SideInput[SparkeyMap[K, Iterable[W]]]
   ): SCollection[(K, (Option[V], Option[W]))] =
     self.transform { in =>
@@ -210,7 +210,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionType: CompressionType = DefaultCompressionType,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, V)] =
-    hashIntersectByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
+    largeHashIntersectByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
 
   /**
    * Return an SCollection with the pairs from `this` whose keys are in the SideSet `rhs`.
@@ -220,7 +220,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
    * @group per
    *   key
    */
-  def hashIntersectByKey(sideInput: SideInput[SparkeySet[K]]): SCollection[(K, V)] =
+  def largeHashIntersectByKey(sideInput: SideInput[SparkeySet[K]]): SCollection[(K, V)] =
     self
       .withSideInputs(sideInput)
       .filter { case ((k, _), sideInputCtx) => sideInputCtx(sideInput).contains(k) }
@@ -240,7 +240,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionType: CompressionType = DefaultCompressionType,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, V)] =
-    hashSubtractByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
+    largeHashSubtractByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
 
   /**
    * Return an SCollection with the pairs from `this` whose keys are not in SideInput[Set] `rhs`.
@@ -248,7 +248,7 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
    * @group per
    *   key
    */
-  def hashSubtractByKey(sideInput: SideInput[SparkeySet[K]]): SCollection[(K, V)] =
+  def largeHashSubtractByKey(sideInput: SideInput[SparkeySet[K]]): SCollection[(K, V)] =
     self
       .withSideInputs(sideInput)
       .filter { case ((k, _), sideInputCtx) => !sideInputCtx(sideInput).contains(k) }
diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
index bb61020b31..115df1eabd 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
@@ -67,7 +67,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
       val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13))).asLargeMultiMapSideInput
-      val p = p1.hashJoin(p2)
+      val p = p1.largeHashJoin(p2)
       p should
         containInAnyOrder(Seq(("a", (1, 11)), ("a", (2, 11)), ("b", (3, 12)), ("b", (3, 13))))
     }
@@ -77,7 +77,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
       val p2 = sc.parallelize[(String, Int)](Map.empty).asLargeMultiMapSideInput
-      val p = p1.hashJoin(p2)
+      val p = p1.largeHashJoin(p2)
       p should containInAnyOrder(Seq.empty[(String, (Int, Int))])
     }
   }
@@ -123,7 +123,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
       val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))).asLargeMultiMapSideInput
-      val p = p1.hashLeftOuterJoin(p2)
+      val p = p1.largeHashLeftOuterJoin(p2)
       p should containInAnyOrder(Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None))))
     }
   }
@@ -181,7 +181,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
       val p2 = sc.parallelize(Seq(("a", 11), ("c", 13))).asLargeMultiMapSideInput
-      val p = p1.hashFullOuterJoin(p2)
+      val p = p1.largeHashFullOuterJoin(p2)
       p should containInAnyOrder(
         Seq(("a", (Some(1), Some(11))), ("b", (Some(2), None)), ("c", (None, Some(13))))
       )
@@ -228,7 +228,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
      val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("b", 4)))
       val p2 = sc.parallelize(Seq[String]("a", "b", "d")).asLargeSetSideInput
-      val p = p1.hashIntersectByKey(p2)
+      val p = p1.largeHashIntersectByKey(p2)
       p should containInAnyOrder(Seq(("a", 1), ("b", 2), ("b", 4)))
     }
   }
@@ -276,7 +276,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4)))
       val p2 = sc.parallelize(Seq[String]("a", "b")).asLargeSetSideInput
-      val output = p1.hashSubtractByKey(p2)
+      val output = p1.largeHashSubtractByKey(p2)
       output should haveSize(1)
       output should containInAnyOrder(Seq(("c", 4)))
     }
diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
index f09c71ef6d..cc6d609941 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
@@ -664,4 +664,25 @@ class SparkeyTest extends PipelineSpec {
       .basePath
     FileUtils.deleteDirectory(new File(basePath))
   }
+
+  it should "not override the regular hashJoin method" in {
+    import com.spotify.scio.extra.sparkey._
+
+    val sc = ScioContext()
+
+    val lhsInput = Seq((1, "a"), (2, "c"), (3, "e"), (4, "g"))
+    val rhsInput = Seq((1, "b"), (2, "d"), (3, "f"))
+
+    val rhs = sc.parallelize(rhsInput)
+    val lhs = sc.parallelize(lhsInput)
+
+    val result = lhs
+      .hashJoin(rhs)
+      .materialize
+
+    val scioResult = sc.run().waitUntilFinish()
+    val expectedOutput = List((1,("a", "b")), (2,("c","d")), (3,("e","f")))
+
+    scioResult.tap(result).value.toList should contain theSameElementsAs expectedOutput
+  }
 }

From d711f04466b59cec9e78c47a119711a07cd2bbbe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Victor=20Del=C3=A9pine?=
Date: Fri, 2 Jul 2021 15:09:35 -0400
Subject: [PATCH 2/4] Scalafmt fixes

---
 .../sparkey/PairLargeHashSCollectionFunctions.scala   | 10 +++++++---
 .../com/spotify/scio/extra/sparkey/SparkeyTest.scala  | 12 +++++-------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
index a951f2bdb9..c9ae726aa8 100644
--- a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
+++ b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctions.scala
@@ -19,7 +19,7 @@ package com.spotify.scio.extra.sparkey
 
 import com.spotify.scio.coders.Coder
 import com.spotify.scio.extra.sparkey.instances.{SparkeyMap, SparkeySet}
-import com.spotify.scio.values.{PairHashSCollectionFunctions, SCollection, SideInput}
+import com.spotify.scio.values.{SCollection, SideInput}
 import com.spotify.sparkey.CompressionType
 
 /**
@@ -210,7 +210,9 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionType: CompressionType = DefaultCompressionType,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, V)] =
-    largeHashIntersectByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
+    largeHashIntersectByKey(
+      rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize)
+    )
 
   /**
    * Return an SCollection with the pairs from `this` whose keys are in the SideSet `rhs`.
@@ -240,7 +242,9 @@ class PairLargeHashSCollectionFunctions[K, V](private val self: SCollection[(K,
     compressionType: CompressionType = DefaultCompressionType,
     compressionBlockSize: Int = DefaultCompressionBlockSize
   ): SCollection[(K, V)] =
-    largeHashSubtractByKey(rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize))
+    largeHashSubtractByKey(
+      rhs.asLargeSetSideInput(numShards, compressionType, compressionBlockSize)
+    )
 
   /**
    * Return an SCollection with the pairs from `this` whose keys are not in SideInput[Set] `rhs`.
diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
index cc6d609941..5e73f4cd64 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
@@ -17,11 +17,7 @@
 
 package com.spotify.scio.extra.sparkey
 
-import java.io.File
-import java.nio.file.Files
-import java.util.Arrays
-
-import com.github.benmanes.caffeine.cache.{Cache => CCache, Caffeine}
+import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CCache}
 import com.spotify.scio._
 import com.spotify.scio.testing._
 import com.spotify.scio.util._
@@ -29,6 +25,9 @@ import com.spotify.sparkey._
 import org.apache.beam.sdk.io.FileSystems
 import org.apache.commons.io.FileUtils
 
+import java.io.File
+import java.nio.file.Files
+import java.util.Arrays
 import scala.jdk.CollectionConverters._
 
 final case class TestCache[K, V](testId: String) extends CacheT[K, V, CCache[K, V]] {
@@ -666,7 +665,6 @@ class SparkeyTest extends PipelineSpec {
   }
 
   it should "not override the regular hashJoin method" in {
-    import com.spotify.scio.extra.sparkey._
 
     val sc = ScioContext()
 
@@ -681,7 +679,7 @@ class SparkeyTest extends PipelineSpec {
       .materialize
 
     val scioResult = sc.run().waitUntilFinish()
-    val expectedOutput = List((1,("a", "b")), (2,("c","d")), (3,("e","f")))
+    val expectedOutput = List((1, ("a", "b")), (2, ("c", "d")), (3, ("e", "f")))
 
     scioResult.tap(result).value.toList should contain theSameElementsAs expectedOutput
   }

From c3af58dda8255d02d78e87b59611ca396f7582ba Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Victor=20Del=C3=A9pine?=
Date: Fri, 2 Jul 2021 15:18:42 -0400
Subject: [PATCH 3/4] Rename hashFilter to largeHashFilter as well

---
 .../scio/extra/sparkey/LargeHashSCollectionFunctions.scala     | 2 +-
 .../scio/extra/sparkey/LargeHashSCollectionFunctionsTest.scala | 2 +-
 .../extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
index 330f65024f..9571724eb5 100644
--- a/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
+++ b/scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctions.scala
@@ -28,7 +28,7 @@ class LargeHashSCollectionFunctions[T](private val self: SCollection[T]) {
    *
    * @group transform
    */
-  def hashFilter(sideInput: SideInput[SparkeySet[T]]): SCollection[T] = {
+  def largeHashFilter(sideInput: SideInput[SparkeySet[T]]): SCollection[T] = {
     implicit val coder = self.coder
     self.map((_, ())).largeHashIntersectByKey(sideInput).keys
   }
diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctionsTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctionsTest.scala
index 432e378876..0e559b4a73 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctionsTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/LargeHashSCollectionFunctionsTest.scala
@@ -24,7 +24,7 @@ class LargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq("a", "b", "c", "b"))
       val p2 = sc.parallelize(Seq[String]("a", "a", "b", "e")).asLargeSetSideInput
-      val p = p1.hashFilter(p2)
+      val p = p1.largeHashFilter(p2)
       p should containInAnyOrder(Seq("a", "b", "b"))
     }
   }
diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
index 115df1eabd..bced69ed78 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/PairLargeHashSCollectionFunctionsTest.scala
@@ -286,7 +286,7 @@ class PairLargeHashSCollectionFunctionsTest extends PipelineSpec {
     runWithContext { sc =>
       val p1 = sc.parallelize(Seq("a", "b", "c", "b"))
       val p2 = sc.parallelize(Seq[String]("a", "a", "b", "e")).asLargeSetSideInput
-      val p = p1.hashFilter(p2)
+      val p = p1.largeHashFilter(p2)
       p should containInAnyOrder(Seq("a", "b", "b"))
     }
   }

From 465996ae0ceb3552d5365612768f96abc5151e93 Mon Sep 17 00:00:00 2001
From: Filipe Regadas
Date: Tue, 31 Aug 2021 10:20:06 +0100
Subject: [PATCH 4/4] Rebase

---
 .../test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
index 5e73f4cd64..50fa195b4a 100644
--- a/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
+++ b/scio-extra/src/test/scala/com/spotify/scio/extra/sparkey/SparkeyTest.scala
@@ -17,7 +17,7 @@
 
 package com.spotify.scio.extra.sparkey
 
-import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CCache}
+import com.github.benmanes.caffeine.cache.{Cache => CCache, Caffeine}
 import com.spotify.scio._
 import com.spotify.scio.testing._
 import com.spotify.scio.util._
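
For reference, a minimal usage sketch of the renamed API, pieced together from the tests in these patches. The pipeline setup and sample data below are illustrative only, not part of the patch series:

  import com.spotify.scio._
  import com.spotify.scio.extra.sparkey._

  val sc = ScioContext()

  val users  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  val lookup = sc.parallelize(Seq(("a", 11), ("b", 12)))

  // Sparkey-backed side inputs now go through the largeHash* methods.
  val joined = users.largeHashJoin(lookup.asLargeMultiMapSideInput)
  val keys   = sc.parallelize(Seq("a", "b")).asLargeSetSideInput
  val kept   = users.largeHashIntersectByKey(keys)
  val names  = sc.parallelize(Seq("a", "b", "c")).largeHashFilter(keys)

  // The regular in-memory hashJoin on SCollections is no longer shadowed.
  val small = users.hashJoin(lookup)

  sc.run()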