diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index 4f07f04a5788..d920acdfa2a9 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -300,7 +300,7 @@ func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool, emitAbo var cutOff float64 ok := lengthCutOffIter(&cutOff) if !ok { - return fmt.Errorf("No length cutoff provided.") + return fmt.Errorf("no length cutoff provided") } if float64(len(word)) > cutOff { emitAboveCutoff(word) diff --git a/sdks/go/examples/snippets/06schemas.go b/sdks/go/examples/snippets/06schemas.go new file mode 100644 index 000000000000..d68bc7cf70d1 --- /dev/null +++ b/sdks/go/examples/snippets/06schemas.go @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snippets + +import ( + "fmt" + "io" + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" +) + +// [START schema_define] + +type Purchase struct { + // ID of the user who made the purchase. + UserID string `beam:"userId"` + // Identifier of the item that was purchased. + ItemID int64 `beam:"itemId"` + // The shipping address, a nested type. + ShippingAddress ShippingAddress `beam:"shippingAddress"` + // The cost of the item in cents. + Cost int64 `beam:"cost"` + // The transactions that paid for this purchase. + // A slice since the purchase might be spread out over multiple + // credit cards. + Transactions []Transaction `beam:"transactions"` +} + +type ShippingAddress struct { + StreetAddress string `beam:"streetAddress"` + City string `beam:"city"` + State *string `beam:"state"` + Country string `beam:"country"` + PostCode string `beam:"postCode"` +} + +type Transaction struct { + Bank string `beam:"bank"` + PurchaseAmount float64 `beam:"purchaseAmount"` +} + +// [END schema_define] + +// Validate that the interface is being implemented. +var _ beam.SchemaProvider = &TimestampNanosProvider{} + +// [START schema_logical_provider] + +// TimestampNanos is a logical type using time.Time, but +// encodes as a schema type. +type TimestampNanos time.Time + +func (tn TimestampNanos) Seconds() int64 { + return time.Time(tn).Unix() +} +func (tn TimestampNanos) Nanos() int32 { + return int32(time.Time(tn).UnixNano() % 1000000000) +} + +// tnStorage is the storage schema for TimestampNanos. +type tnStorage struct { + Seconds int64 `beam:"seconds"` + Nanos int32 `beam:"nanos"` +} + +var ( + // reflect.Type of the Value type of TimestampNanos + tnType = reflect.TypeOf((*TimestampNanos)(nil)).Elem() + tnStorageType = reflect.TypeOf((*tnStorage)(nil)).Elem() +) + +// TimestampNanosProvider implements the beam.SchemaProvider interface. +type TimestampNanosProvider struct{} + +// FromLogicalType converts checks if the given type is TimestampNanos, and if so +// returns the storage type. +func (p *TimestampNanosProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { + if rt != tnType { + return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, tnType) + } + return tnStorageType, nil +} + +// BuildEncoder builds a Beam schema encoder for the TimestampNanos type. +func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { + if _, err := p.FromLogicalType(rt); err != nil { + return nil, err + } + enc, err := coder.RowEncoderForStruct(tnStorageType) + if err != nil { + return nil, err + } + return func(iface interface{}, w io.Writer) error { + v := iface.(TimestampNanos) + return enc(tnStorage{ + Seconds: v.Seconds(), + Nanos: v.Nanos(), + }, w) + }, nil +} + +// BuildDecoder builds a Beam schema decoder for the TimestampNanos type. +func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { + if _, err := p.FromLogicalType(rt); err != nil { + return nil, err + } + dec, err := coder.RowDecoderForStruct(tnStorageType) + if err != nil { + return nil, err + } + return func(r io.Reader) (interface{}, error) { + s, err := dec(r) + if err != nil { + return nil, err + } + tn := s.(tnStorage) + return TimestampNanos(time.Unix(tn.Seconds, int64(tn.Nanos))), nil + }, nil +} + +// [END schema_logical_provider] + +func LogicalTypeExample() { + // [START schema_logical_register] + beam.RegisterSchemaProvider(tnType, &TimestampNanosProvider{}) + // [END schema_logical_register] +} diff --git a/sdks/go/examples/snippets/06schemas_test.go b/sdks/go/examples/snippets/06schemas_test.go new file mode 100644 index 000000000000..1353c9769874 --- /dev/null +++ b/sdks/go/examples/snippets/06schemas_test.go @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snippets + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder/testutil" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" +) + +func atomicSchemaField(name string, typ pipepb.AtomicType) *pipepb.Field { + return &pipepb.Field{ + Name: name, + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: typ, + }, + }, + } +} + +func rowSchemaField(name string, typ *pipepb.Schema) *pipepb.Field { + return &pipepb.Field{ + Name: name, + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_RowType{ + RowType: &pipepb.RowType{ + Schema: typ, + }, + }, + }, + } +} + +func listSchemaField(name string, typ *pipepb.Field) *pipepb.Field { + return &pipepb.Field{ + Name: name, + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_ArrayType{ + ArrayType: &pipepb.ArrayType{ + ElementType: typ.GetType(), + }, + }, + }, + } +} + +func nillable(f *pipepb.Field) *pipepb.Field { + f.Type.Nullable = true + return f +} + +func TestSchemaTypes(t *testing.T) { + transactionSchema := &pipepb.Schema{ + Fields: []*pipepb.Field{ + atomicSchemaField("bank", pipepb.AtomicType_STRING), + atomicSchemaField("purchaseAmount", pipepb.AtomicType_DOUBLE), + }, + } + shippingAddressSchema := &pipepb.Schema{ + Fields: []*pipepb.Field{ + atomicSchemaField("streetAddress", pipepb.AtomicType_STRING), + atomicSchemaField("city", pipepb.AtomicType_STRING), + nillable(atomicSchemaField("state", pipepb.AtomicType_STRING)), + atomicSchemaField("country", pipepb.AtomicType_STRING), + atomicSchemaField("postCode", pipepb.AtomicType_STRING), + }, + } + + tests := []struct { + rt reflect.Type + st *pipepb.Schema + preReg func(reg *schema.Registry) + }{{ + rt: reflect.TypeOf(Transaction{}), + st: transactionSchema, + }, { + rt: reflect.TypeOf(ShippingAddress{}), + st: shippingAddressSchema, + }, { + rt: reflect.TypeOf(Purchase{}), + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + atomicSchemaField("userId", pipepb.AtomicType_STRING), + atomicSchemaField("itemId", pipepb.AtomicType_INT64), + rowSchemaField("shippingAddress", shippingAddressSchema), + atomicSchemaField("cost", pipepb.AtomicType_INT64), + listSchemaField("transactions", + rowSchemaField("n/a", transactionSchema)), + }, + }, + }, { + rt: tnType, + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + atomicSchemaField("seconds", pipepb.AtomicType_INT64), + atomicSchemaField("nanos", pipepb.AtomicType_INT32), + }, + }, + preReg: func(reg *schema.Registry) { + reg.RegisterLogicalType(schema.ToLogicalType(tnType.Name(), tnType, tnStorageType)) + }, + }} + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.rt), func(t *testing.T) { + reg := schema.NewRegistry() + if test.preReg != nil { + test.preReg(reg) + } + { + got, err := reg.FromType(test.rt) + if err != nil { + t.Fatalf("error FromType(%v) = %v", test.rt, err) + } + if d := cmp.Diff(test.st, got, + protocmp.Transform(), + protocmp.IgnoreFields(proto.Message(&pipepb.Schema{}), "id", "options"), + ); d != "" { + t.Errorf("diff (-want, +got): %v", d) + } + } + }) + } +} + +func TestSchema_validate(t *testing.T) { + tests := []struct { + rt reflect.Type + p beam.SchemaProvider + logical, storage interface{} + }{ + { + rt: tnType, + p: &TimestampNanosProvider{}, + logical: TimestampNanos(time.Unix(2300003, 456789)), + storage: tnStorage{}, + }, + } + for _, test := range tests { + sc := &testutil.SchemaCoder{ + CmpOptions: cmp.Options{ + cmp.Comparer(func(a, b TimestampNanos) bool { + return a.Seconds() == b.Seconds() && a.Nanos() == b.Nanos() + })}, + } + sc.Validate(t, test.rt, test.p.BuildEncoder, test.p.BuildDecoder, test.storage, test.logical) + } +} diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go index 95811bfd1086..b25a3e285f71 100644 --- a/sdks/go/pkg/beam/schema.go +++ b/sdks/go/pkg/beam/schema.go @@ -16,6 +16,7 @@ package beam import ( + "fmt" "io" "reflect" @@ -55,7 +56,24 @@ import ( // is called in a package init() function. func RegisterSchemaProvider(rt reflect.Type, provider interface{}) { p := provider.(SchemaProvider) - schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType) + switch rt.Kind() { + case reflect.Interface: + schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType) + case reflect.Ptr: + if rt.Elem().Kind() != reflect.Struct { + panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind())) + } + fallthrough + case reflect.Struct: + st, err := p.FromLogicalType(rt) + if err != nil { + panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type provider for %v, doesn't support that type", rt)) + } + schema.RegisterLogicalType(schema.ToLogicalType(rt.Name(), rt, st)) + default: + panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind())) + } + coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder) } diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 5d31fe389f5b..0c4d56fca2c4 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -1834,7 +1834,7 @@ a single global window and specify a trigger. ### 4.5. Additional outputs {#additional-outputs} -{{< paragraph class="language-java language-python" >}} +{{< paragraph class="language-java language-py" >}} While `ParDo` always produces a main output `PCollection` (as the return value from `apply`), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` @@ -2666,6 +2666,11 @@ infer the correct schema based on the members of the class. In Python you can use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class. {{< /paragraph >}} +{{< paragraph class="language-go" >}} +In Go, schema encoding is used by default for struct types, with Exported fields becoming part of the schema. +Beam will automatically infer the schema based on the fields and field tags of the struct, and their order. +{{< /paragraph >}} + {{< highlight java >}} @DefaultSchema(JavaBeanSchema.class) public class Purchase { @@ -2731,6 +2736,10 @@ class Transaction(typing.NamedTuple): purchase_amount: float {{< /highlight >}} +{{< highlight go >}} +{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_define >}} +{{< /highlight >}} + {{< paragraph class="language-java" >}} Using JavaBean classes as above is one way to map a schema to Java classes. However multiple Java classes might have the same schema, in which case the different Java types can often be used interchangeably. Beam will add implicit @@ -2887,6 +2896,13 @@ The argument is represented by a schema type, so can itself be a complex type. In Java, a logical type is specified as a subclass of the `LogicalType` class. A custom Java class can be specified to represent the logical type and conversion functions must be supplied to convert back and forth between this Java class and the underlying Schema type representation. For example, the logical type representing nanosecond timestamp might be implemented as follows {{< /paragraph >}} + +{{< paragraph class="language-go" >}} +In Go, a logical type is specified with a custom implementation of the `beam.SchemaProvider` interface. +For example, the logical type provider representing nanosecond timestamps +might be implemented as follows +{{< /paragraph >}} + {{< highlight java >}} // A Logical type using java.time.Instant to represent the logical type. public class TimestampNanos implements LogicalType { @@ -2909,11 +2925,39 @@ public class TimestampNanos implements LogicalType { } {{< /highlight >}} +{{< highlight go >}} +// Define a logical provider like so: +{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_logical_provider >}} + +// Register it like so: +{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_logical_register >}} +{{< /highlight >}} + #### 6.4.2. Useful logical types {#built-in-logical-types} +{{< paragraph class="language-py" >}} +Currently the Python SDK provides minimal convenience logical types, +other than to handle `MicrosInstant`. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +Currently the Go SDK provides minimal convenience logical types, +other than to handle additional integer primitives, and `time.Time`. +{{< /paragraph >}} + ##### **EnumerationType** +{{< paragraph class="language-py" >}} +This convenience builder doesn't yet exist for the Python SDK. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +This convenience builder doesn't yet exist for the Go SDK. +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} This logical type allows creating an enumeration type consisting of a set of named constants. +{{< /paragraph >}} {{< highlight java >}} Schema schema = Schema.builder() @@ -2922,8 +2966,10 @@ Schema schema = Schema.builder() .build(); {{< /highlight >}} +{{< paragraph class="language-java" >}} The value of this field is stored in the row as an INT32 type, however the logical type defines a value type that lets you access the enumeration either as a string or a value. For example: +{{< /paragraph >}} {{< highlight java >}} EnumerationType.Value enumValue = enumType.valueOf("RED"); @@ -2931,18 +2977,32 @@ enumValue.getValue(); // Returns 0, the integer value of the constant. enumValue.toString(); // Returns "RED", the string value of the constant {{< /highlight >}} +{{< paragraph class="language-java" >}} Given a row object with an enumeration field, you can also extract the field as the enumeration value. +{{< /paragraph >}} {{< highlight java >}} EnumerationType.Value enumValue = row.getLogicalTypeValue("color", EnumerationType.Value.class); {{< /highlight >}} +{{< paragraph class="language-java" >}} Automatic schema inference from Java POJOs and JavaBeans automatically converts Java enums to EnumerationType logical types. +{{< /paragraph >}} ##### **OneOfType** +{{< paragraph class="language-py" >}} +This convenience builder doesn't yet exist for the Python SDK. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +This convenience builder doesn't yet exist for the Go SDK. +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} OneOfType allows creating a disjoint union type over a set of schema fields. For example: +{{< /paragraph >}} {{< highlight java >}} Schema schema = Schema.builder() @@ -2954,9 +3014,11 @@ Schema schema = Schema.builder() .build(); {{< /highlight >}} +{{< paragraph class="language-java" >}} The value of this field is stored in the row as another Row type, where all the fields are marked as nullable. The logical type however defines a Value object that contains an enumeration value indicating which field was set and allows getting just that field: +{{< /paragraph >}} {{< highlight java >}} // Returns an enumeration indicating all possible case values for the enum. @@ -2978,17 +3040,28 @@ switch (oneOfValue.getCaseEnumType().toString()) { } {{< /highlight >}} +{{< paragraph class="language-java" >}} In the above example we used the field names in the switch statement for clarity, however the enum integer values could also be used. +{{< /paragraph >}} ### 6.5. Creating Schemas {#creating-schemas} -In order to take advantage of schemas, your `PCollection`s must have a schema attached to it. Often, the source itself will attach a schema to the PCollection. For example, when using `AvroIO` to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam `PCollection`. However not all sources produce schemas. In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas. +In order to take advantage of schemas, your `PCollection`s must have a schema attached to it. +Often, the source itself will attach a schema to the PCollection. +For example, when using `AvroIO` to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam `PCollection`. +However not all sources produce schemas. +In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas. #### 6.5.1. Inferring schemas {#inferring-schemas} +{{< language-switcher java py go >}} + {{< paragraph class="language-java" >}} -Beam is able to infer schemas from a variety of common Java types. The `@DefaultSchema` annotation can be used to tell Beam to infer schemas from a specific type. The annotation takes a `SchemaProvider` as an argument, and `SchemaProvider` classes are already built in for common Java types. The `SchemaRegistry` can also be invoked programmatically for cases where it is not practical to annotate the Java type itself. +Beam is able to infer schemas from a variety of common Java types. +The `@DefaultSchema` annotation can be used to tell Beam to infer schemas from a specific type. +The annotation takes a `SchemaProvider` as an argument, and `SchemaProvider` classes are already built in for common Java types. +The `SchemaRegistry` can also be invoked programmatically for cases where it is not practical to annotate the Java type itself. {{< /paragraph >}} {{< paragraph class="language-java" >}} @@ -3191,6 +3264,53 @@ output_pc = input_pc | beam.Map(lambda item: beam.Row(bank=str(item["bank"]), purchase_amount=float(item["purchase_amount"]))) {{< /highlight >}} +{{< paragraph class="language-go" >}} +Beam currently only infers schemas for exported fields in Go structs. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +**Structs** +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +Beam will automatically infer schemas for all Go structs used +as PCollection elements, and default to encoding them using +schema encoding. +{{< /paragraph >}} + +{{< highlight go >}} +type Transaction struct{ + Bank string + PurchaseAmount float64 + + checksum []byte // ignored +} +{{< /highlight >}} + +{{< paragraph class="language-go" >}} +Unexported fields are ignored, and cannot be automatically infered as part of the schema. +Fields of type func, channel, unsafe.Pointer, or uintptr will be ignored by inference. +Fields of interface types are ignored, unless a schema provider +is registered for them. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +By default, schema field names will match the exported struct field names. +In the above example, "Bank" and "PurchaseAmount" are the schema field names. +A schema field name can be overridden with a struct tag for the field. +{{< /paragraph >}} + +{{< highlight go >}} +type Transaction struct{ + Bank string `beam:"bank"` + PurchaseAmount float64 `beam:"purchase_amount"` +} +{{< /highlight >}} + +{{< paragraph class="language-go" >}} +Overriding schema field names is useful for compatibility cross language transforms, +as schema fields may have different requirements or restrictions from Go exported fields. +{{< /paragraph >}} ### 6.6. Using Schema Transforms {#using-schemas} @@ -3198,6 +3318,10 @@ A schema on a `PCollection` enables a rich variety of relational transforms. The named fields allows for simple and readable aggregations that reference fields by name, similar to the aggregations in a SQL expression. +{{< paragraph class="language-go" >}} +Beam does not yet support Schema transforms natively in Go. However, it will be implemented with the following behavior. +{{< /paragraph >}} + #### 6.6.1. Field selection syntax The advantage of schemas is that they allow referencing of element fields by name. Beam provides a selection syntax for @@ -3723,6 +3847,10 @@ A `PCollection` with a schema can apply a `ParDo`, just like any other `PCollect ##### **Input conversion** +{{< paragraph class="language-go" >}} +Beam does not yet support input conversion in Go. +{{< /paragraph >}} + Since Beam knows the schema of the source `PCollection`, it can automatically convert the elements to any Java type for which a matching schema is known. For example, using the above-mentioned Transaction schema, say we have the following `PCollection`: @@ -3789,6 +3917,8 @@ automatically convert to any matching schema type, just like when reading the en ## 7. Data encoding and type safety {#data-encoding-and-type-safety} +{{< language-switcher java py go >}} + When Beam runners execute your pipeline, they often need to materialize the intermediate data in your `PCollection`s, which requires converting elements to and from byte strings. The Beam SDKs use objects called `Coder`s to describe how @@ -3817,6 +3947,15 @@ Coder subclasses in the package. {{< /paragraph >}} +{{< paragraph class="language-go" >}} +Standard Go types like `int`, `int64` `float64`, `[]byte`, and `string` and more are coded using builtin coders. +Structs and pointers to structs default using Beam Schema Row encoding. +However, users can build and register custom coders with `beam.RegisterCoder`. +You can find available Coder functions in the +[coder](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coders) +package. +{{< /paragraph >}} + > Note that coders do not necessarily have a 1:1 relationship with types. For > example, the Integer type can have multiple valid coders, and input and output > data can use different Integer coders. A transform might have Integer-typed @@ -3858,6 +3997,11 @@ Python types to the default coder that should be used for `PCollection`s of each type. {{< /paragraph >}} +{{< paragraph class="language-go" >}} +The Beam SDK for Go allows users to register default coder +implementations with `beam.RegisterCoder`. +{{< /paragraph >}} + {{< paragraph class="language-java" >}} By default, the Beam SDK for Java automatically infers the `Coder` for the elements of a `PCollection` produced by a `PTransform` using the type parameter @@ -3880,12 +4024,24 @@ Python will automatically infer the default `Coder` for the output `PCollection` (in the default pipeline `CoderRegistry`, this is `BytesCoder`). {{< /paragraph >}} -> NOTE: If you create your `PCollection` from in-memory data by using the +{{< paragraph class="language-go" >}} +By default, the Beam SDK for Go automatically infers the `Coder` for the elements of an output `PCollection` by the output of the transform's function object, such as a `DoFn`. + In the case of `ParDo`, for example a `DoFn` +with the parameters of `v int, emit func(string)` accepts an input element of type `int` +and produces an output element of type `string`. +In such a case, the Beam SDK for Go will automatically infer the default `Coder` for the output `PCollection` to be the `string_utf8` coder. +{{< /paragraph >}} + + + +> **Note:** If you create your `PCollection` from in-memory data by using the > `Create` transform, you cannot rely on coder inference and default coders. > `Create` does not have access to any typing information for its arguments, and > may not be able to infer a coder if the argument list contains a value whose > exact run-time class doesn't have a default coder registered. + + {{< paragraph class="language-java" >}} When using `Create`, the simplest way to ensure that you have the correct coder is by invoking `withCoder` when you apply the `Create` transform. @@ -4019,8 +4175,13 @@ for a Python type. You can use `coders.registry` to access the `CoderRegistry`. This allows you to determine (or set) the default Coder for a Python type. {{< /paragraph >}} +{{< paragraph class="language-go" >}} +You can use the `beam.NewCoder` function to determine the default Coder for a Go type. +{{< /paragraph >}} + #### 7.2.2. Setting the default coder for a type {#setting-default-coder} +{{< paragraph class="language-java language-py" >}} To set the default Coder for a JavaPython type for a particular pipeline, you obtain and modify the pipeline's @@ -4031,11 +4192,23 @@ to get the `CoderRegistry` object, and then use the method `CoderRegistry.registerCoder` `CoderRegistry.register_coder` to register a new `Coder` for the target type. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +To set the default Coder for a Go type you use the function `beam.RegisterCoder` to register a encoder and decoder functions for the target type. +However, built in types like `int`, `string`, `float64`, etc cannot have their coders overridde. +{{< /paragraph >}} +{{< paragraph class="language-java language-py" >}} The following example code demonstrates how to set a default Coder, in this case `BigEndianIntegerCoder`, for Integerint values for a pipeline. +{{< /paragraph >}} + +{{< paragraph class="language-go" >}} +The following example code demonstrates how to set a custom Coder for `MyCustomType` elements. +{{< /paragraph >}} {{< highlight java >}} PipelineOptions options = PipelineOptionsFactory.create(); @@ -4049,9 +4222,26 @@ cr.registerCoder(Integer.class, BigEndianIntegerCoder.class); apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder) {{< /highlight >}} +{{< highlight go >}} +type MyCustomType struct{ + ... +} + +// See documentation on beam.RegisterCoder for other supported coder forms. + +func encode(MyCustomType) []byte { ... } + +func decode(b []byte) MyCustomType { ... } + +func init() { + beam.RegisterCoder(reflect.TypeOf((*MyCustomType)(nil)).Elem(), encode, decode) +} +{{< /highlight >}} + #### 7.2.3. Annotating a custom data type with a default coder {#annotating-custom-type-default-coder} -{{< paragraph class="language-java" >}} + + If your pipeline program defines a custom data type, you can use the `@DefaultCoder` annotation to specify the coder to use with that type. By default, Beam will use `SerializableCoder` which uses Java serialization, @@ -4064,10 +4254,11 @@ but it has drawbacks: For key/value pairs, the correctness of key-based operations (GroupByKey, Combine) and per-key State depends on having a deterministic - coder for the key. + coder for the key You can use the `@DefaultCoder` annotation to set a new default as follows: -{{< /paragraph >}} + + {{< highlight java >}} @DefaultCoder(AvroCoder.class) @@ -4094,9 +4285,10 @@ public class MyCustomDataType { } {{< /highlight >}} -{{< paragraph class="language-py" >}} -The Beam SDK for Python does not support annotating data types with a default -coder. If you would like to set a default coder, use the method described in the +{{< paragraph class="language-py language-go" >}} +The Beam SDK for PythonGo +does not support annotating data types with a default coder. +If you would like to set a default coder, use the method described in the previous section, *Setting the default coder for a type*. {{< /paragraph >}}