diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 582ee1916..eadd3e69c 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -368,9 +368,6 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { - if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" { - return nil, ErrStreamSourceSubjectTransformNotSupported - } if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported } @@ -450,9 +447,6 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { - if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" { - return nil, ErrStreamSourceSubjectTransformNotSupported - } if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported } diff --git a/jetstream/kv.go b/jetstream/kv.go index 3546f0176..c16c57146 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -351,12 +351,20 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke } else if len(cfg.Sources) > 0 { // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. for _, ss := range cfg.Sources { - if !strings.HasPrefix(ss.Name, kvBucketNamePre) { - ss = ss.copy() + var sourceBucketName string + if strings.HasPrefix(ss.Name, kvBucketNamePre) { + sourceBucketName = ss.Name[len(kvBucketNamePre):] + } else { + sourceBucketName = ss.Name ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) } + + if ss.External == nil || sourceBucketName != cfg.Bucket { + ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}} + } scfg.Sources = append(scfg.Sources, ss) } + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } else { scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } diff --git a/jetstream/stream_config.go b/jetstream/stream_config.go index 575e50604..225db0403 100644 --- a/jetstream/stream_config.go +++ b/jetstream/stream_config.go @@ -79,12 +79,11 @@ type ( // StreamSourceInfo shows information about an upstream stream source. StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"subject_transform_dest,omitempty"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -142,14 +141,13 @@ type ( // StreamSource dictates how streams can source from other streams. StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"subject_transform_dest,omitempty"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index 7794e6e46..67a3357eb 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -1607,3 +1607,46 @@ func TestStreamNameBySubject(t *testing.T) { }) } } + +func TestJetStreamTransform(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + nc, js := jsClient(t, s) + defer nc.Close() + + ctx := context.Background() + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + Storage: jetstream.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + err = nc.Publish("test", []byte("1")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sourcingStream, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Subjects: []string{}, + Name: "SOURCING", + Sources: []*jetstream.StreamSource{{Name: "ORIGIN", SubjectTransforms: []jetstream.SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}}, + Storage: jetstream.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + cons, err := sourcingStream.CreateConsumer(ctx, jetstream.ConsumerConfig{FilterSubject: "fromtest.>", MemoryStorage: true}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + m, err := cons.Next() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject() != "fromtest.transformed.test" { + t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject()) + } +}