diff --git a/build.sbt b/build.sbt index cdd22aa..133e36f 100644 --- a/build.sbt +++ b/build.sbt @@ -44,10 +44,11 @@ lazy val core = phobosModule("core") lazy val derevo = phobosModule("derevo") dependsOn (core % "compile->compile;test->test") lazy val enumeratum = phobosModule("enumeratum") dependsOn (core % "compile->compile;test->test") lazy val akka = phobosModule("akka") dependsOn (core % "compile->compile;test->test") +lazy val `akka-stream` = phobosModule("akka-stream") dependsOn (core % "compile->compile;test->test") lazy val monix = phobosModule("monix") dependsOn (core % "compile->compile;test->test") lazy val fs2 = phobosModule("fs2") dependsOn (core % "compile->compile;test->test") -lazy val modules: List[ProjectReference] = List(core, akka, derevo, enumeratum, monix, fs2) +lazy val modules: List[ProjectReference] = List(core, akka, derevo, enumeratum, monix, fs2, `akka-stream`) lazy val phobos = project diff --git a/modules/akka-stream/build.sbt b/modules/akka-stream/build.sbt new file mode 100644 index 0000000..1380492 --- /dev/null +++ b/modules/akka-stream/build.sbt @@ -0,0 +1,4 @@ +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-stream" % "2.6.4", + "com.typesafe.akka" %% "akka-testkit" % "2.6.4" % Test +) diff --git a/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/akka_stream.scala b/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/akka_stream.scala new file mode 100644 index 0000000..fd25c44 --- /dev/null +++ b/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/akka_stream.scala @@ -0,0 +1,3 @@ +package ru.tinkoff.phobos + +object akka_stream extends ops.AkkaStreamOps diff --git a/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/ops/AkkaStreamOps.scala b/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/ops/AkkaStreamOps.scala new file mode 100644 index 0000000..987e8b0 --- /dev/null +++ b/modules/akka-stream/src/main/scala/ru/tinkoff/phobos/ops/AkkaStreamOps.scala @@ -0,0 +1,76 @@ +package ru.tinkoff.phobos.ops + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Keep, Sink} +import javax.xml.stream.XMLStreamConstants +import ru.tinkoff.phobos.decoding._ +import scala.concurrent.Future + +private[phobos] trait AkkaStreamOps { + + /** + * @note - works only for streams emmiting single xml document + * */ + def decodingFlow[A: XmlDecoder](charset: String = "UTF-8"): Flow[Array[Byte], Either[DecodingError, A], NotUsed] = { + val xmlDecoder = XmlDecoder[A] + Flow[Array[Byte]] + .fold(Option.empty[SinkDecoderState[A]]) { (stateOpt, bytes) => + val state = stateOpt.getOrElse(SinkDecoderState.initial(xmlDecoder, charset)) // trick to make this flow reusable (because of mutable Cursor) + import state.{cursor, elementDecoder, xmlStreamReader} + xmlStreamReader.getInputFeeder.feedInput(bytes, 0, bytes.length) + do { + cursor.next() + } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT) + + Some { + state withEncoder { + if (elementDecoder.result(cursor.history).isRight) { + elementDecoder + } else { + elementDecoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri) + } + } + } + } + .map { + case None => + throw DecodingError("Got an internal error while decoding byte stream", Nil) + + case Some(SinkDecoderState(_, cursor, elementDecoder)) => + elementDecoder.result(cursor.history) + } + } + + /** + * @note - works only for streams emmiting single xml document + * */ + def decodingFlowUnsafe[A: XmlDecoder](charset: String = "UTF-8"): Flow[Array[Byte], A, NotUsed] = + decodingFlow(charset).map(_.fold(throw _, identity)) + + def decodingSink[A: XmlDecoder](charset: String = "UTF-8"): Sink[Array[Byte], Future[Either[DecodingError, A]]] = + decodingFlow(charset).toMat(Sink.head)(Keep.right) + + def decodingSinkUnsafe[A: XmlDecoder](charset: String = "UTF-8"): Sink[Array[Byte], Future[A]] = + decodingFlowUnsafe(charset).toMat(Sink.head)(Keep.right) +} + +private[phobos] case class SinkDecoderState[A]( + xmlStreamReader: XmlStreamReader, + cursor: Cursor, + elementDecoder: ElementDecoder[A] +) { + def withEncoder(that: ElementDecoder[A]): SinkDecoderState[A] = copy(elementDecoder = that) +} + +private[phobos] object SinkDecoderState { + + def initial[A](xmlDecoder: XmlDecoder[A], charset: String): SinkDecoderState[A] = { + val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset) + val cursor = new Cursor(sr) + SinkDecoderState( + xmlStreamReader = sr, + cursor = cursor, + elementDecoder = xmlDecoder.elementdecoder + ) + } +} diff --git a/modules/akka-stream/src/test/scala/ru/tinkoff/phobos/test/AkkaStreamTest.scala b/modules/akka-stream/src/test/scala/ru/tinkoff/phobos/test/AkkaStreamTest.scala new file mode 100644 index 0000000..83ac8da --- /dev/null +++ b/modules/akka-stream/src/test/scala/ru/tinkoff/phobos/test/AkkaStreamTest.scala @@ -0,0 +1,135 @@ +package ru.tinkoff.phobos.test + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Sink, Source} +import akka.stream.{Materializer, SystemMaterializer} +import akka.testkit.TestKit +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.wordspec.AsyncWordSpecLike +import ru.tinkoff.phobos.akka_stream._ +import ru.tinkoff.phobos.annotations.{ElementCodec, XmlCodec} +import ru.tinkoff.phobos.syntax.text +import scala.concurrent.duration._ + +class AkkaStreamTest + extends TestKit(ActorSystem("akka-stream-test")) with AsyncWordSpecLike with ScalaFutures with BeforeAndAfterAll { + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 200.millis) + private implicit lazy val mat: Materializer = SystemMaterializer(system).materializer + + override def beforeAll(): Unit = { + system.registerOnTermination { + mat.shutdown() + } + } + + override def afterAll(): Unit = system.terminate() + + import AkkaStreamTest._ + + "Akka decodingFlow" should { + "decode case classes correctly" in { + + val xml = """ + | + | 1234 + | 2 + | 1 + | 3 + | + |""".stripMargin + + val foo = Foo(1234, Some(Bar(1)), List(Bar(2), Bar(3))) + val bytesSource = Source.fromIterator(() => xml.grouped(5).map(_.getBytes)) + + val result = bytesSource + .via(decodingFlow[Foo]()) + .runWith(Sink.head) + .futureValue + + assert(result.isRight) + assert(result contains foo) + } + + "raise correct errors" in { + + val xml = """ + | + | safgd + | 2 + | 1 + | 3 + | + |""".stripMargin + + val bytesSource = Source.fromIterator(() => xml.grouped(5).map(_.getBytes)) + + val result = bytesSource + .via(decodingFlow[Foo]()) + .runWith(Sink.head) + .futureValue + + assert(result.isLeft) + } + + "is reusable" in { + + val foo = Foo(1234, Some(Bar(1)), List(Bar(2), Bar(3))) + val xml = """ + | + | 1234 + | 2 + | 1 + | 3 + | + |""".stripMargin + + val fooFlow = decodingFlow[Foo]() + + def bytesSource = Source.fromIterator(() => xml.grouped(5).map(_.getBytes)) + + val result = Source(1 to 20) + .mapAsyncUnordered(parallelism = Runtime.getRuntime.availableProcessors()) { _ => + bytesSource + .via(fooFlow) + .runWith(Sink.head) + } + .runWith(Sink.seq) + .futureValue + + assert(result.forall(_.contains(foo))) + } + } + + "Akka decodingFlowUnsafe" should { + "decode case classes correctly" in { + + val xml = """ + | + | 1234 + | 2 + | 1 + | 3 + | + |""".stripMargin + + val foo = Foo(1234, Some(Bar(1)), List(Bar(2), Bar(3))) + val bytesSource = Source.fromIterator(() => xml.grouped(5).map(_.getBytes)) + + val result = bytesSource + .via(decodingFlowUnsafe[Foo]()) + .runWith(Sink.head) + .futureValue + + assert(result == foo) + } + } +} + +object AkkaStreamTest { + @ElementCodec + case class Bar(@text txt: Int) + @XmlCodec("foo") + case class Foo(qux: Int, maybeBar: Option[Bar], bars: List[Bar]) +} diff --git a/modules/fs2/src/main/scala/ru/tinkoff/phobos/ops/Fs2Ops.scala b/modules/fs2/src/main/scala/ru/tinkoff/phobos/ops/Fs2Ops.scala index ee3fd46..8bae6ca 100644 --- a/modules/fs2/src/main/scala/ru/tinkoff/phobos/ops/Fs2Ops.scala +++ b/modules/fs2/src/main/scala/ru/tinkoff/phobos/ops/Fs2Ops.scala @@ -7,30 +7,32 @@ import ru.tinkoff.phobos.decoding.{Cursor, ElementDecoder, XmlDecoder, XmlStream import fs2.Stream private[phobos] trait Fs2Ops { - implicit class DecoderOps[A](xmlDecoder: XmlDecoder[A]) { - def decodeFromStream[F[_], G[_]](stream: Stream[F, Array[Byte]], charset: String = "UTF-8")( - implicit compiler: Stream.Compiler[F, G], - monadError: MonadError[G, Throwable]): G[A] = { - val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset) - val cursor = new Cursor(sr) + implicit def DecoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] = new DecoderOps[A](xmlDecoder) +} + +class DecoderOps[A](private val xmlDecoder: XmlDecoder[A]) extends AnyVal { + def decodeFromStream[F[_], G[_]](stream: Stream[F, Array[Byte]], charset: String = "UTF-8")( + implicit compiler: Stream.Compiler[F, G], + monadError: MonadError[G, Throwable]): G[A] = { + val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset) + val cursor = new Cursor(sr) - stream - .fold[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) => - sr.getInputFeeder.feedInput(bytes, 0, bytes.length) - do { - cursor.next() - } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT) + stream + .fold[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) => + sr.getInputFeeder.feedInput(bytes, 0, bytes.length) + do { + cursor.next() + } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT) - if (decoder.result(cursor.history).isRight) { - decoder - } else { - decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri) - } + if (decoder.result(cursor.history).isRight) { + decoder + } else { + decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri) } - .map(_.result(cursor.history)) - .compile - .lastOrError - .flatMap(result => MonadError[G, Throwable].fromEither(result)) - } + } + .map(_.result(cursor.history)) + .compile + .lastOrError + .flatMap(result => MonadError[G, Throwable].fromEither(result)) } } diff --git a/modules/monix/src/main/scala/ru/tinkoff/phobos/ops/MonixOps.scala b/modules/monix/src/main/scala/ru/tinkoff/phobos/ops/MonixOps.scala index c197713..3a1813e 100644 --- a/modules/monix/src/main/scala/ru/tinkoff/phobos/ops/MonixOps.scala +++ b/modules/monix/src/main/scala/ru/tinkoff/phobos/ops/MonixOps.scala @@ -6,27 +6,30 @@ import monix.reactive.Observable import ru.tinkoff.phobos.decoding.{Cursor, ElementDecoder, XmlDecoder, XmlStreamReader} private[phobos] trait MonixOps { - implicit class DecoderOps[A](xmlDecoder: XmlDecoder[A]) { - def decodeFromObservable(observable: Observable[Array[Byte]], charset: String = "UTF-8"): Task[A] = { - val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset) - val cursor = new Cursor(sr) + implicit def DecoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] = new DecoderOps[A](xmlDecoder) +} - observable.foldLeftL[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) => - sr.getInputFeeder.feedInput(bytes, 0, bytes.length) - do { - cursor.next() - } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT) +class DecoderOps[A](private val xmlDecoder: XmlDecoder[A]) extends AnyVal { + def decodeFromObservable(observable: Observable[Array[Byte]], charset: String = "UTF-8"): Task[A] = { + val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset) + val cursor = new Cursor(sr) - if (decoder.result(cursor.history).isRight) { - decoder - } else { - decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri) - } - } - .flatMap { a => - sr.getInputFeeder.endOfInput() - Task.fromEither(a.result(cursor.history)) + observable + .foldLeftL[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) => + sr.getInputFeeder.feedInput(bytes, 0, bytes.length) + do { + cursor.next() + } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT) + + if (decoder.result(cursor.history).isRight) { + decoder + } else { + decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri) } - } + } + .flatMap { a => + sr.getInputFeeder.endOfInput() + Task.fromEither(a.result(cursor.history)) + } } }