Skip to content

Commit

Permalink
perf: add self-profiling support to enable PGO
Browse files Browse the repository at this point in the history
Add a flag for the `mro` tool to collect profiles information.

Add an environment variable to make `mrp` profile itself.  If set,
`mrjob` will also profile itself, writing the profile into its metadata
directory.

Add PGO profiles for mrjob mro.  These are very small
profiles, made smaller by trimming.  For `mrp` we'll wait
until these are deployed in production so we can collect
some more representative profiles.
  • Loading branch information
adam-azarchs authored Feb 16, 2024
1 parent 181d004 commit 2826c42
Show file tree
Hide file tree
Showing 15 changed files with 167 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pkg
coverage.out
coverage.html
/.vscode/
*.pprof
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ GOTESTS=$(GOLIBTESTS) $(GOBINTESTS) test-all
VERSION=$(shell git describe --tags --always --dirty)
RELEASE=false
SRC_ROOT=$(abspath $(dir $(PWD))../..)
GO_FLAGS=-ldflags "-X '$(REPO)/martian/util.__VERSION__=$(VERSION)' -X $(REPO)/martian/util.__RELEASE__='$(RELEASE)'" -gcflags "-trimpath $(SRC_ROOT)"
GO_FLAGS=-trimpath -ldflags "-X '$(REPO)/martian/util.__VERSION__=$(VERSION)' -X $(REPO)/martian/util.__RELEASE__='$(RELEASE)'"

unexport GOPATH
export GO111MODULE=on
Expand Down Expand Up @@ -166,6 +166,9 @@ test/disable_test/pipeline_test: test/disable_test/disable_test.json \
integration_prereqs
test/martian_test.py $<

test/retry_map_call_map_test/pipeline_test: test/retry_map_call_map_test/autoretry_pass.json \
integration_prereqs
test/martian_test.py $<

test/retry_test/pipeline_test: test/retry_test/autoretry_pass.json \
integration_prereqs
Expand All @@ -186,8 +189,18 @@ longtests: test/split_test/pipeline_test \
test/fork_test/ar_fail/pipeline_fail \
test/map_test/pipeline_test \
test/disable_test/pipeline_test \
test/retry_map_call_map_test/pipeline_test \
test/retry_test/pipeline_test

