Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: initial read index implementation #6212

Merged
merged 1 commit into from
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply

// a chan to send out readState
readStateC chan raft.ReadState

// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
Expand Down Expand Up @@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) {
}
}

if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-r.stopped:
return
}
}

raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
Expand Down
16 changes: 15 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,22 @@ type EtcdServer struct {
snapCount uint64

w wait.Wait

readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitC
readwaitc chan struct{}
// readNotifier is used to notify the read routine that it can process the request
// when there is no error
readNotifier *notifier

// stop signals the run goroutine should shutdown.
stop chan struct{}
// stopping is closed by run goroutine on shutdown.
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
done chan struct{}

errorc chan error
id types.ID
attributes membership.Attributes
Expand Down Expand Up @@ -391,6 +401,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Expand Down Expand Up @@ -478,6 +489,7 @@ func (s *EtcdServer) Start() {
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand All @@ -493,6 +505,8 @@ func (s *EtcdServer) start() {
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stopping = make(chan struct{})
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
}
return longest, true
}

type notifier struct {
c chan struct{}
err error
}

func newNotifier() *notifier {
return &notifier{
c: make(chan struct{}, 0),
}
}

func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}
127 changes: 127 additions & 0 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package etcdserver

import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +28,9 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"

"github.com/coreos/go-semver/semver"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
Expand All @@ -44,6 +49,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 1000
)

var (
newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
)

type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
Expand Down Expand Up @@ -86,6 +95,31 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also support l-read Txn?

// TODO: remove this checking when we release etcd 3.2
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
return s.legacyRange(ctx, r)
}

if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
var resp *pb.RangeResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why shuffle this around?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nm, I see it falls through from linearized to serialized once it gets the notify

var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}

// TODO: remove this func when we release etcd 3.2
func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
Expand Down Expand Up @@ -143,6 +177,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}
return resp, err
}
// TODO: readonly Txn do not need to go through raft
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
Expand Down Expand Up @@ -641,3 +676,95 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
internalTimeout := time.Second

for {
ctx := make([]byte, 8)
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

select {
case <-s.readwaitc:
case <-s.stopping:
return
}

nextnr := newNotifier()

s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()

cctx, cancel := context.WithTimeout(context.Background(), internalTimeout)
if err := s.r.ReadIndex(cctx, ctx); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
nr.notify(err)
continue
}
cancel()

var (
timeout bool
done bool
)
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctx)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
}
case <-time.After(internalTimeout):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true
case <-s.stopping:
return
}
}
if !done {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens to notifyc?

}

if ai := s.getAppliedIndex(); ai < rs.Index {
select {
case <-s.applyWait.Wait(rs.Index):
case <-s.stopping:
return
}
}
// unblock all l-reads requested at indices before rs.Index
nr.notify(nil)
}
}

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()

// signal linearizable loop for current notify if it hasn't been already
select {
case s.readwaitc <- struct{}{}:
default:
}

// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s.stopping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a routine etcd server created. This is a per request routine so I assume we should use s.done?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it doesn't really make a difference

return ErrStopped
}
}