Skip to content

Commit

Permalink
Start API server only once (#2382)
Browse files Browse the repository at this point in the history
Signed-off-by: David Gageot <[email protected]>
dgageot authored Jul 1, 2019

Verified

This commit was signed with the committer’s verified signature. The key has expired.
charlyx Charles-Henri GUERIN
1 parent daf58c6 commit 23da73c
Showing 13 changed files with 136 additions and 164 deletions.
87 changes: 55 additions & 32 deletions cmd/skaffold/app/cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/update"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/version"
"github.com/pkg/errors"
@@ -44,47 +46,68 @@ var (

func NewSkaffoldCommand(out, err io.Writer) *cobra.Command {
updateMsg := make(chan string)
var shutdownAPIServer func() error

rootCmd := &cobra.Command{
Use: "skaffold",
Short: "A tool that facilitates continuous development for Kubernetes applications.",
SilenceErrors: true,
}

rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
opts.Command = cmd.Use
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
cmd.Root().SilenceUsage = true

if err := SetUpLogs(err, v); err != nil {
return err
}
opts.Command = cmd.Use

if forceColors {
color.ForceColors()
}
// Setup colors
if forceColors {
color.ForceColors()
}
color.OverwriteDefault(color.Color(defaultColor))

rootCmd.SilenceUsage = true
logrus.Infof("Skaffold %+v", version.Get())
color.OverwriteDefault(color.Color(defaultColor))
// Setup logs
if err := setUpLogs(err, v); err != nil {
return err
}

if quietFlag {
logrus.Debugf("Update check is disabled because of quiet mode")
} else {
go func() {
if err := updateCheck(updateMsg); err != nil {
logrus.Infof("update check failed: %s", err)
}
}()
}
// Start API Server
if cmd.Use == "dev" {
// TODO(dgageot): api server is always started in dev mode, right now.
// It should instead default to true.
opts.EnableRPC = true
}
shutdown, err := server.Initialize(opts)
if err != nil {
return errors.Wrap(err, "initializing api server")
}
shutdownAPIServer = shutdown

// Print version
version := version.Get()
logrus.Infof("Skaffold %+v", version)
event.LogSkaffoldMetadata(version)

if quietFlag {
logrus.Debugf("Update check is disabled because of quiet mode")
} else {
go func() {
if err := updateCheck(updateMsg); err != nil {
logrus.Infof("update check failed: %s", err)
}
}()
}

return nil
}
return nil
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
select {
case msg := <-updateMsg:
fmt.Fprintf(out, "%s\n", msg)
default:
}

rootCmd.PersistentPostRun = func(cmd *cobra.Command, args []string) {
select {
case msg := <-updateMsg:
fmt.Fprintf(out, "%s\n", msg)
default:
}
if shutdownAPIServer != nil {
shutdownAPIServer()
}
},
}

SetUpFlags()
@@ -158,9 +181,9 @@ func FlagToEnvVarName(f *pflag.Flag) string {
return fmt.Sprintf("SKAFFOLD_%s", strings.Replace(strings.ToUpper(f.Name), "-", "_", -1))
}

func SetUpLogs(out io.Writer, level string) error {
func setUpLogs(out io.Writer, level string) error {
logrus.SetOutput(out)
lvl, err := logrus.ParseLevel(v)
lvl, err := logrus.ParseLevel(level)
if err != nil {
return errors.Wrap(err, "parsing log level")
}
2 changes: 0 additions & 2 deletions cmd/skaffold/app/cmd/dev.go
Original file line number Diff line number Diff line change
@@ -43,8 +43,6 @@ func NewCmdDev(out io.Writer) *cobra.Command {
}

func doDev(ctx context.Context, out io.Writer) error {
opts.EnableRPC = true

cleanup := func() {}
if opts.Cleanup {
defer func() {
2 changes: 1 addition & 1 deletion cmd/skaffold/app/cmd/runner.go
Original file line number Diff line number Diff line change
@@ -42,9 +42,9 @@ func withRunner(ctx context.Context, action func(runner.Runner, *latest.Skaffold
if err != nil {
return errors.Wrap(err, "creating runner")
}
defer runner.Stop()

err = action(runner, config)

return alwaysSucceedWhenCancelled(ctx, err)
}

9 changes: 2 additions & 7 deletions pkg/skaffold/build/local/local_test.go
Original file line number Diff line number Diff line change
@@ -222,17 +222,12 @@ func TestLocalRun(t *testing.T) {
fakeWarner := &warnings.Collect{}
t.Override(&warnings.Printf, fakeWarner.Warnf)

cfg := latest.BuildConfig{
event.InitializeState(latest.BuildConfig{
BuildType: latest.BuildType{
LocalBuild: &latest.LocalBuild{},
},
}
event.InitializeState(&runcontext.RunContext{
Cfg: &latest.Pipeline{
Build: cfg,
},
Opts: &config.SkaffoldOptions{},
})

l := Builder{
cfg: &latest.LocalBuild{},
localDocker: docker.NewLocalDaemon(&test.api, nil, false, map[string]bool{}),
10 changes: 1 addition & 9 deletions pkg/skaffold/build/sequence_test.go
Original file line number Diff line number Diff line change
@@ -25,9 +25,7 @@ import (
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/tag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/testutil"
)
@@ -143,15 +141,9 @@ func (t *concatTagger) doBuild(ctx context.Context, out io.Writer, artifact *lat
}

func initializeEvents() {
cfg := latest.BuildConfig{
event.InitializeState(latest.BuildConfig{
BuildType: latest.BuildType{
LocalBuild: &latest.LocalBuild{},
},
}
event.InitializeState(&runcontext.RunContext{
Cfg: &latest.Pipeline{
Build: cfg,
},
Opts: &config.SkaffoldOptions{},
})
}
2 changes: 1 addition & 1 deletion pkg/skaffold/deploy/helm_test.go
Original file line number Diff line number Diff line change
@@ -459,7 +459,7 @@ func TestHelmDeploy(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&util.DefaultExecCommand, test.cmd)

event.InitializeState(test.runContext)
event.InitializeState(test.runContext.Cfg.Build)
err := NewHelmDeployer(test.runContext).Deploy(context.Background(), ioutil.Discard, test.builds, nil)

t.CheckError(test.shouldErr, err)
28 changes: 12 additions & 16 deletions pkg/skaffold/event/event.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ import (
"fmt"
"sync"

runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server/proto"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/version"
@@ -35,10 +34,7 @@ const (
Failed = "Failed"
)

var (
handler *eventHandler
once sync.Once
)
var handler = &eventHandler{}

type eventHandler struct {
eventLog []proto.LogEntry
@@ -126,12 +122,10 @@ func (ev *eventHandler) forEachEvent(callback func(*proto.LogEntry) error) error
return <-listener.errors
}

func emptyState(build *latest.BuildConfig) proto.State {
func emptyState(build latest.BuildConfig) proto.State {
builds := map[string]string{}
if build != nil {
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
}
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
}

return proto.State{
@@ -146,12 +140,8 @@ func emptyState(build *latest.BuildConfig) proto.State {
}

// InitializeState instantiates the global state of the skaffold runner, as well as the event log.
func InitializeState(runCtx *runcontext.RunContext) {
once.Do(func() {
handler = &eventHandler{
state: emptyState(&runCtx.Cfg.Build),
}
})
func InitializeState(build latest.BuildConfig) {
handler.setState(emptyState(build))
}

// DeployInProgress notifies that a deployment has been started.
@@ -202,6 +192,12 @@ func PortForwarded(localPort, remotePort int32, podName, containerName, namespac
})
}

func (ev *eventHandler) setState(state proto.State) {
ev.stateLock.Lock()
ev.state = state
ev.stateLock.Unlock()
}

func (ev *eventHandler) handleDeployEvent(e *proto.DeployEvent) {
go ev.handle(&proto.Event{
EventType: &proto.Event_DeployEvent{
30 changes: 15 additions & 15 deletions pkg/skaffold/event/event_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func TestGetLogEvents(t *testing.T) {

func TestGetState(t *testing.T) {
ev := &eventHandler{
state: emptyState(nil),
state: emptyState(latest.BuildConfig{}),
}

ev.stateLock.Lock()
@@ -68,10 +68,10 @@ func TestGetState(t *testing.T) {
}

func TestDeployInProgress(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(nil),
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().DeployState.Status == NotStarted })
@@ -80,10 +80,10 @@ func TestDeployInProgress(t *testing.T) {
}

func TestDeployFailed(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(nil),
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().DeployState.Status == NotStarted })
@@ -92,10 +92,10 @@ func TestDeployFailed(t *testing.T) {
}

func TestDeployComplete(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(nil),
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().DeployState.Status == NotStarted })
@@ -104,10 +104,10 @@ func TestDeployComplete(t *testing.T) {
}

func TestBuildInProgress(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(&latest.BuildConfig{
state: emptyState(latest.BuildConfig{
Artifacts: []*latest.Artifact{{
ImageName: "img",
}},
@@ -120,10 +120,10 @@ func TestBuildInProgress(t *testing.T) {
}

func TestBuildFailed(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(&latest.BuildConfig{
state: emptyState(latest.BuildConfig{
Artifacts: []*latest.Artifact{{
ImageName: "img",
}},
@@ -136,10 +136,10 @@ func TestBuildFailed(t *testing.T) {
}

func TestBuildComplete(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(&latest.BuildConfig{
state: emptyState(latest.BuildConfig{
Artifacts: []*latest.Artifact{{
ImageName: "img",
}},
@@ -152,10 +152,10 @@ func TestBuildComplete(t *testing.T) {
}

func TestPortForwarded(t *testing.T) {
defer func() { handler = nil }()
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(nil),
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().ForwardedPorts["container"] == nil })
5 changes: 2 additions & 3 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/testutil"
v1 "k8s.io/api/core/v1"
@@ -436,7 +435,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
event.InitializeState(&runcontext.RunContext{Cfg: &latest.Pipeline{Build: latest.BuildConfig{}}})
event.InitializeState(latest.BuildConfig{})
taken := map[int]struct{}{}

forwardingTimeoutTime = time.Second
@@ -499,7 +498,7 @@ func TestStartPodForwarder(t *testing.T) {

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
event.InitializeState(&runcontext.RunContext{Cfg: &latest.Pipeline{Build: latest.BuildConfig{}}})
event.InitializeState(latest.BuildConfig{})
client := fakekubeclientset.NewSimpleClientset(&v1.Pod{})
fakeWatcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("*", testutil.SetupFakeWatcher(fakeWatcher))
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/testutil"
"github.com/google/go-cmp/cmp"
@@ -115,7 +114,7 @@ func TestStart(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
event.InitializeState(&runcontext.RunContext{Cfg: &latest.Pipeline{Build: latest.BuildConfig{}}})
event.InitializeState(latest.BuildConfig{})
fakeForwarder := newTestForwarder(nil)
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard), "", nil)
rf.EntryForwarder = fakeForwarder
@@ -226,7 +225,7 @@ func TestUserDefinedResources(t *testing.T) {
}

testutil.Run(t, "one service and one user defined pod", func(t *testutil.T) {
event.InitializeState(&runcontext.RunContext{Cfg: &latest.Pipeline{Build: latest.BuildConfig{}}})
event.InitializeState(latest.BuildConfig{})
fakeForwarder := newTestForwarder(nil)
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard), "", []*latest.PortForwardResource{pod})
rf.EntryForwarder = fakeForwarder
17 changes: 1 addition & 16 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
@@ -36,11 +36,9 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sync"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sync/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/test"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/version"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/watch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -56,7 +54,6 @@ type Runner interface {
Prune(context.Context, io.Writer) error
HasDeployed() bool
HasBuilt() bool
Stop() error
}

// SkaffoldRunner is responsible for running the skaffold build, test and deploy config.
@@ -77,7 +74,6 @@ type SkaffoldRunner struct {
hasBuilt bool
hasDeployed bool
imageList *kubernetes.ImageList
RPCServerShutdown func() error
}

// NewForConfig returns a new SkaffoldRunner for a SkaffoldConfig
@@ -118,12 +114,7 @@ func NewForConfig(opts *config.SkaffoldOptions, cfg *latest.SkaffoldConfig) (*Sk
return nil, errors.Wrap(err, "creating watch trigger")
}

shutdown, err := server.Initialize(opts)
if err != nil {
return nil, errors.Wrap(err, "initializing skaffold server")
}
event.InitializeState(runCtx)
event.LogSkaffoldMetadata(version.Get())
event.InitializeState(runCtx.Cfg.Build)

return &SkaffoldRunner{
Builder: builder,
@@ -138,7 +129,6 @@ func NewForConfig(opts *config.SkaffoldOptions, cfg *latest.SkaffoldConfig) (*Sk
imageList: kubernetes.NewImageList(),
cache: artifactCache,
runCtx: runCtx,
RPCServerShutdown: shutdown,
}, nil
}

@@ -288,8 +278,3 @@ func (r *SkaffoldRunner) imageTags(ctx context.Context, out io.Writer, artifacts
color.Default.Fprintln(out, "Tags generated in", time.Since(start))
return imageTags, nil
}

// Stop stops the runner.
func (r *SkaffoldRunner) Stop() error {
return r.RPCServerShutdown()
}
98 changes: 43 additions & 55 deletions pkg/skaffold/server/server.go
Original file line number Diff line number Diff line change
@@ -33,12 +33,6 @@ import (
"google.golang.org/grpc"
)

var (
once sync.Once
callback func() error
err error
)

// Trigger TODO(dgageot): this was a global variable carried by the runCtx.
// I changed it to a plain global variable to fix an issue.
// This needs to be better addressed.
@@ -48,59 +42,10 @@ type server struct {
trigger chan bool
}

func newGRPCServer(port int) (func() error, error) {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
return func() error { return nil }, errors.Wrap(err, "creating listener")
}
logrus.Infof("starting gRPC server on port %d", port)

s := grpc.NewServer()
proto.RegisterSkaffoldServiceServer(s, &server{
trigger: Trigger,
})

go func() {
if err := s.Serve(l); err != nil {
logrus.Errorf("failed to start grpc server: %s", err)
}
}()
return func() error {
s.Stop()
return l.Close()
}, nil
}

func newHTTPServer(port, proxyPort int) (func() error, error) {
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := proto.RegisterSkaffoldServiceHandlerFromEndpoint(context.Background(), mux, fmt.Sprintf("%s:%d", util.Loopback, proxyPort), opts)
if err != nil {
return func() error { return nil }, err
}

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
return func() error { return nil }, errors.Wrap(err, "creating listener")
}
logrus.Infof("starting gRPC HTTP server on port %d", port)

go http.Serve(l, mux)

return l.Close, nil
}

// Initialize creates the gRPC and HTTP servers for serving the state and event log.
// It returns a shutdown callback for tearing down the grpc server,
// which the runner is responsible for calling.
func Initialize(opts *config.SkaffoldOptions) (func() error, error) {
once.Do(func() {
callback, err = initialize(opts)
})
return callback, err
}

func initialize(opts *config.SkaffoldOptions) (func() error, error) {
if !opts.EnableRPC {
return func() error { return nil }, nil
}
@@ -142,5 +87,48 @@ func initialize(opts *config.SkaffoldOptions) (func() error, error) {
if err != nil {
return callback, errors.Wrap(err, "starting HTTP server")
}

return callback, nil
}

func newGRPCServer(port int) (func() error, error) {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
return func() error { return nil }, errors.Wrap(err, "creating listener")
}
logrus.Infof("starting gRPC server on port %d", port)

s := grpc.NewServer()
proto.RegisterSkaffoldServiceServer(s, &server{
trigger: Trigger,
})

go func() {
if err := s.Serve(l); err != nil {
logrus.Errorf("failed to start grpc server: %s", err)
}
}()
return func() error {
s.Stop()
return l.Close()
}, nil
}

func newHTTPServer(port, proxyPort int) (func() error, error) {
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := proto.RegisterSkaffoldServiceHandlerFromEndpoint(context.Background(), mux, fmt.Sprintf("%s:%d", util.Loopback, proxyPort), opts)
if err != nil {
return func() error { return nil }, err
}

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
return func() error { return nil }, errors.Wrap(err, "creating listener")
}
logrus.Infof("starting gRPC HTTP server on port %d", port)

go http.Serve(l, mux)

return l.Close, nil
}
5 changes: 1 addition & 4 deletions pkg/skaffold/server/server_test.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ package server
import (
"fmt"
"net"
"sync"
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
@@ -34,10 +33,8 @@ var (
)

func TestServerStartup(t *testing.T) {
once = sync.Once{}

// start up servers
shutdown, err := initialize(&config.SkaffoldOptions{
shutdown, err := Initialize(&config.SkaffoldOptions{
EnableRPC: true,
RPCPort: rpcAddr,
RPCHTTPPort: httpAddr,

0 comments on commit 23da73c

Please sign in to comment.