diff --git a/.golangci.yml b/.golangci.yml
new file mode 100644
index 0000000000..694ae5c372
--- /dev/null
+++ b/.golangci.yml
@@ -0,0 +1,14 @@
+run:
+  tests: false
+linters:
+  disable-all: true
+  enable:
+    - misspell
+    - gofmt
+    - goimports
+    - golint
+    - ineffassign
+    - deadcode
+    - unconvert
+    - govet
+
diff --git a/.gometalinter.json b/.gometalinter.json
deleted file mode 100644
index 6710a180dc..0000000000
--- a/.gometalinter.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-    "Vendor": true,
-    "Exclude": [
-        ".*\\.pb\\.go"
-    ],
-    "Enable": [
-        "vet",
-        "misspell",
-        "gofmt",
-        "goimports",
-        "golint",
-        "ineffassign",
-        "deadcode",
-        "unconvert"
-    ],
-    "Deadline": "2m"
-}
diff --git a/agent/agent.go b/agent/agent.go
index 743072f9da..58ebff5934 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -575,7 +575,7 @@ func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.No
 
 	// Override hostname and TLS info
 	if desc != nil {
-		if a.config.Hostname != "" && desc != nil {
+		if a.config.Hostname != "" {
 			desc.Hostname = a.config.Hostname
 		}
 		desc.TLSInfo = tlsInfo
diff --git a/agent/exec/dockerapi/controller.go b/agent/exec/dockerapi/controller.go
index 1450fbdc36..abb2e15e90 100644
--- a/agent/exec/dockerapi/controller.go
+++ b/agent/exec/dockerapi/controller.go
@@ -654,7 +654,7 @@ func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
 			return nil, err
 		}
 
-		protocol := api.ProtocolTCP
+		var protocol api.PortConfig_Protocol
 		switch strings.ToLower(parts[1]) {
 		case "tcp":
 			protocol = api.ProtocolTCP
diff --git a/agent/session.go b/agent/session.go
index 526953509b..2e7f1b6a37 100644
--- a/agent/session.go
+++ b/agent/session.go
@@ -3,6 +3,7 @@ package agent
 import (
 	"context"
 	"errors"
+	"math"
 	"sync"
 	"time"
 
@@ -64,6 +65,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 	cc, err := agent.config.ConnBroker.Select(
 		grpc.WithTransportCredentials(agent.config.Credentials),
 		grpc.WithTimeout(dispatcherRPCTimeout),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	)
 
 	if err != nil {
@@ -136,7 +138,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 	// `ctx` is done and hence fail to propagate the timeout error to the agent.
 	// If the error is not propogated to the agent, the agent will not close
 	// the session or rebuild a new session.
-	sessionCtx, cancelSession := context.WithCancel(ctx) // nolint: vet
+	sessionCtx, cancelSession := context.WithCancel(ctx) //nolint:govet
 
 	// Need to run Session in a goroutine since there's no way to set a
 	// timeout for an individual Recv call in a stream.
@@ -159,7 +161,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 	select {
 	case err := <-errChan:
 		if err != nil {
-			return err // nolint: vet
+			return err //nolint:govet
 		}
 	case <-time.After(dispatcherRPCTimeout):
 		cancelSession()
diff --git a/cmd/swarmctl/service/flagparser/tmpfs.go b/cmd/swarmctl/service/flagparser/tmpfs.go
index 36d9e26da7..8b42696082 100644
--- a/cmd/swarmctl/service/flagparser/tmpfs.go
+++ b/cmd/swarmctl/service/flagparser/tmpfs.go
@@ -64,7 +64,7 @@ func parseTmpfs(flags *pflag.FlagSet, spec *api.ServiceSpec) error {
 			// remove suffix and try again
 			suffix := meat[len(meat)-1]
 			meat = meat[:len(meat)-1]
-			var multiplier int64 = 1
+			var multiplier int64
 			switch suffix {
 			case 'g':
 				multiplier = 1 << 30
diff --git a/direct.mk b/direct.mk
index 0cdeff946a..4e1f4bf216 100644
--- a/direct.mk
+++ b/direct.mk
@@ -39,8 +39,8 @@ version/version.go:
 
 setup: ## install dependencies
 	@echo "🐳 $@"
 	# TODO(stevvooe): Install these from the vendor directory
-	@go get -u github.com/alecthomas/gometalinter
-	@gometalinter --install
+	# install golangci-lint version 1.17.1 to ./bin/golangci-lint
+	@curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s v1.17.1
 	@go get -u github.com/lk4d4/vndr
 	@go get -u github.com/stevvooe/protobuild
@@ -65,7 +65,7 @@ checkprotos: generate ## check if protobufs needs to be generated again
 check: fmt-proto
 check: ## Run various source code validation tools
 	@echo "🐳 $@"
-	@gometalinter ./...
+	@./bin/golangci-lint run
 
 .PHONY: fmt-proto
 fmt-proto:
diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go
index 6149806470..d1db2fdc83 100644
--- a/manager/dispatcher/dispatcher.go
+++ b/manager/dispatcher/dispatcher.go
@@ -238,7 +238,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	if err == nil && len(clusters) == 1 {
+	if len(clusters) == 1 {
 		heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
 		if err == nil && heartbeatPeriod > 0 {
 			d.config.HeartbeatPeriod = heartbeatPeriod
diff --git a/manager/drivers/provider.go b/manager/drivers/provider.go
index 0d9be6119d..97c36fe73d 100644
--- a/manager/drivers/provider.go
+++ b/manager/drivers/provider.go
@@ -22,7 +22,7 @@ func (m *DriverProvider) NewSecretDriver(driver *api.Driver) (*SecretDriver, err
 	if m.pluginGetter == nil {
 		return nil, fmt.Errorf("plugin getter is nil")
 	}
-	if driver == nil && driver.Name == "" {
+	if driver == nil || driver.Name == "" {
 		return nil, fmt.Errorf("driver specification is nil")
 	}
 	// Search for the specified plugin
diff --git a/manager/manager.go b/manager/manager.go
index b66a8db570..9112fb2b40 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
+	"math"
 	"net"
 	"os"
 	"path/filepath"
@@ -758,6 +759,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
 			func(addr string, timeout time.Duration) (net.Conn, error) {
 				return xnet.DialTimeoutLocal(addr, timeout)
 			}),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	)
 	if err != nil {
 		logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go
index c7cede8d94..abb24fc307 100644
--- a/manager/orchestrator/global/global_test.go
+++ b/manager/orchestrator/global/global_test.go
@@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
 	ctx := context.Background()
 	// Start the global orchestrator.
 	global := NewGlobalOrchestrator(store)
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, global.Run(ctx))
-	}()
+	})
 
 	addService(t, store, service1)
 	testutils.Expect(t, watch, api.EventCreateService{})
@@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
@@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Nothing should happen because both tasks are up to date.
 	select {
@@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskDelete(t, watch)
 	assert.Equal(t, observedTask1.ID, "task2")
@@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Fail the running task
 	s.Update(func(tx store.Tx) error {
diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go
index 325a0141aa..53d6da728e 100644
--- a/manager/orchestrator/replicated/update_test.go
+++ b/manager/orchestrator/replicated/update_test.go
@@ -2,7 +2,7 @@ package replicated
 
 import (
 	"context"
-	"sync/atomic"
+	"sync"
 	"testing"
 	"time"
 
@@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) {
 }
 
 func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
-	// this test should complete within 20 seconds. if not, bail out
+	// this test should complete within 30 seconds. if not, bail out
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
@@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	orchestrator := NewReplicatedOrchestrator(s)
 
-	// TODO(dperny): these are used with atomic.StoreUint32 and
-	// atomic.LoadUint32. using atomic primitives is bad practice and easy to
-	// mess up
+	// These variables will be used to signal that The Fail Loop should start
+	// failing image1 tasks; once failImage1 is set to true, The Failing Can Begin.
 	var (
-		failImage1 uint32
-		failImage2 uint32
+		failMu     sync.Mutex
+		failImage1 bool
 	)
 
+	// create a watch for task creates, which we will use to verify that the
+	// updater works correctly.
 	watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
 	defer cancelCreate()
 
@@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 	// Fail new tasks the updater tries to run
 	watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
 	defer cancelUpdate()
-	go func() {
+
+	// We're gonna call this big chunk here "The Fail Loop". its job is to put
+	// tasks into a Failed state in certain conditions.
+	testutils.EnsureRuns(func() {
 		failedLast := false
+		// typical go pattern: infinite for loop in a goroutine, exits on
+		// ctx.Done
 		for {
 			var e events.Event
 			select {
@@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 			if task.DesiredState == task.Status.State {
 				continue
 			}
-			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
+			// This used to have a 3rd clause,
+			// "&& task.Status.State != api.TaskStateRunning"
+			// however, this is unneeded. If DesiredState is Running, then
+			// actual state cannot be Running, because that would get caught
+			// in the condition above (DesiredState == State)
+			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
 				err := s.Update(func(tx store.Tx) error {
 					task = store.GetTask(tx, task.ID)
-					// Never fail two image2 tasks in a row, so there's a mix of
-					// failed and successful tasks for the rollback.
-					if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
+					// lock mutex governing access to failImage1.
+					failMu.Lock()
+					defer failMu.Unlock()
+					// we should start failing tasks with image1 only after failImage1 is set
+					if task.Spec.GetContainer().Image == "image1" && failImage1 {
+						// only fail the task if failImage1 has been set to true
+						// (which happens just before the second rolling update below)
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
-					} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
+					} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
+						// Never fail two image2 tasks in a row, so there's a mix of
+						// failed and successful tasks for the rollback.
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
 					} else {
@@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 				assert.NoError(t, err)
 			}
 		}
-	}()
+	})
 
 	// Create a service with four replicas specified before the orchestrator
 	// is started. This should result in two tasks when the orchestrator
@@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	// Start the orchestrator.
 	var orchestratorError error
-	orchestratorDone := make(chan struct{})
-	// verify that the orchestrator has had a chance to run by blocking the
-	// main test routine until it has.
-	orchestratorRan := make(chan struct{})
-	go func() {
-		close(orchestratorRan)
-		// try not to fail the test in go routines. it's racey. instead, save
-		// the error and check it in a defer
+	orchestratorDone := testutils.EnsureRuns(func() {
 		orchestratorError = orchestrator.Run(ctx)
-		close(orchestratorDone)
-	}()
-
-	select {
-	case <-orchestratorRan:
-	case <-ctx.Done():
-		t.Error("orchestrator did not start before test timed out")
-	}
+	})
 
 	defer func() {
 		orchestrator.Stop()
@@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
 		assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")
 
-	atomic.StoreUint32(&failImage2, 1)
-
 	// Start a rolling update
 	err = s.Update(func(tx store.Tx) error {
 		s1 := store.GetService(tx, "id1")
@@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		var e events.Event
 		select {
 		case e = <-watchServiceUpdate:
+			t.Log("service was updated")
 		case <-ctx.Done():
 			t.Error("test timed out before watchServiceUpdate provided an event")
 			return
@@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		}
 	}
 
-	atomic.StoreUint32(&failImage1, 1)
-
 	// Repeat the rolling update but this time fail the tasks that the
 	// rollback creates.
+	failMu.Lock()
+	failImage1 = true
+	failMu.Unlock()
+
 	err = s.Update(func(tx store.Tx) error {
 		s1 := store.GetService(tx, "id1")
 		require.NotNil(t, s1)
diff --git a/manager/orchestrator/restart/restart.go b/manager/orchestrator/restart/restart.go
index ed7bf3e584..2b2a087aac 100644
--- a/manager/orchestrator/restart/restart.go
+++ b/manager/orchestrator/restart/restart.go
@@ -516,20 +516,13 @@ func (r *Supervisor) Cancel(taskID string) {
 	<-delay.doneCh
 }
 
-// CancelAll aborts all pending restarts and waits for any instances of
-// StartNow that have already triggered to complete.
+// CancelAll aborts all pending restarts
 func (r *Supervisor) CancelAll() {
-	var cancelled []delayedStart
-
 	r.mu.Lock()
 	for _, delay := range r.delays {
 		delay.cancel()
 	}
 	r.mu.Unlock()
-
-	for _, delay := range cancelled {
-		<-delay.doneCh
-	}
 }
 
 // ClearServiceHistory forgets restart history related to a given service ID.
diff --git a/manager/orchestrator/service.go b/manager/orchestrator/service.go
index 037e493b30..c5d298c516 100644
--- a/manager/orchestrator/service.go
+++ b/manager/orchestrator/service.go
@@ -47,22 +47,27 @@ func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *a
 	err = s.Batch(func(batch *store.Batch) error {
 		for _, t := range tasks {
 			err := batch.Update(func(tx store.Tx) error {
+				// the task may have changed for some reason in the meantime
+				// since we read it out, so we need to get it from the store
+				// again within the boundaries of a transaction
+				latestTask := store.GetTask(tx, t.ID)
+
 				// time travel is not allowed. if the current desired state is
 				// above the one we're trying to go to we can't go backwards.
 				// we have nothing to do and we should skip to the next task
-				if t.DesiredState > api.TaskStateRemove {
+				if latestTask.DesiredState > api.TaskStateRemove {
 					// log a warning, though. we shouln't be trying to rewrite
 					// a state to an earlier state
 					log.G(ctx).Warnf(
 						"cannot update task %v in desired state %v to an earlier desired state %v",
-						t.ID, t.DesiredState, api.TaskStateRemove,
+						latestTask.ID, latestTask.DesiredState, api.TaskStateRemove,
 					)
 					return nil
 				}
 				// update desired state to REMOVE
-				t.DesiredState = api.TaskStateRemove
+				latestTask.DesiredState = api.TaskStateRemove
 
-				if err := store.UpdateTask(tx, t); err != nil {
+				if err := store.UpdateTask(tx, latestTask); err != nil {
 					log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE")
 				}
 				return nil
diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go
index e347ec709c..0c6b9a9849 100644
--- a/manager/orchestrator/taskreaper/task_reaper_test.go
+++ b/manager/orchestrator/taskreaper/task_reaper_test.go
@@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
 	reaper := New(s)
 
 	// Now, start the reaper
-	go reaper.Run(ctx)
+	testutils.EnsureRuns(func() { reaper.Run(ctx) })
 	// And then stop the reaper. This will cause the reaper to run through its
 	// whole init phase and then immediately enter the loop body, get the stop
@@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
-		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator and the reaper.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
@@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) {
 	taskReaper := New(s)
 	taskReaper.tickSignal = make(chan struct{}, 1)
 	defer taskReaper.Stop()
-	go taskReaper.Run(ctx)
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	// None of the tasks we've created are eligible for deletion. We should
 	// see no task delete events. Wait for a tick signal, or 500ms to pass, to
@@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go
index 0b3b851ee0..09602ec256 100644
--- a/manager/orchestrator/testutils/testutils.go
+++ b/manager/orchestrator/testutils/testutils.go
@@ -11,6 +11,22 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
+// goroutine has had an opportunity to run. It returns a channel which will be
+// closed when the provided closure exits.
+func EnsureRuns(closure func()) <-chan struct{} {
+	started := make(chan struct{})
+	stopped := make(chan struct{})
+	go func() {
+		close(started)
+		closure()
+		close(stopped)
+	}()
+
+	<-started
+	return stopped
+}
+
 // WatchTaskCreate waits for a task to be created.
 func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
 	for {
@@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
 			if _, ok := event.(api.EventUpdateTask); ok {
 				assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event))
 			}
-		case <-time.After(2 * time.Second):
+		case <-time.After(3 * time.Second):
 			assert.FailNow(t, "no task creation")
 		}
 	}
@@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task {
 			if _, ok := event.(api.EventCreateTask); ok {
 				assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event))
 			}
-		case <-time.After(time.Second):
+		case <-time.After(2 * time.Second):
 			assert.FailNow(t, "no task update")
 		}
 	}
diff --git a/manager/orchestrator/update/updater.go b/manager/orchestrator/update/updater.go
index 7c977dba1c..4e6a2cc0bd 100644
--- a/manager/orchestrator/update/updater.go
+++ b/manager/orchestrator/update/updater.go
@@ -501,7 +501,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
 				return fmt.Errorf("task %s not found while trying to shut it down", original.ID)
 			}
 			if t.DesiredState > api.TaskStateRunning {
-				return fmt.Errorf("task %s was already shut down when reached by updater", original.ID)
+				return fmt.Errorf(
+					"task %s was already shut down when reached by updater (state: %v)",
+					original.ID, t.DesiredState,
+				)
 			}
 			t.DesiredState = api.TaskStateShutdown
 			return store.UpdateTask(tx, t)
diff --git a/node/node.go b/node/node.go
index f82fc139d6..7235da66f4 100644
--- a/node/node.go
+++ b/node/node.go
@@ -6,6 +6,7 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"io/ioutil"
+	"math"
 	"net"
 	"os"
 	"path/filepath"
@@ -33,7 +34,7 @@ import (
 	"github.com/docker/swarmkit/manager/encryption"
 	"github.com/docker/swarmkit/remotes"
 	"github.com/docker/swarmkit/xnet"
-	"github.com/grpc-ecosystem/go-grpc-prometheus"
+	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	bolt "go.etcd.io/bbolt"
@@ -911,6 +912,7 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
 	opts := []grpc.DialOption{
 		grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
 		grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	}
 	insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
 	opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
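
The sketch below is not part of the patch; it is a minimal illustration of how the new testutils.EnsureRuns helper introduced above is intended to be used, based only on its definition in this diff. The test name, package name, and closure body are hypothetical.

package example_test

import (
	"context"
	"testing"

	"github.com/docker/swarmkit/manager/orchestrator/testutils"
)

// TestEnsureRunsPattern shows the pattern this diff applies throughout the
// orchestrator tests: instead of a bare `go f()`, the call is wrapped in
// testutils.EnsureRuns so the test only proceeds once the goroutine has
// actually been scheduled, and the returned channel is used to wait for the
// goroutine to exit.
func TestEnsureRunsPattern(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	// EnsureRuns blocks until the closure's goroutine has started, which
	// removes the race where the test body runs before the goroutine does.
	done := testutils.EnsureRuns(func() {
		// long-running work, e.g. orchestrator.Run(ctx), would go here
		<-ctx.Done()
	})

	// ... exercise the system under test here ...

	cancel()
	// the returned channel is closed when the closure returns
	<-done
}

Returning the "stopped" channel is also what lets testUpdaterRollback capture the orchestrator's exit directly (orchestratorDone := testutils.EnsureRuns(...)) instead of managing a second channel by hand, as the old code did.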