[Ingestor] Adding the ability to ingest multiple cluster successively #185
Conversation
pkg/ingestor/api/api.go
Outdated
@@ -61,6 +64,11 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string
	var err error
	defer func() { spanJob.Finish(tracer.WithError(err)) }()

	err = g.checkPreviousRun(runCtx, clusterName, runID) //nolint: contextcheck
Small thing, but I believe checkPreviousRun is a bit confusing:
- what does it check?
- what does its returned error mean?

Imo, this should be named something like checkAlreadyIngested and potentially return (bool, error) instead of just error (or isAlreadyIngested, if you use that bool). I don't have a strong opinion on the (bool, error) part, because semantically the function is expected to return an error. But in any case, the error should be exported and checked against directly, not simply via err != nil. (If there's a blip in the DB call or something, it will return an error even though there was no previous ingestion.)
The caller (here) could do:
ErrAlreadyIngested := errors.New("ingestion already completed")
...
...
err := ...
if errors.Is(err, ErrAlreadyIngested) {
return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID)
}
if err != nil {
return err
}
or, with the (bool, error) return:
ErrAlreadyIngested := errors.New("ingestion already completed")
...
...
alreadyIngested, err := ...
if err != nil {
return err
}
if alreadyIngested {
return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID)
}
Changed to (bool, error) with isAlreadyIngested().
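For readers following the thread, a minimal sketch of the shape being agreed on; the countExistingRecords helper and the bare IngestorAPI type are hypothetical stand-ins for this sketch, not the actual KubeHound implementation:

package ingestor

import (
	"context"
	"errors"
	"fmt"
)

// ErrAlreadyIngested is exported so callers can match it with errors.Is
// instead of treating every non-nil error as "already ingested".
var ErrAlreadyIngested = errors.New("ingestion already completed")

// IngestorAPI is a placeholder type for this sketch.
type IngestorAPI struct{}

// countExistingRecords stands in for the real store lookup.
func (g *IngestorAPI) countExistingRecords(ctx context.Context, clusterName string, runID string) (int64, error) {
	return 0, nil
}

// isAlreadyIngested reports whether a previous run already ingested this
// cluster/run pair. A transient DB failure surfaces as err, never as true.
func (g *IngestorAPI) isAlreadyIngested(ctx context.Context, clusterName string, runID string) (bool, error) {
	resNum, err := g.countExistingRecords(ctx, clusterName, runID)
	if err != nil {
		return false, fmt.Errorf("checking previous ingestion: %w", err)
	}

	return resNum != 0, nil
}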
pkg/ingestor/api/api.go
Outdated
@@ -83,6 +91,13 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string
	},
}

// We need to flush the cache to prevent warning/error on the overwriting element in cache the any conflict with the previous ingestion
I think this comment is missing a few words?
Yes, it is. Fixing it.
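To spell out the intent while the comment gets reworded, a hedged sketch; the CacheFlusher interface is a stand-in, not KubeHound's actual cache provider API:

package ingestor

import (
	"context"
	"fmt"
)

// CacheFlusher is a hypothetical stand-in for the ingestor's cache provider.
type CacheFlusher interface {
	Flush(ctx context.Context) error
}

// Flushing before a re-ingestion prevents warnings/errors about overwriting
// cache entries left behind by a previous ingestion of the same cluster.
func prepareCacheForRun(ctx context.Context, c CacheFlusher) error {
	if err := c.Flush(ctx); err != nil {
		return fmt.Errorf("flushing cache before ingestion: %w", err)
	}

	return nil
}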
pkg/ingestor/api/api.go
Outdated
		if resNum != 0 {
			log.I.Infof("Found %d element in collection %s", resNum, collection)

			break
		}
	}
	if resNum != 0 {
		return fmt.Errorf("ingestion already completed [%s:%s]", clusterName, runID)
	}
nit: Any reason not to return in the first resNum != 0 here?
Hmm, not sure. Refactoring it.
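The refactor could collapse the loop and the post-loop check into a single early return, along these lines (a sketch against the quoted snippet; countForRun is a hypothetical helper for whatever count query the loop runs):

	for _, collection := range collections {
		resNum, err := countForRun(ctx, collection, clusterName, runID)
		if err != nil {
			return err
		}
		// Returning on the first non-empty collection removes the need for a
		// second resNum != 0 check after the loop.
		if resNum != 0 {
			log.I.Infof("Found %d element in collection %s", resNum, collection)

			return fmt.Errorf("ingestion already completed [%s:%s]", clusterName, runID)
		}
	}

	return nil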
pkg/kubehound/graph/builder.go
Outdated
@@ -71,7 +71,7 @@ func (b *Builder) buildEdge(ctx context.Context, label string, e edge.Builder, o
		return err
	}

-	err = e.Stream(ctx, b.storedb, b.cache,
+	err = e.Stream(ctx, b.storedb, b.cache, &b.cfg.Dynamic,
Question (applies to all the callers as well): shouldn't this config be set in the Initialize(cfg) method instead (line 65)? This would let us avoid modifying the signature of this function with a specific config now (I believe it would also make extending this with new configuration easier). All the callers would only have to change from runtime.ClusterName to e.runtime.ClusterName, for example, if you call that config field "runtime" as well. (And it would make the diff a bit smaller, haha.)
Yes, good point. I did not see the Initialize method.
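A sketch of what moving the config into initialization might look like; the exact types and field names (config.DynamicConfig, config.KubehoundConfig) are assumptions read off the diff's b.cfg.Dynamic, not the real signatures:

// Storing the dynamic config once at initialization keeps Stream's signature
// stable when new configuration fields are added later.
type PodPatch struct {
	runtime *config.DynamicConfig // hypothetical field, per the reviewer's "runtime" naming
}

func (e *PodPatch) Initialize(cfg *config.KubehoundConfig) error {
	e.runtime = &cfg.Dynamic

	return nil
}

// Stream (and every other caller) then reads e.runtime.ClusterName instead of
// receiving the config as an extra parameter.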
@@ -153,3 +156,46 @@ func (e *PodPatch) Stream(ctx context.Context, store storedb.Provider, _ cache.C

	return adapter.MongoCursorHandler[podPatchGroup](ctx, cur, callback, complete)
}

/* db.permissionSets.aggregate([
Is this expected to be there?
This was the raw Mongo request; I will dump it somewhere else, but I think it could be useful to have all of them somewhere. It's always a pain to translate Golang to Mongo queries.
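As an illustration of that translation pain, here is a raw shell pipeline kept alongside one possible Go equivalent; the runtime.runID/runtime.cluster field names are assumptions for this sketch, not the verified KubeHound schema:

package queries

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// Raw shell form, kept as a comment next to its Go translation:
//   db.permissionSets.aggregate([
//     { "$match": { "runtime.runID": "<run_id>", "runtime.cluster": "<cluster_name>" } },
//   ])
func queryPermissionSets(ctx context.Context, db *mongo.Database, clusterName string, runID string) (*mongo.Cursor, error) {
	pipeline := []bson.M{
		{"$match": bson.M{
			"runtime.runID":   runID,
			"runtime.cluster": clusterName,
		}},
	}

	return db.Collection("permissionSets").Aggregate(ctx, pipeline)
}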
@@ -167,12 +170,47 @@ func (e *RoleBindCrbCrR) Stream(ctx context.Context, store storedb.Provider, c c
		},
	},
	// Looking for all permissionSets link to the same namespace
	// {
Do we want to keep that comment? Why?
This is a duplicate of what is below. We only need one of them.
	{
		"$lookup": bson.M{
			"from":         "rolebindings",
			"localField":   "role_binding_id",
			"foreignField": "_id",
			"as":           "rb",
			// "let": bson.M{
Could we add a comment on why we don't want the cluster + run id filtering here? It's not clear to me why we don't want that here.
As we are using the _id as foreignField, we don't need to filter them: they are already filtered. So I "removed" it to make it more efficient.
Removing both commented parts.
		},
	},
	// Looking for all permissionSets link to the same namespace
Do we want to keep that comment? Why?
No, removing it. This was a first try using a simpler version, but it did not work properly.
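For context on the $lookup thread above, a hedged sketch contrasting the two shapes; the filtered variant is illustrative only, not a reconstruction of the code that was removed:

	// Pipeline-style lookup that re-filters the joined collection by run. The
	// extra $match is redundant when the join key is the primary _id, which
	// can only ever match the document from the run that produced it.
	lookupWithFilter := bson.M{"$lookup": bson.M{
		"from": "rolebindings",
		"let":  bson.M{"rbID": "$role_binding_id"},
		"pipeline": bson.A{
			bson.M{"$match": bson.M{"$expr": bson.M{"$eq": bson.A{"$_id", "$$rbID"}}}},
			bson.M{"$match": bson.M{"runtime.runID": runID}}, // unnecessary filter
		},
		"as": "rb",
	}}

	// Plain foreignField join on _id: already scoped to the right run, and cheaper.
	lookupByID := bson.M{"$lookup": bson.M{
		"from":         "rolebindings",
		"localField":   "role_binding_id",
		"foreignField": "_id",
		"as":           "rb",
	}}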
In order for multiple ingestions not to overrun one another, we need to scope the building of the graph to each run_id. All the Mongo requests have been updated to take the run_id and cluster_name. To test it:
1. Start the ingestor service: ./kubehound-ingestor --debug
2. Dump the first cluster: ./kubehound dump cloud --bucket s3://<yourbucket> --khaas-server "127.0.0.1:9000" --insecure --debug
3. Switch context to a second cluster: kubectx
4. Dump the second cluster: ./kubehound dump cloud --bucket s3://<yourbucket> --khaas-server "127.0.0.1:9000" --insecure --debug
5. Check that each run is scoped in the graph: kh.nodes().has("run_id","<run_id>")
Other new features: