Skip to content

Commit

Permalink
refactor(i): Simplify addSubPlan slightly (sourcenetwork#2307)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Part of sourcenetwork#1709

## Description

Simplifies `addSubPlan` slightly, and adds a test documenting sourcenetwork#1709.

No user visible behaviour should have been changed in this PR.
  • Loading branch information
AndrewSisley authored Feb 13, 2024
1 parent 31297f8 commit c117d84
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 239 deletions.
2 changes: 0 additions & 2 deletions planner/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,5 +400,3 @@ func (n *dagScanNode) dagBlockToNodeDoc(block blocks.Block) (core.Doc, []*ipld.L

return commit, heads, nil
}

func (n *dagScanNode) Append() bool { return true }
258 changes: 27 additions & 231 deletions planner/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,6 @@ type MultiNode interface {
Children() []planNode
}

// mergeNode is a special interface for the MultiNode
// system. A mergeNode provides an entire document
// in its Values() func, with all the specific and
// necessary fields and subfields already merged
// into the doc
type mergeNode interface {
planNode
Merge() bool
}

// appendNode is a special interface for the MultiNode
// system.
type appendNode interface {
planNode
Append() bool
}

// parallelNode implements the MultiNode interface. It
// enables parallel execution of planNodes. This is needed
// if a single request has multiple Select statements at the
Expand Down Expand Up @@ -132,10 +115,10 @@ func (p *parallelNode) Next() (bool, error) {
var err error
// isMerge := false
switch n := plan.(type) {
case mergeNode:
case *scanNode, *typeIndexJoin:
// isMerge = true
next, err = p.nextMerge(i, n)
case appendNode:
case *dagScanNode:
next, err = p.nextAppend(i, n)
}
if err != nil {
Expand All @@ -148,7 +131,7 @@ func (p *parallelNode) Next() (bool, error) {
return orNext, nil
}

func (p *parallelNode) nextMerge(index int, plan mergeNode) (bool, error) {
func (p *parallelNode) nextMerge(index int, plan planNode) (bool, error) {
if next, err := plan.Next(); !next {
return false, err
}
Expand All @@ -159,52 +142,7 @@ func (p *parallelNode) nextMerge(index int, plan mergeNode) (bool, error) {
return true, nil
}

/*
scan node
=========
{
_docID: bae-ALICE,
name: Alice,
points: 124,
verified: false
}
typeJoin node(merge)
=============
{
friends: [
{
_docID: bae-BOB,
name: bob,
points: 99.9,
verified: true,
}
]
}
output
======
{
_docID: bae-ALICE,
name: Alice,
points: 124,
verified: false,
friends: [
{
_docID: bae-BOB,
name: bob,
points: 99.9,
verified: true,
}
]
}
*/

func (p *parallelNode) nextAppend(index int, plan appendNode) (bool, error) {
func (p *parallelNode) nextAppend(index int, plan planNode) (bool, error) {
key := p.currentValue.GetID()
if key == "" {
return false, nil
Expand Down Expand Up @@ -235,43 +173,6 @@ func (p *parallelNode) nextAppend(index int, plan appendNode) (bool, error) {
return true, nil
}

/*
query {
user {
_docID
name
points
verified
_version {
cid
}
}
}
scan node
=========
{
_docID: bae-ALICE,
name: Alice,
points: 124,
verified: false
}
_version: commitSelectTopNode(append)
===================
[
{
cid: QmABC
},
{
cid: QmDEF
}
...
]
*/

func (p *parallelNode) Source() planNode { return p.multiscan }

func (p *parallelNode) Children() []planNode {
Expand All @@ -283,171 +184,66 @@ func (p *parallelNode) addChild(fieldIndex int, node planNode) {
p.childIndexes = append(p.childIndexes, fieldIndex)
}

/*
user {
friends {
name
}
addresses {
street_name
}
}
Select {
source: scanNode(user)
}
||||||
\/\/\/
Select {
source: TypeJoin(friends, user) {
joinPlan {
typeJoinMany {
root: scanNode(user)
subType: Select {
source: scanNode(friends)
}
}
}
},
}
||||||
\/\/\/
Select {
source: MultiNode[
{
TypeJoin(friends, user) {
joinPlan {
typeJoinMany {
root: multiscan(scanNode(user))
subType: Select {
source: scanNode(friends)
}
}
}
}
},
{
TypeJoin(addresses, user) {
joinPlan {
typeJoinMany {
root: multiscan(scanNode(user))
subType: Select {
source: scanNode(addresses)
}
}
}
}
}]
}
}
select addSubPlan {
check if source is MultiNode
yes =>
get multiScan node
create new plan with multi scan node
append
no = >
create new multinode
get scan node from existing source
create multiscan
replace existing source scannode with multiScan
add existing source to new MultiNode
add new plan to multNode
}
Select {
source: Parallel {[
TypeJoin {
},
commitScan {
}
]}
}
*/

// @todo: Document AddSubPlan method
func (s *selectNode) addSubPlan(fieldIndex int, plan planNode) error {
src := s.source
switch node := src.(type) {
func (s *selectNode) addSubPlan(fieldIndex int, newPlan planNode) error {
switch sourceNode := s.source.(type) {
// if its a scan node, we either replace or create a multinode
case *scanNode, *pipeNode:
switch plan.(type) {
case mergeNode:
s.source = plan
case appendNode:
switch newPlan.(type) {
case *scanNode, *typeIndexJoin:
s.source = newPlan
case *dagScanNode:
m := &parallelNode{
p: s.planner,
docMapper: docMapper{src.DocumentMap()},
docMapper: docMapper{s.source.DocumentMap()},
}
m.addChild(-1, src)
m.addChild(fieldIndex, plan)
m.addChild(-1, s.source)
m.addChild(fieldIndex, newPlan)
s.source = m
default:
return client.NewErrUnhandledType("sub plan", plan)
return client.NewErrUnhandledType("sub plan", newPlan)
}

// source is a mergeNode, like a TypeJoin
case mergeNode:
origScan, _ := walkAndFindPlanType[*scanNode](plan)
case *typeIndexJoin:
origScan, _ := walkAndFindPlanType[*scanNode](newPlan)
if origScan == nil {
return ErrFailedToFindScanNode
}
// create our new multiscanner
multiscan := &multiScanNode{scanNode: origScan}
// replace our current source internal scanNode with our new multiscanner
if err := s.planner.walkAndReplacePlan(src, origScan, multiscan); err != nil {
if err := s.planner.walkAndReplacePlan(s.source, origScan, multiscan); err != nil {
return err
}
// create multinode
multinode := &parallelNode{
p: s.planner,
multiscan: multiscan,
docMapper: docMapper{src.DocumentMap()},
docMapper: docMapper{s.source.DocumentMap()},
}
multinode.addChild(-1, src)
multinode.addChild(-1, s.source)
multiscan.addReader()
// replace our new node internal scanNode with our new multiscanner
if err := s.planner.walkAndReplacePlan(plan, origScan, multiscan); err != nil {
if err := s.planner.walkAndReplacePlan(newPlan, origScan, multiscan); err != nil {
return err
}
// add our newly updated plan to the multinode
multinode.addChild(fieldIndex, plan)
multinode.addChild(fieldIndex, newPlan)
multiscan.addReader()
s.source = multinode

// we already have an existing parallelNode as our source
case *parallelNode:
switch plan.(type) {
// easy, just append, since append doest need any internal relaced scannode
case appendNode:
node.addChild(fieldIndex, plan)

switch newPlan.(type) {
// We have a internal multiscanNode on our MultiNode
case mergeNode:
multiscan, sourceIsMultiscan := node.Source().(*multiScanNode)
if !sourceIsMultiscan {
return client.NewErrUnexpectedType[*multiScanNode]("mergeNode", node.Source())
}

case *scanNode, *typeIndexJoin:
// replace our new node internal scanNode with our existing multiscanner
if err := s.planner.walkAndReplacePlan(plan, multiscan.Source(), multiscan); err != nil {
if err := s.planner.walkAndReplacePlan(newPlan, sourceNode.multiscan.Source(), sourceNode.multiscan); err != nil {
return err
}
multiscan.addReader()
// add our newly updated plan to the multinode
node.addChild(fieldIndex, plan)
default:
return client.NewErrUnhandledType("sub plan", plan)
sourceNode.multiscan.addReader()
}

sourceNode.addChild(fieldIndex, newPlan)
}
return nil
}
3 changes: 0 additions & 3 deletions planner/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ func (n *scanNode) Explain(explainType request.ExplainType) (map[string]any, err
}
}

// Merge implements mergeNode
func (n *scanNode) Merge() bool { return true }

func (p *Planner) Scan(
mapperSelect *mapper.Select,
colDesc client.CollectionDescription,
Expand Down
3 changes: 0 additions & 3 deletions planner/type_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,6 @@ func (n *typeIndexJoin) Explain(explainType request.ExplainType) (map[string]any
}
}

// Merge implements mergeNode
func (n *typeIndexJoin) Merge() bool { return true }

// typeJoinOne is the plan node for a type index join
// where the root type is the primary in a one-to-one relation request.
type typeJoinOne struct {
Expand Down
Loading

0 comments on commit c117d84

Please sign in to comment.