Skip to content

Commit

Permalink
feat: Start and Stop service Starlark instructions for K8S (#756)
Browse files Browse the repository at this point in the history
## Description:
Most of the work for the K8S backend was done as part of #694. In
addition, the port forwarder reconnect feature reduced the amount of
work we had to finish. Here are the changes made in this PR:
- We keep the K8S service untouched when we stop the user service so it
can be restarted easily by creating a new pod. This change does not
impact the remove service expected behavior.
- We don't try to create a port forwarder connection if the service is
stopped.

A service stopped here is a k8s service with no pod running. I think it
is time to return the service status (registered, started, stopped) as
part of the GetServices response so we can simplify the second item
above. I will do that in a follow-up PR.

## Is this change user facing?
YES

## References (if applicable):
Closes #673 
#694 
#736
  • Loading branch information
laurentluce authored Jun 23, 2023
1 parent 2c3d74e commit fb3e922
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 88 deletions.
16 changes: 11 additions & 5 deletions cli/cli/kurtosis_gateway/connection/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package connection

import (
"context"
"net/url"

"github.com/kurtosis-tech/kurtosis/api/golang/engine/kurtosis_engine_rpc_api_bindings"
"github.com/kurtosis-tech/kurtosis/api/golang/engine/lib/kurtosis_context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager"
Expand All @@ -15,7 +17,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"net/url"
)

const (
Expand Down Expand Up @@ -93,10 +94,12 @@ func (provider *GatewayConnectionProvider) ForEnclaveApiContainer(enclaveInfo *k
return apiContainerConnection, nil
}

func (provider *GatewayConnectionProvider) ForUserService(enclaveId string, serviceUuid string, servicePortSpecs map[string]*port_spec.PortSpec) (GatewayConnectionToKurtosis, error) {
podPortforwardEndpoint, err := provider.getUserServicePodPortforwardEndpoint(enclaveId, serviceUuid)
func (provider *GatewayConnectionProvider) ForUserServiceIfRunning(enclaveId string, serviceUuid string, servicePortSpecs map[string]*port_spec.PortSpec) (GatewayConnectionToKurtosis, error) {
podPortforwardEndpoint, err := provider.getMaybeUserServicePodPortforwardEndpoint(enclaveId, serviceUuid)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to find an api endpoint for Kubernetes portforward to a Kurtosis user service with id '%v' in enclave '%v', instead a non-nil error was returned", enclaveId, serviceUuid)
} else if podPortforwardEndpoint == nil {
return nil, nil
}
userServiceConnection, err := newLocalPortToPodPortConnection(provider.config, podPortforwardEndpoint, servicePortSpecs)
if err != nil {
Expand Down Expand Up @@ -167,7 +170,7 @@ func (provider *GatewayConnectionProvider) getApiContainerPodPortforwardEndpoint
return provider.kubernetesManager.GetPodPortforwardEndpointUrl(enclaveNamespaceName, apiContainerPodName), nil
}

func (provider *GatewayConnectionProvider) getUserServicePodPortforwardEndpoint(enclaveId string, serviceUuid string) (*url.URL, error) {
func (provider *GatewayConnectionProvider) getMaybeUserServicePodPortforwardEndpoint(enclaveId string, serviceUuid string) (*url.URL, error) {
userServiceNamespaceName, err := provider.getEnclaveNamespaceNameForEnclaveId(enclaveId)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get a Kubernetes namespace corresponding to a Kurtosis enclave with id '%v', instead a non-nil error was returned", enclaveId)
Expand All @@ -181,7 +184,10 @@ func (provider *GatewayConnectionProvider) getUserServicePodPortforwardEndpoint(
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get running user service pods with labels '%+v' in namespace '%v', instead a non nil error was returned", runningUserServicePodNames, userServiceNamespaceName)
}
if len(runningUserServicePodNames) != 1 {
if len(runningUserServicePodNames) == 0 {
// A stopped service has no pod running and no port forward endpoint
return nil, nil
} else if len(runningUserServicePodNames) != 1 {
return nil, stacktrace.NewError("Expected to find exactly 1 running user service pod with guid '%v' in enclave '%v', instead found '%v'", serviceUuid, enclaveId, len(runningUserServicePodNames))
}
userServicePodName := runningUserServicePodNames[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package api_container_gateway

import (
"context"
"io"
"sync"

"github.com/kurtosis-tech/kurtosis/api/golang/core/kurtosis_core_rpc_api_bindings"
"github.com/kurtosis-tech/kurtosis/api/golang/core/lib/binding_constructors"
"github.com/kurtosis-tech/kurtosis/cli/cli/kurtosis_gateway/connection"
Expand All @@ -10,8 +13,6 @@ import (
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"sync"
)

const (
Expand Down Expand Up @@ -123,7 +124,7 @@ func (service *ApiContainerGatewayServiceServer) AddServices(ctx context.Context

// Write over the PublicIp and Public Ports fields so the service can be accessed through local port forwarding
for serviceNameStr, serviceInfo := range remoteApiContainerResponse.GetSuccessfulServiceNameToServiceInfo() {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformationIfServiceRunning(serviceInfo); err != nil {
err = stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceNameStr)
failedServicesPool[serviceNameStr] = err.Error()
}
Expand Down Expand Up @@ -291,9 +292,10 @@ func (service *ApiContainerGatewayServiceServer) UploadStarlarkPackage(server ku
//
// ====================================================================================================

// writeOverServiceInfoFieldsWithLocalConnectionInformation overwites the `MaybePublicPorts` and `MaybePublicIpAdrr` fields to connect to local ports forwarding requests to private ports in Kubernetes
// writeOverServiceInfoFieldsWithLocalConnectionInformationIfServiceRunning overwites the `MaybePublicPorts` and `MaybePublicIpAdrr` fields to connect to local ports forwarding requests to private ports in Kubernetes
// Only TCP Private Ports are forwarded
func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo *kurtosis_core_rpc_api_bindings.ServiceInfo) error {
// Does nothing if the service is stopped (no pod running)
func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithLocalConnectionInformationIfServiceRunning(serviceInfo *kurtosis_core_rpc_api_bindings.ServiceInfo) error {
// If the service has no private ports, then don't overwrite any of the service info fields
if len(serviceInfo.PrivatePorts) == 0 {
return nil
Expand All @@ -305,9 +307,11 @@ func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithL
cleanUpConnection := true
runningLocalConnection, isFound := service.userServiceGuidToLocalConnectionMap[serviceUuid]
if !isFound {
runningLocalConnection, localConnErr = service.startRunningConnectionForKurtosisService(serviceUuid, serviceInfo.PrivatePorts)
runningLocalConnection, localConnErr = service.startRunningConnectionForKurtosisServiceIfRunning(serviceUuid, serviceInfo.PrivatePorts)
if localConnErr != nil {
return stacktrace.Propagate(localConnErr, "Expected to be able to start a local connection to Kurtosis service '%v', instead a non-nil error was returned", serviceUuid)
} else if runningLocalConnection == nil {
return nil
}
defer func() {
if cleanUpConnection {
Expand All @@ -322,9 +326,9 @@ func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithL
return nil
}

// startRunningConnectionForKurtosisService starts a port forwarding process from kernel assigned local ports to the remote service ports specified
// startRunningConnectionForKurtosisServiceIfRunning starts a port forwarding process from kernel assigned local ports to the remote service ports specified
// If privatePortsFromApi is empty, an error is thrown
func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtosisService(serviceUuid string, privatePortsFromApi map[string]*kurtosis_core_rpc_api_bindings.Port) (*runningLocalServiceConnection, error) {
func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtosisServiceIfRunning(serviceUuid string, privatePortsFromApi map[string]*kurtosis_core_rpc_api_bindings.Port) (*runningLocalServiceConnection, error) {
if len(privatePortsFromApi) == 0 {
return nil, stacktrace.NewError("Expected Kurtosis service to have private ports specified for port forwarding, instead no ports were provided")
}
Expand All @@ -351,9 +355,11 @@ func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtos
}

// Start listening
serviceConnection, err := service.connectionProvider.ForUserService(service.enclaveId, serviceUuid, remotePrivatePortSpecs)
serviceConnection, err := service.connectionProvider.ForUserServiceIfRunning(service.enclaveId, serviceUuid, remotePrivatePortSpecs)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to start a local connection service with guid '%v' in enclave '%v', instead a non-nil error was returned", serviceUuid, service.enclaveId)
} else if serviceConnection == nil {
return nil, nil
}
cleanUpConnection := true
defer func() {
Expand Down Expand Up @@ -414,7 +420,7 @@ func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(s

serviceUuids := map[string]bool{}
for serviceId, serviceInfo := range serviceInfos {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformationIfServiceRunning(serviceInfo); err != nil {
return stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId)
}
serviceUuids[serviceInfo.GetServiceUuid()] = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/stacktrace"
applyconfigurationsv1 "k8s.io/client-go/applyconfigurations/core/v1"
)

func StopUserServices(
Expand Down Expand Up @@ -46,22 +45,6 @@ func StopUserServices(
}
}

kubernetesService := resources.Service
serviceName := kubernetesService.Name
updateConfigurator := func(updatesToApply *applyconfigurationsv1.ServiceApplyConfiguration) {
specUpdates := applyconfigurationsv1.ServiceSpec().WithSelector(nil)
updatesToApply.WithSpec(specUpdates)
}
if _, err := kubernetesManager.UpdateService(ctx, namespaceName, serviceName, updateConfigurator); err != nil {
erroredUuids[serviceUuid] = stacktrace.Propagate(
err,
"An error occurred updating service '%v' in namespace '%v' to reflect that it's no longer running",
serviceName,
namespaceName,
)
continue
}

successfulUuids[serviceUuid] = true
}
return successfulUuids, erroredUuids, nil
Expand Down
13 changes: 0 additions & 13 deletions docs/docs/starlark-reference/plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,6 @@ plan.start_service(
)
```

:::caution

`start_service` is only available with the Docker backend.

:::


stop_service
------------

Expand All @@ -559,12 +552,6 @@ plan.stop_service(
)
```

:::caution

`stop_service` is only available with the Docker backend.

:::

store_service_files
-------------------

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
//go:build !minikube
// +build !minikube

// We don't run this test in Kubernetes because the gateway does not support start and stop services

package startosis_stop_service_test

import (
Expand Down Expand Up @@ -42,17 +37,19 @@ def run(plan):
plan.add_service(name = DATASTORE_SERVICE_NAME, config = config)
plan.print("Service " + DATASTORE_SERVICE_NAME + " deployed successfully.")
`
// We start the service we created through the script above with a different script
startScript = `

// We stop and restart the service we created through the script above with a different script
stopAndStartScript = `
DATASTORE_SERVICE_NAME = "` + serviceName + `"
def run(plan):
plan.stop_service(DATASTORE_SERVICE_NAME)
plan.start_service(DATASTORE_SERVICE_NAME)
`
// We stop the service we created through the script above with a different script
stopScript = `
// We start the service we created through the script above with a different script
startScript = `
DATASTORE_SERVICE_NAME = "` + serviceName + `"
def run(plan):
plan.stop_service(DATASTORE_SERVICE_NAME)
plan.start_service(DATASTORE_SERVICE_NAME)
`
)

Expand Down Expand Up @@ -96,7 +93,6 @@ Service example-datastore-server-1 deployed successfully.
"Error validating datastore server '%s' is healthy",
serviceName,
)

logrus.Infof("Validated that all services are healthy")

// we run the start script and validate that an error is returned since the service is already started.
Expand All @@ -107,8 +103,8 @@ Service example-datastore-server-1 deployed successfully.
expectedErrorStr := fmt.Sprintf("Service '%v' is already started", serviceName)
require.Contains(t, runResult.ExecutionError.ErrorMessage, expectedErrorStr)

// we run the stop script and validate that the service is unreachable.
runResult, err = test_helpers.RunScriptWithDefaultConfig(ctx, enclaveCtx, stopScript)
// we run the stop and restart script and validate that the service is unreachable.
runResult, err = test_helpers.RunScriptWithDefaultConfig(ctx, enclaveCtx, stopAndStartScript)
require.NoError(t, err, "Unexpected error executing stop script")

require.Nil(t, runResult.InterpretationError, "Unexpected interpretation error")
Expand All @@ -119,35 +115,13 @@ Service example-datastore-server-1 deployed successfully.
`
require.Regexp(t, expectedScriptOutput, string(runResult.RunOutput))

require.Error(
t,
test_helpers.ValidateDatastoreServiceHealthy(context.Background(), enclaveCtx, serviceName, portId),
"Error validating datastore server '%s' is not healthy",
serviceName,
)

logrus.Infof("Validated that the service is stopped")

// we run the start script and validate that the service is ready
runResult, err = test_helpers.RunScriptWithDefaultConfig(ctx, enclaveCtx, startScript)
require.NoError(t, err, "Unexpected error executing start script")

require.Nil(t, runResult.InterpretationError, "Unexpected interpretation error")
require.Empty(t, runResult.ValidationErrors, "Unexpected validation error")
require.Nil(t, runResult.ExecutionError, "Unexpected execution error")

expectedScriptOutput = `Service 'example-datastore-server-1' started
`
require.Regexp(t, expectedScriptOutput, string(runResult.RunOutput))

require.NoError(
t,
test_helpers.ValidateDatastoreServiceHealthy(context.Background(), enclaveCtx, serviceName, portId),
"Error validating datastore server '%s' is healthy",
serviceName,
)

logrus.Infof("Validated that the service is started")
logrus.Infof("Validated that the service is restarted")

// we run the start script and validate that an error is returned since the service is already started.
runResult, _ = test_helpers.RunScriptWithDefaultConfig(ctx, enclaveCtx, startScript)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
//go:build !minikube
// +build !minikube

// We don't run this test in Kubernetes because the gateway does not support start and stop services

package startosis_stop_service_test

import (
Expand Down
3 changes: 1 addition & 2 deletions internal_testsuites/typescript/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ DEFAULT_TESTSUITE_CLUSTER_BACKEND="${TESTSUITE_CLUSTER_BACKEND_DOCKER}"
# network_partition_starlark,network_partition_test,network_soft_partition_test - Networking partitioning is not implemented in kubernetes
# service_pause_test - Service pausing not implemented in Kubernetes
# stream_log_test,search_log_test - The centralized logs feature is not implemented in Kubernetes yet
# startosis_start_service_test,startosis_stop_service_test - The start and stop service feature does not work in Kubernetes due to a required update in the gateway
KUBERNETES_TEST_IGNORE_PATTERNS="/build/testsuite/(network_partition_starlark|network_partition_test|network_soft_partition_test|service_pause_test|stream_log_test|search_logs_test|startosis_start_service_test|startosis_stop_service_test)"
KUBERNETES_TEST_IGNORE_PATTERNS="/build/testsuite/(network_partition_starlark|network_partition_test|network_soft_partition_test|service_pause_test|stream_log_test|search_logs_test)"

# ==================================================================================================
# Arg Parsing & Validation
Expand Down

0 comments on commit fb3e922

Please sign in to comment.