Skip to content

Commit

Permalink
Index Pods by UID and join with containers based on this.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Apr 29, 2016
1 parent a81b116 commit 671b4dd
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 173 deletions.
21 changes: 16 additions & 5 deletions probe/kubernetes/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,30 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
}

// CapturePod is exported for testing
func CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
namespaceID, podID, ok := report.ParsePodNodeID(req.NodeID)
uid, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return f(req, namespaceID, podID)
// find pod by UID
var pod Pod
r.client.WalkPods(func(p Pod) error {
if p.UID() == uid {
pod = p
}
return nil
})
if pod == nil {
return xfer.ResponseErrorf("Pod not found: %s", uid)
}
return f(req, pod.Namespace(), pod.Name())
}
}

func (r *Reporter) registerControls() {
controls.Register(GetLogs, CapturePod(r.GetLogs))
controls.Register(DeletePod, CapturePod(r.deletePod))
controls.Register(GetLogs, r.CapturePod(r.GetLogs))
controls.Register(DeletePod, r.CapturePod(r.deletePod))
}

func (r *Reporter) deregisterControls() {
Expand Down
48 changes: 23 additions & 25 deletions probe/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@ import (

// These constants are keys used in node metadata
const (
PodID = "kubernetes_pod_id"
PodName = "kubernetes_pod_name"
PodCreated = "kubernetes_pod_created"
PodContainerIDs = "kubernetes_pod_container_ids"
PodState = "kubernetes_pod_state"
PodLabelPrefix = "kubernetes_pod_labels_"
PodIP = "kubernetes_pod_ip"
ServiceIDs = "kubernetes_service_ids"
PodID = "kubernetes_pod_id"
PodName = "kubernetes_pod_name"
PodCreated = "kubernetes_pod_created"
PodState = "kubernetes_pod_state"
PodLabelPrefix = "kubernetes_pod_labels_"
PodIP = "kubernetes_pod_ip"
ServiceIDs = "kubernetes_service_ids"

StateDeleted = "deleted"
)

// Pod represents a Kubernetes pod
type Pod interface {
UID() string
ID() string
Name() string
Namespace() string
ContainerIDs() []string
Created() string
AddServiceID(id string)
Labels() labels.Labels
Expand All @@ -47,6 +46,14 @@ func NewPod(p *api.Pod) Pod {
return &pod{Pod: p}
}

func (p *pod) UID() string {
// Work around for master pod not reporting the right UID.
if hash, ok := p.ObjectMeta.Annotations["kubernetes.io/config.hash"]; ok {
return hash
}
return string(p.ObjectMeta.UID)
}

func (p *pod) ID() string {
return p.ObjectMeta.Namespace + "/" + p.ObjectMeta.Name
}
Expand All @@ -63,14 +70,6 @@ func (p *pod) Created() string {
return p.ObjectMeta.CreationTimestamp.Format(time.RFC822)
}

func (p *pod) ContainerIDs() []string {
ids := []string{}
for _, container := range p.Status.ContainerStatuses {
ids = append(ids, strings.TrimPrefix(container.ContainerID, "docker://"))
}
return ids
}

func (p *pod) Labels() labels.Labels {
return labels.Set(p.ObjectMeta.Labels)
}
Expand All @@ -88,14 +87,13 @@ func (p *pod) NodeName() string {
}

func (p *pod) GetNode(probeID string) report.Node {
n := report.MakeNodeWith(report.MakePodNodeID(p.Namespace(), p.Name()), map[string]string{
PodID: p.ID(),
PodName: p.Name(),
Namespace: p.Namespace(),
PodCreated: p.Created(),
PodContainerIDs: strings.Join(p.ContainerIDs(), " "),
PodState: p.State(),
PodIP: p.Status.PodIP,
n := report.MakeNodeWith(report.MakePodNodeID(p.UID()), map[string]string{
PodID: p.ID(),
PodName: p.Name(),
Namespace: p.Namespace(),
PodCreated: p.Created(),
PodState: p.State(),
PodIP: p.Status.PodIP,
report.ControlProbeID: probeID,
})
if len(p.serviceIDs) > 0 {
Expand Down
39 changes: 21 additions & 18 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,42 @@ func (r *Reporter) podEvent(e Event, pod Pod) {
rpt.Shortcut = true
rpt.Pod.AddNode(
report.MakeNodeWith(
report.MakePodNodeID(pod.Namespace(), pod.Name()),
report.MakePodNodeID(pod.UID()),
map[string]string{PodState: StateDeleted},
),
)
r.probe.Publish(rpt)
}
}

// Tag adds pod parents to container nodes.
func (r *Reporter) Tag(rpt report.Report) (report.Report, error) {
for id, n := range rpt.Container.Nodes {
uid, ok := n.Latest.Lookup(docker.LabelPrefix + "io.kubernetes.pod.uid")
if !ok {
continue
}
rpt.Container.Nodes[id] = n.WithParents(report.EmptySets.Add(
report.Pod,
report.EmptyStringSet.Add(report.MakePodNodeID(uid)),
))
}
return rpt, nil
}

// Report generates a Report containing Container and ContainerImage topologies
func (r *Reporter) Report() (report.Report, error) {
result := report.MakeReport()
serviceTopology, services, err := r.serviceTopology()
if err != nil {
return result, err
}
podTopology, containerTopology, err := r.podTopology(services)
podTopology, err := r.podTopology(services)
if err != nil {
return result, err
}
result.Service = result.Service.Merge(serviceTopology)
result.Pod = result.Pod.Merge(podTopology)
result.Container = result.Container.Merge(containerTopology)
return result, nil
}

Expand Down Expand Up @@ -143,13 +157,12 @@ var GetNodeName = func(r *Reporter) (string, error) {
return nodeName, err
}

func (r *Reporter) podTopology(services []Service) (report.Topology, report.Topology, error) {
func (r *Reporter) podTopology(services []Service) (report.Topology, error) {
var (
pods = report.MakeTopology().
WithMetadataTemplates(PodMetadataTemplates).
WithTableTemplates(PodTableTemplates)
containers = report.MakeTopology()
selectors = map[string]labels.Selector{}
selectors = map[string]labels.Selector{}
)
pods.Controls.AddControl(report.Control{
ID: GetLogs,
Expand All @@ -169,7 +182,7 @@ func (r *Reporter) podTopology(services []Service) (report.Topology, report.Topo

thisNodeName, err := GetNodeName(r)
if err != nil {
return pods, containers, err
return pods, err
}
err = r.client.WalkPods(func(p Pod) error {
if p.NodeName() != thisNodeName {
Expand All @@ -180,18 +193,8 @@ func (r *Reporter) podTopology(services []Service) (report.Topology, report.Topo
p.AddServiceID(serviceID)
}
}
nodeID := report.MakePodNodeID(p.Namespace(), p.Name())
pods = pods.AddNode(p.GetNode(r.probeID))

for _, containerID := range p.ContainerIDs() {
container := report.MakeNodeWith(report.MakeContainerNodeID(containerID), map[string]string{
PodID: p.ID(),
Namespace: p.Namespace(),
docker.ContainerID: containerID,
}).WithParents(report.EmptySets.Add(report.Pod, report.MakeStringSet(nodeID)))
containers.AddNode(container)
}
return nil
})
return pods, containers, err
return pods, err
}
77 changes: 25 additions & 52 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
TypeMeta: podTypeMeta,
ObjectMeta: api.ObjectMeta{
Name: "pong-a",
UID: "pong-a",
Namespace: "ping",
CreationTimestamp: unversioned.Now(),
Labels: map[string]string{"ponger": "true"},
Expand All @@ -44,6 +45,7 @@ var (
TypeMeta: podTypeMeta,
ObjectMeta: api.ObjectMeta{
Name: "pong-b",
UID: "pong-b",
Namespace: "ping",
CreationTimestamp: unversioned.Now(),
Labels: map[string]string{"ponger": "true"},
Expand Down Expand Up @@ -127,7 +129,8 @@ func (*mockClient) WalkNodes(f func(*api.Node) error) error {
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
r, ok := c.logs[report.MakePodNodeID(namespaceID, podName)]
fmt.Println("here", namespaceID, podName)
r, ok := c.logs[namespaceID+";"+podName]
if !ok {
return nil, fmt.Errorf("Not found")
}
Expand Down Expand Up @@ -157,8 +160,8 @@ func TestReporter(t *testing.T) {
return nodeName, nil
}

pod1ID := report.MakePodNodeID("ping", "pong-a")
pod2ID := report.MakePodNodeID("ping", "pong-b")
pod1ID := report.MakePodNodeID("pong-a")
pod2ID := report.MakePodNodeID("pong-b")
serviceID := report.MakeServiceNodeID("ping", "pongservice")
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", nil).Report()

Expand All @@ -169,20 +172,18 @@ func TestReporter(t *testing.T) {
latest map[string]string
}{
{pod1ID, serviceID, map[string]string{
kubernetes.PodID: "ping/pong-a",
kubernetes.PodName: "pong-a",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container1 container2",
kubernetes.ServiceIDs: "ping/pongservice",
kubernetes.PodID: "ping/pong-a",
kubernetes.PodName: "pong-a",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.ServiceIDs: "ping/pongservice",
}},
{pod2ID, serviceID, map[string]string{
kubernetes.PodID: "ping/pong-b",
kubernetes.PodName: "pong-b",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container3 container4",
kubernetes.ServiceIDs: "ping/pongservice",
kubernetes.PodID: "ping/pong-b",
kubernetes.PodName: "pong-b",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.ServiceIDs: "ping/pongservice",
}},
} {
node, ok := rpt.Pod.Nodes[pod.id]
Expand Down Expand Up @@ -219,34 +220,6 @@ func TestReporter(t *testing.T) {
}
}
}

// Reporter should have tagged the containers
for _, pod := range []struct {
id, nodeID string
containers []string
}{
{"ping/pong-a", pod1ID, []string{"container1", "container2"}},
{"ping/pong-b", pod2ID, []string{"container3", "container4"}},
} {
for _, containerID := range pod.containers {
node, ok := rpt.Container.Nodes[report.MakeContainerNodeID(containerID)]
if !ok {
t.Errorf("Expected report to have container %q, but not found", containerID)
}
// container should have pod id
if have, ok := node.Latest.Lookup(kubernetes.PodID); !ok || have != pod.id {
t.Errorf("Expected container %s latest %q: %q, got %q", containerID, kubernetes.PodID, pod.id, have)
}
// container should have namespace
if have, ok := node.Latest.Lookup(kubernetes.Namespace); !ok || have != "ping" {
t.Errorf("Expected container %s latest %q: %q, got %q", containerID, kubernetes.Namespace, "ping", have)
}
// container should have pod parent
if parents, ok := node.Parents.Lookup(report.Pod); !ok || !parents.Contains(pod.nodeID) {
t.Errorf("Expected container %s to have parent service %q, got %q", containerID, pod.nodeID, parents)
}
}
}
}

type callbackReadCloser struct {
Expand All @@ -269,7 +242,7 @@ func TestReporterGetLogs(t *testing.T) {

// Should error on invalid IDs
{
resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{
resp := reporter.CapturePod(reporter.GetLogs)(xfer.Request{
NodeID: "invalidID",
Control: kubernetes.GetLogs,
})
Expand All @@ -280,39 +253,39 @@ func TestReporterGetLogs(t *testing.T) {

// Should pass through errors from k8s (e.g if pod does not exist)
{
resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{
resp := reporter.CapturePod(reporter.GetLogs)(xfer.Request{
AppID: "appID",
NodeID: report.MakePodNodeID("not", "found"),
NodeID: report.MakePodNodeID("notfound"),
Control: kubernetes.GetLogs,
})
if want := "Not found"; resp.Error != want {
if want := "Pod not found: notfound"; resp.Error != want {
t.Errorf("Expected error on invalid ID: %q, got %q", want, resp.Error)
}
}

pod1ID := report.MakePodNodeID("ping", "pong-a")
podNamespaceAndID := "ping;pong-a"
pod1Request := xfer.Request{
AppID: "appID",
NodeID: pod1ID,
NodeID: report.MakePodNodeID("pong-a"),
Control: kubernetes.GetLogs,
}

// Inject our logs content, and watch for it to be closed
closed := false
wantContents := "logs: ping/pong-a"
client.logs[pod1ID] = &callbackReadCloser{Reader: strings.NewReader(wantContents), close: func() error {
client.logs[podNamespaceAndID] = &callbackReadCloser{Reader: strings.NewReader(wantContents), close: func() error {
closed = true
return nil
}}

// Should create a new pipe for the stream
resp := kubernetes.CapturePod(reporter.GetLogs)(pod1Request)
resp := reporter.CapturePod(reporter.GetLogs)(pod1Request)
if resp.Pipe == "" {
t.Errorf("Expected pipe id to be returned, but got %#v", resp)
}
pipe, ok := pipes[resp.Pipe]
if !ok {
t.Errorf("Expected pipe %q to have been created, but wasn't", resp.Pipe)
t.Fatalf("Expected pipe %q to have been created, but wasn't", resp.Pipe)
}

// Should push logs from k8s client into the pipe
Expand Down
5 changes: 5 additions & 0 deletions probe/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (

// Service represents a Kubernetes service
type Service interface {
UID() string
ID() string
Name() string
Namespace() string
Expand All @@ -36,6 +37,10 @@ func NewService(s *api.Service) Service {
return &service{Service: s}
}

func (s *service) UID() string {
return string(s.ObjectMeta.UID)
}

func (s *service) ID() string {
return s.ObjectMeta.Namespace + "/" + s.ObjectMeta.Name
}
Expand Down
1 change: 1 addition & 0 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func probeMain(flags probeFlags) {
reporter := kubernetes.NewReporter(client, clients, probeID, p)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
} else {
log.Errorf("Kubernetes: failed to start client: %v", err)
log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide a valid kubernetes.api url")
Expand Down
Loading

0 comments on commit 671b4dd

Please sign in to comment.