Skip to content

Commit

Permalink
Add event API v2 server handler (#5622)
Browse files Browse the repository at this point in the history
* add v2 server handler

* execute test and code cleanup

* gofmt

* move more event tests for added functions

* update emptyStateWithArtifacts()

* boilerplate

* golangci-lint
  • Loading branch information
MarlonGamez authored Apr 2, 2021
1 parent b557b14 commit 25ec081
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 0 deletions.
70 changes: 70 additions & 0 deletions pkg/skaffold/event/v2/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func emptyStateWithArtifacts(builds map[string]string, metadata *proto.Metadata,
AutoTrigger: autoBuild,
StatusCode: proto.StatusCode_OK,
},
TestState: &proto.TestState{
Status: NotStarted,
StatusCode: proto.StatusCode_OK,
},
DeployState: &proto.DeployState{
Status: NotStarted,
AutoTrigger: autoDeploy,
Expand All @@ -182,6 +186,53 @@ func emptyStateWithArtifacts(builds map[string]string, metadata *proto.Metadata,
}
}

// ResetStateOnBuild resets the build, test, deploy and sync state
func ResetStateOnBuild() {
builds := map[string]string{}
for k := range handler.getState().BuildState.Artifacts {
builds[k] = NotStarted
}
autoBuild, autoDeploy, autoSync := handler.getState().BuildState.AutoTrigger, handler.getState().DeployState.AutoTrigger, handler.getState().FileSyncState.AutoTrigger
newState := emptyStateWithArtifacts(builds, handler.getState().Metadata, autoBuild, autoDeploy, autoSync)
handler.setState(newState)
}

// ResetStateOnTest resets the test, deploy, sync and status check state
func ResetStateOnTest() {
newState := handler.getState()
newState.TestState.Status = NotStarted
handler.setState(newState)
}

// ResetStateOnDeploy resets the deploy, sync and status check state
func ResetStateOnDeploy() {
newState := handler.getState()
newState.DeployState.Status = NotStarted
newState.DeployState.StatusCode = proto.StatusCode_OK
newState.StatusCheckState = emptyStatusCheckState()
newState.ForwardedPorts = map[int32]*proto.PortForwardEvent{}
newState.DebuggingContainers = nil
handler.setState(newState)
}

func UpdateStateAutoBuildTrigger(t bool) {
newState := handler.getState()
newState.BuildState.AutoTrigger = t
handler.setState(newState)
}

func UpdateStateAutoDeployTrigger(t bool) {
newState := handler.getState()
newState.DeployState.AutoTrigger = t
handler.setState(newState)
}

func UpdateStateAutoSyncTrigger(t bool) {
newState := handler.getState()
newState.FileSyncState.AutoTrigger = t
handler.setState(newState)
}

func emptyStatusCheckState() *proto.StatusCheckState {
return &proto.StatusCheckState{
Status: NotStarted,
Expand All @@ -190,6 +241,19 @@ func emptyStatusCheckState() *proto.StatusCheckState {
}
}

func AutoTriggerDiff(name string, val bool) (bool, error) {
switch name {
case "build":
return val != handler.getState().BuildState.AutoTrigger, nil
case "sync":
return val != handler.getState().FileSyncState.AutoTrigger, nil
case "deploy":
return val != handler.getState().DeployState.AutoTrigger, nil
default:
return false, fmt.Errorf("unknown phase %v not found in handler state", name)
}
}

func TaskInProgress(taskName string, iteration int) {
handler.handleTaskEvent(&proto.TaskEvent{
Id: fmt.Sprintf("%s-%d", taskName, iteration),
Expand All @@ -210,6 +274,12 @@ func TaskFailed(taskName sErrors.Phase, iteration int, err error) {
})
}

func (ev *eventHandler) setState(state proto.State) {
ev.stateLock.Lock()
ev.state = state
ev.stateLock.Unlock()
}

func (ev *eventHandler) handle(event *proto.Event) {
go func(t *timestamp.Timestamp) {
event.Timestamp = t
Expand Down
124 changes: 124 additions & 0 deletions pkg/skaffold/event/v2/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/GoogleContainerTools/skaffold/testutil"
)

var targetPort = proto.IntOrString{Type: 0, IntVal: 2001}

func TestGetLogEvents(t *testing.T) {
for step := 0; step < 1000; step++ {
ev := newHandler()
Expand Down Expand Up @@ -106,6 +108,76 @@ func wait(t *testing.T, condition func() bool) {
}
}

func TestResetStateOnBuild(t *testing.T) {
defer func() { handler = newHandler() }()
handler = newHandler()
handler.state = proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": Complete,
},
},
DeployState: &proto.DeployState{Status: Complete},
ForwardedPorts: map[int32]*proto.PortForwardEvent{
2001: {
LocalPort: 2000,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
FileSyncState: &proto.FileSyncState{Status: Succeeded},
}

ResetStateOnBuild()
expected := proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": NotStarted,
},
},
TestState: &proto.TestState{Status: NotStarted},
DeployState: &proto.DeployState{Status: NotStarted},
StatusCheckState: &proto.StatusCheckState{Status: NotStarted, Resources: map[string]string{}},
FileSyncState: &proto.FileSyncState{Status: NotStarted},
}
testutil.CheckDeepEqual(t, expected, handler.getState(), cmpopts.EquateEmpty())
}

func TestResetStateOnDeploy(t *testing.T) {
defer func() { handler = newHandler() }()
handler = newHandler()
handler.state = proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": Complete,
},
},
DeployState: &proto.DeployState{Status: Complete},
ForwardedPorts: map[int32]*proto.PortForwardEvent{
2001: {
LocalPort: 2000,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
}
ResetStateOnDeploy()
expected := proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": Complete,
},
},
DeployState: &proto.DeployState{Status: NotStarted},
StatusCheckState: &proto.StatusCheckState{Status: NotStarted,
Resources: map[string]string{},
},
}
testutil.CheckDeepEqual(t, expected, handler.getState(), cmpopts.EquateEmpty())
}

