Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port-forward should be able to select ports by service name #5009

Merged
merged 4 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/content/en/api/skaffold.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,26 @@
"required": false,
"type": "string"
},
{
"name": "event.portEvent.targetPort.type",
"in": "query",
"required": false,
"type": "integer",
"format": "int32"
},
{
"name": "event.portEvent.targetPort.intVal",
"in": "query",
"required": false,
"type": "integer",
"format": "int32"
},
{
"name": "event.portEvent.targetPort.strVal",
"in": "query",
"required": false,
"type": "string"
},
{
"name": "event.statusCheckEvent.status",
"in": "query",
Expand Down Expand Up @@ -1659,6 +1679,23 @@
},
"description": "`FileSyncState` contains the status of the current file sync"
},
"protoIntOrString": {
"type": "object",
"properties": {
"type": {
"type": "integer",
"format": "int32"
},
"intVal": {
"type": "integer",
"format": "int32"
},
"strVal": {
"type": "string"
}
},
"description": "IntOrString is a type that can hold an int32 or a string."
},
"protoIntent": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1756,6 +1793,9 @@
},
"address": {
"type": "string"
},
"targetPort": {
"$ref": "#/definitions/protoIntOrString"
}
},
"description": "PortEvent Event describes each port forwarding event."
Expand Down
20 changes: 19 additions & 1 deletion docs/content/en/docs/references/api/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,23 @@ FileSyncEvent describes the sync status.



