Skip to content

Commit

Permalink
Merge branch 'master' into context2
Browse files Browse the repository at this point in the history
  • Loading branch information
rssh committed Oct 30, 2021
2 parents 2e7794c + 83092b6 commit a74c5d9
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 57 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//val dottyVersion = "3.0.2-RC1-bin-SNAPSHOT"
val dottyVersion = "3.1.1-RC1-bin-SNAPSHOT"
//val dottyVersion = "3.0.1-RC2"
//val dottyVersion = "3.1.0"

ThisBuild/version := "0.9.4-SNAPSHOT"
ThisBuild/version := "0.9.6-SNAPSHOT"
ThisBuild/versionScheme := Some("semver-spec")


Expand Down Expand Up @@ -43,7 +45,7 @@ lazy val cps = crossProject(JSPlatform, JVMPlatform)
"-source-links:shared=github://rssh/dotty-cps-async/master#shared",
"-source-links:jvm=github://rssh/dotty-cps-async/master#jvm"),
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
mimaPreviousArtifacts := Set("com.github.rssh" %% "dotty-cps-async" % "0.9.3")
mimaPreviousArtifacts := Set("com.github.rssh" %% "dotty-cps-async" % "0.9.5")
).jsSettings(
scalaJSUseMainModuleInitializer := true,
Compile / doc / scalacOptions := Seq("-groups",
Expand Down
8 changes: 4 additions & 4 deletions docs/BasicUsage.rst
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
Dependency
===========

The current prerelease is 0.9.3 for using with scala-3.0.2.
The current prerelease is 0.9.4 for using with scala-3.1.0.

.. code-block:: scala
scalaVersion := "3.0.2"
libraryDependencies += "com.github.rssh" %% "dotty-cps-async" % "0.9.3"
scalaVersion := "3.1.0"
libraryDependencies += "com.github.rssh" %% "dotty-cps-async" % "0.9.5"
JavaScript also supported.

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "dotty-cps-async" % "0.9.3"
libraryDependencies += "com.github.rssh" %%% "dotty-cps-async" % "0.9.5"
Expand Down
2 changes: 1 addition & 1 deletion docs/Features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ It is also possible to compile sip22 async code without changing of the source c

.. code-block:: scala
libraryDependencies += "com.github.rssh" %% "shim-scala-async-dotty-cps-async" % "0.9.3",
libraryDependencies += "com.github.rssh" %% "shim-scala-async-dotty-cps-async" % "0.9.5",
Note that compatibility was not a primary goal during the development of dotty-cps-async. Generated code is quite different, so if you need a bug-to-bug compatible version of scala2 async, you should use the port of the original -XAsync compiler plugin.
Expand Down
12 changes: 6 additions & 6 deletions docs/Integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ cats-effect

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-cats-effect" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-cats-effect" % "0.9.1"
Note, that for cats-effect also exists https://github.com/typelevel/cats-effect-cps, integrated with typelevel stack.
Expand All @@ -32,39 +32,39 @@ monix

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-monix" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-monix" % "0.9.1"
scalaz IO
^^^^^^^^^

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-scalaz" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-scalaz" % "0.9.1"
ZIO and ZIO Streams
^^^^^^^^^^^^^^^^^^^

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-zio" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-zio" % "0.9.1"
Akka Streams
^^^^^^^^^^^^^

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-akka-stream" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-akka-stream" % "0.9.1"
Fs2 Stream
^^^^^^^^^^^^^

.. code-block:: scala
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-fs2" % "0.9.0"
libraryDependencies += "com.github.rssh" %%% "cps-async-connect-fs2" % "0.9.1"
Expand Down
21 changes: 13 additions & 8 deletions jvm/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_]:CpsConcurrentMonad,T](using Executio

class State:
val finishRef = new AtomicReference[Try[Unit]|Null]()
val emitStart = new AtomicBoolean
val emitStart = new AtomicBoolean()
val supplyEvents = new ConcurrentLinkedDeque[SupplyEventRecord]()
val consumerEvents = new ConcurrentLinkedDeque[Promise[SupplyEventRecord]]()
val stepStage = new AtomicInteger(0)
Expand All @@ -44,10 +44,11 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_]:CpsConcurrentMonad,T](using Executio
def queueConsumer(): F[SupplyEventRecord] =
val p = Promise[SupplyEventRecord]()
consumerEvents.offer(p)
enterStep()
asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback =>
val retval = asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback =>
p.future.onComplete(evalCallback)
}
enterStep()
retval

def finish(r: Try[Unit]):Unit =
finishRef.set(r)
Expand Down Expand Up @@ -81,10 +82,14 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_]:CpsConcurrentMonad,T](using Executio
consumerEvents.addFirst(consumer)
}
checkFinish()
if (stepStage.compareAndSet(StageBusy, StageFree)) then
done = true
else
stepStage.set(StageBusy)
if supplyEvents.isEmpty() || consumerEvents.isEmpty() then
if(stepStage.compareAndSet(StageBusy, StageFree)) then
if supplyEvents.isEmpty() || consumerEvents.isEmpty() then
done = true
else
enterStep()
else
stepStage.set(StageBusy)
}
}

