Skip to content

Commit

Permalink
Merge pull request #2889 from thaJeztah/19.03_backport_fix_update_out…
Browse files Browse the repository at this point in the history
…_of_sequence

[19.03 backport] Fix update out of sequence and increase max recv gRPC message size for nodes and secrets
  • Loading branch information
dperny authored Sep 11, 2019
2 parents 4fb9e96 + bcd123c commit bbe3418
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 113 deletions.
14 changes: 14 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
run:
tests: false
linters:
disable-all: true
enable:
- misspell
- gofmt
- goimports
- golint
- ineffassign
- deadcode
- unconvert
- govet

17 changes: 0 additions & 17 deletions .gometalinter.json

This file was deleted.

2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.No

// Override hostname and TLS info
if desc != nil {
if a.config.Hostname != "" && desc != nil {
if a.config.Hostname != "" {
desc.Hostname = a.config.Hostname
}
desc.TLSInfo = tlsInfo
Expand Down
2 changes: 1 addition & 1 deletion agent/exec/dockerapi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
return nil, err
}

protocol := api.ProtocolTCP
var protocol api.PortConfig_Protocol
switch strings.ToLower(parts[1]) {
case "tcp":
protocol = api.ProtocolTCP
Expand Down
6 changes: 4 additions & 2 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"context"
"errors"
"math"
"sync"
"time"

Expand Down Expand Up @@ -64,6 +65,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
cc, err := agent.config.ConnBroker.Select(
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
)

if err != nil {
Expand Down Expand Up @@ -136,7 +138,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
// `ctx` is done and hence fail to propagate the timeout error to the agent.
// If the error is not propogated to the agent, the agent will not close
// the session or rebuild a new session.
sessionCtx, cancelSession := context.WithCancel(ctx) // nolint: vet
sessionCtx, cancelSession := context.WithCancel(ctx) //nolint:govet

// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
Expand All @@ -159,7 +161,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
select {
case err := <-errChan:
if err != nil {
return err // nolint: vet
return err //nolint:govet
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarmctl/service/flagparser/tmpfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func parseTmpfs(flags *pflag.FlagSet, spec *api.ServiceSpec) error {
// remove suffix and try again
suffix := meat[len(meat)-1]
meat = meat[:len(meat)-1]
var multiplier int64 = 1
var multiplier int64
switch suffix {
case 'g':
multiplier = 1 << 30
Expand Down
6 changes: 3 additions & 3 deletions direct.mk
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ version/version.go:
setup: ## install dependencies
@echo "🐳 $@"
# TODO(stevvooe): Install these from the vendor directory
@go get -u github.com/alecthomas/gometalinter
@gometalinter --install
# install golangci-lint version 1.17.1 to ./bin/golangci-lint
@curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s v1.17.1
@go get -u github.com/lk4d4/vndr
@go get -u github.com/stevvooe/protobuild

Expand All @@ -65,7 +65,7 @@ checkprotos: generate ## check if protobufs needs to be generated again
check: fmt-proto
check: ## Run various source code validation tools
@echo "🐳 $@"
@gometalinter ./...
@./bin/golangci-lint run

.PHONY: fmt-proto
fmt-proto:
Expand Down
2 changes: 1 addition & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
if err != nil {
return err
}
if err == nil && len(clusters) == 1 {
if len(clusters) == 1 {
heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
if err == nil && heartbeatPeriod > 0 {
d.config.HeartbeatPeriod = heartbeatPeriod
Expand Down
2 changes: 1 addition & 1 deletion manager/drivers/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *DriverProvider) NewSecretDriver(driver *api.Driver) (*SecretDriver, err
if m.pluginGetter == nil {
return nil, fmt.Errorf("plugin getter is nil")
}
if driver == nil && driver.Name == "" {
if driver == nil || driver.Name == "" {
return nil, fmt.Errorf("driver specification is nil")
}
// Search for the specified plugin
Expand Down
2 changes: 2 additions & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"math"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -758,6 +759,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
func(addr string, timeout time.Duration) (net.Conn, error) {
return xnet.DialTimeoutLocal(addr, timeout)
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
)
if err != nil {
logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
Expand Down
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, global.Run(ctx))
}()
})

addService(t, store, service1)
testutils.Expect(t, watch, api.EventCreateService{})
Expand Down Expand Up @@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
Expand Down Expand Up @@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Nothing should happen because both tasks are up to date.
select {
Expand Down Expand Up @@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask1.ID, "task2")
Expand Down Expand Up @@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Fail the running task
s.Update(func(tx store.Tx) error {
Expand Down
Loading

0 comments on commit bbe3418

Please sign in to comment.