Skip to content

Added akka stream decoding flow and sink #18

Merged
merged 3 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ lazy val core = phobosModule("core")
// Every optional module depends on `core` in both the compile and the test configuration.
lazy val derevo = phobosModule("derevo") dependsOn (core % "compile->compile;test->test")
lazy val enumeratum = phobosModule("enumeratum") dependsOn (core % "compile->compile;test->test")
lazy val akka = phobosModule("akka") dependsOn (core % "compile->compile;test->test")
// Backticks are required because the identifier contains a hyphen.
lazy val `akka-stream` = phobosModule("akka-stream") dependsOn (core % "compile->compile;test->test")
lazy val monix = phobosModule("monix") dependsOn (core % "compile->compile;test->test")
lazy val fs2 = phobosModule("fs2") dependsOn (core % "compile->compile;test->test")

// NOTE(review): `modules` is defined twice below; this looks like the removed and the added line of a
// diff hunk that were both kept - confirm that only the variant including `akka-stream` should remain.
lazy val modules: List[ProjectReference] = List(core, akka, derevo, enumeratum, monix, fs2)
lazy val modules: List[ProjectReference] = List(core, akka, derevo, enumeratum, monix, fs2, `akka-stream`)


lazy val phobos = project
Expand Down
4 changes: 4 additions & 0 deletions modules/akka-stream/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Dependencies of the akka-stream module; both Akka artifacts are pinned to the same version.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.6.4",
// Needed only by the test sources (Test configuration).
"com.typesafe.akka" %% "akka-testkit" % "2.6.4" % Test
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package ru.tinkoff.phobos

/** Import target (`import ru.tinkoff.phobos.akka_stream._`) exposing the operations of `ops.AkkaStreamOps`. */
object akka_stream extends ops.AkkaStreamOps
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ru.tinkoff.phobos.ops

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Sink}
import javax.xml.stream.XMLStreamConstants
import ru.tinkoff.phobos.decoding._
import scala.concurrent.Future

private[phobos] trait AkkaStreamOps {

  /**
    * Creates a flow that consumes the byte chunks of one XML document and emits a single decoding
    * result once upstream completes.
    *
    * If upstream completes without emitting any chunk, the flow emits a `Left` instead of failing
    * the stream, so the outcome is always reported through the `Either`.
    *
    * @note works only for streams emitting a single xml document
    * @param charset character encoding passed to the underlying XML stream reader
    * @tparam A type to decode, an [[XmlDecoder]] for it must be available
    * @return flow emitting exactly one `Either[DecodingError, A]` when upstream completes
    */
  def decodingFlow[A: XmlDecoder](charset: String = "UTF-8"): Flow[Array[Byte], Either[DecodingError, A], NotUsed] = {
    val xmlDecoder = XmlDecoder[A]
    Flow[Array[Byte]]
      .fold(Option.empty[SinkDecoderState[A]]) { (stateOpt, bytes) =>
        // The state is created on the first chunk rather than outside the flow: the Cursor is mutable,
        // so every materialization needs its own reader and cursor for the flow to be reusable.
        val state = stateOpt.getOrElse(SinkDecoderState.initial(xmlDecoder, charset))
        import state.{cursor, elementDecoder, xmlStreamReader}
        xmlStreamReader.getInputFeeder.feedInput(bytes, 0, bytes.length)
        // Skip the document prologue events before handing the cursor to the element decoder.
        do {
          cursor.next()
        } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT)

        Some {
          state withEncoder {
            // Once a successful result is available the decoder is kept as is; otherwise decoding continues.
            if (elementDecoder.result(cursor.history).isRight) {
              elementDecoder
            } else {
              elementDecoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri)
            }
          }
        }
      }
      .map[Either[DecodingError, A]] {
        // `fold` emits its zero value when upstream completes without elements, so `None` means
        // "no input at all". Report it as a decoding failure value instead of throwing in the stage.
        case None =>
          Left(DecodingError("Got an internal error while decoding byte stream", Nil))

        case Some(SinkDecoderState(_, cursor, elementDecoder)) =>
          elementDecoder.result(cursor.history)
      }
  }

  /**
    * Same as [[decodingFlow]], but fails the stream with the [[DecodingError]] instead of emitting a `Left`.
    *
    * @note works only for streams emitting a single xml document
    * @param charset character encoding passed to the underlying XML stream reader
    */
  def decodingFlowUnsafe[A: XmlDecoder](charset: String = "UTF-8"): Flow[Array[Byte], A, NotUsed] =
    decodingFlow[A](charset).map {
      case Right(decoded) => decoded
      case Left(error)    => throw error
    }

  /**
    * Sink variant of [[decodingFlow]]: materializes a future holding the decoding result.
    *
    * @param charset character encoding passed to the underlying XML stream reader
    */
  def decodingSink[A: XmlDecoder](charset: String = "UTF-8"): Sink[Array[Byte], Future[Either[DecodingError, A]]] =
    decodingFlow[A](charset).toMat(Sink.head)(Keep.right)

  /**
    * Sink variant of [[decodingFlowUnsafe]]: the materialized future fails when decoding fails.
    *
    * @param charset character encoding passed to the underlying XML stream reader
    */
  def decodingSinkUnsafe[A: XmlDecoder](charset: String = "UTF-8"): Sink[Array[Byte], Future[A]] =
    decodingFlowUnsafe[A](charset).toMat(Sink.head)(Keep.right)
}