func TestEmptyStateCheckState(t *testing.T) {
actual := emptyStatusCheckState()
expected := &proto.StatusCheckState{Status: NotStarted,
Expand All @@ -114,6 +186,58 @@ func TestEmptyStateCheckState(t *testing.T) {
testutil.CheckDeepEqual(t, expected, actual, cmpopts.EquateEmpty())
}

func TestUpdateStateAutoTriggers(t *testing.T) {
defer func() { handler = newHandler() }()
handler = newHandler()
handler.state = proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": Complete,
},
AutoTrigger: false,
},
DeployState: &proto.DeployState{Status: Complete, AutoTrigger: false},
ForwardedPorts: map[int32]*proto.PortForwardEvent{
2001: {
LocalPort: 2000,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
FileSyncState: &proto.FileSyncState{
Status: "Complete",
AutoTrigger: false,
},
}
UpdateStateAutoBuildTrigger(true)
UpdateStateAutoDeployTrigger(true)
UpdateStateAutoSyncTrigger(true)

expected := proto.State{
BuildState: &proto.BuildState{
Artifacts: map[string]string{
"image1": Complete,
},
AutoTrigger: true,
},
DeployState: &proto.DeployState{Status: Complete, AutoTrigger: true},
ForwardedPorts: map[int32]*proto.PortForwardEvent{
2001: {
LocalPort: 2000,
PodName: "test/pod",
TargetPort: &targetPort,
},
},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
FileSyncState: &proto.FileSyncState{
Status: "Complete",
AutoTrigger: true,
},
}
testutil.CheckDeepEqual(t, expected, handler.getState(), cmpopts.EquateEmpty())
}

func TestTaskFailed(t *testing.T) {
tcs := []struct {
description string
Expand Down
15 changes: 15 additions & 0 deletions pkg/skaffold/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
v2 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/server/v2"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/proto/v1"
protoV2 "github.com/GoogleContainerTools/skaffold/proto/v2"
)

const maxTryListen = 10
Expand Down Expand Up @@ -152,7 +154,16 @@ func newGRPCServer(preferredPort int, usedPorts *util.PortSet) (func() error, in
autoSyncCallback: func(bool) {},
autoDeployCallback: func(bool) {},
}
v2.Srv = &v2.Server{
BuildIntentCallback: func() {},
DeployIntentCallback: func() {},
SyncIntentCallback: func() {},
AutoBuildCallback: func(bool) {},
AutoSyncCallback: func(bool) {},
AutoDeployCallback: func(bool) {},
}
proto.RegisterSkaffoldServiceServer(s, srv)
protoV2.RegisterSkaffoldV2ServiceServer(s, v2.Srv)

go func() {
if err := s.Serve(l); err != nil {
Expand Down Expand Up @@ -185,6 +196,10 @@ func newHTTPServer(preferredPort, proxyPort int, usedPorts *util.PortSet) (func(
if err != nil {
return func() error { return nil }, err
}
err = protoV2.RegisterSkaffoldV2ServiceHandlerFromEndpoint(context.Background(), mux, fmt.Sprintf("%s:%d", util.Loopback, proxyPort), opts)
if err != nil {
return func() error { return nil }, err
}

l, port, err := listenOnAvailablePort(preferredPort, usedPorts)
if err != nil {
Expand Down
107 changes: 107 additions & 0 deletions pkg/skaffold/server/v2/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2021 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v2

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

event "github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/v2"
proto "github.com/GoogleContainerTools/skaffold/proto/v2"
)

var (
// For Testing
resetStateOnBuild = event.ResetStateOnBuild
resetStateOnDeploy = event.ResetStateOnDeploy
)

func (s *Server) GetState(context.Context, *empty.Empty) (*proto.State, error) {
return event.GetState()
}

func (s *Server) Events(_ *empty.Empty, stream proto.SkaffoldV2Service_EventsServer) error {
return event.ForEachEvent(stream.Send)
}

func (s *Server) Handle(ctx context.Context, e *proto.Event) (*empty.Empty, error) {
return &empty.Empty{}, event.Handle(e)
}

func (s *Server) Execute(ctx context.Context, request *proto.UserIntentRequest) (*empty.Empty, error) {
intent := request.GetIntent()
if intent.GetBuild() {
resetStateOnBuild()
go func() {
s.BuildIntentCallback()
}()
}

if intent.GetDeploy() {
resetStateOnDeploy()
go func() {
s.DeployIntentCallback()
}()
}

if intent.GetSync() {
go func() {
s.SyncIntentCallback()
}()
}

return &empty.Empty{}, nil
}

func (s *Server) AutoBuild(ctx context.Context, request *proto.TriggerRequest) (res *empty.Empty, err error) {
return executeAutoTrigger("build", request, event.UpdateStateAutoBuildTrigger, event.ResetStateOnBuild, s.AutoBuildCallback)
}

func (s *Server) AutoDeploy(ctx context.Context, request *proto.TriggerRequest) (res *empty.Empty, err error) {
return executeAutoTrigger("deploy", request, event.UpdateStateAutoDeployTrigger, event.ResetStateOnDeploy, s.AutoDeployCallback)
}

func (s *Server) AutoSync(ctx context.Context, request *proto.TriggerRequest) (res *empty.Empty, err error) {
return executeAutoTrigger("sync", request, event.UpdateStateAutoSyncTrigger, func() {}, s.AutoSyncCallback)
}

func executeAutoTrigger(triggerName string, request *proto.TriggerRequest, updateTriggerStateFunc func(bool), resetPhaseStateFunc func(), serverCallback func(bool)) (res *empty.Empty, err error) {
res = &empty.Empty{}

trigger := request.GetState().GetEnabled()
update, err := event.AutoTriggerDiff(triggerName, trigger)
if err != nil {
return
}
if !update {
err = status.Errorf(codes.AlreadyExists, "auto %v is already set to %t", triggerName, trigger)
return
}
// update trigger state
updateTriggerStateFunc(trigger)
if trigger {
// reset phase state only when auto trigger is being set to true
resetPhaseStateFunc()
}
go func() {
serverCallback(trigger)
}()
return
}
Loading

0 comments on commit 25ec081

Please sign in to comment.