From fb440891a3388fc8be53b9989585b5d5f9a257f4 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Thu, 21 Sep 2023 22:17:07 +0800 Subject: [PATCH] call plugin in parallel (#613) * call plugin in parallel * remove lock --- resource/cobalt/call.go | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/resource/cobalt/call.go b/resource/cobalt/call.go index f49e0ebfd..668c0b9f1 100644 --- a/resource/cobalt/call.go +++ b/resource/cobalt/call.go @@ -2,6 +2,7 @@ package cobalt import ( "context" + "sync" "github.com/cockroachdb/errors" "github.com/projecteru2/core/log" @@ -9,19 +10,33 @@ import ( ) func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) { - // TODO 并行化,意义不大 + var wg sync.WaitGroup var combinedErr error - results := map[plugins.Plugin]T{} - + var results sync.Map for _, p := range ps { - result, err := f(p) - if err != nil { - log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name()) - combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name())) - continue - } - results[p] = result - } + wg.Add(1) + go func(p plugins.Plugin) { + defer wg.Done() - return results, combinedErr + result, err := f(p) + if err != nil { + log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name()) + results.Store(p, err) + return + } + results.Store(p, result) + }(p) + } + wg.Wait() + ans := make(map[plugins.Plugin]T) + results.Range(func(key, value any) bool { + switch vt := value.(type) { + case error: + combinedErr = errors.CombineErrors(combinedErr, vt) + case T: + ans[key.(plugins.Plugin)] = vt + } + return true + }) + return ans, combinedErr }