Skip to content

Commit

Permalink
call plugin in parallel (#613)
Browse files Browse the repository at this point in the history
* call plugin in parallel

* remove lock
  • Loading branch information
yuyang0 authored Sep 21, 2023
1 parent a29fe9b commit fb44089
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions resource/cobalt/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,41 @@ package cobalt

import (
"context"
"sync"

"github.com/cockroachdb/errors"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource/plugins"
)

func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) {
// TODO 并行化,意义不大
var wg sync.WaitGroup
var combinedErr error
results := map[plugins.Plugin]T{}

var results sync.Map
for _, p := range ps {
result, err := f(p)
if err != nil {
log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name())
combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name()))
continue
}
results[p] = result
}
wg.Add(1)
go func(p plugins.Plugin) {
defer wg.Done()

return results, combinedErr
result, err := f(p)
if err != nil {
log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name())
results.Store(p, err)
return
}
results.Store(p, result)
}(p)
}
wg.Wait()
ans := make(map[plugins.Plugin]T)
results.Range(func(key, value any) bool {
switch vt := value.(type) {
case error:
combinedErr = errors.CombineErrors(combinedErr, vt)
case T:
ans[key.(plugins.Plugin)] = vt
}
return true
})
return ans, combinedErr
}

0 comments on commit fb44089

Please sign in to comment.