Skip to content

Commit

Permalink
Log(n) complexity report merger.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed May 4, 2016
1 parent f65e198 commit 54a760a
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 31 deletions.
55 changes: 24 additions & 31 deletions app/collector.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package app

import (
"fmt"
"sync"
"time"

"github.com/spaolacci/murmur3"
"golang.org/x/net/context"

"github.com/weaveworks/scope/common/mtime"
Expand Down Expand Up @@ -35,10 +33,12 @@ type Collector interface {
// Collector receives published reports from multiple producers. It yields a
// single merged report, representing all collected reports.
type collector struct {
mtx sync.Mutex
reports []timestampReport
window time.Duration
cached *report.Report
mtx sync.Mutex
reports []report.Report
timestamps []time.Time
window time.Duration
cached *report.Report
merger Merger
waitableCondition
}

Expand Down Expand Up @@ -78,15 +78,18 @@ func NewCollector(window time.Duration) Collector {
waitableCondition: waitableCondition{
waiters: map[chan struct{}]struct{}{},
},
merger: NewSmartMerger(),
}
}

// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(_ context.Context, rpt report.Report) error {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{mtime.Now(), rpt})
c.reports = clean(c.reports, c.window)
c.reports = append(c.reports, rpt)
c.timestamps = append(c.timestamps, mtime.Now())

c.clean()
c.cached = nil
if rpt.Shortcut {
c.Broadcast()
Expand All @@ -104,37 +107,27 @@ func (c *collector) Report(_ context.Context) (report.Report, error) {
// and there is a cached report, return that.
if c.cached != nil && len(c.reports) > 0 {
oldest := mtime.Now().Add(-c.window)
if c.reports[0].timestamp.After(oldest) {
if c.timestamps[0].After(oldest) {
return *c.cached, nil
}
}
c.reports = clean(c.reports, c.window)

rpt := report.MakeReport()
id := murmur3.New64()
for _, tr := range c.reports {
rpt = rpt.Merge(tr.report)
id.Write([]byte(tr.report.ID))
}
rpt.ID = fmt.Sprintf("%x", id.Sum64())
c.cached = &rpt
return rpt, nil
}

type timestampReport struct {
timestamp time.Time
report report.Report
c.clean()
return c.merger.Merge(c.reports), nil
}

func clean(reports []timestampReport, window time.Duration) []timestampReport {
func (c *collector) clean() {
var (
cleaned = make([]timestampReport, 0, len(reports))
oldest = mtime.Now().Add(-window)
cleanedReports = make([]report.Report, 0, len(c.reports))
cleanedTimestamps = make([]time.Time, 0, len(c.timestamps))
oldest = mtime.Now().Add(-c.window)
)
for _, tr := range reports {
if tr.timestamp.After(oldest) {
cleaned = append(cleaned, tr)
for i, r := range c.reports {
if c.timestamps[i].After(oldest) {
cleanedReports = append(cleanedReports, r)
cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i])
}
}
return cleaned
c.reports = cleanedReports
c.timestamps = cleanedTimestamps
}
138 changes: 138 additions & 0 deletions app/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package app

import (
"fmt"
"math"
"sort"

"github.com/bluele/gcache"
"github.com/spaolacci/murmur3"

"github.com/weaveworks/scope/report"
)

// Merger is implemented by anything that can collapse a list of reports
// into a single report.
type Merger interface {
	// Merge returns one report representing all of the given reports.
	Merge(reports []report.Report) report.Report
}

// dumbMerger is a Merger with no state: every call to Merge starts from
// scratch.
type dumbMerger struct{}

// MakeDumbMerger makes a Merger which merges together reports in the
// simplest possible way.
func MakeDumbMerger() Merger {
	var m dumbMerger
	return m
}

// Merge folds every report, in slice order, into a freshly made report.
// The ID of the result is the hex-formatted murmur3 digest of the input
// report IDs, written to the hasher in the same order.
func (dumbMerger) Merge(reports []report.Report) report.Report {
	var (
		merged = report.MakeReport()
		digest = murmur3.New64()
	)
	for i := range reports {
		merged = merged.Merge(reports[i])
		digest.Write([]byte(reports[i].ID))
	}
	merged.ID = fmt.Sprintf("%x", digest.Sum64())
	return merged
}

// smartMerger is a Merger that keeps the results of earlier pair-wise
// merges in a cache so later calls can reuse them.
type smartMerger struct {
	cache gcache.Cache
}

// NewSmartMerger makes a Merger which merges together reports, caching
// intermediate merges to accelerate future merges. The idea is to cache
// pair-wise merged reports, forming a merge tree; merging a new report into
// this tree should be log(N).
func NewSmartMerger() Merger {
	// The cache is built with a size of 1000 and the LRU policy.
	lru := gcache.New(1000).LRU().Build()
	return &smartMerger{cache: lru}
}

// node is one vertex of the merge tree: a report together with the 64-bit
// id it is keyed and sorted by.
type node struct {
	id  uint64
	rpt report.Report
}

// byID implements sort.Interface, ordering nodes by ascending id.
type byID []*node

// Len reports the number of nodes.
func (b byID) Len() int {
	return len(b)
}

// Swap exchanges the nodes at positions i and j.
func (b byID) Swap(i, j int) {
	b[i], b[j] = b[j], b[i]
}

// Less reports whether the node at i has a smaller id than the node at j.
func (b byID) Less(i, j int) bool {
	return b[i].id < b[j].id
}

// hash returns a 64-bit murmur3 digest of the given IDs, taken in order.
//
// Each ID is written to the hasher preceded by its byte length (8 bytes,
// little-endian). Without that prefix the IDs would simply be concatenated,
// so hash("ab", "c") and hash("a", "bc") would be fed identical bytes and
// return the same value; since these digests are used as cache keys, two
// different pairs of reports could then share one cache entry.
func hash(ids ...string) uint64 {
	h := murmur3.New64()
	var size [8]byte
	for _, id := range ids {
		n := uint64(len(id))
		for k := range size {
			size[k] = byte(n >> (8 * uint(k)))
		}
		h.Write(size[:])
		h.Write([]byte(id))
	}
	return h.Sum64()
}

// ClearCache purges the merger's cache of intermediate merge results.
func (m *smartMerger) ClearCache() {
	m.cache.Purge()
}

// Merge combines reports into a single report by building a tree of
// pair-wise merges. Leaves are the input reports keyed by the hash of their
// ID; the id space is split in half recursively, and the result of merging
// any two nodes is stored in the cache so that later calls can reuse it.
func (s *smartMerger) Merge(reports []report.Report) report.Report {
	// Build the sorted list of leaves. Reports sharing an ID are collapsed
	// to the first one seen: they are assumed to be the same report, and
	// duplicates would otherwise send the partitioning below into an
	// infinite loop.
	var (
		leaves = []*node{}
		known  = map[uint64]struct{}{}
	)
	for _, rpt := range reports {
		key := hash(rpt.ID)
		if _, dup := known[key]; !dup {
			known[key] = struct{}{}
			leaves = append(leaves, &node{id: key, rpt: rpt})
		}
	}
	sort.Sort(byID(leaves))

	// combine merges two nodes into one, consulting the cache first and
	// storing any freshly computed result in it.
	combine := func(a, b *node) *node {
		key := hash(a.rpt.ID, b.rpt.ID)

		cached, err := s.cache.Get(key)
		if err == nil {
			return cached.(*node)
		}

		merged := &node{
			id:  key,
			rpt: report.MakeReport().Merge(a.rpt).Merge(b.rpt),
		}
		s.cache.Set(key, merged)
		return merged
	}

	// collapse reduces the nodes whose ids lie in [lo, hi] to a single node.
	// Both bounds are inclusive.
	var collapse func(lo, hi uint64, ns []*node) *node
	collapse = func(lo, hi uint64, ns []*node) *node {
		if len(ns) == 0 {
			return &node{rpt: report.MakeReport()}
		}
		if len(ns) == 1 {
			return ns[0]
		}
		if len(ns) == 2 {
			return combine(ns[0], ns[1])
		}

		// Split the id range in half and find the first node that belongs
		// to the upper half.
		mid := lo + ((hi - lo) / 2)
		split := sort.Search(len(ns), func(i int) bool {
			return ns[i].id > mid
		})

		// If every node sits in one half, just narrow the range.
		if split == len(ns) {
			return collapse(lo, mid, ns)
		}
		if split == 0 {
			return collapse(mid+1, hi, ns)
		}

		lower := collapse(lo, mid, ns[:split])
		upper := collapse(mid+1, hi, ns[split:])
		return combine(lower, upper)
	}

	root := collapse(0, math.MaxUint64, leaves)
	return root.rpt
}
122 changes: 122 additions & 0 deletions app/merger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package app_test

import (
"fmt"
"math/rand"
"testing"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/reflect"
)

// TestMerger checks that both Merger implementations produce the expected
// result for an empty input and for three distinct reports, and that
// repeating the three-report merge gives the same answer.
func TestMerger(t *testing.T) {
	// Use 3 reports to check the pair-wise merging in SmartMerger
	report1 := report.MakeReport()
	report1.Endpoint.AddNode(report.MakeNode("foo"))
	report2 := report.MakeReport()
	report2.Endpoint.AddNode(report.MakeNode("bar"))
	report3 := report.MakeReport()
	report3.Endpoint.AddNode(report.MakeNode("baz"))
	reports := []report.Report{
		report1, report2, report3,
	}
	want := report.MakeReport()
	want.Endpoint.
		AddNode(report.MakeNode("foo")).
		AddNode(report.MakeNode("bar")).
		AddNode(report.MakeNode("baz"))

	for _, merger := range []app.Merger{app.MakeDumbMerger(), app.NewSmartMerger()} {
		// Test the empty list case. The diff in the failure message is taken
		// against the same empty report the result is compared with, not
		// against the three-node want used by the checks below.
		if have, empty := merger.Merge([]report.Report{}), report.MakeReport(); !reflect.DeepEqual(have, empty) {
			t.Errorf("Bad merge: %s", test.Diff(have, empty))
		}

		if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
			t.Errorf("Bad merge: %s", test.Diff(have, want))
		}

		// Repeat the above test to ensure caching works
		if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
			t.Errorf("Bad merge: %s", test.Diff(have, want))
		}
	}
}

// TestSmartMerger feeds the smart merger three reports that all carry the
// same ID and expects only the first one's contents in the result.
func TestSmartMerger(t *testing.T) {
	// Use 3 reports _WITH SAME ID_
	const sharedID = "foo"
	var reports []report.Report
	for _, name := range []string{"foo", "bar", "baz"} {
		rpt := report.MakeReport()
		rpt.Endpoint.AddNode(report.MakeNode(name))
		rpt.ID = sharedID
		reports = append(reports, rpt)
	}

	want := report.MakeReport()
	want.Endpoint.AddNode(report.MakeNode("foo"))

	merger := app.NewSmartMerger()
	have := merger.Merge(reports)
	if !reflect.DeepEqual(have, want) {
		t.Errorf("Bad merge: %s", test.Diff(have, want))
	}
}

func BenchmarkSmartMerger(b *testing.B) {
benchmarkMerger(b, app.NewSmartMerger(), false)
}

func BenchmarkSmartMergerWithoutCaching(b *testing.B) {
benchmarkMerger(b, app.NewSmartMerger(), true)
}

func BenchmarkDumbMerger(b *testing.B) {
benchmarkMerger(b, app.MakeDumbMerger(), false)
}

// numHosts sizes the benchmark workload: numHosts*5 reports are merged on
// every iteration, and numHosts/3 of them are replaced beforehand.
const numHosts = 15

// benchmarkMerger times repeated merges of a fixed-size set of reports, a
// few of which are swapped for new random reports before each merge. When
// clearCache is true and the merger has a ClearCache method, that method is
// called after the priming merge and after every timed merge.
func benchmarkMerger(b *testing.B, merger app.Merger, clearCache bool) {
	// randomReport builds a report holding 100 endpoint nodes with random IDs.
	randomReport := func() report.Report {
		r := report.MakeReport()
		for n := 0; n < 100; n++ {
			r.Endpoint.AddNode(report.MakeNode(fmt.Sprintf("%x", rand.Int63())))
		}
		return r
	}

	// maybeClear clears the merger's cache if that was asked for and the
	// merger supports it.
	maybeClear := func() {
		if !clearCache {
			return
		}
		if c, ok := merger.(interface {
			ClearCache()
		}); ok {
			c.ClearCache()
		}
	}

	reports := make([]report.Report, numHosts*5)
	for i := range reports {
		reports[i] = randomReport()
	}
	merger.Merge(reports) // prime the cache
	maybeClear()

	b.ReportAllocs()
	b.ResetTimer()

	for iter := 0; iter < b.N; iter++ {
		// Replace numHosts/3 randomly chosen reports, then merge them all.
		for k := 0; k < numHosts/3; k++ {
			reports[rand.Intn(len(reports))] = randomReport()
		}

		merger.Merge(reports)
		maybeClear()
	}
}

0 comments on commit 54a760a

Please sign in to comment.