Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new k8s go client #2659

Merged
merged 7 commits into from
Jul 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
The diff you're trying to view is too large. We only load the first 3000 changed files.
15 changes: 8 additions & 7 deletions app/api_topologies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

"github.com/gorilla/mux"
"github.com/ugorji/go/codec"
"k8s.io/kubernetes/pkg/api"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1 "k8s.io/client-go/pkg/api/v1"

"github.com/weaveworks/common/test"
"github.com/weaveworks/scope/app"
Expand Down Expand Up @@ -205,21 +206,21 @@ func TestAPITopologyAddsKubernetes(t *testing.T) {
// Enable the kubernetes topologies
rpt := report.MakeReport()
rpt.Pod = report.MakeTopology()
rpt.Pod.Nodes[fixture.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{
ObjectMeta: api.ObjectMeta{
rpt.Pod.Nodes[fixture.ClientPodNodeID] = kubernetes.NewPod(&apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pong-a",
Namespace: "ping",
Labels: map[string]string{"ponger": "true"},
},
Status: api.PodStatus{
Status: apiv1.PodStatus{
HostIP: "1.2.3.4",
ContainerStatuses: []api.ContainerStatus{
ContainerStatuses: []apiv1.ContainerStatus{
{ContainerID: "container1"},
{ContainerID: "container2"},
},
},
Spec: api.PodSpec{
SecurityContext: &api.PodSecurityContext{},
Spec: apiv1.PodSpec{
SecurityContext: &apiv1.PodSecurityContext{},
},
}).GetNode("")
buf := &bytes.Buffer{}
Expand Down
170 changes: 69 additions & 101 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@ package kubernetes
import (
"fmt"
"io"
"strconv"
"sync"
"time"

"github.com/weaveworks/common/backoff"

log "github.com/Sirupsen/logrus"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// Client keeps track of running kubernetes pods and services
Expand All @@ -30,7 +29,7 @@ type Client interface {
WalkReplicaSets(f func(ReplicaSet) error) error
WalkDaemonSets(f func(DaemonSet) error) error
WalkReplicationControllers(f func(ReplicationController) error) error
WalkNodes(f func(*api.Node) error) error
WalkNodes(f func(*apiv1.Node) error) error

WatchPods(f func(Event, Pod))

Expand All @@ -43,15 +42,14 @@ type Client interface {
type client struct {
quit chan struct{}
resyncPeriod time.Duration
client *unversioned.Client
extensionsClient *unversioned.ExtensionsClient
podStore *cache.StoreToPodLister
serviceStore *cache.StoreToServiceLister
deploymentStore *cache.StoreToDeploymentLister
replicaSetStore *cache.StoreToReplicaSetLister
daemonSetStore *cache.StoreToDaemonSetLister
replicationControllerStore *cache.StoreToReplicationControllerLister
nodeStore *cache.StoreToNodeLister
client *kubernetes.Clientset
podStore cache.Store
serviceStore cache.Store
deploymentStore cache.Store
replicaSetStore cache.Store
daemonSetStore cache.Store
replicationControllerStore cache.Store
nodeStore cache.Store

podWatchesMutex sync.Mutex
podWatches []func(Event, Pod)
Expand Down Expand Up @@ -94,14 +92,14 @@ type ClientConfig struct {

// NewClient returns a usable Client. Don't forget to Stop it.
func NewClient(config ClientConfig) (Client, error) {
var restConfig *restclient.Config
var restConfig *rest.Config
if config.Server == "" && config.Kubeconfig == "" {
// If no API server address or kubeconfig was provided, assume we are running
// inside a pod. Try to connect to the API server through its
// Service environment variables, using the default Service
// Account Token.
var err error
if restConfig, err = restclient.InClusterConfig(); err != nil {
if restConfig, err = rest.InClusterConfig(); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -135,45 +133,40 @@ func NewClient(config ClientConfig) (Client, error) {
}
log.Infof("kubernetes: targeting api server %s", restConfig.Host)

c, err := unversioned.New(restConfig)
if err != nil {
return nil, err
}

ec, err := unversioned.NewExtensions(restConfig)
c, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}

result := &client{
quit: make(chan struct{}),
resyncPeriod: config.Interval,
client: c,
extensionsClient: ec,
quit: make(chan struct{}),
resyncPeriod: config.Interval,
client: c,
}

podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = &cache.StoreToPodLister{Store: result.setupStore(c, "pods", &api.Pod{}, podStore)}
result.serviceStore = &cache.StoreToServiceLister{Store: result.setupStore(c, "services", &api.Service{}, nil)}
result.replicationControllerStore = &cache.StoreToReplicationControllerLister{Store: result.setupStore(c, "replicationcontrollers", &api.ReplicationController{}, nil)}
result.nodeStore = &cache.StoreToNodeLister{Store: result.setupStore(c, "nodes", &api.Node{}, nil)}
result.podStore = result.setupStore(c.CoreV1Client.RESTClient(), "pods", &apiv1.Pod{}, podStore)

result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil)
result.replicationControllerStore = result.setupStore(c.CoreV1Client.RESTClient(), "replicationcontrollers", &apiv1.ReplicationController{}, nil)
result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil)

// We list deployments here to check if this version of kubernetes is >= 1.2.
// We would use NegotiateVersion, but Kubernetes 1.1 "supports"
// extensions/v1beta1, but not deployments, replicasets or daemonsets.
if _, err := ec.Deployments(api.NamespaceAll).List(api.ListOptions{}); err != nil {
if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("Deployments, ReplicaSets and DaemonSets are not supported by this Kubernetes version: %v", err)
} else {
result.deploymentStore = &cache.StoreToDeploymentLister{Store: result.setupStore(ec, "deployments", &extensions.Deployment{}, nil)}
result.replicaSetStore = &cache.StoreToReplicaSetLister{Store: result.setupStore(ec, "replicasets", &extensions.ReplicaSet{}, nil)}
result.daemonSetStore = &cache.StoreToDaemonSetLister{Store: result.setupStore(ec, "daemonsets", &extensions.DaemonSet{}, nil)}
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiv1beta1.Deployment{}, nil)
result.replicaSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "replicasets", &apiv1beta1.ReplicaSet{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiv1beta1.DaemonSet{}, nil)
}

return result, nil
}

func (c *client) setupStore(kclient cache.Getter, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
lw := cache.NewListWatchFromClient(kclient, resource, api.NamespaceAll, fields.Everything())
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
store := nonDefaultStore
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
Expand All @@ -192,16 +185,13 @@ func (c *client) triggerPodWatches(e Event, pod interface{}) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()
for _, watch := range c.podWatches {
watch(e, NewPod(pod.(*api.Pod)))
watch(e, NewPod(pod.(*apiv1.Pod)))
}
}

func (c *client) WalkPods(f func(Pod) error) error {
pods, err := c.podStore.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
for _, m := range c.podStore.List() {
pod := m.(*apiv1.Pod)
if err := f(NewPod(pod)); err != nil {
return err
}
Expand All @@ -210,12 +200,9 @@ func (c *client) WalkPods(f func(Pod) error) error {
}

func (c *client) WalkServices(f func(Service) error) error {
list, err := c.serviceStore.List()
if err != nil {
return err
}
for i := range list.Items {
if err := f(NewService(&(list.Items[i]))); err != nil {
for _, m := range c.serviceStore.List() {
s := m.(*apiv1.Service)
if err := f(NewService(s)); err != nil {
return err
}
}
Expand All @@ -226,12 +213,9 @@ func (c *client) WalkDeployments(f func(Deployment) error) error {
if c.deploymentStore == nil {
return nil
}
list, err := c.deploymentStore.List()
if err != nil {
return err
}
for i := range list {
if err := f(NewDeployment(&(list[i]))); err != nil {
for _, m := range c.deploymentStore.List() {
d := m.(*apiv1beta1.Deployment)
if err := f(NewDeployment(d)); err != nil {
return err
}
}
Expand All @@ -243,12 +227,9 @@ func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error {
if c.replicaSetStore == nil {
return nil
}
list, err := c.replicaSetStore.List()
if err != nil {
return err
}
for i := range list {
if err := f(NewReplicaSet(&(list[i]))); err != nil {
for _, m := range c.replicaSetStore.List() {
rs := m.(*apiv1beta1.ReplicaSet)
if err := f(NewReplicaSet(rs)); err != nil {
return err
}
}
Expand All @@ -258,12 +239,9 @@ func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error {

// WalkReplicationcontrollers calls f for each replication controller
func (c *client) WalkReplicationControllers(f func(ReplicationController) error) error {
list, err := c.replicationControllerStore.List()
if err != nil {
return err
}
for i := range list {
if err := f(NewReplicationController(&(list[i]))); err != nil {
for _, m := range c.replicationControllerStore.List() {
rc := m.(*apiv1.ReplicationController)
if err := f(NewReplicationController(rc)); err != nil {
return err
}
}
Expand All @@ -275,64 +253,54 @@ func (c *client) WalkDaemonSets(f func(DaemonSet) error) error {
if c.daemonSetStore == nil {
return nil
}
list, err := c.daemonSetStore.List()
if err != nil {
return err
}
for i := range list.Items {
if err := f(NewDaemonSet(&(list.Items[i]))); err != nil {
for _, m := range c.daemonSetStore.List() {
ds := m.(*apiv1beta1.DaemonSet)
if err := f(NewDaemonSet(ds)); err != nil {
return err
}
}
return nil
}

func (c *client) WalkNodes(f func(*api.Node) error) error {
list, err := c.nodeStore.List()
if err != nil {
return err
}
for i := range list.Items {
if err := f(&(list.Items[i])); err != nil {
func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
for _, m := range c.nodeStore.List() {
node := m.(*apiv1.Node)
if err := f(node); err != nil {
return err
}
}
return nil
}

func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
return c.client.RESTClient.Get().
Namespace(namespaceID).
Name(podID).
Resource("pods").
SubResource("log").
Param("follow", strconv.FormatBool(true)).
Param("previous", strconv.FormatBool(false)).
Param("timestamps", strconv.FormatBool(true)).
Stream()
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
},
)
return req.Stream()
}

func (c *client) DeletePod(namespaceID, podID string) error {
return c.client.RESTClient.Delete().
Namespace(namespaceID).
Name(podID).
Resource("pods").Do().Error()
return c.client.CoreV1().Pods(namespaceID).Delete(podID, &metav1.DeleteOptions{})
}

func (c *client) ScaleUp(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) {
return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) {
scale.Spec.Replicas++
})
}

func (c *client) ScaleDown(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) {
return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) {
scale.Spec.Replicas--
})
}

func (c *client) modifyScale(resource, namespace, id string, f func(*extensions.Scale)) error {
scaler := c.extensionsClient.Scales(namespace)
func (c *client) modifyScale(resource, namespace, id string, f func(*apiv1beta1.Scale)) error {
scaler := c.client.Extensions().Scales(namespace)
scale, err := scaler.Get(resource, id)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions probe/kubernetes/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package kubernetes
import (
"fmt"

"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apiv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"

"github.com/weaveworks/scope/report"
)
Expand All @@ -23,20 +23,20 @@ type DaemonSet interface {
}

type daemonSet struct {
*extensions.DaemonSet
*apiv1beta1.DaemonSet
Meta
}

// NewDaemonSet creates a new daemonset
func NewDaemonSet(d *extensions.DaemonSet) DaemonSet {
func NewDaemonSet(d *apiv1beta1.DaemonSet) DaemonSet {
return &daemonSet{
DaemonSet: d,
Meta: meta{d.ObjectMeta},
}
}

func (d *daemonSet) Selector() (labels.Selector, error) {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, err
}
Expand Down
Loading