Skip to content

Commit

Permalink
Move bufferedfilewriter to own pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
ahrav committed Jan 17, 2024
1 parent 7d2452a commit 0802a62
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 115 deletions.
122 changes: 24 additions & 98 deletions pkg/gitparse/gitparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

"github.com/go-logr/logr"

"github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp"
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffered_file_writer"
)

const (
Expand All @@ -42,87 +42,46 @@ type Commit struct {

// Diff contains the info about a file diff in a commit.
type Diff struct {
PathB string
LineStart int
Content bytes.Buffer // Keep in-memory buffer for smaller diffs
streamDestination *os.File // File destination for larger diffs
IsBinary bool
threshold int // Size threshold to switch to file
PathB string
LineStart int
// Used to keep small diffs in memory and larger diffs in a file.
contentWriter *bufferedfilewriter.BufferedFileWriter
IsBinary bool
}

// diffOption is a functional option that mutates a Diff; NewDiff applies each
// supplied option to the Diff it is constructing.
type diffOption func(*Diff)

// withPathB returns a diffOption that assigns pathB to the PathB field of the
// Diff it is applied to.
func withPathB(pathB string) diffOption {
	return func(target *Diff) {
		target.PathB = pathB
	}
}

// withThreshold sets the threshold option.
// TODO: Leverage this option in the future.
func withThreshold(threshold int) diffOption { return func(d *Diff) { d.threshold = threshold } }

// NewDiff creates a new Diff with a threshold.
func NewDiff(opts ...diffOption) *Diff {
const defaultThreshold = 20 * 1024 * 1024 // 20MB
d := &Diff{threshold: defaultThreshold}
d := new(Diff)
for _, opt := range opts {
opt(d)
}
d.contentWriter = bufferedfilewriter.New(bufferedfilewriter.WithThreshold(defaultThreshold))

return d
}

// write handles writing diff data to either an in-memory buffer or a file, depending on the size.
func (d *Diff) write(ctx context.Context, p []byte) error {
if d.Content.Len()+len(p) <= d.threshold {
// If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info(
"writing to buffer",
"data_size", len(p),
"content_size", d.Content.Len(),
)
_, err := d.Content.Write(p)
return err
}
// Switch to file writing if threshold is exceeded.
// This helps in managing memory efficiently for large diffs.
if d.streamDestination == nil {
var err error
d.streamDestination, err = os.CreateTemp(os.TempDir(), cleantemp.MkFilename())
if err != nil {
return err
}
// Len reports the length of the Diff's content as returned by its
// contentWriter.
func (d *Diff) Len() int {
	return d.contentWriter.Len()
}

// Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the diff data is in one place - either entirely in the buffer or the file.
if d.Content.Len() > 0 {
ctx.Logger().V(4).Info("writing buffer to file", "content_size", d.Content.Len())
if _, err := d.streamDestination.Write(d.Content.Bytes()); err != nil {
return err
}
// Replace the buffer with a new one to free up memory.
d.Content = bytes.Buffer{}
}
}
ctx.Logger().V(4).Info("writing to file", "data_size", len(p))
// ReadCloser hands back the io.ReadCloser produced by the Diff's
// contentWriter, along with any error the contentWriter reports.
func (d *Diff) ReadCloser() (io.ReadCloser, error) {
	return d.contentWriter.ReadCloser()
}

_, err := d.streamDestination.Write(p)
// write passes p to the Diff's contentWriter, forwarding ctx to it.
// The byte count returned by the contentWriter is discarded; only its error
// is returned to the caller.
func (d *Diff) write(ctx context.Context, p []byte) error {
_, err := d.contentWriter.Write(ctx, p)
return err
}

// finalize ensures proper closure of resources associated with the Diff.
// It also handles the final flush, in case there's data remaining in the buffer.
// This method should be called to release resources, especially when writing to a file.
func (d *Diff) finalize() error {
if d.streamDestination == nil {
return nil
}

if d.Content.Len() > 0 {
if _, err := d.streamDestination.Write(d.Content.Bytes()); err != nil {
return err
}
}
return d.streamDestination.Close()
}
func (d *Diff) finalize() error {
	// Closing the contentWriter is the only cleanup step; its error is
	// returned unchanged.
	return d.contentWriter.Close()
}

// Parser sets values used in GitParse.
type Parser struct {
Expand All @@ -131,39 +90,6 @@ type Parser struct {
dateFormat string
}

// DiffContentReadCloser returns an io.ReadCloser for reading the content of a Diff.
// If the diff content size exceeds a predefined threshold, it is stored in a temporary file,
// and the function returns an auto-deleting file reader (newAutoDeletingFileReader) to read from this file.
// For smaller diffs that fit within the threshold, the content is kept in memory,
// and the function returns a no-op closer wrapper (noOpCloser) around a bytes.Reader.
// The caller is responsible for calling Close on the returned io.ReadCloser in both cases.
func DiffContentReadCloser(d *Diff) (io.ReadCloser, error) {
if d.streamDestination != nil {
// Data is in a file, read from the file.
file, err := os.Open(d.streamDestination.Name())
if err != nil {
return nil, err
}
return newAutoDeletingFileReader(file), nil
}
// Data is in memory.
return io.NopCloser(bytes.NewReader(d.Content.Bytes())), nil
}

// autoDeletingFileReader wraps an *os.File and deletes the file on Close
type autoDeletingFileReader struct{ *os.File }

// newAutoDeletingFileReader creates a new autoDeletingFileReader
func newAutoDeletingFileReader(file *os.File) *autoDeletingFileReader {
return &autoDeletingFileReader{File: file}
}

// Close implements the io.Closer interface, deletes the file after closing
func (r *autoDeletingFileReader) Close() error {
defer os.Remove(r.Name()) // Delete the file after closing
return r.File.Close()
}

type ParseState int

const (
Expand Down Expand Up @@ -263,7 +189,7 @@ func (c1 *Commit) Equal(c2 *Commit) bool {
return false
case d1.LineStart != d2.LineStart:
return false
case d1.Content.String() != d2.Content.String():
case d1.contentWriter.String() != d2.contentWriter.String():
return false
case d1.IsBinary != d2.IsBinary:
return false
Expand Down Expand Up @@ -388,9 +314,9 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
latestState = CommitLine

// If there is a currentDiff, add it to currentCommit.
if currentDiff.Content.Len() > 0 || currentDiff.IsBinary {
if currentDiff.Len() > 0 || currentDiff.IsBinary {
currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff)
currentCommit.Size += currentDiff.Content.Len()
currentCommit.Size += currentDiff.Len()
}
// If there is a currentCommit, send it to the channel.
if currentCommit != nil {
Expand Down Expand Up @@ -438,15 +364,15 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
if currentCommit == nil {
currentCommit = &Commit{}
}
if currentDiff.Content.Len() > 0 || currentDiff.IsBinary {
if currentDiff.Len() > 0 || currentDiff.IsBinary {
currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff)
if err := currentDiff.finalize(); err != nil {
ctx.Logger().Error(err, "failed to finalize diff")
}
// If the combined size of the commit's diffs exceeds maxCommitSize, drop it into the channel so it isn't held in memory waiting for more commits.
totalSize := 0
for _, diff := range currentCommit.Diffs {
totalSize += diff.Content.Len()
totalSize += diff.Len()
}
if totalSize > c.maxCommitSize {
oldCommit := currentCommit
Expand Down Expand Up @@ -490,7 +416,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
case isHunkLineNumberLine(isStaged, latestState, line):
latestState = HunkLineNumberLine

if currentDiff.Content.Len() > 0 || currentDiff.IsBinary {
if currentDiff.Len() > 0 || currentDiff.IsBinary {
currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff)
}
currentDiff = NewDiff(withPathB(currentDiff.PathB))
Expand Down Expand Up @@ -549,7 +475,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
latestState = ParseFailure
}

if currentDiff.Content.Len() > c.maxDiffSize {
if currentDiff.Len() > c.maxDiffSize {
ctx.Logger().V(2).Info(fmt.Sprintf(
"Diff for %s exceeded MaxDiffSize(%d)", currentDiff.PathB, c.maxDiffSize,
))
Expand Down Expand Up @@ -828,7 +754,7 @@ func isCommitSeparatorLine(isStaged bool, latestState ParseState, line []byte) b

func cleanupParse(currentCommit *Commit, currentDiff *Diff, commitChan chan Commit, totalLogSize *int) {
// Keep the diff only if it has content or is binary, i.e. skip empty non-binary diffs (this condition may be redundant).
if currentDiff != nil && (currentDiff.Content.Len() > 0 || currentDiff.IsBinary) {
if currentDiff != nil && (currentDiff.Len() > 0 || currentDiff.IsBinary) {
currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff)
}
if currentCommit != nil {
Expand Down
22 changes: 11 additions & 11 deletions pkg/gitparse/gitparse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ index 090c6ba6..38d67dd2 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -165,7 +165,7 @@ func Start(ctx context.Context, options ...EngineOption) *Engine {
seenDetectors := make(map[config.DetectorID]struct{}, len(dets))
seenDetectors := make(map[config.DetectorID]struct{}, Len(dets))
for _, det := range dets {
id := config.GetDetectorID(det)
- if _, ok := seenDetectors[id]; ok {
Expand Down Expand Up @@ -741,16 +741,16 @@ func TestIndividualCommitParsing(t *testing.T) {
}
j++
}
//for _, pass := range test.passes {
// for _, pass := range test.passes {
// if !test.function(false, pass.latestState, pass.line) {
// t.Errorf("%s: Parser did not recognize correct line. (%s)", name, string(pass.line))
// }
//}
//for _, fail := range test.fails {
// }
// for _, fail := range test.fails {
// if test.function(false, fail.latestState, fail.line) {
// t.Errorf("%s: Parser did not recognize incorrect line. (%s)", name, string(fail.line))
// }
//}
// }
}
}

Expand Down Expand Up @@ -802,24 +802,24 @@ func TestStagedDiffParsing(t *testing.T) {
Content: *bytes.NewBuffer([]byte("The Nameless is the origin of Heaven and Earth;\nThe named is the mother of all things.\n\nTherefore let there always be non-being,\n so we may see their subtlety,\nAnd let there always be being,\n so we may see their outcome.\nThe two are the same,\nBut after they are produced,\n they have different names.\nThey both may be called deep and profound.\nDeeper and more profound,\nThe door of all subtleties!\n")),

Check failure on line 802 in pkg/gitparse/gitparse_test.go

View workflow job for this annotation

GitHub Actions / lint

unknown field Content in struct literal of type struct{PathB string; LineStart int; contentWriter *bufferedfilewriter.BufferedFileWriter; IsBinary bool}

Check failure on line 802 in pkg/gitparse/gitparse_test.go

View workflow job for this annotation

GitHub Actions / test

unknown field Content in struct literal of type struct{PathB string; LineStart int; contentWriter *bufferedfilewriter.BufferedFileWriter; IsBinary bool}
IsBinary: false,
},
//{
// {
// PathB: "",
// LineStart: 0,
// Content: *bytes.NewBuffer([]byte("\n")),
// IsBinary: false,
//},
//{
// },
// {
// PathB: "",
// LineStart: 0,
// Content: *bytes.NewBuffer([]byte("\n")),
// IsBinary: false,
//},
//{
// },
// {
// PathB: "",
// LineStart: 0,
// Content: *bytes.NewBuffer([]byte("\n")),
// IsBinary: false,
//},
// },
},
},
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/sources/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,22 +531,22 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string
continue
}

if diff.Content.Len() > sources.ChunkSize+sources.PeekSize {
if diff.Len() > sources.ChunkSize+sources.PeekSize {
s.gitChunk(ctx, &diff, fileName, email, hash, when, remoteURL, reporter)
continue
}

chunkData := func(d *gitparse.Diff) error {
metadata := s.sourceMetadataFunc(fileName, email, hash, when, remoteURL, int64(diff.LineStart))

reader, err := gitparse.DiffContentReadCloser(d)
reader, err := diff.ReadCloser()
if err != nil {
ctx.Logger().Error(err, "error creating reader for commits", "filename", fileName, "commit", hash, "file", diff.PathB)
return nil
}
defer reader.Close()

data := make([]byte, diff.Content.Len())
data := make([]byte, diff.Len())
if _, err := reader.Read(data); err != nil {
ctx.Logger().Error(err, "error reading diff content for commit", "filename", fileName, "commit", hash, "file", diff.PathB)
return nil
Expand All @@ -571,7 +571,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string
}

func (s *Git) gitChunk(ctx context.Context, diff *gitparse.Diff, fileName, email, hash, when, urlMetadata string, reporter sources.ChunkReporter) {
reader, err := gitparse.DiffContentReadCloser(diff)
reader, err := diff.ReadCloser()
if err != nil {
ctx.Logger().Error(err, "error creating reader for chunk", "filename", fileName, "commit", hash, "file", diff.PathB)
return
Expand Down Expand Up @@ -723,14 +723,14 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string,
chunkData := func(d *gitparse.Diff) error {
metadata := s.sourceMetadataFunc(fileName, email, "Staged", when, urlMetadata, int64(diff.LineStart))

reader, err := gitparse.DiffContentReadCloser(d)
reader, err := diff.ReadCloser()
if err != nil {
ctx.Logger().Error(err, "error creating reader for staged", "filename", fileName, "commit", hash, "file", diff.PathB)
return nil
}
defer reader.Close()

data := make([]byte, diff.Content.Len())
data := make([]byte, diff.Len())
if _, err := reader.Read(data); err != nil {
ctx.Logger().Error(err, "error reading diff content for staged", "filename", fileName, "commit", hash, "file", diff.PathB)
return nil
Expand Down
Loading

0 comments on commit 0802a62

Please sign in to comment.