Skip to content

Commit

Permalink
Concurrency for traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
fishi0x01 committed Feb 23, 2021
1 parent 1626231 commit a649e46
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
31 changes: 23 additions & 8 deletions cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
"path/filepath"
"strings"
"sync"

"github.com/fishi0x01/vsh/client"
"github.com/fishi0x01/vsh/log"
Expand Down Expand Up @@ -61,17 +62,31 @@ func cmdPath(pwd string, arg string) (result string) {
return result
}

var numWorkers = 5

func runCommandWithTraverseTwoPaths(client *client.Client, source string, target string, f func(string, string) error) {
c := make(chan string, numWorkers)
source = filepath.Clean(source) // remove potential trailing '/'
for _, path := range client.Traverse(source) {
target := strings.Replace(path, source, target, 1)
err := f(path, target)
if err != nil {
return
}
go client.Traverse(source, c)
var wg sync.WaitGroup
for t := 0; t < numWorkers; t++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
path, ok := <-c
if !ok {
return
}
target := strings.Replace(path, source, target, 1)
err := f(path, target)
if err != nil {
return
}
}
}()
}

return
wg.Wait()
}

func transportSecrets(c *client.Client, source string, target string, transport func(string, string) error) int {
Expand Down
4 changes: 3 additions & 1 deletion cli/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func (cmd *RemoveCommand) Run() int {
case client.LEAF:
cmd.removeSecret(newPwd)
case client.NODE:
for _, path := range cmd.client.Traverse(newPwd) {
c := make(chan string, 10)
go cmd.client.Traverse(newPwd, c)
for path := range c {
err := cmd.removeSecret(path)
if err != nil {
return 1
Expand Down
30 changes: 18 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,30 +153,36 @@ func (client *Client) GetType(absolutePath string) (kind PathKind) {
return kind
}

// Traverse traverses given absolutePath via DFS and returns sub-paths in array
func (client *Client) Traverse(absolutePath string) (paths []string) {
// Traverse traverses given absolutePath via DFS and pushes paths to given channel
func (client *Client) Traverse(absolutePath string, c chan<- string) {
defer close(c)
if client.isTopLevelPath(absolutePath) {
paths = client.topLevelTraverse()
client.topLevelTraverse(c)
} else {
paths = client.lowLevelTraverse(normalizedVaultPath(absolutePath))
client.lowLevelTraverse(normalizedVaultPath(absolutePath), c)
}

return paths
}

// SubpathsForPath will return an array of absolute paths at or below path
func (client *Client) SubpathsForPath(path string) (filePaths []string, err error) {
func (client *Client) SubpathsForPath(path string) (result []string, err error) {
switch t := client.GetType(path); t {
case LEAF:
filePaths = append(filePaths, filepath.Clean(path))
result = []string{filepath.Clean(path)}
case NODE:
for _, traversedPath := range client.Traverse(path) {
filePaths = append(filePaths, traversedPath)
c := make(chan string, 10)
go client.Traverse(path, c)
// TODO: this is currently fully sequential to keep old behavior
for {
p, ok := <-c
if !ok {
break
}
result = append(result, p)
}
default:
return filePaths, fmt.Errorf("Not a valid path for operation: %s", path)
err = fmt.Errorf("Not a valid path for operation: %s", path)
}
return filePaths, nil
return result, err
}

// ClearCache clears the list cache
Expand Down
15 changes: 6 additions & 9 deletions client/traverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"strings"
)

func (client *Client) topLevelTraverse() (result []string) {
func (client *Client) topLevelTraverse(c chan<- string) {
for k := range client.KVBackends {
result = append(result, k)
c <- k
}

return result
}

func (client *Client) lowLevelTraverse(path string) (result []string) {
func (client *Client) lowLevelTraverse(path string, c chan<- string) {
s, err := client.cache.List(client.getKVMetaDataPath(path))
if err != nil {
log.AppTrace("%+v", err)
Expand All @@ -27,17 +25,16 @@ func (client *Client) lowLevelTraverse(path string) (result []string) {
// prevent ambiguous dir/file to be added twice
if strings.HasSuffix(val, "/") {
// dir
result = append(result, client.lowLevelTraverse(path+"/"+val)...)
client.lowLevelTraverse(path+"/"+val, c)
} else {
// file
leaf := strings.ReplaceAll("/"+path+"/"+val, "//", "/")
result = append(result, leaf)
c <- leaf
}
}
}
} else {
leaf := strings.ReplaceAll("/"+path, "//", "/")
result = append(result, leaf)
c <- leaf
}
return result
}

0 comments on commit a649e46

Please sign in to comment.