Skip to content

Commit

Permalink
fix: properly handle the tunnel error channel to retry image pushing (#…
Browse files Browse the repository at this point in the history
…2190)

## Description

This PR fixes error channel handling for Zarf tunnels so lost pod
connections don't result in infinite spins. This should mostly resolve
2104 though not marking it "Fixes" as depending on how many pod
connection errors occur a deployment could still run out of retries.

## Related Issue

Relates to #2104 

## Type of change

- [X] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Other (security config, docs update, etc)

## Checklist before merging

- [ ] Test, docs, adr added or updated as needed
- [X] [Contributor Guide
Steps](https://github.com/defenseunicorns/zarf/blob/main/CONTRIBUTING.md#developer-workflow)
followed
  • Loading branch information
Racer159 authored Dec 16, 2023
1 parent f2f6216 commit 83c5ba4
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 65 deletions.
10 changes: 7 additions & 3 deletions src/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ var (
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
exec.SuppressGlobalInterrupt = true

// Wait for the interrupt signal.
<-interruptChan
spinner.Successf(lang.CmdConnectTunnelClosed, url)
// Wait for the interrupt signal or an error.
select {
case err = <-tunnel.ErrChan():
spinner.Fatalf(err, lang.CmdConnectErrService, err.Error())
case <-interruptChan:
spinner.Successf(lang.CmdConnectTunnelClosed, url)
}
os.Exit(0)
},
}
Expand Down
24 changes: 16 additions & 8 deletions src/cmd/tools/crane.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/transform"
"github.com/defenseunicorns/zarf/src/pkg/utils/exec"
"github.com/defenseunicorns/zarf/src/types"
craneCmd "github.com/google/go-containerregistry/cmd/crane/cmd"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
Expand Down Expand Up @@ -132,15 +133,16 @@ func zarfCraneCatalog(cranePlatformOptions *[]crane.Option) *cobra.Command {
return err
}

// Add the correct authentication to the crane command options
authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PullUsername, zarfState.RegistryInfo.PullPassword)
*cranePlatformOptions = append(*cranePlatformOptions, authOption)

if tunnel != nil {
message.Notef(lang.CmdToolsRegistryTunnel, registryEndpoint, zarfState.RegistryInfo.Address)
defer tunnel.Close()
return tunnel.Wrap(func() error { return originalCatalogFn(cmd, []string{registryEndpoint}) })
}

// Add the correct authentication to the crane command options
authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PullUsername, zarfState.RegistryInfo.PullPassword)
*cranePlatformOptions = append(*cranePlatformOptions, authOption)

return originalCatalogFn(cmd, []string{registryEndpoint})
}

Expand Down Expand Up @@ -186,6 +188,10 @@ func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command
return err
}

// Add the correct authentication to the crane command options
authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword)
*cranePlatformOptions = append(*cranePlatformOptions, authOption)

if tunnel != nil {
message.Notef(lang.CmdToolsRegistryTunnel, tunnel.Endpoint(), zarfState.RegistryInfo.Address)

Expand All @@ -194,12 +200,9 @@ func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command
givenAddress := fmt.Sprintf("%s/", zarfState.RegistryInfo.Address)
tunnelAddress := fmt.Sprintf("%s/", tunnel.Endpoint())
args[imageNameArgumentIndex] = strings.Replace(args[imageNameArgumentIndex], givenAddress, tunnelAddress, 1)
return tunnel.Wrap(func() error { return originalListFn(cmd, args) })
}

// Add the correct authentication to the crane command options
authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword)
*cranePlatformOptions = append(*cranePlatformOptions, authOption)

return originalListFn(cmd, args)
}

Expand Down Expand Up @@ -234,8 +237,13 @@ func pruneImages(_ *cobra.Command, _ []string) error {
if tunnel != nil {
message.Notef(lang.CmdToolsRegistryTunnel, registryEndpoint, zarfState.RegistryInfo.Address)
defer tunnel.Close()
return tunnel.Wrap(func() error { return doPruneImagesForPackages(zarfState, zarfPackages, registryEndpoint) })
}

return doPruneImagesForPackages(zarfState, zarfPackages, registryEndpoint)
}

func doPruneImagesForPackages(zarfState *types.ZarfState, zarfPackages []types.DeployedPackage, registryEndpoint string) error {
authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword)

