Skip to content

Commit

Permalink
XML Writer akka#498 (akka#499)
Browse files Browse the repository at this point in the history
Implementation of the serialization of the ParseEvent from xml-parser
  • Loading branch information
Korbik authored and ennru committed Nov 11, 2017
1 parent 3ff01ef commit 66a49fd
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 1 deletion.
18 changes: 18 additions & 0 deletions docs/src/main/paradox/data-transformations/xml.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ Scala
Java
: @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlParsingTest.java) { #parser-usage }

## XML writing

An XML processing pipeline can end with the @scaladoc[XmlWriting.writer](akka.stream.alpakka.xml.scaladsl.XmlWriting$) flow, which serializes a stream of XML parser events into @scaladoc[ByteString](akka.util.ByteString)s.

Scala
: @@snip (../../../../../xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala) { #writer }

Java
: @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java) { #writer }

To write an XML document, run a source of XML parser events through this writer.

Scala
: @@snip (../../../../../xml/src/test/scala/akka/stream/alpakka/xml/scaladsl/XmlWritingTest.scala) { #writer-usage }

Java
: @@snip (../../../../../xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlWritingTest.java) { #writer-usage }

## XML Subslice

Use @scaladoc[XmlParsing.subslice](akka.stream.alpakka.xml.scaladsl.XmlParsing$) to filter out all elements not corresponding to a certain path.
Expand Down
63 changes: 62 additions & 1 deletion xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

package akka.stream.alpakka.xml

import java.nio.charset.Charset
import java.util.Optional
import javax.xml.stream.XMLOutputFactory

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import akka.util.{ByteString, ByteStringBuilder}
import com.fasterxml.aalto.stax.InputFactoryImpl
import com.fasterxml.aalto.{AsyncByteArrayFeeder, AsyncXMLInputFactory, AsyncXMLStreamReader}

Expand Down Expand Up @@ -176,6 +178,65 @@ object Xml {
}
}

/**
 * Internal API
 *
 * Graph stage that serializes a stream of [[ParseEvent]]s into [[ByteString]] chunks.
 *
 * Each incoming event is written to a StAX `XMLStreamWriter` backed by an in-memory
 * `ByteStringBuilder`; whatever bytes the writer has produced so far are pushed downstream
 * after every event. When upstream finishes, the writer is flushed so that output it was
 * still holding back is emitted before the stage completes.
 *
 * @param charset charset name handed to the `XMLStreamWriter` as the output encoding
 */
private[xml] class StreamingXmlWriter(charset: Charset) extends GraphStage[FlowShape[ParseEvent, ByteString]] {
  val in: Inlet[ParseEvent] = Inlet("XMLWriter.in")
  val out: Outlet[ByteString] = Outlet("XMLWriter.out")
  override val shape: FlowShape[ParseEvent, ByteString] = FlowShape(in, out)

  private val xMLOutputFactory = XMLOutputFactory.newInstance()

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Per-materialization sink for the bytes produced by the XML writer.
      val byteStringBuilder = new ByteStringBuilder()

      val output = xMLOutputFactory.createXMLStreamWriter(byteStringBuilder.asOutputStream, charset.name())

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val ev: ParseEvent = grab(in)
        ev match {
          case StartDocument =>
            output.writeStartDocument()

          case EndDocument =>
            output.writeEndDocument()

          case StartElement(localName, attributes) =>
            output.writeStartElement(localName)
            attributes.foreach(t => output.writeAttribute(t._1, t._2))

          case EndElement(_) =>
            output.writeEndElement()

          case Characters(text) =>
            output.writeCharacters(text)

          case ProcessingInstruction(Some(target), Some(data)) =>
            output.writeProcessingInstruction(target, data)

          case ProcessingInstruction(Some(target), None) =>
            output.writeProcessingInstruction(target)

          case ProcessingInstruction(None, Some(data)) =>
            // NOTE(review): this hands a null target to the writer; how that is handled
            // depends on the StAX implementation in use -- confirm this is intended.
            output.writeProcessingInstruction(None.orNull, data)

          case ProcessingInstruction(None, None) =>
          // Nothing to write for an empty processing instruction.

          case Comment(text) =>
            output.writeComment(text)

          case CData(text) =>
            output.writeCData(text)
        }
        // Always push (possibly an empty chunk) so that every pull is answered by one push.
        push(out, byteStringBuilder.result().compact)
        byteStringBuilder.clear()
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // The XMLStreamWriter may still hold buffered output; flush it so nothing is lost.
        output.flush()
        val remaining = byteStringBuilder.result().compact
        byteStringBuilder.clear()
        // emit copes with `out` not being pulled yet and completes once the chunk is delivered.
        if (remaining.nonEmpty) emit(out, remaining)
        super.onUpstreamFinish()
      }
    }
}

/**
* Internal API
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.xml.javadsl

import java.nio.charset.{Charset, StandardCharsets}

import akka.NotUsed
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.Xml.StreamingXmlWriter
import akka.stream.scaladsl.Flow
import akka.util.ByteString

/**
 * Java DSL factories for flows that serialize XML parse events into ByteStrings.
 */
object XmlWriting {

  /**
   * Writer Flow that takes a stream of XML events similar to SAX and writes ByteStrings.
   * @param charset charset passed to the underlying XML writer stage
   */
  def writer(charset: Charset): akka.stream.javadsl.Flow[ParseEvent, ByteString, NotUsed] = {
    val scalaFlow = Flow.fromGraph(new StreamingXmlWriter(charset))
    scalaFlow.asJava
  }

  /**
   * Writer Flow that takes a stream of XML events similar to SAX and writes ByteStrings,
   * using UTF-8 as the charset.
   */
  def writer(): akka.stream.javadsl.Flow[ParseEvent, ByteString, NotUsed] =
    writer(StandardCharsets.UTF_8)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.xml.scaladsl

import java.nio.charset.{Charset, StandardCharsets}

import akka.NotUsed
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.Xml.StreamingXmlWriter
import akka.stream.scaladsl.Flow
import akka.util.ByteString

/**
 * Scala DSL factories for flows that serialize XML parse events into ByteStrings.
 */
object XmlWriting {

  /**
   * Writer Flow that takes a stream of XML events similar to SAX and writes ByteStrings,
   * using UTF-8 as the charset.
   */
  val writer: Flow[ParseEvent, ByteString, NotUsed] =
    Flow.fromGraph(new StreamingXmlWriter(StandardCharsets.UTF_8))

  /**
   * Writer Flow that takes a stream of XML events similar to SAX and writes ByteStrings.
   * @param charset charset passed to the underlying XML writer stage
   */
  def writer(charset: Charset): Flow[ParseEvent, ByteString, NotUsed] = {
    val stage = new StreamingXmlWriter(charset)
    Flow.fromGraph(stage)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.xml.javadsl;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.xml.*;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;

/**
 * Exercises the Java DSL XML writer: a list of parse events is streamed through
 * {@code XmlWriting.writer()} and the concatenated output is compared with the expected document.
 * The {@code #writer} and {@code #writer-usage} markers delimit snippets referenced from the docs.
 */
public class XmlWritingTest {
  private static ActorSystem system;
  private static Materializer materializer;

  @Test
  public void xmlWriter() throws InterruptedException, ExecutionException, TimeoutException {

    // #writer
    final Sink<ParseEvent, CompletionStage<String>> write = Flow.of(ParseEvent.class)
      .via(XmlWriting.writer())
      .map((ByteString bs) -> bs.utf8String())
      .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
    // #writer

    // #writer-usage
    final String doc = "<?xml version='1.0' encoding='UTF-8'?><doc><elem>elem1</elem><elem>elem2</elem></doc>";
    final List<ParseEvent> docList = new ArrayList<>();
    docList.add(StartDocument.getInstance());
    docList.add(StartElement.create("doc", Collections.emptyMap()));
    docList.add(StartElement.create("elem", Collections.emptyMap()));
    docList.add(Characters.create("elem1"));
    docList.add(EndElement.create("elem"));
    docList.add(StartElement.create("elem", Collections.emptyMap()));
    docList.add(Characters.create("elem2"));
    docList.add(EndElement.create("elem"));
    docList.add(EndElement.create("doc"));
    docList.add(EndDocument.getInstance());


    final CompletionStage<String> resultStage = Source.from(docList).runWith(write, materializer);
    // #writer-usage

    // Await the result first and assert on the test thread, so a mismatch is reported as a
    // plain assertion failure instead of being wrapped in an ExecutionException.
    final String written = resultStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    assertEquals(doc, written);
  }

  /** Creates the actor system and materializer shared by the tests in this class. */
  @BeforeClass
  public static void setup() throws Exception {
    system = ActorSystem.create();
    materializer = ActorMaterializer.create(system);
  }

  /** Shuts down the shared actor system once all tests have run. */
  @AfterClass
  public static void teardown() throws Exception {
    JavaTestKit.shutdownActorSystem(system);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.xml.scaladsl

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.xml._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

import scala.concurrent.Future
import scala.concurrent.duration._

/**
 * Spec for the Scala DSL XML writer: each case streams a list of parse events through
 * `XmlWriting.writer` and compares the concatenated UTF-8 output with the expected document.
 * The `#writer` and `#writer-usage` markers delimit snippets referenced from the docs.
 */
class XmlWritingTest extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
  implicit val system: ActorSystem = ActorSystem("Test")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // #writer
  val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
    .via(XmlWriting.writer)
    .map[String](_.utf8String)
    .toMat(Sink.fold[String, String]("")(_ + _))(Keep.right)
  // #writer

  /** Streams the given events through the writer sink and yields the concatenated output. */
  private def render(events: List[ParseEvent]): Future[String] =
    Source.fromIterator[ParseEvent](() => events.iterator).runWith(writer)

  "XML Writer" must {

    "properly write simple XML" in {
      // #writer-usage
      val listEl: List[ParseEvent] = List(
        StartDocument,
        StartElement("doc", Map.empty),
        StartElement("elem", Map.empty),
        Characters("elem1"),
        EndElement("elem"),
        StartElement("elem", Map.empty),
        Characters("elem2"),
        EndElement("elem"),
        EndElement("doc"),
        EndDocument
      )

      val doc = "<?xml version='1.0' encoding='UTF-8'?><doc><elem>elem1</elem><elem>elem2</elem></doc>"
      val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
      // #writer-usage

      resultFuture.futureValue(Timeout(20.seconds)) should ===(doc)
    }

    "properly process a comment" in {
      val expected = "<?xml version='1.0' encoding='UTF-8'?><doc><!--comment--></doc>"
      val written = render(
        List(
          StartDocument,
          StartElement("doc", Map.empty),
          Comment("comment"),
          EndElement("doc"),
          EndDocument
        )
      )

      written.futureValue(Timeout(20.seconds)) should ===(expected)
    }

    "properly process parse instructions" in {
      val expected = """<?xml version='1.0' encoding='UTF-8'?><?target content?><doc/>"""
      val written = render(
        List(
          StartDocument,
          ProcessingInstruction(Some("target"), Some("content")),
          StartElement("doc", Map.empty),
          EndElement("doc"),
          EndDocument
        )
      )

      written.futureValue(Timeout(20.seconds)) should ===(expected)
    }

    "properly process attributes" in {
      val expected =
        """<?xml version='1.0' encoding='UTF-8'?><doc good="yes"><elem nice="yes" very="true">elem1</elem></doc>"""
      val written = render(
        List(
          StartDocument,
          StartElement("doc", Map("good" -> "yes")),
          StartElement("elem", Map("nice" -> "yes", "very" -> "true")),
          Characters("elem1"),
          EndElement("elem"),
          EndElement("doc"),
          EndDocument
        )
      )

      written.futureValue(Timeout(3.seconds)) should ===(expected)
    }

    "properly process CData blocks" in {
      val expected = """<?xml version='1.0' encoding='UTF-8'?><doc><![CDATA[<not>even</valid>]]></doc>"""
      val written = render(
        List(
          StartDocument,
          StartElement("doc", Map.empty),
          CData("<not>even</valid>"),
          EndElement("doc"),
          EndDocument
        )
      )

      written.futureValue(Timeout(3.seconds)) should ===(expected)
    }

  }

  // Terminates the actor system once every test in the spec has run.
  override protected def afterAll(): Unit = system.terminate()
}

0 comments on commit 66a49fd

Please sign in to comment.