Default SchemaTransform configs to snake_case #31374

Merged
merged 6 commits into from
Jun 4, 2024
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 1
}
9 changes: 9 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().
* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a
configuration Schema with snake_case naming convention
([#31374](https://github.com/apache/beam/pull/31374)). This will make the following
cases problematic:
* Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform,
and vice versa:
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.
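
For callers migrating direct `SchemaAwareExternalTransform` usages, the rename described above can be sketched roughly as follows. This is only an illustration, not Beam code: `to_snake_case` and `migrate_kwargs` are hypothetical helpers, and the regex is assumed to be adequate only for simple lowerCamel names such as `tableId`; it may not match Beam's own conversion for names with consecutive capitals or digits.

```python
import re


def to_snake_case(name: str) -> str:
    """Hypothetical helper: convert a simple lowerCamel name to snake_case."""
    # Insert an underscore before every uppercase letter except a leading one,
    # then lowercase the whole string.
    return re.sub(r"(?<!^)([A-Z])", r"_\1", name).lower()


def migrate_kwargs(kwargs: dict) -> dict:
    """Hypothetical helper: rename camelCase config keys to snake_case."""
    return {to_snake_case(key): value for key, value in kwargs.items()}


# Keys as they would have been passed before this change.
old_kwargs = {
    "tableId": "my-table",
    "instanceId": "my-instance",
    "projectId": "my-project",
}
new_kwargs = migrate_kwargs(old_kwargs)
print(new_kwargs)
```

In practice the keyword arguments would simply be renamed by hand at each call site, as the `bigquery.py` and `bigtableio.py` changes in this PR do; the helper only makes the naming rule explicit.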

## Deprecations

Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ import (
)

type bigtableConfig struct {
InstanceId string `beam:"instanceId"`
ProjectId string `beam:"projectId"`
TableId string `beam:"tableId"`
InstanceId string `beam:"instance_id"`
ProjectId string `beam:"project_id"`
TableId string `beam:"table_id"`
}

// Cell represents a single cell in a Bigtable row.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

/**
Expand All @@ -38,8 +45,12 @@
* <p>ConfigT should be available in the SchemaRegistry.
*
* <p>{@link #configurationSchema()} produces a configuration {@link Schema} that is inferred from
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a
* {@link SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration
* Schema.
*
* <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
* {@code snake_case} naming convention.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
Expand Down Expand Up @@ -78,10 +89,11 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
}

@Override
public Schema configurationSchema() {
public final Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
// We also establish a `snake_case` convention for all SchemaTransform configurations
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
Expand All @@ -90,9 +102,12 @@ public Schema configurationSchema() {
}
}

/** Produces a {@link SchemaTransform} from a Row configuration. */
/**
* Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
* `snake_case` naming convention.
*/
@Override
public SchemaTransform from(Row configuration) {
public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}

Expand All @@ -103,9 +118,22 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti

private ConfigT configFromRow(Row configuration) {
try {
return SchemaRegistry.createDefault()
.getFromRowFunction(configurationClass())
.apply(configuration);
SchemaRegistry registry = SchemaRegistry.createDefault();
SerializableFunction<Row, ConfigT> rowToConfigT =
registry.getFromRowFunction(configurationClass());

// Configuration objects handled by the AutoValueSchema provider will expect Row fields with
// camelCase naming convention
SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
&& checkNotNull(
((DefaultSchemaProvider) schemaProvider)
.getUnderlyingSchemaProvider(configurationClass()))
.getClass()
.equals(AutoValueSchema.class)) {
configuration = configuration.toCamelCase();
}
return rowToConfigT.apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
Expand Down
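
The `configFromRow` change above converts an incoming snake_case Row back to camelCase only when the configuration class is handled by `AutoValueSchema`. A language-neutral sketch of that branching, written in Python with plain dicts standing in for Rows, might look like the following. All names here (`to_camel_case`, `config_from_row`, `expects_camel_case`) are hypothetical and are not part of the Beam API.

```python
def to_camel_case(name: str) -> str:
    """Hypothetical helper: convert a snake_case name to lowerCamel."""
    head, *rest = name.split("_")
    # Keep the first word as-is and capitalize the first letter of the others.
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


def config_from_row(row: dict, expects_camel_case: bool) -> dict:
    """Return the field mapping that the config builder would receive.

    `expects_camel_case` stands in for the check that the underlying
    schema provider is AutoValueSchema.
    """
    if expects_camel_case:
        return {to_camel_case(key): value for key, value in row.items()}
    return dict(row)


snake_row = {"bootstrap_servers": "localhost:9092", "topic": "events"}
print(config_from_row(snake_row, expects_camel_case=True))
print(config_from_row(snake_row, expects_camel_case=False))
```

The point of the branch is that external callers always see snake_case, while the conversion back to camelCase stays an internal detail of the provider.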
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public void testFrom() {

Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("stringField", "field1")
.withFieldValue("integerField", Integer.valueOf(13))
.withFieldValue("string_field", "field1")
.withFieldValue("integer_field", Integer.valueOf(13))
.build();

Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
Expand All @@ -150,8 +150,8 @@ public void testDependencies() {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("stringField", "field1")
.withFieldValue("integerField", Integer.valueOf(13))
.withFieldValue("string_field", "field1")
.withFieldValue("integer_field", Integer.valueOf(13))
.build();

assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
Expand Down Expand Up @@ -132,15 +131,4 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,4 @@ public Row apply(KV<String, SnapshotInfo> input) {
}
}
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,17 @@ public void testFindTransformAndMakeItWork() {

assertEquals(
Sets.newHashSet(
"bootstrapServers",
"bootstrap_servers",
"topic",
"schema",
"autoOffsetResetConfig",
"consumerConfigUpdates",
"auto_offset_reset_config",
"consumer_config_updates",
"format",
"confluentSchemaRegistrySubject",
"confluentSchemaRegistryUrl",
"errorHandling",
"fileDescriptorPath",
"messageName"),
"confluent_schema_registry_subject",
"confluent_schema_registry_url",
"error_handling",
"file_descriptor_path",
"message_name"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,4 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
Map<String, SchemaTransformProvider> getAllProviders() {
return schemaTransformProviders;
}

// TODO: set global snake_case naming convention and remove these special cases
@Override
public SchemaTransform from(Row rowConfig) {
return super.from(rowConfig.toCamelCase());
}

@Override
public Schema configurationSchema() {
return super.configurationSchema().toSnakeCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,10 @@ public class ManagedTransformConstants {
public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";

private static final Map<String, String> KAFKA_READ_MAPPINGS =
ImmutableMap.<String, String>builder()
.put("topic", "topic")
.put("bootstrap_servers", "bootstrapServers")
.put("consumer_config_updates", "consumerConfigUpdates")
.put("confluent_schema_registry_url", "confluentSchemaRegistryUrl")
.put("confluent_schema_registry_subject", "confluentSchemaRegistrySubject")
.put("data_format", "format")
.put("schema", "schema")
.put("file_descriptor_path", "fileDescriptorPath")
.put("message_name", "messageName")
.build();
ImmutableMap.<String, String>builder().put("data_format", "format").build();

private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
ImmutableMap.<String, String>builder()
.put("topic", "topic")
.put("bootstrap_servers", "bootstrapServers")
.put("producer_config_updates", "producerConfigUpdates")
.put("data_format", "format")
.put("file_descriptor_path", "fileDescriptorPath")
.put("message_name", "messageName")
.build();
ImmutableMap.<String, String>builder().put("data_format", "format").build();

public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
Expand Down
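
With snake_case now the default, the Kafka mapping tables above shrink to the single entry whose name actually differs (`data_format` to `format`). A rough sketch of how such a mapping could be applied to a user-supplied config is shown below. `apply_mappings` is a hypothetical function written for illustration; it is not taken from the Managed module, and the way the real code consumes `MAPPINGS` may differ.

```python
# Mirrors the single remaining entry in the Java constants above.
KAFKA_READ_MAPPINGS = {"data_format": "format"}


def apply_mappings(config: dict, mappings: dict) -> dict:
    """Hypothetical helper: rename only the keys listed in `mappings`."""
    # Keys without a mapping pass through unchanged, since they already
    # match the snake_case names in the configuration schema.
    return {mappings.get(key, key): value for key, value in config.items()}


user_config = {
    "topic": "events",
    "bootstrap_servers": "localhost:9092",
    "data_format": "AVRO",
}
print(apply_mappings(user_config, KAFKA_READ_MAPPINGS))
```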
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {

@Test
public void testGetConfigRowFromYamlString() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
Expand All @@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {

Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.withFieldValue("extra_string", "abc")
.withFieldValue("extra_integer", 123)
.build();

Row returnedRow =
Expand All @@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.withFieldValue("extra_string", "abc")
.withFieldValue("extra_integer", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
Expand All @@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {

@Test
public void testBuildWithYamlString() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";

ManagedConfig config =
ManagedConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio

@Test
public void testReCreateTransformFromRowWithConfig() {
String yamlString = "extraString: abc\n" + "extraInteger: 123";
String yamlString = "extra_string: abc\n" + "extra_integer: 123";

ManagedConfig originalConfig =
ManagedConfig.builder()
Expand Down Expand Up @@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
.put("extraString", "abc")
.put("extraInteger", 123)
.put("extra_string", "abc")
.put("extra_integer", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
.withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
.withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));

runTestProviderTest(writeOp);
}
Expand Down
4 changes: 2 additions & 2 deletions sdks/java/managed/src/test/resources/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
# under the License.
#

extraString: "abc"
extraInteger: 123
extra_string: "abc"
extra_integer: 123
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2574,13 +2574,13 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
createDisposition=self._create_disposition,
writeDisposition=self._write_disposition,
triggeringFrequencySeconds=self._triggering_frequency,
autoSharding=self._with_auto_sharding,
numStreams=self._num_storage_api_streams,
useAtLeastOnceSemantics=self._use_at_least_once,
errorHandling={
create_disposition=self._create_disposition,
write_disposition=self._write_disposition,
triggering_frequency_seconds=self._triggering_frequency,
auto_sharding=self._with_auto_sharding,
num_streams=self._num_storage_api_streams,
use_at_least_once_semantics=self._use_at_least_once,
error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))

Expand Down
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
tableId=self._table_id,
instanceId=self._instance_id,
projectId=self._project_id)
table_id=self._table_id,
instance_id=self._instance_id,
project_id=self._project_id)

return (
input
Expand Down Expand Up @@ -323,9 +323,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
tableId=self._table_id,
instanceId=self._instance_id,
projectId=self._project_id)
table_id=self._table_id,
instance_id=self._instance_id,
project_id=self._project_id)

return (
input.pipeline
Expand Down