Skip to content

Commit

Permalink
Log(n) complexity report merger.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed May 3, 2016
1 parent e486cb6 commit 6d92ce2
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 9 deletions.
14 changes: 5 additions & 9 deletions app/collector.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package app

import (
"fmt"
"sync"
"time"

"github.com/spaolacci/murmur3"
"golang.org/x/net/context"

"github.com/weaveworks/scope/common/mtime"
Expand Down Expand Up @@ -39,6 +37,7 @@ type collector struct {
reports []timestampReport
window time.Duration
cached *report.Report
merger merger
waitableCondition
}

Expand Down Expand Up @@ -78,6 +77,7 @@ func NewCollector(window time.Duration) Collector {
waitableCondition: waitableCondition{
waiters: map[chan struct{}]struct{}{},
},
merger: newSmartMerger(),
}
}

Expand Down Expand Up @@ -110,15 +110,11 @@ func (c *collector) Report(_ context.Context) (report.Report, error) {
}
c.reports = clean(c.reports, c.window)

rpt := report.MakeReport()
id := murmur3.New64()
rpts := []report.Report{}
for _, tr := range c.reports {
rpt = rpt.Merge(tr.report)
id.Write([]byte(tr.report.ID))
rpts = append(rpts, tr.report)
}
rpt.ID = fmt.Sprintf("%x", id.Sum64())
c.cached = &rpt
return rpt, nil
return c.merger.Merge(rpts), nil
}

type timestampReport struct {
Expand Down
111 changes: 111 additions & 0 deletions app/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package app

import (
"fmt"
"sort"

"github.com/bluele/gcache"
"github.com/spaolacci/murmur3"

"github.com/weaveworks/scope/report"
)

// merger combines a batch of reports into a single report.
type merger interface {
	// Merge returns one report built from all of the given reports.
	Merge(reports []report.Report) report.Report
}

// dumbMerger is the straightforward merger: every call folds all of the
// reports into one accumulator and nothing is remembered between calls.
type dumbMerger struct{}

// Merge folds the reports, in slice order, into a freshly made report. The
// result's ID is the hex-formatted murmur3 64-bit hash of the input report
// IDs, written to the hasher in that same order.
func (dumbMerger) Merge(reports []report.Report) report.Report {
	var (
		merged = report.MakeReport()
		hasher = murmur3.New64()
	)
	for i := range reports {
		merged = merged.Merge(reports[i])
		hasher.Write([]byte(reports[i].ID))
	}
	merged.ID = fmt.Sprintf("%x", hasher.Sum64())
	return merged
}

// smartMerger is a merger that remembers the result of merging pairs of
// reports so that later Merge calls can reuse them instead of recomputing.
type smartMerger struct {
	// cache maps the hash of a pair of node ids to the *node holding that
	// pair's merged report.
cache gcache.Cache
}

// newSmartMerger returns a smartMerger backed by an LRU cache that is
// constructed with a size argument of 100.
func newSmartMerger() merger {
	const cacheSize = 100
	lru := gcache.New(cacheSize).LRU().Build()
	return &smartMerger{cache: lru}
}

// node is one vertex of the merge tree assembled by smartMerger.Merge.
type node struct {
	// id is the report's ID for a leaf, or a hash derived from the ids of
	// the two nodes that were merged for an inner node.
	id string
	// rpt is the report carried by this node.
	rpt report.Report
	// left and right may point at the two nodes an inner node was built
	// from; they are nil on leaves. Nothing in this file reads them.
	left  *node
	right *node
}

// byID implements sort.Interface, ordering nodes by ascending id.
type byID []*node

// Len reports how many nodes the slice holds.
func (ns byID) Len() int {
	return len(ns)
}

// Swap exchanges the nodes at positions i and j.
func (ns byID) Swap(i, j int) {
	ns[j], ns[i] = ns[i], ns[j]
}

// Less reports whether the id at position i sorts strictly before the id at
// position j.
func (ns byID) Less(i, j int) bool {
	a, b := ns[i], ns[j]
	return a.id < b.id
}

// hash derives a key for the ordered pair of ids (first, second): the
// hex-formatted murmur3 64-bit hash of the pair. The order of the arguments
// matters.
func hash(first, second string) string {
	h := murmur3.New64()
	// Length-prefix the first id so the boundary between the two ids is part
	// of the hashed bytes. Plain concatenation would feed ("ab", "c") and
	// ("a", "bc") identical input and hand back the same key for both.
	fmt.Fprintf(h, "%d:%s%s", len(first), first, second)
	return fmt.Sprintf("%x", h.Sum64())
}

// Merge combines reports into a single report by pairing them up into a
// merge tree and caching every pair-wise merge, so a later call that
// produces the same pair can reuse the cached result.
//
// An empty input yields a freshly made report; a single report is returned
// as-is.
//
// NOTE(review): leaves are sorted by ID and paired by position, so a new
// report whose ID sorts before existing ones shifts every later pairing and
// misses the cache for them; the intended log(N) cost per new report only
// holds when the new ID sorts last. Confirm this is acceptable.
// NOTE(review): unlike dumbMerger.Merge, the result's ID is not assigned
// here; confirm callers do not rely on it.
func (s *smartMerger) Merge(reports []report.Report) report.Report {
	if len(reports) == 0 {
		return report.MakeReport()
	}

	// Start with the leaves, sorted by ID so the same set of reports always
	// produces the same pairings (and therefore the same cache keys).
	nodes := make([]*node, 0, len(reports))
	for i := range reports {
		nodes = append(nodes, &node{
			id:  reports[i].ID,
			rpt: reports[i],
		})
	}
	sort.Sort(byID(nodes))

	// merge returns the node for the pair (left, right), consulting the
	// cache first and storing any freshly computed result.
	merge := func(left, right *node) *node {
		id := hash(left.id, right.id)

		if cached, err := s.cache.Get(id); err == nil {
			// Only trust the entry if it really is a node; otherwise fall
			// through and recompute rather than panic.
			if n, ok := cached.(*node); ok {
				return n
			}
		}

		// The children are deliberately not linked from the new node:
		// nothing reads them, and a link would keep every report in the
		// subtree alive for as long as this entry stays cached.
		n := &node{
			id:  id,
			rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt),
		}
		s.cache.Set(id, n)
		return n
	}

	// pass reduces n nodes to ceil(n/2) nodes by merging neighbours; an
	// unpaired trailing node is carried over unchanged.
	pass := func(in []*node) []*node {
		out := make([]*node, 0, (len(in)+1)/2)
		for i := 0; i+1 < len(in); i += 2 {
			out = append(out, merge(in[i], in[i+1]))
		}
		if len(in)%2 == 1 {
			out = append(out, in[len(in)-1])
		}
		return out
	}

	// Iteratively reduce the nodes down to a single root.
	for len(nodes) > 1 {
		nodes = pass(nodes)
	}
	return nodes[0].rpt
}

0 comments on commit 6d92ce2

Please sign in to comment.