[Dumper] Refactor pipeline ingestor #175

Merged · 4 commits · Mar 19, 2024
Changes from all commits
5 changes: 3 additions & 2 deletions pkg/collector/k8s_api_faker.go
@@ -15,15 +15,16 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 )
 
-func FakePod(namespace string, image string, status string) *corev1.Pod {
+func FakePod(namespace string, name string, status string) *corev1.Pod {
 	return &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: namespace,
+			Name:      name,
 		},
 		Spec: corev1.PodSpec{
 			Containers: []corev1.Container{
 				{
-					Name:  image,
+					Name:  name,
 					Image: "nginx:latest",
 				},
 			},
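For context, a minimal sketch of how the updated helper reads after this change (a hypothetical test, not part of this diff; the second argument now sets the Pod name, and the image is fixed to "nginx:latest" inside the helper):

package collector_test

import (
	"testing"

	"github.com/DataDog/KubeHound/pkg/collector"
)

// Hypothetical usage sketch: the second argument now names the pod (and
// its container) instead of being treated as the container image.
func TestFakePodSketch(t *testing.T) {
	pod := collector.FakePod("namespace1", "name11", "Running")
	if pod.Name != "name11" || pod.Spec.Containers[0].Image != "nginx:latest" {
		t.Fatalf("unexpected fake pod: %+v", pod)
	}
}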
154 changes: 154 additions & 0 deletions pkg/dump/pipeline/pipeline.go
@@ -0,0 +1,154 @@
package pipeline

import (
"context"
"errors"
"fmt"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/dump/writer"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
"github.com/DataDog/KubeHound/pkg/worker"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type StreamFunc func(context.Context) error

type DumpIngestorPipeline struct {
operationName string
entity string
streamFunc StreamFunc
}

// dumpIngestorSequence returns the pipeline sequence for dumping k8s objects (can be multi-threaded depending on the writer used)
func dumpIngestorSequence(collector collector.CollectorClient, writer writer.DumperWriter) []DumpIngestorPipeline {
return []DumpIngestorPipeline{
{
operationName: span.DumperNodes,
entity: tag.EntityNodes,
streamFunc: func(ctx context.Context) error {
return collector.StreamNodes(ctx, NewNodeIngestor(ctx, writer))
},
},
{
operationName: span.DumperPods,
entity: tag.EntityPods,
streamFunc: func(ctx context.Context) error {
return collector.StreamPods(ctx, NewPodIngestor(ctx, writer))
},
},
{
operationName: span.DumperRoles,
entity: tag.EntityRoles,
streamFunc: func(ctx context.Context) error {
return collector.StreamRoles(ctx, NewRoleIngestor(ctx, writer))
},
},
{
operationName: span.DumperClusterRoles,
entity: tag.EntityClusterRoles,
streamFunc: func(ctx context.Context) error {
return collector.StreamClusterRoles(ctx, NewClusterRoleIngestor(ctx, writer))
},
},
{
operationName: span.DumperRoleBindings,
entity: tag.EntityRolebindings,
streamFunc: func(ctx context.Context) error {
return collector.StreamRoleBindings(ctx, NewRoleBindingIngestor(ctx, writer))
},
},
{
operationName: span.DumperClusterRoleBindings,
entity: tag.EntityClusterRolebindings,
streamFunc: func(ctx context.Context) error {
return collector.StreamClusterRoleBindings(ctx, NewClusterRoleBindingIngestor(ctx, writer))
},
},
{
operationName: span.DumperEndpoints,
entity: tag.EntityEndpoints,
streamFunc: func(ctx context.Context) error {
return collector.StreamEndpoints(ctx, NewEndpointIngestor(ctx, writer))
},
},
}
}

// PipelineDumpIngestor is a parallelized, pipeline-based ingestor implementation.
type PipelineDumpIngestor struct {
sequence []DumpIngestorPipeline
wp worker.WorkerPool
WorkerNumber int
}

func NewPipelineDumpIngestor(ctx context.Context, collector collector.CollectorClient, writer writer.DumperWriter) (context.Context, *PipelineDumpIngestor, error) {
sequence := dumpIngestorSequence(collector, writer)

// Get the number of workers from the writer to set up multi-threading if possible
workerNumber := writer.WorkerNumber()
// Default to the number of different entities (roles, pods, ...)
if workerNumber == 0 {
workerNumber = len(sequence)
}

if workerNumber > 1 {
log.I.Infof("Multi-threading enabled: %d workers", workerNumber)
}

// Set up the worker pool with multi-threading where the writer allows it:
// enabled for the raw file writer,
// disabled for the targz writer (not thread safe)
bufferCapacity := 1
wp, err := worker.PoolFactory(workerNumber, bufferCapacity)
if err != nil {
return nil, nil, fmt.Errorf("create worker pool: %w", err)
}

ctx, err = wp.Start(ctx)
if err != nil {
return nil, nil, fmt.Errorf("group worker pool start: %w", err)
}

return ctx, &PipelineDumpIngestor{
wp: wp,
sequence: sequence,
WorkerNumber: workerNumber,
}, nil

}

func (p *PipelineDumpIngestor) Run(ctx context.Context) error {
var err error
for _, v := range p.sequence {
v := v // capture the loop variable for the closure
p.wp.Submit(func() error {
_, errDump := dumpK8sObjs(ctx, v.operationName, v.entity, v.streamFunc)
if errDump != nil {
err = errors.Join(err, errDump)
}

return err
})
}

return err
}

func (p *PipelineDumpIngestor) Wait(ctx context.Context) error {
return p.wp.WaitForComplete()
}

// dumpK8sObjs is a static wrapper to dump k8s objects dynamically (streams Kubernetes objects from the collector to the writer).
func dumpK8sObjs(ctx context.Context, operationName string, entity string, streamFunc StreamFunc) (context.Context, error) {
log.I.Infof("Dumping %s", entity)
span, ctx := tracer.StartSpanFromContext(ctx, operationName, tracer.Measured())
span.SetTag(tag.EntityTag, entity)
var err error
defer func() { span.Finish(tracer.WithError(err)) }()
err = streamFunc(ctx)

return ctx, err
}
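Taken together, a hedged sketch of how a caller might drive this pipeline end to end. Only NewPipelineDumpIngestor, Run, and Wait come from this file; runDump, the package name, and the already-constructed collector/writer are stand-ins:

package dump

import (
	"context"

	"github.com/DataDog/KubeHound/pkg/collector"
	"github.com/DataDog/KubeHound/pkg/dump/pipeline"
	"github.com/DataDog/KubeHound/pkg/dump/writer"
)

// runDump is hypothetical glue code: it assumes a CollectorClient and a
// DumperWriter have already been constructed by the caller.
func runDump(ctx context.Context, c collector.CollectorClient, w writer.DumperWriter) error {
	// NewPipelineDumpIngestor sizes the worker pool from the writer
	// (falling back to one worker per entity type) and starts it.
	ctx, ingestor, err := pipeline.NewPipelineDumpIngestor(ctx, c, w)
	if err != nil {
		return err
	}

	// Run submits one streaming task per k8s entity to the pool;
	// Wait blocks until every submitted stream has completed.
	if err := ingestor.Run(ctx); err != nil {
		return err
	}

	return ingestor.Wait(ctx)
}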
121 changes: 121 additions & 0 deletions pkg/dump/pipeline/pipeline_faker.go
@@ -0,0 +1,121 @@
package pipeline

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/DataDog/KubeHound/pkg/collector"
mockcollector "github.com/DataDog/KubeHound/pkg/collector/mockcollector"
mockwriter "github.com/DataDog/KubeHound/pkg/dump/writer/mockwriter"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
)

func PipelineLiveTest(ctx context.Context, t *testing.T, workerNum int) (*mockwriter.DumperWriter, collector.CollectorClient) {
t.Helper()
mDumpWriter, collectorClient := NewFakeDumpIngestorPipeline(ctx, t, false)

mDumpWriter.EXPECT().WorkerNumber().Return(workerNum)

// Building the map of the cached k8s objects
// For namespaced resources, the main key is the namespace
// For non-namespaced resources, the only key is the k8s object type
countK8sObjectsByFile := make(map[string]int)
for _, rObj := range GenK8sObjects() {
reflectType := reflect.TypeOf(rObj)
switch reflectType {
case reflect.TypeOf(&corev1.Node{}):
countK8sObjectsByFile[collector.NodePath]++
case reflect.TypeOf(&corev1.Pod{}):
k8sObj, ok := rObj.(*corev1.Pod)
if !ok {
t.Fatalf("failed to cast object to PodType: %s", reflectType.String())
}
path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.PodPath)
countK8sObjectsByFile[path]++
case reflect.TypeOf(&rbacv1.Role{}):
k8sObj, ok := rObj.(*rbacv1.Role)
if !ok {
t.Fatalf("failed to cast object to RoleType: %s", reflectType.String())
}
path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.RolesPath)
countK8sObjectsByFile[path]++
case reflect.TypeOf(&rbacv1.RoleBinding{}):
k8sObj, ok := rObj.(*rbacv1.RoleBinding)
if !ok {
t.Fatalf("failed to cast object to RoleBindingType: %s", reflectType.String())
}
path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.RoleBindingsPath)
countK8sObjectsByFile[path]++
case reflect.TypeOf(&rbacv1.ClusterRole{}):
countK8sObjectsByFile[collector.ClusterRolesPath]++
case reflect.TypeOf(&rbacv1.ClusterRoleBinding{}):
countK8sObjectsByFile[collector.ClusterRoleBindingsPath]++
case reflect.TypeOf(&discoveryv1.EndpointSlice{}):
k8sObj, ok := rObj.(*discoveryv1.EndpointSlice)
if !ok {
t.Fatalf("failed to cast object to EndpointType: %s", reflectType.String())
}
path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.EndpointPath)
countK8sObjectsByFile[path]++
default:
t.Fatalf("unknown object type to cast: %s", reflectType.String())
}
}

for range countK8sObjectsByFile {
mDumpWriter.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
}

return mDumpWriter, collectorClient
}

func NewFakeDumpIngestorPipeline(ctx context.Context, t *testing.T, mockCollector bool) (*mockwriter.DumperWriter, collector.CollectorClient) {
t.Helper()

mDumpWriter := mockwriter.NewDumperWriter(t)

mCollectorClient := mockcollector.NewCollectorClient(t)
clientset := fake.NewSimpleClientset(GenK8sObjects()...)
collectorClient := collector.NewTestK8sAPICollector(ctx, clientset)

if mockCollector {
return mDumpWriter, mCollectorClient
}

return mDumpWriter, collectorClient

}

func GenK8sObjects() []runtime.Object {
k8sObjs := []runtime.Object{
collector.FakeNode("node1", "provider1"),
collector.FakePod("namespace1", "name11", "Running"),
collector.FakePod("namespace1", "name12", "Running"),
collector.FakePod("namespace2", "name21", "Running"),
collector.FakePod("namespace2", "name22", "Running"),
collector.FakeRole("namespace1", "name11"),
collector.FakeRole("namespace1", "name12"),
collector.FakeRole("namespace2", "name21"),
collector.FakeRole("namespace2", "name22"),
collector.FakeClusterRole("name1"),
collector.FakeRoleBinding("namespace1", "name11"),
collector.FakeRoleBinding("namespace1", "name12"),
collector.FakeRoleBinding("namespace2", "name21"),
collector.FakeRoleBinding("namespace2", "name22"),
collector.FakeClusterRoleBinding("name1"),
collector.FakeEndpoint("namespace1", "name11", []int32{80}),
collector.FakeEndpoint("namespace1", "name12", []int32{80}),
collector.FakeEndpoint("namespace2", "name21", []int32{80}),
collector.FakeEndpoint("namespace2", "name22", []int32{80}),
}

return k8sObjs
}
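As a usage sketch (a hypothetical test, not part of this diff), PipelineLiveTest wires the mocked writer expectations to the fake-clientset collector so the full pipeline can be driven over GenK8sObjects. Whether the mock writer needs further expectations beyond WorkerNumber and Write depends on the DumperWriter interface, so treat this as illustrative:

package pipeline

import (
	"context"
	"testing"
)

// Hypothetical live test: workerNum 0 makes NewPipelineDumpIngestor fall
// back to one worker per entity type; one Write call is expected per
// generated dump file.
func TestPipelineLiveSketch(t *testing.T) {
	ctx := context.Background()
	mDumpWriter, collectorClient := PipelineLiveTest(ctx, t, 0)

	ctx, ingestor, err := NewPipelineDumpIngestor(ctx, collectorClient, mDumpWriter)
	if err != nil {
		t.Fatal(err)
	}
	if err := ingestor.Run(ctx); err != nil {
		t.Fatal(err)
	}
	if err := ingestor.Wait(ctx); err != nil {
		t.Fatal(err)
	}
}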