From a4882f152ba72c87d1dea708e36b9ebd6f47ebd6 Mon Sep 17 00:00:00 2001 From: Kirill Panarin Date: Tue, 31 Aug 2021 05:12:13 -0400 Subject: [PATCH] Rollback covary* resetting coder logic (#3969) --- .../com/spotify/scio/values/SCollection.scala | 44 ++----------------- .../com/spotify/scio/pubsub/PubsubIO.scala | 2 +- .../pubsub/syntax/SCollectionSyntax.scala | 4 +- .../spotify/scio/values/SCollectionTest.scala | 27 +----------- 4 files changed, 7 insertions(+), 70 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 1403554d25..ecb3b5a003 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -298,52 +298,14 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { // Collection operations // ======================================================================= - /** - * Changes the underlying type of the SCollection from T to U and resets the coder to match the - * result underlying type. The function is unsafe and should be used internally only to support - * covary/contravary implementations. - * @tparam U - * The result underlying type. - * @return - * The result SCollection with the changed underlying type. - */ - private[scio] def unsafeCastElementWithCoder[U: Coder]: SCollection[U] = { - val coder = CoderMaterializer.beam(context, Coder[U]) - ensureSerializable(coder).fold(throw _, this.asInstanceOf[SCollection[U]].setCoder) - } - - /** - * Equivalent of the [[contravary]], but doesn't reset the coder and hence doesn't require it as - * implicit. For internal use only and mostly to minimize the number of breaking changes to - * migrate from covary/contravary versions which don't reset the coder. - */ - private[scio] def unsafeContravary[U <: T]: SCollection[U] = - this.asInstanceOf[SCollection[U]] - - /** - * Equivalent of the [[covary]], but doesn't reset the coder and hence doesn't require it as - * implicit. For internal use only and mostly to minimize the number of breaking changes to - * migrate from covary/contravary versions which don't reset the coder. - */ - private[scio] def unsafeCovary[U >: T]: SCollection[U] = - this.asInstanceOf[SCollection[U]] - - /** - * Equivalent of the [[covary_]], but doesn't reset the coder and hence doesn't require it as - * implicit. For internal use only and mostly to minimize the number of breaking changes to - * migrate from covary/contravary versions which don't reset the coder. - */ - private[scio] def unsafeCovary_[U](implicit ev: T <:< U): SCollection[U] = this - .asInstanceOf[SCollection[U]] - /** lifts this [[SCollection]] to the specified type */ - def covary[U >: T: Coder]: SCollection[U] = unsafeCastElementWithCoder[U] + def covary[U >: T]: SCollection[U] = this.asInstanceOf[SCollection[U]] /** lifts this [[SCollection]] to the specified type */ - def covary_[U: Coder](implicit ev: T <:< U): SCollection[U] = unsafeCastElementWithCoder[U] + def covary_[U](implicit ev: T <:< U): SCollection[U] = this.asInstanceOf[SCollection[U]] /** lifts this [[SCollection]] to the specified type */ - def contravary[U <: T: Coder]: SCollection[U] = unsafeCastElementWithCoder[U] + def contravary[U <: T]: SCollection[U] = this.asInstanceOf[SCollection[U]] /** * Convert this SCollection to an [[SCollectionWithFanout]] that uses an intermediate node to diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala index cf9ed7f5b7..127e4be575 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala @@ -292,7 +292,7 @@ final private case class PubSubMessagePubsubIOWithoutAttributes[T <: beam.Pubsub ) extends PubsubIOWithoutAttributes[T] { override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { val t = setup(beam.PubsubIO.readMessages(), params) - sc.applyTransform(t).unsafeContravary[T] + sc.applyTransform(t).contravary[T] } override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/syntax/SCollectionSyntax.scala index 1d9e9a8b38..3397f74d1c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/syntax/SCollectionSyntax.scala @@ -77,10 +77,10 @@ trait SCollectionSyntax { maxBatchBytesSize: Option[Int] = None )(implicit ev: T <:< (V, Map[String, String])): ClosedTap[Nothing] = { implicit val vCoder = - BeamCoders.getTupleCoders(coll.unsafeCovary_[(V, Map[String, String])])._1 + BeamCoders.getTupleCoders(coll.covary_[(V, Map[String, String])])._1 val io = PubsubIO.withAttributes[V](topic, idAttribute, timestampAttribute) coll - .unsafeCovary_[(V, Map[String, String])] + .covary_[(V, Map[String, String])] .write(io)(PubsubIO.WriteParam(maxBatchSize, maxBatchBytesSize)) } } diff --git a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index 6479d26724..29275b61f5 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -40,17 +40,10 @@ import org.joda.time.{DateTimeConstants, Duration, Instant} import scala.collection.mutable import scala.jdk.CollectionConverters._ -import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.coders.Coder import com.spotify.scio.schemas.Schema -import org.apache.beam.sdk.util.CoderUtils - -object SCollectionTest { - class TestA(val l: Long) - case class TestB(override val l: Long, s: String) extends TestA(l) -} class SCollectionTest extends PipelineSpec { - import com.spotify.scio.values.SCollectionTest._ "SCollection" should "support applyTransform()" in { runWithContext { sc => @@ -838,22 +831,4 @@ class SCollectionTest extends PipelineSpec { } } - it should "reset coder after covary is applied" in { - runWithContext { sc => - val coll1 = sc.parallelize(Seq(new TestA(0))) - val coll2 = sc.parallelize(Seq(TestB(1, "1"))) - val coll = coll2.covary[TestA] ++ coll1 - - // The test fails if covary doesn't reset the coder to the Coder[TestA], - // because the result collection is a mix of TestA and TestB and Coder[TestB] - // won't be able to decode an instance of TestA from coll1. - val beamCoder = CoderMaterializer.beamWithDefault(coll.coder, sc.options) - val testAInstance = new TestA(0) - val bytes = CoderUtils.encodeToByteArray(beamCoder, testAInstance) - val result = CoderUtils.decodeFromByteArray(beamCoder, bytes) - - result.l shouldBe testAInstance.l - } - } - }