Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement CreateOrUpdateStream function #1395

Merged
merged 11 commits into from
Oct 7, 2023
14 changes: 14 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type (
CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
// UpdateStream updates an existing stream
UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
// CreateOrUpdateStream creates a stream with given config. If stream already exists, it will be updated (if possible).
CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
// Stream returns a [Stream] hook for a given stream name
Stream(ctx context.Context, stream string) (Stream, error)
// StreamNameBySubject returns a stream name stream listening on given subject
Expand Down Expand Up @@ -420,6 +422,18 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
}, nil
}

func (js *jetStream) CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
samanebi marked this conversation as resolved.
Show resolved Hide resolved
s, err := js.UpdateStream(ctx, cfg)
if err != nil {
if !errors.Is(err, ErrStreamNotFound) {
return nil, err
}
return js.CreateStream(ctx, cfg)
}

return s, nil
}

// Stream returns a [Stream] hook for a given stream name
func (js *jetStream) Stream(ctx context.Context, name string) (Stream, error) {
if err := validateStreamName(name); err != nil {
Expand Down
130 changes: 130 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,136 @@ func TestCreateStreamMirrorCrossDomains(t *testing.T) {
}
}

func TestCreateOrUpdateStream(t *testing.T) {
tests := []struct {
name string
stream string
subject string
timeout time.Duration
withError error
withInfoCheck bool
}{
{
name: "create stream ok",
stream: "foo",
timeout: 10 * time.Second,
subject: "FOO.1",
withInfoCheck: false,
},
{
name: "create stream empty context",
stream: "foo-o",
subject: "FOO.12",
withInfoCheck: false,
},
{
name: "create stream invalid stream name",
stream: "foo.123",
subject: "FOO-123",
timeout: 10 * time.Second,
withError: jetstream.ErrInvalidStreamName,
withInfoCheck: false,
},
{
name: "create stream stream name required",
stream: "",
subject: "FOO-1234",
timeout: 10 * time.Second,
withError: jetstream.ErrStreamNameRequired,
withInfoCheck: false,
},
{
name: "update stream ok",
stream: "foo",
subject: "BAR-123",
timeout: 10 * time.Second,
withInfoCheck: true,
},
{
name: "create stream context timeout",
stream: "foo",
subject: "BAR-1234",
timeout: 1 * time.Microsecond,
withError: context.DeadlineExceeded,
withInfoCheck: false,
},
{
name: "update stream with empty context",
stream: "sample-foo-1",
subject: "SAMPLE-FOO-123",
withInfoCheck: true,
},
{
name: "update stream invalid stream name",
stream: "sample-foo.123",
subject: "SAMPLE-FOO-1234",
timeout: 10 * time.Second,
withError: jetstream.ErrInvalidStreamName,
withInfoCheck: true,
},
{
name: "update stream stream name required",
stream: "",
subject: "SAMPLE-FOO-123",
timeout: 10 * time.Second,
withError: jetstream.ErrStreamNameRequired,
withInfoCheck: true,
},
{
name: "update stream context timeout",
stream: "sample-foo-2",
subject: "SAMPLE-FOO-123456",
timeout: 1 * time.Microsecond,
withError: context.DeadlineExceeded,
withInfoCheck: true,
},
}

srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
if test.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), test.timeout)
defer cancel()
}
s, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}})
if test.withError != nil {
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if test.withInfoCheck {
info, err := s.Info(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(info.Config.Subjects) != 1 || info.Config.Subjects[0] != test.subject {
t.Fatalf("Invalid stream subjects after update: %v", info.Config.Subjects)
}
}
})
}
}

func TestUpdateStream(t *testing.T) {
tests := []struct {
name string
Expand Down