Skip to content

Commit

Permalink
Merge PR '#37', Stream Parser Support - pt4
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchun committed Aug 27, 2020
2 parents fa5754e + 81676ed commit 8049e7d
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 22 deletions.
51 changes: 43 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Overview
Change Logs
===

2020-08-??
- Add XML stream loading and parsing support.

2019-11-11
- Add XPath query caching.

Expand Down Expand Up @@ -48,26 +51,58 @@ if err != nil {
}
```

#### Parse a XML from URL.
#### Parse an XML from URL.

```go
doc, err := xmlquery.LoadURL("http://www.example.com/sitemap.xml")
```

#### Parse a XML from string.
#### Parse an XML from string.

```go
s := `<?xml version="1.0" encoding="utf-8"?><rss version="2.0"></rss>`
doc, err := xmlquery.Parse(strings.NewReader(s))
```

#### Parse a XML from io.Reader.
#### Parse an XML from io.Reader.

```go
f, err := os.Open("../books.xml")
doc, err := xmlquery.Parse(f)
```

#### Parse an XML in a stream fashion (simple case without element filtering).

```go
f, err := os.Open("../books.xml")
p, err := xmlquery.CreateStreamParser(f, "/bookstore/book")
for {
n, err := p.Read()
if err == io.EOF {
break
}
if err != nil {
...
}
}
```

#### Parse an XML in a stream fashion (simple case advanced element filtering).

```go
f, err := os.Open("../books.xml")
p, err := xmlquery.CreateStreamParser(f, "/bookstore/book", "/bookstore/book[price>=10]")
for {
n, err := p.Read()
if err == io.EOF {
break
}
if err != nil {
...
}
}
```

#### Find authors of all books in the bookstore.

```go
Expand Down Expand Up @@ -210,11 +245,11 @@ func main(){

List of supported XPath query packages
===
|Name |Description |
|--------------------------|----------------|
|[htmlquery](https://github.com/antchfx/htmlquery) | XPath query package for the HTML document|
|[xmlquery](https://github.com/antchfx/xmlquery) | XPath query package for the XML document|
|[jsonquery](https://github.com/antchfx/jsonquery) | XPath query package for the JSON document|
| Name | Description |
| ------------------------------------------------- | ----------------------------------------- |
| [htmlquery](https://github.com/antchfx/htmlquery) | XPath query package for the HTML document |
| [xmlquery](https://github.com/antchfx/xmlquery) | XPath query package for the XML document |
| [jsonquery](https://github.com/antchfx/jsonquery) | XPath query package for the JSON document |

Questions
===
Expand Down
165 changes: 151 additions & 14 deletions parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package xmlquery
import (
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"strings"

"github.com/antchfx/xpath"
"golang.org/x/net/html/charset"
)

Expand All @@ -20,12 +22,30 @@ func LoadURL(url string) (*Node, error) {
return Parse(resp.Body)
}

// Parse returns the parse tree for the XML from the given Reader.
func Parse(r io.Reader) (*Node, error) {
p := createParser(r)
for {
_, err := p.parse()
if err == io.EOF {
return p.doc, nil
}
if err != nil {
return nil, err
}
}
}

type parser struct {
decoder *xml.Decoder
doc *Node
space2prefix map[string]string
level int
prev *Node
decoder *xml.Decoder
doc *Node
space2prefix map[string]string
level int
prev *Node
streamElementXPath *xpath.Expr // Under streaming mode, this specifies the xpath to the target element node(s).
streamElementFilter *xpath.Expr // If specified, it provides a futher filtering on the target element.
streamNode *Node // Need to remmeber the last target node So we can clean it up upon next Read() call.
streamNodePrev *Node // Need to remember target node's prev so upon target node removal, we can restore correct prev.
}

func createParser(r io.Reader) *parser {
Expand All @@ -43,6 +63,8 @@ func createParser(r io.Reader) *parser {
}

func (p *parser) parse() (*Node, error) {
var streamElementNodeCounter int

for {
tok, err := p.decoder.Token()
if err != nil {
Expand Down Expand Up @@ -99,10 +121,54 @@ func (p *parser) parse() (*Node, error) {
}
addSibling(p.prev.Parent, node)
}
// If we're in the streaming mode, we need to remember the node if it is the target node
// so that when we finish processing the node's EndElement, we know how/what to return to
// caller. Also we need to remove the target node from the tree upon next Read() call so
// memory doesn't grow unbounded.
if p.streamElementXPath != nil {
if p.streamNode == nil {
if QuerySelector(p.doc, p.streamElementXPath) != nil {
p.streamNode = node
p.streamNodePrev = p.prev
streamElementNodeCounter = 1
}
} else {
streamElementNodeCounter++
}
}
p.prev = node
p.level++
case xml.EndElement:
p.level--
// If we're in streaming mode, and we already have a potential streaming
// target node identified (p.streamNode != nil) then we need to check if
// this is the real one we want to return to caller.
if p.streamNode != nil {
streamElementNodeCounter--
if streamElementNodeCounter == 0 {
// Now we know this element node is the at least passing the initial
// p.streamElementXPath check and is a potential target node candidate.
// We need to have 1 more check with p.streamElementFilter (if given) to
// ensure it is really the element node we want.
// The reason we need a two-step check process is because the following
// situation:
// <AAA><BBB>b1</BBB></AAA>
// And say the p.streamElementXPath = "/AAA/BBB[. != 'b1']". Now during
// xml.StartElement time, the <BBB> node is still empty, so it will pass
// the p.streamElementXPath check. However, eventually we know this <BBB>
// shouldn't be returned to the caller. Having a second more fine-grained
// filter check ensures that. So in this case, the caller should really
// setup the stream parser with:
// streamElementXPath = "/AAA/BBB["
// streamElementFilter = "/AAA/BBB[. != 'b1']"
if p.streamElementFilter == nil || QuerySelector(p.doc, p.streamElementFilter) != nil {
return p.streamNode, nil
}
// otherwise, this isn't our target node. clean things up.
p.streamNode = nil
p.streamNodePrev = nil
}
}
case xml.CharData:
node := &Node{Type: CharDataNode, Data: string(tok), level: p.level}
if p.level == p.prev.level {
Expand Down Expand Up @@ -150,16 +216,87 @@ func (p *parser) parse() (*Node, error) {
}
}

// Parse returns the parse tree for the XML from the given Reader.
func Parse(r io.Reader) (*Node, error) {
p := createParser(r)
for {
_, err := p.parse()
if err == io.EOF {
return p.doc, nil
}
// StreamParser enables loading and parsing an XML document in a streaming fashion.
type StreamParser struct {
p *parser
}

// CreateStreamParser creates a StreamParser. Argument streamElementXPath is required.
// Argument streamElementFilter is optional and should only be used in advanced scenarios.
//
// Scenario 1: simple case:
// xml := `<AAA><BBB>b1</BBB><BBB>b2</BBB></AAA>`
// sp, err := CreateStreamParser(strings.NewReader(xml), "/AAA/BBB")
// if err != nil {
// panic(err)
// }
// for {
// n, err := sp.Read()
// if err != nil {
// break
// }
// fmt.Println(n.OutputXML(true))
// }
// Output will be:
// <BBB>b1</BBB>
// <BBB>b2</BBB>
//
// Scenario 2: advanced case:
// xml := `<AAA><BBB>b1</BBB><BBB>b2</BBB></AAA>`
// sp, err := CreateStreamParser(strings.NewReader(xml), "/AAA/BBB", "/AAA/BBB[. != 'b1']")
// if err != nil {
// panic(err)
// }
// for {
// n, err := sp.Read()
// if err != nil {
// break
// }
// fmt.Println(n.OutputXML(true))
// }
// Output will be:
// <BBB>b2</BBB>
//
// As the argument names indicate, streamElementXPath should be used for providing xpath query pointing
// to the target element node only, no extra filtering on the element itself or its children; while
// streamElementFilter, if needed, can provide additional filtering on the target element and its children.
//
// CreateStreamParser returns error if either streamElementXPath or streamElementFilter, if provided, cannot
// be successfully parsed and compiled into a valid xpath query.
func CreateStreamParser(r io.Reader, streamElementXPath string, streamElementFilter ...string) (*StreamParser, error) {
elemXPath, err := getQuery(streamElementXPath)
if err != nil {
return nil, fmt.Errorf("invalid streamElementXPath '%s', err: %s", streamElementXPath, err.Error())
}
elemFilter := (*xpath.Expr)(nil)
if len(streamElementFilter) > 0 {
elemFilter, err = getQuery(streamElementFilter[0])
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid streamElementFilter '%s', err: %s", streamElementFilter[0], err.Error())
}
}
sp := &StreamParser{
p: createParser(r),
}
sp.p.streamElementXPath = elemXPath
sp.p.streamElementFilter = elemFilter
return sp, nil
}

// Read returns a target node that satisifies the XPath specified by caller at StreamParser creation
// time. If there is no more satisifying target node after reading the rest of the XML document, io.EOF
// will be returned. At any time, any XML parsing error encountered, the error will be returned and
// the stream parsing is stopped. Calling Read() after an error is returned (including io.EOF) is not
// allowed the behavior will be undefined. Also note, due to the streaming nature, calling Read() will
// automatically remove any previous target node(s) from the document tree.
func (sp *StreamParser) Read() (*Node, error) {
// Because this is a streaming read, we need to release/remove last
// target node from the node tree to free up memory.
if sp.p.streamNode != nil {
removeFromTree(sp.p.streamNode)
sp.p.prev = sp.p.streamNodePrev
sp.p.streamNode = nil
sp.p.streamNodePrev = nil
}
return sp.p.parse()
}
Loading

0 comments on commit 8049e7d

Please sign in to comment.