diff --git a/app/collector.go b/app/collector.go
index e2afae4a48..41274d3c42 100644
--- a/app/collector.go
+++ b/app/collector.go
@@ -1,11 +1,9 @@
 package app
 
 import (
-	"fmt"
 	"sync"
 	"time"
 
-	"github.com/spaolacci/murmur3"
 	"golang.org/x/net/context"
 
 	"github.com/weaveworks/scope/common/mtime"
@@ -39,6 +37,7 @@ type collector struct {
 	reports []timestampReport
 	window  time.Duration
 	cached  *report.Report
+	merger  merger
 	waitableCondition
 }
 
@@ -78,6 +77,7 @@ func NewCollector(window time.Duration) Collector {
 		waitableCondition: waitableCondition{
 			waiters: map[chan struct{}]struct{}{},
 		},
+		merger: newSmartMerger(),
 	}
 }
 
@@ -110,15 +110,11 @@ func (c *collector) Report(_ context.Context) (report.Report, error) {
 	}
 	c.reports = clean(c.reports, c.window)
 
-	rpt := report.MakeReport()
-	id := murmur3.New64()
+	rpts := []report.Report{}
 	for _, tr := range c.reports {
-		rpt = rpt.Merge(tr.report)
-		id.Write([]byte(tr.report.ID))
+		rpts = append(rpts, tr.report)
 	}
-	rpt.ID = fmt.Sprintf("%x", id.Sum64())
-	c.cached = &rpt
-	return rpt, nil
+	return c.merger.Merge(rpts), nil
 }
 
 type timestampReport struct {
diff --git a/app/merger.go b/app/merger.go
new file mode 100644
index 0000000000..a93eb444f8
--- /dev/null
+++ b/app/merger.go
@@ -0,0 +1,111 @@
+package app
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/bluele/gcache"
+	"github.com/spaolacci/murmur3"
+
+	"github.com/weaveworks/scope/report"
+)
+
+// merger combines a slice of reports into a single report.
+type merger interface {
+	Merge([]report.Report) report.Report
+}
+
+// dumbMerger is the straightforward merger: it folds every report into one
+// accumulator on each call and keeps no state between calls.
+type dumbMerger struct{}
+
+// Merge starts from report.MakeReport() and merges each input report into it
+// in slice order. The result's ID is the hex-encoded 64-bit murmur3 hash of
+// the input report IDs written back to back in that same order.
+func (dumbMerger) Merge(reports []report.Report) report.Report {
+	rpt := report.MakeReport()
+	id := murmur3.New64()
+	for _, r := range reports {
+		rpt = rpt.Merge(r)
+		id.Write([]byte(r.ID))
+	}
+	rpt.ID = fmt.Sprintf("%x", id.Sum64())
+	return rpt
+}
+
+// smartMerger is a merger that remembers the result of pair-wise merges.
+// cache maps a merged node's id to the *node holding the merged report.
+type smartMerger struct {
+	cache gcache.Cache
+}
+
+// newSmartMerger returns a smartMerger backed by an LRU gcache built with
+// gcache.New(100), i.e. sized for 100 entries.
+func newSmartMerger() merger {
+	return &smartMerger{
+		cache: gcache.New(100).LRU().Build(),
+	}
+}
+
+// node is one vertex of the merge tree built by smartMerger.Merge.
+//
+// id is the report ID for a leaf, or the hash of the two child ids for a
+// merged node; rpt is the (merged) report for this vertex. left and right are
+// slots for the two child nodes; nothing in this file reads them.
+type node struct {
+	id          string
+	rpt         report.Report
+	left, right *node
+}
+
+// byID sorts nodes by ascending id, so that the leaf order used by
+// smartMerger.Merge does not depend on the order reports were supplied in.
+type byID []*node
+
+func (ns byID) Len() int           { return len(ns) }
+func (ns byID) Swap(i, j int)      { ns[i], ns[j] = ns[j], ns[i] }
+func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id }
+
+// hash returns the hex-encoded 64-bit murmur3 digest that identifies the
+// ordered pair (left, right).
+//
+// Each id is written with a decimal length prefix so the encoding is
+// unambiguous. Plain concatenation would give ("ab", "c") and ("a", "bc") the
+// same digest, and Merge would then hand back a cached report that was built
+// from a different pair of inputs.
+func hash(left, right string) string {
+	h := murmur3.New64()
+	// Writing to a hash.Hash never returns an error, so the result is not checked.
+	fmt.Fprintf(h, "%d:%s%d:%s", len(left), left, len(right), right)
+	return fmt.Sprintf("%x", h.Sum64())
+}
+
+// Merge combines reports into a single report by building a tree of
+// pair-wise merges and caching every intermediate node by the hash of its
+// children's ids.
+//
+// The leaves are sorted by report ID first, so the same set of reports always
+// produces the same tree and therefore the same cache keys. An empty input
+// yields report.MakeReport(); a single report is returned as is.
+//
+// NOTE(review): the original comment expects merging one new report to cost
+// log(N) merges. Because leaves are paired by sorted position, a new ID that
+// sorts before existing ones shifts every later pair and misses the cache for
+// them - confirm whether that is acceptable.
+//
+// NOTE(review): unlike dumbMerger.Merge, the ID of the returned report is not
+// set from the hash here; it is whatever MakeReport/Merge produce - confirm
+// that callers do not rely on it.
+func (s *smartMerger) Merge(reports []report.Report) report.Report {
+	if len(reports) == 0 {
+		return report.MakeReport()
+	}
+
+	// Start with a sorted list of leaves, one per input report.
+	nodes := make([]*node, 0, len(reports))
+	for _, r := range reports {
+		nodes = append(nodes, &node{id: r.ID, rpt: r})
+	}
+	sort.Sort(byID(nodes))
+
+	// merge joins two nodes, consulting the cache first. Any error from Get
+	// is treated as a cache miss.
+	merge := func(left, right *node) *node {
+		id := hash(left.id, right.id)
+		if cached, err := s.cache.Get(id); err == nil {
+			return cached.(*node)
+		}
+
+		// The child pointers are deliberately left unset: nothing reads
+		// them, and storing them would make every cached node keep its
+		// whole subtree (and all of its reports) alive even after the LRU
+		// has evicted the children.
+		n := &node{
+			id:  id,
+			rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt),
+		}
+		s.cache.Set(id, n)
+		return n
+	}
+
+	// pass reduces n nodes to ceil(n/2): adjacent pairs are merged and an odd
+	// trailing node is carried over unchanged.
+	pass := func(in []*node) []*node {
+		out := make([]*node, 0, (len(in)+1)/2)
+		for i := 0; i+1 < len(in); i += 2 {
+			out = append(out, merge(in[i], in[i+1]))
+		}
+		if len(in)%2 == 1 {
+			out = append(out, in[len(in)-1])
+		}
+		return out
+	}
+
+	// Repeat until a single root remains; nodes is non-empty here.
+	for len(nodes) > 1 {
+		nodes = pass(nodes)
+	}
+	return nodes[0].rpt
+}