diff --git a/test/compat_test.go b/test/compat_test.go index 8054fd236..5a0f0bacb 100644 --- a/test/compat_test.go +++ b/test/compat_test.go @@ -223,6 +223,76 @@ func TestCompatibilityObjectStoreUpdateMetadata(t *testing.T) { validateTestResult(t, sub) } +func TestCompatibilityObjectStoreWatch(t *testing.T) { + t.Parallel() + + type config struct { + Bucket string `json:"bucket"` + Object string `json:"object"` + } + + nc, js := connect(t) + defer nc.Close() + + // setup subscription on which tester will be sending requests + sub, err := nc.SubscribeSync("tests.object-store.watch.>") + if err != nil { + t.Fatalf("Error subscribing to test subject: %v", err) + } + defer sub.Unsubscribe() + + msg, err := sub.NextMsg(1 * time.Hour) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + // Watch object + var cfg config + if err := json.Unmarshal(msg.Data, &cfg); err != nil { + t.Fatalf("Error unmarshalling message: %v", err) + } + os, err := js.ObjectStore(cfg.Bucket) + if err != nil { + t.Fatalf("Error getting object store: %v", err) + } + watcher, err := os.Watch() + if err != nil { + t.Fatalf("Error getting watcher: %v", err) + } + var digests []string + var info *nats.ObjectInfo + + // get the initial value + select { + case info = <-watcher.Updates(): + digests = append(digests, info.Digest) + case <-time.After(30 * time.Second): + t.Fatalf("Timeout waiting for object update") + } + + // init done, should receive nil + select { + case info = <-watcher.Updates(): + if info != nil { + t.Fatalf("Expected nil, got: %v", info) + } + case <-time.After(30 * time.Second): + t.Fatalf("Timeout waiting for object update") + } + + // get the updated value + select { + case info = <-watcher.Updates(): + digests = append(digests, info.Digest) + case <-time.After(30 * time.Second): + t.Fatalf("Timeout waiting for object update") + } + + if err := msg.Respond([]byte(strings.Join(digests, ","))); err != nil { + t.Fatalf("Error responding to message: %v", err) + } + validateTestResult(t, sub) +} + func TestCompatibilityObjectStoreWatchUpdates(t *testing.T) { t.Parallel()