Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14486] Document pubsubio & fix its behavior. #17709

Merged
merged 2 commits into from
May 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions sdks/go/pkg/beam/io/pubsubio/pubsubio.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pubsubio provides access to PubSub on Dataflow streaming.
// Experimental.
// Package pubsubio provides access to Pub/Sub on Dataflow streaming.
//
// This implementation only functions on the Dataflow runner.
//
// See https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub
// for details on using Pub/Sub with Dataflow.
package pubsubio

import (
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
Expand All @@ -36,8 +42,12 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem())
beam.RegisterFunction(unmarshalMessageFn)
register.Function2x1(unmarshalMessageFn)
register.Function2x1(marshalMessageFn)
register.Function2x0(wrapInMessage)
register.Function2x0(wrapInMessage)
register.Emitter1[[]byte]()
register.Emitter1[*pb.PubsubMessage]()
}

// ReadOptions represents options for reading from PubSub.
Expand Down Expand Up @@ -73,15 +83,38 @@ func Read(s beam.Scope, project, topic string, opts *ReadOptions) beam.PCollecti
return out[0]
}

func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
func unmarshalMessageFn(raw []byte, emit func(*pb.PubsubMessage)) error {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
var msg pb.PubsubMessage
if err := proto.Unmarshal(raw, &msg); err != nil {
return nil, err
return err
}
return &msg, nil
emit(&msg)
return nil
}

func wrapInMessage(raw []byte, emit func(*pb.PubsubMessage)) {
emit(&pb.PubsubMessage{
Data: raw,
})
}

// Write writes PubSubMessages or bytes to the given pubsub topic.
func marshalMessageFn(in *pb.PubsubMessage, emit func([]byte)) error {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
out, err := proto.Marshal(in)
if err != nil {
return err
}
emit(out)
return nil
}

var pubSubMessageT = reflect.TypeOf((*pb.PubsubMessage)(nil))

// Write writes PubSubMessages or []bytes to the given pubsub topic.
// Panics if the input pcollection type is not one of those two types.
//
// When given []bytes, they are first wrapped in PubSubMessages.
//
// Note: Doesn't function in batch pipelines.
func Write(s beam.Scope, project, topic string, col beam.PCollection) {
s = s.Scope("pubsubio.Write")

Expand All @@ -90,8 +123,12 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) {
}

out := col
if col.Type().Type() != reflectx.ByteSlice {
out = beam.ParDo(s, proto.Marshal, col)
if col.Type().Type() == reflectx.ByteSlice {
out = beam.ParDo(s, wrapInMessage, col)
}
if out.Type().Type() != pubSubMessageT {
panic(fmt.Sprintf("pubsubio.Write only accepts PCollections of %v and %v, received %v", pubSubMessageT, reflectx.ByteSlice, col.Type().Type()))
}
beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
marshaled := beam.ParDo(s, marshalMessageFn, out)
beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{marshaled}, nil, false)
}