diff --git a/docs/src/main/paradox/data-transformations/xml.md b/docs/src/main/paradox/data-transformations/xml.md index 48c5a10efa..29b255e721 100644 --- a/docs/src/main/paradox/data-transformations/xml.md +++ b/docs/src/main/paradox/data-transformations/xml.md @@ -28,6 +28,24 @@ Scala Java : @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlParsingTest.java) { #parser-usage } +## XML writing + +XML processing pipeline ends with an @scaladoc[XmlWriting.writer](akka.stream.alpakka.xml.scaladsl.XmlWriting$) flow which writes a stream of XML parser events to @scaladoc[ByteString](akka.util.ByteString)s. + +Scala +: @@snip (../../../../../xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala) { #writer } + +Java +: @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java) { #writer } + +To write an XML document run XML document source with this writer. + +Scala +: @@snip (../../../../../xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala) { #writer-usage } + +Java +: @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java) { #writer-usage } + ## XML Subslice Use @scaladoc[XmlParsing.subslice](akka.stream.alpakka.xml.scaladsl.XmlParsing$) to filter out all elements not corresponding to a certain path. diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala b/xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala index 20cf64d664..d51bb8283c 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala @@ -4,11 +4,13 @@ package akka.stream.alpakka.xml +import java.nio.charset.Charset import java.util.Optional +import javax.xml.stream.XMLOutputFactory import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.stream.{Attributes, FlowShape, Inlet, Outlet} -import akka.util.ByteString +import akka.util.{ByteString, ByteStringBuilder} import com.fasterxml.aalto.stax.InputFactoryImpl import com.fasterxml.aalto.{AsyncByteArrayFeeder, AsyncXMLInputFactory, AsyncXMLStreamReader} @@ -176,6 +178,65 @@ object Xml { } } + /** + * Internal API + */ + private[xml] class StreamingXmlWriter(charset: Charset) extends GraphStage[FlowShape[ParseEvent, ByteString]] { + val in: Inlet[ParseEvent] = Inlet("XMLWriter.in") + val out: Outlet[ByteString] = Outlet("XMLWriter.out") + override val shape: FlowShape[ParseEvent, ByteString] = FlowShape(in, out) + + private val xMLOutputFactory = XMLOutputFactory.newInstance() + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + val byteStringBuilder = new ByteStringBuilder() + + val output = xMLOutputFactory.createXMLStreamWriter(byteStringBuilder.asOutputStream, charset.name()) + + setHandlers(in, out, this) + + override def onPush(): Unit = { + val ev: ParseEvent = grab(in) + ev match { + case StartDocument => + output.writeStartDocument() + + case EndDocument => + output.writeEndDocument() + + case StartElement(localName, attributes) => + output.writeStartElement(localName) + attributes.foreach(t => output.writeAttribute(t._1, t._2)) + + case EndElement(_) => + output.writeEndElement() + + case Characters(text) => + output.writeCharacters(text) + case ProcessingInstruction(Some(target), Some(data)) => + output.writeProcessingInstruction(target, data) + + case ProcessingInstruction(Some(target), None) => + output.writeProcessingInstruction(target) + + case ProcessingInstruction(None, Some(data)) => + output.writeProcessingInstruction(None.orNull, data) + case ProcessingInstruction(None, None) => + case Comment(text) => + output.writeComment(text) + + case CData(text) => + output.writeCData(text) + } + push(out, byteStringBuilder.result().compact) + byteStringBuilder.clear() + } + + override def onPull(): Unit = pull(in) + } + } + /** * Internal API */ diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlWriting.scala b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlWriting.scala new file mode 100644 index 0000000000..c7910a52af --- /dev/null +++ b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlWriting.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ + +package akka.stream.alpakka.xml.javadsl + +import java.nio.charset.{Charset, StandardCharsets} + +import akka.NotUsed +import akka.stream.alpakka.xml.ParseEvent +import akka.stream.alpakka.xml.Xml.StreamingXmlWriter +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +object XmlWriting { + + /** + * Writer Flow that takes a stream of XML events similar to SAX and write ByteStrings. + * encoding UTF-8 + */ + def writer(): akka.stream.javadsl.Flow[ParseEvent, ByteString, NotUsed] = + Flow.fromGraph(new StreamingXmlWriter(StandardCharsets.UTF_8)).asJava + + /** + * Writer Flow that takes a stream of XML events similar to SAX and write ByteStrings. + * @param strEncoding encoding of the stream + */ + def writer(charset: Charset): akka.stream.javadsl.Flow[ParseEvent, ByteString, NotUsed] = + Flow.fromGraph(new StreamingXmlWriter(charset)).asJava + +} diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlWriting.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlWriting.scala new file mode 100644 index 0000000000..9f497b546a --- /dev/null +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlWriting.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ + +package akka.stream.alpakka.xml.scaladsl + +import java.nio.charset.{Charset, StandardCharsets} + +import akka.NotUsed +import akka.stream.alpakka.xml.ParseEvent +import akka.stream.alpakka.xml.Xml.StreamingXmlWriter +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +object XmlWriting { + + /** + * Writer Flow that takes a stream of XML events similar to SAX and write ByteStrings. + * @param charset charset of encoding + */ + def writer(charset: Charset): Flow[ParseEvent, ByteString, NotUsed] = + Flow.fromGraph(new StreamingXmlWriter(charset)) + + /** + * Writer Flow that takes a stream of XML events similar to SAX and write ByteStrings. + * encoding UTF-8 + */ + val writer: Flow[ParseEvent, ByteString, NotUsed] = writer(StandardCharsets.UTF_8) +} diff --git a/xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java b/xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java new file mode 100644 index 0000000000..90d027fec1 --- /dev/null +++ b/xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ + +package akka.stream.alpakka.xml.javadsl; + +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.alpakka.xml.*; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.JavaTestKit; +import akka.util.ByteString; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; + +public class XmlWritingTest { + private static ActorSystem system; + private static Materializer materializer; + + @Test + public void xmlWriter() throws InterruptedException, ExecutionException, TimeoutException { + + // #writer + final Sink> write = Flow.of(ParseEvent.class) + .via(XmlWriting.writer()) + .map((ByteString bs) -> bs.utf8String()) + .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right()); + // #writer + + // #writer-usage + final String doc = "elem1elem2"; + final List docList= new ArrayList(); + docList.add(StartDocument.getInstance()); + docList.add(StartElement.create("doc", Collections.emptyMap())); + docList.add(StartElement.create("elem", Collections.emptyMap())); + docList.add(Characters.create("elem1")); + docList.add(EndElement.create("elem")); + docList.add(StartElement.create("elem", Collections.emptyMap())); + docList.add(Characters.create("elem2")); + docList.add(EndElement.create("elem")); + docList.add(EndElement.create("doc")); + docList.add(EndDocument.getInstance()); + + + final CompletionStage resultStage = Source.from(docList).runWith(write, materializer); + // #writer-usage + + resultStage.thenAccept((str) -> { + assertEquals(doc,str); + }).toCompletableFuture().get(5, TimeUnit.SECONDS); + } + + @BeforeClass + public static void setup() throws Exception { + system = ActorSystem.create(); + materializer = ActorMaterializer.create(system); + } + + @AfterClass + public static void teardown() throws Exception { + JavaTestKit.shutdownActorSystem(system); + } +} diff --git a/xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala b/xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala new file mode 100644 index 0000000000..f6defbbed7 --- /dev/null +++ b/xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ + +package akka.stream.alpakka.xml.scaladsl + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.alpakka.xml._ +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class XmlWritingTest extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { + implicit val system = ActorSystem("Test") + implicit val mat = ActorMaterializer() + + // #writer + val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent] + .via(XmlWriting.writer) + .map[String](_.utf8String) + .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right) + // #writer + + "XML Writer" must { + + "properly write simple XML" in { + // #writer-usage + val listEl: List[ParseEvent] = List( + StartDocument, + StartElement("doc", Map.empty), + StartElement("elem", Map.empty), + Characters("elem1"), + EndElement("elem"), + StartElement("elem", Map.empty), + Characters("elem2"), + EndElement("elem"), + EndElement("doc"), + EndDocument + ) + + val doc = "elem1elem2" + val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer) + // #writer-usage + + resultFuture.futureValue(Timeout(20.seconds)) should ===(doc) + } + + "properly process a comment" in { + val doc = "" + val listEl = List( + StartDocument, + StartElement("doc", Map.empty), + Comment("comment"), + EndElement("doc"), + EndDocument + ) + + val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer) + + resultFuture.futureValue(Timeout(20.seconds)) should ===(doc) + } + + "properly process parse instructions" in { + val doc = """""" + val listEl = List( + StartDocument, + ProcessingInstruction(Some("target"), Some("content")), + StartElement("doc", Map.empty), + EndElement("doc"), + EndDocument + ) + + val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer) + + resultFuture.futureValue(Timeout(20.seconds)) should ===(doc) + + } + + "properly process attributes" in { + val doc = + """elem1""" + val listEl = List( + StartDocument, + StartElement("doc", Map("good" -> "yes")), + StartElement("elem", Map("nice" -> "yes", "very" -> "true")), + Characters("elem1"), + EndElement("elem"), + EndElement("doc"), + EndDocument + ) + + val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer) + + resultFuture.futureValue(Timeout(3.seconds)) should ===(doc) + } + + "properly process CData blocks" in { + val doc = """even]]>""" + + val listEl = List( + StartDocument, + StartElement("doc", Map.empty), + CData("even"), + EndElement("doc"), + EndDocument + ) + val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer) + + resultFuture.futureValue(Timeout(3.seconds)) should ===(doc) + } + + } + + override protected def afterAll(): Unit = system.terminate() +}