From 770cf2edd6b82877f98becdf4324b93f051fe136 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Tue, 10 Oct 2023 13:00:37 -0700 Subject: [PATCH] Backport JetStream benchmarks improvements to 2.9.x (#4644) Backporting of JetStream benchmarks that landed in `main` (post 2.10.x) and `dev` branches back to 2.9. These improvements are to the benchmark code and not specific to 2.10. Therefore it seems appropriate to back port them, so that benchmarks more accurately compare 2.9 to 2.10 and beyond. --------- Signed-off-by: Marco Primi Signed-off-by: Reuben Ninan Co-authored-by: reubenninan --- server/jetstream_benchmark_test.go | 461 +++++++++++++---------------- 1 file changed, 207 insertions(+), 254 deletions(-) diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index 94ded0ef93..5d6ec571d1 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -29,11 +29,12 @@ import ( func BenchmarkJetStreamConsume(b *testing.B) { const ( - verbose = false - streamName = "S" - subject = "s" - seed = 12345 - publishTimeout = 30 * time.Second + verbose = false + streamName = "S" + subject = "s" + seed = 12345 + publishTimeout = 30 * time.Second + PublishBatchSize = 10000 ) runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) { @@ -81,7 +82,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if verbose && uniqueConsumed%1000 == 0 { b.Logf("Consumed: %d/%d", bitset.count(), b.N) @@ -127,7 +127,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if uniqueConsumed == b.N { msg.Sub.Unsubscribe() @@ -223,7 +222,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if uniqueConsumed == b.N { msg.Sub.Unsubscribe() @@ -307,20 +305,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() if verbose { @@ -335,28 +322,39 @@ func BenchmarkJetStreamConsume(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() + nc.Close() + _, js = jsClientConnectURL(b, connectURL) + } + rng := rand.New(rand.NewSource(int64(seed))) message := make([]byte, bc.messageSize) - publishedCount := 0 - for publishedCount < b.N { + + // Publish b.N messages to the stream (in batches) + for i := 1; i <= b.N; i++ { rng.Read(message) _, err := js.PublishAsync(subject, message) if err != nil { - continue - } else { - publishedCount++ + b.Fatalf("Failed to publish: %s", err) } - } - - select { - case <-js.PublishAsyncComplete(): - if verbose { - b.Logf("Published %d messages", b.N) + // Limit outstanding published messages to PublishBatchSize + if i%PublishBatchSize == 0 || i == b.N 
{ + select { + case <-js.PublishAsyncComplete(): + if verbose { + b.Logf("Published %d/%d messages", i, b.N) + } + case <-time.After(publishTimeout): + b.Fatalf("Publish timed out") + } } - case <-time.After(publishTimeout): - b.Fatalf("Publish timed out") } + // Set size of each operation, for throughput calculation + b.SetBytes(int64(bc.messageSize)) + // Discard time spent during setup // Consumer may reset again further in b.ResetTimer() @@ -407,8 +405,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { func BenchmarkJetStreamPublish(b *testing.B) { const ( - verbose = false - seed = 12345 + verbose = false + seed = 12345 + streamName = "S" ) runSyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string) (int, int) { @@ -426,7 +425,6 @@ func BenchmarkJetStreamPublish(b *testing.B) { errors++ } else { published++ - b.SetBytes(int64(messageSize)) } if verbose && i%1000 == 0 { @@ -443,61 +441,54 @@ func BenchmarkJetStreamPublish(b *testing.B) { const publishCompleteMaxWait = 30 * time.Second rng := rand.New(rand.NewSource(int64(seed))) message := make([]byte, messageSize) - pending := make([]nats.PubAckFuture, 0, asyncWindow) + published, errors := 0, 0 b.ResetTimer() - for i := 1; i <= b.N; i++ { - rng.Read(message) // TODO may skip this? - subject := subjects[rng.Intn(len(subjects))] - pubAckFuture, err := js.PublishAsync(subject, message) - if err != nil { - errors++ - continue + for published < b.N { + + // Normally publish a full batch (of size `asyncWindow`) + publishBatchSize := asyncWindow + // Unless fewer are left to complete the benchmark + if b.N-published < asyncWindow { + publishBatchSize = b.N - published } - pending = append(pending, pubAckFuture) - - // Regularly trim the list of pending - if i%asyncWindow == 0 { - newPending := make([]nats.PubAckFuture, 0, asyncWindow) - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - // This pubAck is still pending, keep it - newPending = append(newPending, pubAckFuture) - } + + pending := make([]nats.PubAckFuture, 0, publishBatchSize) + + for i := 0; i < publishBatchSize; i++ { + rng.Read(message) // TODO may skip this? 
+ subject := subjects[rng.Intn(len(subjects))] + pubAckFuture, err := js.PublishAsync(subject, message) + if err != nil { + errors++ + continue } - pending = newPending + pending = append(pending, pubAckFuture) } - if verbose && i%1000 == 0 { - b.Logf("Published %d/%d, %d errors", i, b.N, errors) + // All in this batch published, wait for completed + select { + case <-js.PublishAsyncComplete(): + case <-time.After(publishCompleteMaxWait): + b.Fatalf("Publish timed out") } - } - // All published, wait for completed - select { - case <-js.PublishAsyncComplete(): - case <-time.After(publishCompleteMaxWait): - b.Fatalf("Publish timed out") - } + // Verify one by one if they were published successfully + for _, pubAckFuture := range pending { + select { + case <-pubAckFuture.Ok(): + published++ + case <-pubAckFuture.Err(): + errors++ + default: + b.Fatalf("PubAck is still pending after publish completed") + } + } - // Clear whatever is left pending - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - b.Fatalf("PubAck is still pending after publish completed") + if verbose { + b.Logf("Published %d/%d", published, b.N) } } @@ -558,11 +549,6 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Run( name, func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < bc.minMessages { - b.ResetTimer() - return - } subjects := make([]string, bc.numSubjects) for i := 0; i < bc.numSubjects; i++ { @@ -576,24 +562,9 @@ func BenchmarkJetStreamPublish(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } + cl, _, shutdown, nc, _ := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() jsOpts := []nats.JSOpt{ @@ -613,7 +584,7 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects) } streamConfig := &nats.StreamConfig{ - Name: "S", + Name: streamName, Subjects: subjects, Replicas: bc.replicas, } @@ -621,10 +592,27 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() + nc.Close() + nc, err = nats.Connect(connectURL) + if err != nil { + b.Fatalf("Failed to create client connection to stream leader: %v", err) + } + defer nc.Close() + js, err = nc.JetStream(jsOpts...) 
+ if err != nil { + b.Fatalf("Unexpected error getting JetStream context for stream leader: %v", err) + } + } + if verbose { b.Logf("Running %v publisher with message size: %dB", pc.pType, bc.messageSize) } + b.SetBytes(int64(bc.messageSize)) + // Benchmark starts here b.ResetTimer() @@ -717,48 +705,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { }, } - // Helper: Stand up in-process single node or cluster - setupCluster := func(b *testing.B, clusterSize int) (string, func()) { - var connectURL string - var shutdownFunc func() - - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - shutdownFunc = s.Shutdown - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) - shutdownFunc = cl.shutdown - cl.waitOnClusterReadyWithNumPeers(clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - //connectURL = cl.leader().ClientURL() - } - - return connectURL, shutdownFunc - } - - // Helper: Create the stream - setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) { - // Connect - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } - defer nc.Close() - - jsOpts := []nats.JSOpt{} - - js, err := nc.JetStream(jsOpts...) - if err != nil { - b.Fatalf("Unexpected error getting JetStream context: %v", err) - } - - if _, err := js.AddStream(streamConfig); err != nil { - b.Fatalf("Error creating stream: %v", err) - } - } - // Context shared by publishers routines type PublishersContext = struct { readyWg sync.WaitGroup @@ -844,12 +790,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { b.Run( limitDescription, func(b *testing.B) { - // Stop timer during setup - b.StopTimer() - b.ResetTimer() - - // Set per-iteration bytes to calculate throughput (a.k.a. 
speed) - b.SetBytes(messageSize) // Print benchmark parameters if verbose { @@ -863,8 +803,9 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Setup server or cluster - connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize) - defer shutdownFunc() + cl, ls, shutdown, nc, js := startJSClusterAndConnect(b, benchmarkCase.clusterSize) + defer shutdown() + defer nc.Close() // Common stream configuration streamConfig := &nats.StreamConfig{ @@ -877,8 +818,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Configure stream limit limitConfigFunc(streamConfig) + // Create stream - setupStream(b, connectURL, streamConfig) + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } // Set up publishers shared context var pubCtx PublishersContext @@ -889,6 +833,12 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { pubCtx.lock.Lock() pubCtx.messagesLeft = b.N + connectURL := ls.ClientURL() + // If replicated resource, connect to stream leader for lower variability + if benchmarkCase.replicas > 1 { + connectURL = cl.streamLeader("$G", "S").ClientURL() + } + // Spawn publishers routines, each with its own connection and JS context for i := 0; i < numPublishers; i++ { nc, err := nats.Connect(connectURL) @@ -906,8 +856,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { // Wait for all publishers to be ready pubCtx.readyWg.Wait() + // Set size of each operation, for throughput calculation + b.SetBytes(messageSize) + // Benchmark starts here - b.StartTimer() + b.ResetTimer() // Unblock the publishers pubCtx.lock.Unlock() @@ -938,30 +891,26 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { func BenchmarkJetStreamKV(b *testing.B) { const ( - verbose = false - kvNamePrefix = "B_" - keyPrefix = "K_" - seed = 12345 - minOps = 1_000 + verbose = false + kvName = "BUCKET" + keyPrefix = "K_" + seed = 12345 ) - runKVGet := func(b *testing.B, kvs []nats.KeyValue, keys []string) int { + runKVGet := func(b *testing.B, kv nats.KeyValue, keys []string) int { rng := rand.New(rand.NewSource(int64(seed))) errors := 0 b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] - kve, err := kv.Get(key) + _, err := kv.Get(key) if err != nil { errors++ continue } - b.SetBytes(int64(len(kve.Value()))) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Get ops", i, b.N) } @@ -971,7 +920,7 @@ func BenchmarkJetStreamKV(b *testing.B) { return errors } - runKVPut := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + runKVPut := func(b *testing.B, kv nats.KeyValue, keys []string, valueSize int) int { rng := rand.New(rand.NewSource(int64(seed))) value := make([]byte, valueSize) errors := 0 @@ -979,7 +928,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] rng.Read(value) _, err := kv.Put(key, value) @@ -988,8 +936,6 @@ func BenchmarkJetStreamKV(b *testing.B) { continue } - b.SetBytes(int64(valueSize)) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Put ops", i, b.N) } @@ -999,7 +945,7 @@ func BenchmarkJetStreamKV(b *testing.B) { return errors } - runKVUpdate := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + runKVUpdate := func(b *testing.B, kv nats.KeyValue, keys []string, valueSize int) int { rng := rand.New(rand.NewSource(int64(seed))) value := make([]byte, valueSize) 
errors := 0 @@ -1007,7 +953,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] kve, getErr := kv.Get(key) @@ -1023,8 +968,6 @@ func BenchmarkJetStreamKV(b *testing.B) { continue } - b.SetBytes(int64(valueSize)) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Update ops", i, b.N) } @@ -1044,15 +987,14 @@ func BenchmarkJetStreamKV(b *testing.B) { benchmarksCases := []struct { clusterSize int replicas int - numBuckets int numKeys int valueSize int }{ - {1, 1, 1, 100, 100}, // 1 node, 1 bucket with 100 keys, 100B values - {1, 1, 10, 1000, 100}, // 1 node, 10 buckets with 1000 keys, 100B values - {3, 3, 1, 100, 100}, // 3 nodes, 1 bucket with 100 keys, 100B values - {3, 3, 10, 1000, 100}, // 3 nodes, 10 buckets with 1000 keys, 100B values - {3, 3, 10, 1000, 1024}, // 3 nodes, 10 buckets with 1000 keys, 1KB values + {1, 1, 100, 100}, // 1 node with 100 keys, 100B values + {1, 1, 1000, 100}, // 1 node with 1000 keys, 100B values + {3, 3, 100, 100}, // 3 nodes with 100 keys, 100B values + {3, 3, 1000, 100}, // 3 nodes with 1000 keys, 100B values + {3, 3, 1000, 1024}, // 3 nodes with 1000 keys, 1KB values } workloadCases := []WorkloadType{ @@ -1064,10 +1006,9 @@ func BenchmarkJetStreamKV(b *testing.B) { for _, bc := range benchmarksCases { bName := fmt.Sprintf( - "N=%d,R=%d,B=%d,K=%d,ValSz=%db", + "N=%d,R=%d,B=1,K=%d,ValSz=%db", bc.clusterSize, bc.replicas, - bc.numBuckets, bc.numKeys, bc.valueSize, ) @@ -1080,11 +1021,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.Run( wName, func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < minOps { - b.ResetTimer() - return - } if verbose { b.Logf("Running %s workload %s with %d messages", wName, bName, b.N) @@ -1093,21 +1029,6 @@ func BenchmarkJetStreamKV(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - - nc, js := jsClientConnectURL(b, connectURL) - defer nc.Close() // Pre-generate all keys keys := make([]string, 0, bc.numKeys) @@ -1116,36 +1037,51 @@ func BenchmarkJetStreamKV(b *testing.B) { keys = append(keys, key) } - // Initialize all KVs - kvs := make([]nats.KeyValue, 0, bc.numBuckets) - for i := 1; i <= bc.numBuckets; i++ { - // Create bucket - kvName := fmt.Sprintf("%s%d", kvNamePrefix, i) - if verbose { - b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) - } - kvConfig := &nats.KeyValueConfig{ - Bucket: kvName, - Replicas: bc.replicas, - } - kv, err := js.CreateKeyValue(kvConfig) + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() + defer nc.Close() + + // Create bucket + if verbose { + b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) + } + kvConfig := &nats.KeyValueConfig{ + Bucket: kvName, + Replicas: bc.replicas, + } + kv, err := js.CreateKeyValue(kvConfig) + if err != nil { + b.Fatalf("Error creating KV: %v", err) + } + + // Initialize all keys + rng := rand.New(rand.NewSource(int64(seed))) + value := make([]byte, bc.valueSize) + for _, key := range keys { + rng.Read(value) + _, err := kv.Create(key, value) if err != nil { - 
b.Fatalf("Error creating KV: %v", err) - } - kvs = append(kvs, kv) - - // Initialize all keys - rng := rand.New(rand.NewSource(int64(seed * i))) - value := make([]byte, bc.valueSize) - for _, key := range keys { - rng.Read(value) - _, err := kv.Create(key, value) - if err != nil { - b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) - } + b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) } } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + nc.Close() + connectURL := cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL() + nc, js = jsClientConnectURL(b, connectURL) + defer nc.Close() + } + + kv, err = js.KeyValue(kv.Bucket()) + if err != nil { + b.Fatalf("Error binding to KV: %v", err) + } + + // Set size of each operation, for throughput calculation + b.SetBytes(int64(bc.valueSize)) + // Discard time spent during setup // May reset again further in b.ResetTimer() @@ -1154,11 +1090,11 @@ func BenchmarkJetStreamKV(b *testing.B) { switch wc { case Get: - errors = runKVGet(b, kvs, keys) + errors = runKVGet(b, kv, keys) case Put: - errors = runKVPut(b, kvs, keys, bc.valueSize) + errors = runKVPut(b, kv, keys, bc.valueSize) case Update: - errors = runKVUpdate(b, kvs, keys, bc.valueSize) + errors = runKVUpdate(b, kv, keys, bc.valueSize) default: b.Fatalf("Unknown workload type: %v", wc) } @@ -1252,7 +1188,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { minObjSz int maxObjSz int }{ - // TODO remove duplicates and fix comments {nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB) {nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB) {nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB) @@ -1261,7 +1196,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { {nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB) {nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB) {nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB) - } var ( @@ -1294,23 +1228,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", replicas) } - var ( - connectURL string - cl *cluster - ) - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(replicas) - cl.waitOnLeader() - // connect to leader and not replicas - connectURL = cl.leader().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, clusterSize) + defer shutdown() defer nc.Close() // Initialize object store @@ -1327,10 +1248,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { b.Fatalf("Error creating ObjectStore: %v", err) } - // if cluster_size > 1, connect to stream leader - if cl != nil { + // If replicated resource, connect to stream leader for lower variability + if clusterSize > 1 { nc.Close() - connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() + connectURL := cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() nc, js := jsClientConnectURL(b, connectURL) defer nc.Close() objStore, err = js.ObjectStore(objStoreName) @@ -1368,9 +1289,41 @@ func BenchmarkJetStreamObjStore(b *testing.B) { 
} }, ) - } }, ) } } + +// Helper function to stand up a JS-enabled single server or cluster +func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { + b.Helper() + var err error + + if clusterSize == 1 { + s = RunBasicJetStreamServer(b) + shutdown = func() { + s.Shutdown() + } + } else { + c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) + c.waitOnClusterReadyWithNumPeers(clusterSize) + c.waitOnLeader() + s = c.leader() + shutdown = func() { + c.shutdown() + } + } + + nc, err = nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("failed to connect: %s", err) + } + + js, err = nc.JetStream() + if err != nil { + b.Fatalf("failed to init jetstream: %s", err) + } + + return c, s, shutdown, nc, js +}
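
For reference, a minimal standalone sketch (not part of the patch) of the batched async-publish pattern these benchmarks adopt: publish up to a fixed window of outstanding messages, wait on `PublishAsyncComplete` before continuing, and report throughput via a single `b.SetBytes` call before the timed loop. The constants, stream name, and use of `nats.DefaultURL` (a local nats-server with JetStream enabled) are illustrative assumptions, not part of the change.

```go
package main_test // hypothetical package; assumes a local JetStream-enabled server

import (
	"testing"
	"time"

	"github.com/nats-io/nats.go"
)

func BenchmarkBatchedAsyncPublish(b *testing.B) {
	const (
		subject     = "s"    // illustrative subject
		messageSize = 128    // illustrative payload size
		batchSize   = 10000  // outstanding-ack window, mirrors PublishBatchSize in the patch
		ackTimeout  = 30 * time.Second
	)

	// Assumes nats-server -js is running locally at nats.DefaultURL.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		b.Fatalf("connect: %v", err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		b.Fatalf("jetstream: %v", err)
	}
	if _, err := js.AddStream(&nats.StreamConfig{Name: "S", Subjects: []string{subject}}); err != nil {
		b.Fatalf("add stream: %v", err)
	}

	msg := make([]byte, messageSize)

	// Count each iteration as messageSize bytes so `go test -bench` reports MB/s.
	b.SetBytes(int64(messageSize))
	b.ResetTimer()

	for i := 1; i <= b.N; i++ {
		if _, err := js.PublishAsync(subject, msg); err != nil {
			b.Fatalf("publish: %v", err)
		}
		// Cap the number of unacknowledged publishes in flight, then drain acks.
		if i%batchSize == 0 || i == b.N {
			select {
			case <-js.PublishAsyncComplete():
			case <-time.After(ackTimeout):
				b.Fatalf("publish acks timed out")
			}
		}
	}
}
```

The single `b.SetBytes` before `b.ResetTimer` reflects the same design choice made throughout the patch, which removes the per-message `SetBytes` calls from the hot loops so the timed region measures only publish/consume work.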