From cf560ca2f70c53d6c9c625e8a4c6fb13c192e640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johanna=20=C3=96jeling?= <51084516+johannaojeling@users.noreply.github.com> Date: Tue, 7 Nov 2023 00:02:13 +0100 Subject: [PATCH] [Go SDK]: Create natsio.Write transform for writing to NATS (#29184) * Create natsio.Write transform for writing to NATS * Emit representation of acknowledged message from writeFn * Use type map[string][]string for ProduceMessage headers --- sdks/go.mod | 9 +- sdks/go.sum | 18 +- sdks/go/pkg/beam/io/natsio/common.go | 58 +++++ sdks/go/pkg/beam/io/natsio/example_test.go | 55 +++++ sdks/go/pkg/beam/io/natsio/helper_test.go | 130 +++++++++++ sdks/go/pkg/beam/io/natsio/write.go | 114 ++++++++++ sdks/go/pkg/beam/io/natsio/write_option.go | 31 +++ sdks/go/pkg/beam/io/natsio/write_test.go | 252 +++++++++++++++++++++ 8 files changed, 664 insertions(+), 3 deletions(-) create mode 100644 sdks/go/pkg/beam/io/natsio/common.go create mode 100644 sdks/go/pkg/beam/io/natsio/example_test.go create mode 100644 sdks/go/pkg/beam/io/natsio/helper_test.go create mode 100644 sdks/go/pkg/beam/io/natsio/write.go create mode 100644 sdks/go/pkg/beam/io/natsio/write_option.go create mode 100644 sdks/go/pkg/beam/io/natsio/write_test.go diff --git a/sdks/go.mod b/sdks/go.mod index c3c1c4593f82..6f64472eb8a2 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -45,6 +45,8 @@ require ( github.com/johannesboyne/gofakes3 v0.0.0-20221110173912-32fb85c5aed6 github.com/lib/pq v1.10.9 github.com/linkedin/goavro/v2 v2.12.0 + github.com/nats-io/nats-server/v2 v2.10.4 + github.com/nats-io/nats.go v1.31.0 github.com/proullon/ramsql v0.1.3 github.com/spf13/cobra v1.8.0 github.com/testcontainers/testcontainers-go v0.25.0 @@ -76,12 +78,17 @@ require ( github.com/Microsoft/hcsshim v0.11.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect + github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/shirou/gopsutil/v3 v3.23.8 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + golang.org/x/time v0.3.0 // indirect ) require ( @@ -138,7 +145,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kr/text v0.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/sdks/go.sum b/sdks/go.sum index 875ba55d8596..efffec5951a8 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -319,8 +319,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -343,6 +343,8 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/minio-go/v7 v7.0.63 h1:GbZ2oCvaUdgT5640WJOpyDhhDxvknAJU2/T3yurwcbQ= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= @@ -360,6 +362,16 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus= +github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= @@ -561,6 +573,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -619,6 +632,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/sdks/go/pkg/beam/io/natsio/common.go b/sdks/go/pkg/beam/io/natsio/common.go new file mode 100644 index 000000000000..53f595516987 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/common.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package natsio contains transforms for interacting with NATS. +package natsio + +import ( + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type natsFn struct { + URI string + CredsFile string + nc *nats.Conn + js jetstream.JetStream +} + +func (fn *natsFn) Setup() error { + var opts []nats.Option + if fn.CredsFile != "" { + opts = append(opts, nats.UserCredentials(fn.CredsFile)) + } + + conn, err := nats.Connect(fn.URI, opts...) + if err != nil { + return fmt.Errorf("error connecting to NATS: %v", err) + } + fn.nc = conn + + js, err := jetstream.New(fn.nc) + if err != nil { + return fmt.Errorf("error creating JetStream context: %v", err) + } + fn.js = js + + return nil +} + +func (fn *natsFn) Teardown() { + if fn.nc != nil { + fn.nc.Close() + } +} diff --git a/sdks/go/pkg/beam/io/natsio/example_test.go b/sdks/go/pkg/beam/io/natsio/example_test.go new file mode 100644 index 000000000000..0516b8efa921 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/example_test.go @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio_test + +import ( + "context" + "log" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/nats-io/nats.go" +) + +func ExampleWrite() { + beam.Init() + + p, s := beam.NewPipelineWithRoot() + + uri := "nats://localhost:4222" + msgs := []natsio.ProducerMessage{ + { + Subject: "events.1", + ID: "123", + Data: []byte("hello"), + Headers: nats.Header{"key": []string{"val1"}}, + }, + { + Subject: "events.2", + ID: "124", + Data: []byte("world"), + Headers: nats.Header{"key": []string{"val2"}}, + }, + } + + input := beam.CreateList(s, msgs) + natsio.Write(s, uri, input) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/io/natsio/helper_test.go b/sdks/go/pkg/beam/io/natsio/helper_test.go new file mode 100644 index 000000000000..cd47ed331de0 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/helper_test.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio + +import ( + "context" + "testing" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func newServer(t *testing.T) *server.Server { + t.Helper() + + opts := &test.DefaultTestOptions + opts.Port = server.RANDOM_PORT + opts.JetStream = true + + srv := test.RunServer(opts) + t.Cleanup(srv.Shutdown) + + return srv +} + +func newConn(t *testing.T, uri string) *nats.Conn { + t.Helper() + + conn, err := nats.Connect(uri) + if err != nil { + t.Fatalf("Failed to connect to NATS: %v", err) + } + t.Cleanup(conn.Close) + + return conn +} + +func newJetStream(t *testing.T, conn *nats.Conn) jetstream.JetStream { + t.Helper() + + js, err := jetstream.New(conn) + if err != nil { + t.Fatalf("Failed to create JetStream instance: %v", err) + } + + return js +} + +func createStream( + t *testing.T, + ctx context.Context, + js jetstream.JetStream, + stream string, + subjects []string, +) jetstream.Stream { + t.Helper() + + cfg := jetstream.StreamConfig{ + Name: stream, + Subjects: subjects, + } + str, err := js.CreateStream(ctx, cfg) + if err != nil { + t.Fatalf("Failed to create stream: %v", err) + } + + t.Cleanup(func() { + if err := js.DeleteStream(ctx, stream); err != nil { + t.Fatalf("Failed to delete stream: %v", err) + } + }) + + return str +} + +func createConsumer( + t *testing.T, + ctx context.Context, + js jetstream.JetStream, + stream string, + subjects []string, +) jetstream.Consumer { + t.Helper() + + cfg := jetstream.OrderedConsumerConfig{ + FilterSubjects: subjects, + } + cons, err := js.OrderedConsumer(ctx, stream, cfg) + if err != nil { + t.Fatalf("Failed to create consumer: %v", err) + } + + return cons +} + +func fetchMessages(t *testing.T, cons jetstream.Consumer, size int) []jetstream.Msg { + t.Helper() + + msgs, err := cons.FetchNoWait(size) + if err != nil { + t.Fatalf("Failed to fetch messages: %v", err) + } + + var result []jetstream.Msg + + for msg := range msgs.Messages() { + if err := msg.Ack(); err != nil { + t.Fatalf("Failed to ack message: %v", err) + } + + result = append(result, msg) + } + + return result +} diff --git a/sdks/go/pkg/beam/io/natsio/write.go b/sdks/go/pkg/beam/io/natsio/write.go new file mode 100644 index 000000000000..8991ef8cac16 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/write.go @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio + +import ( + "context" + "fmt" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func init() { + register.DoFn3x1[context.Context, ProducerMessage, func(ack PublishAck), error](&writeFn{}) + register.Emitter1[PublishAck]() + + beam.RegisterType(reflect.TypeOf((*ProducerMessage)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*PublishAck)(nil)).Elem()) +} + +// ProducerMessage represents a message to be published to NATS. +type ProducerMessage struct { + Subject string + ID string + Headers map[string][]string + Data []byte +} + +// PublishAck represents an acknowledgement from NATS after publishing a message. +type PublishAck struct { + Stream string + Subject string + ID string + Sequence uint64 + Duplicate bool +} + +// Write writes a PCollection to NATS JetStream and returns a +// PCollection of the acknowledged messages. The ID field can be set in the +// ProducerMessage to utilize JetStream's support for deduplication of messages. +// Write takes a variable number of WriteOptionFn to configure the write operation: +// - UserCredentials: path to the user credentials file. Defaults to empty. +func Write(s beam.Scope, uri string, col beam.PCollection, opts ...WriteOptionFn) beam.PCollection { + s = s.Scope("natsio.Write") + + option := &writeOption{} + for _, opt := range opts { + opt(option) + } + + return beam.ParDo(s, newWriteFn(uri, option), col) +} + +type writeFn struct { + natsFn +} + +func newWriteFn(uri string, option *writeOption) *writeFn { + return &writeFn{ + natsFn: natsFn{ + URI: uri, + CredsFile: option.CredsFile, + }, + } +} + +func (fn *writeFn) ProcessElement( + ctx context.Context, + elem ProducerMessage, + emit func(PublishAck), +) error { + msg := &nats.Msg{ + Subject: elem.Subject, + Data: elem.Data, + Header: elem.Headers, + } + + var opts []jetstream.PublishOpt + if elem.ID != "" { + opts = append(opts, jetstream.WithMsgID(elem.ID)) + } + + ack, err := fn.js.PublishMsg(ctx, msg, opts...) + if err != nil { + return fmt.Errorf("error publishing message: %v", err) + } + + pubAck := PublishAck{ + Stream: ack.Stream, + Subject: elem.Subject, + ID: elem.ID, + Sequence: ack.Sequence, + Duplicate: ack.Duplicate, + } + emit(pubAck) + + return nil +} diff --git a/sdks/go/pkg/beam/io/natsio/write_option.go b/sdks/go/pkg/beam/io/natsio/write_option.go new file mode 100644 index 000000000000..b1ee48cbffe4 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/write_option.go @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio + +type writeOption struct { + CredsFile string +} + +// WriteOptionFn is a function that can be passed to Write to configure options for +// writing messages. +type WriteOptionFn func(option *writeOption) + +// WriteUserCredentials sets the user credentials when connecting to NATS. +func WriteUserCredentials(credsFile string) WriteOptionFn { + return func(o *writeOption) { + o.CredsFile = credsFile + } +} diff --git a/sdks/go/pkg/beam/io/natsio/write_test.go b/sdks/go/pkg/beam/io/natsio/write_test.go new file mode 100644 index 000000000000..5e9387ece5f6 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/write_test.go @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio + +import ( + "bytes" + "context" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/google/go-cmp/cmp" + "github.com/nats-io/nats.go" +) + +func TestMain(m *testing.M) { + ptest.Main(m) +} + +func TestWrite(t *testing.T) { + stream := "STREAM" + subject := "subject" + + tests := []struct { + name string + input []any + wantAcks []any + wantMsgs []jsMsg + }{ + { + name: "Write messages and deduplicate based on ID", + input: []any{ + ProducerMessage{ + Subject: subject, + ID: "1", + Data: []byte("msg1a"), + }, + ProducerMessage{ + Subject: subject, + ID: "1", + Data: []byte("msg1b"), + }, + ProducerMessage{ + Subject: subject, + ID: "2", + Data: []byte("msg2"), + }, + }, + wantAcks: []any{ + PublishAck{ + Stream: stream, + Subject: subject, + ID: "1", + Sequence: 1, + Duplicate: false, + }, + PublishAck{ + Stream: stream, + Subject: subject, + ID: "1", + Sequence: 1, + Duplicate: true, + }, + PublishAck{ + Stream: stream, + Subject: subject, + ID: "2", + Sequence: 2, + Duplicate: false, + }, + }, + wantMsgs: []jsMsg{ + testMsg{ + subject: subject, + headers: nats.Header{nats.MsgIdHdr: []string{"1"}}, + data: []byte("msg1a"), + }, + testMsg{ + subject: subject, + headers: nats.Header{nats.MsgIdHdr: []string{"2"}}, + data: []byte("msg2"), + }, + }, + }, + { + name: "Write messages without ID", + input: []any{ + ProducerMessage{ + Subject: subject, + Data: []byte("msg1a"), + }, + ProducerMessage{ + Subject: subject, + Data: []byte("msg1b"), + }, + ProducerMessage{ + Subject: subject, + Data: []byte("msg2"), + }, + }, + wantAcks: []any{ + PublishAck{ + Stream: stream, + Subject: subject, + ID: "", + Sequence: 1, + Duplicate: false, + }, + PublishAck{ + Stream: stream, + Subject: subject, + ID: "", + Sequence: 2, + Duplicate: false, + }, + PublishAck{ + Stream: stream, + Subject: subject, + ID: "", + Sequence: 3, + Duplicate: false, + }, + }, + wantMsgs: []jsMsg{ + testMsg{ + subject: subject, + data: []byte("msg1a"), + }, + testMsg{ + subject: subject, + data: []byte("msg1b"), + }, + testMsg{ + subject: subject, + data: []byte("msg2"), + }, + }, + }, + { + name: "Write message with headers", + input: []any{ + ProducerMessage{ + Subject: subject, + ID: "1", + Headers: map[string][]string{"key": {"val"}}, + Data: []byte("msg1"), + }, + }, + wantAcks: []any{ + PublishAck{ + Stream: stream, + Subject: subject, + ID: "1", + Sequence: 1, + Duplicate: false, + }, + }, + wantMsgs: []jsMsg{ + testMsg{ + subject: subject, + headers: nats.Header{nats.MsgIdHdr: []string{"1"}, "key": []string{"val"}}, + data: []byte("msg1"), + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + srv := newServer(t) + uri := srv.ClientURL() + conn := newConn(t, uri) + js := newJetStream(t, conn) + + subjects := []string{subject} + createStream(t, ctx, js, stream, subjects) + cons := createConsumer(t, ctx, js, stream, subjects) + + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, tc.input...) + gotAcks := Write(s, uri, col) + + passert.Equals(s, gotAcks, tc.wantAcks...) + ptest.RunAndValidate(t, p) + + gotMsgs := fetchMessages(t, cons, len(tc.input)+1) + + if gotLen, wantLen := len(gotMsgs), len(tc.wantMsgs); gotLen != wantLen { + t.Fatalf("Len() = %v, want %v", gotLen, wantLen) + } + + for i := range gotMsgs { + if gotSubject, wantSubject := gotMsgs[i].Subject(), tc.wantMsgs[i].Subject(); gotSubject != wantSubject { + t.Errorf("msg %d: Subject() = %v, want %v", i, gotSubject, wantSubject) + } + + if gotHeaders, wantHeaders := gotMsgs[i].Headers(), tc.wantMsgs[i].Headers(); !cmp.Equal( + gotHeaders, + wantHeaders, + ) { + t.Errorf("msg %d: Headers() = %v, want %v", i, gotHeaders, wantHeaders) + } + + if gotData, wantData := gotMsgs[i].Data(), tc.wantMsgs[i].Data(); !bytes.Equal( + gotData, + wantData, + ) { + t.Errorf("msg %d: Data() = %q, want %q", i, gotData, wantData) + } + } + }) + } +} + +type jsMsg interface { + Subject() string + Headers() nats.Header + Data() []byte +} + +type testMsg struct { + subject string + headers nats.Header + data []byte +} + +func (m testMsg) Subject() string { + return m.subject +} + +func (m testMsg) Headers() nats.Header { + return m.headers +} + +func (m testMsg) Data() []byte { + return m.data +}