-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
edbca99
commit 6c64f84
Showing
1 changed file
with
94 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,100 @@ go get github.com/invisiblefunnel/egr | |
## Example | ||
|
||
```go | ||
# TODO | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/md5" | ||
"fmt" | ||
"log" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/invisiblefunnel/egr" | ||
) | ||
|
||
// Let's rewrite the pipeline example from the errgroup docs. | ||
// https://pkg.go.dev/golang.org/x/[email protected]/errgroup#example-Group-Pipeline | ||
func main() { | ||
m, err := MD5All(context.Background(), ".") | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
for k, sum := range m { | ||
fmt.Printf("%s:\t%x\n", k, sum) | ||
} | ||
} | ||
|
||
type result struct { | ||
path string | ||
sum [md5.Size]byte | ||
} | ||
|
||
// MD5All reads all the files in the file tree rooted at root and returns a map | ||
// from file path to the MD5 sum of the file's contents. If the directory walk | ||
// fails or any read operation fails, MD5All returns an error. | ||
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) { | ||
queueSize := 1 // choose based on workload | ||
numDigesters := 20 | ||
|
||
digesters, ctx := egr.WithContext[string](ctx, queueSize) | ||
collector, ctx := egr.WithContext[result](ctx, queueSize) | ||
collector.SetLimit(1) | ||
|
||
for i := 0; i < numDigesters; i++ { | ||
digesters.Go(func(queue <-chan string) error { | ||
for path := range queue { | ||
data, err := os.ReadFile(path) | ||
if err != nil { | ||
return err | ||
} | ||
err = collector.Push(ctx, result{path, md5.Sum(data)}) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
}) | ||
} | ||
|
||
m := make(map[string][md5.Size]byte) | ||
|
||
// Only launch one collector goroutine to update the map | ||
collector.Go(func(queue <-chan result) error { | ||
for r := range queue { | ||
m[r.path] = r.sum | ||
} | ||
return nil | ||
}) | ||
|
||
// Walk the directory and Push paths to the digesters queue | ||
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { | ||
if err != nil { | ||
return err | ||
} | ||
if !info.Mode().IsRegular() { | ||
return nil | ||
} | ||
return digesters.Push(ctx, path) | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Wait for digesters to finish | ||
if err := digesters.Wait(); err != nil { | ||
return nil, err | ||
} | ||
|
||
// Wait for collector to finish | ||
if err := collector.Wait(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return m, nil | ||
} | ||
``` | ||
|
||
## License | ||
|