Skip to content

Commit

Permalink
Rollback covary* resetting coder logic (spotify#3969)
Browse files Browse the repository at this point in the history
  • Loading branch information
stormy-ua authored and regadas committed Sep 1, 2021
1 parent 64111de commit a4882f1
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 70 deletions.
44 changes: 3 additions & 41 deletions scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -298,52 +298,14 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
// Collection operations
// =======================================================================

/**
* Changes the underlying type of the SCollection from T to U and resets the coder to match the
* result underlying type. The function is unsafe and should be used internally only to support
* covary/contravary implementations.
* @tparam U
* The result underlying type.
* @return
* The result SCollection with the changed underlying type.
*/
private[scio] def unsafeCastElementWithCoder[U: Coder]: SCollection[U] = {
val coder = CoderMaterializer.beam(context, Coder[U])
ensureSerializable(coder).fold(throw _, this.asInstanceOf[SCollection[U]].setCoder)
}

/**
* Equivalent of the [[contravary]], but doesn't reset the coder and hence doesn't require it as
* implicit. For internal use only and mostly to minimize the number of breaking changes to
* migrate from covary/contravary versions which don't reset the coder.
*/
private[scio] def unsafeContravary[U <: T]: SCollection[U] =
this.asInstanceOf[SCollection[U]]

/**
* Equivalent of the [[covary]], but doesn't reset the coder and hence doesn't require it as
* implicit. For internal use only and mostly to minimize the number of breaking changes to
* migrate from covary/contravary versions which don't reset the coder.
*/
private[scio] def unsafeCovary[U >: T]: SCollection[U] =
this.asInstanceOf[SCollection[U]]

/**
* Equivalent of the [[covary_]], but doesn't reset the coder and hence doesn't require it as
* implicit. For internal use only and mostly to minimize the number of breaking changes to
* migrate from covary/contravary versions which don't reset the coder.
*/
private[scio] def unsafeCovary_[U](implicit ev: T <:< U): SCollection[U] = this
.asInstanceOf[SCollection[U]]

/** lifts this [[SCollection]] to the specified type */
def covary[U >: T: Coder]: SCollection[U] = unsafeCastElementWithCoder[U]
def covary[U >: T]: SCollection[U] = this.asInstanceOf[SCollection[U]]

/** lifts this [[SCollection]] to the specified type */
def covary_[U: Coder](implicit ev: T <:< U): SCollection[U] = unsafeCastElementWithCoder[U]
def covary_[U](implicit ev: T <:< U): SCollection[U] = this.asInstanceOf[SCollection[U]]

/** lifts this [[SCollection]] to the specified type */
def contravary[U <: T: Coder]: SCollection[U] = unsafeCastElementWithCoder[U]
def contravary[U <: T]: SCollection[U] = this.asInstanceOf[SCollection[U]]

/**
* Convert this SCollection to an [[SCollectionWithFanout]] that uses an intermediate node to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ final private case class PubSubMessagePubsubIOWithoutAttributes[T <: beam.Pubsub
) extends PubsubIOWithoutAttributes[T] {
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val t = setup(beam.PubsubIO.readMessages(), params)
sc.applyTransform(t).unsafeContravary[T]
sc.applyTransform(t).contravary[T]
}

override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ trait SCollectionSyntax {
maxBatchBytesSize: Option[Int] = None
)(implicit ev: T <:< (V, Map[String, String])): ClosedTap[Nothing] = {
implicit val vCoder =
BeamCoders.getTupleCoders(coll.unsafeCovary_[(V, Map[String, String])])._1
BeamCoders.getTupleCoders(coll.covary_[(V, Map[String, String])])._1
val io = PubsubIO.withAttributes[V](topic, idAttribute, timestampAttribute)
coll
.unsafeCovary_[(V, Map[String, String])]
.covary_[(V, Map[String, String])]
.write(io)(PubsubIO.WriteParam(maxBatchSize, maxBatchBytesSize))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,10 @@ import org.joda.time.{DateTimeConstants, Duration, Instant}

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.coders.Coder
import com.spotify.scio.schemas.Schema
import org.apache.beam.sdk.util.CoderUtils

object SCollectionTest {
class TestA(val l: Long)
case class TestB(override val l: Long, s: String) extends TestA(l)
}

class SCollectionTest extends PipelineSpec {
import com.spotify.scio.values.SCollectionTest._

"SCollection" should "support applyTransform()" in {
runWithContext { sc =>
Expand Down Expand Up @@ -838,22 +831,4 @@ class SCollectionTest extends PipelineSpec {
}
}

it should "reset coder after covary is applied" in {
runWithContext { sc =>
val coll1 = sc.parallelize(Seq(new TestA(0)))
val coll2 = sc.parallelize(Seq(TestB(1, "1")))
val coll = coll2.covary[TestA] ++ coll1

// The test fails if covary doesn't reset the coder to the Coder[TestA],
// because the result collection is a mix of TestA and TestB and Coder[TestB]
// won't be able to decode an instance of TestA from coll1.
val beamCoder = CoderMaterializer.beamWithDefault(coll.coder, sc.options)
val testAInstance = new TestA(0)
val bytes = CoderUtils.encodeToByteArray(beamCoder, testAInstance)
val result = CoderUtils.decodeFromByteArray(beamCoder, bytes)

result.l shouldBe testAInstance.l
}
}

}

0 comments on commit a4882f1

Please sign in to comment.