Skip to content

Commit

Permalink
[CHANGED] Remove SubjectTransformDest in new API (#1404)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Sep 20, 2023
1 parent 8ce44e0 commit a95ece8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 22 deletions.
6 changes: 0 additions & 6 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
Expand Down Expand Up @@ -450,9 +447,6 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
Expand Down
12 changes: 10 additions & 2 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,20 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}

if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
Expand Down
26 changes: 12 additions & 14 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ type (

// StreamSourceInfo shows information about an upstream stream source.
StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamState is information about the given stream.
Expand Down Expand Up @@ -147,14 +146,13 @@ type (

// StreamSource dictates how streams can source from other streams.
StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
Expand Down
43 changes: 43 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,3 +1607,46 @@ func TestStreamNameBySubject(t *testing.T) {
})
}
}

func TestJetStreamTransform(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()

ctx := context.Background()
_, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORIGIN",
Subjects: []string{"test"},
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
Storage: jetstream.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
err = nc.Publish("test", []byte("1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sourcingStream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Subjects: []string{},
Name: "SOURCING",
Sources: []*jetstream.StreamSource{{Name: "ORIGIN", SubjectTransforms: []jetstream.SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}},
Storage: jetstream.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

cons, err := sourcingStream.CreateConsumer(ctx, jetstream.ConsumerConfig{FilterSubject: "fromtest.>", MemoryStorage: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m, err := cons.Next()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject() != "fromtest.transformed.test" {
t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject())
}
}

0 comments on commit a95ece8

Please sign in to comment.