Skip to content

Commit

Permalink
Azure: Batch Fetcher: Group resource by subscription ID
Browse files Browse the repository at this point in the history
  • Loading branch information
orestisfl committed Oct 5, 2023
1 parent 66d31c6 commit eadc8bf
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 20 deletions.
25 changes: 16 additions & 9 deletions resources/fetching/fetchers/azure/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/samber/lo"
"golang.org/x/exp/maps"

"github.com/elastic/cloudbeat/resources/fetching"
"github.com/elastic/cloudbeat/resources/providers/azurelib"
Expand Down Expand Up @@ -50,21 +52,26 @@ func NewAzureBatchAssetFetcher(log *logp.Logger, ch chan fetching.ResourceInfo,
func (f *AzureBatchAssetFetcher) Fetch(ctx context.Context, cMetadata fetching.CycleMetadata) error {
f.log.Info("Starting AzureBatchAssetFetcher.Fetch")

for assetType, pair := range AzureBatchAssets {
assets, err := f.provider.ListAllAssetTypesByName([]string{assetType})
if err != nil {
return err
}
allAssets, err := f.provider.ListAllAssetTypesByName(maps.Keys(AzureBatchAssets))
if err != nil {
return err
}

if len(assets) == 0 {
continue
}
if len(allAssets) == 0 {
return nil
}

assetGroups := lo.GroupBy(allAssets, func(item inventory.AzureAsset) string {
return fmt.Sprintf("%s-%s", item.SubscriptionId, item.Type)
})

for _, assets := range assetGroups {
pair := AzureBatchAssets[assets[0].Type]

select {
case <-ctx.Done():
f.log.Infof("AzureBatchAssetFetcher.Fetch context err: %s", ctx.Err().Error())
return nil
// TODO: Groups by subscription id to create multiple batches of assets
case f.resourceCh <- fetching.ResourceInfo{
CycleMetadata: cMetadata,
Resource: &AzureBatchResource{
Expand Down
144 changes: 133 additions & 11 deletions resources/fetching/fetchers/azure/batch_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package fetchers
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"

"github.com/elastic/cloudbeat/resources/fetching"
"github.com/elastic/cloudbeat/resources/providers/azurelib/inventory"
Expand Down Expand Up @@ -51,9 +53,6 @@ func (s *AzureBatchAssetFetcherTestSuite) TearDownTest() {
}

func (s *AzureBatchAssetFetcherTestSuite) TestFetcher_Fetch() {
ctx := context.Background()

mockInventoryService := &inventory.MockServiceAPI{}
mockAssets := map[string][]inventory.AzureAsset{
inventory.ActivityLogAlertAssetType: {
{
Expand Down Expand Up @@ -91,26 +90,29 @@ func (s *AzureBatchAssetFetcherTestSuite) TestFetcher_Fetch() {
},
}

mockInventoryService := inventory.NewMockServiceAPI(s.T())
mockInventoryService.EXPECT().
ListAllAssetTypesByName(mock.AnythingOfType("[]string")).
RunAndReturn(func(types []string) ([]inventory.AzureAsset, error) {
s.Require().Len(types, 1)
mockAssetsList, ok := mockAssets[types[0]]
s.Require().True(ok)
return mockAssetsList, nil
}).Times(len(mockAssets))
defer mockInventoryService.AssertExpectations(s.T())
s.ElementsMatch(maps.Keys(mockAssets), types)

var result []inventory.AzureAsset
for _, tpe := range types {
result = append(result, mockAssets[tpe]...)
}
return result, nil
}).Once()

fetcher := AzureBatchAssetFetcher{
log: testhelper.NewLogger(s.T()),
resourceCh: s.resourceCh,
provider: mockInventoryService,
}
err := fetcher.Fetch(ctx, fetching.CycleMetadata{})
err := fetcher.Fetch(context.Background(), fetching.CycleMetadata{})
s.Require().NoError(err)
results := testhelper.CollectResources(s.resourceCh)

s.Require().Len(results, len(mockAssets))
s.Len(results, len(mockAssets))

for assetType, expectedAssets := range mockAssets {
result := findResult(results, assetType)
Expand Down Expand Up @@ -152,6 +154,126 @@ func (s *AzureBatchAssetFetcherTestSuite) TestFetcher_Fetch() {
}
}

func (s *AzureBatchAssetFetcherTestSuite) TestFetcher_Fetch_Batches() {
var mockAssets []inventory.AzureAsset
for i, variableFields := range []struct {
sub string
tpe string
}{
{
// 0
sub: "1",
tpe: inventory.ActivityLogAlertAssetType,
},
{
// 1
sub: "1",
tpe: inventory.ActivityLogAlertAssetType,
},
{
// 2
sub: "2",
tpe: inventory.ActivityLogAlertAssetType,
},
{
// 3
sub: "3",
tpe: inventory.BastionAssetType,
},
{
// 4
sub: "1",
tpe: inventory.BastionAssetType,
},
{
// 5
sub: "2",
tpe: inventory.ActivityLogAlertAssetType,
},
{
// 6
sub: "3",
tpe: inventory.BastionAssetType,
},
{
// 7
sub: "4",
tpe: inventory.BastionAssetType,
},
} {
id := strconv.Itoa(i)
mockAssets = append(mockAssets, inventory.AzureAsset{
Id: "id" + id,
Name: "name" + id,
Location: "loc" + id,
Properties: map[string]any{"key" + id: "value" + id},
ResourceGroup: "rg" + id,
SubscriptionId: variableFields.sub,
TenantId: "tenant",
Type: variableFields.tpe,
Sku: "sku" + id,
})
}

mockInventoryService := inventory.NewMockServiceAPI(s.T())
mockInventoryService.EXPECT().
ListAllAssetTypesByName(mock.AnythingOfType("[]string")).
Return(mockAssets, nil)
fetcher := AzureBatchAssetFetcher{
log: testhelper.NewLogger(s.T()),
resourceCh: s.resourceCh,
provider: mockInventoryService,
}

err := fetcher.Fetch(context.Background(), fetching.CycleMetadata{})
s.Require().NoError(err)
results := testhelper.CollectResources(s.resourceCh)

s.Len(results, 5)
s.ElementsMatch([]fetching.ResourceInfo{
{ // sub 1
Resource: &AzureBatchResource{
Type: fetching.MonitoringIdentity,
SubType: fetching.AzureActivityLogAlertType,
Assets: []inventory.AzureAsset{mockAssets[0], mockAssets[1]},
},
CycleMetadata: fetching.CycleMetadata{Sequence: 0},
},
{ // sub 2
Resource: &AzureBatchResource{
Type: fetching.MonitoringIdentity,
SubType: fetching.AzureActivityLogAlertType,
Assets: []inventory.AzureAsset{mockAssets[2], mockAssets[5]},
},
CycleMetadata: fetching.CycleMetadata{Sequence: 0},
},
{ // sub 1
Resource: &AzureBatchResource{
Type: fetching.CloudDns,
SubType: fetching.AzureBastionType,
Assets: []inventory.AzureAsset{mockAssets[4]},
},
CycleMetadata: fetching.CycleMetadata{Sequence: 0},
},
{ // sub 3
Resource: &AzureBatchResource{
Type: fetching.CloudDns,
SubType: fetching.AzureBastionType,
Assets: []inventory.AzureAsset{mockAssets[3], mockAssets[6]},
},
CycleMetadata: fetching.CycleMetadata{Sequence: 0},
},
{ // sub 4
Resource: &AzureBatchResource{
Type: fetching.CloudDns,
SubType: fetching.AzureBastionType,
Assets: []inventory.AzureAsset{mockAssets[7]},
},
CycleMetadata: fetching.CycleMetadata{Sequence: 0},
},
}, results)
}

func findResult(results []fetching.ResourceInfo, assetType string) *fetching.ResourceInfo {
for _, result := range results {
if result.GetData().([]inventory.AzureAsset)[0].Type == assetType {
Expand Down

0 comments on commit eadc8bf

Please sign in to comment.