Reproduce the duplicated event issue #15262

Closed · wants to merge 3 commits
22 changes: 13 additions & 9 deletions server/etcdserver/api/v3rpc/watch.go
@@ -360,10 +360,9 @@ func (sws *serverWatchStream) recvLoop() error {
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
}
// Request progress for all watchers,
// force generation of a response
sws.watchStream.RequestProgressAll(true)
}
default:
// we probably should not shutdown the entire stream when
@@ -408,6 +407,7 @@ func (sws *serverWatchStream) sendLoop() {
// either return []*mvccpb.Event from the mvcc package
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
progressNotify := len(evs) == 0
events := make([]*mvccpb.Event, len(evs))
sws.mu.RLock()
needPrevKV := sws.prevKV[wresp.WatchID]
@@ -432,11 +432,15 @@
Canceled: canceled,
}

if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
// Progress notifications can have WatchID -1
// if they announce on behalf of multiple watchers
if !progressNotify || wresp.WatchID != clientv3.InvalidWatchID {
if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
}
}

mvcc.ReportEventReceived(len(evs))
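The recvLoop hunk above stops answering progress requests inline on ctrlStream and instead delegates to the mvcc layer via RequestProgressAll(true); the sendLoop hunk then lets the resulting broadcast responses (WatchID -1) bypass the per-watch-ID buffering. A minimal client-side sketch of how such notifications surface, assuming a local etcd endpoint (endpoint and key are illustrative, not from this PR):

	package main

	import (
		"context"
		"fmt"
		"time"

		clientv3 "go.etcd.io/etcd/client/v3"
	)

	func main() {
		cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
		if err != nil {
			panic(err)
		}
		defer cli.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// Progress notifications carry no events, only a header revision.
		wch := cli.Watch(ctx, "foo", clientv3.WithProgressNotify())

		// A manual progress request; with this PR it is answered by the
		// mvcc layer once all watchers on the stream are synced.
		if err := cli.RequestProgress(ctx); err != nil {
			panic(err)
		}

		for resp := range wch {
			if resp.IsProgressNotify() {
				fmt.Println("progress at revision", resp.Header.Revision)
			}
		}
	}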
45 changes: 40 additions & 5 deletions server/storage/mvcc/watchable_store.go
@@ -41,6 +41,7 @@ var (
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
progressAll(force bool)
rev() int64
}

@@ -62,6 +63,9 @@ type watchableStore struct {
// The key of the map is the key that the watcher watches on.
synced watcherGroup

// Whether to generate a progress notification once all watchers are synchronised
progressOnSync bool

stopc chan struct{}
wg sync.WaitGroup
}
@@ -79,11 +83,12 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
lg = zap.NewNop()
}
s := &watchableStore{
store: NewStore(lg, b, le, cfg),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
store: NewStore(lg, b, le, cfg),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
progressOnSync: false,
}
s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
@@ -407,6 +412,15 @@ func (s *watchableStore) syncWatchers() int {
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

// Send any deferred progress notification once all watchers are synced.
if s.progressOnSync && s.unsynced.size() == 0 {
for w := range s.synced.watchers {
w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
break
}
s.progressOnSync = false
}

return s.unsynced.size()
}

@@ -482,6 +496,27 @@ func (s *watchableStore) progress(w *watcher) {
}
}

func (s *watchableStore) progressAll(force bool) {
s.mu.RLock()
defer s.mu.RUnlock()

// Any watcher unsynced?
if s.unsynced.size() > 0 {
// If forced: Defer progress until successfully synced
if force {
s.progressOnSync = true
}
} else {
// If all watchers are synchronised, send out progress
// watch response on first watcher (if any)
for w := range s.synced.watchers {
w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
break
}
}
}

type watcher struct {
// the watcher key
key []byte
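Read together, progressAll and the syncWatchers tail form a small state machine: answer immediately when every watcher is synced, otherwise (when forced) set progressOnSync and let the next successful sync flush the notification. A condensed sketch with locking and the real watcherGroup type elided (names are simplified stand-ins, not the PR's types):

	package sketch

	type store struct {
		unsyncedCount  int
		synced         []chan<- int64 // stand-in for synced watcher channels
		progressOnSync bool
		rev            int64
	}

	// progressAll: answer now if every watcher is synced; otherwise,
	// when forced, defer the answer until the next successful sync.
	func (s *store) progressAll(force bool) {
		if s.unsyncedCount > 0 {
			if force {
				s.progressOnSync = true
			}
			return
		}
		s.notifyFirst()
	}

	// Tail of syncWatchers: flush the deferred notification exactly once
	// after the unsynced set drains.
	func (s *store) afterSync() {
		if s.progressOnSync && s.unsyncedCount == 0 {
			s.notifyFirst()
			s.progressOnSync = false
		}
	}

	// Notifying a single watcher (WatchID -1) suffices: the v3rpc layer
	// broadcasts the response to every watch on the stream.
	func (s *store) notifyFirst() {
		for _, ch := range s.synced {
			ch <- s.rev
			break
		}
	}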
12 changes: 12 additions & 0 deletions server/storage/mvcc/watcher.go
@@ -58,6 +58,14 @@ type WatchStream interface {
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)

// RequestProgressAll requests a progress notification for the
// entire watcher group. The response is sent only if all
// watchers are synced - or, if forced, once they become
// synced. It is sent through the WatchResponse channel of
// the first watcher of this stream, if any.
RequestProgressAll(force bool)

// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
@@ -188,3 +196,7 @@ func (ws *watchStream) RequestProgress(id WatchID) {
}
ws.watchable.progress(w)
}

func (ws *watchStream) RequestProgressAll(force bool) {
ws.watchable.progressAll(force)
}
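A hedged sketch of how the new WatchStream method is meant to be driven from the RPC layer (the wrapper name is hypothetical; the recvLoop hunk above is the real call site):

	package v3rpcsketch

	import "go.etcd.io/etcd/server/v3/storage/mvcc"

	// handleProgressRequest is a hypothetical wrapper around the new
	// method; force == true defers the notification until all watchers
	// on the stream are synced instead of dropping the request.
	func handleProgressRequest(ws mvcc.WatchStream) {
		ws.RequestProgressAll(true)
	}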
1 change: 1 addition & 0 deletions tests/framework/e2e/cluster.go
@@ -505,6 +505,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
"--initial-cluster-token", cfg.InitialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
"--experimental-watch-progress-notify-interval", "100ms",
}

if cfg.ForceNewCluster {
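Pinning --experimental-watch-progress-notify-interval to 100ms makes the server emit periodic progress notifications fast enough for the reproduction to observe them. A hedged helper a test might use to assert that one arrives within a few intervals (key and timeout are illustrative, not from this PR):

	package e2esketch

	import (
		"context"
		"fmt"
		"time"

		clientv3 "go.etcd.io/etcd/client/v3"
	)

	// waitForProgressNotify returns nil once a periodic progress
	// notification arrives, or an error after one second.
	func waitForProgressNotify(ctx context.Context, cli *clientv3.Client) error {
		ctx, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		for resp := range cli.Watch(ctx, "k", clientv3.WithProgressNotify()) {
			if resp.IsProgressNotify() {
				return nil
			}
		}
		return fmt.Errorf("no progress notification within 1s")
	}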
59 changes: 4 additions & 55 deletions tests/linearizability/linearizability_test.go
@@ -55,7 +55,6 @@ var (
largePutSize: 32769,
writes: []requestChance{
{operation: Put, chance: 50},
{operation: LargePut, chance: 5},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
@@ -80,7 +79,7 @@
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
LowTraffic, HighTraffic,
LowTraffic,
}
)

@@ -96,7 +95,7 @@ func TestLinearizability(t *testing.T) {
for _, traffic := range trafficList {
scenarios = append(scenarios, scenario{
name: "ClusterOfSize1/" + traffic.name,
failpoint: RandomFailpoint,
failpoint: KillFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
@@ -105,57 +104,7 @@
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
})
scenarios = append(scenarios, scenario{
name: "ClusterOfSize3/" + traffic.name,
failpoint: RandomFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
})
}
scenarios = append(scenarios, []scenario{
{
name: "Issue14370",
failpoint: RaftBeforeSavePanic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
{
name: "Issue14685",
failpoint: DefragBeforeCopyPanic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
{
name: "Issue13766",
failpoint: KillFailpoint,
traffic: &HighTraffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
),
},
// TODO: investigate periodic `Model is not linearizable` failures
// see https://github.com/etcd-io/etcd/pull/15104#issuecomment-1416371288
/*{
name: "Snapshot",
failpoint: RandomSnapshotFailpoint,
traffic: &HighTraffic,
config: *e2e.NewConfig(
e2e.WithGoFailEnabled(true),
e2e.WithSnapshotCount(100),
e2e.WithSnapshotCatchUpEntries(100),
e2e.WithPeerProxy(true),
),
},*/
}...)
for _, scenario := range scenarios {
if scenario.traffic == nil {
scenario.traffic = &defaultTraffic
@@ -172,7 +121,7 @@
defer clus.Close()
operations, events := testLinearizability(ctx, t, lg, clus, FailpointConfig{
failpoint: scenario.failpoint,
count: 1,
count: 6,
retries: 3,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}, *scenario.traffic)
@@ -198,7 +147,7 @@ func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus
watchCtx, watchCancel := context.WithCancel(ctx)
g.Go(func() error {
operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
time.Sleep(time.Second)
time.Sleep(10 * time.Second)
watchCancel()
return nil
})
6 changes: 6 additions & 0 deletions tests/linearizability/traffic.go
@@ -63,7 +63,13 @@ type requestChance struct {

func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {

cnt := 0
for {
time.Sleep(100 * time.Millisecond)
cnt++
if cnt > 6 {
return
}
select {
case <-ctx.Done():
return
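The counter above is a debugging hack that bounds each traffic client to roughly six 100ms-spaced iterations so the workload ends quickly and deterministically. An equivalent deadline-based form of the same bound (a sketch, not the PR's code):

	package trafficsketch

	import (
		"context"
		"time"
	)

	// runBounded calls step every 100ms until roughly 600ms have elapsed
	// or the context is cancelled, mirroring the cnt > 6 cutoff above.
	func runBounded(ctx context.Context, step func()) {
		deadline := time.Now().Add(600 * time.Millisecond)
		for time.Now().Before(deadline) {
			select {
			case <-ctx.Done():
				return
			default:
			}
			step()
			time.Sleep(100 * time.Millisecond)
		}
	}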
44 changes: 40 additions & 4 deletions tests/linearizability/watch.go
@@ -16,6 +16,8 @@ package linearizability

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"
@@ -58,15 +60,33 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
}

func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.Client) []watchEvent {
gotProgressNotify := false
events := []watchEvent{}
var lastRevision int64 = 1
operations := map[model.EtcdOperation]clientv3.WatchResponse{}
var lastRevision int64 = 0
for {
select {
case <-ctx.Done():
if !gotProgressNotify {
panic("Expected at least one progress notify")
}
return events
default:
}
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) {
if resp.Header.Revision < lastRevision {
panic("Revision should never decrease")
}
if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
//panic("Got two non-empty responses about same revision")
}
if resp.IsProgressNotify() {
gotProgressNotify = true
}
/*err := c.RequestProgress(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
}*/
lastRevision = resp.Header.Revision
time := time.Now()
for _, event := range resp.Events {
@@ -77,15 +97,31 @@
case mvccpb.DELETE:
op = model.Delete
}
events = append(events, watchEvent{
event := watchEvent{
Time: time,
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}
if prev, found := operations[event.Op]; found {
currentResponse, err := json.MarshalIndent(resp, "", " ")
if err != nil {
panic(fmt.Sprintf("Failed to marshal response, err: %v", err))
}
previousResponse, err := json.MarshalIndent(prev, "", " ")
if err != nil {
panic(fmt.Sprintf("Failed to marshal response, err: %v", err))
}

if resp.Events[0].Type != mvccpb.DELETE && prev.Events[0].Type != mvccpb.DELETE {
panic(fmt.Sprintf("duplicate operation in two responses on revision %d\n---\nfirst:\n%s\n---\nsecond:\n%s", event.Revision, previousResponse, currentResponse))
}
}
operations[event.Op] = resp
events = append(events, event)
}
if resp.Err() != nil {
lg.Info("Watch error", zap.Error(resp.Err()))
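The detection logic above keys each delivered event by its full EtcdOperation and keeps the watch response that first carried it; seeing the same operation again (outside the delete interplay the code tolerates) is the duplicated-event bug this PR reproduces. A distilled, self-contained version of the check (types simplified; not the PR's exact code):

	package watchsketch

	import (
		"fmt"

		clientv3 "go.etcd.io/etcd/client/v3"
	)

	// eventKey identifies a delivered event by key and mod revision.
	type eventKey struct {
		Key      string
		Revision int64
	}

	// duplicateDetector remembers the first response that delivered each
	// event and reports any later redelivery.
	type duplicateDetector struct {
		seen map[eventKey]clientv3.WatchResponse
	}

	func (d *duplicateDetector) observe(resp clientv3.WatchResponse) error {
		for _, ev := range resp.Events {
			k := eventKey{Key: string(ev.Kv.Key), Revision: ev.Kv.ModRevision}
			if _, dup := d.seen[k]; dup {
				return fmt.Errorf("event for key %q at revision %d delivered twice", k.Key, k.Revision)
			}
			d.seen[k] = resp
		}
		return nil
	}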