Expand Down Expand Up @@ -120,7 +125,7 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_]:CpsConcurrentMonad,T](using Executio
val p = Promise[Unit]()
val emitted = Emitted(v, p)
consumer.success(emitted)
asyncMonad.adoptCallbackStyle{ emitCallback =>
asyncMonad.adoptCallbackStyle[Unit]{ emitCallback =>
p.future.onComplete(emitCallback)
}
else
Expand Down
4 changes: 1 addition & 3 deletions jvm/src/test/scala/cpstest/TestAsyncListMerge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import cps.util.*

class TestAsyncListMerge:

given CpsSchedulingMonad[CompletableFuture] = CompletableFutureCpsMonad



@Test def testMergeTwoNonEmptyTimedLists() = {

var last1: Int = -1
Expand Down
76 changes: 76 additions & 0 deletions jvm/src/test/scala/cpstest/TestAsyncListStress.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cpstest

import org.junit.{Test,Ignore}
import org.junit.Assert.*

import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global

import cps.*
import cps.monads.{*,given}
import cps.stream.*

import cps.util.*


class TestAsyncListStress:


@Test @Ignore def testMergeKNonEmptyTimedLists() = {

val nRuns = 10
val nStreams = 10
val delays = Array(50,100)
val streamSize = 1000


for( i <- 1 to nRuns) {

val f = async[Future] {

var mergedStream: AsyncList[Future,Int] = AsyncList.empty[Future]
for (j <- 1 to nStreams) {
val stream = asyncStream[AsyncList[Future,Int]] { out =>
val delay = delays( j % delays.length ).milliseconds
for(i <- 1 to streamSize) {
await(FutureSleep(delay))
out.emit(i)
}
}
mergedStream = mergedStream.merge(stream)
}

val fAll = mergedStream.takeListAll()
val all = await(fAll)
assert(all.size == nStreams * streamSize)
println(s"TestAsyncListMergeStress: done $i")

}

val limit = (delays.max * streamSize * 3).milliseconds
println(s"TestAsycListMetgeStress: waiting ${limit} = ${limit.toSeconds} seconds")
Await.result(f, limit)

}

}

@Test @Ignore def testTimedGenLists() = {
val nRuns = 2
val nElements = 100000
val delay = 3
for(i <- 1 to nRuns) {
val stream = asyncStream[AsyncList[Future,Int]] { out =>
for(j <- 1 to nElements) {
await(FutureSleep(delay.milliseconds))
out.emit(i)
}
}
val fList = stream.takeListAll()
val timeToWait = (delay * nElements * 2).milliseconds
println(s"AsyncListStressTest.testTimedGenList: waiting ${timeToWait.toSeconds} seconds")
Await.result(fList,timeToWait)
}
}

6 changes: 4 additions & 2 deletions shared/src/main/scala/cps/CpsMonad.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ trait CpsTryMonad[F[_]] extends CpsMonad[F] {
case NonFatal(ex) => error(ex)
}
}


/**
* ensure that `action` will run before getting value from `fa`
Expand Down Expand Up @@ -177,7 +177,9 @@ trait CpsTryMonad[F[_]] extends CpsMonad[F] {
case NonFatal(ex) => error(ex)
}


/**
* transform `r` into pure value or error.
**/
def fromTry[A](r: Try[A]): F[A] =
r match
case Success(a) => pure(a)
Expand Down
4 changes: 2 additions & 2 deletions shared/src/main/scala/cps/macros/CpsExpr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait CpsExpr[F[_]:Type,T:Type](monad:Expr[CpsMonad[F]], prev: Seq[ExprTreeGen])
if (prev.isEmpty)
fLast
else
Block(prev.toList.map(_.extract), fLast.asTerm).asExprOf[F[T]]
Block(prev.toList.map(_.extract), fLast.asTerm).changeOwner(Symbol.spliceOwner).asExprOf[F[T]]

def prependExprs(exprs: Seq[ExprTreeGen]): CpsExpr[F,T]

Expand Down Expand Up @@ -84,7 +84,7 @@ abstract class SyncCpsExpr[F[_]:Type, T: Type](dm: Expr[CpsMonad[F]],
} else {
Typed(lastTerm, TypeTree.of[T])
}
Block(prev.toList.map(_.extract), typedLast).asExprOf[T]
Block(prev.toList.map(_.extract), typedLast).changeOwner(Symbol.spliceOwner).asExprOf[T]
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ trait InlinedTreeTransform[F[_], CT]:
val awaitVals = funValDefs.awaitVals.filter(x => usedAwaitVals.contains(x.symbol))
val body =
if (!awaitVals.isEmpty) {
bodyWithoutAwaits match
bodyWithoutAwaits.changeOwner(Symbol.spliceOwner) match
case Block(statements, last) => Block(awaitVals ++ statements, last)
case other => Block(awaitVals, other)
} else
Expand Down
24 changes: 10 additions & 14 deletions shared/src/main/scala/cps/macros/forest/ValDefTransform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,17 @@ object ValDefTransform:

override def fLast(using Quotes) =
import quotes.reflect._

def appendBlockExpr[A:quoted.Type](rhs: quotes.reflect.Term, expr: Expr[A]):Expr[A] =
buildAppendBlockExpr(oldValDef.asInstanceOf[quotes.reflect.ValDef],
rhs, expr)


next.syncOrigin match
case Some(nextOrigin) =>
'{
${monad}.map(${cpsRhs.transformed})((v:V) =>
${appendBlockExpr('v.asTerm, nextOrigin)})
${buildAppendBlockExpr('v, nextOrigin)})
}
case None =>
'{
${monad}.flatMap(${cpsRhs.transformed})((v:V)=>
${appendBlockExpr('v.asTerm, next.transformed)})
${buildAppendBlockExpr('v, next.transformed)})
}

override def prependExprs(exprs: Seq[ExprTreeGen]): CpsExpr[F,T] =
Expand All @@ -141,25 +137,25 @@ object ValDefTransform:
RhsFlatMappedCpsExpr(using thisQuotes)(monad,prev,oldValDef,cpsRhs,next.append(e))


private def buildAppendBlock(using Quotes)(
oldValDef: quotes.reflect.ValDef, rhs:quotes.reflect.Term,
exprTerm:quotes.reflect.Term): quotes.reflect.Term =
private def buildAppendBlock(using Quotes)(rhs:quotes.reflect.Term,
exprTerm:quotes.reflect.Term): quotes.reflect.Term =
{
import quotes.reflect._
import scala.quoted.Expr

val valDef = ValDef(oldValDef.symbol, Some(rhs.changeOwner(oldValDef.symbol)))
exprTerm match
val castedOldValDef = oldValDef.asInstanceOf[quotes.reflect.ValDef]
val valDef = ValDef(castedOldValDef.symbol, Some(rhs.changeOwner(castedOldValDef.symbol)))
exprTerm.changeOwner(castedOldValDef.symbol.owner) match
case Block(stats,last) =>
Block(valDef::stats, last)
case other =>
Block(valDef::Nil,other)

}

private def buildAppendBlockExpr[A:Type](using Quotes)(oldValDef: quotes.reflect.ValDef, rhs: quotes.reflect.Term, expr:Expr[A]):Expr[A] =
private def buildAppendBlockExpr[A:Type](using Quotes)(rhs: Expr[V], expr:Expr[A]):Expr[A] =
import quotes.reflect._
buildAppendBlock(oldValDef,rhs,expr.asTerm).asExprOf[A]
buildAppendBlock(rhs.asTerm,expr.asTerm).asExprOf[A]

}

Expand Down
7 changes: 1 addition & 6 deletions shared/src/main/scala/cps/macros/misc/WithOptExprProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,5 @@ object WithOptExprProxy:
case blockExpr@Block(stats, expr) =>
Block(proxyValDef::stats, expr)
case expr => Block(List(proxyValDef), expr)
resTerm.asExprOf[S]

resTerm.changeOwner(Symbol.spliceOwner).asExprOf[S]





15 changes: 8 additions & 7 deletions shared/src/main/scala/cps/stream/AsyncList.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ object AsyncList {
def takeTo[B <: AbstractBuffer[T]](buffer: B, n: Int):F[B] =
if (n == 0) then
summon[CpsMonad[F]].pure(buffer)
var next: AsyncList[F,T] = this
var current: Cons[F, T] = this
var endLoop = false
var nRest = n
while(nRest != 0 && !endLoop) {
else
var next: AsyncList[F,T] = this
var current: Cons[F, T] = this
var endLoop = false
var nRest = n
while(nRest != 0 && !endLoop) {
buffer.addOne(current.head)
next = current.tailFun()
next match
Expand All @@ -204,8 +205,8 @@ object AsyncList {
case _ =>
endLoop = true
nRest = nRest - 1
}
next.takeTo(buffer, nRest)
}
next.takeTo(buffer, nRest)

def merge[S >: T](other: AsyncList[F,S]): AsyncList[F,S] =
Cons(head, ()=>other.merge(tailFun()))
Expand Down
2 changes: 1 addition & 1 deletion shared/src/test/scala/cps/pe/TestFizzBuzz.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestFizzBuzz:
//implicit inline def debugLevel: cps.macroFlags.DebugLevel = cps.macroFlags.DebugLevel(10)


@Test def testFizBuzz: Unit =
@Test def testFizBuzz =
val c = async[PureEffect] {
val logger = PEToyLogger.make()
val counter = PEIntRef.make(-1)
Expand Down
Loading

0 comments on commit a74c5d9

Please sign in to comment.