Skip to content

Commit

Permalink
[ASENG-654] Mongo trace bugfix (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
d0g0x01 authored Oct 31, 2023
1 parent 1507530 commit db78c5e
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 168 deletions.
6 changes: 3 additions & 3 deletions pkg/kubehound/graph/adapter/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// MongoDB is a helper function to retrieve the store database object from a mongoDB provider.
func MongoDB(store storedb.Provider) *mongo.Database {
mongoClient, ok := store.Raw().(*mongo.Client)
db, ok := store.Reader().(*mongo.Database)
if !ok {
log.I.Fatalf("Invalid database provider type. Expected *mongo.Client, got %T", store.Raw())
log.I.Fatalf("Invalid database provider type. Expected *mongo.Client, got %T", store.Reader())
}

return mongoClient.Database(storedb.MongoDatabaseName)
return db
}

// MongoCursorHandler is the default stream implementation to handle the query results from a mongoDB store provider.
Expand Down
20 changes: 10 additions & 10 deletions pkg/kubehound/storage/storedb/mocks/store_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 71 additions & 18 deletions pkg/kubehound/storage/storedb/mongo_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (

"github.com/DataDog/KubeHound/pkg/kubehound/store/collections"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
"github.com/hashicorp/go-multierror"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
mongotrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/go.mongodb.org/mongo-driver/mongo"
)

const (
Expand All @@ -21,50 +23,81 @@ var (
_ Provider = (*MongoProvider)(nil)
)

// A MongoDB based store provider implementation.
type MongoProvider struct {
client *mongo.Client
db *mongo.Database
tags []string
reader *mongo.Client // MongoDB client optimized for read operations
writer *mongo.Client // MongoDB client optimized for write operations
tags []string // Tags to be applied for telemetry
}

func NewMongoProvider(ctx context.Context, url string, connectionTimeout time.Duration) (*MongoProvider, error) {
opts := options.Client()
opts.ApplyURI(url + fmt.Sprintf("/?connectTimeoutMS=%d", connectionTimeout))
// createClient creates a new MongoDB client with the provided options.
func createClient(ctx context.Context, opts *options.ClientOptions, timeout time.Duration) (*mongo.Client, error) {
client, err := mongo.Connect(ctx, opts)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, connectionTimeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err = client.Ping(ctx, readpref.Primary())
if err != nil {
return nil, err
}

db := client.Database(MongoDatabaseName)
return client, nil
}

// createReaderWriter creates a pair of MongoDB clients - one for writes and another for reads.
func createReaderWriter(ctx context.Context, url string, timeout time.Duration) (*mongo.Client, *mongo.Client, error) {
baseOpts := options.Client()
baseOpts.ApplyURI(url + fmt.Sprintf("/?connectTimeoutMS=%d", timeout))

writer, err := createClient(ctx, baseOpts, timeout)
if err != nil {
return nil, nil, err
}

opts := baseOpts
opts.Monitor = mongotrace.NewMonitor()
reader, err := createClient(ctx, opts, timeout)
if err != nil {
_ = writer.Disconnect(ctx)

return nil, nil, err
}

return reader, writer, nil
}

// NewMongoProvider creates a new instance of the MongoDB store provider
func NewMongoProvider(ctx context.Context, url string, connectionTimeout time.Duration) (*MongoProvider, error) {
reader, writer, err := createReaderWriter(ctx, url, connectionTimeout)
if err != nil {
return nil, err
}

return &MongoProvider{
client: client,
db: db,
reader: reader,
writer: writer,
tags: append(tag.BaseTags, tag.Storage(StorageProviderName)),
}, nil
}

func (mp *MongoProvider) Prepare(ctx context.Context) error {
collections, err := mp.db.ListCollectionNames(ctx, bson.M{})
db := mp.writer.Database(MongoDatabaseName)
collections, err := db.ListCollectionNames(ctx, bson.M{})
if err != nil {
return fmt.Errorf("listing mongo DB collections: %w", err)
}

for _, collectionName := range collections {
err = mp.db.Collection(collectionName).Drop(ctx)
err = db.Collection(collectionName).Drop(ctx)
if err != nil {
return fmt.Errorf("deleting mongo DB collection %s: %w", collectionName, err)
}
}

ib, err := NewIndexBuilder(mp.db)
ib, err := NewIndexBuilder(db)
if err != nil {
return fmt.Errorf("mongo DB index builder create: %w", err)
}
Expand All @@ -76,16 +109,21 @@ func (mp *MongoProvider) Prepare(ctx context.Context) error {
return nil
}

func (mp *MongoProvider) Raw() any {
return mp.client
func (mp *MongoProvider) Reader() any {
return mp.reader.Database(MongoDatabaseName)
}

func (mp *MongoProvider) Name() string {
return StorageProviderName
}

func (mp *MongoProvider) HealthCheck(ctx context.Context) (bool, error) {
err := mp.client.Ping(ctx, nil)
err := mp.reader.Ping(ctx, nil)
if err != nil {
return false, err
}

err = mp.writer.Ping(ctx, nil)
if err != nil {
return false, err
}
Expand All @@ -94,11 +132,26 @@ func (mp *MongoProvider) HealthCheck(ctx context.Context) (bool, error) {
}

func (mp *MongoProvider) Close(ctx context.Context) error {
return mp.client.Disconnect(ctx)
var res *multierror.Error
if mp.reader != nil {
err := mp.reader.Disconnect(ctx)
if err != nil {
res = multierror.Append(res, err)
}
}

if mp.writer != nil {
err := mp.writer.Disconnect(ctx)
if err != nil {
res = multierror.Append(res, err)
}
}

return res.ErrorOrNil()
}

func (mp *MongoProvider) BulkWriter(ctx context.Context, collection collections.Collection, opts ...WriterOption) (AsyncWriter, error) {
writer := NewMongoAsyncWriter(ctx, mp, collection, opts...)
writer := NewMongoAsyncWriter(ctx, mp.writer.Database(MongoDatabaseName), collection, opts...)

return writer, nil
}
83 changes: 0 additions & 83 deletions pkg/kubehound/storage/storedb/mongo_provider_test.go

This file was deleted.

Loading

0 comments on commit db78c5e

Please sign in to comment.