// Determine which image digests are currently used by Zarf packages
Expand Down
39 changes: 32 additions & 7 deletions src/internal/packager/git/gitea.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ func (g *Git) CreateReadOnlyUser() error {

tunnelURL := tunnel.HTTPEndpoint()

var out []byte

// Determine if the read only user already exists
getUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users", tunnelURL)
getUserRequest, _ := netHttp.NewRequest("GET", getUserEndpoint, nil)
out, err := g.DoHTTPThings(getUserRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(getUserRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("GET %s:\n%s", getUserEndpoint, string(out))
if err != nil {
return err
Expand Down Expand Up @@ -80,7 +85,10 @@ func (g *Git) CreateReadOnlyUser() error {
updateUserData, _ := json.Marshal(updateUserBody)
updateUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users/%s", tunnelURL, g.Server.PullUsername)
updateUserRequest, _ := netHttp.NewRequest("PATCH", updateUserEndpoint, bytes.NewBuffer(updateUserData))
out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("PATCH %s:\n%s", updateUserEndpoint, string(out))
return err
}
Expand All @@ -100,7 +108,10 @@ func (g *Git) CreateReadOnlyUser() error {
// Send API request to create the user
createUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users", tunnelURL)
createUserRequest, _ := netHttp.NewRequest("POST", createUserEndpoint, bytes.NewBuffer(createUserData))
out, err = g.DoHTTPThings(createUserRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(createUserRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("POST %s:\n%s", createUserEndpoint, string(out))
if err != nil {
return err
Expand All @@ -115,7 +126,10 @@ func (g *Git) CreateReadOnlyUser() error {
updateUserData, _ := json.Marshal(updateUserBody)
updateUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users/%s", tunnelURL, g.Server.PullUsername)
updateUserRequest, _ := netHttp.NewRequest("PATCH", updateUserEndpoint, bytes.NewBuffer(updateUserData))
out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("PATCH %s:\n%s", updateUserEndpoint, string(out))
return err
}
Expand All @@ -142,10 +156,15 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) {

tunnelURL := tunnel.Endpoint()

var out []byte

// Determine if the package token already exists
getTokensEndpoint := fmt.Sprintf("http://%s/api/v1/users/%s/tokens", tunnelURL, g.Server.PushUsername)
getTokensRequest, _ := netHttp.NewRequest("GET", getTokensEndpoint, nil)
out, err := g.DoHTTPThings(getTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(getTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("GET %s:\n%s", getTokensEndpoint, string(out))
if err != nil {
return CreateTokenResponse{}, err
Expand All @@ -168,7 +187,10 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) {
// Delete the existing token to be replaced
deleteTokensEndpoint := fmt.Sprintf("http://%s/api/v1/users/%s/tokens/%s", tunnelURL, g.Server.PushUsername, config.ZarfArtifactTokenName)
deleteTokensRequest, _ := netHttp.NewRequest("DELETE", deleteTokensEndpoint, nil)
out, err := g.DoHTTPThings(deleteTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(deleteTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("DELETE %s:\n%s", deleteTokensEndpoint, string(out))
if err != nil {
return CreateTokenResponse{}, err
Expand All @@ -181,7 +203,10 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) {
}
createTokensData, _ := json.Marshal(createTokensBody)
createTokensRequest, _ := netHttp.NewRequest("POST", createTokensEndpoint, bytes.NewBuffer(createTokensData))
out, err = g.DoHTTPThings(createTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
err = tunnel.Wrap(func() error {
out, err = g.DoHTTPThings(createTokensRequest, g.Server.PushUsername, g.Server.PushPassword)
return err
})
message.Debugf("POST %s:\n%s", createTokensEndpoint, string(out))
if err != nil {
return CreateTokenResponse{}, err
Expand Down
14 changes: 12 additions & 2 deletions src/internal/packager/images/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (i *ImageConfig) PushToZarfRegistry() error {
defer tunnel.Close()
}

pushImage := func(img v1.Image, name string) error {
if tunnel != nil {
return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) })
}

return crane.Push(img, name, pushOptions...)
}

for refInfo, img := range refInfoToImage {
refTruncated := message.Truncate(refInfo.Reference, 55, true)
progressBar.UpdateTitle(fmt.Sprintf("Pushing %s", refTruncated))
Expand All @@ -91,7 +99,8 @@ func (i *ImageConfig) PushToZarfRegistry() error {

message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineNameCRC)

if err = crane.Push(img, offlineNameCRC, pushOptions...); err != nil {
err = pushImage(img, offlineNameCRC)
if err != nil {
return err
}
}
Expand All @@ -105,7 +114,8 @@ func (i *ImageConfig) PushToZarfRegistry() error {

message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineName)

if err = crane.Push(img, offlineName, pushOptions...); err != nil {
err = pushImage(img, offlineName)
if err != nil {
return err
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/pkg/cluster/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,15 @@ func (c *Cluster) injectorIsReady(seedImages []transform.Image, spinner *message

for _, seedImage := range seedImages {
seedRegistry := fmt.Sprintf("%s/v2/%s/manifests/%s", tunnel.HTTPEndpoint(), seedImage.Path, seedImage.Tag)
if resp, err := http.Get(seedRegistry); err != nil || resp.StatusCode != 200 {

var resp *http.Response
var err error
err = tunnel.Wrap(func() error {
resp, err = http.Get(seedRegistry)
return err
})

if err != nil || resp.StatusCode != 200 {
// Just debug log the output because failures just result in trying the next image
message.Debug(resp, err)
return false
Expand Down
82 changes: 39 additions & 43 deletions src/pkg/k8s/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,63 +109,59 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, include PodFilter) []co

var readyPods = []corev1.Pod{}

// Reverse sort by creation time
// Sort the pods from newest to oldest
sort.Slice(pods.Items, func(i, j int) bool {
return pods.Items[i].CreationTimestamp.After(pods.Items[j].CreationTimestamp.Time)
})

if len(pods.Items) > 0 {
for _, pod := range pods.Items {
k.Log("Testing pod %q", pod.Name)
for _, pod := range pods.Items {
k.Log("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
k.Log("Testing pod %q for container %q", pod.Name, target.Container)
var matchesInitContainer bool

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if isRunning && initContainer.Name == target.Container {
// On running match in initContainer break this loop
matchesInitContainer = true
readyPods = append(readyPods, pod)
break
}
}
// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Don't check any further if there's already a match
if matchesInitContainer {
continue
// Handle container targeting
if target.Container != "" {
k.Log("Testing pod %q for container %q", pod.Name, target.Container)
var matchesInitContainer bool

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if isRunning && initContainer.Name == target.Container {
// On running match in initContainer break this loop
matchesInitContainer = true
readyPods = append(readyPods, pod)
break
}
}

// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if isRunning && container.Name == target.Container {
readyPods = append(readyPods, pod)
}
}
// Don't check any further if there's already a match
if matchesInitContainer {
continue
}

} else {
status := pod.Status.Phase
k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if isRunning && container.Name == target.Container {
readyPods = append(readyPods, pod)
}
}

} else {
status := pod.Status.Phase
k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
readyPods = append(readyPods, pod)
}
}
}

if len(readyPods) > 0 {
return readyPods
}
if len(readyPods) > 0 {
return readyPods
}

time.Sleep(3 * time.Second)
Expand Down
26 changes: 26 additions & 0 deletions src/pkg/k8s/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Tunnel struct {
attempt int
stopChan chan struct{}
readyChan chan struct{}
errChan chan error
}

// NewTunnel will create a new Tunnel struct.
Expand All @@ -60,6 +61,23 @@ func (k *K8s) NewTunnel(namespace, resourceType, resourceName, urlSuffix string,
}, nil
}

// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well.
func (tunnel *Tunnel) Wrap(function func() error) error {
var err error
funcErrChan := make(chan error)

go func() {
funcErrChan <- function()
}()

select {
case err = <-funcErrChan:
return err
case err = <-tunnel.ErrChan():
return err
}
}

// Connect will establish a tunnel to the specified target.
func (tunnel *Tunnel) Connect() (string, error) {
url, err := tunnel.establish()
Expand Down Expand Up @@ -90,6 +108,11 @@ func (tunnel *Tunnel) Endpoint() string {
return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort)
}

// ErrChan returns the tunnel's error channel
func (tunnel *Tunnel) ErrChan() chan error {
return tunnel.errChan
}

// HTTPEndpoint returns the tunnel endpoint as a HTTP URL string.
func (tunnel *Tunnel) HTTPEndpoint() string {
return fmt.Sprintf("http://%s", tunnel.Endpoint())
Expand Down Expand Up @@ -189,6 +212,9 @@ func (tunnel *Tunnel) establish() (string, error) {
tunnel.localPort = localPort
url := tunnel.FullURL()

// Store the error channel to listen for errors
tunnel.errChan = errChan

tunnel.kube.Log("Creating port forwarding tunnel at %s", url)
return url, nil
}
Expand Down
Loading

0 comments on commit 83c5ba4

Please sign in to comment.