Skip to content

Commit

Permalink
[CWS] improve speed of btfhub archive constant builder script (#11467)
Browse files Browse the repository at this point in the history
* [CWS] run multiple tar/btf extractions in parallel

* [CWS] increase concurrent goroutines
  • Loading branch information
paulcacheux authored Mar 30, 2022
1 parent 2a12a6d commit 09b2187
Showing 1 changed file with 91 additions and 46 deletions.
137 changes: 91 additions & 46 deletions pkg/security/probe/constantfetch/btfhub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package main
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -20,37 +21,37 @@ import (
"path"
"path/filepath"
"reflect"
"runtime"
"sort"
"strings"
"sync"

"github.com/DataDog/datadog-agent/pkg/security/ebpf/kernel"
"github.com/DataDog/datadog-agent/pkg/security/log"
"github.com/DataDog/datadog-agent/pkg/security/probe"
"github.com/DataDog/datadog-agent/pkg/security/probe/constantfetch"
utilKernel "github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/smira/go-xz"
"golang.org/x/sync/semaphore"
)

func main() {
var archiveRootPath string
var constantOutputPath string
var sampling int

flag.StringVar(&archiveRootPath, "archive-root", "", "Root path of BTFHub archive")
flag.StringVar(&constantOutputPath, "output", "", "Output path for JSON constants")
flag.IntVar(&sampling, "sampling", 1, "Sampling rate, take 1 over n elements")
flag.Parse()

twCollector := newTreeWalkCollector(sampling)
twCollector := newTreeWalkCollector()

if err := filepath.WalkDir(archiveRootPath, twCollector.treeWalkerBuilder(archiveRootPath)); err != nil {
panic(err)
}
fmt.Printf("%d kernels\n", len(twCollector.kernels))
fmt.Printf("%d unique constants\n", len(twCollector.constants))

export := constantfetch.BTFHubConstants{
Constants: twCollector.constants,
Kernels: twCollector.kernels,
}
export := twCollector.finish()
fmt.Printf("%d kernels\n", len(export.Kernels))
fmt.Printf("%d unique constants\n", len(export.Constants))

output, err := json.MarshalIndent(export, "", "\t")
if err != nil {
Expand All @@ -63,52 +64,59 @@ func main() {
}

type treeWalkCollector struct {
constants []map[string]uint64
kernels []constantfetch.BTFHubKernel
counter int
sampling int
wg sync.WaitGroup
sem *semaphore.Weighted
resultsMu sync.Mutex
results []extractionResult
}

func newTreeWalkCollector(sampling int) *treeWalkCollector {
func newTreeWalkCollector() *treeWalkCollector {
return &treeWalkCollector{
constants: make([]map[string]uint64, 0),
kernels: make([]constantfetch.BTFHubKernel, 0),
counter: 0,
sampling: sampling,
counter: 0,
sem: semaphore.NewWeighted(int64(runtime.NumCPU() * 2)),
}
}

func (c *treeWalkCollector) appendConstants(distrib, version, arch, unameRelease string, constants map[string]uint64) {
index := -1
for i, other := range c.constants {
if reflect.DeepEqual(other, constants) {
index = i
break
func (c *treeWalkCollector) finish() constantfetch.BTFHubConstants {
c.wg.Wait()

sort.Slice(c.results, func(i, j int) bool { return c.results[i].index < c.results[j].index })

constants := make([]map[string]uint64, 0)
kernels := make([]constantfetch.BTFHubKernel, 0)

for _, res := range c.results {
index := -1
for i, other := range constants {
if reflect.DeepEqual(other, res.constants) {
index = i
break
}
}

if index == -1 {
index = len(constants)
constants = append(constants, res.constants)
}
}

if index == -1 {
index = len(c.constants)
c.constants = append(c.constants, constants)
kernels = append(kernels, constantfetch.BTFHubKernel{
Distribution: res.distribution,
DistribVersion: res.distribVersion,
Arch: res.arch,
UnameRelease: res.unameRelease,
ConstantsIndex: index,
})
}

c.kernels = append(c.kernels, constantfetch.BTFHubKernel{
Distribution: distrib,
DistribVersion: version,
Arch: arch,
UnameRelease: unameRelease,
ConstantsIndex: index,
})
return constantfetch.BTFHubConstants{
Constants: constants,
Kernels: kernels,
}
}

func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc {
return func(path string, d fs.DirEntry, err error) error {
c.counter++
if c.counter != c.sampling {
return nil
}
c.counter = 0

if err != nil {
return err
}
Expand All @@ -120,6 +128,9 @@ func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc {
return nil
}

btfRunIndex := c.counter
c.counter++

pathSuffix := strings.TrimPrefix(path, prefix)

btfParts := strings.Split(pathSuffix, "/")
Expand All @@ -132,18 +143,52 @@ func (c *treeWalkCollector) treeWalkerBuilder(prefix string) fs.WalkDirFunc {
arch := btfParts[len(btfParts)-2]
unameRelease := strings.TrimSuffix(btfParts[len(btfParts)-1], ".btf.tar.xz")

fmt.Println(path)

constants, err := extractConstantsFromBTF(path, distribution, distribVersion)
if err != nil {
return err
c.wg.Add(1)
if err := c.sem.Acquire(context.TODO(), 1); err != nil {
return fmt.Errorf("failed to acquire sem token: %w", err)
}

c.appendConstants(distribution, distribVersion, arch, unameRelease, constants)
go func() {
defer func() {
c.wg.Done()
c.sem.Release(1)
}()

fmt.Println(btfRunIndex, path)

constants, err := extractConstantsFromBTF(path, distribution, distribVersion)
if err != nil {
log.Errorf("failed to extract constants from `%s`: %v", path, err)
return
}

res := extractionResult{
index: btfRunIndex,
distribution: distribution,
distribVersion: distribVersion,
arch: arch,
unameRelease: unameRelease,
constants: constants,
}

c.resultsMu.Lock()
c.results = append(c.results, res)
c.resultsMu.Unlock()
}()

return nil
}
}

type extractionResult struct {
index int
distribution string
distribVersion string
arch string
unameRelease string
constants map[string]uint64
}

func extractConstantsFromBTF(archivePath, distribution, distribVersion string) (map[string]uint64, error) {
btfReader, err := createBTFReaderFromTarball(archivePath)
if err != nil {
Expand Down

0 comments on commit 09b2187

Please sign in to comment.