From ca096ed3d441f2d4b30dc70c6a6cd73104df32be Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 30 Jan 2024 11:40:10 +0100 Subject: [PATCH 1/3] [IMPROVED] KeyValue documentation Signed-off-by: Piotr Piotrowski --- jetstream/README.md | 201 ++++++++++++++++++++++++++++- jetstream/kv.go | 273 +++++++++++++++++++++++++++++++--------- jetstream/kv_options.go | 7 +- 3 files changed, 412 insertions(+), 69 deletions(-) diff --git a/jetstream/README.md b/jetstream/README.md index 0e4df7d09..32a3343f8 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -25,6 +25,11 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client. - [Publishing on stream](#publishing-on-stream) - [Synchronous publish](#synchronous-publish) - [Async publish](#async-publish) + - [KeyValue Store](#keyvalue-store) + - [Basic usage of KV bucket](#basic-usage-of-kv-bucket) + - [Watching for changes on a bucket](#watching-for-changes-on-a-bucket) + - [Additional operations on a bucket](#additional-operations-on-a-bucket) + - [Examples](#examples) ## Overview @@ -53,11 +58,12 @@ JetStream API. Key differences between `jetstream` and `nats` packages include: - `Msg` - used for message-specific operations - reading data, headers and metadata, as well as performing various types of acknowledgements -> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly. +Additionally, `jetstream` exposes [KeyValue Store](#keyvalue-store) and +[ObjectStore](#object-store) capabilities. KV and Object stores are abstraction +layers on top of JetStream Streams, simplifying key value and large data +storage on Streams. -> __WARNING__: The new API is currently provided as a _preview_, and will -> deprecate previous JetStream subscribe APIs. It is encouraged to start -experimenting with the new APIs as soon as possible. +> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly. ## Basic usage @@ -603,6 +609,193 @@ ackF, err = js.PublishAsync("ORDERS.new", []byte("hello")) Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept options for setting headers. +## KeyValue Store + +JetStream KeyValue Stores offer a straightforward method for storing key-value +pairs within JetStream. These stores are supported by a specially configured +stream, designed to efficiently and compactly store these pairs. This structure +ensures rapid and convenient access to the data. + +The KV Store, also known as a bucket, enables the execution of various operations: + +- create/update a value for a given key +- get a value for a given key +- delete a value for a given key +- purge all values from a bucket +- list all keys in a bucket +- watch for changes on given key set or the whole bucket +- retrieve history of changes for a given key + +### Basic usage of KV bucket + +The most basic usage of KV bucket is to create or retrieve a bucket and perform +basic CRUD operations on keys. + +```go +js, _ := jetstream.New(nc) +ctx := context.Background() + +// Create a new bucket. Bucket name is required and has to be unique within a stream. +kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) + +// Set a value for a given key +// Put will either create or update a value for a given key +kv.Put(ctx, "sue.color", []byte("blue")) + +// Get an entry for a given key +// Entry contains key/value, but also metadata (revision, timestamp, etc.)) +entry, _ := kv.Get(ctx, "sue.color") + +// Prints `sue.color @ 1 -> "blue"` +fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) + +// Update a value for a given key +// Update will fail if the key does not exist or the revision is not up to date +kv.Update(ctx, "sue.color", []byte("red"), 1) + +// Create will fail if the key already exists +_, err := kv.Create(ctx, "sue.color", []byte("purple")) +fmt.Println(err) // prints `nats: key exists` + +// Delete a value for a given key +kv.Delete(ctx, "sue.color") + +// getting a deleted key will return an error +_, err = kv.Get(ctx, "sue.color") +fmt.Println(err) // prints `nats: key not found` + +// A bucket can be deleted once it is no longer needed +js.DeleteKeyValue(ctx, "profiles") +``` + +### Watching for changes on a bucket + +KV buckets support Watchers, which can be used to watch for changes on a given +key or the whole bucket. Watcher will receive a notification on a channel when a +change occurs. By default, watcher will return initial values for all matching +keys. After sending all initial values, watcher will send nil on the channel to +signal that all initial values have been sent and it will start sending updates when +changes occur. + +Watcher supports several configuration options: + +- `IncludeHistory` will have the key watcher send all historical values +for each key (up to KeyValueMaxHistory). +- `IgnoreDeletes` will have the key watcher not pass any keys with +delete markers. +- `UpdatesOnly` will have the key watcher only pass updates on values +(without latest values when started). +- `MetaOnly` will have the key watcher retrieve only the entry meta +data, not the entry value. +- `ResumeFromRevision` instructs the key watcher to resume from a +specific revision number. + +```go +js, _ := jetstream.New(nc) +ctx := context.Background() +kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) + +kv.Put(ctx, "sue.color", []byte("blue")) + +// A watcher can be created to watch for changes on a given key or the whole bucket +// Watcher will receive a notification on a channel when a change occurs +// By default, watcher will return initial values for all matching keys. +// Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option. +watcher, _ := kv.Watch(ctx, "sue.*") +defer watcher.Stop() + +kv.Put(ctx, "sue.age", []byte("43")) +kv.Put(ctx, "sue.color", []byte("red")) + +// First, the watcher sends initial values for all matching keys +// In this case, it will send a single entry for `sue.color`. +entry := <-watcher.Updates() +// Prints `sue.color @ 1 -> "blue"` +fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) + +// After all initial values have been sent, watcher will send nil on the channel. +entry = <-watcher.Updates() +if entry != nil { + fmt.Println("Unexpected entry received") +} + +// After that, watcher will send updates when changes occur +// In this case, it will send an entry for `sue.color` and `sue.age`. + +entry = <-watcher.Updates() +// Prints `sue.age @ 2 -> "43"` +fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) + +entry = <-watcher.Updates() +// Prints `sue.color @ 3 -> "red"` +fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) +``` + +### Additional operations on a bucket + +In addition to basic CRUD operations and watching for changes, KV buckets +support several additional operations: + +- `ListKeys` will return all keys in a bucket" + +```go +js, _ := jetstream.New(nc) +ctx := context.Background() +kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) + +kv.Put(ctx, "sue.color", []byte("blue")) +kv.Put(ctx, "sue.age", []byte("43")) +kv.Put(ctx, "bucket", []byte("profiles")) + +keys, _ := kv.ListKeys(ctx) + +// Prints all 3 keys +for key := range keys.Keys() { + fmt.Println(key) +} +``` + +- `Purge` and `PurgeDeletes` for removing all keys from a bucket + +```go +js, _ := jetstream.New(nc) +ctx := context.Background() +kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) + +kv.Put(ctx, "sue.color", []byte("blue")) +kv.Put(ctx, "sue.age", []byte("43")) +kv.Put(ctx, "bucket", []byte("profiles")) + +// Purge will remove all keys from a bucket +// The latest revision of each key will be kept +// with a delete marker, all previous revisions will be removed +kv.Purge(ctx) + +// PurgeDeletes will remove all keys from a bucket +// with a delete marker. +kv.PurgeDeletes(ctx) +``` + +- `Status` will return the current status of a bucket + +```go +js, _ := jetstream.New(nc) +ctx := context.Background() +kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) + +kv.Put(ctx, "sue.color", []byte("blue")) +kv.Put(ctx, "sue.age", []byte("43")) +kv.Put(ctx, "bucket", []byte("profiles")) + +status, _ := kv.Status(ctx) + +fmt.Println(status.Bucket()) // prints `profiles` +fmt.Println(status.Values()) // prints `3` +fmt.Println(status.Bytes()) // prints the size of all values in bytes +``` + +## Object Store + ## Examples You can find more examples of `jetstream` usage [here](https://github.com/nats-io/nats.go/tree/main/examples/jetstream). diff --git a/jetstream/kv.go b/jetstream/kv.go index 160dd1a6a..455f8732e 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -27,149 +27,287 @@ import ( "github.com/nats-io/nats.go/internal/parser" ) -// Public interfaces and structs type ( - // KeyValueManager is used to manage KeyValue stores. + // KeyValueManager is used to manage KeyValue stores. It provides methods to + // create, delete, and retrieve KeyValue stores. KeyValueManager interface { - // KeyValue will lookup and bind to an existing KeyValue store. + // KeyValue will lookup and bind to an existing KeyValue store. If the + // KeyValue store with given name does not exist, ErrBucketNotFound will + // be returned. KeyValue(ctx context.Context, bucket string) (KeyValue, error) - // CreateKeyValue will create a KeyValue store with the following configuration. + + // CreateKeyValue will create a KeyValue store with the given + // configuration. CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) - // DeleteKeyValue will delete this KeyValue store (JetStream stream). + + // DeleteKeyValue will delete this KeyValue store. DeleteKeyValue(ctx context.Context, bucket string) error - // KeyValueStoreNames is used to retrieve a list of key value store names + + // KeyValueStoreNames is used to retrieve a list of key value store + // names. It returns a KeyValueNamesLister exposing a channel to read + // the names from. The lister will always close the channel when done + // (either all names have been read or an error occurred) and therefore + // can be used in range loops. KeyValueStoreNames(ctx context.Context) KeyValueNamesLister - // KeyValueStores is used to retrieve a list of key value store statuses + + // KeyValueStores is used to retrieve a list of key value store + // statuses. It returns a KeyValueLister exposing a channel to read the + // statuses from. The lister will always close the channel when done + // (either all statuses have been read or an error occurred) and + // therefore can be used in range loops. KeyValueStores(ctx context.Context) KeyValueLister } // KeyValue contains methods to operate on a KeyValue store. + // Using the KeyValue interface, it is possible to: + // + // - Get, Put, Create, Update, Delete and Purge a key + // - Watch for updates to keys + // - List all keys + // - Retrieve historical values for a key + // - Retrieve status and configuration of a key value bucket + // - Purge all delete markers + // - Close the KeyValue store KeyValue interface { - // Get returns the latest value for the key. + // Get returns the latest value for the key. If the key does not exist, + // ErrKeyNotFound will be returned. Get(ctx context.Context, key string) (KeyValueEntry, error) - // GetRevision returns a specific revision value for the key. + + // GetRevision returns a specific revision value for the key. If the key + // does not exist, ErrKeyNotFound will be returned. GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) - // Put will place the new value for the key into the store. + + // Put will place the new value for the key into the store. If the key + // does not exist, it will be created. If the key exists, the value will + // be updated. + // + // A key has to consist of alphanumeric characters, dashes, underscores, + // equal signs, and dots. Put(ctx context.Context, key string, value []byte) (uint64, error) - // PutString will place the string for the key into the store. + + // PutString will place the string for the key into the store. If the + // key does not exist, it will be created. If the key exists, the value + // will be updated. + // + // A key has to consist of alphanumeric characters, dashes, underscores, + // equal signs, and dots. PutString(ctx context.Context, key string, value string) (uint64, error) - // Create will add the key/value pair if it does not exist. + + // Create will add the key/value pair if it does not exist. If the key + // already exists, ErrKeyExists will be returned. + // + // A key has to consist of alphanumeric characters, dashes, underscores, + // equal signs, and dots. Create(ctx context.Context, key string, value []byte) (uint64, error) + // Update will update the value if the latest revision matches. + // If the provided revision is not the latest, Update will return an error. Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error) - // Delete will place a delete marker and leave all revisions. + + // Delete will place a delete marker and leave all revisions. A history + // of a deleted key can still be retrieved by using the History method + // or a watch on the key. [LastRevision] option can be specified to only + // perform delete if the latest revision the provided one. Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error + // Purge will place a delete marker and remove all previous revisions. + // Only the latest revision will be preserved (with a delete marker). + // [LastRevision] option can be specified to only perform purge if the + // latest revision the provided one. Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error - // Watch for any updates to keys that match the keys argument which could include wildcards. - // Watch will send a nil entry when it has received all initial values. + + // Watch for any updates to keys that match the keys argument which + // could include wildcards. By default, the watcher will send the latest + // value for each key and all future updates. Watch will send a nil + // entry when it has received all initial values. There are a few ways + // to configure the watcher: + // + // - IncludeHistory will have the key watcher send all historical values + // for each key (up to KeyValueMaxHistory). + // - IgnoreDeletes will have the key watcher not pass any keys with + // delete markers. + // - UpdatesOnly will have the key watcher only pass updates on values + // (without latest values when started). + // - MetaOnly will have the key watcher retrieve only the entry meta + // data, not the entry value. + // - ResumeFromRevision instructs the key watcher to resume from a + // specific revision number. Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) - // WatchAll will invoke the callback for all updates. + + // WatchAll will watch for any updates to all keys. It can be configured + // with the same options as Watch. WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) - // Keys will return all keys. - // DEPRECATED: Use ListKeys instead to avoid memory issues. + + // Keys will return all keys. DEPRECATED: Use ListKeys instead to avoid + // memory issues. Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) - // ListKeys will return all keys in a channel. + + // ListKeys will return KeyLister, allowing to retrieve all keys from + // the key value store in a streaming fashion (on a channel). ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) - // History will return all historical values for the key. + + // History will return all historical values for the key (up to + // KeyValueMaxHistory). History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) - // Bucket returns the current bucket name. + + // Bucket returns the KV store name. Bucket() string - // PurgeDeletes will remove all current delete markers. + + // PurgeDeletes will remove all current delete markers. It can be + // configured using DeleteMarkersOlderThan option to only remove delete + // markers older than a certain duration. PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error + // Status retrieves the status and configuration of a bucket Status(ctx context.Context) (KeyValueStatus, error) } - // KeyLister is used to retrieve a list of key value store keys - KeyLister interface { - Keys() <-chan string - Stop() error - } - - // KeyValueConfig is for configuring a KeyValue store. + // KeyValueConfig is the configuration for a KeyValue store. KeyValueConfig struct { - Bucket string - Description string + // Bucket is the name of the KeyValue store. Bucket name has to be + // unique and can only contain alphanumeric characters, dashes, and + // underscores. + Bucket string + + // Description is an optional description for the KeyValue store. + Description string + + // MaxValueSize is the maximum size of a value in bytes. If not + // specified, the default is -1 (unlimited). MaxValueSize int32 - History uint8 - TTL time.Duration - MaxBytes int64 - Storage StorageType - Replicas int - Placement *Placement - RePublish *RePublish - Mirror *StreamSource - Sources []*StreamSource - - // Enable underlying stream compression. + + // History is the number of historical values to keep per key. If not + // specified, the default is 1. Max is 64. + History uint8 + + // TTL is the expiry time for keys. By default, keys do not expire. + TTL time.Duration + + // MaxBytes is the maximum size in bytes of the KeyValue store. If not + // specified, the default is -1 (unlimited). + MaxBytes int64 + + // Storage is the type of storage to use for the KeyValue store. If not + // specified, the default is FileStorage. + Storage StorageType + + // Replicas is the number of replicas to keep for the KeyValue store in + // clustered jetstream. Defaults to 1, maximum is 5. + Replicas int + + // Placement is used to declare where the stream should be placed via + // tags and/or an explicit cluster name. + Placement *Placement + + // RePublish allows immediate republishing a message to the configured + // subject after it's stored. + RePublish *RePublish + + // Mirror defines the consiguration for mirroring another KeyValue + // store. + Mirror *StreamSource + + // Sources defines the configuration for sources of a KeyValue store. + Sources []*StreamSource + + // Compression sets the underlying stream compression. // NOTE: Compression is supported for nats-server 2.10.0+ Compression bool } + // KeyLister is used to retrieve a list of key value store keys. It returns + // a channel to read the keys from. The lister will always close the channel + // when done (either all keys have been read or an error occurred) and + // therefore can be used in range loops. Stop can be used to stop the lister + // when not all keys have been read. + KeyLister interface { + Keys() <-chan string + Stop() error + } + + // KeyValueLister is used to retrieve a list of key value stores. It returns + // a channel to read the KV store statuses from. The lister will always + // close the channel when done (either all stores have been retrieved or an + // error occurred) and therefore can be used in range loops. Stop can be + // used to stop the lister when not all KeyValue stores have been read. KeyValueLister interface { Status() <-chan KeyValueStatus Error() error } + // KeyValueNamesLister is used to retrieve a list of key value store names. + // It returns a channel to read the KV bucket names from. The lister will + // always close the channel when done (either all stores have been retrieved + // or an error occurred) and therefore can be used in range loops. Stop can + // be used to stop the lister when not all bucket names have been read. KeyValueNamesLister interface { Name() <-chan string Error() error } - // KeyValueStatus is run-time status about a Key-Value bucket + // KeyValueStatus is run-time status about a Key-Value bucket. KeyValueStatus interface { - // Bucket the name of the bucket + // Bucket returns the name of the KeyValue store. Bucket() string - // Values is how many messages are in the bucket, including historical values + // Values is how many messages are in the bucket, including historical values. Values() uint64 - // History returns the configured history kept per key + // History returns the configured history kept per key. History() int64 - // TTL is how long the bucket keeps values for + // TTL returns the duration for which keys are kept in the bucket. TTL() time.Duration - // BackingStore indicates what technology is used for storage of the bucket + // BackingStore indicates what technology is used for storage of the bucket. + // Currently only JetStream is supported. BackingStore() string - // Bytes returns the size in bytes of the bucket + // Bytes returns the size of the bucket in bytes. Bytes() uint64 - // IsCompressed indicates if the data is compressed on disk + // IsCompressed indicates if the data is compressed on disk. IsCompressed() bool } - // KeyWatcher is what is returned when doing a watch. + // KeyWatcher is what is returned when doing a watch. It can be used to + // retrieve updates to keys. If not using UpdatesOnly option, it will also + // send the latest value for each key. After all initial values have been + // sent, a nil entry will be sent. Stop can be used to stop the watcher and + // close the underlying channel. Watcher will not close the channel until + // Stop is called or connection is closed. KeyWatcher interface { - // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop will stop this watcher. Stop() error } - // KeyValueEntry is a retrieved entry for Get or List or Watch. + // KeyValueEntry is a retrieved entry for Get, List or Watch. KeyValueEntry interface { // Bucket is the bucket the data was loaded from. Bucket() string - // Key is the key that was retrieved. + + // Key is the name of the key that was retrieved. Key() string + // Value is the retrieved value. Value() []byte + // Revision is a unique sequence for this value. Revision() uint64 + // Created is the time the data was put in the bucket. Created() time.Time - // Delta is distance from the latest value. + + // Delta is distance from the latest value (how far the current sequence + // is from the latest). Delta() uint64 - // Operation returns Put or Delete or Purge. + + // Operation returns Put or Delete or Purge, depending on the manner in + // which the current revision was created. Operation() KeyValueOp } ) -// Option types - type ( WatchOpt interface { configureWatcher(opts *watchOpts) error @@ -188,6 +326,7 @@ type ( resumeFromRevision uint64 } + // KVDeleteOpt is used to configure delete and purge operations. KVDeleteOpt interface { configureDelete(opts *deleteOpts) error } @@ -200,6 +339,7 @@ type ( revision uint64 } + // KVPurgeOpt is used to configure PurgeDeletes. KVPurgeOpt interface { configurePurge(opts *purgeOpts) error } @@ -226,13 +366,22 @@ type kvs struct { useDirect bool } -// KeyValueOp represents the type of KV operation (Put, Delete, Purge) -// Returned as part of watcher entry. +// KeyValueOp represents the type of KV operation (Put, Delete, Purge). It is a +// part of KeyValueEntry. type KeyValueOp uint8 +// Available KeyValueOp values. const ( + // KeyValuePut is a set on a revision which creates or updates a value for a + // key. KeyValuePut KeyValueOp = iota + + // KeyValueDelete is a set on a revision which adds a delete marker for a + // key. KeyValueDelete + + // KeyValuePurge is a set on a revision which removes all previous revisions + // for a key. KeyValuePurge ) diff --git a/jetstream/kv_options.go b/jetstream/kv_options.go index 30b5d9765..07a255727 100644 --- a/jetstream/kv_options.go +++ b/jetstream/kv_options.go @@ -25,7 +25,7 @@ func (opt watchOptFn) configureWatcher(opts *watchOpts) error { } // IncludeHistory instructs the key watcher to include historical values as -// well. +// well (up to KeyValueMaxHistory). func IncludeHistory() WatchOpt { return watchOptFn(func(opts *watchOpts) error { if opts.updatesOnly { @@ -75,7 +75,7 @@ func ResumeFromRevision(revision uint64) WatchOpt { } // DeleteMarkersOlderThan indicates that delete or purge markers older than that -// will be deleted as part of PurgeDeletes() operation, otherwise, only the data +// will be deleted as part of [KeyValue.PurgeDeletes] operation, otherwise, only the data // will be removed but markers that are recent will be kept. // Note that if no option is specified, the default is 30 minutes. You can set // this option to a negative value to instruct to always remove the markers, @@ -93,7 +93,8 @@ func (opt deleteOptFn) configureDelete(opts *deleteOpts) error { return opt(opts) } -// LastRevision deletes if the latest revision matches. +// LastRevision deletes if the latest revision matches the provided one. If the +// provided revision is not the latest, the delete will return an error. func LastRevision(revision uint64) KVDeleteOpt { return deleteOptFn(func(opts *deleteOpts) error { opts.revision = revision From cd51d91b6334bafb402b252b30f5ce3701c2f809 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 30 Jan 2024 12:26:44 +0100 Subject: [PATCH 2/3] Fix GetRevision doc Signed-off-by: Piotr Piotrowski --- jetstream/kv.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jetstream/kv.go b/jetstream/kv.go index 455f8732e..281c44068 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -74,7 +74,8 @@ type ( Get(ctx context.Context, key string) (KeyValueEntry, error) // GetRevision returns a specific revision value for the key. If the key - // does not exist, ErrKeyNotFound will be returned. + // does not exist or the provided revision does not exists, + // ErrKeyNotFound will be returned. GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) // Put will place the new value for the key into the store. If the key From 69daea8b160c3796d868ceaf919ccfcca7a1aa7e Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 31 Jan 2024 11:47:14 +0100 Subject: [PATCH 3/3] Apply review comments Signed-off-by: Piotr Piotrowski --- jetstream/README.md | 23 ++++++++++++----------- jetstream/kv.go | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/jetstream/README.md b/jetstream/README.md index 32a3343f8..dfb5cd87d 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -635,7 +635,7 @@ basic CRUD operations on keys. js, _ := jetstream.New(nc) ctx := context.Background() -// Create a new bucket. Bucket name is required and has to be unique within a stream. +// Create a new bucket. Bucket name is required and has to be unique within a JetStream account. kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) // Set a value for a given key @@ -650,14 +650,16 @@ entry, _ := kv.Get(ctx, "sue.color") fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) // Update a value for a given key -// Update will fail if the key does not exist or the revision is not up to date +// Update will fail if the key does not exist or the revision has changed kv.Update(ctx, "sue.color", []byte("red"), 1) // Create will fail if the key already exists _, err := kv.Create(ctx, "sue.color", []byte("purple")) fmt.Println(err) // prints `nats: key exists` -// Delete a value for a given key +// Delete a value for a given key. +// Delete is not destructive, it will add a delete marker for a given key +// and all previous revisions will still be available kv.Delete(ctx, "sue.color") // getting a deleted key will return an error @@ -684,9 +686,8 @@ for each key (up to KeyValueMaxHistory). - `IgnoreDeletes` will have the key watcher not pass any keys with delete markers. - `UpdatesOnly` will have the key watcher only pass updates on values -(without latest values when started). -- `MetaOnly` will have the key watcher retrieve only the entry meta -data, not the entry value. +(without values already present when starting). +- `MetaOnly` will have the key watcher retrieve only the entry metadata, not the entry value. - `ResumeFromRevision` instructs the key watcher to resume from a specific revision number. @@ -698,8 +699,7 @@ kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"}) kv.Put(ctx, "sue.color", []byte("blue")) // A watcher can be created to watch for changes on a given key or the whole bucket -// Watcher will receive a notification on a channel when a change occurs -// By default, watcher will return initial values for all matching keys. +// By default, watcher will return most recent values for all matching keys. // Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option. watcher, _ := kv.Watch(ctx, "sue.*") defer watcher.Stop() @@ -707,13 +707,13 @@ defer watcher.Stop() kv.Put(ctx, "sue.age", []byte("43")) kv.Put(ctx, "sue.color", []byte("red")) -// First, the watcher sends initial values for all matching keys +// First, the watcher sends most recent values for all matching keys. // In this case, it will send a single entry for `sue.color`. entry := <-watcher.Updates() // Prints `sue.color @ 1 -> "blue"` fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value())) -// After all initial values have been sent, watcher will send nil on the channel. +// After all current values have been sent, watcher will send nil on the channel. entry = <-watcher.Updates() if entry != nil { fmt.Println("Unexpected entry received") @@ -766,9 +766,10 @@ kv.Put(ctx, "sue.color", []byte("blue")) kv.Put(ctx, "sue.age", []byte("43")) kv.Put(ctx, "bucket", []byte("profiles")) -// Purge will remove all keys from a bucket +// Purge will remove all keys from a bucket. // The latest revision of each key will be kept // with a delete marker, all previous revisions will be removed +// permanently. kv.Purge(ctx) // PurgeDeletes will remove all keys from a bucket diff --git a/jetstream/kv.go b/jetstream/kv.go index 281c44068..9371f6fb1 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -107,12 +107,18 @@ type ( // Delete will place a delete marker and leave all revisions. A history // of a deleted key can still be retrieved by using the History method - // or a watch on the key. [LastRevision] option can be specified to only - // perform delete if the latest revision the provided one. + // or a watch on the key. [Delete] is a non-destructive operation and + // will not remove any previous revisions from the underlying stream. + // + // [LastRevision] option can be specified to only perform delete if the + // latest revision the provided one. Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error // Purge will place a delete marker and remove all previous revisions. // Only the latest revision will be preserved (with a delete marker). + // Unlike [Delete], Purge is a destructive operation and will remove all + // previous revisions from the underlying streams. + // // [LastRevision] option can be specified to only perform purge if the // latest revision the provided one. Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error @@ -157,9 +163,12 @@ type ( // PurgeDeletes will remove all current delete markers. It can be // configured using DeleteMarkersOlderThan option to only remove delete // markers older than a certain duration. + // + // [PurgeDeletes] is a destructive operation and will remove all entries + // with delete markers from the underlying stream. PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error - // Status retrieves the status and configuration of a bucket + // Status retrieves the status and configuration of a bucket. Status(ctx context.Context) (KeyValueStatus, error) }