Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd: make max request bytes configurable #7968

Merged
merged 2 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"

DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultMaxRequestBytes = 1.5 * 1024 * 1024

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
Expand Down Expand Up @@ -87,6 +88,7 @@ type Config struct {
ElectionMs uint `json:"election-timeout"`
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// clustering

Expand Down Expand Up @@ -175,6 +177,7 @@ func NewConfig() *Config {
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
AutoCompactionRetention: cfg.AutoCompactionRetention,
QuotaBackendBytes: cfg.QuotaBackendBytes,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func newConfig() *config {
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")

// clustering
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ member flags:
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
--max-txn-ops '128'
maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
maximum client request size in bytes the server will accept.

clustering flags:

Expand Down
5 changes: 4 additions & 1 deletion etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/grpclog"
)

const grpcOverheadBytes = 512 * 1024

func init() {
grpclog.SetLogger(plog)
}
Expand All @@ -36,8 +38,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this patch should be labeled v3rpc: ... instead of etcdserver: ... since it touches the v3rpc package

opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))

opts = append(opts, grpc.MaxMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
grpcServer := grpc.NewServer(opts...)

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
Expand Down
3 changes: 3 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type ServerConfig struct {
QuotaBackendBytes int64
MaxTxnOps uint

// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

StrictReconfigCheck bool

// ClientCertAuthEnabled is true when cert has been signed by the client CA.
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ const (

// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
maxPendingRevokes = 16

recommendedMaxRequestBytes = 10 * 1024 * 1024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultMaxRequestBytes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. i see. nvm

)

var (
Expand Down Expand Up @@ -259,6 +261,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
cl *membership.RaftCluster
)

if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
}

if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
Expand Down
8 changes: 1 addition & 7 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ import (
)

const (
// the max request size that raft accepts.
// TODO: make this a flag? But we probably do not want to
// accept large request which might block raft stream. User
// specify a large value might end up with shooting in the foot.
maxRequestBytes = 1.5 * 1024 * 1024

// In the health case, there might be a small gap (10s of entries) between
// the applied index and committed index.
// However, if the committed entries are very heavy to apply, the gap might grow.
Expand Down Expand Up @@ -605,7 +599,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
return nil, err
}

if len(data) > maxRequestBytes {
if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, ErrRequestTooLarge
}

Expand Down
7 changes: 7 additions & 0 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type ClusterConfig struct {
UseGRPC bool
QuotaBackendBytes int64
MaxTxnOps uint
MaxRequestBytes uint
}

type cluster struct {
Expand Down Expand Up @@ -227,6 +228,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
Expand Down Expand Up @@ -494,6 +496,7 @@ type memberConfig struct {
clientTLS *transport.TLSInfo
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
}

// mustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -545,6 +548,10 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
if m.MaxTxnOps == 0 {
m.MaxTxnOps = embed.DefaultMaxTxnOps
}
m.MaxRequestBytes = mcfg.maxRequestBytes
if m.MaxRequestBytes == 0 {
m.MaxRequestBytes = embed.DefaultMaxRequestBytes
}
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
return m
}
Expand Down
29 changes: 29 additions & 0 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,35 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
}
}

// TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
func TestV3PutLargeRequests(t *testing.T) {
defer testutil.AfterTest(t)
tests := []struct {
key string
maxRequestBytes uint
valueSize int
expectError error
}{
// don't set to 0. use 0 as the default.
{"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
{"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
{"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
{"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
}
for i, test := range tests {
clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
kvcli := toGRPC(clus.Client(0)).KV
reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
_, err := kvcli.Put(context.TODO(), reqput)

if !eqErrGRPC(err, test.expectError) {
t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
}

clus.Terminate(t)
}
}

func eqErrGRPC(err1 error, err2 error) bool {
return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
}
Expand Down