[ASENG-835] Add puller package for s3 #157

Merged
merged 35 commits into from
Mar 14, 2024

Contributor

@edznux-dd edznux-dd commented Dec 26, 2023

WIP / Discussion

PoC of a gRPC API for a remote ingestor.
This is one possible solution for pulling the data from an s3 bucket that was stored previously.

This PR includes 3 blocks:

  • A Puller: pulls the data from a remote source, in our case a generic implementation of a blob storage (e.g. s3)
  • An API: allows users and automated systems to execute commands on that "ingestor"
  • A notifier: allows the API to notify users and systems of processing completion. This can be done via a queuing system, webhooks, or something else.

Notes:

  • Not a huge fan of the "ingestor" name; we probably need to convey that it's a remote API that may have extended use later on?
    • I think the API needs to be extended, otherwise it is a bit over-engineered here.

Alternative solution:

  • Should we replace the puller with a simple custom collector (e.g. blob_collector)?
    • This would need to fit the Collector interface to pull data from s3, but could maybe simplify a few things?
    • This would match the structure of the file collector (bundled JSON files with all resources, per type)

@jt-dd jt-dd marked this pull request as ready for review March 13, 2024 18:09
@jt-dd jt-dd requested a review from a team as a code owner March 13, 2024 18:09
Contributor Author

@edznux-dd edznux-dd left a comment

thanks for taking over the PR :D
I left some comments!

I guess we could have reused the tar package from our secure Go SDK for create/extract and removed some of the manual setup!

Small, overall nit (it was maybe in my code tbh haha): there's a bit of back and forth between the inline and two-line if err != nil checks. It's a bit weird, but I'm alright with it.

Comment on lines 38 to 39
log.I.Errorf("error loading run id: %v", err)
return failure(fmt.Errorf("loading run id: %v", err))
Contributor Author

Do we really want to log AND return the error?
Sounds like we should just return the error and print in the caller to me if we don't want to duplicate the log.

runCtx = context.WithValue(runCtx, span.ContextLogFieldRunID, runID)

spanJob, runCtx := span.SpanIngestRunFromContext(runCtx, span.IngestorStartJob)
defer spanJob.Finish()
Contributor Author

I think we'll want this so we can propagate the errors in the span.

Suggested change
- defer spanJob.Finish()
+ defer func() {
+ 	spanJob.Finish(tracer.WithError(err))
+ }()

Contributor Author

(might need to declare err beforehand tho)

Contributor

Yes, nice catch. I need to update it in about 20 places.

Contributor

@Minosity-VR Minosity-VR Mar 14, 2024

(might need to declare err beforehand tho)

You can use a named return value; iirc that's how the tracing doc advises doing it
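
The named-return approach can be sketched as follows. To stay self-contained, the example uses a hypothetical `fakeSpan` type in place of the real tracer span, and its `Finish(err)` method is only a stand-in for `Finish(tracer.WithError(err))`; the point is how the deferred closure reads the named return value.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeSpan is a stand-in for a tracing span; it only records the error it was finished with.
type fakeSpan struct {
	finished bool
	err      error
}

// Finish mimics finishing a span with an optional error attached.
func (s *fakeSpan) Finish(err error) {
	s.finished = true
	s.err = err
}

var errIngest = errors.New("ingest failed")

// run uses a named return value so the deferred closure sees the final error.
func run(s *fakeSpan, fail bool) (err error) {
	defer func() {
		// err is read when the function returns, not when the defer is declared.
		s.Finish(err)
	}()

	if fail {
		return fmt.Errorf("raw data ingest: %w", errIngest)
	}
	return nil
}

func main() {
	s := &fakeSpan{}
	err := run(s, true)
	fmt.Println(err, s.finished, s.err == err)
}
```

A plain `defer span.Finish(tracer.WithError(err))` without the closure would evaluate `err` at the moment the defer statement runs, which is why the suggestion wraps the call in `func() { ... }()`.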


// Run the ingest pipeline
log.I.Info("Starting Kubernetes raw data ingest")
if err := ingestor.IngestData(runCtx, runCfg, collect, g.cache, g.storedb, g.graphdb); err != nil {
Contributor Author

Any reason to switch style between if err := ...; err != nil {} and err := ... followed by if err != nil {}?

Contributor

No not really. Changing it.
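
For reference, the two styles under discussion look like this side by side. The `step`, `inlineStyle`, and `twoLineStyle` functions are hypothetical; the only behavioral difference is the scope of `err`.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical operation that may fail.
func step(fail bool) error {
	if fail {
		return errors.New("step failed")
	}
	return nil
}

// inlineStyle scopes err to the if statement.
func inlineStyle(fail bool) string {
	if err := step(fail); err != nil {
		return "inline: " + err.Error()
	}
	return "inline: ok"
}

// twoLineStyle keeps err in the surrounding function scope.
func twoLineStyle(fail bool) string {
	err := step(fail)
	if err != nil {
		return "two-line: " + err.Error()
	}
	return "two-line: ok"
}

func main() {
	fmt.Println(inlineStyle(true))
	fmt.Println(twoLineStyle(true))
}
```

The inline form shadows any outer `err`, which matters when a deferred closure is meant to read a named return value, so picking one style per function helps avoid surprises.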


start := time.Now()
span, ctx := tracer.StartSpanFromContext(ctx, span.IngestData, tracer.Measured())
defer span.Finish()
Contributor Author

Suggested change
- defer span.Finish()
+ defer func() {
+ 	span.Finish(tracer.WithError(err))
+ }()

Contributor Author

same here, you'll need var err error above (or name the return value err)

pkg/ingestor/puller/blob/blob.go (outdated)
// Launch will launch the KubeHound application to ingest data from a collector and create an attack graph.
func Launch(ctx context.Context, opts ...LaunchOption) error {
	span, ctx := tracer.StartSpanFromContext(ctx, span.Launch, tracer.Measured())
	defer span.Finish()
Contributor Author

Suggested change
- defer span.Finish()
+ defer func() {
+ 	span.Finish(tracer.WithError(err))
+ }()


// We update the base tags to include that run id, so we have it available for metrics
tag.AppendBaseTags(tag.RunID(runID.String()))
// tag.BaseTags = append(tag.BaseTags, tag.RunID(runID.String()))
Contributor Author

Suggested change
- // tag.BaseTags = append(tag.BaseTags, tag.RunID(runID.String()))

I believe this is a leftover?

}
}

func AppendBaseTags(tags ...string) {
Contributor Author

log uses "GlobalTags" instead of base tags; would it make sense to unify? (It feels a bit weird on the caller side because it suggests two different semantic meanings, and I don't think there should be.)

Overall, I wonder if we should have all the tags for both metrics and logs shared in a single config? (It could be "pulled" from here into the logger package.)
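
One possible shape for a single shared tag source is sketched below. The `registry` type and its methods are hypothetical and do not reflect the project's `tag` or `log` packages; the sketch only shows metrics and logs reading from the same place.

```go
package main

import (
	"fmt"
	"sync"
)

// registry holds tags shared by metrics and logs (hypothetical type).
type registry struct {
	mu   sync.RWMutex
	tags []string
}

// Append adds tags to the shared set.
func (r *registry) Append(tags ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tags = append(r.tags, tags...)
}

// Snapshot returns a copy so callers cannot mutate the shared slice.
func (r *registry) Snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, len(r.tags))
	copy(out, r.tags)
	return out
}

func main() {
	shared := &registry{}
	shared.Append("run_id:example")

	// Both consumers read the same tags, so there is a single semantic meaning.
	metricsTags := shared.Snapshot()
	logTags := shared.Snapshot()
	fmt.Println(metricsTags, logTags)
}
```

Returning a copy from `Snapshot` avoids one consumer appending to or rewriting the slice that the other consumer is reading.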

// We define a unique run id this so we can measure run by run in addition of version per version.
// Useful when rerunning the same binary (same version) on different dataset or with different databases...
runID := config.NewRunID()
span.SetBaggageItem("run_id", runID.String())
Contributor Author

Should we use tags.RunIdTag here to unify?

Contributor

Yes, this will get rebranded. I did not want to touch the "live" dumping yet. I will rework it after.

return fmt.Errorf("raw data ingest: %w", err)
}
type LaunchConfig struct {
	Cfg *config.KubehoundConfig
Contributor Author

Do we want to expose Cfg to other packages?
I don't think it's used anywhere outside this package, from looking at the code locally.

Contributor

Yes, it is being used; you will see in the next PRs.

jt-dd previously approved these changes Mar 14, 2024
@jt-dd jt-dd self-requested a review March 14, 2024 18:00
@jt-dd jt-dd merged commit 4f34295 into main Mar 14, 2024
7 checks passed
@jt-dd jt-dd deleted the edouard/implement-ingestor-api branch March 14, 2024 18:00