[Dumper] Refactor pipeline ingestor (#175)
* new pipeline

* fix unit tests (faker)

* clean code

* PR comment
jt-dd authored Mar 19, 2024
1 parent f863374 commit eb7d1be
Showing 4 changed files with 437 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pkg/collector/k8s_api_faker.go
@@ -15,15 +15,16 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 )
 
-func FakePod(namespace string, image string, status string) *corev1.Pod {
+func FakePod(namespace string, name string, status string) *corev1.Pod {
 	return &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: namespace,
+			Name:      name,
 		},
 		Spec: corev1.PodSpec{
 			Containers: []corev1.Container{
 				{
-					Name:  image,
+					Name:  name,
 					Image: "nginx:latest",
 				},
 			},
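For orientation, a minimal sketch of how a unit test might consume the updated FakePod helper together with client-go's fake clientset; the test name and assertions below are illustrative and not part of this commit.

package collector_test

import (
	"context"
	"testing"

	"github.com/DataDog/KubeHound/pkg/collector"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestFakePodIsRetrievableByName(t *testing.T) {
	// The faked pod now carries an explicit ObjectMeta.Name, so it can be fetched by name
	// from a fake API server instead of being keyed only through its container.
	pod := collector.FakePod("namespace1", "name11", "Running")
	clientset := fake.NewSimpleClientset(pod)

	got, err := clientset.CoreV1().Pods("namespace1").Get(context.Background(), "name11", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("get pod: %v", err)
	}
	if got.Spec.Containers[0].Image != "nginx:latest" {
		t.Fatalf("unexpected image: %s", got.Spec.Containers[0].Image)
	}
}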
154 changes: 154 additions & 0 deletions pkg/dump/pipeline/pipeline.go
@@ -0,0 +1,154 @@
package pipeline

import (
	"context"
	"errors"
	"fmt"

	"github.com/DataDog/KubeHound/pkg/collector"
	"github.com/DataDog/KubeHound/pkg/dump/writer"
	"github.com/DataDog/KubeHound/pkg/telemetry/log"
	"github.com/DataDog/KubeHound/pkg/telemetry/span"
	"github.com/DataDog/KubeHound/pkg/telemetry/tag"
	"github.com/DataDog/KubeHound/pkg/worker"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type StreamFunc func(context.Context) error

type DumpIngestorPipeline struct {
	operationName string
	entity        string
	streamFunc    StreamFunc
}

// dumpIngestorSequence returns the pipeline sequence for dumping k8s objects (can be multi-threaded depending on the writer used)
func dumpIngestorSequence(collector collector.CollectorClient, writer writer.DumperWriter) []DumpIngestorPipeline {
	return []DumpIngestorPipeline{
		{
			operationName: span.DumperNodes,
			entity:        tag.EntityNodes,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamNodes(ctx, NewNodeIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperPods,
			entity:        tag.EntityPods,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamPods(ctx, NewPodIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperRoles,
			entity:        tag.EntityRoles,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamRoles(ctx, NewRoleIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperClusterRoles,
			entity:        tag.EntityClusterRoles,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamClusterRoles(ctx, NewClusterRoleIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperRoleBindings,
			entity:        tag.EntityRolebindings,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamRoleBindings(ctx, NewRoleBindingIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperClusterRoleBindings,
			entity:        tag.EntityClusterRolebindings,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamClusterRoleBindings(ctx, NewClusterRoleBindingIngestor(ctx, writer))
			},
		},
		{
			operationName: span.DumperEndpoints,
			entity:        tag.EntityEndpoints,
			streamFunc: func(ctx context.Context) error {
				return collector.StreamEndpoints(ctx, NewEndpointIngestor(ctx, writer))
			},
		},
	}
}

// PipelineDumpIngestor is a parallelized pipeline based ingestor implementation.
type PipelineDumpIngestor struct {
	sequence     []DumpIngestorPipeline
	wp           worker.WorkerPool
	WorkerNumber int
}

func NewPipelineDumpIngestor(ctx context.Context, collector collector.CollectorClient, writer writer.DumperWriter) (context.Context, *PipelineDumpIngestor, error) {
	sequence := dumpIngestorSequence(collector, writer)

	// Getting the number of workers from the writer to set up multi-threading if possible
	workerNumber := writer.WorkerNumber()
	// Set the number of workers to the number of different entities (roles, pods, ...)
	if workerNumber == 0 {
		workerNumber = len(sequence)
	}

	if workerNumber > 1 {
		log.I.Infof("Multi-threading enabled: %d workers", workerNumber)
	}

	// Setting up the worker pool with multi-threading if possible
	// enabled for the raw file writer
	// disabled for the tar.gz writer (not thread safe)
	bufferCapacity := 1
	wp, err := worker.PoolFactory(workerNumber, bufferCapacity)
	if err != nil {
		return nil, nil, fmt.Errorf("create worker pool: %w", err)
	}

	ctx, err = wp.Start(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("group worker pool start: %w", err)
	}

	return ctx, &PipelineDumpIngestor{
		wp:           wp,
		sequence:     sequence,
		WorkerNumber: workerNumber,
	}, nil

}

func (p *PipelineDumpIngestor) Run(ctx context.Context) error {
	var err error
	for _, v := range p.sequence {
		v := v
		p.wp.Submit(func() error {
			_, errDump := dumpK8sObjs(ctx, v.operationName, v.entity, v.streamFunc)
			if errDump != nil {
				err = errors.Join(err, errDump)
			}

			return err
		})
	}

	return err
}

func (p *PipelineDumpIngestor) Wait(ctx context.Context) error {
	return p.wp.WaitForComplete()
}

// Static wrapper to dump k8s objects dynamically (streams Kubernetes objects from the collector to the writer).
func dumpK8sObjs(ctx context.Context, operationName string, entity string, streamFunc StreamFunc) (context.Context, error) {
	log.I.Infof("Dumping %s", entity)
	span, ctx := tracer.StartSpanFromContext(ctx, operationName, tracer.Measured())
	span.SetTag(tag.EntityTag, entity)
	var err error
	defer func() { span.Finish(tracer.WithError(err)) }()
	err = streamFunc(ctx)

	return ctx, err
}
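Read together, the intended call pattern is construct, run, then wait on the worker pool. A minimal caller sketch, assuming a CollectorClient and a DumperWriter have already been built elsewhere (their construction is outside this diff); the dumpCluster wrapper is hypothetical.

package main

import (
	"context"
	"fmt"

	"github.com/DataDog/KubeHound/pkg/collector"
	"github.com/DataDog/KubeHound/pkg/dump/pipeline"
	"github.com/DataDog/KubeHound/pkg/dump/writer"
)

// dumpCluster is a hypothetical caller; collectorClient and dumpWriter are assumed
// to be constructed elsewhere (outside this diff).
func dumpCluster(ctx context.Context, collectorClient collector.CollectorClient, dumpWriter writer.DumperWriter) error {
	ctx, ingestor, err := pipeline.NewPipelineDumpIngestor(ctx, collectorClient, dumpWriter)
	if err != nil {
		return fmt.Errorf("create pipeline dump ingestor: %w", err)
	}

	// Submit one stream per k8s entity (nodes, pods, roles, ...) to the worker pool.
	if err := ingestor.Run(ctx); err != nil {
		return fmt.Errorf("run pipeline dump ingestor: %w", err)
	}

	// Block until every worker has finished streaming and writing its entity.
	return ingestor.Wait(ctx)
}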
121 changes: 121 additions & 0 deletions pkg/dump/pipeline/pipeline_faker.go
@@ -0,0 +1,121 @@
package pipeline

import (
	"context"
	"fmt"
	"reflect"
	"testing"

	"github.com/DataDog/KubeHound/pkg/collector"
	mockcollector "github.com/DataDog/KubeHound/pkg/collector/mockcollector"
	mockwriter "github.com/DataDog/KubeHound/pkg/dump/writer/mockwriter"
	"github.com/stretchr/testify/mock"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"

	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	rbacv1 "k8s.io/api/rbac/v1"
)

func PipelineLiveTest(ctx context.Context, t *testing.T, workerNum int) (*mockwriter.DumperWriter, collector.CollectorClient) {
	t.Helper()
	mDumpWriter, collectorClient := NewFakeDumpIngestorPipeline(ctx, t, false)

	mDumpWriter.EXPECT().WorkerNumber().Return(workerNum)

	// Building the map of the cached k8s objects
	// For namespaced resources, the main key is the namespace
	// For non-namespaced resources, the only key is the k8s object type
	countK8sObjectsByFile := make(map[string]int)
	for _, rObj := range GenK8sObjects() {
		reflectType := reflect.TypeOf(rObj)
		switch reflectType {
		case reflect.TypeOf(&corev1.Node{}):
			countK8sObjectsByFile[collector.NodePath]++
		case reflect.TypeOf(&corev1.Pod{}):
			k8sObj, ok := rObj.(*corev1.Pod)
			if !ok {
				t.Fatalf("failed to cast object to PodType: %s", reflectType.String())
			}
			path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.PodPath)
			countK8sObjectsByFile[path]++
		case reflect.TypeOf(&rbacv1.Role{}):
			k8sObj, ok := rObj.(*rbacv1.Role)
			if !ok {
				t.Fatalf("failed to cast object to RoleType: %s", reflectType.String())
			}
			path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.RolesPath)
			countK8sObjectsByFile[path]++
		case reflect.TypeOf(&rbacv1.RoleBinding{}):
			k8sObj, ok := rObj.(*rbacv1.RoleBinding)
			if !ok {
				t.Fatalf("failed to cast object to RoleBindingType: %s", reflectType.String())
			}
			path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.RoleBindingsPath)
			countK8sObjectsByFile[path]++
		case reflect.TypeOf(&rbacv1.ClusterRole{}):
			countK8sObjectsByFile[collector.ClusterRolesPath]++
		case reflect.TypeOf(&rbacv1.ClusterRoleBinding{}):
			countK8sObjectsByFile[collector.ClusterRoleBindingsPath]++
		case reflect.TypeOf(&discoveryv1.EndpointSlice{}):
			k8sObj, ok := rObj.(*discoveryv1.EndpointSlice)
			if !ok {
				t.Fatalf("failed to cast object to EndpointType: %s", reflectType.String())
			}
			path := fmt.Sprintf("%s/%s", k8sObj.Namespace, collector.EndpointPath)
			countK8sObjectsByFile[path]++
		default:
			t.Fatalf("unknown object type to cast: %s", reflectType.String())
		}
	}

	for range countK8sObjectsByFile {
		mDumpWriter.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
	}

	return mDumpWriter, collectorClient
}

func NewFakeDumpIngestorPipeline(ctx context.Context, t *testing.T, mockCollector bool) (*mockwriter.DumperWriter, collector.CollectorClient) {
	t.Helper()

	mDumpWriter := mockwriter.NewDumperWriter(t)

	mCollectorClient := mockcollector.NewCollectorClient(t)
	clientset := fake.NewSimpleClientset(GenK8sObjects()...)
	collectorClient := collector.NewTestK8sAPICollector(ctx, clientset)

	if mockCollector {
		return mDumpWriter, mCollectorClient
	}

	return mDumpWriter, collectorClient

}

func GenK8sObjects() []runtime.Object {
	k8sOjb := []runtime.Object{
		collector.FakeNode("node1", "provider1"),
		collector.FakePod("namespace1", "name11", "Running"),
		collector.FakePod("namespace1", "name12", "Running"),
		collector.FakePod("namespace2", "name21", "Running"),
		collector.FakePod("namespace2", "name22", "Running"),
		collector.FakeRole("namespace1", "name11"),
		collector.FakeRole("namespace1", "name12"),
		collector.FakeRole("namespace2", "name21"),
		collector.FakeRole("namespace2", "name22"),
		collector.FakeClusterRole("name1"),
		collector.FakeRoleBinding("namespace1", "name11"),
		collector.FakeRoleBinding("namespace1", "name12"),
		collector.FakeRoleBinding("namespace2", "name21"),
		collector.FakeRoleBinding("namespace2", "name22"),
		collector.FakeClusterRoleBinding("name1"),
		collector.FakeEndpoint("namespace1", "name11", []int32{80}),
		collector.FakeEndpoint("namespace1", "name12", []int32{80}),
		collector.FakeEndpoint("namespace2", "name21", []int32{80}),
		collector.FakeEndpoint("namespace2", "name22", []int32{80}),
	}

	return k8sOjb
}
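As a rough illustration of how these helpers could drive a test of the new pipeline, a hedged sketch follows. The test name is hypothetical, and it assumes the WorkerNumber and Write expectations registered by PipelineLiveTest cover every writer call the ingestors make during the run.

package pipeline

import (
	"context"
	"testing"
)

// Hypothetical test exercising the pipeline with the fakers above; not part of this commit.
func TestPipelineDumpIngestor_LiveSequence(t *testing.T) {
	ctx := context.Background()

	// A single worker mirrors the non-thread-safe (tar.gz style) writer configuration.
	mDumpWriter, collectorClient := PipelineLiveTest(ctx, t, 1)

	ctx, ingestor, err := NewPipelineDumpIngestor(ctx, collectorClient, mDumpWriter)
	if err != nil {
		t.Fatalf("new pipeline dump ingestor: %v", err)
	}
	if err := ingestor.Run(ctx); err != nil {
		t.Fatalf("run: %v", err)
	}
	if err := ingestor.Wait(ctx); err != nil {
		t.Fatalf("wait: %v", err)
	}
	// mockery/testify verifies the registered Write expectations at test cleanup.
}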