From 196319b106852fbdebb4592fca2247c8d5e48da5 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Fri, 25 Feb 2022 15:14:49 -0700
Subject: [PATCH] [FIXED] JetStream: Some stream advisories missing

The "deleted" advisory was missing because the stream's send loop was
closed before the advisory was pushed to the queue to be sent.

Added tests, for both single server and clustered mode, covering all
stream advisories.

Resolves #2886

Signed-off-by: Ivan Kozlovic
---
 server/jetstream_cluster_test.go | 156 +++++++++++++++++++++++++++++++
 server/stream.go                 |  23 +++--
 2 files changed, 167 insertions(+), 12 deletions(-)

diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index b8b0a9fbb1c..5de969c78ec 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -10849,6 +10849,162 @@ func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) {
 	}
 }
 
+func TestJetStreamStreamAdvisories(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	defer s.Shutdown()
+
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	checkAdv := func(t *testing.T, sub *nats.Subscription, expectedPrefixes ...string) {
+		t.Helper()
+		seen := make([]bool, len(expectedPrefixes))
+		for i := 0; i < len(expectedPrefixes); i++ {
+			msg := natsNexMsg(t, sub, time.Second)
+			var gotOne bool
+			for j, pfx := range expectedPrefixes {
+				if !seen[j] && strings.HasPrefix(msg.Subject, pfx) {
+					seen[j] = true
+					gotOne = true
+					break
+				}
+			}
+			if !gotOne {
+				t.Fatalf("Expected one of prefixes %q, got %q", expectedPrefixes, msg.Subject)
+			}
+		}
+	}
+
+	// Used to keep stream names pseudo-unique. t.Name() has slashes in it which caused problems.
+	var testN int
+
+	checkAdvisories := func(t *testing.T, s *Server, replicas int) {
+
+		nc, js := jsClientConnect(t, s)
+		defer nc.Close()
+
+		testN++
+		streamName := "TEST_ADVISORIES_" + fmt.Sprintf("%d", testN)
+
+		sub := natsSubSync(t, nc, "$JS.EVENT.ADVISORY.STREAM.*."+streamName)
+
+		si, err := js.AddStream(&nats.StreamConfig{
+			Name:     streamName,
+			Storage:  nats.FileStorage,
+			Replicas: replicas,
+		})
+		require_NoError(t, err)
+		advisories := []string{JSAdvisoryStreamCreatedPre}
+		if replicas > 1 {
+			advisories = append(advisories, JSAdvisoryStreamLeaderElectedPre)
+		}
+		checkAdv(t, sub, advisories...)
+
+		si.Config.MaxMsgs = 1000
+		_, err = js.UpdateStream(&si.Config)
+		require_NoError(t, err)
+		checkAdv(t, sub, JSAdvisoryStreamUpdatedPre)
+
+		snapreq := &JSApiStreamSnapshotRequest{
+			DeliverSubject: nats.NewInbox(),
+			ChunkSize:      512,
+		}
+		var snapshot []byte
+		done := make(chan bool)
+		nc.Subscribe(snapreq.DeliverSubject, func(m *nats.Msg) {
+			// EOF
+			if len(m.Data) == 0 {
+				done <- true
+				return
+			}
+			// Could be writing to a file here too.
+			snapshot = append(snapshot, m.Data...)
+			// Flow ack
+			m.Respond(nil)
+		})
+
+		req, _ := json.Marshal(snapreq)
+		rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, streamName), req, time.Second)
+		if err != nil {
+			t.Fatalf("Unexpected error on snapshot request: %v", err)
+		}
+
+		var snapresp JSApiStreamSnapshotResponse
+		json.Unmarshal(rmsg.Data, &snapresp)
+		if snapresp.Error != nil {
+			t.Fatalf("Did not get correct error response: %+v", snapresp.Error)
+		}
+
+		// Wait to receive the snapshot.
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("Did not receive our snapshot in time")
+		}
+
+		checkAdv(t, sub, JSAdvisoryStreamSnapshotCreatePre)
+		checkAdv(t, sub, JSAdvisoryStreamSnapshotCompletePre)
+
+		err = js.DeleteStream(streamName)
+		require_NoError(t, err)
+		checkAdv(t, sub, JSAdvisoryStreamDeletedPre)
+
+		state := *snapresp.State
+		config := *snapresp.Config
+		resreq := &JSApiStreamRestoreRequest{
+			Config: config,
+			State:  state,
+		}
+		req, _ = json.Marshal(resreq)
+		rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, streamName), req, 5*time.Second)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		var resresp JSApiStreamRestoreResponse
+		json.Unmarshal(rmsg.Data, &resresp)
+		if resresp.Error != nil {
+			t.Fatalf("Got an unexpected error response: %+v", resresp.Error)
+		}
+
+		// Send our snapshot back in to restore the stream.
+		// Can be any size message.
+		var chunk [1024]byte
+		for r := bytes.NewReader(snapshot); ; {
+			n, err := r.Read(chunk[:])
+			if err != nil {
+				break
+			}
+			nc.Request(resresp.DeliverSubject, chunk[:n], time.Second)
+		}
+		rmsg, err = nc.Request(resresp.DeliverSubject, nil, time.Second)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		resresp.Error = nil
+		json.Unmarshal(rmsg.Data, &resresp)
+		if resresp.Error != nil {
+			t.Fatalf("Got an unexpected error response: %+v", resresp.Error)
+		}
+
+		checkAdv(t, sub, JSAdvisoryStreamRestoreCreatePre)
+		// At this point, the stream_created advisory may be sent before
+		// or after the restore_complete advisory because they are sent
+		// using different "send queues". That is, the restore uses the
+		// server's event queue while the stream_created is sent from
+		// the stream's own send queue.
+		advisories = append(advisories, JSAdvisoryStreamRestoreCompletePre)
+		checkAdv(t, sub, advisories...)
+	}
+
+	t.Run("Single", func(t *testing.T) { checkAdvisories(t, s, 1) })
+	t.Run("Clustered_R1", func(t *testing.T) { checkAdvisories(t, c.randomServer(), 1) })
+	t.Run("Clustered_R3", func(t *testing.T) { checkAdvisories(t, c.randomServer(), 3) })
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
diff --git a/server/stream.go b/server/stream.go
index fdbe41a4182..41ff5444b7b 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -464,7 +464,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	// Send advisory.
 	var suppress bool
 	if !s.standAloneMode() && sa == nil {
-		suppress = true
+		if cfg.Replicas > 1 {
+			suppress = true
+		}
 	} else if sa != nil {
 		suppress = sa.responded
 	}
@@ -1048,11 +1050,8 @@ func (mset *stream) update(config *StreamConfig) error {
 	// Now update config and store's version of our config.
 	mset.cfg = *cfg
 
-	var suppress bool
-	if mset.isClustered() && mset.sa != nil {
-		suppress = mset.sa.responded
-	}
-	if mset.isLeader() && !suppress {
+	// If we are the leader, never suppress the update advisory, simply send it.
+	if mset.isLeader() {
 		mset.sendUpdateAdvisoryLocked()
 	}
 	mset.mu.Unlock()
@@ -3327,12 +3326,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.infoSub = nil
 	}
 
-	// Quit channel.
-	if mset.qch != nil {
-		close(mset.qch)
-		mset.qch = nil
-	}
-
 	// Cluster cleanup
 	if n := mset.node; n != nil {
 		if deleteFlag {
@@ -3347,6 +3340,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.sendDeleteAdvisoryLocked()
 	}
 
+	// Quit channel, do this after sending the delete advisory.
+	if mset.qch != nil {
+		close(mset.qch)
+		mset.qch = nil
+	}
+
 	c := mset.client
 	mset.client = nil
 	if c == nil {
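
Note (illustration, not part of the patch): the stream.go change in stop() is purely an ordering change: the quit channel close is moved after sendDeleteAdvisoryLocked(), so the advisory is enqueued while the send loop is still running. The standalone Go sketch below uses made-up names (sender, queue, enqueue), not the server's actual internals, to show why closing the quit channel before enqueueing the "deleted" advisory can lose it.

package main

import (
	"fmt"
	"time"
)

// sender mimics, in a very reduced form, a component with an internal send
// loop that drains a queue until its quit channel (qch) is closed.
type sender struct {
	queue chan string   // stand-in for the stream's outbound send queue
	qch   chan struct{} // quit channel, analogous to mset.qch
}

func newSender() *sender {
	s := &sender{queue: make(chan string, 8), qch: make(chan struct{})}
	go func() {
		for {
			select {
			case msg := <-s.queue:
				fmt.Println("sent:", msg)
			case <-s.qch:
				// Send loop exits; anything enqueued after this point
				// is never drained.
				return
			}
		}
	}()
	return s
}

func (s *sender) enqueue(msg string) {
	select {
	case s.queue <- msg:
	default:
	}
}

func main() {
	s := newSender()

	// Buggy ordering (what the patch fixes): close the quit channel first,
	// then enqueue the "deleted" advisory. The send loop has already
	// stopped, so the advisory is never delivered and nothing is printed.
	close(s.qch)
	time.Sleep(50 * time.Millisecond) // give the send loop time to exit
	s.enqueue("stream deleted advisory")
	time.Sleep(50 * time.Millisecond)

	// The fix in stream.stop(): enqueue the delete advisory first, then
	// close the quit channel, so the send loop can still deliver it.
}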