From 92c95290de2f935e8f849216010459fda453c8ec Mon Sep 17 00:00:00 2001 From: YuYang Date: Thu, 21 Sep 2023 15:22:34 +0800 Subject: [PATCH 1/2] call plugin in parallel --- resource/cobalt/call.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/resource/cobalt/call.go b/resource/cobalt/call.go index f49e0ebfd..3e20354b3 100644 --- a/resource/cobalt/call.go +++ b/resource/cobalt/call.go @@ -2,6 +2,7 @@ package cobalt import ( "context" + "sync" "github.com/cockroachdb/errors" "github.com/projecteru2/core/log" @@ -9,19 +10,28 @@ import ( ) func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) { - // TODO 并行化,意义不大 + var mu sync.Mutex + var wg sync.WaitGroup var combinedErr error results := map[plugins.Plugin]T{} for _, p := range ps { - result, err := f(p) - if err != nil { - log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name()) - combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name())) - continue - } - results[p] = result - } + wg.Add(1) + go func(p plugins.Plugin) { + defer wg.Done() + + result, err := f(p) + mu.Lock() + defer mu.Unlock() + if err != nil { + log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name()) + combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name())) + return + } + results[p] = result + }(p) + } + wg.Wait() return results, combinedErr } From 002c5100915089225fd1f8e9ea662dca4aaca4fc Mon Sep 17 00:00:00 2001 From: YuYang Date: Thu, 21 Sep 2023 21:54:38 +0800 Subject: [PATCH 2/2] remove lock --- resource/cobalt/call.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/resource/cobalt/call.go b/resource/cobalt/call.go index 3e20354b3..668c0b9f1 100644 --- a/resource/cobalt/call.go +++ b/resource/cobalt/call.go @@ -10,28 +10,33 @@ import ( ) func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) { - var mu sync.Mutex var wg sync.WaitGroup var combinedErr error - results := map[plugins.Plugin]T{} - + var results sync.Map for _, p := range ps { wg.Add(1) go func(p plugins.Plugin) { defer wg.Done() result, err := f(p) - mu.Lock() - defer mu.Unlock() - if err != nil { log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name()) - combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name())) + results.Store(p, err) return } - results[p] = result + results.Store(p, result) }(p) } wg.Wait() - return results, combinedErr + ans := make(map[plugins.Plugin]T) + results.Range(func(key, value any) bool { + switch vt := value.(type) { + case error: + combinedErr = errors.CombineErrors(combinedErr, vt) + case T: + ans[key.(plugins.Plugin)] = vt + } + return true + }) + return ans, combinedErr }