Skip to content

Commit

Permalink
Default SchemaTransform configs to snake_case (#31374)
Browse files Browse the repository at this point in the history
* default schematransform configs to snake_case

* add to CHANGES.md

* update Go's bigtable wrapper to export snake_case param names

* make more yaml snake_case changes
  • Loading branch information
ahmedabu98 authored Jun 4, 2024
1 parent 8fb9dc8 commit a7f5898
Show file tree
Hide file tree
Showing 22 changed files with 141 additions and 227 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 1
}
9 changes: 9 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().
* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a
configuration Schema with snake_case naming convention
([#31374](https://github.com/apache/beam/pull/31374)). This will make the following
cases problematic:
* Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform,
and vice versa:
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.

## Deprecations

Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ import (
)

type bigtableConfig struct {
InstanceId string `beam:"instanceId"`
ProjectId string `beam:"projectId"`
TableId string `beam:"tableId"`
InstanceId string `beam:"instance_id"`
ProjectId string `beam:"project_id"`
TableId string `beam:"table_id"`
}

// Cell represents a single cell in a Bigtable row.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

/**
Expand All @@ -38,8 +45,12 @@
* <p>ConfigT should be available in the SchemaRegistry.
*
* <p>{@link #configurationSchema()} produces a configuration {@link Schema} that is inferred from
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a
* {@link SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration
* Schema.
*
* <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
* {@code snake_case} naming convention.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
Expand Down Expand Up @@ -78,10 +89,11 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
}

@Override
public Schema configurationSchema() {
public final Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
// We also establish a `snake_case` convention for all SchemaTransform configurations
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
Expand All @@ -90,9 +102,12 @@ public Schema configurationSchema() {
}
}

/** Produces a {@link SchemaTransform} from a Row configuration. */
/**
* Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
* `snake_case` naming convention.
*/
@Override
public SchemaTransform from(Row configuration) {
public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}

Expand All @@ -103,9 +118,22 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti

private ConfigT configFromRow(Row configuration) {
try {
return SchemaRegistry.createDefault()
.getFromRowFunction(configurationClass())
.apply(configuration);
SchemaRegistry registry = SchemaRegistry.createDefault();
SerializableFunction<Row, ConfigT> rowToConfigT =
registry.getFromRowFunction(configurationClass());

// Configuration objects handled by the AutoValueSchema provider will expect Row fields with
// camelCase naming convention
SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
&& checkNotNull(
((DefaultSchemaProvider) schemaProvider)
.getUnderlyingSchemaProvider(configurationClass()))
.getClass()
.equals(AutoValueSchema.class)) {
configuration = configuration.toCamelCase();
}
return rowToConfigT.apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public void testFrom() {

Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("stringField", "field1")
.withFieldValue("integerField", Integer.valueOf(13))
.withFieldValue("string_field", "field1")
.withFieldValue("integer_field", Integer.valueOf(13))
.build();

Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
Expand All @@ -150,8 +150,8 @@ public void testDependencies() {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("stringField", "field1")
.withFieldValue("integerField", Integer.valueOf(13))
.withFieldValue("string_field", "field1")
.withFieldValue("integer_field", Integer.valueOf(13))
.build();

assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
Expand Down Expand Up @@ -132,15 +131,4 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,4 @@ public Row apply(KV<String, SnapshotInfo> input) {
}
}
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,17 @@ public void testFindTransformAndMakeItWork() {

assertEquals(
Sets.newHashSet(
"bootstrapServers",
"bootstrap_servers",
"topic",
"schema",
"autoOffsetResetConfig",
"consumerConfigUpdates",
"auto_offset_reset_config",
"consumer_config_updates",
"format",
"confluentSchemaRegistrySubject",
"confluentSchemaRegistryUrl",
"errorHandling",
"fileDescriptorPath",
"messageName"),
"confluent_schema_registry_subject",
"confluent_schema_registry_url",
"error_handling",
"file_descriptor_path",
"message_name"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,4 @@ synchronized Map<String, SchemaTransformProvider> getAllProviders() {
throw new RuntimeException(e.getMessage());
}
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,10 @@ public class ManagedTransformConstants {
public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";

private static final Map<String, String> KAFKA_READ_MAPPINGS =
ImmutableMap.<String, String>builder()
.put("topic", "topic")
.put("bootstrap_servers", "bootstrapServers")
.put("consumer_config_updates", "consumerConfigUpdates")
.put("confluent_schema_registry_url", "confluentSchemaRegistryUrl")
.put("confluent_schema_registry_subject", "confluentSchemaRegistrySubject")
.put("data_format", "format")
.put("schema", "schema")
.put("file_descriptor_path", "fileDescriptorPath")
.put("message_name", "messageName")
.build();
ImmutableMap.<String, String>builder().put("data_format", "format").build();

private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
ImmutableMap.<String, String>builder()
.put("topic", "topic")
.put("bootstrap_servers", "bootstrapServers")
.put("producer_config_updates", "producerConfigUpdates")
.put("data_format", "format")
.put("file_descriptor_path", "fileDescriptorPath")
.put("message_name", "messageName")
.build();
ImmutableMap.<String, String>builder().put("data_format", "format").build();

public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {

@Test
public void testGetConfigRowFromYamlString() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
Expand All @@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {

Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.withFieldValue("extra_string", "abc")
.withFieldValue("extra_integer", 123)
.build();

Row returnedRow =
Expand All @@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.withFieldValue("extra_string", "abc")
.withFieldValue("extra_integer", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
Expand All @@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {

@Test
public void testBuildWithYamlString() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";

ManagedConfig config =
ManagedConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio

@Test
public void testReCreateTransformFromRowWithConfig() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";

ManagedConfig originalConfig =
ManagedConfig.builder()
Expand Down Expand Up @@ -130,8 +130,8 @@ public void testProtoTranslation() throws Exception {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
.put("extraString", "abc")
.put("extraInteger", 123)
.put("extra_string", "abc")
.put("extra_integer", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
.withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
.withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));

runTestProviderTest(writeOp);
}
Expand Down
4 changes: 2 additions & 2 deletions sdks/java/managed/src/test/resources/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
# under the License.
#

extraString: "abc"
extraInteger: 123
extra_string: "abc"
extra_integer: 123
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2599,13 +2599,13 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
createDisposition=self._create_disposition,
writeDisposition=self._write_disposition,
triggeringFrequencySeconds=self._triggering_frequency,
autoSharding=self._with_auto_sharding,
numStreams=self._num_storage_api_streams,
useAtLeastOnceSemantics=self._use_at_least_once,
errorHandling={
create_disposition=self._create_disposition,
write_disposition=self._write_disposition,
triggering_frequency_seconds=self._triggering_frequency,
auto_sharding=self._with_auto_sharding,
num_streams=self._num_storage_api_streams,
use_at_least_once_semantics=self._use_at_least_once,
error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))

Expand Down
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
tableId=self._table_id,
instanceId=self._instance_id,
projectId=self._project_id)
table_id=self._table_id,
instance_id=self._instance_id,
project_id=self._project_id)

return (
input
Expand Down Expand Up @@ -323,9 +323,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
tableId=self._table_id,
instanceId=self._instance_id,
projectId=self._project_id)
table_id=self._table_id,
instance_id=self._instance_id,
project_id=self._project_id)

return (
input.pipeline
Expand Down
Loading

0 comments on commit a7f5898

Please sign in to comment.