-
Notifications
You must be signed in to change notification settings - Fork 303
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Move most of logic from main into app
We can write unit tests for app, but not for code defined in the main package
- Loading branch information
Showing
6 changed files
with
561 additions
and
424 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package app | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/golang/glog" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/clientcmd" | ||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api" | ||
"k8s.io/ingress-gce/pkg/utils" | ||
"k8s.io/kubernetes/pkg/cloudprovider" | ||
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce" | ||
) | ||
|
||
const ( | ||
// Sleep interval to retry cloud client creation. | ||
cloudClientRetryInterval = 10 * time.Second | ||
) | ||
|
||
func NewKubeClient() (kubernetes.Interface, error) { | ||
var err error | ||
var config *rest.Config | ||
|
||
if Flags.InCluster { | ||
if config, err = rest.InClusterConfig(); err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
if Flags.APIServerHost == "" { | ||
return nil, fmt.Errorf("please specify the api server address using the flag --apiserver-host") | ||
} | ||
|
||
config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( | ||
&clientcmd.ClientConfigLoadingRules{ExplicitPath: Flags.KubeConfigFile}, | ||
&clientcmd.ConfigOverrides{ | ||
ClusterInfo: clientcmdapi.Cluster{ | ||
Server: Flags.APIServerHost, | ||
}, | ||
}).ClientConfig() | ||
if err != nil { | ||
glog.Fatalf("error creating client configuration: %v", err) | ||
} | ||
} | ||
|
||
return kubernetes.NewForConfig(config) | ||
} | ||
|
||
func NewGCEClient(config io.Reader) *gce.GCECloud { | ||
getConfigReader := func() io.Reader { return nil } | ||
|
||
if config != nil { | ||
allConfig, err := ioutil.ReadAll(config) | ||
if err != nil { | ||
glog.Fatalf("Error while reading entire config: %v", err) | ||
} | ||
glog.V(2).Infof("Using cloudprovider config file:\n%v ", string(allConfig)) | ||
|
||
getConfigReader = func() io.Reader { | ||
return bytes.NewReader(allConfig) | ||
} | ||
} else { | ||
glog.V(2).Infof("No cloudprovider config file provided, using default values.") | ||
} | ||
|
||
// Creating the cloud interface involves resolving the metadata server to get | ||
// an oauth token. If this fails, the token provider assumes it's not on GCE. | ||
// No errors are thrown. So we need to keep retrying till it works because | ||
// we know we're on GCE. | ||
for { | ||
cloudInterface, err := cloudprovider.GetCloudProvider("gce", getConfigReader()) | ||
if err == nil { | ||
cloud := cloudInterface.(*gce.GCECloud) | ||
|
||
// If this controller is scheduled on a node without compute/rw | ||
// it won't be allowed to list backends. We can assume that the | ||
// user has no need for Ingress in this case. If they grant | ||
// permissions to the node they will have to restart the controller | ||
// manually to re-create the client. | ||
// TODO: why do we bail with success out if there is a permission error??? | ||
if _, err = cloud.ListGlobalBackendServices(); err == nil || utils.IsHTTPErrorCode(err, http.StatusForbidden) { | ||
return cloud | ||
} | ||
glog.Warningf("Failed to list backend services, retrying: %v", err) | ||
} else { | ||
glog.Warningf("Failed to retrieve cloud interface, retrying: %v", err) | ||
} | ||
time.Sleep(cloudClientRetryInterval) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package app | ||
|
||
import ( | ||
"flag" | ||
"time" | ||
|
||
"k8s.io/api/core/v1" | ||
"k8s.io/ingress-gce/pkg/controller" | ||
) | ||
|
||
var ( | ||
// Flags are the command line flags. | ||
Flags = struct { | ||
APIServerHost string | ||
ClusterName string | ||
ConfigFilePath string | ||
DefaultSvc string | ||
DeleteAllOnQuit bool | ||
HealthCheckPath string | ||
HealthzPort int | ||
InCluster bool | ||
KubeConfigFile string | ||
ResyncPeriod time.Duration | ||
UseRealCloud bool | ||
Verbose bool | ||
WatchNamespace string | ||
}{} | ||
) | ||
|
||
func init() { | ||
flag.StringVar(&Flags.APIServerHost, "apiserver-host", "", | ||
`The address of the Kubernetes Apiserver to connect to in the format of | ||
protocol://address:port, e.g., http://localhost:8080. If not specified, the | ||
assumption is that the binary runs inside a Kubernetes cluster and local | ||
discovery is attempted.`) | ||
flag.StringVar(&Flags.ClusterName, "cluster-uid", controller.DefaultClusterUID, | ||
`Optional, used to tag cluster wide, shared loadbalancer resources such | ||
as instance groups. Use this flag if you'd like to continue using the same | ||
resources across a pod restart. Note that this does not need to match the name | ||
of you Kubernetes cluster, it's just an arbitrary name used to tag/lookup cloud | ||
resources.`) | ||
flag.StringVar(&Flags.ConfigFilePath, "config-file-path", "", | ||
`Path to a file containing the gce config. If left unspecified this | ||
controller only works with default zones.`) | ||
flag.StringVar(&Flags.DefaultSvc, "default-backend-service", "kube-system/default-http-backend", | ||
`Service used to serve a 404 page for the default backend. Takes the | ||
form namespace/name. The controller uses the first node port of this Service for | ||
the default backend.`) | ||
flag.BoolVar(&Flags.DeleteAllOnQuit, "delete-all-on-quit", false, | ||
`If true, the controller will delete all Ingress and the associated | ||
external cloud resources as it's shutting down. Mostly used for testing. In | ||
normal environments the controller should only delete a loadbalancer if the | ||
associated Ingress is deleted.`) | ||
flag.StringVar(&Flags.HealthCheckPath, "health-check-path", "/", | ||
`Path used to health-check a backend service. All Services must serve a | ||
200 page on this path. Currently this is only configurable globally.`) | ||
flag.IntVar(&Flags.HealthzPort, "healthz-port", 8081, | ||
`Port to run healthz server. Must match the health check port in yaml.`) | ||
flag.BoolVar(&Flags.InCluster, "running-in-cluster", true, | ||
`Optional, if this controller is running in a kubernetes cluster, use | ||
the pod secrets for creating a Kubernetes client.`) | ||
flag.StringVar(&Flags.KubeConfigFile, "kubeconfig", "", | ||
`Path to kubeconfig file with authorization and master location information.`) | ||
flag.DurationVar(&Flags.ResyncPeriod, "sync-period", 30*time.Second, | ||
`Relist and confirm cloud resources this often.`) | ||
// TODO: Consolidate this flag and running-in-cluster. People already use | ||
// the first one to mean "running in dev", unfortunately. | ||
flag.BoolVar(&Flags.UseRealCloud, "use-real-cloud", false, | ||
`Optional, if set a real cloud client is created. Only matters with | ||
--running-in-cluster=false, i.e a real cloud is always used when this controller | ||
is running on a Kubernetes node.`) | ||
flag.BoolVar(&Flags.Verbose, "verbose", false, | ||
`If true, logs are displayed at V(4), otherwise V(2).`) | ||
flag.StringVar(&Flags.WatchNamespace, "watch-namespace", v1.NamespaceAll, | ||
`Namespace to watch for Ingress/Services/Endpoints.`) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package app | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
|
||
"github.com/golang/glog" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
"k8s.io/ingress-gce/pkg/controller" | ||
) | ||
|
||
func RunHTTPServer(lbc *controller.LoadBalancerController) { | ||
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { | ||
if err := lbc.CloudClusterManager.IsHealthy(); err != nil { | ||
w.WriteHeader(500) | ||
w.Write([]byte(fmt.Sprintf("Cluster unhealthy: %v", err))) | ||
return | ||
} | ||
w.WriteHeader(200) | ||
w.Write([]byte("ok")) | ||
}) | ||
http.Handle("/metrics", promhttp.Handler()) | ||
http.HandleFunc("/delete-all-and-quit", func(w http.ResponseWriter, r *http.Request) { | ||
// TODO: Retry failures during shutdown. | ||
lbc.Stop(true) | ||
}) | ||
|
||
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", Flags.HealthzPort), nil)) | ||
} | ||
|
||
func RunSIGTERMHandler(lbc *controller.LoadBalancerController, deleteAll bool) { | ||
// Multiple SIGTERMs will get dropped | ||
signalChan := make(chan os.Signal, 1) | ||
signal.Notify(signalChan, syscall.SIGTERM) | ||
<-signalChan | ||
glog.Infof("Received SIGTERM, shutting down") | ||
|
||
// TODO: Better retries than relying on restartPolicy. | ||
exitCode := 0 | ||
if err := lbc.Stop(deleteAll); err != nil { | ||
glog.Infof("Error during shutdown %v", err) | ||
exitCode = 1 | ||
} | ||
glog.Infof("Exiting with %v", exitCode) | ||
os.Exit(exitCode) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package app | ||
|
||
import ( | ||
"strings" | ||
"time" | ||
|
||
"github.com/golang/glog" | ||
|
||
"k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/apimachinery/pkg/util/intstr" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/ingress-gce/pkg/annotations" | ||
"k8s.io/ingress-gce/pkg/backends" | ||
) | ||
|
||
func DefaultBackendServicePort(kubeClient kubernetes.Interface) *backends.ServicePort { | ||
// TODO: make this not fatal | ||
if Flags.DefaultSvc == "" { | ||
glog.Fatalf("Please specify --default-backend") | ||
} | ||
|
||
// Wait for the default backend Service. There's no pretty way to do this. | ||
parts := strings.Split(Flags.DefaultSvc, "/") | ||
if len(parts) != 2 { | ||
glog.Fatalf("Default backend should take the form namespace/name: %v", | ||
Flags.DefaultSvc) | ||
} | ||
port, nodePort, err := getNodePort(kubeClient, parts[0], parts[1]) | ||
if err != nil { | ||
glog.Fatalf("Could not configure default backend %v: %v", | ||
Flags.DefaultSvc, err) | ||
} | ||
|
||
return &backends.ServicePort{ | ||
Port: int64(nodePort), | ||
Protocol: annotations.ProtocolHTTP, // The default backend is HTTP. | ||
SvcName: types.NamespacedName{Namespace: parts[0], Name: parts[1]}, | ||
SvcPort: intstr.FromInt(int(port)), | ||
} | ||
} | ||
|
||
// getNodePort waits for the Service, and returns its first node port. | ||
func getNodePort(client kubernetes.Interface, ns, name string) (port, nodePort int32, err error) { | ||
glog.V(2).Infof("Waiting for %v/%v", ns, name) | ||
|
||
var svc *v1.Service | ||
wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) { | ||
svc, err = client.Core().Services(ns).Get(name, metav1.GetOptions{}) | ||
if err != nil { | ||
return false, nil | ||
} | ||
for _, p := range svc.Spec.Ports { | ||
if p.NodePort != 0 { | ||
port = p.Port | ||
nodePort = p.NodePort | ||
glog.V(3).Infof("Node port %v", nodePort) | ||
break | ||
} | ||
} | ||
return true, nil | ||
}) | ||
return | ||
} |
Oops, something went wrong.