
pkg/profile: Use buffer pools to reduce allocs #1056

Closed
wants to merge 5 commits into from

Conversation

marselester
Contributor

No description provided.

pkg/profiler/cpu/maps.go (outdated review thread, resolved)
@kakkoyun
Member

Could you also attach profiles if you have any?

@javierhonduco
Contributor

javierhonduco commented Nov 22, 2022

Not a seasoned Go programmer, so let me know what you think about replacing sync.Pool with something that's perhaps simpler and clearer: directly preallocating a buffer and resetting it before using it. Something like this pseudocode:

buf := make([]byte, 0, MAX_SHARD_SIZE)
unwindTable := bytes.NewBuffer(buf)

func updateTable() {
  unwindTable.Reset() // reset the buffer, reusing the underlying memory
  // do work involving `unwindTable`
}

I think the explicitness of this code makes it a bit easier to read, and it might make it harder to introduce correctness issues.
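To make the pseudocode concrete, here is a minimal runnable sketch of the single preallocated buffer. The names `maxShardSize` and the row encoding are placeholders, not the PR's actual code; the real `MAX_SHARD_SIZE` constant and table layout live elsewhere in the repository.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// maxShardSize is a placeholder capacity standing in for MAX_SHARD_SIZE.
const maxShardSize = 1 << 16

// unwindTable is allocated once; Reset keeps the underlying array around.
var unwindTable = bytes.NewBuffer(make([]byte, 0, maxShardSize))

func updateTable(rows []uint64) {
	unwindTable.Reset() // length back to zero, memory reused
	for _, r := range rows {
		// Stand-in for the real row encoding.
		binary.Write(unwindTable, binary.LittleEndian, r)
	}
}

func main() {
	updateTable([]uint64{1, 2, 3})
	fmt.Println(unwindTable.Len()) // 3 rows x 8 bytes = 24
}
```

As long as `updateTable` is never called concurrently, no pool or locking is needed; the buffer grows once to its high-water mark and stays there.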

@kakkoyun
Member

Not a seasoned Go programmer, so let me know what you think about replacing sync.Pool with something that's perhaps simpler and clearer: directly preallocating a buffer and resetting it before using it. Something like this pseudocode:

buf := make([]byte, 0, MAX_SHARD_SIZE)
unwindTable := bytes.NewBuffer(buf)

func updateTable() {
  unwindTable.Reset() // reset the buffer, reusing the underlying memory
  // do work involving `unwindTable`
}

I think the explicitness of this code makes it a bit easier to read, and it might make it harder to introduce correctness issues.

This always allocates a new byte slice. The idea behind sync.Pool + bytes.Buffer is to avoid allocating a new byte slice and reuse an existing one. Or did you mean to propose using a single global (package-level) byte slice and reusing it every time?
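For reference, the sync.Pool + bytes.Buffer pattern under discussion looks roughly like this generic sketch (not the PR's actual code; `write` and its payload are made up for illustration):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers; Get returns a pooled buffer if one
// is available, otherwise New allocates a fresh one.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func write(payload []byte) int {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a pooled buffer may still hold old contents
	defer bufPool.Put(buf)

	buf.Write(payload)
	return buf.Len()
}

func main() {
	fmt.Println(write([]byte("profile bytes"))) // 13
}
```

Unlike a single global buffer, the pool is safe for concurrent callers, at the cost of being a little less obvious to read.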

@javierhonduco
Contributor

This always allocates a new byte slice. The idea behind sync.Pool + bytes.Buffer is to avoid allocating a new byte slice and reuse an existing one. Or did you mean to propose using a single global (package-level) byte slice and reusing it every time?

Yeah, exactly that!

@brancz
Member

brancz commented Nov 22, 2022

I can't find the constant that you're talking about, how much memory would we be talking about?

@marselester
Contributor Author

marselester commented Nov 22, 2022

Hi, thank you for the feedback!
I was looking for low-hanging fruit within the parca-agent repository, even though there are bigger wins in its dependencies (I skimmed the code of github.com/godbus/dbus).

go tool pprof dist/parca-agent allocs.pprof
(pprof) top10 -cum
Showing nodes accounting for 1685.42MB, 33.25% of 5069.28MB total
Dropped 331 nodes (cum <= 25.35MB)
Showing top 10 nodes out of 79
      flat  flat%   sum%        cum   cum%
         0     0%     0%  2718.46MB 53.63%  github.com/oklog/run.(*Group).Run.func1
         0     0%     0%  2718.46MB 53.63%  runtime/pprof.Do
    0.50MB 0.0099% 0.0099%  2717.96MB 53.62%  github.com/parca-dev/parca-agent/pkg/profiler/cpu.(*CPU).Run
         0     0% 0.0099%  2717.96MB 53.62%  main.run.func6
         0     0% 0.0099%  2717.96MB 53.62%  main.run.func6.1
    0.50MB 0.0099%  0.02%  2051.43MB 40.47%  github.com/parca-dev/parca-agent/pkg/profiler.(*RemoteProfileWriter).Write
         0     0%  0.02%  2045.43MB 40.35%  github.com/google/pprof/profile.(*Profile).Write
         0     0%  0.02%  2032.64MB 40.10%  compress/gzip.(*Writer).Write
 1684.42MB 33.23% 33.25%  2032.14MB 40.09%  compress/flate.NewWriter
         0     0% 33.25%  1369.66MB 27.02%  github.com/godbus/dbus/v5.(*Conn).inWorker

So I checked cpu.(*CPU).Run which led me to profiler.(*RemoteProfileWriter).Write.

go tool pprof -source_path=github.com/parca-dev/parca-agent/ dist/parca-agent allocs.pprof
ROUTINE ======================== github.com/parca-dev/parca-agent/pkg/profiler/cpu.(*CPU).Run in pkg/profiler/cpu/cpu.go
  512.12kB     2.65GB (flat, cum) 53.62% of Total
         .          .    320:			return ctx.Err()
         .          .    321:		case <-ticker.C:
         .          .    322:		}
         .          .    323:
         .          .    324:		obtainStart := time.Now()
         .   134.73MB    325:		profiles, err := p.obtainProfiles(ctx)
         .          .    326:		if err != nil {
         .          .    327:			p.metrics.obtainAttempts.WithLabelValues("error").Inc()
         .          .    328:			level.Warn(p.logger).Log("msg", "failed to obtain profiles from eBPF maps", "err", err)
         .          .    329:			continue
         .          .    330:		}
         .          .    331:		p.metrics.obtainAttempts.WithLabelValues("success").Inc()
         .          .    332:		p.metrics.obtainDuration.Observe(time.Since(obtainStart).Seconds())
         .          .    333:
         .          .    334:		processLastErrors := map[int]error{}
         .          .    335:
         .          .    336:		for _, prof := range profiles {
         .          .    337:			start := time.Now()
         .          .    338:			processLastErrors[int(prof.ID.PID)] = nil
         .          .    339:
         .    13.96MB    340:			if err := p.symbolizer.Symbolize(prof); err != nil {
         .          .    341:				// This could be a partial symbolization, so we still want to send the profile.
         .          .    342:				level.Debug(p.logger).Log("msg", "failed to symbolize profile", "pid", prof.ID.PID, "err", err)
         .          .    343:				processLastErrors[int(prof.ID.PID)] = err
         .          .    344:			}
         .          .    345:
         .          .    346:			// ConvertToPprof can combine multiple profiles into a single profile,
         .          .    347:			// however right now we chose to send each profile separately.
         .          .    348:			// This is not too inefficient as we batch the profiles in a single RPC message,
         .          .    349:			// using the batch profiler writer.
         .     2.50MB    350:			pprof, err := profiler.ConvertToPprof(p.LastProfileStartedAt(), prof)
         .          .    351:			if err != nil {
         .          .    352:				level.Warn(p.logger).Log("msg", "failed to convert profile to pprof", "pid", prof.ID.PID, "err", err)
         .          .    353:				processLastErrors[int(prof.ID.PID)] = err
         .          .    354:				continue
         .          .    355:			}
         .          .    356:
         .    98.15MB    357:			labelSet := p.labelsManager.LabelSet(p.Name(), uint64(prof.ID.PID))
         .          .    358:			if labelSet == nil {
         .          .    359:				level.Debug(p.logger).Log("msg", "profile dropped", "pid", prof.ID.PID)
         .          .    360:				continue
         .          .    361:			}
         .          .    362:
         .          .    363:			p.metrics.symbolizeDuration.Observe(time.Since(start).Seconds())
         .          .    364:
         .        2GB    365:			if err := p.profileWriter.Write(ctx, labelSet, pprof); err != nil {
         .          .    366:				level.Warn(p.logger).Log("msg", "failed to write profile", "pid", prof.ID.PID, "err", err)
         .          .    367:				processLastErrors[int(prof.ID.PID)] = err
         .          .    368:				continue
         .          .    369:			}
         .          .    370:			if p.debuginfoManager != nil {
         .   512.12kB    371:				maps := p.processMappings.MapsForPID(int(prof.ID.PID))
         .          .    372:				var objFiles []*objectfile.MappedObjectFile
         .          .    373:				for _, mf := range maps {
         .   416.19MB    374:					objFile, err := p.objFileCache.ObjectFileForProcess(mf.PID, mf.Mapping)
         .          .    375:					if err != nil {
         .          .    376:						processLastErrors[int(prof.ID.PID)] = err
         .          .    377:						continue
         .          .    378:					}
  512.12kB   512.12kB    379:					objFiles = append(objFiles, objFile)
         .          .    380:				}
         .          .    381:				// Upload debug information of the discovered object files.
         .          .    382:				p.debuginfoManager.EnsureUploaded(ctx, objFiles)
         .          .    383:			}
         .          .    384:		}
ROUTINE ======================== github.com/parca-dev/parca-agent/pkg/profiler.(*RemoteProfileWriter).Write in pkg/profiler/profile_writer.go
  512.03kB        2GB (flat, cum) 40.47% of Total
         .          .     67:	}
         .          .     68:}
         .          .     69:
         .          .     70:// Write sends the profile using the designated write client.
         .          .     71:func (rw *RemoteProfileWriter) Write(ctx context.Context, labels model.LabelSet, prof *profile.Profile) error {
         .   512.02kB     72:	buf := bytes.NewBuffer(nil)
         .        2GB     73:	if err := prof.Write(buf); err != nil {
         .          .     74:		return err
         .          .     75:	}
         .          .     76:
         .   512.02kB     77:	_, err := rw.profileStoreClient.WriteRaw(ctx, &profilestorepb.WriteRawRequest{
         .          .     78:		Normalized: true,
         .          .     79:		Series: []*profilestorepb.RawProfileSeries{{
         .     4.50MB     80:			Labels: &profilestorepb.LabelSet{Labels: convertLabels(labels)},
  512.03kB   512.03kB     81:			Samples: []*profilestorepb.RawSample{{
         .          .     82:				RawProfile: buf.Bytes(),
         .          .     83:			}},
         .          .     84:		}},
         .          .     85:	})

Most allocations happen in profile.(*Profile).Write:

  • _, err := zw.Write(serialize(p))
  • z.compressor, _ = flate.NewWriter(z.w, z.level)

For some reason pprof doesn't reuse the gzip.Writer and creates a new one on each call (zw := gzip.NewWriter(w)), so the gzip package has to allocate a new compressor instead of calling compressor.Reset(w).

To start, I was thinking of reusing parca-agent's buffers, even though that would be just a tiny improvement in allocs.

go tool pprof -source_path=/home/vagrant/go/pkg/mod/ dist/parca-agent allocs.pprof
(pprof) list Write
ROUTINE ======================== github.com/google/pprof/profile.(*Profile).Write in github.com/google/[email protected]/profile/profile.go
         0        2GB (flat, cum) 40.35% of Total
         .          .    323:	return b
         .          .    324:}
         .          .    325:
         .          .    326:// Write writes the profile as a gzip-compressed marshaled protobuf.
         .          .    327:func (p *Profile) Write(w io.Writer) error {
         .        1MB    328:	zw := gzip.NewWriter(w)
         .          .    329:	defer zw.Close()
         .     1.98GB    330:	_, err := zw.Write(serialize(p))
         .    20.04MB    331:	return err
         .          .    332:}
         .          .    333:
         .          .    334:// WriteUncompressed writes the profile as a marshaled protobuf.
         .          .    335:func (p *Profile) WriteUncompressed(w io.Writer) error {
         .          .    336:	_, err := w.Write(serialize(p))
go tool pprof -source_path=/snap/go/current/src/ dist/parca-agent allocs.pprof
ROUTINE ======================== compress/gzip.(*Writer).Write in compress/gzip/gzip.go
         0     1.98GB (flat, cum) 40.10% of Total
         .          .    163:			z.buf[8] = 2
         .          .    164:		} else if z.level == BestSpeed {
         .          .    165:			z.buf[8] = 4
         .          .    166:		}
         .          .    167:		z.buf[9] = z.OS
         .   512.03kB    168:		_, z.err = z.w.Write(z.buf[:10])
         .          .    169:		if z.err != nil {
         .          .    170:			return 0, z.err
         .          .    171:		}
         .          .    172:		if z.Extra != nil {
         .          .    173:			z.err = z.writeBytes(z.Extra)
         .          .    174:			if z.err != nil {
         .          .    175:				return 0, z.err
         .          .    176:			}
         .          .    177:		}
         .          .    178:		if z.Name != "" {
         .          .    179:			z.err = z.writeString(z.Name)
         .          .    180:			if z.err != nil {
         .          .    181:				return 0, z.err
         .          .    182:			}
         .          .    183:		}
         .          .    184:		if z.Comment != "" {
         .          .    185:			z.err = z.writeString(z.Comment)
         .          .    186:			if z.err != nil {
         .          .    187:				return 0, z.err
         .          .    188:			}
         .          .    189:		}
         .          .    190:		if z.compressor == nil {
         .     1.98GB    191:			z.compressor, _ = flate.NewWriter(z.w, z.level)
         .          .    192:		}
         .          .    193:	}
         .          .    194:	z.size += uint32(len(p))
         .          .    195:	z.digest = crc32.Update(z.digest, crc32.IEEETable, p)
         .          .    196:	n, z.err = z.compressor.Write(p)
ROUTINE ======================== compress/flate.NewWriter in compress/flate/deflate.go
    1.64GB     1.98GB (flat, cum) 40.09% of Total
         .          .    665:// compression efficiency.
         .          .    666://
         .          .    667:// If level is in the range [-2, 9] then the error returned will be nil.
         .          .    668:// Otherwise the error returned will be non-nil.
         .          .    669:func NewWriter(w io.Writer, level int) (*Writer, error) {
    1.64GB     1.64GB    670:	var dw Writer
         .   347.72MB    671:	if err := dw.d.init(w, level); err != nil {
         .          .    672:		return nil, err
         .          .    673:	}
         .          .    674:	return &dw, nil
         .          .    675:}

allocs.pprof.zip

@javierhonduco
Contributor

I can't find the constant that you're talking about, how much memory would we be talking about?

It's not implemented yet; I was suggesting going with the approach in the pseudocode above.

@marselester
Contributor Author

I couldn't find setUnwindTable in the profile; I guess a benchmark is needed.

BTW, we can get rid of bytes.NewBuffer(stackBytes) allocations as well if it matters.

ROUTINE ======================== github.com/parca-dev/parca-agent/pkg/profiler/cpu.(*bpfMaps).readKernelStack in pkg/profiler/cpu/maps.go
         0        4MB (flat, cum) 0.079% of Total
         .          .    202:func (m *bpfMaps) readKernelStack(kernelStackID int32, stack *combinedStack) error {
         .          .    203:	if kernelStackID == 0 {
         .          .    204:		return errUnwindFailed
         .          .    205:	}
         .          .    206:
         .     1.50MB    207:	stackBytes, err := m.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
         .          .    208:	if err != nil {
         .          .    209:		return fmt.Errorf("read kernel stack trace, %v: %w", err, errMissing)
         .          .    210:	}
         .          .    211:
         .     2.50MB    212:	if err := binary.Read(bytes.NewBuffer(stackBytes), m.byteOrder, stack[stackDepth:]); err != nil {
         .          .    213:		return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
         .          .    214:	}
         .          .    215:
         .          .    216:	return nil
         .          .    217:}
ROUTINE ======================== github.com/parca-dev/parca-agent/pkg/profiler/cpu.(*bpfMaps).readUserStack in pkg/profiler/cpu/maps.go
         0     7.51MB (flat, cum)  0.15% of Total
         .          .    152:func (m *bpfMaps) readUserStack(userStackID int32, stack *combinedStack) error {
         .          .    153:	if userStackID == 0 {
         .          .    154:		return errUnwindFailed
         .          .    155:	}
         .          .    156:
         .     2.50MB    157:	stackBytes, err := m.stackTraces.GetValue(unsafe.Pointer(&userStackID))
         .          .    158:	if err != nil {
         .          .    159:		return fmt.Errorf("read user stack trace, %v: %w", err, errMissing)
         .          .    160:	}
         .          .    161:
         .        5MB    162:	if err := binary.Read(bytes.NewBuffer(stackBytes), m.byteOrder, stack[:stackDepth]); err != nil {
         .          .    163:		return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
         .          .    164:	}
         .          .    165:
         .          .    166:	return nil
         .          .    167:}

@javierhonduco
Contributor

I couldn't find setUnwindTable in the profile; I guess a benchmark is needed.

This code is gated behind a feature flag; you would need --experimental-enable-dwarf-unwinding --debug-process-names=<process_name_regex>. I would avoid profiling this for the next few days, though, as we have some large changes that might move the costs from one place to another, and we could be optimising for things that aren't worth it or, even worse, creating a regression once the new changes for this part of the code land!

1.98GB (flat, cum) 40.10% of Total is a rather large amount of allocation. I wonder how much CPU time we are spending there and how much pressure that's putting on the GC!

That being said seems worth taking a look 😄

@marselester marselester force-pushed the reduce-profile-allocs branch from c0a28d2 to 624871b Compare November 22, 2022 18:00
@marselester marselester force-pushed the reduce-profile-allocs branch 4 times, most recently from 69f13c1 to 33f6195 Compare November 23, 2022 21:03
@marselester marselester marked this pull request as ready for review November 23, 2022 21:35
@marselester marselester requested a review from a team as a code owner November 23, 2022 21:35
@kakkoyun
Member

Leaving the decision to merge @javierhonduco

Contributor

@javierhonduco left a comment


Looks good, but could you run an end-to-end test locally? Past PRs include some examples of how to test it under the "Test Plan" section: https://github.com/parca-dev/parca-agent/pulls?q=is%3Apr+is%3Aclosed+eh_frame

pkg/profiler/cpu/maps.go (outdated review thread, resolved)
@marselester marselester force-pushed the reduce-profile-allocs branch from f4d6277 to 0310530 Compare November 29, 2022 15:31
@marselester
Contributor Author

I wrote a benchmark for setUnwindTable() and the results show improvements, but the numbers look huge overall, e.g., 6.84MB (new) vs 14.97MB (old) allocated per call.

# new code
BenchmarkBPF-2   	      33	  35094869 ns/op	 6841518 B/op	 1729045 allocs/op

# old code
BenchmarkBPF-2   	      32	  38474101 ns/op	14975921 B/op	 1729064 allocs/op
func BenchmarkBPF(b *testing.B) {
	m, err := bpf.NewModuleFromBufferArgs(bpf.NewModuleArgs{
		BPFObjBuff: bpfObj,
		BPFObjName: "parca",
	})
	require.NoError(b, err)
	err = m.BPFLoadObject()
	require.NoError(b, err)
	b.Cleanup(m.Close)

	bpfMaps, err := initializeMaps(m, byteorder.GetHostByteOrder())
	require.NoError(b, err)

	pid := 1
	tb := unwind.NewUnwindTableBuilder(log.NewNopLogger())
	pt, err := tb.UnwindTableForPid(pid)
	require.NoError(b, err)

	b.ResetTimer()
	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		err = bpfMaps.setUnwindTable(pid, pt)
		require.NoError(b, err)
	}
}

@kakkoyun
Member

I wrote a benchmark for setUnwindTable() and the results show improvements, but the numbers look huge overall, e.g., 6.84MB (new) vs 14.97MB (old) allocated per call.

# new code
BenchmarkBPF-2   	      33	  35094869 ns/op	 6841518 B/op	 1729045 allocs/op

# old code
BenchmarkBPF-2   	      32	  38474101 ns/op	14975921 B/op	 1729064 allocs/op

This is expected. We plan to reduce calls to this drastically. Maybe we should first wait for that PR to land.

@javierhonduco
Contributor

This is expected, as Kemal mentioned we are actively working on this area and will be publishing the changes over the next few weeks.

We added a simple table generation benchmark to keep track of performance changes over time, as definitely there's a lot of low-hanging fruit. We mostly prioritised correctness over performance until now! 😄

That being said, I think your change to add the sync.Pool is still valuable, and I'll be happy to merge it. Once we merge all the changes we have planned for table generation, I am more than happy to let you know if you are interested in helping us optimise it further.

Just to give you a taste of the changes I am working on (will open a PR in the next few weeks), this is how things are looking now. There's still a lot of performance left on the table, though!

[javierhonduco@fedora parca-agent]$ go test -v github.com/parca-dev/parca-agent/pkg/stack/unwind -bench=.
=== RUN   TestBuildUnwindTable
--- PASS: TestBuildUnwindTable (0.00s)
goos: linux
goarch: amd64
pkg: github.com/parca-dev/parca-agent/pkg/stack/unwind
cpu: Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz
BenchmarkParsingLibcDwarfUnwindInformation
BenchmarkParsingLibcDwarfUnwindInformation-12                163           9547304 ns/op         5753089 B/op      84624 allocs/op
PASS
ok      github.com/parca-dev/parca-agent/pkg/stack/unwind       3.438s

@marselester
Contributor Author

Awesome, thank you for the explanation!

@marselester marselester force-pushed the reduce-profile-allocs branch from 0310530 to 4fb6b0b Compare December 1, 2022 20:53
@marselester marselester force-pushed the reduce-profile-allocs branch from 4fb6b0b to 01f2497 Compare December 12, 2022 20:30
@kakkoyun
Member

kakkoyun commented Jan 5, 2023

@javierhonduco any thoughts on this?

@javierhonduco
Contributor

javierhonduco commented Feb 1, 2023

Hi @marselester! Sorry for the delay! This code has significantly changed now. I have checked some memory profiles and I think the cost has shifted a little bit.

If it's OK, we can maybe close this PR, and if you are interested, I am more than happy to point you in a few days to other areas where we need to reduce allocations / CPU cycles 😄

@marselester
Contributor Author

@javierhonduco no worries :) I would appreciate the pointers for sure!

@marselester marselester closed this Feb 1, 2023
@javierhonduco
Contributor

@marselester sure thing, happy to get back to you next week :)
