From fda239d7241b5090c5d313eec8a2d4a6006df906 Mon Sep 17 00:00:00 2001 From: Paul Cacheux Date: Mon, 28 Mar 2022 09:44:47 +0200 Subject: [PATCH 1/2] [CWS] run multiple tar/btf extractions in parallel --- .../probe/constantfetch/btfhub/main.go | 137 ++++++++++++------ 1 file changed, 91 insertions(+), 46 deletions(-) diff --git a/pkg/security/probe/constantfetch/btfhub/main.go b/pkg/security/probe/constantfetch/btfhub/main.go index c36634d5e1498..52331b6ac1677 100644 --- a/pkg/security/probe/constantfetch/btfhub/main.go +++ b/pkg/security/probe/constantfetch/btfhub/main.go @@ -11,6 +11,7 @@ package main import ( "archive/tar" "bytes" + "context" "encoding/json" "flag" "fmt" @@ -20,37 +21,37 @@ import ( "path" "path/filepath" "reflect" + "runtime" + "sort" "strings" + "sync" "github.com/DataDog/datadog-agent/pkg/security/ebpf/kernel" + "github.com/DataDog/datadog-agent/pkg/security/log" "github.com/DataDog/datadog-agent/pkg/security/probe" "github.com/DataDog/datadog-agent/pkg/security/probe/constantfetch" utilKernel "github.com/DataDog/datadog-agent/pkg/util/kernel" "github.com/smira/go-xz" + "golang.org/x/sync/semaphore" ) func main() { var archiveRootPath string var constantOutputPath string - var sampling int flag.StringVar(&archiveRootPath, "archive-root", "", "Root path of BTFHub archive") flag.StringVar(&constantOutputPath, "output", "", "Output path for JSON constants") - flag.IntVar(&sampling, "sampling", 1, "Sampling rate, take 1 over n elements") flag.Parse() - twCollector := newTreeWalkCollector(sampling) + twCollector := newTreeWalkCollector() if err := filepath.WalkDir(archiveRootPath, twCollector.treeWalkerBuilder(archiveRootPath)); err != nil { panic(err) } - fmt.Printf("%d kernels\n", len(twCollector.kernels)) - fmt.Printf("%d unique constants\n", len(twCollector.constants)) - export := constantfetch.BTFHubConstants{ - Constants: twCollector.constants, - Kernels: twCollector.kernels, - } + export := twCollector.finish() + fmt.Printf("%d kernels\n", len(export.Kernels)) + fmt.Printf("%d unique constants\n", len(export.Constants)) output, err := json.MarshalIndent(export, "", "\t") if err != nil { @@ -63,52 +64,59 @@ func main() { } type treeWalkCollector struct { - constants []map[string]uint64 - kernels []constantfetch.BTFHubKernel counter int - sampling int + wg sync.WaitGroup + sem *semaphore.Weighted + resultsMu sync.Mutex + results []extractionResult } -func newTreeWalkCollector(sampling int) *treeWalkCollector { +func newTreeWalkCollector() *treeWalkCollector { return &treeWalkCollector{ - constants: make([]map[string]uint64, 0), - kernels: make([]constantfetch.BTFHubKernel, 0), - counter: 0, - sampling: sampling, + counter: 0, + sem: semaphore.NewWeighted(int64(runtime.NumCPU())), } } -func (c *treeWalkCollector) appendConstants(distrib, version, arch, unameRelease string, constants map[string]uint64) { - index := -1 - for i, other := range c.constants { - if reflect.DeepEqual(other, constants) { - index = i - break +func (c *treeWalkCollector) finish() constantfetch.BTFHubConstants { + c.wg.Wait() + + sort.Slice(c.results, func(i, j int) bool { return c.results[i].index < c.results[j].index }) + + constants := make([]map[string]uint64, 0) + kernels := make([]constantfetch.BTFHubKernel, 0) + + for _, res := range c.results { + index := -1 + for i, other := range constants { + if reflect.DeepEqual(other, res.constants) { + index = i + break + } + } + + if index == -1 { + index = len(constants) + constants = append(constants, res.constants) } - } - if index == -1 { - index = len(c.constants) - c.constants = append(c.constants, constants) + kernels = append(kernels, constantfetch.BTFHubKernel{ + Distribution: res.distribution, + DistribVersion: res.distribVersion, + Arch: res.arch, + UnameRelease: res.unameRelease, + ConstantsIndex: index, + }) } - c.kernels = append(c.kernels, constantfetch.BTFHubKernel{ - Distribution: distrib, - DistribVersion: version, - Arch: arch, - UnameRelease: unameRelease, - ConstantsIndex: index, - }) + return constantfetch.BTFHubConstants{ + Constants: constants, + Kernels: kernels, + } } func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc { return func(path string, d fs.DirEntry, err error) error { - c.counter++ - if c.counter != c.sampling { - return nil - } - c.counter = 0 - if err != nil { return err } @@ -120,6 +128,9 @@ func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc { return nil } + btfRunIndex := c.counter + c.counter++ + pathSuffix := strings.TrimPrefix(path, prefix) btfParts := strings.Split(pathSuffix, "/") @@ -132,18 +143,52 @@ func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc { arch := btfParts[len(btfParts)-2] unameRelease := strings.TrimSuffix(btfParts[len(btfParts)-1], ".btf.tar.xz") - fmt.Println(path) - - constants, err := extractConstantsFromBTF(path, distribution, distribVersion) - if err != nil { - return err + c.wg.Add(1) + if err := c.sem.Acquire(context.TODO(), 1); err != nil { + return fmt.Errorf("failed to acquire sem token: %w", err) } - c.appendConstants(distribution, distribVersion, arch, unameRelease, constants) + go func() { + defer func() { + c.wg.Done() + c.sem.Release(1) + }() + + fmt.Println(btfRunIndex, path) + + constants, err := extractConstantsFromBTF(path, distribution, distribVersion) + if err != nil { + log.Errorf("failed to extract constants from `%s`: %v", path, err) + return + } + + res := extractionResult{ + index: btfRunIndex, + distribution: distribution, + distribVersion: distribVersion, + arch: arch, + unameRelease: unameRelease, + constants: constants, + } + + c.resultsMu.Lock() + c.results = append(c.results, res) + c.resultsMu.Unlock() + }() + return nil } } +type extractionResult struct { + index int + distribution string + distribVersion string + arch string + unameRelease string + constants map[string]uint64 +} + func extractConstantsFromBTF(archivePath, distribution, distribVersion string) (map[string]uint64, error) { btfReader, err := createBTFReaderFromTarball(archivePath) if err != nil { From f5d26e49f393fbda268b12eab953e62185ca00ce Mon Sep 17 00:00:00 2001 From: Paul Cacheux Date: Mon, 28 Mar 2022 12:29:15 +0200 Subject: [PATCH 2/2] [CWS] increase concurrent goroutines --- pkg/security/probe/constantfetch/btfhub/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/security/probe/constantfetch/btfhub/main.go b/pkg/security/probe/constantfetch/btfhub/main.go index 52331b6ac1677..4ffff368b130b 100644 --- a/pkg/security/probe/constantfetch/btfhub/main.go +++ b/pkg/security/probe/constantfetch/btfhub/main.go @@ -74,7 +74,7 @@ type treeWalkCollector struct { func newTreeWalkCollector() *treeWalkCollector { return &treeWalkCollector{ counter: 0, - sem: semaphore.NewWeighted(int64(runtime.NumCPU())), + sem: semaphore.NewWeighted(int64(runtime.NumCPU() * 2)), } }