From 8d14c114205306eea221b85e92b445e3847e5c26 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 13 Sep 2016 11:16:53 +0800 Subject: [PATCH] etcdserver: support read index Use read index to achieve l-read. --- etcdserver/raft.go | 15 ++++++ etcdserver/server.go | 13 ++++- etcdserver/v3_server.go | 112 ++++++++++++++++++++++++++++++++++------ 3 files changed, 124 insertions(+), 16 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 95e95fd22a4f..aacbeebe139a 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -17,6 +17,7 @@ package etcdserver import ( "encoding/json" "expvar" + "fmt" "sort" "sync" "sync/atomic" @@ -103,6 +104,9 @@ type raftNode struct { // a chan to send out apply applyc chan apply + // a chan to send out readState + readStateC chan raft.ReadState + // TODO: remove the etcdserver related logic from raftNode // TODO: add a state machine interface to apply the commit entries // and do snapshot/recover @@ -196,6 +200,17 @@ func (r *raftNode) start(s *EtcdServer) { } } + if len(rd.ReadStates) != 0 { + if len(rd.ReadStates) > 1 { + fmt.Println("drop len(rd.ReadStates)") + } + select { + case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: + default: + fmt.Println("drop ri") + } + } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, diff --git a/etcdserver/server.go b/etcdserver/server.go index 055f71298d84..561fb6b0c84a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -173,7 +173,15 @@ type EtcdServer struct { snapCount uint64 - w wait.Wait + w wait.Wait + + readMu sync.RWMutex + // read routine notifies etcd server that it waits for reading by sending an empty struct to + // readwaitC + readwaitc chan struct{} + // etcdserver notifies read routine that it can process the request by closing the readNotifyc + readNotifyc chan struct{} + stop chan struct{} done chan struct{} errorc chan error @@ -384,6 +392,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), raftStorage: s, storage: NewStorage(w, ss), + readStateC: make(chan raft.ReadState, 1), }, id: id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, @@ -471,6 +480,7 @@ func (s *EtcdServer) Start() { go s.purgeFile() go monitorFileDescriptor(s.done) go s.monitorVersions() + go s.linearizableReadLoop() } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -485,6 +495,7 @@ func (s *EtcdServer) start() { s.applyWait = wait.NewTimeList() s.done = make(chan struct{}) s.stop = make(chan struct{}) + s.readwaitc = make(chan struct{}, 1024) if s.ClusterVersion() != nil { plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String())) } else { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 395f75deb9b7..c1134436c4d8 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -15,6 +15,8 @@ package etcdserver import ( + "bytes" + "encoding/binary" "strconv" "strings" "time" @@ -26,6 +28,7 @@ import ( "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/raft" "golang.org/x/net/context" "google.golang.org/grpc/metadata" ) @@ -86,26 +89,28 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - if r.Serializable { - var resp *pb.RangeResponse - var err error - chk := func(ai *auth.AuthInfo) error { - return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) + if !r.Serializable { + notify, err := s.linearizableReadNotify(ctx) + if err != nil { + return nil, err } - get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } - if serr := s.doSerialize(ctx, chk, get); serr != nil { - return nil, serr + + select { + case <-notify: + case <-ctx.Done(): + return nil, ctx.Err() } - return resp, err } - result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r}) - if err != nil { - return nil, err + var resp *pb.RangeResponse + var err error + chk := func(ai *auth.AuthInfo) error { + return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - if result.err != nil { - return nil, result.err + get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } + if serr := s.doSerialize(ctx, chk, get); serr != nil { + return nil, serr } - return result.resp.(*pb.RangeResponse), nil + return resp, err } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { @@ -641,3 +646,80 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern // Watchable returns a watchable interface attached to the etcdserver. func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } + +func (s *EtcdServer) linearizableReadLoop() { + var ( + rs raft.ReadState + requests int + ) + ctx := make([]byte, 8) + s.readNotifyc = make(chan struct{}) + + for { + binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next()) + + for (len(s.readwaitc) > 0 && requests < 256) || (len(s.readwaitc) == 0 && requests == 0) { + select { + case <-s.readwaitc: + requests++ + } + } + + s.readMu.Lock() + var drained bool + for !drained { + select { + // drain all pending requests + case <-s.readwaitc: + default: + drained = true + } + } + requests = 0 + notifyc := s.readNotifyc + s.readNotifyc = make(chan struct{}) + s.readMu.Unlock() + + if err := s.r.ReadIndex(context.Background(), ctx); err != nil { + continue + } + select { + case rs = <-s.r.readStateC: + if !bytes.Equal(rs.RequestCtx, ctx) { + plog.Errorf("unexpected read index context value (want %v, got %v)", rs.RequestCtx, ctx) + continue + } + case <-time.After(time.Second): + plog.Warningf("time out waiting for read index response") + continue + } + + if ai := s.getAppliedIndex(); ai < rs.Index { + select { + case <-s.applyWait.Wait(ai): + } + } + // unblock all the l-read that happened before we requested the + // read index + close(notifyc) + } +} + +func (s *EtcdServer) linearizableReadNotify(ctx context.Context) (chan struct{}, error) { + for { + s.readMu.RLock() + select { + case s.readwaitc <- struct{}{}: + nc := s.readNotifyc + s.readMu.RUnlock() + return nc, nil + case <-ctx.Done(): + s.readMu.RUnlock() + return nil, ctx.Err() + default: + s.readMu.RUnlock() + // retry after draining + time.Sleep(10 * time.Millisecond) + } + } +}