From f2f13b2dc0d00e886393c89d0610f3ccf6b31daa Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 14 Jan 2024 20:52:02 -0800 Subject: [PATCH] Write large diffs to tmp files --- pkg/gitparse/gitparse.go | 180 ++++++++++++++++++++++++++++++++------- pkg/sources/git/git.go | 94 ++++++++++++++------ 2 files changed, 217 insertions(+), 57 deletions(-) diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go index dcd544c85531..5b3049248030 100644 --- a/pkg/gitparse/gitparse.go +++ b/pkg/gitparse/gitparse.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "fmt" - "github.com/go-logr/logr" "io" "os" "os/exec" @@ -13,6 +12,9 @@ import ( "strings" "time" + "github.com/go-logr/logr" + + "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" ) @@ -40,10 +42,86 @@ type Commit struct { // Diff contains the info about a file diff in a commit. type Diff struct { - PathB string - LineStart int - Content bytes.Buffer - IsBinary bool + PathB string + LineStart int + Content bytes.Buffer // Keep in-memory buffer for smaller diffs + streamDestination *os.File // File destination for larger diffs + IsBinary bool + threshold int // Size threshold to switch to file +} + +type diffOption func(*Diff) + +// withPathB sets the PathB option. +func withPathB(pathB string) diffOption { return func(d *Diff) { d.PathB = pathB } } + +// withThreshold sets the threshold option. +// TODO: Leverage this option in the future. +func withThreshold(threshold int) diffOption { return func(d *Diff) { d.threshold = threshold } } + +// NewDiff creates a new Diff with a threshold. +func NewDiff(opts ...diffOption) *Diff { + const defaultThreshold = 20 * 1024 * 1024 // 20MB + d := &Diff{threshold: defaultThreshold} + for _, opt := range opts { + opt(d) + } + + return d +} + +// write handles writing diff data to either an in-memory buffer or a file, depending on the size. 
+func (d *Diff) write(ctx context.Context, p []byte) error { + if d.Content.Len()+len(p) <= d.threshold { + // If the total size is within the threshold, write to the buffer. + ctx.Logger().V(4).Info( + "writing to buffer", + "data_size", len(p), + "content_size", d.Content.Len(), + ) + _, err := d.Content.Write(p) + return err + } + // Switch to file writing if threshold is exceeded. + // This helps in managing memory efficiently for large diffs. + if d.streamDestination == nil { + var err error + d.streamDestination, err = os.CreateTemp(os.TempDir(), cleantemp.MkFilename()) + if err != nil { + return err + } + + // Transfer existing data in buffer to the file, then clear the buffer. + // This ensures all the diff data is in one place - either entirely in the buffer or the file. + if d.Content.Len() > 0 { + ctx.Logger().V(4).Info("writing buffer to file", "content_size", d.Content.Len()) + if _, err := d.streamDestination.Write(d.Content.Bytes()); err != nil { + return err + } + // Replace the buffer with a new one to free up memory. + d.Content = bytes.Buffer{} + } + } + ctx.Logger().V(4).Info("writing to file", "data_size", len(p)) + + _, err := d.streamDestination.Write(p) + return err +} + +// finalize ensures proper closure of resources associated with the Diff. +// handle the final flush in the finalize method, in case there's data remaining in the buffer. +// This method should be called to release resources, especially when writing to a file. +func (d *Diff) finalize() error { + if d.streamDestination == nil { + return nil + } + + if d.Content.Len() > 0 { + if _, err := d.streamDestination.Write(d.Content.Bytes()); err != nil { + return err + } + } + return d.streamDestination.Close() } // Parser sets values used in GitParse. @@ -53,6 +131,51 @@ type Parser struct { dateFormat string } +// noOpCloser wraps an io.Reader to add a no-op Close method, forming an io.ReadCloser. 
+type noOpCloser struct{ io.Reader } + +// Close performs no operation (no-op) and returns nil. +// It's used to fulfill the io.Closer interface. +func (noc *noOpCloser) Close() error { return nil } + +// DiffContentReadCloser returns an io.ReadCloser for reading the content of a Diff. +// If the diff content size exceeds a predefined threshold, it is stored in a temporary file, +// and the function returns an auto-deleting file reader (newAutoDeletingFileReader) to read from this file. +// For smaller diffs that fit within the threshold, the content is kept in memory, +// and the function returns a no-op closer wrapper (noOpCloser) around a bytes.Reader. +// The caller is responsible for calling Close on the returned io.ReadCloser in both cases. +func DiffContentReadCloser(d *Diff) (io.ReadCloser, error) { + if d.streamDestination != nil { + // Data is in a file, read from the file. + file, err := os.Open(d.streamDestination.Name()) + if err != nil { + return nil, err + } + return newAutoDeletingFileReader(file), nil + } + // Data is in memory. 
+ return &noOpCloser{Reader: bytes.NewReader(d.Content.Bytes())}, nil +} + +// autoDeletingFileReader wraps an *os.File and deletes the file on Close +type autoDeletingFileReader struct{ file *os.File } + +// newAutoDeletingFileReader creates a new autoDeletingFileReader +func newAutoDeletingFileReader(file *os.File) *autoDeletingFileReader { + return &autoDeletingFileReader{file: file} +} + +// Read implements the io.Reader interface +func (r *autoDeletingFileReader) Read(p []byte) (int, error) { + return r.file.Read(p) +} + +// Close implements the io.Closer interface, deletes the file after closing +func (r *autoDeletingFileReader) Close() error { + defer os.Remove(r.file.Name()) // Delete the file after closing + return r.file.Close() +} + type ParseState int const ( @@ -254,11 +377,11 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch outReader := bufio.NewReader(stdOut) var ( currentCommit *Commit - currentDiff Diff totalLogSize int ) var latestState = Initial + currentDiff := NewDiff() defer common.RecoverWithExit(ctx) defer close(commitChan) @@ -278,19 +401,20 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch // If there is a currentDiff, add it to currentCommit. if currentDiff.Content.Len() > 0 || currentDiff.IsBinary { - currentCommit.Diffs = append(currentCommit.Diffs, currentDiff) + currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff) currentCommit.Size += currentDiff.Content.Len() } // If there is a currentCommit, send it to the channel. 
if currentCommit != nil { + if err := currentDiff.finalize(); err != nil { + ctx.Logger().Error(err, "failed to finalize diff") + } commitChan <- *currentCommit totalLogSize += currentCommit.Size } // Create a new currentDiff and currentCommit - currentDiff = Diff{} - currentCommit = &Commit{ - Message: strings.Builder{}, - } + currentDiff = NewDiff() + currentCommit = &Commit{Message: strings.Builder{}} // Check that the commit line contains a hash and set it. if len(line) >= 47 { currentCommit.Hash = string(line[7:47]) @@ -327,7 +451,10 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch currentCommit = &Commit{} } if currentDiff.Content.Len() > 0 || currentDiff.IsBinary { - currentCommit.Diffs = append(currentCommit.Diffs, currentDiff) + currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff) + if err := currentDiff.finalize(); err != nil { + ctx.Logger().Error(err, "failed to finalize diff") + } // If the currentDiff is over 1GB, drop it into the channel so it isn't held in memory waiting for more commits. 
totalSize := 0 for _, diff := range currentCommit.Diffs { @@ -348,7 +475,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch currentCommit.Message.WriteString(oldCommit.Message.String()) } } - currentDiff = Diff{} + currentDiff = NewDiff() case isModeLine(isStaged, latestState, line): latestState = ModeLine // NoOp @@ -376,11 +503,9 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch latestState = HunkLineNumberLine if currentDiff.Content.Len() > 0 || currentDiff.IsBinary { - currentCommit.Diffs = append(currentCommit.Diffs, currentDiff) - } - currentDiff = Diff{ - PathB: currentDiff.PathB, + currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff) } + currentDiff = NewDiff(withPathB(currentDiff.PathB)) words := bytes.Split(line, []byte(" ")) if len(words) >= 3 { @@ -395,24 +520,21 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch latestState = HunkContentLine } // TODO: Why do we care about this? It creates empty lines in the diff. If there are no plusLines, it's just newlines. - currentDiff.Content.Write([]byte("\n")) + if err := currentDiff.write(ctx, []byte("\n")); err != nil { + ctx.Logger().Error(err, "failed to write to diff") + } case isHunkPlusLine(isStaged, latestState, line): if latestState != HunkContentLine { latestState = HunkContentLine } - currentDiff.Content.Write(line[1:]) - case isHunkMinusLine(isStaged, latestState, line): - if latestState != HunkContentLine { - latestState = HunkContentLine + if err := currentDiff.write(ctx, line[1:]); err != nil { + ctx.Logger().Error(err, "failed to write to diff") } // NoOp. We only care about additions. 
- case isHunkNewlineWarningLine(isStaged, latestState, line): - if latestState != HunkContentLine { - latestState = HunkContentLine - } - // NoOp - case isHunkEmptyLine(isStaged, latestState, line): + case isHunkMinusLine(isStaged, latestState, line), + isHunkNewlineWarningLine(isStaged, latestState, line), + isHunkEmptyLine(isStaged, latestState, line): if latestState != HunkContentLine { latestState = HunkContentLine } @@ -446,7 +568,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch break } } - cleanupParse(currentCommit, &currentDiff, commitChan, &totalLogSize) + cleanupParse(currentCommit, currentDiff, commitChan, &totalLogSize) ctx.Logger().V(2).Info("finished parsing git log.", "total_log_size", totalLogSize) } diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 1237c904b251..81f96d68d6ab 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -18,13 +18,12 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/object" "github.com/google/go-github/v42/github" + diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "golang.org/x/oauth2" "golang.org/x/sync/semaphore" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" - diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" - "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" @@ -500,6 +499,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string atomic.AddUint64(&s.metrics.commitsScanned, 1) logger.V(5).Info("scanning commit", "commit", commit.Hash) for _, diff := range commit.Diffs { + diff := diff if !scanOptions.Filter.Pass(diff.PathB) { continue } @@ -532,20 +532,37 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string } if diff.Content.Len() > sources.ChunkSize+sources.PeekSize { - 
s.gitChunk(ctx, diff, fileName, email, hash, when, remoteURL, reporter) + s.gitChunk(ctx, &diff, fileName, email, hash, when, remoteURL, reporter) continue } - metadata := s.sourceMetadataFunc(fileName, email, hash, when, remoteURL, int64(diff.LineStart)) - chunk := sources.Chunk{ - SourceName: s.sourceName, - SourceID: s.sourceID, - JobID: s.jobID, - SourceType: s.sourceType, - SourceMetadata: metadata, - Data: diff.Content.Bytes(), - Verify: s.verify, + + chunkData := func(d *gitparse.Diff) error { + metadata := s.sourceMetadataFunc(fileName, email, hash, when, remoteURL, int64(diff.LineStart)) + + reader, err := gitparse.DiffContentReadCloser(d) + if err != nil { + ctx.Logger().Error(err, "error creating reader for commits", "filename", fileName, "commit", hash, "file", diff.PathB) + return nil + } + defer reader.Close() + + data := make([]byte, diff.Content.Len()) + if _, err := reader.Read(data); err != nil { + ctx.Logger().Error(err, "error reading diff content for commit", "filename", fileName, "commit", hash, "file", diff.PathB) + return nil + } + chunk := sources.Chunk{ + SourceName: s.sourceName, + SourceID: s.sourceID, + JobID: s.jobID, + SourceType: s.sourceType, + SourceMetadata: metadata, + Data: data, + Verify: s.verify, + } + return reporter.ChunkOk(ctx, chunk) } - if err := reporter.ChunkOk(ctx, chunk); err != nil { + if err := chunkData(&diff); err != nil { return err } } @@ -553,8 +570,15 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string return nil } -func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, hash, when, urlMetadata string, reporter sources.ChunkReporter) { - originalChunk := bufio.NewScanner(&diff.Content) +func (s *Git) gitChunk(ctx context.Context, diff *gitparse.Diff, fileName, email, hash, when, urlMetadata string, reporter sources.ChunkReporter) { + reader, err := gitparse.DiffContentReadCloser(diff) + if err != nil { + ctx.Logger().Error(err, "error creating reader 
for chunk", "filename", fileName, "commit", hash, "file", diff.PathB) + return + } + defer reader.Close() + + originalChunk := bufio.NewScanner(reader) newChunkBuffer := bytes.Buffer{} lastOffset := 0 for offset := 0; originalChunk.Scan(); offset++ { @@ -646,6 +670,7 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, ctx.Logger().V(1).Info("scanning staged changes", "path", path) for commit := range commitChan { for _, diff := range commit.Diffs { + diff := diff logger := ctx.Logger().WithValues("filename", diff.PathB, "commit", commit.Hash, "file", diff.PathB) logger.V(2).Info("scanning staged changes from git") @@ -695,17 +720,33 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, continue } - metadata := s.sourceMetadataFunc(fileName, email, "Staged", when, urlMetadata, int64(diff.LineStart)) - chunk := sources.Chunk{ - SourceName: s.sourceName, - SourceID: s.sourceID, - JobID: s.jobID, - SourceType: s.sourceType, - SourceMetadata: metadata, - Data: diff.Content.Bytes(), - Verify: s.verify, + chunkData := func(d *gitparse.Diff) error { + metadata := s.sourceMetadataFunc(fileName, email, "Staged", when, urlMetadata, int64(diff.LineStart)) + + reader, err := gitparse.DiffContentReadCloser(d) + if err != nil { + ctx.Logger().Error(err, "error creating reader for staged", "filename", fileName, "commit", hash, "file", diff.PathB) + return nil + } + defer reader.Close() + + data := make([]byte, diff.Content.Len()) + if _, err := reader.Read(data); err != nil { + ctx.Logger().Error(err, "error reading diff content for staged", "filename", fileName, "commit", hash, "file", diff.PathB) + return nil + } + chunk := sources.Chunk{ + SourceName: s.sourceName, + SourceID: s.sourceID, + JobID: s.jobID, + SourceType: s.sourceType, + SourceMetadata: metadata, + Data: data, + Verify: s.verify, + } + return reporter.ChunkOk(ctx, chunk) } - if err := reporter.ChunkOk(ctx, chunk); err != nil { + if err := 
chunkData(&diff); err != nil { return err } } @@ -1040,9 +1081,6 @@ func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources. return err } defer func() { - if err := fileReader.Close(); err != nil { - ctx.Logger().Error(err, "error closing fileReader") - } if err := cmd.Wait(); err != nil { ctx.Logger().Error( err, "error waiting for command",