Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Remove SubjectTransformDest in new API #1404

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
Expand Down Expand Up @@ -450,9 +447,6 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
Expand Down
12 changes: 10 additions & 2 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,20 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}

if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
Expand Down
26 changes: 12 additions & 14 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ type (

// StreamSourceInfo shows information about an upstream stream source.
StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamState is information about the given stream.
Expand Down Expand Up @@ -142,14 +141,13 @@ type (

// StreamSource dictates how streams can source from other streams.
StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
Expand Down
43 changes: 43 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,3 +1607,46 @@ func TestStreamNameBySubject(t *testing.T) {
})
}
}

func TestJetStreamTransform(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()

ctx := context.Background()
_, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORIGIN",
Subjects: []string{"test"},
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
Storage: jetstream.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
err = nc.Publish("test", []byte("1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sourcingStream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Subjects: []string{},
Name: "SOURCING",
Sources: []*jetstream.StreamSource{{Name: "ORIGIN", SubjectTransforms: []jetstream.SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}},
Storage: jetstream.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

cons, err := sourcingStream.CreateConsumer(ctx, jetstream.ConsumerConfig{FilterSubject: "fromtest.>", MemoryStorage: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m, err := cons.Next()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject() != "fromtest.transformed.test" {
t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject())
}
}