Skip to content

Commit

Permalink
[IMPROVED] KeyValue documentation (#1537)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Feb 2, 2024
1 parent ad97d94 commit 7baddc7
Show file tree
Hide file tree
Showing 3 changed files with 424 additions and 70 deletions.
202 changes: 198 additions & 4 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client.
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Examples](#examples)

## Overview

Expand Down Expand Up @@ -53,11 +58,12 @@ JetStream API. Key differences between `jetstream` and `nats` packages include:
- `Msg` - used for message-specific operations - reading data, headers and
metadata, as well as performing various types of acknowledgements

> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly.
Additionally, `jetstream` exposes [KeyValue Store](#keyvalue-store) and
[ObjectStore](#object-store) capabilities. KV and Object stores are abstraction
layers on top of JetStream Streams, simplifying key value and large data
storage on Streams.

> __WARNING__: The new API is currently provided as a _preview_, and will
> deprecate previous JetStream subscribe APIs. It is encouraged to start
experimenting with the new APIs as soon as possible.
> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly.
## Basic usage

Expand Down Expand Up @@ -603,6 +609,194 @@ ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))
Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept
options for setting headers.

## KeyValue Store

JetStream KeyValue Stores offer a straightforward method for storing key-value
pairs within JetStream. These stores are supported by a specially configured
stream, designed to efficiently and compactly store these pairs. This structure
ensures rapid and convenient access to the data.

The KV Store, also known as a bucket, enables the execution of various operations:

- create/update a value for a given key
- get a value for a given key
- delete a value for a given key
- purge all values from a bucket
- list all keys in a bucket
- watch for changes on given key set or the whole bucket
- retrieve history of changes for a given key

### Basic usage of KV bucket

The most basic usage of KV bucket is to create or retrieve a bucket and perform
basic CRUD operations on keys.

```go
js, _ := jetstream.New(nc)
ctx := context.Background()

// Create a new bucket. Bucket name is required and has to be unique within a JetStream account.
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

// Set a value for a given key
// Put will either create or update a value for a given key
kv.Put(ctx, "sue.color", []byte("blue"))

// Get an entry for a given key
// Entry contains key/value, but also metadata (revision, timestamp, etc.))
entry, _ := kv.Get(ctx, "sue.color")

// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// Update a value for a given key
// Update will fail if the key does not exist or the revision has changed
kv.Update(ctx, "sue.color", []byte("red"), 1)

// Create will fail if the key already exists
_, err := kv.Create(ctx, "sue.color", []byte("purple"))
fmt.Println(err) // prints `nats: key exists`

// Delete a value for a given key.
// Delete is not destructive, it will add a delete marker for a given key
// and all previous revisions will still be available
kv.Delete(ctx, "sue.color")

// getting a deleted key will return an error
_, err = kv.Get(ctx, "sue.color")
fmt.Println(err) // prints `nats: key not found`

// A bucket can be deleted once it is no longer needed
js.DeleteKeyValue(ctx, "profiles")
```

### Watching for changes on a bucket

KV buckets support Watchers, which can be used to watch for changes on a given
key or the whole bucket. Watcher will receive a notification on a channel when a
change occurs. By default, watcher will return initial values for all matching
keys. After sending all initial values, watcher will send nil on the channel to
signal that all initial values have been sent and it will start sending updates when
changes occur.

Watcher supports several configuration options:

- `IncludeHistory` will have the key watcher send all historical values
for each key (up to KeyValueMaxHistory).
- `IgnoreDeletes` will have the key watcher not pass any keys with
delete markers.
- `UpdatesOnly` will have the key watcher only pass updates on values
(without values already present when starting).
- `MetaOnly` will have the key watcher retrieve only the entry metadata, not the entry value.
- `ResumeFromRevision` instructs the key watcher to resume from a
specific revision number.

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))

// A watcher can be created to watch for changes on a given key or the whole bucket
// By default, watcher will return most recent values for all matching keys.
// Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option.
watcher, _ := kv.Watch(ctx, "sue.*")
defer watcher.Stop()

kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "sue.color", []byte("red"))

// First, the watcher sends most recent values for all matching keys.
// In this case, it will send a single entry for `sue.color`.
entry := <-watcher.Updates()
// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// After all current values have been sent, watcher will send nil on the channel.
entry = <-watcher.Updates()
if entry != nil {
fmt.Println("Unexpected entry received")
}

// After that, watcher will send updates when changes occur
// In this case, it will send an entry for `sue.color` and `sue.age`.

entry = <-watcher.Updates()
// Prints `sue.age @ 2 -> "43"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

entry = <-watcher.Updates()
// Prints `sue.color @ 3 -> "red"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
```

### Additional operations on a bucket

In addition to basic CRUD operations and watching for changes, KV buckets
support several additional operations:

- `ListKeys` will return all keys in a bucket"

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

keys, _ := kv.ListKeys(ctx)

// Prints all 3 keys
for key := range keys.Keys() {
fmt.Println(key)
}
```

- `Purge` and `PurgeDeletes` for removing all keys from a bucket

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

// Purge will remove all keys from a bucket.
// The latest revision of each key will be kept
// with a delete marker, all previous revisions will be removed
// permanently.
kv.Purge(ctx)

// PurgeDeletes will remove all keys from a bucket
// with a delete marker.
kv.PurgeDeletes(ctx)
```

- `Status` will return the current status of a bucket

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

status, _ := kv.Status(ctx)

fmt.Println(status.Bucket()) // prints `profiles`
fmt.Println(status.Values()) // prints `3`
fmt.Println(status.Bytes()) // prints the size of all values in bytes
```

## Object Store

## Examples

You can find more examples of `jetstream` usage [here](https://github.com/nats-io/nats.go/tree/main/examples/jetstream).
Loading

0 comments on commit 7baddc7

Please sign in to comment.