Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Add discard policy repair logic in CreateKeyValue #1617

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -488,8 +489,26 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
// errors are joined so that backwards compatibility is retained
// and previous checks for ErrStreamNameAlreadyInUse will still work.
err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err)

// If we have a failure to add, it could be because we have
// a config change if the KV was created against before a bug fix
// that changed the value of discard policy.
// We will check if the stream exists and if the only difference
// is the discard policy, we will update the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if stream, _ = js.Stream(ctx, scfg.Name); stream != nil {
cfg := stream.CachedInfo().Config
cfg.Discard = scfg.Discard
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just on the off chance an ancient bucket is accessed lets also set AllowDirect otherwise lgtm!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

cfg.AllowDirect = scfg.AllowDirect
if reflect.DeepEqual(cfg, scfg) {
stream, err = js.UpdateStream(ctx, scfg)
}
}
}
if err != nil {
return nil, err
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,3 +1634,58 @@ func TestKeyValueCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestKeyValueCreateRepairOldKV(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// create a standard kv
_, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

// get stream config and set discard policy to old and AllowDirect to false
stream, err := js.Stream(ctx, "KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
streamCfg := stream.CachedInfo().Config
streamCfg.Discard = jetstream.DiscardOld
streamCfg.AllowDirect = false

// create a new kv with the same name - client should fix the config
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

// get stream config again and check if the discard policy is set to new
stream, err = js.Stream(ctx, "KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if stream.CachedInfo().Config.Discard != jetstream.DiscardNew {
t.Fatalf("Expected stream to have discard policy set to new")
}
if !stream.CachedInfo().Config.AllowDirect {
t.Fatalf("Expected stream to have AllowDirect set to true")
}

// attempting to create a new kv with the same name and different settings should fail
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
Description: "New KV",
})
if !errors.Is(err, jetstream.ErrBucketExists) {
t.Fatalf("Expected error to be ErrBucketExists, got: %v", err)
}
}