Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new ingest local command to process offline dump #205

Merged
merged 14 commits into from
Jun 18, 2024
16 changes: 8 additions & 8 deletions cmd/kubehound/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ var (
},
}

cloudCmd = &cobra.Command{
Use: "cloud",
dumpRemoteCmd = &cobra.Command{
Use: "remote",
Short: "Push collected k8s resources to an s3 bucket of a targeted cluster. Run the ingestion on KHaaS if khaas-server is set.",
Long: `Collect all Kubernetes resources needed to build the attack path in an offline format on a s3 bucket. If KubeHound as a Service (KHaaS) is set, then run the ingestion on KHaaS.`,
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -63,7 +63,7 @@ var (
return err
},
}
localCmd = &cobra.Command{
dumpLocalCmd = &cobra.Command{
Use: "local",
Short: "Dump locally the k8s resources of a targeted cluster",
Long: `Collect all Kubernetes resources needed to build the attack path in an offline format locally (compressed or flat)`,
Expand All @@ -85,11 +85,11 @@ var (

func init() {
cmd.InitDumpCmd(dumpCmd)
cmd.InitLocalCmd(localCmd)
cmd.InitCloudCmd(cloudCmd)
cmd.InitGrpcClientCmd(cloudCmd, false)
cmd.InitLocalDumpCmd(dumpLocalCmd)
cmd.InitRemoteDumpCmd(dumpRemoteCmd)
cmd.InitRemoteIngestCmd(dumpRemoteCmd, false)

dumpCmd.AddCommand(cloudCmd)
dumpCmd.AddCommand(localCmd)
dumpCmd.AddCommand(dumpRemoteCmd)
dumpCmd.AddCommand(dumpLocalCmd)
rootCmd.AddCommand(dumpCmd)
}
40 changes: 0 additions & 40 deletions cmd/kubehound/grpc_client.go

This file was deleted.

75 changes: 75 additions & 0 deletions cmd/kubehound/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"fmt"

"github.com/DataDog/KubeHound/pkg/cmd"
"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/kubehound/core"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
inputFilePath string
)

var (
ingestCmd = &cobra.Command{
Use: "ingest",
Short: "Start an ingestion locally or remotely",
Long: `Run an ingestion locally (local) or on KHaaS from a bucket to build the attack path (remote)`,
}

localIngestCmd = &cobra.Command{
Use: "local",
Short: "Ingest data locally from a KubeHound dump",
Long: `Run an ingestion locally using a previous dump (directory or tar.gz)`,
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", true, true)
},
RunE: func(cobraCmd *cobra.Command, args []string) error {
// Passing the Kubehound config from viper
khCfg, err := cmd.GetConfig()
if err != nil {
return fmt.Errorf("get config: %w", err)
}

return core.CoreLocalIngest(cobraCmd.Context(), khCfg, inputFilePath)
},
}

remoteIngestCmd = &cobra.Command{
Use: "remote",
Short: "Ingest data remotely on a KHaaS instance",
Long: `Run an ingestion on KHaaS from a bucket to build the attack path`,
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true)
},
PreRun: func(cobraCmd *cobra.Command, args []string) {
viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck
viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck
},
RunE: func(cobraCmd *cobra.Command, args []string) error {
// Passing the Kubehound config from viper
khCfg, err := cmd.GetConfig()
if err != nil {
return fmt.Errorf("get config: %w", err)
}

return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID)
},
}
)

func init() {

ingestCmd.AddCommand(localIngestCmd)
cmd.InitLocalIngestCmd(localIngestCmd)
localIngestCmd.Flags().StringVar(&inputFilePath, "data", "", "Filepath for the data to process (directory or tar.gz path)")

ingestCmd.AddCommand(remoteIngestCmd)
cmd.InitRemoteIngestCmd(remoteIngestCmd, true)

rootCmd.AddCommand(ingestCmd)
}
12 changes: 9 additions & 3 deletions pkg/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func InitDumpCmd(cmd *cobra.Command) {
viper.BindPFlag(config.GlobalDebug, cmd.PersistentFlags().Lookup("debug")) //nolint: errcheck
}