<a name="proto.IntOrString"></a>
#### IntOrString
IntOrString is a type that can hold an int32 or a string.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| type | [int32](#int32) | | type of stored value |
| intVal | [int32](#int32) | | int value |
| strVal | [string](#string) | | string value |







<a name="proto.Intent"></a>
#### Intent
Intent represents user intents for a given phase.
Expand Down Expand Up @@ -447,14 +464,15 @@ PortEvent Event describes each port forwarding event.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| localPort | [int32](#int32) | | local port for forwarded resource |
| remotePort | [int32](#int32) | | remote port is the resource port that will be forwarded. |
| remotePort | [int32](#int32) | | Deprecated. Uses targetPort.intVal. |
| podName | [string](#string) | | pod name if port forwarded resourceType is Pod |
| containerName | [string](#string) | | container name if specified in the kubernetes spec |
| namespace | [string](#string) | | the namespace of the resource to port forward. |
| portName | [string](#string) | | |
| resourceType | [string](#string) | | resource type e.g. "pod", "service". |
| resourceName | [string](#string) | | name of the resource to forward. |
| address | [string](#string) | | address on which to bind |
| targetPort | [IntOrString](#proto.IntOrString) | | target port is the resource port that will be forwarded. |



Expand Down
9 changes: 8 additions & 1 deletion docs/content/en/schemas/v2beta10.json
Original file line number Diff line number Diff line change
Expand Up @@ -2288,7 +2288,14 @@
"x-intellij-html-description": "namespace of the resource to port forward."
},
"port": {
"type": "integer",
"anyOf": [
{
"type": "string"
},
{
"type": "integer"
}
],
"description": "resource port that will be forwarded.",
"x-intellij-html-description": "resource port that will be forwarded."
},
Expand Down
9 changes: 9 additions & 0 deletions hack/schemas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ func (g *schemaGenerator) newDefinition(name string, t ast.Expr, comment string,
def.Properties[yamlName] = g.newDefinition(field.Names[0].Name, field.Type, field.Doc.Text(), field.Tag.Value)
def.AdditionalProperties = false
}

case *ast.SelectorExpr:
typeName := tt.Sel.Name
if typeName == "IntOrString" {
def.AnyOf = []*Definition{
{Type: "string"},
{Type: "integer"},
}
}
}

if g.strict && name != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ spec:
image: leeroy-app
ports:
- containerPort: 50051
name: http
4 changes: 4 additions & 0 deletions integration/examples/microservices/skaffold.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ portForward:
resourceName: leeroy-web
port: 8080
localPort: 9000
- resourceType: deployment
resourceName: leeroy-app
port: http
localPort: 9001
14 changes: 14 additions & 0 deletions integration/port_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ func TestRunUserPortForwardResource(t *testing.T) {
assertResponseFromPort(t, address, localPort, constants.LeeroyAppResponse)
}

func TestRunPortForwardByPortName(t *testing.T) {
MarkIntegrationTest(t, CanRunWithoutGcp)

ns, _ := SetupNamespace(t)

rpcAddr := randomPort()
skaffold.Run("--port-forward", "--rpc-port", rpcAddr, "--enable-rpc").InDir("examples/microservices").InNs(ns.Name).RunBackground(t)

_, entries := apiEvents(t, rpcAddr)

address1, localPort1 := getLocalPortFromPortForwardEvent(t, entries, "leeroy-app", "deployment", ns.Name)
assertResponseFromPort(t, address1, localPort1, constants.LeeroyAppResponse)
}

// TestDevPortForwardDeletePod tests that port forwarding works
// as expected. Then, the test force deletes a pod,
// and tests that the pod eventually comes up at the same port again.
Expand Down
33 changes: 21 additions & 12 deletions pkg/skaffold/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

sErrors "github.com/GoogleContainerTools/skaffold/pkg/skaffold/errors"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/version"
"github.com/GoogleContainerTools/skaffold/proto"
)
Expand Down Expand Up @@ -382,20 +383,28 @@ func FileSyncSucceeded(fileCount int, image string) {
}

// PortForwarded notifies that a remote port has been forwarded locally.
func PortForwarded(localPort, remotePort int32, podName, containerName, namespace string, portName string, resourceType, resourceName, address string) {
func PortForwarded(localPort int32, remotePort util.IntOrString, podName, containerName, namespace string, portName string, resourceType, resourceName, address string) {
event := proto.PortEvent{
LocalPort: localPort,
PodName: podName,
ContainerName: containerName,
Namespace: namespace,
PortName: portName,
ResourceType: resourceType,
ResourceName: resourceName,
Address: address,
TargetPort: &proto.IntOrString{
Type: int32(remotePort.Type),
IntVal: int32(remotePort.IntVal),
StrVal: remotePort.StrVal,
},
}
if remotePort.Type == util.Int {
event.RemotePort = int32(remotePort.IntVal)
}
handler.handle(&proto.Event{
EventType: &proto.Event_PortEvent{
PortEvent: &proto.PortEvent{
LocalPort: localPort,
RemotePort: remotePort,
PodName: podName,
ContainerName: containerName,
Namespace: namespace,
PortName: portName,
ResourceType: resourceType,
ResourceName: resourceName,
Address: address,
},
PortEvent: &event,
},
})
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/skaffold/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (

sErrors "github.com/GoogleContainerTools/skaffold/pkg/skaffold/errors"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/proto"
"github.com/GoogleContainerTools/skaffold/testutil"
)

var targetPort = proto.IntOrString{Type: 0, IntVal: 2001}

func TestGetLogEvents(t *testing.T) {
for step := 0; step < 1000; step++ {
ev := newHandler()
Expand Down Expand Up @@ -164,8 +167,14 @@ func TestPortForwarded(t *testing.T) {
handler.state = emptyState(latest.Pipeline{}, "test", true, true, true)

wait(t, func() bool { return handler.getState().ForwardedPorts[8080] == nil })
PortForwarded(8080, 8888, "pod", "container", "ns", "portname", "resourceType", "resourceName", "127.0.0.1")
wait(t, func() bool { return handler.getState().ForwardedPorts[8080] != nil })
PortForwarded(8080, schemautil.FromInt(8888), "pod", "container", "ns", "portname", "resourceType", "resourceName", "127.0.0.1")
wait(t, func() bool {
return handler.getState().ForwardedPorts[8080] != nil && handler.getState().ForwardedPorts[8080].RemotePort == 8888
})

wait(t, func() bool { return handler.getState().ForwardedPorts[8081] == nil })
PortForwarded(8081, schemautil.FromString("http"), "pod", "container", "ns", "portname", "resourceType", "resourceName", "127.0.0.1")
wait(t, func() bool { return handler.getState().ForwardedPorts[8081] != nil })
}

func TestStatusCheckEventStarted(t *testing.T) {
Expand Down Expand Up @@ -344,6 +353,7 @@ func TestResetStateOnBuild(t *testing.T) {
LocalPort: 2000,
RemotePort: 2001,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
Expand Down Expand Up @@ -379,6 +389,7 @@ func TestResetStateOnDeploy(t *testing.T) {
LocalPort: 2000,
RemotePort: 2001,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
Expand Down Expand Up @@ -422,6 +433,7 @@ func TestUpdateStateAutoTriggers(t *testing.T) {
LocalPort: 2000,
RemotePort: 2001,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
Expand All @@ -447,6 +459,7 @@ func TestUpdateStateAutoTriggers(t *testing.T) {
LocalPort: 2000,
RemotePort: 2001,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/portforward/entry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// TODO priyawadhwa@, change event API to accept ports of type int
event.PortForwarded(
int32(entry.localPort),
int32(entry.resource.Port),
entry.resource.Port,
entry.podName,
entry.containerName,
entry.resource.Namespace,
Expand Down Expand Up @@ -113,11 +113,11 @@ func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, entry *portF
if err := b.entryForwarder.Forward(ctx, entry); err == nil {
color.Green.Fprintln(
b.output,
fmt.Sprintf("Port forwarding %s/%s in namespace %s, remote port %d -> address %s port %d",
fmt.Sprintf("Port forwarding %s/%s in namespace %s, remote port %s -> address %s port %d",
entry.resource.Type,
entry.resource.Name,
entry.resource.Namespace,
entry.resource.Port,
entry.resource.Port.String(),
entry.resource.Address,
entry.localPort))
} else {
Expand Down
28 changes: 18 additions & 10 deletions pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
kubernetesclient "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/client"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

Expand Down Expand Up @@ -160,11 +161,11 @@ func portForwardArgs(ctx context.Context, pfe *portForwardEntry) []string {
args = append(args, fmt.Sprintf("pod/%s", podName), fmt.Sprintf("%d:%d", pfe.localPort, remotePort))
break
}
logrus.Warnf("could not map pods to service %s/%s/%d: %v", pfe.resource.Namespace, pfe.resource.Name, pfe.resource.Port, err)
logrus.Warnf("could not map pods to service %s/%s/%s: %v", pfe.resource.Namespace, pfe.resource.Name, pfe.resource.Port.String(), err)
fallthrough // and let kubectl try to handle it

default:
args = append(args, fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port))
args = append(args, fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%s", pfe.localPort, pfe.resource.Port.String()))
}

if pfe.resource.Address != "" && pfe.resource.Address != util.Loopback {
Expand Down Expand Up @@ -232,7 +233,7 @@ func (*KubectlForwarder) monitorLogs(ctx context.Context, logs io.Reader, cmd *k
// findNewestPodForService queries the cluster to find a pod that fulfills the given service, giving
// preference to pods that were most recently created. This is in contrast to the selection algorithm
// used by kubectl (see https://github.com/GoogleContainerTools/skaffold/issues/4522 for details).
func findNewestPodForService(ctx context.Context, ns, serviceName string, servicePort int) (string, int, error) {
func findNewestPodForService(ctx context.Context, ns, serviceName string, servicePort schemautil.IntOrString) (string, int, error) {
client, err := kubernetesclient.Client()
if err != nil {
return "", -1, fmt.Errorf("getting Kubernetes client: %w", err)
Expand Down Expand Up @@ -270,17 +271,17 @@ func findNewestPodForService(ctx context.Context, ns, serviceName string, servic
for _, p := range pods {
names = append(names, fmt.Sprintf("(pod:%q phase:%v created:%v)", p.Name, p.Status.Phase, p.CreationTimestamp))
}
logrus.Tracef("service %s/%d maps to %d pods: %v", serviceName, servicePort, len(pods), names)
logrus.Tracef("service %s/%s maps to %d pods: %v", serviceName, servicePort.String(), len(pods), names)
}

for _, p := range pods {
if targetPort := findTargetPort(svcPort, p); targetPort > 0 {
logrus.Debugf("Forwarding service %s/%d to pod %s/%d", serviceName, servicePort, p.Name, targetPort)
logrus.Debugf("Forwarding service %s/%s to pod %s/%d", serviceName, servicePort.String(), p.Name, targetPort)
return p.Name, targetPort, nil
}
}

return "", -1, fmt.Errorf("no pods match service %s/%d", serviceName, servicePort)
return "", -1, fmt.Errorf("no pods match service %s/%s", serviceName, servicePort.String())
}

// newestPodsFirst sorts pods by their creation time
Expand All @@ -292,13 +293,20 @@ func newestPodsFirst(pods []corev1.Pod) func(int, int) bool {
}
}

func findServicePort(svc corev1.Service, servicePort int) (corev1.ServicePort, error) {
func findServicePort(svc corev1.Service, servicePort schemautil.IntOrString) (corev1.ServicePort, error) {
for _, s := range svc.Spec.Ports {
if int(s.Port) == servicePort {
return s, nil
switch servicePort.Type {
case schemautil.Int:
if s.Port == int32(servicePort.IntVal) {
return s, nil
}
case schemautil.String:
if s.Name == servicePort.StrVal {
return s, nil
}
}
}
return corev1.ServicePort{}, fmt.Errorf("service %q does not expose port %d", svc.Name, servicePort)
return corev1.ServicePort{}, fmt.Errorf("service %q does not expose port %s", svc.Name, servicePort.String())
}

func findTargetPort(svcPort corev1.ServicePort, pod corev1.Pod) int {
Expand Down
Loading