From b4128693ed61aa0c32179af07677bcf1d8301dcd Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 8 Mar 2022 15:22:08 -0700 Subject: [PATCH] Ensure file path is correct during stream restore Also had to change all references from `path.` to `filepath.` when dealing with files, so that it works properly on Windows. Fixed also lots of tests to defer the shutdown of the server after the removal of the storage, and fixed some config files directories to use the single quote `'` to surround the file path, again to work on Windows. Signed-off-by: Ivan Kozlovic --- server/filestore.go | 87 ++-- server/filestore_test.go | 32 +- server/gateway_test.go | 1 + server/jetstream.go | 31 +- server/jetstream_api.go | 4 +- server/jetstream_cluster.go | 6 +- server/jetstream_cluster_test.go | 70 ++- server/jetstream_test.go | 423 ++++++++---------- server/jwt_test.go | 44 +- server/leafnode_test.go | 52 +-- server/log_test.go | 6 +- server/monitor_test.go | 2 +- server/mqtt_test.go | 2 +- server/norace_test.go | 49 +- server/raft.go | 33 +- server/reload_test.go | 7 +- server/routes_test.go | 1 + server/server_test.go | 2 +- server/stream.go | 36 +- .../jetstream/restore_bad_stream/backup.json | 33 ++ .../restore_bad_stream/stream.tar.s2 | Bin 0 -> 1590 bytes test/ocsp_test.go | 36 +- test/service_latency_test.go | 6 +- 23 files changed, 467 insertions(+), 496 deletions(-) create mode 100644 test/configs/jetstream/restore_bad_stream/backup.json create mode 100755 test/configs/jetstream/restore_bad_stream/stream.tar.s2 diff --git a/server/filestore.go b/server/filestore.go index 28ac847f69..d23cdf5948 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -29,7 +29,6 @@ import ( "io/ioutil" "net" "os" - "path" "path/filepath" "runtime" "sort" @@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim fs.fip = !fcfg.AsyncFlush // Check if this is a new setup. - mdir := path.Join(fcfg.StoreDir, msgDir) - odir := path.Join(fcfg.StoreDir, consumerDir) + mdir := filepath.Join(fcfg.StoreDir, msgDir) + odir := filepath.Join(fcfg.StoreDir, consumerDir) if err := os.MkdirAll(mdir, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create message storage directory - %v", err) } @@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Write our meta data iff does not exist. - meta := path.Join(fcfg.StoreDir, JetStreamMetaFile) + meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { if err := fs.writeStreamMeta(); err != nil { return nil, err @@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // If we expect to be encrypted check that what we are restoring is not plaintext. // This can happen on snapshot restores or conversions. 
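The switch from `path.Join` to `filepath.Join` matters because the `path` package always joins with forward slashes, while `path/filepath` uses the operating system's separator, so only the latter yields paths that Windows file APIs accept. A minimal standalone sketch of the difference (the directory names are invented for illustration, not taken from the store layout):

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// path.Join is meant for slash-separated names (URLs, entries inside a
	// tar archive) and always emits '/', regardless of the OS.
	fmt.Println(path.Join("store", "msgs", "1.blk")) // store/msgs/1.blk on every platform

	// filepath.Join uses the platform separator, so on Windows this prints
	// store\msgs\1.blk, which is what os.Open and friends expect there.
	fmt.Println(filepath.Join("store", "msgs", "1.blk"))
}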
if fs.prf != nil { - keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) + keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) { if err := fs.writeStreamMeta(); err != nil { return nil, err @@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error { return err } fs.aek = key - keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) + keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error { } } - meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile) + meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) { return err } @@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error { fs.hh.Reset() fs.hh.Write(b) checksum := hex.EncodeToString(fs.hh.Sum(nil)) - sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) + sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) { mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire} - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - mb.mfn = path.Join(mdir, fi.Name()) - mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index)) - mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + mb.mfn = filepath.Join(mdir, fi.Name()) + mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index)) + mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index)) if mb.hh == nil { key := sha256.Sum256(fs.hashKeyForBlock(index)) @@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e // Check if encryption is enabled. if fs.prf != nil { - ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index))) + ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))) if err != nil { // We do not seem to have keys even though we should. Could be a plaintext conversion. // Create the keys and we will double check below. @@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error { defer fs.mu.Unlock() // Check for any left over purged messages. - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) if _, err := os.Stat(pdir); err == nil { os.RemoveAll(pdir) } - mdir := path.Join(fs.fcfg.StoreDir, msgDir) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) fis, err := ioutil.ReadDir(mdir) if err != nil { return errNotReadable @@ -916,13 +915,13 @@ func (fs *fileStore) recoverMsgs() error { // We had a bug that would leave fss files around during a snapshot. // Clean them up here if we see them. - if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 { + if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 { for _, fn := range fms { os.Remove(fn) } } // Same bug for keyfiles but for these we just need to identify orphans. 
- if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 { + if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 { valid := make(map[uint64]bool) for _, mb := range fs.blks { valid[mb.index] = true @@ -930,7 +929,7 @@ func (fs *fileStore) recoverMsgs() error { for _, fn := range kms { var index uint64 shouldRemove := true - if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] { + if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] { shouldRemove = false } if shouldRemove { @@ -1516,8 +1515,8 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.hh = hh - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index)) mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { mb.dirtyCloseWithRemove(true) @@ -1525,7 +1524,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.mfd = mfd - mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index)) + mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index)) ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { mb.dirtyCloseWithRemove(true) @@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb.ifd = ifd // For subject based info. - mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index)) + mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index)) // Check if encryption is enabled. if fs.prf != nil { @@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { return err } mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()] - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() { mb.closeFDsLocked() // We will write to a new file and mv/rename it in case of failure. - mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) + mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) defer os.Remove(mfn) if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { return @@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { // Move the msgs directory out of the way, will delete out of band. // FIXME(dlc) - These can error and we need to change api above to propagate? - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) // If purge directory still exists then we need to wait // in place and remove since rename would fail. if _, err := os.Stat(pdir); err == nil { @@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error { } fs.Purge() - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) // If purge directory still exists then we need to wait // in place and remove since rename would fail. 
if _, err := os.Stat(pdir); err == nil { @@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ o.mu.Unlock() // Write all the consumer files. - if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil { + if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil { return } - if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil { + if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil { return } - writeFile(path.Join(odirPre, consumerState), state) + writeFile(filepath.Join(odirPre, consumerState), state) } } @@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if cfg == nil || name == _EMPTY_ { return nil, fmt.Errorf("bad consumer config") } - odir := path.Join(fs.fcfg.StoreDir, consumerDir, name) + odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name) if err := os.MkdirAll(odir, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create consumer directory - %v", err) } @@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt prf: fs.prf, name: name, odir: odir, - ifn: path.Join(odir, consumerState), + ifn: filepath.Join(odir, consumerState), } key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name)) hh, err := highwayhash.New64(key[:]) @@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // Check for encryption. if o.prf != nil { - if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil { + if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil { // Recover key encryption key. rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name)) if err != nil { @@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt } // Write our meta data iff does not exist. - meta := path.Join(odir, JetStreamMetaFile) + meta := filepath.Join(odir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { csi.Created = time.Now().UTC() if err := o.writeConsumerMeta(); err != nil { @@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // If we expect to be encrypted check that what we are restoring is not plaintext. // This can happen on snapshot restores or conversions. if o.prf != nil { - keyFile := path.Join(odir, JetStreamMetaFileKey) + keyFile := filepath.Join(odir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) { if err := o.writeConsumerMeta(); err != nil { return nil, err @@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error { // Write out the consumer meta data, i.e. state. // Lock should be held. 
func (cfs *consumerFileStore) writeConsumerMeta() error { - meta := path.Join(cfs.odir, JetStreamMetaFile) + meta := filepath.Join(cfs.odir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) { return err } @@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { return err } cfs.aek = key - keyFile := path.Join(cfs.odir, JetStreamMetaFileKey) + keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { cfs.hh.Reset() cfs.hh.Write(b) checksum := hex.EncodeToString(cfs.hh.Sum(nil)) - sum := path.Join(cfs.odir, JetStreamMetaFileSum) + sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -5787,7 +5786,7 @@ type templateFileStore struct { } func newTemplateFileStore(storeDir string) *templateFileStore { - tdir := path.Join(storeDir, tmplsDir) + tdir := filepath.Join(storeDir, tmplsDir) key := sha256.Sum256([]byte("templates")) hh, err := highwayhash.New64(key[:]) if err != nil { @@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore { } func (ts *templateFileStore) Store(t *streamTemplate) error { - dir := path.Join(ts.dir, t.Name) + dir := filepath.Join(ts.dir, t.Name) if err := os.MkdirAll(dir, defaultDirPerms); err != nil { return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err) } - meta := path.Join(dir, JetStreamMetaFile) + meta := filepath.Join(dir, JetStreamMetaFile) if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil { return err } @@ -5818,7 +5817,7 @@ func (ts *templateFileStore) Store(t *streamTemplate) error { ts.hh.Reset() ts.hh.Write(b) checksum := hex.EncodeToString(ts.hh.Sum(nil)) - sum := path.Join(dir, JetStreamMetaFileSum) + sum := filepath.Join(dir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -5826,5 +5825,5 @@ func (ts *templateFileStore) Store(t *streamTemplate) error { } func (ts *templateFileStore) Delete(t *streamTemplate) error { - return os.RemoveAll(path.Join(ts.dir, t.Name)) + return os.RemoveAll(filepath.Join(ts.dir, t.Name)) } diff --git a/server/filestore_test.go b/server/filestore_test.go index a4a16da947..1814680d93 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -27,7 +27,6 @@ import ( "math/bits" "math/rand" "os" - "path" "path/filepath" "reflect" "strings" @@ -516,6 +515,7 @@ func TestFileStoreMsgLimitBug(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fs.StoreMsg(subj, nil, msg) } @@ -742,8 +742,8 @@ func TestFileStorePurge(t *testing.T) { } // We will simulate crashing before the purge directory is cleared. - mdir := path.Join(storeDir, msgDir) - pdir := path.Join(fs.fcfg.StoreDir, "ptest") + mdir := filepath.Join(storeDir, msgDir) + pdir := filepath.Join(fs.fcfg.StoreDir, "ptest") os.Rename(mdir, pdir) os.MkdirAll(mdir, 0755) @@ -753,7 +753,7 @@ func TestFileStorePurge(t *testing.T) { // Make sure we recover same state. 
fs.Stop() - purgeDir := path.Join(fs.fcfg.StoreDir, purgeDir) + purgeDir := filepath.Join(fs.fcfg.StoreDir, purgeDir) os.Rename(pdir, purgeDir) fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: blkSize}, StreamConfig{Name: "zzz", Storage: FileStorage}) @@ -1253,7 +1253,7 @@ func TestFileStoreEraseMsg(t *testing.T) { // Now look on disk as well. rl := fileStoreMsgSize(subj, nil, msg) buf := make([]byte, rl) - fp, err := os.Open(path.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1))) + fp, err := os.Open(filepath.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1))) if err != nil { t.Fatalf("Error opening msg block file: %v", err) } @@ -1311,7 +1311,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) { // Stop and remove the index file. fs.Stop() - ifn := path.Join(storeDir, msgDir, fmt.Sprintf(indexScan, 1)) + ifn := filepath.Join(storeDir, msgDir, fmt.Sprintf(indexScan, 1)) removeFile(t, ifn) fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) @@ -1344,8 +1344,8 @@ func TestFileStoreMeta(t *testing.T) { } defer fs.Stop() - metafile := path.Join(storeDir, JetStreamMetaFile) - metasum := path.Join(storeDir, JetStreamMetaFileSum) + metafile := filepath.Join(storeDir, JetStreamMetaFile) + metasum := filepath.Join(storeDir, JetStreamMetaFileSum) // Test to make sure meta file and checksum are present. if _, err := os.Stat(metafile); os.IsNotExist(err) { @@ -1389,8 +1389,8 @@ func TestFileStoreMeta(t *testing.T) { t.Fatalf("Unexepected error: %v", err) } - ometafile := path.Join(storeDir, consumerDir, oname, JetStreamMetaFile) - ometasum := path.Join(storeDir, consumerDir, oname, JetStreamMetaFileSum) + ometafile := filepath.Join(storeDir, consumerDir, oname, JetStreamMetaFile) + ometasum := filepath.Join(storeDir, consumerDir, oname, JetStreamMetaFileSum) // Test to make sure meta file and checksum are present. if _, err := os.Stat(ometafile); os.IsNotExist(err) { @@ -1766,7 +1766,7 @@ func TestFileStoreSnapshot(t *testing.T) { if err != nil { t.Fatalf("Error getting next entry from snapshot: %v", err) } - fpath := path.Join(rstoreDir, filepath.Clean(hdr.Name)) + fpath := filepath.Join(rstoreDir, filepath.Clean(hdr.Name)) pdir := filepath.Dir(fpath) os.MkdirAll(pdir, 0755) fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600) @@ -2055,13 +2055,13 @@ func TestFileStoreWriteFailures(t *testing.T) { // has a limited size. // E.g. 
Docker // docker run -ti --tmpfs /jswf_test:rw,size=32k --rm -v ~/Development/go/src:/go/src -w /go/src/github.com/nats-io/nats-server/ golang:1.16 /bin/bash - tdir := path.Join("/", "jswf_test") + tdir := filepath.Join("/", "jswf_test") if stat, err := os.Stat(tdir); err != nil || !stat.IsDir() { t.SkipNow() } defer removeDir(t, tdir) - storeDir := path.Join(tdir, JetStreamStoreDir) + storeDir := filepath.Join(tdir, JetStreamStoreDir) os.MkdirAll(storeDir, 0755) subj, msg := "foo", []byte("Hello Write Failures!") @@ -2772,7 +2772,7 @@ func TestFileStoreStreamDeleteDirNotEmpty(t *testing.T) { ready := make(chan bool) go func() { - g := path.Join(storeDir, "g") + g := filepath.Join(storeDir, "g") ready <- true for i := 0; i < 100; i++ { ioutil.WriteFile(g, []byte("OK"), defaultFilePerms) @@ -2859,7 +2859,7 @@ func TestFileStoreStreamIndexBug(t *testing.T) { badIdxBytes, _ := base64.StdEncoding.DecodeString("FgGBkw7D/f8/772iDPDIgbU=") dir := createDir(t, "js-bad-idx-") defer removeDir(t, dir) - fn := path.Join(dir, "1.idx") + fn := filepath.Join(dir, "1.idx") ioutil.WriteFile(fn, badIdxBytes, 0644) mb := &msgBlock{index: 1, ifn: fn} if err := mb.readIndexInfo(); err == nil || !strings.Contains(err.Error(), "short index") { @@ -3225,8 +3225,10 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) { restartFS(ttl - 100*time.Millisecond + 25*time.Millisecond) checkState(0, 11, 10) + fs.Stop() // Not for start per se but since we have all the test tooling here check that Compact() does right thing as well. fs = newFS() + defer fs.Stop() loadMsgs(100) checkFiltered("orders.*", SimpleState{Msgs: 100, First: 1, Last: 100}) checkFiltered("orders.5", SimpleState{Msgs: 10, First: 5, Last: 95}) diff --git a/server/gateway_test.go b/server/gateway_test.go index dd8a044f9f..8d7ae9e027 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -6484,6 +6484,7 @@ func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) { if err := cfg.Sync(); err != nil { t.Fatal(err) } + cfg.Close() optsA := LoadConfig(cfg.Name()) optsB := LoadConfig(cfg.Name()) diff --git a/server/jetstream.go b/server/jetstream.go index 88e80029d6..6b13ac7afd 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "math" "os" - "path" "path/filepath" "strconv" "strings" @@ -970,7 +969,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq} jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) - jsa.storeDir = path.Join(js.config.StoreDir, a.Name) + jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name) js.accounts[a.Name] = jsa js.mu.Unlock() @@ -998,9 +997,9 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Debugf(" Max Storage: %s", friendlyBytes(limits.MaxStore)) // Clean up any old snapshots that were orphaned while staging. - os.RemoveAll(path.Join(js.config.StoreDir, snapStagingDir)) + os.RemoveAll(filepath.Join(js.config.StoreDir, snapStagingDir)) - sdir := path.Join(jsa.storeDir, streamsDir) + sdir := filepath.Join(jsa.storeDir, streamsDir) if _, err := os.Stat(sdir); os.IsNotExist(err) { if err := os.MkdirAll(sdir, defaultDirPerms); err != nil { return fmt.Errorf("could not create storage streams directory - %v", err) @@ -1016,7 +1015,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Check templates first since messsage sets will need proper ownership. 
// FIXME(dlc) - Make this consistent. - tdir := path.Join(jsa.storeDir, tmplsDir) + tdir := filepath.Join(jsa.storeDir, tmplsDir) if stat, err := os.Stat(tdir); err == nil && stat.IsDir() { key := sha256.Sum256([]byte("templates")) hh, err := highwayhash.New64(key[:]) @@ -1025,8 +1024,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } fis, _ := ioutil.ReadDir(tdir) for _, fi := range fis { - metafile := path.Join(tdir, fi.Name(), JetStreamMetaFile) - metasum := path.Join(tdir, fi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(tdir, fi.Name(), JetStreamMetaFile) + metasum := filepath.Join(tdir, fi.Name(), JetStreamMetaFileSum) buf, err := ioutil.ReadFile(metafile) if err != nil { s.Warnf(" Error reading StreamTemplate metafile %q: %v", metasum, err) @@ -1071,14 +1070,14 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Now recover the streams. fis, _ := ioutil.ReadDir(sdir) for _, fi := range fis { - mdir := path.Join(sdir, fi.Name()) + mdir := filepath.Join(sdir, fi.Name()) key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) if err != nil { return err } - metafile := path.Join(mdir, JetStreamMetaFile) - metasum := path.Join(mdir, JetStreamMetaFileSum) + metafile := filepath.Join(mdir, JetStreamMetaFile) + metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) continue @@ -1105,7 +1104,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(mdir, JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(filepath.Join(mdir, JetStreamMetaFileKey)); err == nil { s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(key) != metaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(key)) @@ -1170,7 +1169,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Noticef(" Restored %s messages for stream '%s > %s'", comma(int64(state.Msgs)), mset.accName(), mset.name()) // Now do the consumers. - odir := path.Join(sdir, fi.Name(), consumerDir) + odir := filepath.Join(sdir, fi.Name(), consumerDir) consumers = append(consumers, &ce{mset, odir}) } @@ -1180,8 +1179,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) } for _, ofi := range ofis { - metafile := path.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing consumer metafile %q", metafile) continue @@ -1197,7 +1196,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") // Decode the buffer before proceeding. 
if buf, err = s.decryptMeta(key, buf, a.Name, e.mset.name()+tsep+ofi.Name()); err != nil { @@ -1238,7 +1237,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Make sure to cleanup any old remaining snapshots. - os.RemoveAll(path.Join(jsa.storeDir, snapsDir)) + os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) s.Debugf("JetStream state for account %q recovered", a.Name) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 4f06e543cb..31c0de0544 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -21,7 +21,7 @@ import ( "io/ioutil" "math/rand" "os" - "path" + "path/filepath" "sort" "strconv" "strings" @@ -2740,7 +2740,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} - snapDir := path.Join(js.config.StoreDir, snapStagingDir) + snapDir := filepath.Join(js.config.StoreDir, snapStagingDir) if _, err := os.Stat(snapDir); os.IsNotExist(err) { if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil { resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0e85c1e863..584c55d3b7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,7 +21,7 @@ import ( "fmt" "math" "math/rand" - "path" + "path/filepath" "reflect" "sort" "strings" @@ -501,7 +501,7 @@ func (js *jetStream) setupMetaGroup() error { // Setup our WAL for the metagroup. sysAcc := s.SystemAccount() - storeDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) + storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, @@ -1407,7 +1407,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { return errors.New("shutting down") } - storeDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) + storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) var store StreamStore if storage == FileStorage { fs, err := newFileStore( diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index af96996e3b..2f7bce7ee7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -23,7 +23,7 @@ import ( "io/ioutil" "math/rand" "os" - "path" + "path/filepath" "reflect" "strconv" "strings" @@ -38,7 +38,7 @@ import ( func TestJetStreamClusterConfig(t *testing.T) { conf := createConfFile(t, []byte(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"} + jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: '%s'} cluster { listen: 127.0.0.1:-1 } `)) defer removeFile(t, conf) @@ -59,7 +59,7 @@ func TestJetStreamClusterConfig(t *testing.T) { conf = createConfFile(t, []byte(` listen: 127.0.0.1:-1 server_name: "TEST" - jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"} + jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: '%s'} cluster { listen: 127.0.0.1:-1 } `)) defer removeFile(t, conf) @@ -282,7 +282,7 @@ func TestJetStreamClusterMultiReplicaStreamsDefaultFileMem(t *testing.T) { const testConfig = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {store_dir: "%s"} + jetstream: {store_dir: 
'%s'} cluster { name: %s @@ -5469,7 +5469,7 @@ func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) { template := ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} accounts { one { jetstream: enabled @@ -7668,7 +7668,7 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) { template := ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: HUB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: HUB, store_dir: '%s'} cluster { name: %s @@ -8685,7 +8685,7 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { c.stopAll() // Remove all state by truncating for the non-leader. for _, fn := range []string{"1.blk", "1.idx", "1.fss"} { - fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn) + fname := filepath.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn) fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms) if err != nil { continue @@ -8694,9 +8694,9 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { fd.Close() } // For both make sure we have no raft snapshots. - snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots") + snapDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots") os.RemoveAll(snapDir) - snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots") + snapDir = filepath.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots") os.RemoveAll(snapDir) // Now restart. @@ -8775,7 +8775,7 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) { var jsClusterAccountLimitsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -8850,7 +8850,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir1 := createDir(t, JetStreamStoreDir) conf1 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8866,7 +8866,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir2 := createDir(t, JetStreamStoreDir) conf2 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8883,7 +8883,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir3 := createDir(t, JetStreamStoreDir) conf3 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8961,11 +8961,10 @@ func 
TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) func TestJetStreamSeal(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -9572,11 +9571,10 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) { func TestJetStreamListFilter(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -9629,11 +9627,10 @@ func TestJetStreamListFilter(t *testing.T) { func TestJetStreamConsumerUpdates(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 5) defer c.shutdown() @@ -10427,11 +10424,10 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) { func TestJetStreamConsumerUpgrade(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -10457,11 +10453,10 @@ func TestJetStreamConsumerUpgrade(t *testing.T) { func TestJetStreamAddConsumerWithInfo(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -10960,11 +10955,10 @@ func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) { func TestJetStreamStreamAdvisories(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -11139,7 +11133,7 @@ var jsClusterAccountsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11164,7 +11158,7 @@ var jsClusterAccountsTempl = ` var jsClusterTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11183,7 +11177,7 @@ var jsClusterTempl = ` var jsClusterMaxBytesTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11225,7 +11219,7 @@ var jsSuperClusterTempl = ` var jsClusterLimitsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: "%s"} + jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'} cluster { name: %s @@ -11247,7 +11241,7 @@ var jsClusterLimitsTempl = ` var jsMixedModeGlobalAccountTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: "%s"} + jetstream: {max_mem_store: 2MB, max_file_store: 
8MB, store_dir: '%s'} cluster { name: %s @@ -11434,7 +11428,7 @@ func (sc *supercluster) waitOnPeerCount(n int) { var jsClusterMirrorSourceImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -11470,7 +11464,7 @@ var jsClusterMirrorSourceImportsTempl = ` var jsClusterImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -11672,7 +11666,7 @@ func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDoma var jsClusterSingleLeafNodeLikeNGSTempl = ` listen: 127.0.0.1:-1 server_name: LNJS - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { remotes [ { urls: [ %s ] } ] } ` @@ -11680,7 +11674,7 @@ var jsClusterSingleLeafNodeLikeNGSTempl = ` var jsClusterSingleLeafNodeTempl = ` listen: 127.0.0.1:-1 server_name: LNJS - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { remotes [ { urls: [ %s ], account: "JSY" } @@ -11697,7 +11691,7 @@ var jsClusterSingleLeafNodeTempl = ` var jsClusterTemplWithLeafNode = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11716,7 +11710,7 @@ var jsClusterTemplWithLeafNodeNoJS = ` server_name: %s # Need to keep below since it fills in the store dir by default so just comment out. - # jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + # jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11733,7 +11727,7 @@ var jsClusterTemplWithLeafNodeNoJS = ` var jsClusterTemplWithSingleLeafNode = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11745,7 +11739,7 @@ var jsClusterTemplWithSingleLeafNodeNoJS = ` listen: 127.0.0.1:-1 server_name: %s - # jetstream: {store_dir: "%s"} + # jetstream: {store_dir: '%s'} {{leaf}} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index e715241633..078da74269 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20,12 +20,12 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "math/rand" "net/http" "net/url" "os" - "path" "path/filepath" "reflect" "runtime" @@ -46,11 +46,10 @@ import ( func TestJetStreamBasicNilConfig(t *testing.T) { s := RunRandClientPortServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if err := s.EnableJetStream(nil); err != nil { t.Fatalf("Expected no error, got %v", err) @@ -124,11 +123,10 @@ func clientConnectWithOldRequest(t *testing.T, s *Server) *nats.Conn { func TestJetStreamEnableAndDisableAccount(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Global in simple setup should be enabled already. 
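The reordering of `defer s.Shutdown()` and `defer removeDir(...)` in the tests above relies on Go running deferred calls in LIFO order: registering the directory removal first and the shutdown second means the server stops before its storage directory is deleted, which is required on Windows where files still held open cannot be removed. A small standalone sketch of that ordering (the printed labels are illustrative only, not server code):

package main

import "fmt"

func main() {
	// Deferred calls execute last-in, first-out. Registering the storage
	// directory cleanup first and the server shutdown second means shutdown
	// runs before the removal, so no files are still open when the
	// directory is deleted.
	defer fmt.Println("2) remove storage directory") // registered first, runs last
	defer fmt.Println("1) shut down server")         // registered last, runs first
}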
if !s.GlobalAccount().JetStreamEnabled() { @@ -366,11 +364,10 @@ func TestJetStreamAutoTuneFSConfig(t *testing.T) { func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() descr := "foo asset" acc := s.GlobalAccount() @@ -418,11 +415,10 @@ func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) { func TestJetStreamPubAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sname := "PUBACK" acc := s.GlobalAccount() @@ -900,11 +896,10 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) { func TestJetStreamAddStreamBadSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -935,11 +930,10 @@ func TestJetStreamAddStreamBadSubjects(t *testing.T) { func TestJetStreamMaxConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -978,11 +972,10 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() mset, err := acc.addStream(mconfig) @@ -1010,11 +1003,10 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -1044,11 +1036,10 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() mset, err := acc.addStream(mconfig) @@ -3112,11 +3103,10 @@ func TestJetStreamWorkQueueTerminateDelivery(t *testing.T) { func TestJetStreamConsumerAckAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) @@ -3163,11 +3153,10 @@ func TestJetStreamConsumerAckAck(t *testing.T) { func TestJetStreamAckNext(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "ACKNXT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: MemoryStorage}) @@ -3259,11 +3248,10 @@ func TestJetStreamAckNext(t *testing.T) { func TestJetStreamPublishDeDupe(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "DeDupe" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage, MaxAge: time.Hour, 
Subjects: []string{"foo.*"}}) @@ -3423,11 +3411,10 @@ func getPubAckResponse(msg []byte) *JSPubAckResponse { func TestJetStreamPublishExpect(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "EXPECT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage, MaxAge: time.Hour, Subjects: []string{"foo.*"}}) @@ -3519,11 +3506,10 @@ func TestJetStreamPublishExpect(t *testing.T) { func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS-PULL" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: MemoryStorage}) @@ -3606,11 +3592,10 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { func TestJetStreamConsumerRateLimit(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "RATELIMIT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3684,11 +3669,10 @@ func TestJetStreamConsumerRateLimit(t *testing.T) { func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3787,11 +3771,10 @@ func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) { func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3928,11 +3911,10 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { func TestJetStreamDeleteConsumerAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sendSubj := "MYQ" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sendSubj, Storage: FileStorage}) @@ -3979,11 +3961,10 @@ func TestJetStreamDeleteConsumerAndServerRestart(t *testing.T) { func TestJetStreamRedeliveryAfterServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sendSubj := "MYQ" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sendSubj, Storage: FileStorage}) @@ -4056,11 +4037,10 @@ func TestJetStreamRedeliveryAfterServerRestart(t *testing.T) { func TestJetStreamSnapshots(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MY-STREAM" subjects := []string{"foo", "bar", "baz"} @@ -4570,11 +4550,10 @@ func TestJetStreamPubAckPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); 
config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4599,11 +4578,10 @@ func TestJetStreamPubPerfWithFullStream(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4638,11 +4616,10 @@ func TestJetStreamSnapshotsAPIPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := StreamConfig{ Name: "snap-perf", @@ -5559,12 +5536,10 @@ func TestJetStreamRedeliverCount(t *testing.T) { // not get the message back. func TestJetStreamRedeliverAndLateAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "LA", Storage: MemoryStorage}) if err != nil { @@ -5603,12 +5578,10 @@ func TestJetStreamRedeliverAndLateAck(t *testing.T) { // https://github.com/nats-io/nats-server/issues/1502 func TestJetStreamPendingNextTimer(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "NT", Storage: MemoryStorage, Subjects: []string{"ORDERS.*"}}) if err != nil { @@ -6659,11 +6632,10 @@ func TestJetStreamConsumerReplayQuit(t *testing.T) { func TestJetStreamSystemLimits(t *testing.T) { s := RunRandClientPortServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if _, _, err := s.JetStreamReservedResources(); err == nil { t.Fatalf("Expected error requesting jetstream reserved resources when not enabled") @@ -6831,11 +6803,10 @@ func TestJetStreamSystemLimits(t *testing.T) { func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() gacc := s.GlobalAccount() @@ -6960,11 +6931,10 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { func TestJetStreamStreamFileTrackingAndLimits(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() gacc := s.GlobalAccount() @@ -7093,11 +7063,10 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) { base := runtime.NumGoroutine() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -7210,12 +7179,10 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) { func TestJetStreamPushConsumerFlowControl(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc, js := jsClientConnect(t, s) @@ -7315,11 +7282,10 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) { func TestJetStreamFlowControlRequiresHeartbeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -7339,12 +7305,10 @@ func TestJetStreamFlowControlRequiresHeartbeats(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7414,12 +7378,10 @@ func TestJetStreamPushConsumerIdleHeartbeats(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeatsWithFilterSubject(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7478,12 +7440,10 @@ func TestJetStreamPushConsumerIdleHeartbeatsWithFilterSubject(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeatsWithNoInterest(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7542,12 +7502,10 @@ func TestJetStreamPushConsumerIdleHeartbeatsWithNoInterest(t *testing.T) { func TestJetStreamInfoAPIWithHeaders(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -7574,12 +7532,10 @@ func TestJetStreamInfoAPIWithHeaders(t *testing.T) { func TestJetStreamRequestAPI(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -8095,12 +8051,10 @@ func TestJetStreamRequestAPI(t *testing.T) { func TestJetStreamFilteredStreamNames(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc := clientConnectToServer(t, s) @@ -8562,11 +8516,10 @@ func TestJetStreamLimitLockBug(t *testing.T) { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil && config.StoreDir != "" { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(c.mconfig) if err != nil { @@ -8614,11 +8567,10 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := &StreamConfig{Name: "foo", Storage: FileStorage} mset, err := s.GlobalAccount().addStream(cfg) @@ -8782,11 +8734,10 @@ func TestJetStreamMsgHeaders(t *testing.T) { func TestJetStreamTemplateBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -8854,11 +8805,10 @@ func TestJetStreamTemplateBasics(t *testing.T) { func TestJetStreamTemplateFileStoreRecovery(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9054,11 +9004,10 @@ func clientConnectToServerWithUP(t *testing.T, opts *Options, user, pass string) func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sa := s.SystemAccount() if err := sa.EnableJetStream(nil); err == nil { @@ -9087,11 +9036,10 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { defer removeFile(t, conf) s, opts := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if !s.JetStreamEnabled() { t.Fatalf("Expected JetStream to be enabled") @@ -9334,11 +9282,10 @@ func TestJetStreamStoreDirectoryFix(t *testing.T) { func TestJetStreamPushConsumersPullError(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -9380,11 +9327,10 @@ func TestJetStreamPubPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9446,11 +9392,10 @@ func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9486,11 +9431,10 @@ func TestJetStreamPubWithSyncPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -9518,11 +9462,10 @@ func TestJetStreamConsumerPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { 
defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9580,11 +9523,10 @@ func TestJetStreamConsumerAckFileStorePerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9651,11 +9593,10 @@ func TestJetStreamPubSubPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9924,11 +9865,10 @@ func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStreamWithStore(sc, &FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond}) if err != nil { @@ -10496,11 +10436,10 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { opts.Port = -1 opts.JetStream = true s := RunServer(&opts) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{ Name: "MY_STREAM", @@ -10631,11 +10570,10 @@ func TestJetStreamAccountImportBasics(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc, err := s.LookupAccount("JS") if err != nil { @@ -10760,11 +10698,10 @@ func TestJetStreamAccountImportAll(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc, err := s.LookupAccount("JS") if err != nil { @@ -10830,11 +10767,10 @@ func TestJetStreamServerReload(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if !s.JetStreamEnabled() { t.Fatalf("Expected JetStream to be enabled") @@ -10906,11 +10842,10 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11267,11 +11202,10 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11323,11 +11257,10 @@ func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { func TestJetStreamMirrorAndSourcesFilteredConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc, js := jsClientConnect(t, s) @@ -11419,11 +11352,10 @@ func TestJetStreamMirrorAndSourcesFilteredConsumers(t *testing.T) { func TestJetStreamMirrorBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11579,11 +11511,10 @@ func TestJetStreamMirrorBasics(t *testing.T) { func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11603,11 +11534,10 @@ func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { func TestJetStreamSourceBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11761,11 +11691,10 @@ func TestJetStreamSourceBasics(t *testing.T) { func TestJetStreamOperatorAccounts(t *testing.T) { s, _ := RunServerWithConfig("./configs/js-op.conf") - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserCredentials("./configs/one.creds")) defer nc.Close() @@ -11873,12 +11802,11 @@ func TestJetStreamDomainInPubAck(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -11907,11 +11835,10 @@ func TestJetStreamDomainInPubAck(t *testing.T) { // Issue #2213 func TestJetStreamDirectConsumersBeingReported(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11981,11 +11908,10 @@ func TestJetStreamDirectConsumersBeingReported(t *testing.T) { // https://github.com/nats-io/nats-server/issues/2290 func TestJetStreamTemplatedErrorsBug(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12064,7 +11990,7 @@ func TestJetStreamServerEncryption(t *testing.T) { ci, _ := js.ConsumerInfo("TEST", "dlc") // Quick check to make sure everything not just plaintext still. - sdir := path.Join(config.StoreDir, "$G", "streams", "TEST") + sdir := filepath.Join(config.StoreDir, "$G", "streams", "TEST") // Make sure we can not find any plaintext strings in the target file. checkFor := func(fn string, strs ...string) { t.Helper() @@ -12087,17 +12013,17 @@ func TestJetStreamServerEncryption(t *testing.T) { // Check stream meta. checkEncrypted := func() { - checkKeyFile(path.Join(sdir, JetStreamMetaFileKey)) - checkFor(path.Join(sdir, JetStreamMetaFile), "TEST", "foo", "bar", "baz", "max_msgs", "max_bytes") + checkKeyFile(filepath.Join(sdir, JetStreamMetaFileKey)) + checkFor(filepath.Join(sdir, JetStreamMetaFile), "TEST", "foo", "bar", "baz", "max_msgs", "max_bytes") // Check a message block. 
- checkKeyFile(path.Join(sdir, "msgs", "1.key")) - checkFor(path.Join(sdir, "msgs", "1.blk"), "ENCRYPTED PAYLOAD!!", "foo", "bar", "baz") + checkKeyFile(filepath.Join(sdir, "msgs", "1.key")) + checkFor(filepath.Join(sdir, "msgs", "1.blk"), "ENCRYPTED PAYLOAD!!", "foo", "bar", "baz") // Check consumer meta and state. - checkKeyFile(path.Join(sdir, "obs", "dlc", JetStreamMetaFileKey)) - checkFor(path.Join(sdir, "obs", "dlc", JetStreamMetaFile), "TEST", "dlc", "foo", "bar", "baz", "max_msgs", "ack_policy") + checkKeyFile(filepath.Join(sdir, "obs", "dlc", JetStreamMetaFileKey)) + checkFor(filepath.Join(sdir, "obs", "dlc", JetStreamMetaFile), "TEST", "dlc", "foo", "bar", "baz", "max_msgs", "ack_policy") // Load and see if we can parse the consumer state. - state, err := ioutil.ReadFile(path.Join(sdir, "obs", "dlc", "o.dat")) + state, err := ioutil.ReadFile(filepath.Join(sdir, "obs", "dlc", "o.dat")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -12204,11 +12130,10 @@ func TestJetStreamServerEncryption(t *testing.T) { // User report of bug. func TestJetStreamConsumerBadNumPending(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12288,11 +12213,10 @@ func TestJetStreamDeliverLastPerSubject(t *testing.T) { for _, st := range []StorageType{FileStorage, MemoryStorage} { t.Run(st.String(), func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12428,11 +12352,10 @@ func TestJetStreamDeliverLastPerSubject(t *testing.T) { func TestJetStreamDeliverLastPerSubjectNumPending(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12477,11 +12400,10 @@ func TestJetStreamDeliverLastPerSubjectNumPending(t *testing.T) { // This I believe is only really possible in clustered mode, but we will force the issue here. func TestJetStreamConsumerCleanupWithRetentionPolicy(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12547,11 +12469,10 @@ func TestJetStreamConsumerCleanupWithRetentionPolicy(t *testing.T) { // Issue #2392 func TestJetStreamPurgeEffectsConsumerDelivery(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12599,11 +12520,10 @@ func TestJetStreamPurgeEffectsConsumerDelivery(t *testing.T) { // Issue #2403 func TestJetStreamExpireCausesDeadlock(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc, js := jsClientConnect(t, s) @@ -12668,11 +12588,10 @@ func TestJetStreamConsumerPendingBugWithKV(t *testing.T) { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client based API nc, js := jsClientConnect(t, s) @@ -12716,11 +12635,10 @@ func TestJetStreamConsumerPendingBugWithKV(t *testing.T) { // Issue #2420 func TestJetStreamDefaultMaxMsgsPer(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12743,11 +12661,10 @@ func TestJetStreamDefaultMaxMsgsPer(t *testing.T) { // Issue #2423 func TestJetStreamBadConsumerCreateErr(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12779,12 +12696,11 @@ func TestJetStreamBadConsumerCreateErr(t *testing.T) { func TestJetStreamConsumerPushBound(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -12887,12 +12803,11 @@ func TestJetStreamConsumerPushBound(t *testing.T) { // Got a report of memory leaking, tracked it to internal clients for consumers. func TestJetStreamConsumerInternalClientLeak(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -12949,12 +12864,11 @@ func TestJetStreamConsumerInternalClientLeak(t *testing.T) { func TestJetStreamConsumerEventingRaceOnShutdown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.NoReconnect()) defer nc.Close() @@ -12990,12 +12904,11 @@ func TestJetStreamConsumerEventingRaceOnShutdown(t *testing.T) { // and try to send new messages. 
func TestJetStreamExpireAllWhileServerDown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13046,12 +12959,11 @@ func TestJetStreamExpireAllWhileServerDown(t *testing.T) { func TestJetStreamLongStreamNamesAndPubAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13075,12 +12987,11 @@ func TestJetStreamPerSubjectPending(t *testing.T) { t.Run(st.String(), func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13133,12 +13044,11 @@ func TestJetStreamPerSubjectPending(t *testing.T) { func TestJetStreamPublishExpectNoMsg(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13182,12 +13092,11 @@ func TestJetStreamPublishExpectNoMsg(t *testing.T) { func TestJetStreamPullLargeBatchExpired(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13229,12 +13138,11 @@ func TestJetStreamPullLargeBatchExpired(t *testing.T) { func TestJetStreamNegativeDupeWindow(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13263,12 +13171,11 @@ func TestJetStreamNegativeDupeWindow(t *testing.T) { // Issue #2551 func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13356,11 +13263,11 @@ func TestJetStreamDisabledLimitsEnforcementJWT(t *testing.T) { defer removeDir(t, storeDir1) conf := createConfFile(t, []byte(fmt.Sprintf(` listen: -1 - jetstream: {store_dir: %s} + jetstream: {store_dir: '%s'} operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, storeDir1, ojwt, dir, sysPub))) @@ -13385,7 +13292,7 @@ func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { storeDir1 := createDir(t, JetStreamStoreDir) conf1 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} accounts { one { jetstream: { @@ -13417,12 +13324,11 @@ func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { func TestJetStreamConsumerNoMsgPayload(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13464,12 +13370,11 @@ func TestJetStreamConsumerNoMsgPayload(t *testing.T) { // Issue #2607 func 
TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13534,12 +13439,11 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { // Issue #2662 func TestJetStreamLargeExpiresAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13588,12 +13492,11 @@ func TestJetStreamLargeExpiresAndServerRestart(t *testing.T) { func TestJetStreamMessagePerSubjectKeepBug(t *testing.T) { test := func(t *testing.T, keep int64, store nats.StorageType) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13631,12 +13534,11 @@ func TestJetStreamMessagePerSubjectKeepBug(t *testing.T) { func TestJetStreamInvalidDeliverSubject(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13723,7 +13625,7 @@ func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { sd := config.StoreDir s.Shutdown() - f := path.Join(sd, "$G", "streams", "TEST") + f := filepath.Join(sd, "$G", "streams", "TEST") fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ Name: "TEST", Subjects: []string{"foo", "bar", " baz "}, // baz has spaces @@ -13748,12 +13650,11 @@ func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sd := config.StoreDir // Client for API requests. @@ -13769,7 +13670,7 @@ func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { s.Shutdown() - f := path.Join(sd, "$G", "streams", "M") + f := filepath.Join(sd, "$G", "streams", "M") fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ Name: "M", Subjects: []string{"foo", "bar", "baz"}, // Mirrors should not have spaces. 
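// Illustrative note, not part of the patch: the config templates in this patch (store_dir,
// resolver dir, credentials, logfile) now wrap the substituted path in single quotes. The
// substituted values are OS temp directories, which on Windows contain backslashes; the
// commit message only states that single quotes are needed for Windows, and the presumed
// reason is that single-quoted config strings are taken literally while backslash sequences
// inside double-quoted strings risk being processed as escapes. A self-contained sketch of
// the two forms (the example path is hypothetical):

package main

import "fmt"

func main() {
	storeDir := `C:\Users\ci\AppData\Local\Temp\nats-store`
	// Single quotes: the Windows path is passed through literally.
	fmt.Printf("jetstream: {store_dir: '%s'}\n", storeDir)
	// Double quotes: the same path embeds \U, \A, \T, ... which a config
	// parser may treat as escape sequences (hedged assumption).
	fmt.Printf("jetstream: {store_dir: \"%s\"}\n", storeDir)
}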
@@ -13816,11 +13717,10 @@ func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("a", "pwd")) defer nc.Close() @@ -13876,11 +13776,10 @@ func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { func TestJetStreamPullConsumerRequestCleanup(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13918,11 +13817,10 @@ func TestJetStreamPullConsumerRequestCleanup(t *testing.T) { func TestJetStreamPullConsumerRequestMaximums(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, _ := jsClientConnect(t, s) defer nc.Close() @@ -13966,11 +13864,10 @@ func TestJetStreamPullConsumerRequestMaximums(t *testing.T) { func TestJetStreamEphemeralPullConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14058,11 +13955,10 @@ func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Connect to JS account and create stream, put some messages into it. nc, js := jsClientConnect(t, s, nats.UserInfo("dlc", "foo")) @@ -14263,11 +14159,10 @@ func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { defer removeFile(t, conf) s, o := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() conf2 := createConfFile(t, []byte(fmt.Sprintf(` server_name: SLN @@ -14362,11 +14257,10 @@ func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { // 4. Try, which never waits at all ever. func TestJetStreamPullConsumersOneShotBehavior(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -14474,11 +14368,10 @@ func TestJetStreamPullConsumersOneShotBehavior(t *testing.T) { func TestJetStreamPullConsumersMultipleRequestsExpireOutOfOrder(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc, js := jsClientConnect(t, s) @@ -14533,11 +14426,10 @@ func TestJetStreamPullConsumersMultipleRequestsExpireOutOfOrder(t *testing.T) { func TestJetStreamConsumerUpdateSurvival(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14577,11 +14469,10 @@ func TestJetStreamConsumerUpdateSurvival(t *testing.T) { func TestJetStreamNakRedeliveryWithNoWait(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14725,11 +14616,10 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14789,11 +14679,10 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14879,11 +14768,10 @@ func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14940,11 +14828,10 @@ func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) { // Issue #2836 func TestJetStreamInterestRetentionBug(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14989,11 +14876,10 @@ func TestJetStreamInterestRetentionBug(t *testing.T) { // exceed the outstanding FC we would become stalled. 
func TestJetStreamFlowControlStall(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15019,11 +14905,10 @@ func TestJetStreamFlowControlStall(t *testing.T) { func TestJetStreamConsumerPendingCountWithRedeliveries(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15086,11 +14971,10 @@ func TestJetStreamConsumerPendingCountWithRedeliveries(t *testing.T) { func TestJetStreamPullConsumerHeartBeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15211,11 +15095,10 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) { func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15232,7 +15115,7 @@ func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) // Now we need a non-clean shutdown. // For this use case that means we do *not* write the fss file. sd := s.JetStreamConfig().StoreDir - fss := path.Join(sd, "$G", "streams", "T", "msgs", "1.fss") + fss := filepath.Join(sd, "$G", "streams", "T", "msgs", "1.fss") // Stop current nc.Close() @@ -15254,6 +15137,66 @@ func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) require_NoError(t, err) } +func TestJetStreamRestoreBadStream(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + var rreq JSApiStreamRestoreRequest + buf, err := os.ReadFile("../test/configs/jetstream/restore_bad_stream/backup.json") + require_NoError(t, err) + err = json.Unmarshal(buf, &rreq) + require_NoError(t, err) + + data, err := os.Open("../test/configs/jetstream/restore_bad_stream/stream.tar.s2") + require_NoError(t, err) + defer data.Close() + + var rresp JSApiStreamRestoreResponse + msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + var chunk [1024]byte + for { + n, err := data.Read(chunk[:]) + if err == io.EOF { + break + } + require_NoError(t, err) + + msg, err = nc.Request(rresp.DeliverSubject, chunk[:n], 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error == nil || !strings.Contains(rresp.Error.Description, "unexpected") { + t.Fatalf("Expected error about unexpected content, got: %+v", rresp.Error) + } + + dir := filepath.Join(s.JetStreamConfig().StoreDir, globalAccountName) + f1 := filepath.Join(dir, "fail1.txt") + f2 := filepath.Join(dir, 
"fail2.txt") + for _, f := range []string{f1, f2} { + if _, err := os.Stat(f); err == nil { + t.Fatalf("Found file %s", f) + } + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jwt_test.go b/server/jwt_test.go index cb81cd3e20..a391a71bd3 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3487,7 +3487,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" limit: 4 } @@ -3513,7 +3513,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" limit: 4 } @@ -3537,7 +3537,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: cache - dir: %s + dir: '%s' ttl: "%dms" limit: 4 } @@ -3729,7 +3729,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3753,7 +3753,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3780,7 +3780,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3807,7 +3807,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } cluster { @@ -4296,7 +4296,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, ojwt, dir, sysPub))) @@ -4342,7 +4342,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, port, ojwt, dir, sysPub))) @@ -4359,7 +4359,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, port, ojwt, dir, sysPub))) @@ -4434,7 +4434,7 @@ func TestJWTUserRevocation(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -4547,7 +4547,7 @@ func TestJWTActivationRevocation(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -4660,7 +4660,7 @@ func TestJWTAccountFetchTimeout(t *testing.T) { resolver: { %s timeout: "100ms" - dir: %s + dir: '%s' } `, ojwt, syspub, cfg, dirSrv))) defer removeFile(t, conf) @@ -4741,7 +4741,7 @@ func TestJWTAccountOps(t *testing.T) { system_account: %s resolver: { %s - dir: %s + dir: '%s' } `, opJwt, syspub, cfg, dirSrv))) disconnectErrChan := make(chan struct{}, 1) @@ -4876,7 +4876,7 @@ func TestJWTHeader(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -5180,7 +5180,7 @@ func TestJWTAccountTokenImportMisuse(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) @@ -5429,7 +5429,7 @@ func TestJWScopedSigningKeys(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, cf) @@ -5614,7 +5614,7 @@ func TestJWTStrictSigningKeys(t *testing.T) { 
operator = %s resolver: { type: full - dir: %s + dir: '%s' } resolver_preload = { %s : "%s" @@ -5786,7 +5786,7 @@ func TestJWTAccountProtectedImport(t *testing.T) { system_account = %s resolver: { type: full - dir: %s + dir: '%s' }`, ojwt, sysPub, dirSrv))) defer removeFile(t, cf) s, _ := RunServerWithConfig(cf) @@ -5864,7 +5864,7 @@ func TestJWTMappings(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -5976,7 +5976,7 @@ func TestJWTNoSystemAccountButNatsResolver(t *testing.T) { operator: %s resolver: { type: %s - dir: %s + dir: '%s' }`, ojwt, resType, dirSrv))) defer removeFile(t, conf) opts := LoadConfig(conf) @@ -6018,7 +6018,7 @@ func TestJWTAccountConnzAccessAfterClaimUpdate(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, spub, dirSrv))) defer removeFile(t, conf) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index c1fcbe154e..ec508faa02 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2535,7 +2535,7 @@ func TestLeafNodeOperatorBadCfg(t *testing.T) { system_account: %s resolver: { type: cache - dir: %s + dir: '%s' } leafnodes: { %s @@ -3835,7 +3835,7 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { jetstream { max_mem_store: 256MB, max_file_store: 2GB, - store_dir: "%s", + store_dir: '%s', domain: hub } accounts { @@ -3852,7 +3852,7 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { jetstream { max_mem_store: 256MB, max_file_store: 2GB, - store_dir: "%s", + store_dir: '%s', domain: %s } accounts { @@ -3944,7 +3944,7 @@ leafnodes: { } jetstream :{ domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -3959,13 +3959,13 @@ accounts :{ system_account = SYS jetstream: { domain: ln1 - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb } leafnodes:{ - remotes:[{ url:nats://127.0.0.1:%d, account: A, credentials: %s}, - { url:nats://127.0.0.1:%d, account: SYS, credentials: %s}] + remotes:[{ url:nats://127.0.0.1:%d, account: A, credentials: '%s'}, + { url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}] } ` @@ -4139,7 +4139,7 @@ leafnodes: { } jetstream :{ domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -4168,7 +4168,7 @@ leafnodes: { } jetstream: { domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -4190,7 +4190,7 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb %s @@ -4218,7 +4218,7 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb %s @@ -4420,7 +4420,7 @@ leafnodes: { timeout: 0.5 } } -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb } server_name: A cluster: { name: clust1 @@ -4444,7 +4444,7 @@ leafnodes: { timeout: 0.5 } } -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb } server_name: B cluster: { name: clust1 @@ -4489,7 +4489,7 @@ accounts :{ } system_account = SYS # the extension hint is to simplify this test. 
without it present we would need a cluster of size 2 -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend } server_name: LA leafnodes:{ no_advertise: true @@ -4599,13 +4599,13 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb } leafnodes:{ - remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: %s }, - {url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: %s, deny_imports: foo, deny_exports: bar}] + remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: '%s' }, + {url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: '%s', deny_imports: foo, deny_exports: bar}] } ` akp, err := nkeys.CreateAccount() @@ -4681,7 +4681,7 @@ accounts :{ SYS:{ users:[ {user:s1,password:s1}]}, } system_account: SYS -jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF leafnodes: { remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},%s] @@ -4751,7 +4751,7 @@ leafnodes: { // Enable jetstream in hub. sdHub := createDir(t, JetStreamStoreDir) defer os.RemoveAll(sdHub) - jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub) + jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub) require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, sHubUpd1.opts.Port, "disabled", @@ -4833,11 +4833,11 @@ accounts :{ SYS:{ users:[ {user:s1,password:s1}]}, } system_account: SYS -jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF leafnodes: { - remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: %s}, - {url:nats://127.0.0.1:%d, account: SYS, credentials: %s}] + remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: '%s'}, + {url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}] } %s ` @@ -4922,7 +4922,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} } -jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: HUB1 cluster: { name: HUB @@ -4940,7 +4940,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} } -jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: HUB2 cluster: { name: HUB @@ -4958,7 +4958,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} } -jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "DLEAF", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF1 cluster: { name: LEAF @@ -4977,7 +4977,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} } -jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "DLEAF", store_dir: '%s', 
max_mem: 100Mb, max_file: 100Mb } server_name: LEAF2 cluster: { name: LEAF diff --git a/server/log_test.go b/server/log_test.go index 23e2033806..d41fba4b52 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -142,14 +142,14 @@ func TestReOpenLogFile(t *testing.T) { } func TestFileLoggerSizeLimitAndReopen(t *testing.T) { - s := &Server{opts: &Options{}} - defer s.SetLogger(nil, false, false) - tmpDir := createDir(t, "nats-server") defer removeDir(t, tmpDir) file := createFileAtDir(t, tmpDir, "log_") file.Close() + s := &Server{opts: &Options{}} + defer s.SetLogger(nil, false, false) + // Set a File log s.opts.LogFile = file.Name() s.opts.Logtime = true diff --git a/server/monitor_test.go b/server/monitor_test.go index 77976a5816..041638c1c2 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4022,7 +4022,7 @@ func TestMonitorJsz(t *testing.T) { jetstream: { max_mem_store: 10Mb max_file_store: 10Mb - store_dir: %s + store_dir: '%s' } cluster { name: cluster_name diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e6e4abdcf0..7cabba2b63 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -40,7 +40,7 @@ var testMQTTTimeout = 4 * time.Second var jsClusterTemplWithLeafAndMQTT = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} diff --git a/server/norace_test.go b/server/norace_test.go index 117a9ee0ea..df78606d72 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -29,7 +29,6 @@ import ( "net" "net/http" "net/url" - "path" "path/filepath" "runtime" "runtime/debug" @@ -1092,11 +1091,10 @@ func TestNoRaceAcceptLoopsDoNotLeaveOpenedConn(t *testing.T) { func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -1125,11 +1123,10 @@ func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { // This test is to show that issue and that the fix works, meaning we no longer swap c.acc. func TestNoRaceJetStreamServiceImportAccountSwapIssue(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client based API nc, js := jsClientConnect(t, s) @@ -1203,12 +1200,10 @@ func TestNoRaceJetStreamServiceImportAccountSwapIssue(t *testing.T) { func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Create 2X limit streamsNum := 2 * JSApiNamesLimit @@ -1272,12 +1267,10 @@ func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. 
if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sname := "MYSTREAM" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sname}) @@ -1867,11 +1860,10 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -2198,11 +2190,10 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) { func TestNoRaceJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Create directly here to force multiple blocks, etc. a, err := s.LookupAccount("$G") @@ -2352,11 +2343,10 @@ func TestNoRaceJetStreamFileStoreBufferReuse(t *testing.T) { skip(t) s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := &StreamConfig{Name: "TEST", Subjects: []string{"foo", "bar", "baz"}, Storage: FileStorage} if _, err := s.GlobalAccount().addStreamWithStore(cfg, nil); err != nil { @@ -2433,11 +2423,10 @@ func TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs(t *testing.T) { opts.Port = -1 opts.JetStream = true s := RunServer(&opts) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -2697,12 +2686,11 @@ func TestNoRaceAccountConnz(t *testing.T) { func TestNoRaceCompressedConnz(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, _ := jsClientConnect(t, s) defer nc.Close() @@ -2930,12 +2918,11 @@ func TestNoRaceJetStreamClusterExtendedStreamPurge(t *testing.T) { func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -3057,12 +3044,11 @@ func TestNoRaceJetStreamOrderedConsumerMissingMsg(t *testing.T) { skip(t) s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -3935,12 +3921,11 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) { func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. 
nc, js := jsClientConnect(t, s) @@ -4133,11 +4118,10 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("me", "pwd")) defer nc.Close() @@ -4203,11 +4187,10 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { func TestNoRaceJetStreamSparseConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4319,7 +4302,7 @@ func TestNoRaceFileStoreSubjectInfoWithSnapshotCleanup(t *testing.T) { } // We will have cleanup the main .blk and .idx sans the lmb, but we should not have any *.fss files. - fms, err := filepath.Glob(path.Join(storeDir, msgDir, fssScanAll)) + fms, err := filepath.Glob(filepath.Join(storeDir, msgDir, fssScanAll)) require_NoError(t, err) if len(fms) > 0 { @@ -4366,7 +4349,7 @@ func TestNoRaceFileStoreKeyFileCleanup(t *testing.T) { } // We will have cleanup the main .blk and .idx sans the lmb, but we should not have any *.fss files. - kms, err := filepath.Glob(path.Join(storeDir, msgDir, keyScanAll)) + kms, err := filepath.Glob(filepath.Join(storeDir, msgDir, keyScanAll)) require_NoError(t, err) if len(kms) > 1 { diff --git a/server/raft.go b/server/raft.go index df72d0daaf..19bf653312 100644 --- a/server/raft.go +++ b/server/raft.go @@ -25,7 +25,6 @@ import ( "math/rand" "net" "os" - "path" "path/filepath" "sync" "sync/atomic" @@ -409,13 +408,13 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { n.vote = vote } - if err := os.MkdirAll(path.Join(n.sd, snapshotsDir), 0750); err != nil { + if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), 0750); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) } // Can't recover snapshots if memory based. if _, ok := n.wal.(*memStore); ok { - os.Remove(path.Join(n.sd, snapshotsDir, "*")) + os.Remove(filepath.Join(n.sd, snapshotsDir, "*")) } else { // See if we have any snapshots and if so load and process on startup. n.setupLastSnapshot() @@ -913,9 +912,9 @@ func (n *raft) InstallSnapshot(data []byte) error { data: data, } - snapDir := path.Join(n.sd, snapshotsDir) + snapDir := filepath.Join(n.sd, snapshotsDir) sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) - sfile := path.Join(snapDir, sn) + sfile := filepath.Join(snapDir, sn) // Remember our latest snapshot file. 
n.snapfile = sfile @@ -938,7 +937,7 @@ func (n *raft) InstallSnapshot(data []byte) error { for _, fi := range psnaps { pn := fi.Name() if pn != sn { - os.Remove(path.Join(snapDir, pn)) + os.Remove(filepath.Join(snapDir, pn)) } } @@ -968,7 +967,7 @@ func termAndIndexFromSnapFile(sn string) (term, index uint64, err error) { } func (n *raft) setupLastSnapshot() { - snapDir := path.Join(n.sd, snapshotsDir) + snapDir := filepath.Join(n.sd, snapshotsDir) psnaps, err := ioutil.ReadDir(snapDir) if err != nil { return @@ -977,7 +976,7 @@ func (n *raft) setupLastSnapshot() { var lterm, lindex uint64 var latest string for _, sf := range psnaps { - sfile := path.Join(snapDir, sf.Name()) + sfile := filepath.Join(snapDir, sf.Name()) var term, index uint64 term, index, err := termAndIndexFromSnapFile(sf.Name()) if err == nil { @@ -998,7 +997,7 @@ func (n *raft) setupLastSnapshot() { // Now cleanup any old entries for _, sf := range psnaps { - sfile := path.Join(snapDir, sf.Name()) + sfile := filepath.Join(snapDir, sf.Name()) if sfile != latest { n.debug("Removing old snapshot: %q", sfile) os.Remove(sfile) @@ -1334,9 +1333,9 @@ func (n *raft) shutdown(shouldDelete bool) { // Delete our peer state and vote state and any snapshots. if shouldDelete { - os.Remove(path.Join(n.sd, peerStateFile)) - os.Remove(path.Join(n.sd, termVoteFile)) - os.RemoveAll(path.Join(n.sd, snapshotsDir)) + os.Remove(filepath.Join(n.sd, peerStateFile)) + os.Remove(filepath.Join(n.sd, termVoteFile)) + os.RemoveAll(filepath.Join(n.sd, snapshotsDir)) } n.Unlock() @@ -3027,7 +3026,7 @@ func (n *raft) writePeerState(ps *peerState) { // Writes out our peer state outside of a specific raft context. func writePeerState(sd string, ps *peerState) error { - psf := path.Join(sd, peerStateFile) + psf := filepath.Join(sd, peerStateFile) if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } @@ -3038,7 +3037,7 @@ func writePeerState(sd string, ps *peerState) error { } func readPeerState(sd string) (ps *peerState, err error) { - buf, err := ioutil.ReadFile(path.Join(sd, peerStateFile)) + buf, err := ioutil.ReadFile(filepath.Join(sd, peerStateFile)) if err != nil { return nil, err } @@ -3051,7 +3050,7 @@ const termVoteLen = idLen + 8 // readTermVote will read the largest term and who we voted from to stable storage. // Lock should be held. 
func (n *raft) readTermVote() (term uint64, voted string, err error) { - buf, err := ioutil.ReadFile(path.Join(n.sd, termVoteFile)) + buf, err := ioutil.ReadFile(filepath.Join(n.sd, termVoteFile)) if err != nil { return 0, noVote, err } @@ -3100,8 +3099,8 @@ func (n *raft) fileWriter() { defer s.grWG.Done() n.RLock() - tvf := path.Join(n.sd, termVoteFile) - psf := path.Join(n.sd, peerStateFile) + tvf := filepath.Join(n.sd, termVoteFile) + psf := filepath.Join(n.sd, peerStateFile) n.RUnlock() for s.isRunning() { diff --git a/server/reload_test.go b/server/reload_test.go index ee8eea2b91..7be0b6d238 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -4161,6 +4161,10 @@ func TestLoggingReload(t *testing.T) { conf := createConfFile(t, []byte(commonCfg)) defer removeFile(t, conf) + defer removeFile(t, "off-pre.log") + defer removeFile(t, "on.log") + defer removeFile(t, "off-post.log") + s, opts := RunServerWithConfig(conf) defer s.Shutdown() @@ -4204,12 +4208,10 @@ func TestLoggingReload(t *testing.T) { nc.Close() } - defer removeFile(t, "off-pre.log") reload("log_file: off-pre.log") traffic(10) // generate NO trace/debug entries in off-pre.log - defer removeFile(t, "on.log") reload(` log_file: on.log debug: true @@ -4218,7 +4220,6 @@ func TestLoggingReload(t *testing.T) { traffic(10) // generate trace/debug entries in on.log - defer removeFile(t, "off-post.log") reload(` log_file: off-post.log debug: false diff --git a/server/routes_test.go b/server/routes_test.go index 5a37ea3378..13910f2743 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1465,6 +1465,7 @@ func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { if err := cfg.Sync(); err != nil { t.Fatal(err) } + cfg.Close() optsA := LoadConfig(cfg.Name()) optsB := LoadConfig(cfg.Name()) diff --git a/server/server_test.go b/server/server_test.go index f1dfcd6979..9d6a19f0df 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1946,7 +1946,7 @@ func TestServerLogsConfigurationFile(t *testing.T) { conf := createConfFile(t, []byte(fmt.Sprintf(` port: -1 - logfile: "%s" + logfile: '%s' `, file.Name()))) defer removeFile(t, conf) diff --git a/server/stream.go b/server/stream.go index c5cd88a9d0..20c25f02ac 100644 --- a/server/stream.go +++ b/server/stream.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "math" "os" - "path" "path/filepath" "reflect" "strconv" @@ -389,7 +388,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } jsa.streams[cfg.Name] = mset - storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name) + storeDir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) jsa.mu.Unlock() // Bind to the user account. 
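// Illustrative note, not part of the patch: the blanket path.Join -> filepath.Join swap in
// raft.go, stream.go and filestore.go matters because package path always joins with '/',
// while package path/filepath uses the OS-specific separator, so only the latter produces
// consistent file names on Windows. A small self-contained sketch:

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	sd := `C:\nats\jetstream`
	fmt.Println(path.Join(sd, "msgs", "1.blk"))     // C:\nats\jetstream/msgs/1.blk (mixed separators)
	fmt.Println(filepath.Join(sd, "msgs", "1.blk")) // C:\nats\jetstream\msgs\1.blk on Windows
}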
@@ -3603,7 +3602,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, err } - sd := path.Join(jsa.storeDir, snapsDir) + sd := filepath.Join(jsa.storeDir, snapsDir) if _, err := os.Stat(sd); os.IsNotExist(err) { if err := os.MkdirAll(sd, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) @@ -3620,6 +3619,17 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error } defer os.RemoveAll(sdir) + logAndReturnError := func() error { + a.mu.RLock() + err := fmt.Errorf("unexpected content (account=%s)", a.Name) + if a.srv != nil { + a.srv.Errorf("Stream restore failed due to %v", err) + } + a.mu.RUnlock() + return err + } + sdirCheck := filepath.Clean(sdir) + string(os.PathSeparator) + tr := tar.NewReader(s2.NewReader(r)) for { hdr, err := tr.Next() @@ -3629,7 +3639,13 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error if err != nil { return nil, err } - fpath := path.Join(sdir, filepath.Clean(hdr.Name)) + if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA { + return nil, logAndReturnError() + } + fpath := filepath.Join(sdir, filepath.Clean(hdr.Name)) + if !strings.HasPrefix(fpath, sdirCheck) { + return nil, logAndReturnError() + } os.MkdirAll(filepath.Dir(fpath), defaultDirPerms) fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600) if err != nil { @@ -3645,7 +3661,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error // Check metadata. // The cfg passed in will be the new identity for the stream. var fcfg FileStreamInfo - b, err := ioutil.ReadFile(path.Join(sdir, JetStreamMetaFile)) + b, err := ioutil.ReadFile(filepath.Join(sdir, JetStreamMetaFile)) if err != nil { return nil, err } @@ -3663,13 +3679,13 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, NewJSStreamNameExistError() } // Move into the correct place here. - ndir := path.Join(jsa.storeDir, streamsDir, cfg.Name) + ndir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) // Remove old one if for some reason it is still here. if _, err := os.Stat(ndir); err == nil { os.RemoveAll(ndir) } // Make sure our destination streams directory exists. - if err := os.MkdirAll(path.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil { + if err := os.MkdirAll(filepath.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil { return nil, err } // Move into new location. @@ -3690,11 +3706,11 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error } // Now do consumers. 
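// Illustrative note, not part of the patch: the RestoreStream changes above reject tar
// entries that are not regular files, and entries whose cleaned name, once joined under the
// scratch snapshot directory, no longer carries that directory as a prefix, so a crafted
// archive cannot write outside the restore area (the fail1.txt/fail2.txt check in the new
// TestJetStreamRestoreBadStream exercises this). A standalone sketch of the same guard,
// with a hypothetical helper name safeJoin:

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// safeJoin mirrors the check added above: clean the entry name, join it under
// dir, and require the result to stay inside dir.
func safeJoin(dir, name string) (string, error) {
	base := filepath.Clean(dir) + string(os.PathSeparator)
	fpath := filepath.Join(dir, filepath.Clean(name))
	if !strings.HasPrefix(fpath, base) {
		return "", errors.New("unexpected content in stream restore")
	}
	return fpath, nil
}

func main() {
	fmt.Println(safeJoin("/tmp/snap", "msgs/1.blk"))      // kept inside the restore dir
	fmt.Println(safeJoin("/tmp/snap", "../../fail1.txt")) // rejected
}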
- odir := path.Join(ndir, consumerDir) + odir := filepath.Join(ndir, consumerDir) ofis, _ := ioutil.ReadDir(odir) for _, ofi := range ofis { - metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { mset.stop(true, false) return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err) diff --git a/test/configs/jetstream/restore_bad_stream/backup.json b/test/configs/jetstream/restore_bad_stream/backup.json new file mode 100644 index 0000000000..86b7391f9d --- /dev/null +++ b/test/configs/jetstream/restore_bad_stream/backup.json @@ -0,0 +1,33 @@ +{ + "config": { + "name": "TEST", + "subjects": [ + "foo" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 1, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false + }, + "state": { + "messages": 10, + "bytes": 381, + "first_seq": 1, + "first_ts": "2022-03-07T23:59:01.710801Z", + "last_seq": 10, + "last_ts": "2022-03-07T23:59:01.712378Z", + "num_subjects": 1, + "consumer_count": 1 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_bad_stream/stream.tar.s2 b/test/configs/jetstream/restore_bad_stream/stream.tar.s2 new file mode 100755 index 0000000000000000000000000000000000000000..f83c8d76d747cc59eb739f4c8003cdae2758636d GIT binary patch literal 1590 zcmYjReP|nH7=JEJ+DlBg)HZcjwl1Dq8=+0#`%GHfTHUPMaC2<2ILFK`xyy3v<&H!a%m^n9rO3BZRx$@BO{M z=li+S46t4HvNMlEvKbP0O-{_c#bvh|^$nI`T2(Ja2FjLiq-?hcl!G`#fP?`c+!7=Z zhBzN1L{T6F;ouq|lBBB=Bxmx+OWCDEoHV8bPkE52m#Y zNu_mJk<3^`MMMAIaYR9TWVNtZMgEy}8*L4v-H z?0e@S>KeWmhmHdYIQ*sXM{yI-E>zpNIQy))u4);+Wr{QwCJyyWOHkR{vuas1KkHkoR&^MAP?!k!$8u9vHP<>V^ekqPkn`+k@PJ)0 zZLeTdnU596WY+ykwQ)-<+K$DXtCe$AYs#?;23lb^YK;%y%FWoWS(!mlCjA$DLh!a_ zI4t0z;dmBOEq69&S`NCecR+NiR^21Cj~R|rnaNcvj#Hb;mCUNw+6DXR-yCL5czGOn z9zzmV(+I>P{n^nQO>MDlz(ZPqQlL?zWpXBX0BJ)# zoyoS* zU#*Ypk3mo(v=ENNqbnP=a>ZsS&qFA=V_{fQ!$I9 zgB^{qQJ56kV{XN@#Li2-tplA4y&Y_Fj9DSsVY|4O+|&e!cX~U-yipbRZ`?scxGL@( z2W)!Uxh4)FEHGfZOfR)dNPatn-i|{uiW^90)g9IfQc+U0qLP>OjG4&`a1ACftCf^# z_%sqgkw#yq2@cHUl1@Z^pwV^`wscfc(-KpH&hB6Dp)J_5XCRYG4dAb6*p8I)>ppEp zowgN?J*cYPi)mbiJg(iNKvb2aB^}owEh z#ZTm|M9HdJ-RHXKc}Ae`GB}f_;Uy*6p8Iug|JWqbTouaI>HHX Gkoym)yQR4R literal 0 HcmV?d00001 diff --git a/test/ocsp_test.go b/test/ocsp_test.go index e9fe35fd95..694dd305b5 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -690,7 +690,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { originalContent := ` port: -1 - store_dir: "%s" + store_dir: '%s' tls { cert_file: "configs/certs/ocsp/server-status-request-url-01-cert.pem" @@ -769,7 +769,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { updatedContent := ` port: -1 - store_dir: "%s" + store_dir: '%s' tls { cert_file: "configs/certs/ocsp/server-cert.pem" @@ -1010,7 +1010,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB host: "127.0.0.1" @@ -1044,7 +1044,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB 
host: "127.0.0.1" @@ -1111,7 +1111,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB host: "127.0.0.1" @@ -1206,7 +1206,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { port: -1 name: AB @@ -1284,7 +1284,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { host: "127.0.0.1" port: -1 @@ -1317,7 +1317,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { remotes: [ { url: "tls://127.0.0.1:%d" @@ -1378,7 +1378,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { remotes: [ { url: "tls://127.0.0.1:%d" @@ -1468,7 +1468,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { host: "127.0.0.1" port: -1 @@ -1555,7 +1555,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: A host: "127.0.0.1" @@ -1589,7 +1589,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: B host: "127.0.0.1" @@ -1662,7 +1662,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: C host: "127.0.0.1" @@ -1758,7 +1758,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: A host: "127.0.0.1" @@ -2623,7 +2623,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: A @@ -2675,7 +2675,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: A @@ -2748,7 +2748,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: C host: "127.0.0.1" @@ -2798,7 +2798,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: D host: "127.0.0.1" diff --git a/test/service_latency_test.go b/test/service_latency_test.go index abb1b6710e..c2dbc6d350 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -19,7 +19,7 @@ import ( "io/ioutil" "math/rand" "net/http" - "path" + "path/filepath" "strings" "sync" "sync/atomic" @@ -1814,7 +1814,7 @@ func TestServiceLatencyMissingResults(t *testing.T) { server_name: s1 cluster { port: -1 } include %q - `, path.Base(accConf)))) + `, filepath.Base(accConf)))) defer removeFile(t, s1Conf) s1, opts1 := RunServerWithConfig(s1Conf) @@ -1828,7 +1828,7 @@ func TestServiceLatencyMissingResults(t *testing.T) { routes = [ nats-route://127.0.0.1:%d ] } include %q - `, opts1.Cluster.Port, path.Base(accConf)))) + `, opts1.Cluster.Port, filepath.Base(accConf)))) defer removeFile(t, s2Conf) s2, opts2 := RunServerWithConfig(s2Conf)