func InitLocalCmd(cmd *cobra.Command) {
func InitLocalDumpCmd(cmd *cobra.Command) {
cmd.Flags().Bool("compress", false, "Enable compression for the dumped data (generates a tar.gz file)")
viper.BindPFlag(config.CollectorFileArchiveFormat, cmd.Flags().Lookup("compress")) //nolint: errcheck

Expand All @@ -49,7 +49,7 @@ func InitLocalCmd(cmd *cobra.Command) {
cmd.MarkFlagRequired("output-dir") //nolint: errcheck
}

func InitCloudCmd(cmd *cobra.Command) {
func InitRemoteDumpCmd(cmd *cobra.Command) {
cmd.Flags().String("bucket", "", "Bucket to use to push k8s resources (e.g.: s3://<your_bucket>)")
viper.BindPFlag(config.CollectorFileBlobBucket, cmd.Flags().Lookup("bucket")) //nolint: errcheck
cmd.MarkFlagRequired("bucket") //nolint: errcheck
Expand All @@ -58,7 +58,7 @@ func InitCloudCmd(cmd *cobra.Command) {
viper.BindPFlag(config.CollectorFileBlobRegion, cmd.Flags().Lookup("region")) //nolint: errcheck
}

func InitGrpcClientCmd(cmd *cobra.Command, standalone bool) {
func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) {

cmd.Flags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)")
cmd.Flags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint")
Expand All @@ -77,3 +77,9 @@ func InitGrpcClientCmd(cmd *cobra.Command, standalone bool) {
cmd.MarkFlagRequired("khaas-server") //nolint: errcheck
}
}

func InitLocalIngestCmd(cmd *cobra.Command) {
cmd.PersistentFlags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck
cmd.MarkFlagRequired("cluster") //nolint: errcheck
}
2 changes: 2 additions & 0 deletions pkg/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const (
DefaultVertexBatchSizeSmall = DefaultVertexBatchSize / 5

DefaultStopOnError = false

DefaultLargeClusterOptimizations = true
)

// VertexBuilderConfig configures vertex builder parameters.
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func SetDefaultValues(c *viper.Viper) {
c.SetDefault("builder.edge.batch_size_small", DefaultEdgeBatchSizeSmall)
c.SetDefault("builder.edge.batch_size_cluster_impact", DefaultEdgeBatchSizeClusterImpact)
c.SetDefault("builder.stop_on_error", DefaultStopOnError)
c.SetDefault("builder.edge.large_cluster_optimizations", DefaultLargeClusterOptimizations)

c.SetDefault(IngestorAPIEndpoint, DefaultIngestorAPIEndpoint)
c.SetDefault(IngestorAPIInsecure, DefaultIngestorAPIInsecure)
Expand Down Expand Up @@ -189,8 +190,12 @@ func NewConfig(v *viper.Viper, configPath string) (*KubehoundConfig, error) {

// NewConfig creates a new config instance from the provided file using viper.
func NewInlineConfig(v *viper.Viper) (*KubehoundConfig, error) {
// Load default embedded config file
SetDefaultValues(v)

// Configure environment variable override
SetEnvOverrides(v)

kc, err := unmarshalConfig(v)
if err != nil {
return nil, fmt.Errorf("unmarshaling config data: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestMustLoadConfig(t *testing.T) {
BatchSizeSmall: 100,
},
Edge: EdgeBuilderConfig{
LargeClusterOptimizations: false,
LargeClusterOptimizations: DefaultLargeClusterOptimizations,
WorkerPoolSize: 5,
WorkerPoolCapacity: 100,
BatchSize: 500,
Expand All @@ -84,7 +84,7 @@ func TestMustLoadConfig(t *testing.T) {
},
TempDir: "/tmp/kubehound",
ArchiveName: "archive.tar.gz",
MaxArchiveSize: 1073741824,
MaxArchiveSize: DefaultMaxArchiveSize,
},
},
wantErr: false,
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestMustLoadConfig(t *testing.T) {
},
TempDir: "/tmp/kubehound",
ArchiveName: "archive.tar.gz",
MaxArchiveSize: 1073741824,
MaxArchiveSize: DefaultMaxArchiveSize,
},
},
wantErr: false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const (
DefaultBucketName = "" // we want to let it empty because we can easily abort if it's not configured
DefaultTempDir = "/tmp/kubehound"
DefaultArchiveName = "archive.tar.gz"
DefaultMaxArchiveSize = int64(1 << 30) // 1GB
DefaultMaxArchiveSize = int64(2 << 30) // 1GB
jt-dd marked this conversation as resolved.
Show resolved Hide resolved

IngestorAPIEndpoint = "ingestor.api.endpoint"
IngestorAPIInsecure = "ingestor.api.insecure"
Expand Down
2 changes: 1 addition & 1 deletion pkg/dump/writer/tar_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTarWriter_Write(t *testing.T) {
writer.Flush(ctx)
writer.Close(ctx)

err = puller.ExtractTarGz(writer.OutputPath(), tmpTarExtractDir, config.DefaultMaxArchiveSize)
err = puller.ExtractTarGz(false, writer.OutputPath(), tmpTarExtractDir, config.DefaultMaxArchiveSize)
jt-dd marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("failed to extract tar.gz: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingestor/puller/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error {
return fmt.Errorf("Dangerous file path used during extraction, aborting: %w", err)
}

err = puller.ExtractTarGz(archivePath, basePath, bs.cfg.Ingestor.MaxArchiveSize)
err = puller.ExtractTarGz(false, archivePath, basePath, bs.cfg.Ingestor.MaxArchiveSize)
if err != nil {
return err
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/ingestor/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,40 @@ func sanitizeExtractPath(filePath string, destination string) error {
return nil
}

func ExtractTarGz(archivePath string, basePath string, maxArchiveSize int64) error {
func IsTarGz(filePath string, maxArchiveSize int64) (bool, error) {
fileInfo, err := os.Stat(filePath)
if err != nil {
return false, fmt.Errorf("stat %s: %w", filePath, err)
}

switch mod := fileInfo.Mode(); {
case mod.IsDir():
return false, nil
case mod.IsRegular():
err = ExtractTarGz(true, filePath, "/tmp", maxArchiveSize)
if err != nil {
return false, err
}

return true, nil
}

return false, fmt.Errorf("file type not supported")
}

func ExtractTarGz(checkOnly bool, archivePath string, basePath string, maxArchiveSize int64) error { //nolint:gocognit
gzipFileReader, err := os.Open(archivePath)
if err != nil {
return err
}
defer gzipFileReader.Close()

uncompressedStream, err := gzip.NewReader(gzipFileReader)
if err != nil {
return err
}
defer uncompressedStream.Close()

tarReader := tar.NewReader(io.LimitReader(uncompressedStream, maxArchiveSize))
for {
header, err := tarReader.Next()
Expand All @@ -63,6 +87,7 @@ func ExtractTarGz(archivePath string, basePath string, maxArchiveSize int64) err
if err != nil {
return err
}

err = sanitizeExtractPath(basePath, header.Name)
if err != nil {
return err
Expand All @@ -72,11 +97,17 @@ func ExtractTarGz(archivePath string, basePath string, maxArchiveSize int64) err
switch header.Typeflag {
// Handle sub folder containing namespaces
case tar.TypeDir:
if checkOnly {
continue
}
err := mkdirIfNotExists(cleanPath)
if err != nil {
return err
}
case tar.TypeReg:
if checkOnly {
continue
}
err := mkdirIfNotExists(filepath.Dir(cleanPath))
if err != nil {
return err
Expand All @@ -89,6 +120,9 @@ func ExtractTarGz(archivePath string, basePath string, maxArchiveSize int64) err
// We don't really have an upper limit of archive size and adding a limited writer is not trivial without importing
// a third party library (like our internal secure lib)
_, err = io.Copy(outFile, tarReader) //nolint:gosec
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return fmt.Errorf("archive size exceeds the limit: %d: %w", maxArchiveSize, err)
}
if err != nil {
return fmt.Errorf("copying file %s: %w", cleanPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingestor/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestExtractTarGz(t *testing.T) {
if err != nil {
t.Error(err)
}
if err := ExtractTarGz("./testdata/archive.tar.gz", tmpPath, tt.args.maxArchiveSize); (err != nil) != tt.wantErr {
if err := ExtractTarGz(false, "./testdata/archive.tar.gz", tmpPath, tt.args.maxArchiveSize); (err != nil) != tt.wantErr {
t.Errorf("ExtractTarGz() error = %v, wantErr %v", err, tt.wantErr)
}
for _, file := range tt.expectedFiles {
Expand Down
Loading
Loading