Download binary from [releases]( +2. Add it to your `PATH` + +## Usage +When the plugin binary is found from `PATH` you can just execute it through `kubectl` +```shell +kubectl warp --help +``` + +## Development +### Prerequisites +- Golang v1.11 +- [Go mod enabled]( + +### Build and run locally +```shell +go run ./main.go --image alpine -- ls -la + +# Syncs your local files to Kubernetes and list the files +``` + +### Build and install locally +```shell +go install . + +# Now you can use `kubectl` +kubectl warp --help +``` diff --git a/cmd/output.go b/cmd/output.go new file mode 100644 index 0000000..93d3745 --- /dev/null +++ b/cmd/output.go @@ -0,0 +1,24 @@ +package cmd + +import ( + "io" + + "" +) + +// logOutput logs output from opts to the pods log. +func logOutput(client *kubectl.Client, namespace, pod, containerName string, stdout io.Writer) error { + request, err := client.GetLogs(namespace, pod, containerName) + if err != nil { + return err + } + + readCloser, err := request.Stream() + if err != nil { + return err + } + defer readCloser.Close() + + _, err = io.Copy(stdout, readCloser) + return err +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..954e2c4 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,203 @@ +// Copyright © 2018 ERNO AAPA +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "fmt" + "os" + "os/signal" + "strings" + "time" + + "" + + "" + "" + "" + "" + "" + "" + apiv1 "" +) + +type runOptions struct { + Image string + Stdin bool + TTY bool + RsyncArgs string + Includes []string + Excludes []string +} + +var configFlags = genericclioptions.NewConfigFlags() +var opt = runOptions{} +var workDir = "/work-dir" +var devNull = utils.DevNull(0) + +var rootCmd = &cobra.Command{ + Use: "warp", + Short: "Transfer local files and run command in container", + Long: `Start Pod and syncs local files to Pod and executes command +along with the synchronized files.`, + RunE: func(_ *cobra.Command, args []string) error { + stopChannel := make(chan struct{}, 1) + // ctrl+c signal + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + defer signal.Stop(signals) + + go func() { + <-signals + close(stopChannel) + }() + if len(args) < 1 { + return errors.New("NAME is required for warp") + } + var ( + name = args[0] + cmd = args[1:] + stdin = os.Stdin + stdout = os.Stdout + stderr = os.Stderr + containerName = "exec" // TODO + ) + + privateKey, publicKey, err := cert.Create() + if err != nil { + return err + } + + privateKeyFile, err := utils.CreateTempFile(privateKey) + if err != nil { + return err + } + defer os.Remove(privateKeyFile) + + if !opt.Stdin { + stdin = nil + } + + ns, _, err := configFlags.ToRawKubeConfigLoader().Namespace() + if err != nil { + return err + } + + config, err := configFlags.ToRESTConfig() + if err != nil { + return err + } + kubectl.SetKubernetesDefaults(config) + + c := kubectl.NewClient(config) + + fmt.Fprintln(stderr, "Create the Pod") + _, err = c.CreatePod(ns, name, opt.Image, cmd, workDir, opt.TTY, opt.Stdin, publicKey) + if err != nil { + return err + } + defer c.DeletePod(ns, name) + + _, err = c.WaitForPod(ns, name, kubectl.PodInitReady) + if err != nil && err != kubectl.ErrPodCompleted { + return err + } + + // Because init container doesn't support readinessProbe, we must wait a small moment so sshd is listening the port + // otherwise sometimes we get error "Connection refused" from the port 22 + time.Sleep(100 * time.Millisecond) + + // Until this bug is fixed, we cannot use 0 to make the PortForwarder to pick random port + // + randomPort := utils.MustResolveRandomPort() + readyChannel := make(chan struct{}, 1) + + fmt.Fprintln(stderr, "Open connection to the Pod") + f, err := kubectl.PreparePortForward(config, ns, name, []string{fmt.Sprintf("%d:%d", randomPort, 22)}, stopChannel, readyChannel, devNull, stderr) + if err != nil { + return err + } + go f.ForwardPorts() + + // Wait until port forwarding is ready + <-readyChannel + + fmt.Fprintln(stderr, "Sync initial files to the Pod") + s := sync.NewRsync(randomPort, strings.Split(opt.RsyncArgs, " "), privateKeyFile, devNull, devNull) + if err := s.Sync(fmt.Sprintf("root@localhost:%s", workDir), opt.Includes, opt.Excludes); err != nil { + return err + } + + pod, err := c.WaitForPod(ns, name, kubectl.ContainerRunning(containerName)) + if err != nil { + if err == kubectl.ErrPodCompleted { + fmt.Fprintf(stderr, "Pod %s execution container were already completed. Print logs out\n", name) + return logOutput(c, ns, name, containerName, stdout) + } + return err + } + if pod.Status.Phase == apiv1.PodSucceeded || pod.Status.Phase == apiv1.PodFailed { + fmt.Fprintf(stderr, "Pod %s were already completed. Print logs to stdout\n", name) + return logOutput(c, ns, name, containerName, stdout) + } + + go func() { + if _, err := c.WaitForPod(ns, name, kubectl.ContainerRunning("sync")); err != nil { + fmt.Fprintf(stderr, "Error while waiting sync container to be started: %s\n", err) + return + } + + fmt.Fprintln(stderr, "Start background file sync") + for { + select { + case <-time.After(1 * time.Second): + if err := s.Sync(fmt.Sprintf("root@localhost:%s", workDir), opt.Includes, opt.Excludes); err != nil { + fmt.Fprintf(stderr, "sync Failed: %s\n", err) + } + case <-stopChannel: + fmt.Fprintf(stderr, "sync: Stop %s syncing\n", name) + return + } + } + }() + + return c.Attach(ns, name, containerName, stdin, stdout, stderr, opt.TTY) + }, + // We handle errors at root.go + SilenceUsage: true, + SilenceErrors: true, +} + +func init() { + configFlags.AddFlags(rootCmd.Flags()) + + rootCmd.Flags().StringVar(&opt.Image, "image", opt.Image, "The image for the container to run.") + rootCmd.MarkFlagRequired("image") + rootCmd.Flags().StringVar(&opt.RsyncArgs, "rsync-args", "--recursive --times --links --devices --specials", "Space separated arguments for the rsync command") + rootCmd.Flags().BoolVarP(&opt.Stdin, "stdin", "i", opt.Stdin, "Pass stdin to the container") + rootCmd.Flags().BoolVarP(&opt.TTY, "tty", "t", opt.TTY, "Stdin is a TTY") + rootCmd.Flags().StringSliceVar(&opt.Includes, "include", []string{}, "Include only specific paths from current directory for syncing") + rootCmd.Flags().StringSliceVar(&opt.Excludes, "exclude", []string{}, "Exclude only specific paths from current directory for syncing") +} + +// Execute run the root command +func Execute() { + if err := rootCmd.Execute(); err != nil { + if err.Error() == "interrupted" { + fmt.Println("Cancelling...") + } else { + fmt.Println(err) + } + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e231b39 --- /dev/null +++ b/go.mod @@ -0,0 +1,49 @@ +module + +require ( + v0.0.0-20171113091838-e9091a26100e // indirect + v1.1.0 + v0.0.0-20170215093142-bf70f2a70fb1 // indirect + v1.13.1 // indirect + v0.0.0-20181023171402-6480d4af844c // indirect + v4.1.0+incompatible // indirect + v0.0.0-20151013193312-d6023ce2651d // indirect + v1.0.0 // indirect + v0.17.2 // indirect + v1.1.1 // indirect + v1.2.0 // indirect + v0.0.0-20180813153112-4030bb1f1f0c // indirect + v0.0.0-20170612174753-24818f796faf // indirect + v0.2.0 // indirect + v0.0.0-20181110185634-c63ab54fda8f // indirect + v0.5.0 // indirect + v0.3.6 // indirect + v1.1.5 // indirect + v1.13.0 + v1.0.0 // indirect + v0.0.0-20180306012644-bacd9c7ef1dd // indirect + v1.0.1 // indirect + v2.0.1+incompatible // indirect + v0.8.0 + v2.0.0+incompatible // indirect + v0.0.0-20170918181015-86672fcb3f95 // indirect + v0.0.3 + v1.0.3 // indirect + v1.2.2 + v0.0.0-20181203042331-505ab145d0a9 + v0.0.0-20181207154023-610586996380 // indirect + v0.0.0-20181203162652-d668ce993890 // indirect + copy of the License at +// +// +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "math/rand" + "time" + + "" + _ "" +) + +func main() { + rand.Seed(time.Now().UnixNano()) + cmd.Execute() +} diff --git a/pkg/cert/cert.go b/pkg/cert/cert.go new file mode 100644 index 0000000..d93c731 --- /dev/null +++ b/pkg/cert/cert.go @@ -0,0 +1,49 @@ +package cert + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" + + "" +) + +const bitSize = 4096 + +// Create new SSH RSA public/private key pair +func Create() ([]byte, []byte, error) { + privateKey, err := rsa.GenerateKey(rand.Reader, bitSize) + if err != nil { + return nil, nil, err + } + + if err = privateKey.Validate(); err != nil { + return nil, nil, err + } + + publicKey, err := ssh.NewPublicKey(&privateKey.PublicKey) + if err != nil { + return nil, nil, err + } + + return encodePrivateKeyToPEM(privateKey), ssh.MarshalAuthorizedKey(publicKey), nil +} + +// encodePrivateKeyToPEM encodes Private Key from RSA to PEM format +func encodePrivateKeyToPEM(privateKey *rsa.PrivateKey) []byte { + // Get ASN.1 DER format + privDER := x509.MarshalPKCS1PrivateKey(privateKey) + + // pem.Block + privBlock := pem.Block{ + Type: "RSA PRIVATE KEY", + Headers: nil, + Bytes: privDER, + } + + // Private key in PEM format + privatePEM := pem.EncodeToMemory(&privBlock) + + return privatePEM +} diff --git a/pkg/kubectl/client.go b/pkg/kubectl/client.go new file mode 100644 index 0000000..d898f9e --- /dev/null +++ b/pkg/kubectl/client.go @@ -0,0 +1,243 @@ +package kubectl + +import ( + "context" + "fmt" + "io" + "time" + + apiv1 "" + "" + metav1 "" + "" + "" + "" + "" + "" + watchtools "" + "" + "" + "" +) + +type Client struct { + config *rest.Config + timeout time.Duration +} + +func NewClient(config *rest.Config) *Client { + return &Client{ + config: config, + timeout: 60 * time.Second, + } +} + +func (c *Client) getClient(namespace string) (v1.PodInterface, error) { + clientset, err := kubernetes.NewForConfig(c.config) + if err != nil { + return nil, err + } + + return clientset.CoreV1().Pods(namespace), nil +} + +func (c *Client) findPodByName(namespace, name string) (*apiv1.Pod, error) { + client, err := c.getClient(namespace) + if err != nil { + return &apiv1.Pod{}, err + } + + list, err := client.List(metav1.ListOptions{}) + if err != nil { + return &apiv1.Pod{}, err + } + for _, p := range list.Items { + if p.Name == name { + return &p, nil + } + } + + return &apiv1.Pod{}, ErrWithMessagef(ErrNotFound, "Pod with name %s not found", name) +} + +func (c *Client) CreatePod(namespace, name, image string, cmd []string, workDir string, tty, stdin bool, publicKey []byte) (*apiv1.Pod, error) { + if err := c.createSSHSecret(namespace, name, publicKey); err != nil { + return nil, err + } + + client, err := c.getClient(namespace) + if err != nil { + return nil, err + } + + return client.Create(createPodManifest(name, image, cmd, workDir, tty, stdin)) +} + +// WaitForPod watches the given pod until the exitCondition is true +func (c *Client) WaitForPod(namespace, name string, exitCondition watchtools.ConditionFunc) (*apiv1.Pod, error) { + client, err := c.getClient(namespace) + if err != nil { + return nil, err + } + + w, err := client.Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name})) + if err != nil { + return nil, err + } + + // TODO: expose the timeout + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second) + defer cancel() + intr := interrupt.New(nil, cancel) + var result *apiv1.Pod + err = intr.Run(func() error { + ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) { + return exitCondition(ev) + }) + if ev != nil { + result = ev.Object.(*apiv1.Pod) + } + return err + }) + + // Fix generic not found error. + if err != nil && errors.IsNotFound(err) { + err = errors.NewNotFound(apiv1.Resource("pods"), name) + } + + return result, err +} + +func (c *Client) Attach(namespace, podName, containerName string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + t, sizeQueue := getTerminal(stdin, stdout) + pod, err := c.findPodByName(namespace, podName) + if err != nil { + return err + } + + restClient, err := rest.UnversionedRESTClientFor(c.config) + if err != nil { + return err + } + + // check for TTY + containerToAttach, err := containerToAttachTo(pod, containerName) + if err != nil { + return fmt.Errorf("cannot attach to the container: %v", err) + } + + return t.Safe(func() error { + fmt.Fprintln(stderr, "If you don't see a command prompt, try pressing enter.") + + req := restClient.Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("attach") + req.VersionedParams(&apiv1.PodAttachOptions{ + Container: containerToAttach.Name, + Stdin: stdin != nil, + Stdout: stdout != nil, + Stderr: stderr != nil, + TTY: tty, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(c.config, "POST", req.URL()) + if err != nil { + return err + } + + return exec.Stream(remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: sizeQueue, + }) + }) +} + +func getTerminal(stdin io.Reader, stdout io.Writer) (term.TTY, remotecommand.TerminalSizeQueue) { + t := term.TTY{ + Parent: nil, + Raw: stdin != nil, + In: stdin, + Out: stdout, + } + + var sizeQueue remotecommand.TerminalSizeQueue + if size := t.GetSize(); size != nil { + // fake resizing +1 and then back to normal so that attach-detach-reattach will result in the + // screen being redrawn + sizePlusOne := *size + sizePlusOne.Width++ + sizePlusOne.Height++ + + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = t.MonitorSize(&sizePlusOne, size) + } + + return t, sizeQueue +} + +// containerToAttach returns a reference to the container to attach to, given +// by name or the first container if name is empty. +func containerToAttachTo(pod *apiv1.Pod, containerName string) (*apiv1.Container, error) { + if len(containerName) > 0 { + for i := range pod.Spec.Containers { + if pod.Spec.Containers[i].Name == containerName { + return &pod.Spec.Containers[i], nil + } + } + for i := range pod.Spec.InitContainers { + if pod.Spec.InitContainers[i].Name == containerName { + return &pod.Spec.InitContainers[i], nil + } + } + return nil, fmt.Errorf("container not found (%s)", containerName) + } + + return &pod.Spec.Containers[0], nil +} + +func (c *Client) createSSHSecret(namespace, name string, publicKey []byte) error { + c.deleteSSHSecret(namespace, name) + + clientset, err := kubernetes.NewForConfig(c.config) + if err != nil { + return err + } + + _, err = clientset.CoreV1().Secrets(namespace).Create(createSecretManifest(name, publicKey)) + if err != nil { + return err + } + + return nil +} + +func (c *Client) deleteSSHSecret(namespace, name string) error { + clientset, err := kubernetes.NewForConfig(c.config) + if err != nil { + return err + } + + return clientset.CoreV1().Secrets(namespace).Delete(name, &metav1.DeleteOptions{}) +} + +func (c *Client) DeletePod(namespace, name string) error { + client, err := c.getClient(namespace) + if err != nil { + return err + } + + return client.Delete(name, metav1.NewDeleteOptions(int64(-1))) +} + +func (c *Client) GetLogs(namespace, name, containerName string) (*rest.Request, error) { + client, err := c.getClient(namespace) + if err != nil { + return nil, err + } + return client.GetLogs(name, &apiv1.PodLogOptions{Container: containerName}), nil +} diff --git a/pkg/kubectl/conditions.go b/pkg/kubectl/conditions.go new file mode 100644 index 0000000..20d791b --- /dev/null +++ b/pkg/kubectl/conditions.go @@ -0,0 +1,111 @@ +package kubectl + +import ( + "fmt" + "log" + + apiv1 "" + "" + "" + "" +) + +var ErrPodCompleted = fmt.Errorf("pod ran to completion") +var ErrPodStarted = fmt.Errorf("pod ran to running") +var ErrNoContainerFound = fmt.Errorf("no container found") + +// PodInitReady returns true if the pod init containers are running and ready, false if the pod has not +// yet reached those states, returns ErrPodCompleted if the pod has run to completion, or +// an error in any other case. +func PodInitReady(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *apiv1.Pod: + switch t.Status.Phase { + case apiv1.PodFailed, apiv1.PodSucceeded: + return false, ErrPodCompleted + case apiv1.PodRunning: + return false, ErrPodStarted + case apiv1.PodPending: + return isInitContainersReady(t), nil + } + } + return false, nil +} + +func isInitContainersReady(pod *apiv1.Pod) bool { + if isScheduled(pod) && isInitContainersRunning(pod) { + return true + } + return false +} + +func isScheduled(pod *apiv1.Pod) bool { + if &pod.Status != nil && len(pod.Status.Conditions) > 0 { + for _, condition := range pod.Status.Conditions { + if condition.Type == apiv1.PodScheduled && + condition.Status == apiv1.ConditionTrue { + return true + } + } + } + return false +} + +func isInitContainersRunning(pod *apiv1.Pod) bool { + if &pod.Status != nil { + if len(pod.Spec.InitContainers) != len(pod.Status.InitContainerStatuses) { + return false + } + for _, status := range pod.Status.InitContainerStatuses { + if status.State.Running == nil { + return false + } + } + return true + } + return false +} + +// ContainerRunning returns true if the pod is running and container is ready, false if the pod has not +// yet reached those states, returns ErrPodCompleted if the pod has run to completion, or +// an error in any other case. +func ContainerRunning(containerName string) func(watch.Event) (bool, error) { + return func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *apiv1.Pod: + switch t.Status.Phase { + case apiv1.PodFailed, apiv1.PodSucceeded: + return false, ErrPodCompleted + case apiv1.PodRunning: + return isContainerRunning(t, containerName) + } + } + return false, nil + } +} + +func isContainerRunning(pod *apiv1.Pod, containerName string) (bool, error) { + for _, status := range pod.Status.ContainerStatuses { + if status.Name == containerName { + if status.State.Waiting != nil { + return false, nil + } else if status.State.Running != nil { + return true, nil + } else if status.State.Terminated != nil { + log.Println("pod terminated") + return false, ErrPodCompleted + } else { + return false, fmt.Errorf("Unknown container state") + } + } + } + return false, ErrNoContainerFound +} diff --git a/pkg/kubectl/conditions_test.go b/pkg/kubectl/conditions_test.go new file mode 100644 index 0000000..4d841dc --- /dev/null +++ b/pkg/kubectl/conditions_test.go @@ -0,0 +1,40 @@ +package kubectl + +import ( + "testing" + + "" + apiv1 "" +) + +func TestIsInitContainersReady(t *testing.T) { + pod := &apiv1.Pod{ + Status: apiv1.PodStatus{ + Phase: "Pending", + Conditions: []apiv1.PodCondition{ + { + Type: apiv1.PodScheduled, + Status: apiv1.ConditionFalse, + }, + { + Type: apiv1.PodReady, + Status: apiv1.ConditionFalse, + }, + { + Type: apiv1.PodScheduled, + Status: apiv1.ConditionTrue, + }, + }, + InitContainerStatuses: []apiv1.ContainerStatus{ + { + Name: "sync-init", + State: apiv1.ContainerState{ + Running: &apiv1.ContainerStateRunning{}, + }, + }, + }, + }, + } + + require.True(t, isInitContainersReady(pod)) +} diff --git a/pkg/kubectl/errors.go b/pkg/kubectl/errors.go new file mode 100644 index 0000000..570d0cb --- /dev/null +++ b/pkg/kubectl/errors.go @@ -0,0 +1,22 @@ +package kubectl + +import ( + "fmt" + + "" +) + +// Definitions of common error types used throughout runtime implementation. +// All errors returned by the interface will map into one of these errors classes. +var ( + ErrNotFound = errors.New("not found") +) + +// IsNotFound returns true if the error is due to a missing resource +func IsNotFound(err error) bool { + return errors.Cause(err) == ErrNotFound +} + +func ErrWithMessagef(err error, format string, args ...interface{}) error { + return errors.WithMessage(err, fmt.Sprintf(format, args...)) +} diff --git a/pkg/kubectl/manifest.go b/pkg/kubectl/manifest.go new file mode 100644 index 0000000..2dfd1be --- /dev/null +++ b/pkg/kubectl/manifest.go @@ -0,0 +1,135 @@ +package kubectl + +import ( + apiv1 "" + metav1 "" + "" +) + +var mode = int32(256) + +func createSecretManifest(name string, publicKey []byte) *apiv1.Secret { + return &apiv1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + StringData: map[string]string{ + "authorized_keys": string(publicKey), + }, + } +} + +func createPodManifest(name, image string, cmd []string, workDir string, tty, stdin bool) *apiv1.Pod { + syncContainer := apiv1.Container{ + Name: "sync", + Image: "ernoaapa/sshd-rsync", + Ports: []apiv1.ContainerPort{ + { + Name: "ssh", + Protocol: apiv1.ProtocolTCP, + ContainerPort: 22, + }, + }, + ReadinessProbe: &apiv1.Probe{ + Handler: apiv1.Handler{ + TCPSocket: &apiv1.TCPSocketAction{ + Port: intstr.IntOrString{IntVal: 22}, + }, + }, + }, + LivenessProbe: &apiv1.Probe{ + Handler: apiv1.Handler{ + TCPSocket: &apiv1.TCPSocketAction{ + Port: intstr.IntOrString{IntVal: 22}, + }, + }, + }, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "ssh-config", + MountPath: "/root/.ssh/authorized_keys", + SubPath: "authorized_keys", + }, + { + Name: "workdir", + MountPath: workDir, + }, + }, + } + + runContainer := apiv1.Container{ + Name: "exec", + Image: image, + Command: cmd, + TTY: tty, + Stdin: stdin, + StdinOnce: stdin, + WorkingDir: workDir, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "workdir", + MountPath: workDir, + }, + }, + } + + return &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: apiv1.PodSpec{ + RestartPolicy: apiv1.RestartPolicyNever, + InitContainers: []apiv1.Container{ + { + Name: "sync-init", + Image: "ernoaapa/sshd-rsync", + Ports: []apiv1.ContainerPort{ + { + Name: "ssh", + Protocol: apiv1.ProtocolTCP, + ContainerPort: 22, + }, + }, + Env: []apiv1.EnvVar{ + { + Name: "ONE_TIME", + Value: "true", + }, + }, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "ssh-config", + MountPath: "/root/.ssh/authorized_keys", + SubPath: "authorized_keys", + }, + { + Name: "workdir", + MountPath: workDir, + }, + }, + }, + }, + Containers: []apiv1.Container{ + syncContainer, + runContainer, + }, + Volumes: []apiv1.Volume{ + { + Name: "ssh-config", + VolumeSource: apiv1.VolumeSource{ + Secret: &apiv1.SecretVolumeSource{ + SecretName: name, + DefaultMode: &mode, + }, + }, + }, + { + Name: "workdir", + VolumeSource: apiv1.VolumeSource{ + EmptyDir: &apiv1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + } +} diff --git a/pkg/kubectl/portforwarder.go b/pkg/kubectl/portforwarder.go new file mode 100644 index 0000000..118deba --- /dev/null +++ b/pkg/kubectl/portforwarder.go @@ -0,0 +1,30 @@ +package kubectl + +import ( + "io" + "net/http" + + "" + "" + "" +) + +func PreparePortForward(config *rest.Config, namespace, podName string, ports []string, stopChannel, readyChannel chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error) { + restClient, err := rest.UnversionedRESTClientFor(config) + if err != nil { + return nil, err + } + + req := restClient.Post(). + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("portforward") + + transport, upgrader, err := spdy.RoundTripperFor(config) + if err != nil { + return nil, err + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) + return portforward.New(dialer, ports, stopChannel, readyChannel, out, errOut) +} diff --git a/pkg/kubectl/util.go b/pkg/kubectl/util.go new file mode 100644 index 0000000..98d2e17 --- /dev/null +++ b/pkg/kubectl/util.go @@ -0,0 +1,28 @@ +package kubectl + +import ( + "" + "" + "" + "" +) + +// SetKubernetesDefaults sets default values on the provided client config for accessing the +// Kubernetes API or returns an error if any of the defaults are impossible or invalid. +// NOTE: Originally copied from here: +// +func SetKubernetesDefaults(config *rest.Config) error { + // TODO remove this hack. This is allowing the GetOptions to be serialized. + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + + if config.APIPath == "" { + config.APIPath = "/api" + } + if config.NegotiatedSerializer == nil { + // This codec factory ensures the resources are not converted. Therefore, resources + // will not be round-tripped through internal versions. Defaulting does not happen + // on the client. + config.NegotiatedSerializer = &serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} + } + return rest.SetKubernetesDefaults(config) +} diff --git a/pkg/sync/rsync.go b/pkg/sync/rsync.go new file mode 100644 index 0000000..150eb3c --- /dev/null +++ b/pkg/sync/rsync.go @@ -0,0 +1,49 @@ +package sync + +import ( + "fmt" + "io" + "os/exec" +) + +type Rsync struct { + sshPort uint16 + args []string + privateKeyFile string + stdout io.Writer + stderr io.Writer +} + +// NewRsync create new instance of rsync executor +func NewRsync(sshPort uint16, args []string, privateKeyFile string, stdout, stderr io.Writer) *Rsync { + return &Rsync{ + sshPort: sshPort, + stdout: stdout, + stderr: stderr, + args: args, + privateKeyFile: privateKeyFile, + } +} + +// Sync executes underying rsync to synchronize fiels to target host +func (s *Rsync) Sync(destination string, includes, excludes []string) error { + args := s.args + rsh := fmt.Sprintf("/usr/bin/ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p %d -i %s", s.sshPort, s.privateKeyFile) + args = append(args, "--rsh", rsh) + + args = append(args, prefix("--include=", includes)...) + args = append(args, prefix("--exclude=", excludes)...) + + cmd := exec.Command("rsync", append(args, ".", destination)...) + cmd.Stdout = s.stdout + cmd.Stderr = s.stderr + return cmd.Run() +} + +func prefix(p string, s []string) []string { + r := []string{} + for _, e := range s { + r = append(r, p+e) + } + return r +} diff --git a/pkg/sync/rsync_test.go b/pkg/sync/rsync_test.go new file mode 100644 index 0000000..a5d0d8d --- /dev/null +++ b/pkg/sync/rsync_test.go @@ -0,0 +1,11 @@ +package sync + +import ( + "testing" + + "" +) + +func TestPrefix(t *testing.T) { + require.Equal(t, []string{"pre-foo", "pre-bar"}, prefix("pre-", []string{"foo", "bar"})) +} diff --git a/pkg/utils/devnull.go b/pkg/utils/devnull.go new file mode 100644 index 0000000..105b347 --- /dev/null +++ b/pkg/utils/devnull.go @@ -0,0 +1,14 @@ +package utils + +// DevNull implements io.Writer what just drops all the bytes, ie. /dev/null +type DevNull int + +// Write io.Writer implementation +func (DevNull) Write(p []byte) (int, error) { + return len(p), nil +} + +// WriteString io.Writer implementation +func (DevNull) WriteString(s string) (int, error) { + return len(s), nil +} diff --git a/pkg/utils/files.go b/pkg/utils/files.go new file mode 100644 index 0000000..0e96743 --- /dev/null +++ b/pkg/utils/files.go @@ -0,0 +1,24 @@ +package utils + +import ( + "crypto/rand" + "encoding/hex" + "io/ioutil" +) + +// CreateTempFile creates random tmeporary file and stores the content to the file and return path to it or error +func CreateTempFile(content []byte) (string, error) { + randBytes := make([]byte, 16) + rand.Read(randBytes) + tmpfile, err := ioutil.TempFile("", hex.EncodeToString(randBytes)) + if err != nil { + return "", err + } + defer tmpfile.Close() + + if _, err := tmpfile.Write(content); err != nil { + return "", err + } + + return tmpfile.Name(), nil +} diff --git a/pkg/utils/freeport.go b/pkg/utils/freeport.go new file mode 100644 index 0000000..fceb945 --- /dev/null +++ b/pkg/utils/freeport.go @@ -0,0 +1,30 @@ +package utils + +import ( + "log" + "net" +) + +// MustResolveRandomPort asks the kernel for a free open port or fail fatal in case of error +func MustResolveRandomPort() uint16 { + port, err := resolveFreePort() + if err != nil { + log.Fatalf("Failed to find free port: %s", err) + } + return port +} + +func resolveFreePort() (uint16, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + defer l.Close() + + return uint16(l.Addr().(*net.TCPAddr).Port), nil +}