Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Support Scala 2.12 #724

Merged
merged 14 commits into from
Jul 25, 2017
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ env:

matrix:
include:
- scala: 2.10.6
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION test mimaReportBinaryIssues

- scala: 2.11.8
- scala: 2.11.11
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean coverage test coverageReport mimaReportBinaryIssues
after_success:
- bash <(curl -s https://codecov.io/bash)

- scala: 2.12.2
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean test

cache:
directories:
- $HOME/.sbt/0.13/dependency
Expand Down
23 changes: 10 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import sbtassembly.Plugin._

def scalaBinaryVersion(scalaVersion: String) = scalaVersion match {
case version if version startsWith "2.10" => "2.10"
case version if version startsWith "2.11" => "2.11"
case version if version startsWith "2.12" => "2.12"
case _ => sys.error("Unsupported scala version: " + scalaVersion)
Expand All @@ -15,25 +14,23 @@ def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.1
def sequentialExecution: Boolean =
Option(System.getProperty("sequentialExecution")).map(_.toBoolean).getOrElse(false)

val algebirdVersion = "0.12.0"
val bijectionVersion = "0.9.1"
val chillVersion = "0.8.3"
val commonsHttpClientVersion = "3.1"
val algebirdVersion = "0.13.0"
val bijectionVersion = "0.9.5"
val chillVersion = "0.8.4"
val commonsLangVersion = "2.6"
val finagleVersion = "6.35.0"
val hadoopVersion = "1.2.1"
val junitVersion = "4.11"
val log4jVersion = "1.2.16"
val novocodeJunitVersion = "0.10"
val scalaCheckVersion = "1.13.4"
val scalatestVersion = "3.0.1"
val scaldingVersion = "0.16.1-RC3"
val scaldingVersion = "0.17.1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we roll back to 0.17.0 here or wait for 0.17.2 due to the recently discovered semver issue with 0.17.1 cc @piyushnarang

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.17.0 breaks SB tests so we can't roll back. I'm going to publish 0.17.2 today.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

val slf4jVersion = "1.6.6"
val storehausVersion = "0.15.0-RC1"
val storehausVersion = "0.15.0"
val stormDep = "org.apache.storm" % "storm-core" % "1.0.2"
val tormentaVersion = "0.12.0"
val utilVersion = "6.34.0"
val chainVersion = "0.1.0"
val utilVersion = "6.43.0"
val chainVersion = "0.2.0"

val extraSettings = mimaDefaultSettings

Expand All @@ -50,8 +47,8 @@ val executionSettings = if (sequentialExecution) {

val sharedSettings = extraSettings ++ executionSettings ++ Seq(
organization := "com.twitter",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
scalaVersion := "2.11.11",
crossScalaVersions := Seq("2.11.11", "2.12.2"),
// To support hadoop 1.x
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),

Expand Down Expand Up @@ -188,7 +185,7 @@ def youngestForwardCompatible(subProj: String) =
// Uncomment after release.
// Some(subProj)
// .filterNot(unreleasedModules.contains(_))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we delete these commented lines?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we need to uncomment after each version bump. Deleting seems like a good way to keep making mima mistakes like the scalding one.

// .map { s => "com.twitter" % ("summingbird-" + s + "_2.10") % "0.9.0" }
// .map { s => "com.twitter" % ("summingbird-" + s + "_2.11") % "0.9.0" }

/**
* Empty this each time we publish a new version (and bump the minor number)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class HDFSStateLaws extends WordSpec {
}
}

def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp) = Interval.leftClosedRightOpen[Timestamp](low, high).right.get
def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp): Interval[Timestamp] =
Interval.leftClosedRightOpen[Timestamp](low, high)

def shouldNotAcceptInterval(state: WaitingState[Interval[Timestamp]], interval: Interval[Timestamp], message: String = "PreparedState accepted a bad Interval!") = {
state.begin.willAccept(interval) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,15 @@ object Timestamp {
implicit val timestamp2Long: Bijection[Timestamp, Long] =
Bijection.build[Timestamp, Long] { _.milliSinceEpoch } { Timestamp(_) }

implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] {
// Workaround for https://github.com/twitter/algebird/issues/635
implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] with Serializable {
def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] {
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

// This is a right semigroup, that given any two Timestamps just take the one on the right.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private[scalding] class VersionedState(meta: HDFSMetadata, startDate: Option[Tim
Interval.leftClosedRightOpen(
batcher.earliestTimeOf(beginning),
batcher.earliestTimeOf(end)
).right.get
)
}

def willAccept(available: Interval[Timestamp]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class ClientStoreProps extends Properties("ClientStore") {
val offline = Future.value(Some((b, 0)))
val nextB = BatchID(b.id + offset)
if (offset >= 0) {
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == offline.get
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == Await.result(offline)
} else {
Await.ready(ClientStore.offlineLTEQBatch(0, nextB, offline)).isThrow
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline).liftToTry).isThrow
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.memcached.KetamaClientBuilder
import com.twitter.finagle.memcached.protocol.text.Memcached
import com.twitter.finagle.transport.Transport
import com.twitter.storehaus.Store
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.memcache.{ HashEncoder, MemcacheStore }
Expand All @@ -43,12 +44,14 @@ object Memcache {
.tcpConnectTimeout(DEFAULT_TIMEOUT)
.requestTimeout(DEFAULT_TIMEOUT)
.connectTimeout(DEFAULT_TIMEOUT)
.readerIdleTimeout(DEFAULT_TIMEOUT)
.hostConnectionLimit(1)
.codec(Memcached())

val liveness = builder.params[Transport.Liveness].copy(readTimeout = DEFAULT_TIMEOUT)
val liveBuilder = builder.configured(liveness)

KetamaClientBuilder()
.clientBuilder(builder)
.clientBuilder(liveBuilder)
.nodes("localhost:11211")
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import com.twitter.summingbird.SummingbirdRuntimeStats

import com.twitter.scalding.{ Test => TestMode, _ }

import org.scalacheck._
import org.scalacheck.Arbitrary
import org.scalacheck.Cogen

import org.apache.hadoop.conf.Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.io.File

import com.twitter.scalding._

import org.scalacheck._
import org.scalacheck.Arbitrary

import org.scalatest.WordSpec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.twitter.summingbird.batch.Batcher
import com.twitter.summingbird.storm.spout.TraversableSpout
import org.scalatest.WordSpec
import org.scalacheck._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
/**
* Tests for Summingbird's Storm planner.
*/
Expand Down Expand Up @@ -102,9 +102,9 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts
val spouts = stormTopo.get_spouts
assert(bolts.size == 1 && spouts.size == 1)
assert(bolts("Tail").get_common.get_parallelism_hint == 7)
assert(bolts.get("Tail").get_common.get_parallelism_hint == 7)

val spout = spouts.head._2
val spout = spouts.asScala.head._2
assert(spout.get_common.get_parallelism_hint == 10)
}

Expand Down Expand Up @@ -132,8 +132,8 @@ class TopologyTests extends WordSpec {
val spouts = stormTopo.get_spouts

assert(stormTopo.get_bolts_size == 1 && stormTopo.get_spouts_size == 1)
assert(spouts.head._2.get_common.get_parallelism_hint == 10)
assert(bolts("Tail").get_common.get_parallelism_hint == 7)
assert(spouts.asScala.head._2.get_common.get_parallelism_hint == 10)
assert(bolts.get("Tail").get_common.get_parallelism_hint == 7)
}

/*
Expand Down Expand Up @@ -229,7 +229,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
Expand All @@ -250,7 +250,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
Expand All @@ -270,7 +270,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
Expand All @@ -286,7 +286,7 @@ class TopologyTests extends WordSpec {
val stormTopo = storm.plan(p).topology
// Source producer
val bolts = stormTopo.get_bolts
val spouts = stormTopo.get_spouts
val spouts = stormTopo.get_spouts.asScala
val spout = spouts.head._2

assert(spout.get_common.get_parallelism_hint == 30)
Expand All @@ -307,7 +307,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(0).get_common.get_parallelism_hint == 5)
}
Expand Down