Skip to content

Commit

Permalink
[BEAM-12158][BEAM-12341] Pubsub fixes. (#14896)
Browse files Browse the repository at this point in the history
* [BEAM-12158] Fix pubsubio post portable.
* [BEAM-12341] Delete go specific pubsubio.
  • Loading branch information
lostluck authored May 27, 2021
1 parent 85b85a5 commit eafa48d
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 385 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, c
}
paths = make(map[string]string)
for _, e := range edges {
if e.Op == graph.External {
if e.Op == graph.External && e.External != nil {
components, err := graphx.ExpandedComponents(e.External.Expanded)
if err != nil {
return nil, errors.WithContextf(err,
Expand Down
20 changes: 11 additions & 9 deletions sdks/go/pkg/beam/io/pubsubio/pubsubio.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
"github.com/golang/protobuf/proto"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
)

var (
readURN = "beam:transform:pubsub_read:v1"
writeURN = "beam:transform:pubsub_write:v1"
)

func init() {
Expand All @@ -49,8 +54,7 @@ type ReadOptions struct {
func Read(s beam.Scope, project, topic string, opts *ReadOptions) beam.PCollection {
s = s.Scope("pubsubio.Read")

payload := &v1.PubSubPayload{
Op: v1.PubSubPayload_READ,
payload := &pipepb.PubSubReadPayload{
Topic: pubsubx.MakeQualifiedTopicName(project, topic),
}
if opts != nil {
Expand All @@ -62,7 +66,7 @@ func Read(s beam.Scope, project, topic string, opts *ReadOptions) beam.PCollecti
payload.WithAttributes = opts.WithAttributes
}

out := beam.External(s, v1.PubSubPayloadURN, protox.MustEncode(payload), nil, []beam.FullType{typex.New(reflectx.ByteSlice)}, false)
out := beam.External(s, readURN, protox.MustEncode(payload), nil, []beam.FullType{typex.New(reflectx.ByteSlice)}, false)
if opts.WithAttributes {
return beam.ParDo(s, unmarshalMessageFn, out[0])
}
Expand All @@ -81,15 +85,13 @@ func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
func Write(s beam.Scope, project, topic string, col beam.PCollection) {
s = s.Scope("pubsubio.Write")

payload := &v1.PubSubPayload{
Op: v1.PubSubPayload_WRITE,
payload := &pipepb.PubSubWritePayload{
Topic: pubsubx.MakeQualifiedTopicName(project, topic),
}

out := col
if col.Type().Type() != reflectx.ByteSlice {
out = beam.ParDo(s, proto.Marshal, col)
payload.WithAttributes = true
}
beam.External(s, v1.PubSubPayloadURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
}
21 changes: 0 additions & 21 deletions sdks/go/pkg/beam/io/pubsubio/v1/gen.go

This file was deleted.

278 changes: 0 additions & 278 deletions sdks/go/pkg/beam/io/pubsubio/v1/v1.pb.go

This file was deleted.

Loading

0 comments on commit eafa48d

Please sign in to comment.