From 54a760a56dcb49f833065234d69f1311c7b10ae4 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 3 May 2016 14:37:28 +0100 Subject: [PATCH] Log(n) complexity report merger. --- app/collector.go | 55 ++++++++---------- app/merger.go | 138 +++++++++++++++++++++++++++++++++++++++++++++ app/merger_test.go | 122 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 284 insertions(+), 31 deletions(-) create mode 100644 app/merger.go create mode 100644 app/merger_test.go diff --git a/app/collector.go b/app/collector.go index e2afae4a48..a33e11cffd 100644 --- a/app/collector.go +++ b/app/collector.go @@ -1,11 +1,9 @@ package app import ( - "fmt" "sync" "time" - "github.com/spaolacci/murmur3" "golang.org/x/net/context" "github.com/weaveworks/scope/common/mtime" @@ -35,10 +33,12 @@ type Collector interface { // Collector receives published reports from multiple producers. It yields a // single merged report, representing all collected reports. type collector struct { - mtx sync.Mutex - reports []timestampReport - window time.Duration - cached *report.Report + mtx sync.Mutex + reports []report.Report + timestamps []time.Time + window time.Duration + cached *report.Report + merger Merger waitableCondition } @@ -78,6 +78,7 @@ func NewCollector(window time.Duration) Collector { waitableCondition: waitableCondition{ waiters: map[chan struct{}]struct{}{}, }, + merger: NewSmartMerger(), } } @@ -85,8 +86,10 @@ func NewCollector(window time.Duration) Collector { func (c *collector) Add(_ context.Context, rpt report.Report) error { c.mtx.Lock() defer c.mtx.Unlock() - c.reports = append(c.reports, timestampReport{mtime.Now(), rpt}) - c.reports = clean(c.reports, c.window) + c.reports = append(c.reports, rpt) + c.timestamps = append(c.timestamps, mtime.Now()) + + c.clean() c.cached = nil if rpt.Shortcut { c.Broadcast() @@ -104,37 +107,27 @@ func (c *collector) Report(_ context.Context) (report.Report, error) { // and there is a cached report, return that. 
if c.cached != nil && len(c.reports) > 0 { oldest := mtime.Now().Add(-c.window) - if c.reports[0].timestamp.After(oldest) { + if c.timestamps[0].After(oldest) { return *c.cached, nil } } - c.reports = clean(c.reports, c.window) - - rpt := report.MakeReport() - id := murmur3.New64() - for _, tr := range c.reports { - rpt = rpt.Merge(tr.report) - id.Write([]byte(tr.report.ID)) - } - rpt.ID = fmt.Sprintf("%x", id.Sum64()) - c.cached = &rpt - return rpt, nil -} -type timestampReport struct { - timestamp time.Time - report report.Report + c.clean() + return c.merger.Merge(c.reports), nil } -func clean(reports []timestampReport, window time.Duration) []timestampReport { +func (c *collector) clean() { var ( - cleaned = make([]timestampReport, 0, len(reports)) - oldest = mtime.Now().Add(-window) + cleanedReports = make([]report.Report, 0, len(c.reports)) + cleanedTimestamps = make([]time.Time, 0, len(c.timestamps)) + oldest = mtime.Now().Add(-c.window) ) - for _, tr := range reports { - if tr.timestamp.After(oldest) { - cleaned = append(cleaned, tr) + for i, r := range c.reports { + if c.timestamps[i].After(oldest) { + cleanedReports = append(cleanedReports, r) + cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i]) } } - return cleaned + c.reports = cleanedReports + c.timestamps = cleanedTimestamps } diff --git a/app/merger.go b/app/merger.go new file mode 100644 index 0000000000..975b31d3b8 --- /dev/null +++ b/app/merger.go @@ -0,0 +1,138 @@ +package app + +import ( + "fmt" + "math" + "sort" + + "github.com/bluele/gcache" + "github.com/spaolacci/murmur3" + + "github.com/weaveworks/scope/report" +) + +// Merger is the type for a thing that can merge reports. +type Merger interface { + Merge([]report.Report) report.Report +} + +type dumbMerger struct{} + +// MakeDumbMerger makes a Merger which merges together reports in the simplest possible way. 
+func MakeDumbMerger() Merger { + return dumbMerger{} +} + +func (dumbMerger) Merge(reports []report.Report) report.Report { + rpt := report.MakeReport() + id := murmur3.New64() + for _, r := range reports { + rpt = rpt.Merge(r) + id.Write([]byte(r.ID)) + } + rpt.ID = fmt.Sprintf("%x", id.Sum64()) + return rpt +} + +type smartMerger struct { + cache gcache.Cache +} + +// NewSmartMerger makes a Merger which merges together reports, caching intermediate merges +// to accelerate future merges. Idea is to cache pair-wise merged reports, forming a merge +// tree. Merging a new report into this tree should be log(N). +func NewSmartMerger() Merger { + return &smartMerger{ + cache: gcache.New(1000).LRU().Build(), + } +} + +type node struct { + id uint64 + rpt report.Report +} + +type byID []*node + +func (ns byID) Len() int { return len(ns) } +func (ns byID) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] } +func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id } + +func hash(ids ...string) uint64 { + id := murmur3.New64() + for _, i := range ids { + id.Write([]byte(i)) + } + return id.Sum64() +} + +func (s *smartMerger) ClearCache() { + s.cache.Purge() +} + +func (s *smartMerger) Merge(reports []report.Report) report.Report { + // Start with a sorted list of leaves. + // Note we must dedupe reports with the same ID to ensure the + // algorithm below doesn't go into an infinite loop. This is + // fine as reports with the same ID are assumed to be the same. + nodes := []*node{} + seen := map[uint64]struct{}{} + for _, r := range reports { + id := hash(r.ID) + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + nodes = append(nodes, &node{ + id: id, + rpt: r, + }) + } + sort.Sort(byID(nodes)) + + // Define how to merge two nodes together. The result of merging + // two reports is cached. 
+ merge := func(left, right *node) *node { + id := hash(left.rpt.ID, right.rpt.ID) + + if result, err := s.cache.Get(id); err == nil { + return result.(*node) + } + + n := &node{ + id: id, + rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt), + } + s.cache.Set(id, n) + return n + } + + // Define how to reduce n nodes to 1. + // Min and max are both inclusive! + var reduce func(min, max uint64, nodes []*node) *node + reduce = func(min, max uint64, nodes []*node) *node { + switch len(nodes) { + case 0: + return &node{rpt: report.MakeReport()} + case 1: + return nodes[0] + case 2: + return merge(nodes[0], nodes[1]) + } + + partition := min + ((max - min) / 2) + index := sort.Search(len(nodes), func(i int) bool { + return nodes[i].id > partition + }) + if index == len(nodes) { + return reduce(min, partition, nodes) + } else if index == 0 { + return reduce(partition+1, max, nodes) + } + left := reduce(min, partition, nodes[:index]) + right := reduce(partition+1, max, nodes[index:]) + return merge(left, right) + } + + return reduce(0, math.MaxUint64, nodes).rpt +} diff --git a/app/merger_test.go b/app/merger_test.go new file mode 100644 index 0000000000..06abc7505c --- /dev/null +++ b/app/merger_test.go @@ -0,0 +1,122 @@ +package app_test + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/reflect" +) + +func TestMerger(t *testing.T) { + // Use 3 reports to check the pair-wise merging in SmartMerger + report1 := report.MakeReport() + report1.Endpoint.AddNode(report.MakeNode("foo")) + report2 := report.MakeReport() + report2.Endpoint.AddNode(report.MakeNode("bar")) + report3 := report.MakeReport() + report3.Endpoint.AddNode(report.MakeNode("baz")) + reports := []report.Report{ + report1, report2, report3, + } + want := report.MakeReport() + want.Endpoint. + AddNode(report.MakeNode("foo")). 
+ AddNode(report.MakeNode("bar")). + AddNode(report.MakeNode("baz")) + + for _, merger := range []app.Merger{app.MakeDumbMerger(), app.NewSmartMerger()} { + // Test the empty list case + if have := merger.Merge([]report.Report{}); !reflect.DeepEqual(have, report.MakeReport()) { + t.Errorf("Bad merge: %s", test.Diff(have, want)) + } + + if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { + t.Errorf("Bad merge: %s", test.Diff(have, want)) + } + + // Repeat the above test to ensure caching works + if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { + t.Errorf("Bad merge: %s", test.Diff(have, want)) + } + } +} + +func TestSmartMerger(t *testing.T) { + // Use 3 reports _WITH SAME ID_ + report1 := report.MakeReport() + report1.Endpoint.AddNode(report.MakeNode("foo")) + report1.ID = "foo" + report2 := report.MakeReport() + report2.Endpoint.AddNode(report.MakeNode("bar")) + report2.ID = "foo" + report3 := report.MakeReport() + report3.Endpoint.AddNode(report.MakeNode("baz")) + report3.ID = "foo" + reports := []report.Report{ + report1, report2, report3, + } + want := report.MakeReport() + want.Endpoint.AddNode(report.MakeNode("foo")) + + merger := app.NewSmartMerger() + if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { + t.Errorf("Bad merge: %s", test.Diff(have, want)) + } +} + +func BenchmarkSmartMerger(b *testing.B) { + benchmarkMerger(b, app.NewSmartMerger(), false) +} + +func BenchmarkSmartMergerWithoutCaching(b *testing.B) { + benchmarkMerger(b, app.NewSmartMerger(), true) +} + +func BenchmarkDumbMerger(b *testing.B) { + benchmarkMerger(b, app.MakeDumbMerger(), false) +} + +const numHosts = 15 + +func benchmarkMerger(b *testing.B, merger app.Merger, clearCache bool) { + makeReport := func() report.Report { + rpt := report.MakeReport() + for i := 0; i < 100; i++ { + rpt.Endpoint.AddNode(report.MakeNode(fmt.Sprintf("%x", rand.Int63()))) + } + return rpt + } + + reports := []report.Report{} + for i := 0; i < numHosts*5; i++ 
{ reports = append(reports, makeReport()) } merger.Merge(reports) // prime the cache if clearable, ok := merger.(interface { ClearCache() }); ok && clearCache { clearable.ClearCache() } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { // replace 1/3 of hosts' worth of reports & merge them all for i := 0; i < numHosts/3; i++ { reports[rand.Intn(len(reports))] = makeReport() } merger.Merge(reports) if clearable, ok := merger.(interface { ClearCache() }); ok && clearCache { clearable.ClearCache() } } }