diff --git a/.travis.yml b/.travis.yml index a9441ed5d..ce3ee3b9b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,16 +7,16 @@ env: matrix: include: - - scala: 2.10.6 - jdk: oraclejdk8 - script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION test mimaReportBinaryIssues - - - scala: 2.11.8 + - scala: 2.11.11 jdk: oraclejdk8 script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean coverage test coverageReport mimaReportBinaryIssues after_success: - bash <(curl -s https://codecov.io/bash) + - scala: 2.12.2 + jdk: oraclejdk8 + script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean test + cache: directories: - $HOME/.sbt/0.13/dependency diff --git a/build.sbt b/build.sbt index 690ac8feb..42392d0b6 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,6 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import sbtassembly.Plugin._ def scalaBinaryVersion(scalaVersion: String) = scalaVersion match { - case version if version startsWith "2.10" => "2.10" case version if version startsWith "2.11" => "2.11" case version if version startsWith "2.12" => "2.12" case _ => sys.error("Unsupported scala version: " + scalaVersion) @@ -15,25 +14,23 @@ def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.1 def sequentialExecution: Boolean = Option(System.getProperty("sequentialExecution")).map(_.toBoolean).getOrElse(false) -val algebirdVersion = "0.12.0" -val bijectionVersion = "0.9.1" -val chillVersion = "0.8.3" -val commonsHttpClientVersion = "3.1" +val algebirdVersion = "0.13.0" +val bijectionVersion = "0.9.5" +val chillVersion = "0.8.4" val commonsLangVersion = "2.6" -val finagleVersion = "6.35.0" val hadoopVersion = "1.2.1" val junitVersion = "4.11" val log4jVersion = "1.2.16" val novocodeJunitVersion = "0.10" val scalaCheckVersion = "1.13.4" val scalatestVersion = "3.0.1" -val scaldingVersion = "0.16.1-RC3" +val scaldingVersion = "0.17.2" val slf4jVersion = "1.6.6" -val storehausVersion = "0.15.0-RC1" +val storehausVersion = "0.15.0" val stormDep = "org.apache.storm" % "storm-core" % "1.0.2" val tormentaVersion = "0.12.0" -val utilVersion = "6.34.0" -val chainVersion = "0.1.0" +val utilVersion = "6.43.0" +val chainVersion = "0.2.0" val extraSettings = mimaDefaultSettings @@ -50,8 +47,8 @@ val executionSettings = if (sequentialExecution) { val sharedSettings = extraSettings ++ executionSettings ++ Seq( organization := "com.twitter", - scalaVersion := "2.11.7", - crossScalaVersions := Seq("2.10.5", "2.11.7"), + scalaVersion := "2.11.11", + crossScalaVersions := Seq("2.11.11", "2.12.2"), // To support hadoop 1.x javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), @@ -188,7 +185,7 @@ def youngestForwardCompatible(subProj: String) = // Uncomment after release. // Some(subProj) // .filterNot(unreleasedModules.contains(_)) -// .map { s => "com.twitter" % ("summingbird-" + s + "_2.10") % "0.9.0" } +// .map { s => "com.twitter" % ("summingbird-" + s + "_2.11") % "0.9.0" } /** * Empty this each time we publish a new version (and bump the minor number) diff --git a/summingbird-batch-hadoop/src/test/scala/com/twitter/summingbird/batch/HDFSStateLaws.scala b/summingbird-batch-hadoop/src/test/scala/com/twitter/summingbird/batch/HDFSStateLaws.scala index e217fab8d..705525e66 100644 --- a/summingbird-batch-hadoop/src/test/scala/com/twitter/summingbird/batch/HDFSStateLaws.scala +++ b/summingbird-batch-hadoop/src/test/scala/com/twitter/summingbird/batch/HDFSStateLaws.scala @@ -81,7 +81,8 @@ class HDFSStateLaws extends WordSpec { } } - def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp) = Interval.leftClosedRightOpen[Timestamp](low, high).right.get + def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp): Interval[Timestamp] = + Interval.leftClosedRightOpen[Timestamp](low, high) def shouldNotAcceptInterval(state: WaitingState[Interval[Timestamp]], interval: Interval[Timestamp], message: String = "PreparedState accepted a bad Interval!") = { state.begin.willAccept(interval) match { diff --git a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/Timestamp.scala similarity index 96% rename from summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala rename to summingbird-batch/src/main/scala/com/twitter/summingbird/batch/Timestamp.scala index 9c64bee36..f8167cbd9 100644 --- a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala +++ b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/Timestamp.scala @@ -53,16 +53,15 @@ object Timestamp { implicit val timestamp2Long: Bijection[Timestamp, Long] = Bijection.build[Timestamp, Long] { _.milliSinceEpoch } { Timestamp(_) } - implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] { + // Workaround for https://github.com/twitter/algebird/issues/635 + implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] with Serializable { def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp - def partialOrdering = Timestamp.orderingOnTimestamp } implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] { def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp - def partialOrdering = Timestamp.orderingOnTimestamp } // This is a right semigroup, that given any two Timestamps just take the one on the right. diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/VersionedState.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/VersionedState.scala index 1154796fa..5d03a57c8 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/VersionedState.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/VersionedState.scala @@ -81,7 +81,7 @@ private[scalding] class VersionedState(meta: HDFSMetadata, startDate: Option[Tim Interval.leftClosedRightOpen( batcher.earliestTimeOf(beginning), batcher.earliestTimeOf(end) - ).right.get + ) } def willAccept(available: Interval[Timestamp]) = diff --git a/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala b/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala index f8c0b6b27..ed646f406 100644 --- a/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala +++ b/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala @@ -80,9 +80,9 @@ class ClientStoreProps extends Properties("ClientStore") { val offline = Future.value(Some((b, 0))) val nextB = BatchID(b.id + offset) if (offset >= 0) { - Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == offline.get + Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == Await.result(offline) } else { - Await.ready(ClientStore.offlineLTEQBatch(0, nextB, offline)).isThrow + Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline).liftToTry).isThrow } } } diff --git a/summingbird-core-test/src/main/scala/com/twitter/summingbird/ArbitraryWorkaround.scala b/summingbird-core-test/src/main/scala/com/twitter/summingbird/ArbitraryWorkaround.scala new file mode 100644 index 000000000..4cc8ade2a --- /dev/null +++ b/summingbird-core-test/src/main/scala/com/twitter/summingbird/ArbitraryWorkaround.scala @@ -0,0 +1,26 @@ +package com.twitter.summingbird + +import org.scalacheck.{ Arbitrary, Gen } + +/** + * [[org.scalacheck.GenArities]] and [[org.scalacheck.ArbitraryArities]] classes from scalacheck + * contains to many lambdas in the same file which leads to + * bug with functions serialization in Scala 2.12: https://issues.scala-lang.org/browse/SI-10232 + * + * As a workaround this class contains overriden implicits for such cases. + * Should be imported if you import [[Arbitrary]] class. + * + * ScalaCheck issue is tracked at https://github.com/rickynils/scalacheck/issues/342. + */ +object ArbitraryWorkaround { + implicit val f1: Arbitrary[Int => Int] = Arbitrary(Gen.const(x => x * 2)) + implicit val f2: Arbitrary[Int => List[(Int, Int)]] = Arbitrary(Gen.const(x => List((x, x * 3)))) + implicit val f3: Arbitrary[Int => Option[Int]] = Arbitrary(Gen.const(x => { + if (x % 2 == 0) None else Some(x * 4) + })) + implicit val f4: Arbitrary[Int => List[Int]] = Arbitrary(Gen.const(x => List(x * 5))) + implicit val f5: Arbitrary[((Int, (Int, Option[Int]))) => List[(Int, Int)]] = Arbitrary(Gen.const { + case (x, (y, optZ)) => List((x, y), (x, optZ.getOrElse(42))) + }) + implicit val f6: Arbitrary[((Int, Int)) => List[(Int, Int)]] = Arbitrary(Gen.const(x => List(x, x))) +} diff --git a/summingbird-example/src/main/scala/com/twitter/summingbird/example/Storage.scala b/summingbird-example/src/main/scala/com/twitter/summingbird/example/Storage.scala index 3f0b807e3..4bc377cc9 100644 --- a/summingbird-example/src/main/scala/com/twitter/summingbird/example/Storage.scala +++ b/summingbird-example/src/main/scala/com/twitter/summingbird/example/Storage.scala @@ -23,6 +23,7 @@ import com.twitter.conversions.time._ import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.memcached.KetamaClientBuilder import com.twitter.finagle.memcached.protocol.text.Memcached +import com.twitter.finagle.transport.Transport import com.twitter.storehaus.Store import com.twitter.storehaus.algebra.MergeableStore import com.twitter.storehaus.memcache.{ HashEncoder, MemcacheStore } @@ -43,12 +44,14 @@ object Memcache { .tcpConnectTimeout(DEFAULT_TIMEOUT) .requestTimeout(DEFAULT_TIMEOUT) .connectTimeout(DEFAULT_TIMEOUT) - .readerIdleTimeout(DEFAULT_TIMEOUT) .hostConnectionLimit(1) .codec(Memcached()) + val liveness = builder.params[Transport.Liveness].copy(readTimeout = DEFAULT_TIMEOUT) + val liveBuilder = builder.configured(liveness) + KetamaClientBuilder() - .clientBuilder(builder) + .clientBuilder(liveBuilder) .nodes("localhost:11211") .build() } diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala index 848fafdb9..c75a5efaa 100644 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala @@ -24,7 +24,9 @@ import com.twitter.summingbird.SummingbirdRuntimeStats import com.twitter.scalding.{ Test => TestMode, _ } -import org.scalacheck._ +import org.scalacheck.Arbitrary +import com.twitter.summingbird.ArbitraryWorkaround._ +import org.scalacheck.Cogen import org.apache.hadoop.conf.Configuration diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/VersionBatchedStoreTest.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/VersionBatchedStoreTest.scala index e57fe0d0c..dcf224b87 100644 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/VersionBatchedStoreTest.scala +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/VersionBatchedStoreTest.scala @@ -27,7 +27,8 @@ import java.io.File import com.twitter.scalding._ -import org.scalacheck._ +import org.scalacheck.Arbitrary +import com.twitter.summingbird.ArbitraryWorkaround._ import org.scalatest.WordSpec diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala index 9e17dadf3..6a7b6fc40 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala @@ -20,7 +20,8 @@ import com.twitter.summingbird._ import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.online.option.LeftJoinGrouping import org.scalatest.WordSpec -import org.scalacheck._ +import org.scalacheck.Arbitrary +import com.twitter.summingbird.ArbitraryWorkaround._ /** * Tests for Summingbird's Storm planner. diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 2d9b1e98c..6e9b013dd 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -23,7 +23,7 @@ import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.storm.spout.TraversableSpout import org.scalatest.WordSpec import org.scalacheck._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * Tests for Summingbird's Storm planner. */ @@ -102,9 +102,9 @@ class TopologyTests extends WordSpec { val bolts = stormTopo.get_bolts val spouts = stormTopo.get_spouts assert(bolts.size == 1 && spouts.size == 1) - assert(bolts("Tail").get_common.get_parallelism_hint == 7) + assert(bolts.get("Tail").get_common.get_parallelism_hint == 7) - val spout = spouts.head._2 + val spout = spouts.asScala.head._2 assert(spout.get_common.get_parallelism_hint == 10) } @@ -132,8 +132,8 @@ class TopologyTests extends WordSpec { val spouts = stormTopo.get_spouts assert(stormTopo.get_bolts_size == 1 && stormTopo.get_spouts_size == 1) - assert(spouts.head._2.get_common.get_parallelism_hint == 10) - assert(bolts("Tail").get_common.get_parallelism_hint == 7) + assert(spouts.asScala.head._2.get_common.get_parallelism_hint == 10) + assert(bolts.get("Tail").get_common.get_parallelism_hint == 7) } /* @@ -229,7 +229,7 @@ class TopologyTests extends WordSpec { val bolts = stormTopo.get_bolts // Tail will have 1 -, distance from there should be onwards - val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) } + val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) } assert(TDistMap(1).get_common.get_parallelism_hint == 50) } @@ -250,7 +250,7 @@ class TopologyTests extends WordSpec { val bolts = stormTopo.get_bolts // Tail will have 1 -, distance from there should be onwards - val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) } + val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) } assert(TDistMap(1).get_common.get_parallelism_hint == 50) } @@ -270,7 +270,7 @@ class TopologyTests extends WordSpec { val bolts = stormTopo.get_bolts // Tail will have 1 -, distance from there should be onwards - val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) } + val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) } assert(TDistMap(1).get_common.get_parallelism_hint == 50) } @@ -286,7 +286,7 @@ class TopologyTests extends WordSpec { val stormTopo = storm.plan(p).topology // Source producer val bolts = stormTopo.get_bolts - val spouts = stormTopo.get_spouts + val spouts = stormTopo.get_spouts.asScala val spout = spouts.head._2 assert(spout.get_common.get_parallelism_hint == 30) @@ -307,7 +307,7 @@ class TopologyTests extends WordSpec { val bolts = stormTopo.get_bolts // Tail will have 1 -, distance from there should be onwards - val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) } + val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) } assert(TDistMap(0).get_common.get_parallelism_hint == 5) }