Skip to content

Commit

Permalink
[BEAM-14486] Document pubsubio & fix its behavior. (apache#17709)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored May 23, 2022
1 parent e4a3bdd commit aea7c13
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions sdks/go/pkg/beam/io/pubsubio/pubsubio.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pubsubio provides access to PubSub on Dataflow streaming.
// Experimental.
// Package pubsubio provides access to Pub/Sub on Dataflow streaming.
//
// This implementation only functions on the Dataflow runner.
//
// See https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub
// for details on using Pub/Sub with Dataflow.
package pubsubio

import (
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
Expand All @@ -36,8 +42,12 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem())
beam.RegisterFunction(unmarshalMessageFn)
register.Function2x1(unmarshalMessageFn)
register.Function2x1(marshalMessageFn)
register.Function2x0(wrapInMessage)
register.Function2x0(wrapInMessage)
register.Emitter1[[]byte]()
register.Emitter1[*pb.PubsubMessage]()
}

// ReadOptions represents options for reading from PubSub.
Expand Down Expand Up @@ -73,15 +83,38 @@ func Read(s beam.Scope, project, topic string, opts *ReadOptions) beam.PCollecti
return out[0]
}

func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
func unmarshalMessageFn(raw []byte, emit func(*pb.PubsubMessage)) error {
var msg pb.PubsubMessage
if err := proto.Unmarshal(raw, &msg); err != nil {
return nil, err
return err
}
return &msg, nil
emit(&msg)
return nil
}

func wrapInMessage(raw []byte, emit func(*pb.PubsubMessage)) {
emit(&pb.PubsubMessage{
Data: raw,
})
}

// Write writes PubSubMessages or bytes to the given pubsub topic.
func marshalMessageFn(in *pb.PubsubMessage, emit func([]byte)) error {
out, err := proto.Marshal(in)
if err != nil {
return err
}
emit(out)
return nil
}

var pubSubMessageT = reflect.TypeOf((*pb.PubsubMessage)(nil))

// Write writes PubSubMessages or []bytes to the given pubsub topic.
// Panics if the input pcollection type is not one of those two types.
//
// When given []bytes, they are first wrapped in PubSubMessages.
//
// Note: Doesn't function in batch pipelines.
func Write(s beam.Scope, project, topic string, col beam.PCollection) {
s = s.Scope("pubsubio.Write")

Expand All @@ -90,8 +123,12 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) {
}

out := col
if col.Type().Type() != reflectx.ByteSlice {
out = beam.ParDo(s, proto.Marshal, col)
if col.Type().Type() == reflectx.ByteSlice {
out = beam.ParDo(s, wrapInMessage, col)
}
if out.Type().Type() != pubSubMessageT {
panic(fmt.Sprintf("pubsubio.Write only accepts PCollections of %v and %v, received %v", pubSubMessageT, reflectx.ByteSlice, col.Type().Type()))
}
beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
marshaled := beam.ParDo(s, marshalMessageFn, out)
beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{marshaled}, nil, false)
}

0 comments on commit aea7c13

Please sign in to comment.