Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡ discover assets in parallel #4973

Merged
merged 14 commits into from
Dec 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
⚡ discover assets in parallel
Signed-off-by: Salim Afiune Maya <[email protected]>
afiune committed Dec 12, 2024
commit 8c989040693f91d4b216995182acafdea1ec5212
12 changes: 12 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -198,6 +198,18 @@
"shell", "ssh", "[email protected]",
],
},
{
"name": "scan github org",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to keep this in here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think it is useful for folks testing, specially now that we have https://github.com/hit-training.

I can remove it.

"type": "go",
"request": "launch",
"program": "${workspaceRoot}/apps/cnquery/cnquery.go",
"args": [
"scan",
"github",
"org", "hit-training",
"--log-level", "trace"
]
},
{
"name": "Configure Built-in Providers",
"type": "go",
61 changes: 40 additions & 21 deletions explorer/scan/discovery.go
Original file line number Diff line number Diff line change
@@ -6,11 +6,13 @@ package scan
import (
"context"
"errors"
"sync"
"time"

"github.com/rs/zerolog/log"
"go.mondoo.com/cnquery/v11/cli/config"
"go.mondoo.com/cnquery/v11/cli/execruntime"
"go.mondoo.com/cnquery/v11/internal/workerpool"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/logger"
"go.mondoo.com/cnquery/v11/providers"
@@ -20,6 +22,9 @@ import (
"go.mondoo.com/cnquery/v11/providers-sdk/v1/upstream"
)

// number of parallel goroutines discovering assets
const workers = 10

type AssetWithRuntime struct {
Asset *inventory.Asset
Runtime *providers.Runtime
@@ -34,11 +39,15 @@ type DiscoveredAssets struct {
platformIds map[string]struct{}
Assets []*AssetWithRuntime
Errors []*AssetWithError
assetsLock sync.Mutex
}

// Add adds an asset and its runtime to the discovered assets list. It returns true if the
// asset has been added, false if it is a duplicate
func (d *DiscoveredAssets) Add(asset *inventory.Asset, runtime *providers.Runtime) bool {
d.assetsLock.Lock()
defer d.assetsLock.Unlock()

isDuplicate := false
for _, platformId := range asset.PlatformIds {
if _, ok := d.platformIds[platformId]; ok {
@@ -161,35 +170,45 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i
return
}

pool := workerpool.New[bool](workers)
pool.Start()
defer pool.Close()

// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
for _, a := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets {
// create runtime for root asset
assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording)
if err != nil {
log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset")
discoveredAssets.AddError(a, err)
continue
}
pool.Submit(func() (bool, error) {
// create runtime for root asset
assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording)
if err != nil {
log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset")
discoveredAssets.AddError(a, err)
return false, err
}

// If no asset was returned and no error, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
continue
}
// If no asset was returned and no error, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
return false, nil
}

resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)
resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)

// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
assetWithRuntime.Runtime.Close()
}
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
assetWithRuntime.Runtime.Close()
}
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
assetWithRuntime.Runtime.Close()
}
return true, nil
})
}

// Wait for the workers to finish processing
pool.Wait()
}

func createRuntimeForAsset(asset *inventory.Asset, upstream *upstream.UpstreamConfig, recording llx.Recording) (*AssetWithRuntime, error) {
18 changes: 12 additions & 6 deletions providers-sdk/v1/plugin/service.go
Original file line number Diff line number Diff line change
@@ -51,11 +51,8 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u
}
// ^^

s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()

// If a runtime with this ID already exists, then return that
if runtime, ok := s.runtimes[conf.Id]; ok {
if runtime, err := s.GetRuntime(conf.Id); err == nil {
return runtime, nil
}

@@ -66,18 +63,27 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u

if runtime.Connection != nil {
if parentId := runtime.Connection.ParentID(); parentId > 0 {
parentRuntime, err := s.doGetRuntime(parentId)
parentRuntime, err := s.GetRuntime(parentId)
if err != nil {
return nil, errors.New("parent connection " + strconv.FormatUint(uint64(parentId), 10) + " not found")
}
runtime.Resources = parentRuntime.Resources

}
}
s.runtimes[conf.Id] = runtime

// store the new runtime
s.addRuntime(conf.Id, runtime)

return runtime, nil
}

func (s *Service) addRuntime(id uint32, runtime *Runtime) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()
s.runtimes[id] = runtime
}

// FIXME: DEPRECATED, remove in v12.0 vv
func (s *Service) deprecatedAddRuntime(createRuntime func(connId uint32) (*Runtime, error)) (*Runtime, error) {
s.runtimesLock.Lock()
1 change: 1 addition & 0 deletions providers/github/connection/connection.go
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ func NewGithubConnection(id uint32, asset *inventory.Asset) (*GithubConnection,
ctx := context.WithValue(context.Background(), github.SleepUntilPrimaryRateLimitResetWhenRateLimited, true)

// perform a quick call to verify the token's validity.
// @afiune do we need to validate the token for every connection? can this be a "once" operation?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed with #4980

_, resp, err := client.Meta.Zen(ctx)
if err != nil {
if resp != nil && resp.StatusCode == 401 {