# Collect profiles to use for profile-guided optimization for mrp and mrjob,
# and merge/update the appropriate default.pgo files.
update_pgo: integration_prereqs
mkdir -p profiles
rm -f profiles/*.pprof
MRO_SELF_PROFILE="$(PWD)/profiles/*.pprof" $(MAKE) longtests
find $(PWD)/profiles -maxdepth 1 -name "*.pprof" | xargs go tool pprof -proto > $(PWD)/cmd/mrp/default.pgo
find $(PWD)/test -name _selfProfile.pprof | xargs go tool pprof -proto > $(PWD)/cmd/mrjob/default.pgo

clean:
rm -rf $(GOBIN)
rm -rf $(dir $(GOBIN))pkg
Expand Down
1 change: 1 addition & 0 deletions cmd/mrjob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
go_binary(
name = "mrjob",
embed = [":mrjob_lib"],
pgoprofile = "default.pgo",
visibility = ["//:__pkg__"],
)

Expand Down
Binary file added cmd/mrjob/default.pgo
Binary file not shown.
30 changes: 30 additions & 0 deletions cmd/mrjob/mrjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path"
"regexp"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -60,6 +61,10 @@ func main() {
fqname := path.Base(args[3])
journalPath := path.Dir(args[3])

if os.Getenv("MRO_SELF_PROFILE") != "" {
startCpuProfile(metadataPath)
}

run := runner{
ioStats: core.NewIoStatsBuilder(),
metadata: core.NewMetadataRunWithJournalPath(fqname, metadataPath, filesPath, journalPath, runType),
Expand All @@ -86,6 +91,25 @@ func main() {
run.WaitLoop()
}

// If we're running a CPU self-profile, this is the handle to it.
var selfProfile *os.File

func startCpuProfile(metadataPath string) {
// This isn't going through the usual metadata API because we want
// the profile to include construction of that.
f, err := os.Create(path.Join(metadataPath, "_selfProfile.pprof"))
if err != nil {
util.PrintError(err, "profile", "Error recording CPU profile")
return
}
if err := pprof.StartCPUProfile(f); err != nil {
f.Close()
util.PrintError(err, "profile", "Error recording CPU profile")
return
}
selfProfile = f
}

func (self *runner) Init() {
// In case the job template was wrong, set the working directory now.
if err := os.Chdir(self.metadata.FilesPath()); err != nil {
Expand Down Expand Up @@ -224,6 +248,12 @@ func (self *runner) waitForPerf() {
case <-time.After(15 * time.Second):
}
}
if selfProfile != nil {
pprof.StopCPUProfile()
if err := selfProfile.Close(); err != nil {
util.PrintError(err, "profile", "Error closing cpu profile")
}
}
}

func totalCpu(ru *core.RusageInfo) float64 {
Expand Down
1 change: 1 addition & 0 deletions cmd/mro/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
go_binary(
name = "mro",
embed = [":mro_lib"],
pgoprofile = "default.pgo",
visibility = ["//visibility:public"],
)
7 changes: 4 additions & 3 deletions cmd/mro/check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func CompileAll(mroPaths []string, checkSrcPath bool) (int, []*syntax.Ast, error
return len(fileNames), asts, nil
}

func Main(argv []string) {
func Main(argv []string) int {
util.SetPrintLogger(os.Stderr)
// Command-line arguments.
doc := `Martian Compiler.
Expand Down Expand Up @@ -101,7 +101,7 @@ Options:

if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
return 1
}

if mkjson {
Expand Down Expand Up @@ -186,8 +186,9 @@ Options:
fmt.Fprintln(os.Stderr, "Successfully compiled", count, "mro files.")

if wasErr {
os.Exit(1)
return 1
}
return 0
}

func printCallGraphs(asts []*syntax.Ast) bool {
Expand Down
Binary file added cmd/mro/default.pgo
Binary file not shown.
11 changes: 6 additions & 5 deletions cmd/mro/edit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s stringListValue) Set(v string) error {
return nil
}

func Main(argv []string) {
func Main(argv []string) int {
util.SetPrintLogger(os.Stderr)
util.SetupSignalHandlers()

Expand Down Expand Up @@ -116,12 +116,12 @@ func Main(argv []string) {
}
if *version {
fmt.Println(util.GetVersion())
os.Exit(0)
return 0
}

if flags.NArg() < 1 {
flags.Usage()
os.Exit(1)
return 1
}

cwd, _ := os.Getwd()
Expand All @@ -146,7 +146,7 @@ func Main(argv []string) {
if err != nil {
fmt.Fprintln(flags.Output(),
err.Error())
os.Exit(10)
return 10
}

if edit != nil {
Expand Down Expand Up @@ -186,7 +186,7 @@ func Main(argv []string) {
fmt.Fprintln(os.Stderr,
"Error finding unused stage outputs:",
err.Error())
os.Exit(11)
return 11
} else if len(unused) > 0 {
fmt.Fprintln(os.Stderr,
"Stage outputs not used in top-level outs or other stage inputs:")
Expand Down Expand Up @@ -217,6 +217,7 @@ func Main(argv []string) {
}
}
}
return 0
}

func loadFiles(names, mroPaths []string, parser *syntax.Parser) ([][]byte, []*syntax.Ast) {
Expand Down
17 changes: 9 additions & 8 deletions cmd/mro/format/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/martian-lang/martian/martian/util"
)

func Main(argv []string) {
func Main(argv []string) int {
util.SetPrintLogger(os.Stderr)
// Command-line arguments.
doc := `Martian Formatter.
Expand All @@ -49,14 +49,14 @@ Options:
opts, err := docopt.Parse(doc, argv, true, martianVersion, false)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
return 1
}

// Martian environment variables.
cwd, err := os.Getwd()
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
return 1
}
mroPaths := util.ParseMroPath(cwd)
if value := os.Getenv("MROPATH"); len(value) > 0 {
Expand All @@ -72,7 +72,7 @@ Options:
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, err.Error())
fmt.Fprintln(os.Stderr)
os.Exit(1)
return 1
}
for _, f := range fnames {
if strings.HasSuffix(f, ".mro") {
Expand All @@ -87,7 +87,7 @@ Options:
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, err.Error())
fmt.Fprintln(os.Stderr)
os.Exit(1)
return 1
}
if err := ioutil.WriteFile(fname, []byte(fsrc), 0644); err != nil {
fmt.Fprintf(os.Stderr, "Error writing to %s: %s\n",
Expand All @@ -103,14 +103,14 @@ Options:
b, err := ioutil.ReadAll(os.Stdin)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
return 1
}
fsrc, err := syntax.FormatSrcBytes(b, fn, fixIncludes, mroPaths)
if err != nil {
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, err.Error())
fmt.Fprintln(os.Stderr)
os.Exit(1)
return 1
}
fmt.Print(fsrc)
} else {
Expand All @@ -121,7 +121,7 @@ Options:
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, err.Error())
fmt.Fprintln(os.Stderr)
os.Exit(1)
return 1
}
if opts["--rewrite"].(bool) {
if err := ioutil.WriteFile(fname, []byte(fsrc), 0644); err != nil {
Expand All @@ -133,4 +133,5 @@ Options:
}
}
}
return 0
}
14 changes: 7 additions & 7 deletions cmd/mro/graph/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/martian-lang/martian/martian/util"
)

func Main(argv []string) {
func Main(argv []string) int {
util.SetPrintLogger(os.Stderr)
syntax.SetEnforcementLevel(syntax.EnforceError)

Expand Down Expand Up @@ -50,7 +50,7 @@ func Main(argv []string) {
fmt.Fprintln(flags.Output(),
"Cannot render input/output traces as json or dot.")
flags.Usage()
os.Exit(1)
return 1
}
if stageInput != "" {
if !traceInput(stageInput, cg, lookup) {
Expand All @@ -62,27 +62,27 @@ func Main(argv []string) {
fmt.Fprintln(os.Stderr, "Callable", stageOutput, "not found")
}
}
os.Exit(0)
return 0
}
pcg, ok := cg.(*syntax.CallGraphPipeline)
if !ok {
fmt.Fprintln(os.Stderr,
"Best call found was not a pipeline:",
cg.GetFqid())
os.Exit(1)
return 1
}
if asDot {
if asJson {
fmt.Fprintln(flags.Output(),
"Cannot render both json and dot.")
flags.Usage()
os.Exit(1)
return 1
}
renderDot(pcg)
os.Exit(0)
return 0
}
renderJson(pcg)
os.Exit(0)
return 0
}

func getGraph(fname string) (syntax.CallGraphNode, *syntax.TypeLookup) {
Expand Down
Loading

0 comments on commit 2826c42

Please sign in to comment.