diff --git a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go index 88b7395f44eb..bcf20e89c893 100644 --- a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go +++ b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go @@ -13,11 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package pubsubio provides access to PubSub on Dataflow streaming. -// Experimental. +// Package pubsubio provides access to Pub/Sub on Dataflow streaming. +// +// This implementation only functions on the Dataflow runner. +// +// See https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub +// for details on using Pub/Sub with Dataflow. package pubsubio import ( + "fmt" "reflect" "github.com/apache/beam/sdks/v2/go/pkg/beam" @@ -25,6 +30,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/protobuf/proto" @@ -36,8 +42,12 @@ var ( ) func init() { - beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem()) - beam.RegisterFunction(unmarshalMessageFn) + register.Function2x1(unmarshalMessageFn) + register.Function2x1(marshalMessageFn) + register.Function2x0(wrapInMessage) + register.Function2x0(wrapInMessage) + register.Emitter1[[]byte]() + register.Emitter1[*pb.PubsubMessage]() } // ReadOptions represents options for reading from PubSub. @@ -73,15 +83,38 @@ func Read(s beam.Scope, project, topic string, opts *ReadOptions) beam.PCollecti return out[0] } -func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) { +func unmarshalMessageFn(raw []byte, emit func(*pb.PubsubMessage)) error { var msg pb.PubsubMessage if err := proto.Unmarshal(raw, &msg); err != nil { - return nil, err + return err } - return &msg, nil + emit(&msg) + return nil +} + +func wrapInMessage(raw []byte, emit func(*pb.PubsubMessage)) { + emit(&pb.PubsubMessage{ + Data: raw, + }) } -// Write writes PubSubMessages or bytes to the given pubsub topic. +func marshalMessageFn(in *pb.PubsubMessage, emit func([]byte)) error { + out, err := proto.Marshal(in) + if err != nil { + return err + } + emit(out) + return nil +} + +var pubSubMessageT = reflect.TypeOf((*pb.PubsubMessage)(nil)) + +// Write writes PubSubMessages or []bytes to the given pubsub topic. +// Panics if the input pcollection type is not one of those two types. +// +// When given []bytes, they are first wrapped in PubSubMessages. +// +// Note: Doesn't function in batch pipelines. func Write(s beam.Scope, project, topic string, col beam.PCollection) { s = s.Scope("pubsubio.Write") @@ -90,8 +123,12 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) { } out := col - if col.Type().Type() != reflectx.ByteSlice { - out = beam.ParDo(s, proto.Marshal, col) + if col.Type().Type() == reflectx.ByteSlice { + out = beam.ParDo(s, wrapInMessage, col) + } + if out.Type().Type() != pubSubMessageT { + panic(fmt.Sprintf("pubsubio.Write only accepts PCollections of %v and %v, received %v", pubSubMessageT, reflectx.ByteSlice, col.Type().Type())) } - beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false) + marshaled := beam.ParDo(s, marshalMessageFn, out) + beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{marshaled}, nil, false) }