Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Summingbird changes for Algebird 0.13.0 #715

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def scalaBinaryVersion(scalaVersion: String) = scalaVersion match {

def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.10"

val algebirdVersion = "0.12.0"
val algebirdVersion = "0.13.0"
val bijectionVersion = "0.9.1"
val chillVersion = "0.7.3"
val commonsHttpClientVersion = "3.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

package com.twitter.summingbird.batch

import java.util.{ TimeZone, UUID }

import com.twitter.algebird.{ Intersection, Interval }
import com.twitter.scalding.{ DateParser, RichDate }
import java.util.{TimeZone, UUID}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it should be with spaces (I think it's where your IDEA style settings going against our scalafmt setup).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah seems like the Twitter defaults are set to not include the space :-).

import com.twitter.algebird.{Intersection, Interval}
import com.twitter.scalding.{DateParser, RichDate}
import com.twitter.summingbird.batch.state.HDFSState
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.scalatest.WordSpec

class HDFSStateLaws extends WordSpec {
Expand Down Expand Up @@ -81,7 +79,7 @@ class HDFSStateLaws extends WordSpec {
}
}

def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp) = Interval.leftClosedRightOpen[Timestamp](low, high).right.get
def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp) = Interval.leftClosedRightOpen[Timestamp](low, high)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put a type on the result here? Interval[Timestamp]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


def shouldNotAcceptInterval(state: WaitingState[Interval[Timestamp]], interval: Interval[Timestamp], message: String = "PreparedState accepted a bad Interval!") = {
state.begin.willAccept(interval) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package com.twitter.summingbird.batch
import com.twitter.algebird.Monoid
import com.twitter.algebird.{
Empty,
Interval,
Intersection,
InclusiveLower,
ExclusiveLower,
ExclusiveUpper,
InclusiveLower,
InclusiveUpper,
ExclusiveLower,
Intersection,
Interval,
Universe
}
import com.twitter.bijection.{ Bijection, Injection }
import com.twitter.bijection.{Bijection, Injection}
import scala.collection.Iterator.iterate

/**
Expand Down Expand Up @@ -75,7 +75,7 @@ object BatchID {
.flatMap {
case (min, max, cnt) =>
if ((min + cnt) == (max + 1L)) {
Some(Interval.leftClosedRightOpen(min, max.next).right.get)
Some[Interval[BatchID]](Interval.leftClosedRightOpen(min, max.next))
} else {
// These batches are not contiguous, not an interval
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ object Timestamp {
implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] {
def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] {
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

// This is a right semigroup, that given any two Timestamps just take the one on the right.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ limitations under the License.

package com.twitter.summingbird.batch

import org.scalacheck.{ Arbitrary, Gen, Properties }
import org.scalacheck.{Arbitrary, Gen, Properties}
import org.scalacheck.Prop._

import java.util.concurrent.TimeUnit

import com.twitter.algebird.Interval

object BatchLaws extends Properties("BatchID") {
Expand Down Expand Up @@ -54,11 +51,8 @@ object BatchLaws extends Properties("BatchID") {
forAll(Arbitrary.arbitrary[BatchID], Gen.choose(0L, 1000L)) { (b1: BatchID, diff: Long) =>
// We can't enumerate too much:
val b2 = b1 + diff
val interval = Interval.leftClosedRightOpen(b1, b2.next) match {
case Left(i) => i
case Right(i) => i
}
(BatchID.toInterval(BatchID.range(b1, b2)) == Some(interval)) &&
val interval: Interval[BatchID] = Interval.leftClosedRightOpen(b1, b2.next)
(BatchID.toInterval(BatchID.range(b1, b2)) == Some(interval)) &&
BatchID.toIterable(interval).toList == BatchID.range(b1, b2).toList
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
package com.twitter.summingbird.scalding

import com.twitter.algebird.{
ExclusiveUpper,
InclusiveUpper,
Intersection,
Interval,
ExclusiveUpper
Interval
}
import com.twitter.summingbird.batch.{
Batcher,
BatchID,
Batcher,
PrepareState,
RunningState,
Timestamp,
WaitingState
}
import com.twitter.summingbird.batch.store.HDFSMetadata

import org.slf4j.LoggerFactory

import scala.util.{ Try => ScalaTry, Success, Failure }
import scala.util.{Failure, Success, Try => ScalaTry}

/**
* State representation used by the builder API for compatibility.
Expand Down Expand Up @@ -81,7 +79,7 @@ private[scalding] class VersionedState(meta: HDFSMetadata, startDate: Option[Tim
Interval.leftClosedRightOpen(
batcher.earliestTimeOf(beginning),
batcher.earliestTimeOf(end)
).right.get
)
}

def willAccept(available: Interval[Timestamp]) =
Expand Down