Skip to content

Commit

Permalink
Copied portforwarding from the fission cli to avoid the need for FISS…
Browse files Browse the repository at this point in the history
…ION_URL
  • Loading branch information
erwinvaneyk committed Mar 22, 2018
1 parent 17d756c commit fd894c3
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# To run (from repo root): docker build -t fission -f ./build/Dockerfile .
ARG NOBUILD
ARG GOLANG_VERSION=1.9.4
ARG GOLANG_VERSION=1.10.0
FROM golang:$GOLANG_VERSION AS builder

WORKDIR /go/src/github.com/fission/fission-workflows
Expand Down
14 changes: 13 additions & 1 deletion cmd/wfcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import (

// This is a prototype of the CLI (and will be integrated into the Fission CLI eventually).
func main() {
// fetch the FISSION_URL env variable. If not set, port-forward to controller.
var value string
fissionUrl := os.Getenv("FISSION_URL")
if len(fissionUrl) == 0 {
fissionNamespace := getFissionNamespace()
kubeConfig := getKubeConfigPath()
localPort := setupPortForward(kubeConfig, fissionNamespace, "application=fission-api")
value = "http://127.0.0.1:" + localPort
} else {
value = fissionUrl
}

app := cli.NewApp()
app.Author = "Erwin van Eyk"
app.Email = "[email protected]"
Expand All @@ -24,7 +36,7 @@ func main() {
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "url, u",
Value: "http://localhost:31313",
Value: value,
EnvVar: "FISSION_URL",
Usage: "Url to the Fission apiserver",
},
Expand Down
186 changes: 186 additions & 0 deletions cmd/wfcli/portforward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package main

import (
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"

meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
)

func getFissionNamespace() string {
fissionNamespace := os.Getenv("FISSION_NAMESPACE")
return fissionNamespace
}

func getKubeConfigPath() string {
kubeConfig := os.Getenv("KUBECONFIG")
if len(kubeConfig) == 0 {
home := os.Getenv("HOME")
kubeConfig = filepath.Join(home, ".kube", "config")

if _, err := os.Stat(kubeConfig); os.IsNotExist(err) {
panic("Couldn't find kubeconfig file. Set the KUBECONFIG environment variable to your kubeconfig's path.")
}
}
return kubeConfig
}

func findFreePort() (string, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return "", err
}

port := strconv.Itoa(listener.Addr().(*net.TCPAddr).Port)
file, err := listener.(*net.TCPListener).File()
if err != nil {
return "", nil
}

err = listener.Close()
if err != nil {
return "", err
}

err = file.Close()
if err != nil {
return "", err
}

return port, nil
}

// runPortForward creates a local port forward to the specified pod
func runPortForward(kubeConfig string, labelSelector string, localPort string, fissionNamespace string) error {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
panic(fmt.Sprintf("Failed to connect to Kubernetes: %s", err))
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("Failed to connect to Kubernetes: %s", err))
}

// if fission namespace is unset, try to find a fission pod in any namespace
if len(fissionNamespace) == 0 {
fissionNamespace = meta_v1.NamespaceAll
}

// get the pod; if there is more than one, ask the user to disambiguate
podList, err := clientset.CoreV1().Pods(fissionNamespace).
List(meta_v1.ListOptions{LabelSelector: labelSelector})
if err != nil || len(podList.Items) == 0 {
panic("Error getting controller pod for port-forwarding")
}

// make a useful error message if there is more than one install
if len(podList.Items) > 1 {
namespaces := make([]string, 0)
for _, p := range podList.Items {
namespaces = append(namespaces, p.Namespace)
}
panic(fmt.Sprintf("Found %v fission installs, set FISSION_NAMESPACE to one of: %v",
len(podList.Items), strings.Join(namespaces, " ")))
}

// pick the first pod
podName := podList.Items[0].Name
podNameSpace := podList.Items[0].Namespace

// get the service and the target port
svcs, err := clientset.CoreV1().Services(podNameSpace).
List(meta_v1.ListOptions{LabelSelector: labelSelector})
if err != nil {
panic(fmt.Sprintf("Error getting %v service :%v", labelSelector, err.Error()))
}
if len(svcs.Items) == 0 {
panic(fmt.Sprintf("Service %v not found", labelSelector))
}
service := &svcs.Items[0]

var targetPort string
for _, servicePort := range service.Spec.Ports {
targetPort = servicePort.TargetPort.String()
}

stopChannel := make(chan struct{}, 1)
readyChannel := make(chan struct{})

// create request URL
req := clientset.CoreV1Client.RESTClient().Post().Resource("pods").
Namespace(podNameSpace).Name(podName).SubResource("portforward")
url := req.URL()

// create ports slice
portCombo := localPort + ":" + targetPort
ports := []string{portCombo}

// actually start the port-forwarding process here
dialer, err := remotecommand.NewExecutor(config, "POST", url)
if err != nil {
msg := fmt.Sprintf("newexecutor errored out :%v", err.Error())
panic(msg)
}

fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, nil, os.Stderr)
if err != nil {
msg := fmt.Sprintf("portforward.new errored out :%v", err.Error())
panic(msg)
}

return fw.ForwardPorts()
}

// Port forward a free local port to a pod on the cluster. The pod is
// found in the specified namespace by labelSelector. The pod's port
// is found by looking for a service in the same namespace and using
// its targetPort. Once the port forward is started, wait for it to
// start accepting connections before returning.
func setupPortForward(kubeConfig, namespace, labelSelector string) string {
localPort, err := findFreePort()
if err != nil {
panic(fmt.Sprintf("Error finding unused port :%v", err.Error()))
}

for {
conn, _ := net.DialTimeout("tcp",
net.JoinHostPort("", localPort), time.Millisecond)
if conn != nil {
conn.Close()
} else {
break
}
time.Sleep(time.Millisecond * 50)
}

go func() {
err := runPortForward(kubeConfig, labelSelector, localPort, namespace)
if err != nil {
panic(fmt.Sprintf("Error forwarding to controller port: %s", err.Error()))
}
}()

for {
conn, _ := net.DialTimeout("tcp",
net.JoinHostPort("", localPort), time.Millisecond)
if conn != nil {
conn.Close()
break
}
time.Sleep(time.Millisecond * 50)
}

return localPort
}
17 changes: 14 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions hack/codegen-swagger-client.sh

This file was deleted.

2 changes: 1 addition & 1 deletion test/e2e/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ dump_system_info() {
echo "--- fission ---"
fission -v
echo "--- wfcli ---"
wfcli -v
wfcli version
echo "--- End System Info ---"
}

Expand Down

0 comments on commit fd894c3

Please sign in to comment.