/**
  * Accumulator carried through the fold of the decoding flow.
  *
  * @param xmlStreamReader reader that receives the incoming byte chunks
  * @param cursor          cursor created over `xmlStreamReader`
  * @param elementDecoder  decoder reached after the chunks processed so far
  */
private[phobos] case class SinkDecoderState[A](
    xmlStreamReader: XmlStreamReader,
    cursor: Cursor,
    elementDecoder: ElementDecoder[A]
) {

  /** Returns a state with the same reader and cursor but with `that` as the element decoder. */
  def withEncoder(that: ElementDecoder[A]): SinkDecoderState[A] =
    SinkDecoderState(xmlStreamReader, cursor, that)
}

private[phobos] object SinkDecoderState {

  /**
    * Builds the starting state for one decoding run: a fresh stream reader for `charset`,
    * a cursor over that reader, and the element decoder taken from `xmlDecoder`.
    */
  def initial[A](xmlDecoder: XmlDecoder[A], charset: String): SinkDecoderState[A] = {
    val reader: XmlStreamReader = XmlDecoder.createStreamReader(charset)
    SinkDecoderState(reader, new Cursor(reader), xmlDecoder.elementdecoder)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package ru.tinkoff.phobos.test

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{Materializer, SystemMaterializer}
import akka.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.wordspec.AsyncWordSpecLike
import ru.tinkoff.phobos.akka_stream._
import ru.tinkoff.phobos.annotations.{ElementCodec, XmlCodec}
import ru.tinkoff.phobos.syntax.text

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure

/** Tests for the Akka Streams decoding flows exposed by `ru.tinkoff.phobos.akka_stream`. */
class AkkaStreamTest
    extends TestKit(ActorSystem("akka-stream-test")) with AsyncWordSpecLike with ScalaFutures with BeforeAndAfterAll {

  override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 200.millis)
  private implicit lazy val mat: Materializer = SystemMaterializer(system).materializer

  override def beforeAll(): Unit = {
    system.registerOnTermination {
      mat.shutdown()
    }
  }

  // Blocks until the actor system is terminated; a bare `system.terminate()` only returns a Future
  // that would be discarded, letting the suite finish while the system is still shutting down.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  import AkkaStreamTest._

  // Well-formed document shared by the positive tests.
  private val validXml = """
                           |<foo>
                           | <qux>1234</qux>
                           | <bars>2</bars>
                           | <maybeBar>1</maybeBar>
                           | <bars>3</bars>
                           |</foo>
                           |""".stripMargin

  // Value that `validXml` is expected to decode to.
  private val expectedFoo = Foo(1234, Some(Bar(1)), List(Bar(2), Bar(3)))

  // Emits the document in 5-character chunks so that decoding has to span several stream elements.
  // The encoding is explicit so the bytes match the decoder's default charset on every platform.
  private def chunkedSource(xml: String) =
    Source.fromIterator(() => xml.grouped(5).map(_.getBytes(java.nio.charset.StandardCharsets.UTF_8)))

  "Akka decodingFlow" should {
    "decode case classes correctly" in {
      val result = chunkedSource(validXml)
        .via(decodingFlow[Foo]())
        .runWith(Sink.head)
        .futureValue

      assert(result.isRight)
      assert(result contains expectedFoo)
    }

    "raise correct errors" in {
      // `quxar` is not a field of Foo and `qux` is missing, so decoding must yield a Left.
      val xml = """
                  |<foo>
                  | <quxar>safgd</quxar>
                  | <bars>2</bars>
                  | <maybeBar>1</maybeBar>
                  | <bars>3</bars>
                  |</foo>
                  |""".stripMargin

      val result = chunkedSource(xml)
        .via(decodingFlow[Foo]())
        .runWith(Sink.head)
        .futureValue

      assert(result.isLeft)
    }

    "is reusable" in {
      // One flow instance is materialized many times, possibly concurrently.
      val fooFlow = decodingFlow[Foo]()

      val result = Source(1 to 20)
        .mapAsyncUnordered(parallelism = Runtime.getRuntime.availableProcessors()) { _ =>
          chunkedSource(validXml)
            .via(fooFlow)
            .runWith(Sink.head)
        }
        .runWith(Sink.seq)
        .futureValue

      assert(result.forall(_.contains(expectedFoo)))
    }
  }

  "Akka decodingFlowUnsafe" should {
    "decode case classes correctly" in {
      val result = chunkedSource(validXml)
        .via(decodingFlowUnsafe[Foo]())
        .runWith(Sink.head)
        .futureValue

      assert(result == expectedFoo)
    }
  }
}

// Fixture types decoded by the tests in the AkkaStreamTest class.
object AkkaStreamTest {
// Element whose text content is read into `txt`, e.g. `<maybeBar>1</maybeBar>` decodes to Bar(1) in the tests.
@ElementCodec
case class Bar(@text txt: Int)
// Document type for the `foo` root element used in the test XML.
@XmlCodec("foo")
case class Foo(qux: Int, maybeBar: Option[Bar], bars: List[Bar])
}
46 changes: 24 additions & 22 deletions modules/fs2/src/main/scala/ru/tinkoff/phobos/ops/Fs2Ops.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import ru.tinkoff.phobos.decoding.{Cursor, ElementDecoder, XmlDecoder, XmlStream
import fs2.Stream

private[phobos] trait Fs2Ops {
implicit class DecoderOps[A](xmlDecoder: XmlDecoder[A]) {
def decodeFromStream[F[_], G[_]](stream: Stream[F, Array[Byte]], charset: String = "UTF-8")(
implicit compiler: Stream.Compiler[F, G],
monadError: MonadError[G, Throwable]): G[A] = {
val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset)
val cursor = new Cursor(sr)
implicit def DecoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] = new DecoderOps[A](xmlDecoder)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как насчёт сделать такой же трюк для мониска заодно?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А для чего этот трюк?

}

class DecoderOps[A](private val xmlDecoder: XmlDecoder[A]) extends AnyVal {
def decodeFromStream[F[_], G[_]](stream: Stream[F, Array[Byte]], charset: String = "UTF-8")(
implicit compiler: Stream.Compiler[F, G],
monadError: MonadError[G, Throwable]): G[A] = {
val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset)
val cursor = new Cursor(sr)

stream
.fold[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) =>
sr.getInputFeeder.feedInput(bytes, 0, bytes.length)
do {
cursor.next()
} while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT)
stream
.fold[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) =>
sr.getInputFeeder.feedInput(bytes, 0, bytes.length)
do {
cursor.next()
} while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT)

if (decoder.result(cursor.history).isRight) {
decoder
} else {
decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri)
}
if (decoder.result(cursor.history).isRight) {
decoder
} else {
decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri)
}
.map(_.result(cursor.history))
.compile
.lastOrError
.flatMap(result => MonadError[G, Throwable].fromEither(result))
}
}
.map(_.result(cursor.history))
.compile
.lastOrError
.flatMap(result => MonadError[G, Throwable].fromEither(result))
}
}