-
Notifications
You must be signed in to change notification settings - Fork 712
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1418 from weaveworks/log-n-report-merger
A caching, log(n) complexity report merger
- Loading branch information
Showing
3 changed files
with
284 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package app | ||
|
||
import ( | ||
"fmt" | ||
"math" | ||
"sort" | ||
|
||
"github.com/bluele/gcache" | ||
"github.com/spaolacci/murmur3" | ||
|
||
"github.com/weaveworks/scope/report" | ||
) | ||
|
||
// Merger is the type for a thing that can merge reports. | ||
type Merger interface { | ||
Merge([]report.Report) report.Report | ||
} | ||
|
||
// dumbMerger is a stateless Merger; see MakeDumbMerger.
type dumbMerger struct{}
|
||
// MakeDumbMerger makes a Merger which merges together reports in the simplest possible way. | ||
func MakeDumbMerger() Merger { | ||
return dumbMerger{} | ||
} | ||
|
||
func (dumbMerger) Merge(reports []report.Report) report.Report { | ||
rpt := report.MakeReport() | ||
id := murmur3.New64() | ||
for _, r := range reports { | ||
rpt = rpt.Merge(r) | ||
id.Write([]byte(r.ID)) | ||
} | ||
rpt.ID = fmt.Sprintf("%x", id.Sum64()) | ||
return rpt | ||
} | ||
|
||
type smartMerger struct { | ||
cache gcache.Cache | ||
} | ||
|
||
// NewSmartMerger makes a Merger which merges together reports, caching intermediate merges | ||
// to accelerate future merges. Idea is to cache pair-wise merged reports, forming a merge | ||
// tree. Merging a new report into this tree should be log(N). | ||
func NewSmartMerger() Merger { | ||
return &smartMerger{ | ||
cache: gcache.New(1000).LRU().Build(), | ||
} | ||
} | ||
|
||
type node struct { | ||
id uint64 | ||
rpt report.Report | ||
} | ||
|
||
type byID []*node | ||
|
||
func (ns byID) Len() int { return len(ns) } | ||
func (ns byID) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] } | ||
func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id } | ||
|
||
func hash(ids ...string) uint64 { | ||
id := murmur3.New64() | ||
for _, i := range ids { | ||
id.Write([]byte(i)) | ||
} | ||
return id.Sum64() | ||
} | ||
|
||
func (s *smartMerger) ClearCache() { | ||
s.cache.Purge() | ||
} | ||
|
||
func (s *smartMerger) Merge(reports []report.Report) report.Report { | ||
// Start with a sorted list of leaves. | ||
// Note we must dedupe reports with the same ID to ensure the | ||
// algorithm below doesn't go into an infinite loop. This is | ||
// fine as reports with the same ID are assumed to be the same. | ||
nodes := []*node{} | ||
seen := map[uint64]struct{}{} | ||
for _, r := range reports { | ||
id := hash(r.ID) | ||
if _, ok := seen[id]; ok { | ||
continue | ||
} | ||
seen[id] = struct{}{} | ||
nodes = append(nodes, &node{ | ||
id: id, | ||
rpt: r, | ||
}) | ||
} | ||
sort.Sort(byID(nodes)) | ||
|
||
// Define how to merge two nodes together. The result of merging | ||
// two reports is cached. | ||
merge := func(left, right *node) *node { | ||
id := hash(left.rpt.ID, right.rpt.ID) | ||
|
||
if result, err := s.cache.Get(id); err == nil { | ||
return result.(*node) | ||
} | ||
|
||
n := &node{ | ||
id: id, | ||
rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt), | ||
} | ||
s.cache.Set(id, n) | ||
return n | ||
} | ||
|
||
// Define how to reduce n nodes to 1. | ||
// Min and max are both inclusive! | ||
var reduce func(min, max uint64, nodes []*node) *node | ||
reduce = func(min, max uint64, nodes []*node) *node { | ||
switch len(nodes) { | ||
case 0: | ||
return &node{rpt: report.MakeReport()} | ||
case 1: | ||
return nodes[0] | ||
case 2: | ||
return merge(nodes[0], nodes[1]) | ||
} | ||
|
||
partition := min + ((max - min) / 2) | ||
index := sort.Search(len(nodes), func(i int) bool { | ||
return nodes[i].id > partition | ||
}) | ||
if index == len(nodes) { | ||
return reduce(min, partition, nodes) | ||
} else if index == 0 { | ||
return reduce(partition+1, max, nodes) | ||
} | ||
left := reduce(min, partition, nodes[:index]) | ||
right := reduce(partition+1, max, nodes[index:]) | ||
return merge(left, right) | ||
} | ||
|
||
return reduce(0, math.MaxUint64, nodes).rpt | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package app_test | ||
|
||
import ( | ||
"fmt" | ||
"math/rand" | ||
"testing" | ||
|
||
"github.com/weaveworks/scope/app" | ||
"github.com/weaveworks/scope/report" | ||
"github.com/weaveworks/scope/test" | ||
"github.com/weaveworks/scope/test/reflect" | ||
) | ||
|
||
func TestMerger(t *testing.T) { | ||
// Use 3 reports to check the pair-wise merging in SmartMerger | ||
report1 := report.MakeReport() | ||
report1.Endpoint.AddNode(report.MakeNode("foo")) | ||
report2 := report.MakeReport() | ||
report2.Endpoint.AddNode(report.MakeNode("bar")) | ||
report3 := report.MakeReport() | ||
report3.Endpoint.AddNode(report.MakeNode("baz")) | ||
reports := []report.Report{ | ||
report1, report2, report3, | ||
} | ||
want := report.MakeReport() | ||
want.Endpoint. | ||
AddNode(report.MakeNode("foo")). | ||
AddNode(report.MakeNode("bar")). | ||
AddNode(report.MakeNode("baz")) | ||
|
||
for _, merger := range []app.Merger{app.MakeDumbMerger(), app.NewSmartMerger()} { | ||
// Test the empty list case | ||
if have := merger.Merge([]report.Report{}); !reflect.DeepEqual(have, report.MakeReport()) { | ||
t.Errorf("Bad merge: %s", test.Diff(have, want)) | ||
} | ||
|
||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { | ||
t.Errorf("Bad merge: %s", test.Diff(have, want)) | ||
} | ||
|
||
// Repeat the above test to ensure caching works | ||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { | ||
t.Errorf("Bad merge: %s", test.Diff(have, want)) | ||
} | ||
} | ||
} | ||
|
||
func TestSmartMerger(t *testing.T) { | ||
// Use 3 reports _WITH SAME ID_ | ||
report1 := report.MakeReport() | ||
report1.Endpoint.AddNode(report.MakeNode("foo")) | ||
report1.ID = "foo" | ||
report2 := report.MakeReport() | ||
report2.Endpoint.AddNode(report.MakeNode("bar")) | ||
report2.ID = "foo" | ||
report3 := report.MakeReport() | ||
report3.Endpoint.AddNode(report.MakeNode("baz")) | ||
report3.ID = "foo" | ||
reports := []report.Report{ | ||
report1, report2, report3, | ||
} | ||
want := report.MakeReport() | ||
want.Endpoint.AddNode(report.MakeNode("foo")) | ||
|
||
merger := app.NewSmartMerger() | ||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { | ||
t.Errorf("Bad merge: %s", test.Diff(have, want)) | ||
} | ||
} | ||
|
||
func BenchmarkSmartMerger(b *testing.B) { | ||
benchmarkMerger(b, app.NewSmartMerger(), false) | ||
} | ||
|
||
func BenchmarkSmartMergerWithoutCaching(b *testing.B) { | ||
benchmarkMerger(b, app.NewSmartMerger(), true) | ||
} | ||
|
||
func BenchmarkDumbMerger(b *testing.B) { | ||
benchmarkMerger(b, app.MakeDumbMerger(), false) | ||
} | ||
|
||
// numHosts scales the merger benchmarks: they merge numHosts*5 reports and
// replace numHosts/3 of them on every iteration.
const numHosts = 15
|
||
func benchmarkMerger(b *testing.B, merger app.Merger, clearCache bool) { | ||
makeReport := func() report.Report { | ||
rpt := report.MakeReport() | ||
for i := 0; i < 100; i++ { | ||
rpt.Endpoint.AddNode(report.MakeNode(fmt.Sprintf("%x", rand.Int63()))) | ||
} | ||
return rpt | ||
} | ||
|
||
reports := []report.Report{} | ||
for i := 0; i < numHosts*5; i++ { | ||
reports = append(reports, makeReport()) | ||
} | ||
merger.Merge(reports) // prime the cache | ||
if clearable, ok := merger.(interface { | ||
ClearCache() | ||
}); ok && clearCache { | ||
clearable.ClearCache() | ||
} | ||
|
||
b.ReportAllocs() | ||
b.ResetTimer() | ||
|
||
for i := 0; i < b.N; i++ { | ||
// replace 1/3 of hosts work of reports & merge them all | ||
for i := 0; i < numHosts/3; i++ { | ||
reports[rand.Intn(len(reports))] = makeReport() | ||
} | ||
|
||
merger.Merge(reports) | ||
|
||
if clearable, ok := merger.(interface { | ||
ClearCache() | ||
}); ok && clearCache { | ||
clearable.ClearCache() | ||
} | ||
} | ||
} |