Skip to content

Commit

Permalink
[split] Spool improvements: * Fix forcing issues with *:: ... there w…
Browse files Browse the repository at this point in the history
…ere two problems: 1) `implicit def syntax[A]` was forcing its lazy parameter 2) the `cons` method used in Syntax[A] was also forcing the lazy parameter * Add an optionally-imported seqToScala implicit to add .toScala to Seq[A]

RB_ID=263489
  • Loading branch information
Stu Hood authored and CI committed Jan 9, 2014
1 parent 2d8f0de commit 12faddb
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 12 deletions.
51 changes: 40 additions & 11 deletions util-core/src/main/scala/com/twitter/concurrent/Spool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import java.io.EOFException
import scala.collection.mutable.ArrayBuffer

/**
* A spool is an asynchronous stream. It more or less
* mimics the scala {{Stream}} collection, but with cons
* cells that have deferred tails.
* A spool is an asynchronous stream. It more or less mimics the scala
* {{Stream}} collection, but with cons cells that have either eager or
* deferred tails.
*
* Construction is done with Spool.cons, Spool.empty. Convenience
* syntax like that of [[scala.Stream]] is provided. In order to use
* these operators for deconstruction, they must be imported
* explicitly ({{import Spool.{*::, **::}}})
* Construction of eager Spools is done with either Spool.cons or
* the {{**::}} operator. To construct a lazy/deferred Spool which
* materializes its tail on demand, use the {{*::}} operator. In order
* to use these operators for deconstruction, they must be imported
* explicitly (ie: {{import Spool.{*::, **::}}})
*
* {{{
* def fill(rest: Promise[Spool[Int]]) {
Expand Down Expand Up @@ -149,6 +150,21 @@ object Spool {
override def toString = "Cons(%s, %c)".format(head, if (tail.isDefined) '*' else '?')
}

private class LazyCons[A](val head: A, next: => Future[Spool[A]])
extends Spool[A]
{
def isEmpty = false
lazy val tail = next
def collect[B](f: PartialFunction[A, B]) = {
val next_ = tail flatMap { _.collect(f) }
if (f.isDefinedAt(head)) Future.value(Cons(f(head), next_))
else next_
}

// NB: not touching tail, to avoid forcing unnecessarily
override def toString = "Cons(%s, ?)".format(head)
}

object Empty extends Spool[Nothing] {
def isEmpty = true
def head = throw new NoSuchElementException("spool is empty")
Expand All @@ -158,7 +174,8 @@ object Spool {
}

/**
* Cons a value & (possibly deferred) tail to a new {{Spool}}.
* Cons a value & tail to a new {{Spool}}. To defer the tail of the Spool, use
* the {{*::}} operator instead.
*/
def cons[A](value: A, next: Future[Spool[A]]): Spool[A] = Cons(value, next)
def cons[A](value: A, nextSpool: Spool[A]): Spool[A] = Cons(value, Future.value(nextSpool))
Expand All @@ -168,6 +185,18 @@ object Spool {
*/
def empty[A]: Spool[A] = Empty

/**
* Adds an implicit method to efficiently convert a Seq[A] to a Spool[A]
*/
class ToSpool[A](s: Seq[A]) {
def toSpool: Spool[A] =
s.reverse.foldLeft(Spool.empty: Spool[A]) {
case (tail, head) => cons(head, tail)
}
}

implicit def seqToSpool[A](s: Seq[A]) = new ToSpool(s)

/**
* Syntax support. We retain different constructors for future
* resolving vs. not.
Expand All @@ -177,18 +206,18 @@ object Spool {
*/

class Syntax[A](tail: => Future[Spool[A]]) {
def *::(head: A) = cons(head, tail)
def *::(head: A): Spool[A] = new LazyCons(head, tail)
}

implicit def syntax[A](s: Future[Spool[A]]) = new Syntax(s)
implicit def syntax[A](s: => Future[Spool[A]]) = new Syntax(s)

object *:: {
def unapply[A](s: Spool[A]): Option[(A, Future[Spool[A]])] = {
if (s.isEmpty) None
else Some((s.head, s.tail))
}
}
class Syntax1[A](tail: => Spool[A]) {
class Syntax1[A](tail: Spool[A]) {
def **::(head: A) = cons(head, tail)
}

Expand Down
30 changes: 29 additions & 1 deletion util-core/src/test/scala/com/twitter/concurrent/SpoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.io.EOFException
import org.specs.SpecificationWithJUnit
import scala.collection.mutable.ArrayBuffer

import Spool.{*::, **::}
import Spool.{*::, **::, seqToSpool}

class SpoolSpec extends SpecificationWithJUnit {
"Empty Spool" should {
Expand Down Expand Up @@ -118,6 +118,23 @@ class SpoolSpec extends SpecificationWithJUnit {
val fold = s.reduceLeft{(x, y) => x + y}
Await.result(fold) must be_==(3)
}

"be roundtrippable through toSeq/toSpool" in {
val seq = (0 to 10).toSeq
Await.result(seq.toSpool.toSeq) must be_==(seq)
}

"flatten via flatMap of toSpool" in {
val spool = Seq(1, 2) **:: Seq(3, 4) **:: Spool.empty
val seq = Await.result(spool.toSeq)

val flatSpool =
spool.flatMap { inner =>
Future.value(inner.toSpool)
}

Await.result(flatSpool.flatMap(_.toSeq)) must be_==(seq.flatten)
}
}

"Simple resolved spool with EOFException" should {
Expand Down Expand Up @@ -248,5 +265,16 @@ class SpoolSpec extends SpecificationWithJUnit {
f.isDefined must beTrue
Await.result(f) must be_==(3)
}

"be lazy" in {
def mkSpool(i: Int = 0): Future[Spool[Int]] =
Future.value {
if (i < 3)
i *:: mkSpool(i + 1)
else
throw new AssertionError("Should not have produced " + i)
}
mkSpool() must not(throwA[AssertionError])
}
}
}

0 comments on commit 12faddb

Please sign in to comment.