From d3c906f647a52e70f1054bb2aa5739b9ba631129 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Tue, 12 Nov 2024 13:36:19 -0500 Subject: [PATCH 1/6] [yaml] add JDBC docs Signed-off-by: Jeffrey Kinard --- .../SpannerReadSchemaTransformProvider.java | 54 ++-- .../SpannerWriteSchemaTransformProvider.java | 14 +- .../jdbc/JdbcReadSchemaTransformProvider.java | 241 +++++++++++++----- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 15 +- .../JdbcWriteSchemaTransformProvider.java | 232 +++++++++++++---- .../ReadFromMySqlSchemaTransformProvider.java | 49 ++++ ...ReadFromOracleSchemaTransformProvider.java | 50 ++++ ...adFromPostgresSchemaTransformProvider.java | 50 ++++ ...dFromSqlServerSchemaTransformProvider.java | 50 ++++ .../WriteToMySqlSchemaTransformProvider.java | 49 ++++ .../WriteToOracleSchemaTransformProvider.java | 50 ++++ ...riteToPostgresSchemaTransformProvider.java | 50 ++++ ...iteToSqlServerSchemaTransformProvider.java | 50 ++++ .../JdbcReadSchemaTransformProviderTest.java | 82 +++--- .../apache_beam/yaml/generate_yaml_docs.py | 18 +- sdks/python/apache_beam/yaml/standard_io.yaml | 85 +++--- 16 files changed, 915 insertions(+), 224 deletions(-) create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java create mode 100644 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java index 76440b1ebf1a..0bcf6e0c4f75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java @@ -76,40 +76,34 @@ public String description() { + "\n" + "Example configuration for performing a read using a SQL query: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " query: 'SELECT * FROM table'\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " query: 'SELECT * FROM table'\n" + "\n" + "It is also possible to read a table by specifying a table name and a list of columns. 
For " + "example, the following configuration will perform a read on an entire table: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" - + " columns: ['col1', 'col2']\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " columns: ['col1', 'col2']\n" + "\n" + "Additionally, to read using a " + "Secondary Index, specify the index name: ::" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" - + " index: 'my-index'\n" - + " columns: ['col1', 'col2']\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " index: 'my-index'\n" + + " columns: ['col1', 'col2']\n" + "\n" - + "### Advanced Usage\n" + + "#### Advanced Usage\n" + "\n" + "Reads by default use the " + "PartitionQuery API which enforces some limitations on the type of queries that can be used so that " @@ -118,12 +112,10 @@ public String description() { + "\n" + "For example: ::" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " batching: false\n" - + " ...\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " batching: false\n" + + " ...\n" + "\n" + "Note: See " diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 8601da09ea09..61955f448c3f 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -84,14 +84,12 @@ public String description() { + "\n" + "Example configuration for performing a write to a single table: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " project_id: 'my-project-id'\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " project_id: 'my-project-id'\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + "\n" + "Note: See " diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 435bfc138b5b..4056b2518ac1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -49,33 +50,174 @@ public class JdbcReadSchemaTransformProvider extends TypedSchemaTransformProvider< JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> { + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + 
return "beam:schematransform:org.apache.beam:jdbc_read:v1"; + } + + @Override + public String description() { + return baseDescription("JDBC") + + "\n" + + "This transform can be used to read from a JDBC source using either a given JDBC driver jar " + + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" + + "\n" + + "#### Using a default driver\n" + + "\n" + + "This transform comes packaged with drivers for several popular JDBC distributions. The following " + + "distributions can be declared as the `jdbc_type`: " + + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "") + + ".\n" + + "\n" + + "For example, reading a MySQL source using a SQL query: ::" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " jdbc_type: mysql\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " query: \"SELECT * FROM table\"\n" + + "\n" + + "\n" + + "**Note**: See the following transforms which are built on top of this transform and simplify " + + "this logic for several popular JDBC distributions:\n\n" + + " - ReadFromMySql\n" + + " - ReadFromPostgres\n" + + " - ReadFromOracle\n" + + " - ReadFromSqlServer\n" + + "\n" + + "#### Declaring custom JDBC drivers\n" + + "\n" + + "If reading from a JDBC source not listed above, or if it is necessary to use a custom driver not " + + "packaged with Beam, one must define a JDBC driver and class name.\n" + + "\n" + + "For example, reading a MySQL source table: ::" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " driver_jars: \"path/to/some/jdbc.jar\"\n" + + " driver_class_name: \"com.mysql.jdbc.Driver\"\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Connection Properties\n" + + "\n" + + "Connection properties are properties sent to the Driver used to connect to the JDBC source. 
For example, " + + "to set the character encoding to UTF-8, one could write: ::\n" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " connection_properties: \"characterEncoding=UTF-8;\"\n" + + " ...\n" + + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n"; + } + + protected String baseDescription(String jdbcType) { + return String.format( + "Read from a %s source using a SQL query or by directly accessing " + "a single table.\n", + jdbcType); + } + + protected String inheritedDescription( + String prettyName, String transformName, String prefix, int port) { + return String.format( + "\n" + + "This is a special case of ReadFromJdbc that includes the " + + "necessary %s Driver and classes.\n" + + "\n" + + "An example of using %s with SQL query: ::\n" + + "\n" + + " - type: %s\n" + + " config:\n" + + " url: \"jdbc:%s://my-host:%d/database\"\n" + + " query: \"SELECT * FROM table\"\n" + + "\n" + + "It is also possible to read a table by specifying a table name. For example, the " + + "following configuration will perform a read on an entire table: ::\n" + + "\n" + + " - type: %s\n" + + " config:\n" + + " url: \"jdbc:%s://my-host:%d/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Advanced Usage\n" + + "\n" + + "It might be necessary to use a custom JDBC driver that is not packaged with this " + + "transform. 
If that is the case, see ReadFromJdbc which " + + "allows for more custom configuration.", + prettyName, transformName, transformName, prefix, port, transformName, prefix, port); + } + @Override protected @UnknownKeyFor @NonNull @Initialized Class configurationClass() { return JdbcReadSchemaTransformConfiguration.class; } + protected static void validateConfig(JdbcReadSchemaTransformConfiguration config, String jdbcType) + throws IllegalArgumentException { + if (Strings.isNullOrEmpty(config.getJdbcUrl())) { + throw new IllegalArgumentException("JDBC URL cannot be blank"); + } + + boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName()); + boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); + } + if (!driverClassNamePresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "One of JDBC Driver class name or JDBC type must be specified."); + } + if (jdbcTypePresent + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { + throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + } + + boolean readQueryPresent = (config.getReadQuery() != null && !"".equals(config.getReadQuery())); + boolean locationPresent = (config.getLocation() != null && !"".equals(config.getLocation())); + + if (readQueryPresent && locationPresent) { + throw new IllegalArgumentException("Query and Table are mutually exclusive configurations"); + } + if (!readQueryPresent && !locationPresent) { + throw new IllegalArgumentException("Either Query or Table must be specified."); + } + } + + protected static void validateConfig(JdbcReadSchemaTransformConfiguration config) + throws IllegalArgumentException { + 
validateConfig(config, config.getJdbcType()); + } + @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcReadSchemaTransformConfiguration configuration) { - configuration.validate(); + validateConfig(configuration); return new JdbcReadSchemaTransform(configuration); } - static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { + protected static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { JdbcReadSchemaTransformConfiguration config; + private String jdbcType; public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { this.config = config; + this.jdbcType = config.getJdbcType(); + } + + public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String jdbcType) { + this.config = config; + this.jdbcType = jdbcType; } protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { String driverClassName = config.getDriverClassName(); if (Strings.isNullOrEmpty(driverClassName)) { - driverClassName = - JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase()); + driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -109,7 +251,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } JdbcIO.ReadRows readRows = JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query); - Short fetchSize = config.getFetchSize(); + Integer fetchSize = config.getFetchSize(); if (fetchSize != null && fetchSize > 0) { readRows = readRows.withFetchSize(fetchSize); } @@ -125,11 +267,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:jdbc_read:v1"; - } - @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> 
inputCollectionNames() { @@ -145,76 +282,66 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class JdbcReadSchemaTransformConfiguration implements Serializable { - @Nullable - public abstract String getDriverClassName(); - - @Nullable - public abstract String getJdbcType(); + @SchemaFieldDescription("Connection URL for the JDBC source.") public abstract String getJdbcUrl(); + @SchemaFieldDescription( + "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.") @Nullable - public abstract String getUsername(); - - @Nullable - public abstract String getPassword(); + public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> + getConnectionInitSql(); + @SchemaFieldDescription( + "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".") @Nullable public abstract String getConnectionProperties(); + @SchemaFieldDescription( + "Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.") @Nullable - public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> - getConnectionInitSql(); + public abstract Boolean getDisableAutoCommit(); + @SchemaFieldDescription( + "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".") @Nullable - public abstract String getReadQuery(); + public abstract String getDriverClassName(); + @SchemaFieldDescription( + "Comma separated path(s) for the JDBC driver jar(s). 
This can be a local path or GCS (gs://) path.") @Nullable - public abstract String getLocation(); + public abstract String getDriverJars(); + @SchemaFieldDescription( + "This method is used to override the size of the data that is going to be fetched and loaded in memory per every database call. It should ONLY be used if the default value throws memory errors.") @Nullable - public abstract Short getFetchSize(); + public abstract Integer getFetchSize(); + @SchemaFieldDescription( + "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.") @Nullable - public abstract Boolean getOutputParallelization(); + public abstract String getJdbcType(); + @SchemaFieldDescription("Name of the table to read from.") @Nullable - public abstract Boolean getDisableAutoCommit(); + public abstract String getLocation(); + @SchemaFieldDescription( + "Whether to reshuffle the resulting PCollection so results are distributed to all workers.") @Nullable - public abstract String getDriverJars(); - - public void validate() throws IllegalArgumentException { - if (Strings.isNullOrEmpty(getJdbcUrl())) { - throw new IllegalArgumentException("JDBC URL cannot be blank"); - } + public abstract Boolean getOutputParallelization(); - boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType()); - if (driverClassNamePresent && jdbcTypePresent) { - throw new IllegalArgumentException( - "JDBC Driver class name and JDBC type are mutually exclusive configurations."); - } - if (!driverClassNamePresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "One of JDBC Driver class name or JDBC type must be specified."); - } - if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); - } 
+ @SchemaFieldDescription("Password for the JDBC source.") + @Nullable + public abstract String getPassword(); - boolean readQueryPresent = (getReadQuery() != null && !"".equals(getReadQuery())); - boolean locationPresent = (getLocation() != null && !"".equals(getLocation())); + @SchemaFieldDescription("SQL query used to query the JDBC source.") + @Nullable + public abstract String getReadQuery(); - if (readQueryPresent && locationPresent) { - throw new IllegalArgumentException( - "ReadQuery and Location are mutually exclusive configurations"); - } - if (!readQueryPresent && !locationPresent) { - throw new IllegalArgumentException("Either ReadQuery or Location must be set."); - } - } + @SchemaFieldDescription("Username for the JDBC source.") + @Nullable + public abstract String getUsername(); public static Builder builder() { return new AutoValue_JdbcReadSchemaTransformProvider_JdbcReadSchemaTransformConfiguration @@ -241,7 +368,7 @@ public abstract static class Builder { public abstract Builder setConnectionInitSql(List value); - public abstract Builder setFetchSize(Short value); + public abstract Builder setFetchSize(Integer value); public abstract Builder setOutputParallelization(Boolean value); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index c0f7d68899b3..503b64e4a446 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -83,20 +83,25 @@ import org.slf4j.LoggerFactory; /** Provides utility functions for working with {@link JdbcIO}. 
*/ -class JdbcUtil { +public class JdbcUtil { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class); + public static final String MYSQL = "mysql"; + public static final String POSTGRES = "postgres"; + public static final String ORACLE = "oracle"; + public static final String MSSQL = "mssql"; + static final Map JDBC_DRIVER_MAP = new HashMap<>( ImmutableMap.of( - "mysql", + MYSQL, "com.mysql.cj.jdbc.Driver", - "postgres", + POSTGRES, "org.postgresql.Driver", - "oracle", + ORACLE, "oracle.jdbc.driver.OracleDriver", - "mssql", + MSSQL, "com.microsoft.sqlserver.jdbc.SQLServerDriver")); @VisibleForTesting diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 1f970ba0624f..d1437d3ea5e7 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -54,33 +55,177 @@ public class JdbcWriteSchemaTransformProvider extends TypedSchemaTransformProvider< JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> { + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:jdbc_write:v1"; + } + + @Override + public String description() { + return baseDescription("JDBC") + + "\n" + + "This transform can be used to write 
to a JDBC sink using either a given JDBC driver jar " + + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" + + "\n" + + "#### Using a default driver\n" + + "\n" + + "This transform comes packaged with drivers for several popular JDBC distributions. The following " + + "distributions can be declared as the `jdbc_type`: " + + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "") + + ".\n" + + "\n" + + "For example, writing to a MySQL sink using a SQL query: ::" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " jdbc_type: mysql\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " query: \"INSERT INTO table VALUES(?, ?)\"\n" + + "\n" + + "\n" + + "**Note**: See the following transforms which are built on top of this transform and simplify " + + "this logic for several popular JDBC distributions:\n\n" + + " - WriteToMySql\n" + + " - WriteToPostgres\n" + + " - WriteToOracle\n" + + " - WriteToSqlServer\n" + + "\n" + + "#### Declaring custom JDBC drivers\n" + + "\n" + + "If writing to a JDBC sink not listed above, or if it is necessary to use a custom driver not " + + "packaged with Beam, one must define a JDBC driver and class name.\n" + + "\n" + + "For example, writing to a MySQL table: ::" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " driver_jars: \"path/to/some/jdbc.jar\"\n" + + " driver_class_name: \"com.mysql.jdbc.Driver\"\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Connection Properties\n" + + "\n" + + "Connection properties are properties sent to the Driver used to connect to the JDBC sink. For example, " + + "to set the character encoding to UTF-8, one could write: ::\n" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " connection_properties: \"characterEncoding=UTF-8;\"\n" + + " ...\n" + + "All properties should be semi-colon-delimited (e.g. 
\"key1=value1;key2=value2;\")\n"; + } + + protected String baseDescription(String jdbcType) { + return String.format( + "Write to a %s sink using a SQL query or by directly accessing " + "a single table.\n", + jdbcType); + } + + protected String inheritedDescription( + String prettyName, String transformName, String prefix, int port) { + return String.format( + "\n" + + "This is a special case of WriteToJdbc that includes the " + + "necessary %s Driver and classes.\n" + + "\n" + + "An example of using %s with SQL query: ::\n" + + "\n" + + " - type: %s\n" + + " config:\n" + + " url: \"jdbc:%s://my-host:%d/database\"\n" + + " query: \"INSERT INTO table VALUES(?, ?)\"\n" + + "\n" + + "It is also possible to write to a table by specifying a table name. For example, the " + + "following configuration will perform a write to a single table: ::\n" + + "\n" + + " - type: %s\n" + + " config:\n" + + " url: \"jdbc:%s://my-host:%d/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Advanced Usage\n" + + "\n" + + "It might be necessary to use a custom JDBC driver that is not packaged with this " + + "transform. 
If that is the case, see WriteToJdbc which " + + "allows for more custom configuration.", + prettyName, transformName, transformName, prefix, port, transformName, prefix, port); + } + @Override protected @UnknownKeyFor @NonNull @Initialized Class configurationClass() { return JdbcWriteSchemaTransformConfiguration.class; } + protected static void validateConfig( + JdbcWriteSchemaTransformConfiguration config, String jdbcType) + throws IllegalArgumentException { + if (Strings.isNullOrEmpty(config.getJdbcUrl())) { + throw new IllegalArgumentException("JDBC URL cannot be blank"); + } + + boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName()); + boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); + } + if (!driverClassNamePresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "One of JDBC Driver class name or JDBC type must be specified."); + } + if (jdbcTypePresent + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { + throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + } + + boolean writeStatementPresent = + (config.getWriteStatement() != null && !"".equals(config.getWriteStatement())); + boolean locationPresent = (config.getLocation() != null && !"".equals(config.getLocation())); + + if (writeStatementPresent && locationPresent) { + throw new IllegalArgumentException( + "Write Statement and Table are mutually exclusive configurations"); + } + if (!writeStatementPresent && !locationPresent) { + throw new IllegalArgumentException("Either Write Statement or Table must be set."); + } + } + + protected static void validateConfig(JdbcWriteSchemaTransformConfiguration 
config) + throws IllegalArgumentException { + validateConfig(config, config.getJdbcType()); + } + @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcWriteSchemaTransformConfiguration configuration) { - configuration.validate(); + validateConfig(configuration); return new JdbcWriteSchemaTransform(configuration); } - static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { + protected static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { JdbcWriteSchemaTransformConfiguration config; + private String jdbcType; public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { this.config = config; + this.jdbcType = config.getJdbcType(); + } + + public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config, String jdbcType) { + this.config = config; + this.jdbcType = jdbcType; } protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { String driverClassName = config.getDriverClassName(); if (Strings.isNullOrEmpty(driverClassName)) { - driverClassName = - JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase()); + driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -157,11 +302,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:jdbc_write:v1"; - } - @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() { @@ -178,36 +318,32 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @DefaultSchema(AutoValueSchema.class) public abstract static class JdbcWriteSchemaTransformConfiguration implements Serializable { - @Nullable - public abstract String getDriverClassName(); - - @Nullable - public abstract String 
getJdbcType(); - + @SchemaFieldDescription("Connection URL for the JDBC sink.") public abstract String getJdbcUrl(); + @SchemaFieldDescription( + "If true, enables using a dynamically determined number of shards to write.") @Nullable - public abstract String getUsername(); - - @Nullable - public abstract String getPassword(); - - @Nullable - public abstract String getConnectionProperties(); + public abstract Boolean getAutosharding(); + @SchemaFieldDescription( + "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.") @Nullable public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> getConnectionInitSql(); + @SchemaFieldDescription( + "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".") @Nullable - public abstract String getLocation(); - - @Nullable - public abstract String getWriteStatement(); + public abstract String getConnectionProperties(); + @SchemaFieldDescription( + "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".") @Nullable - public abstract Boolean getAutosharding(); + public abstract String getDriverClassName(); + @SchemaFieldDescription( + "Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.") @Nullable public abstract String getDriverJars(); @@ -218,34 +354,26 @@ public void validate() throws IllegalArgumentException { if (Strings.isNullOrEmpty(getJdbcUrl())) { throw new IllegalArgumentException("JDBC URL cannot be blank"); } + @SchemaFieldDescription( + "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. 
One of mysql, postgres, oracle, or mssql.") + @Nullable + public abstract String getJdbcType(); - boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType()); - if (driverClassNamePresent && jdbcTypePresent) { - throw new IllegalArgumentException( - "JDBC Driver class name and JDBC type are mutually exclusive configurations."); - } - if (!driverClassNamePresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "One of JDBC Driver class name or JDBC type must be specified."); - } - if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); - } + @SchemaFieldDescription("Name of the table to write to.") + @Nullable + public abstract String getLocation(); - boolean writeStatementPresent = - (getWriteStatement() != null && !"".equals(getWriteStatement())); - boolean locationPresent = (getLocation() != null && !"".equals(getLocation())); + @SchemaFieldDescription("Password for the JDBC source.") + @Nullable + public abstract String getPassword(); - if (writeStatementPresent && locationPresent) { - throw new IllegalArgumentException( - "ReadQuery and Location are mutually exclusive configurations"); - } - if (!writeStatementPresent && !locationPresent) { - throw new IllegalArgumentException("Either ReadQuery or Location must be set."); - } - } + @SchemaFieldDescription("Username for the JDBC source.") + @Nullable + public abstract String getUsername(); + + @SchemaFieldDescription("SQL query used to insert records into the JDBC sink.") + @Nullable + public abstract String getWriteStatement(); public static Builder builder() { return new AutoValue_JdbcWriteSchemaTransformProvider_JdbcWriteSchemaTransformConfiguration diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java new file mode 100644 index 000000000000..a90929e08278 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:mysql_read:v1"; + } + + @Override + public String description() { + return baseDescription("MySQL") + inheritedDescription("MySQL", "ReadFromMySql", "mysql", 3306); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + validateConfig(configuration, MYSQL); + return new JdbcReadSchemaTransform(configuration, MYSQL); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java new file mode 100644 index 000000000000..008dba41ae04 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromOracleSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:oracle_read:v1"; + } + + @Override + public String description() { + return baseDescription("Oracle") + + inheritedDescription("Oracle", "ReadFromOracle", "oracle", 1521); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + validateConfig(configuration, ORACLE); + return new JdbcReadSchemaTransform(configuration, ORACLE); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java new file mode 100644 index 000000000000..773386db40c3 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:postgres_read:v1"; + } + + @Override + public String description() { + return baseDescription("PostgreSQL") + + inheritedDescription("Postgres", "ReadFromPostgres", "postgresql", 5432); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + validateConfig(configuration, POSTGRES); + return new JdbcReadSchemaTransform(configuration, POSTGRES); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java new file mode 100644 index 000000000000..a0afbf28c3d0 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:sql_server_read:v1"; + } + + @Override + public String description() { + return baseDescription("Sql Server (Microsoft SQL)") + + inheritedDescription("SQL Server", "ReadFromSqlServer", "sqlserver", 1433); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + validateConfig(configuration, MSSQL); + return new JdbcReadSchemaTransform(configuration, MSSQL); + } +} diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java new file mode 100644 index 000000000000..ceab4461d8fa --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:mysql_write:v1"; + } + + @Override + public String description() { + return baseDescription("MySQL") + inheritedDescription("MySQL", "WriteToMySql", "mysql", 3306); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + validateConfig(configuration, MYSQL); + return new JdbcWriteSchemaTransform(configuration, MYSQL); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java new file mode 100644 index 000000000000..55e4a2d8b3de --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToOracleSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:oracle_write:v1"; + } + + @Override + public String description() { + return baseDescription("Oracle") + + inheritedDescription("Oracle", "WriteToOracle", "oracle", 1521); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + validateConfig(configuration, ORACLE); + return new JdbcWriteSchemaTransform(configuration, ORACLE); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java new file mode 100644 index 000000000000..6ac4f6f43efd --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:postgres_write:v1"; + } + + @Override + public String description() { + return baseDescription("PostgreSQL") + + inheritedDescription("Postgres", "WriteToPostgres", "postgresql", 5432); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + validateConfig(configuration, POSTGRES); + return new JdbcWriteSchemaTransform(configuration, POSTGRES); + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java new file mode 100644 index 000000000000..c9166c1ea549 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:sql_server_write:v1"; + } + + @Override + public String description() { + return baseDescription("Sql Server (Microsoft SQL)") + + inheritedDescription("SQL Server", "WriteToSqlServer", "sqlserver", 1433); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + validateConfig(configuration, MSSQL); + return new JdbcWriteSchemaTransform(configuration, MSSQL); + } +} diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java index 7cbdd48d1587..931c7f248bbf 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java @@ -77,73 +77,73 @@ public void testInvalidReadSchemaOptions() { assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("") - .setJdbcUrl("") - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("") + .setJdbcUrl("") + .build()); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("ClassName") - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setReadQuery("Query") - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("ClassName") + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setReadQuery("Query") + .build()); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("ClassName") - .setJdbcUrl("JdbcUrl") - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("ClassName") + .setJdbcUrl("JdbcUrl") + .build()); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setJdbcType("invalidType") - 
.build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setJdbcType("invalidType") + .build()); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .build()); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setDriverClassName("ClassName") + .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) + .build()); }); } @Test public void testValidReadSchemaOptions() { for (String jdbcType : JDBC_DRIVER_MAP.keySet()) { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setJdbcType(jdbcType) - .build() - .validate(); + JdbcReadSchemaTransformProvider.validateConfig( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setJdbcType(jdbcType) + .build()); } } diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index fe5727f3ef92..a49f67533e76 100644 --- 
a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -194,12 +194,28 @@ def io_grouping_key(transform_name): SKIP = {} +def add_transform_links(transform, description, provider_list): + """ + Convert references of Providers to urls that link to their respective pages. + Avoid self-linking within a Transform page. + """ + for p in provider_list: + description = re.sub( + rf"(?{p}', + description or '') + return description + + def transform_docs(transform_base, transforms, providers, extra_docs=''): return '\n'.join([ f'## {transform_base}', '', longest( - lambda t: longest(lambda p: p.description(t), providers[t]), + lambda t: longest( + lambda p: add_transform_links( + t, p.description(t), providers.keys()), + providers[t]), transforms).replace('::\n', '\n\n :::yaml\n'), '', extra_docs, diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 305e6877ad90..68081fb41929 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -194,38 +194,43 @@ transforms: 'ReadFromJdbc': 'ReadFromJdbc' 'WriteToJdbc': 'WriteToJdbc' - 'ReadFromMySql': 'ReadFromJdbc' - 'WriteToMySql': 'WriteToJdbc' - 'ReadFromPostgres': 'ReadFromJdbc' - 'WriteToPostgres': 'WriteToJdbc' - 'ReadFromOracle': 'ReadFromJdbc' - 'WriteToOracle': 'WriteToJdbc' - 'ReadFromSqlServer': 'ReadFromJdbc' - 'WriteToSqlServer': 'WriteToJdbc' + 'ReadFromMySql': 'ReadFromMySql' + 'WriteToMySql': 'WriteToMySql' + 'ReadFromPostgres': 'ReadFromPostgres' + 'WriteToPostgres': 'WriteToPostgres' + 'ReadFromOracle': 'ReadFromOracle' + 'WriteToOracle': 'WriteToOracle' + 'ReadFromSqlServer': 'ReadFromSqlServer' + 'WriteToSqlServer': 'WriteToSqlServer' config: mappings: 'ReadFromJdbc': - driver_class_name: 'driver_class_name' - type: 'jdbc_type' url: 'jdbc_url' - username: 'username' - password: 'password' - table: 'location' - query: 'read_query' - 
driver_jars: 'driver_jars' - connection_properties: 'connection_properties' connection_init_sql: 'connection_init_sql' - 'WriteToJdbc': + connection_properties: 'connection_properties' + disable_auto_commit: 'disable_auto_commit' driver_class_name: 'driver_class_name' + driver_jars: 'driver_jars' + fetch_size: 'fetch_size' + output_parallelization: 'output_parallelization' + password: 'password' + query: 'read_query' + table: 'location' type: 'jdbc_type' - url: 'jdbc_url' username: 'username' + 'WriteToJdbc': + url: 'jdbc_url' + auto_sharding: 'autosharding' + connection_init_sql: 'connection_init_sql' + connection_properties: 'connection_properties' + driver_class_name: 'driver_class_name' + driver_jars: 'driver_jars' password: 'password' table: 'location' - driver_jars: 'driver_jars' - connection_properties: 'connection_properties' - connection_init_sql: 'connection_init_sql' batch_size: 'batch_size' + type: 'jdbc_type' + username: 'username' + query: 'write_statement' 'ReadFromMySql': 'ReadFromJdbc' 'WriteToMySql': 'WriteToJdbc' 'ReadFromPostgres': 'ReadFromJdbc' @@ -236,26 +241,48 @@ 'WriteToSqlServer': 'WriteToJdbc' defaults: 'ReadFromMySql': - jdbc_type: 'mysql' + driver_class_name: '' + driver_jars: '' 'WriteToMySql': - jdbc_type: 'mysql' + driver_class_name: '' + driver_jars: '' 'ReadFromPostgres': - jdbc_type: 'postgres' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' 'WriteToPostgres': - jdbc_type: 'postgres' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' 'ReadFromOracle': - jdbc_type: 'oracle' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' 'WriteToOracle': - jdbc_type: 'oracle' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' 'ReadFromSqlServer': - jdbc_type: 'mssql' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' 'WriteToSqlServer': - jdbc_type: 'mssql' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' underlying_provider: type: beamJar 
transforms: 'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1' + 'ReadFromMySql': 'beam:schematransform:org.apache.beam:mysql_read:v1' + 'ReadFromPostgres': 'beam:schematransform:org.apache.beam:postgres_read:v1' + 'ReadFromOracle': 'beam:schematransform:org.apache.beam:oracle_read:v1' + 'ReadFromSqlServer': 'beam:schematransform:org.apache.beam:sql_server_read:v1' 'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1' + 'WriteToMySql': 'beam:schematransform:org.apache.beam:mysql_write:v1' + 'WriteToPostgres': 'beam:schematransform:org.apache.beam:postgres_write:v1' + 'WriteToOracle': 'beam:schematransform:org.apache.beam:oracle_write:v1' + 'WriteToSqlServer': 'beam:schematransform:org.apache.beam:sql_server_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' From ac6a59bd615b0c642bda629092130525b98c7c02 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 20 Nov 2024 16:53:24 -0500 Subject: [PATCH 2/6] Address comments Signed-off-by: Jeffrey Kinard --- .../jdbc/JdbcReadSchemaTransformProvider.java | 104 ++++++++++-------- .../JdbcWriteSchemaTransformProvider.java | 96 ++++++++-------- .../ReadFromMySqlSchemaTransformProvider.java | 7 +- ...ReadFromOracleSchemaTransformProvider.java | 7 +- ...adFromPostgresSchemaTransformProvider.java | 7 +- ...dFromSqlServerSchemaTransformProvider.java | 7 +- .../WriteToMySqlSchemaTransformProvider.java | 7 +- .../WriteToOracleSchemaTransformProvider.java | 7 +- ...riteToPostgresSchemaTransformProvider.java | 7 +- ...iteToSqlServerSchemaTransformProvider.java | 7 +- .../JdbcReadSchemaTransformProviderTest.java | 82 +++++++------- .../apache_beam/yaml/generate_yaml_docs.py | 18 +++ sdks/python/apache_beam/yaml/standard_io.yaml | 8 ++ 13 files changed, 189 insertions(+), 175 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 4056b2518ac1..eeea9e8a4745 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -118,7 +118,7 @@ protected String baseDescription(String jdbcType) { } protected String inheritedDescription( - String prettyName, String transformName, String prefix, int port) { + String prettyName, String transformName, String databaseSchema, int defaultJdbcPort) { return String.format( "\n" + "This is a special case of ReadFromJdbc that includes the " @@ -144,7 +144,14 @@ protected String inheritedDescription( + "It might be necessary to use a custom JDBC driver that is not packaged with this " + "transform. If that is the case, see ReadFromJdbc which " + "allows for more custom configuration.", - prettyName, transformName, transformName, prefix, port, transformName, prefix, port); + prettyName, + transformName, + transformName, + databaseSchema, + defaultJdbcPort, + transformName, + databaseSchema, + defaultJdbcPort); } @Override @@ -153,60 +160,21 @@ protected String inheritedDescription( return JdbcReadSchemaTransformConfiguration.class; } - protected static void validateConfig(JdbcReadSchemaTransformConfiguration config, String jdbcType) - throws IllegalArgumentException { - if (Strings.isNullOrEmpty(config.getJdbcUrl())) { - throw new IllegalArgumentException("JDBC URL cannot be blank"); - } - - boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName()); - boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); - if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "If JDBC type is not specified, then Driver Class Name and Driver Jars must be 
specified."); - } - if (!driverClassNamePresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "One of JDBC Driver class name or JDBC type must be specified."); - } - if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); - } - - boolean readQueryPresent = (config.getReadQuery() != null && !"".equals(config.getReadQuery())); - boolean locationPresent = (config.getLocation() != null && !"".equals(config.getLocation())); - - if (readQueryPresent && locationPresent) { - throw new IllegalArgumentException("Query and Table are mutually exclusive configurations"); - } - if (!readQueryPresent && !locationPresent) { - throw new IllegalArgumentException("Either Query or Table must be specified."); - } - } - - protected static void validateConfig(JdbcReadSchemaTransformConfiguration config) - throws IllegalArgumentException { - validateConfig(config, config.getJdbcType()); + protected String jdbcType() { + return ""; } @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcReadSchemaTransformConfiguration configuration) { - validateConfig(configuration); - return new JdbcReadSchemaTransform(configuration); + configuration.validate(jdbcType()); + return new JdbcReadSchemaTransform(configuration, jdbcType()); } protected static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { JdbcReadSchemaTransformConfiguration config; - private String jdbcType; - - public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { - this.config = config; - this.jdbcType = config.getJdbcType(); - } + private final String jdbcType; public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String jdbcType) { this.config = config; @@ -217,7 +185,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { String driverClassName = 
config.getDriverClassName(); if (Strings.isNullOrEmpty(driverClassName)) { - driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase()); + driverClassName = + JDBC_DRIVER_MAP.get( + (Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType())) + .toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -343,6 +315,44 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser @Nullable public abstract String getUsername(); + public void validate() { + validate(""); + } + + public void validate(String jdbcType) throws IllegalArgumentException { + if (Strings.isNullOrEmpty(getJdbcUrl())) { + throw new IllegalArgumentException("JDBC URL cannot be blank"); + } + + jdbcType = !Strings.isNullOrEmpty(jdbcType) ? jdbcType : getJdbcType(); + + boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); + boolean driverJarsPresent = !Strings.isNullOrEmpty(getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); + } + if (!driverClassNamePresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "One of JDBC Driver class name or JDBC type must be specified."); + } + if (jdbcTypePresent + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { + throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + } + + boolean readQueryPresent = (getReadQuery() != null && !"".equals(getReadQuery())); + boolean locationPresent = (getLocation() != null && !"".equals(getLocation())); + + if (readQueryPresent && locationPresent) { + throw new IllegalArgumentException("Query and Table are mutually exclusive configurations"); + } + if (!readQueryPresent && !locationPresent) { + throw 
new IllegalArgumentException("Either Query or Table must be specified."); + } + } + public static Builder builder() { return new AutoValue_JdbcReadSchemaTransformProvider_JdbcReadSchemaTransformConfiguration .Builder(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index d1437d3ea5e7..8e8692f07ae5 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -158,52 +158,15 @@ protected String inheritedDescription( return JdbcWriteSchemaTransformConfiguration.class; } - protected static void validateConfig( - JdbcWriteSchemaTransformConfiguration config, String jdbcType) - throws IllegalArgumentException { - if (Strings.isNullOrEmpty(config.getJdbcUrl())) { - throw new IllegalArgumentException("JDBC URL cannot be blank"); - } - - boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName()); - boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); - if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); - } - if (!driverClassNamePresent && !jdbcTypePresent) { - throw new IllegalArgumentException( - "One of JDBC Driver class name or JDBC type must be specified."); - } - if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); - } - - boolean writeStatementPresent = - (config.getWriteStatement() != null && !"".equals(config.getWriteStatement())); - boolean 
locationPresent = (config.getLocation() != null && !"".equals(config.getLocation())); - - if (writeStatementPresent && locationPresent) { - throw new IllegalArgumentException( - "Write Statement and Table are mutually exclusive configurations"); - } - if (!writeStatementPresent && !locationPresent) { - throw new IllegalArgumentException("Either Write Statement or Table must be set."); - } - } - - protected static void validateConfig(JdbcWriteSchemaTransformConfiguration config) - throws IllegalArgumentException { - validateConfig(config, config.getJdbcType()); + protected String jdbcType() { + return ""; } @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcWriteSchemaTransformConfiguration configuration) { - validateConfig(configuration); - return new JdbcWriteSchemaTransform(configuration); + configuration.validate(jdbcType()); + return new JdbcWriteSchemaTransform(configuration, jdbcType()); } protected static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { @@ -211,11 +174,6 @@ protected static class JdbcWriteSchemaTransform extends SchemaTransform implemen JdbcWriteSchemaTransformConfiguration config; private String jdbcType; - public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { - this.config = config; - this.jdbcType = config.getJdbcType(); - } - public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config, String jdbcType) { this.config = config; this.jdbcType = jdbcType; @@ -225,7 +183,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { String driverClassName = config.getDriverClassName(); if (Strings.isNullOrEmpty(driverClassName)) { - driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase()); + driverClassName = + JDBC_DRIVER_MAP.get( + (Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? 
jdbcType : config.getJdbcType())) + .toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -375,6 +337,46 @@ public void validate() throws IllegalArgumentException { @Nullable public abstract String getWriteStatement(); + public void validate() { + validate("JDBC"); + } + + public void validate(String jdbcType) throws IllegalArgumentException { + if (Strings.isNullOrEmpty(getJdbcUrl())) { + throw new IllegalArgumentException("JDBC URL cannot be blank"); + } + + jdbcType = !Strings.isNullOrEmpty(jdbcType) ? jdbcType : getJdbcType(); + + boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); + boolean driverJarsPresent = !Strings.isNullOrEmpty(getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); + } + if (!driverClassNamePresent && !jdbcTypePresent) { + throw new IllegalArgumentException( + "One of JDBC Driver class name or JDBC type must be specified."); + } + if (jdbcTypePresent + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { + throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + } + + boolean writeStatementPresent = + (getWriteStatement() != null && !"".equals(getWriteStatement())); + boolean locationPresent = (getLocation() != null && !"".equals(getLocation())); + + if (writeStatementPresent && locationPresent) { + throw new IllegalArgumentException( + "Write Statement and Table are mutually exclusive configurations"); + } + if (!writeStatementPresent && !locationPresent) { + throw new IllegalArgumentException("Either Write Statement or Table must be set."); + } + } + public static Builder builder() { return new AutoValue_JdbcWriteSchemaTransformProvider_JdbcWriteSchemaTransformConfiguration .Builder(); diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index a90929e08278..19c588f5049b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -41,9 +40,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { - validateConfig(configuration, MYSQL); - return new JdbcReadSchemaTransform(configuration, MYSQL); + protected String jdbcType() { + return MYSQL; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java index 008dba41ae04..a3e99aede3e8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import 
org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { - validateConfig(configuration, ORACLE); - return new JdbcReadSchemaTransform(configuration, ORACLE); + protected String jdbcType() { + return ORACLE; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java index 773386db40c3..82aa50e6e5fc 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { - validateConfig(configuration, POSTGRES); - return new JdbcReadSchemaTransform(configuration, POSTGRES); + protected String jdbcType() { + return POSTGRES; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java index a0afbf28c3d0..bc9e95fc7881 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { - validateConfig(configuration, MSSQL); - return new JdbcReadSchemaTransform(configuration, MSSQL); + protected String jdbcType() { + return MSSQL; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index ceab4461d8fa..3ed71e8164d1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import 
org.checkerframework.checker.nullness.qual.NonNull; @@ -41,9 +40,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { - validateConfig(configuration, MYSQL); - return new JdbcWriteSchemaTransform(configuration, MYSQL); + protected String jdbcType() { + return MYSQL; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java index 55e4a2d8b3de..fc39110bc102 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { - validateConfig(configuration, ORACLE); - return new JdbcWriteSchemaTransform(configuration, ORACLE); + protected String jdbcType() { + return ORACLE; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java index 6ac4f6f43efd..e15d2794490a 100644 --- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { - validateConfig(configuration, POSTGRES); - return new JdbcWriteSchemaTransform(configuration, POSTGRES); + protected String jdbcType() { + return POSTGRES; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java index c9166c1ea549..4c9566cdf58b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -42,9 +41,7 @@ public String description() { } @Override - protected @UnknownKeyFor @NonNull 
@Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { - validateConfig(configuration, MSSQL); - return new JdbcWriteSchemaTransform(configuration, MSSQL); + protected String jdbcType() { + return MSSQL; } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java index 931c7f248bbf..7cbdd48d1587 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java @@ -77,73 +77,73 @@ public void testInvalidReadSchemaOptions() { assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("") - .setJdbcUrl("") - .build()); + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("") + .setJdbcUrl("") + .build() + .validate(); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("ClassName") - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setReadQuery("Query") - .build()); + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("ClassName") + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setReadQuery("Query") + .build() + .validate(); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setDriverClassName("ClassName") - .setJdbcUrl("JdbcUrl") - .build()); + 
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName("ClassName") + .setJdbcUrl("JdbcUrl") + .build() + .validate(); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setJdbcType("invalidType") - .build()); + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setJdbcType("invalidType") + .build() + .validate(); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .build()); + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .build() + .validate(); }); assertThrows( IllegalArgumentException.class, () -> { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build()); + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setDriverClassName("ClassName") + .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) + .build() + .validate(); }); } @Test public void testValidReadSchemaOptions() { for (String jdbcType : JDBC_DRIVER_MAP.keySet()) { - JdbcReadSchemaTransformProvider.validateConfig( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setJdbcType(jdbcType) - .build()); + 
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setJdbcUrl("JdbcUrl") + .setLocation("Location") + .setJdbcType(jdbcType) + .build() + .validate(); } } diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index a49f67533e76..076d26e2e623 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -197,9 +197,27 @@ def io_grouping_key(transform_name): def add_transform_links(transform, description, provider_list): """ Convert references of Providers to urls that link to their respective pages. + + For example, + "Some description talking about MyTransform." + would be converted to + "Some description talking about MyTransform" + + meanwhile, + ``` + - type: MyTransform + config: + ... + ``` + Would remain unchanged. + Avoid self-linking within a Transform page. """ for p in provider_list: + # Match all instances of built-in transforms within the description + # excluding the transform whose description is currently being evaluated. + # Match the entire word boundary so that partial matches do not count. + # (i.e. 
OtherTransform should not match Transform) description = re.sub( rf"(?{p}', diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 68081fb41929..a21782bdc603 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -243,33 +243,41 @@ 'ReadFromMySql': driver_class_name: '' driver_jars: '' + jdbc_type: '' 'WriteToMySql': driver_class_name: '' driver_jars: '' + jdbc_type: '' 'ReadFromPostgres': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' 'WriteToPostgres': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' 'ReadFromOracle': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' 'WriteToOracle': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' 'ReadFromSqlServer': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' 'WriteToSqlServer': connection_init_sql: '' driver_class_name: '' driver_jars: '' + jdbc_type: '' underlying_provider: type: beamJar transforms: From 4cfc96bfa15859fd116ee1b4b2cbbee77b530688 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 27 Nov 2024 14:27:44 -0500 Subject: [PATCH 3/6] remove baseDescription Signed-off-by: Jeffrey Kinard --- .../jdbc/JdbcReadSchemaTransformProvider.java | 12 ++++------- .../JdbcWriteSchemaTransformProvider.java | 21 +++++++++++-------- .../ReadFromMySqlSchemaTransformProvider.java | 2 +- ...ReadFromOracleSchemaTransformProvider.java | 3 +-- ...adFromPostgresSchemaTransformProvider.java | 3 +-- ...dFromSqlServerSchemaTransformProvider.java | 3 +-- .../WriteToMySqlSchemaTransformProvider.java | 2 +- .../WriteToOracleSchemaTransformProvider.java | 3 +-- ...riteToPostgresSchemaTransformProvider.java | 3 +-- ...iteToSqlServerSchemaTransformProvider.java | 3 +-- 10 files changed, 24 insertions(+), 31 deletions(-) diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index eeea9e8a4745..1be672cb9615 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -57,7 +57,7 @@ public class JdbcReadSchemaTransformProvider @Override public String description() { - return baseDescription("JDBC") + return "Read from a JDBC source using a SQL query or by directly accessing a single table.\n" + "\n" + "This transform can be used to read from a JDBC source using either a given JDBC driver jar " + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" @@ -111,16 +111,11 @@ public String description() { + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n"; } - protected String baseDescription(String jdbcType) { - return String.format( - "Read from a %s source using a SQL query or by directly accessing " + "a single table.\n", - jdbcType); - } - protected String inheritedDescription( String prettyName, String transformName, String databaseSchema, int defaultJdbcPort) { return String.format( - "\n" + "Read from a %s source using a SQL query or by directly accessing a single table.\n" + + "\n" + "This is a special case of ReadFromJdbc that includes the " + "necessary %s Driver and classes.\n" + "\n" @@ -145,6 +140,7 @@ protected String inheritedDescription( + "transform. 
If that is the case, see ReadFromJdbc which " + "allows for more custom configuration.", prettyName, + prettyName, transformName, transformName, databaseSchema, diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 8e8692f07ae5..a61fa74b4ce5 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -62,7 +62,7 @@ public class JdbcWriteSchemaTransformProvider @Override public String description() { - return baseDescription("JDBC") + return "Write to a JDBC sink using a SQL query or by directly accessing a single table.\n" + "\n" + "This transform can be used to write to a JDBC sink using either a given JDBC driver jar " + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" @@ -116,16 +116,11 @@ public String description() { + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n"; } - protected String baseDescription(String jdbcType) { - return String.format( - "Write to a %s sink using a SQL query or by directly accessing " + "a single table.\n", - jdbcType); - } - protected String inheritedDescription( String prettyName, String transformName, String prefix, int port) { return String.format( - "\n" + "Write to a %s sink using a SQL query or by directly accessing a single table.\n" + + "\n" + "This is a special case of WriteToJdbc that includes the " + "necessary %s Driver and classes.\n" + "\n" @@ -149,7 +144,15 @@ protected String inheritedDescription( + "It might be necessary to use a custom JDBC driver that is not packaged with this " + "transform. 
If that is the case, see WriteToJdbc which " + "allows for more custom configuration.", - prettyName, transformName, transformName, prefix, port, transformName, prefix, port); + prettyName, + prettyName, + transformName, + transformName, + prefix, + port, + transformName, + prefix, + port); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 19c588f5049b..3d0135ef8ecd 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -36,7 +36,7 @@ public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransfor @Override public String description() { - return baseDescription("MySQL") + inheritedDescription("MySQL", "ReadFromMySql", "mysql", 3306); + return inheritedDescription("MySQL", "ReadFromMySql", "mysql", 3306); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java index a3e99aede3e8..de18d5aa8189 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class ReadFromOracleSchemaTransformProvider extends JdbcReadSchemaTransfo @Override public String description() { - return baseDescription("Oracle") - + inheritedDescription("Oracle", "ReadFromOracle", "oracle", 1521); + return inheritedDescription("Oracle", "ReadFromOracle", "oracle", 1521); } @Override diff 
--git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java index 82aa50e6e5fc..62ff14c23e0a 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTrans @Override public String description() { - return baseDescription("PostgreSQL") - + inheritedDescription("Postgres", "ReadFromPostgres", "postgresql", 5432); + return inheritedDescription("Postgres", "ReadFromPostgres", "postgresql", 5432); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java index bc9e95fc7881..e4767177bb2f 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTran @Override public String description() { - return baseDescription("Sql Server (Microsoft SQL)") - + inheritedDescription("SQL Server", "ReadFromSqlServer", "sqlserver", 1433); + return inheritedDescription("SQL Server", "ReadFromSqlServer", "sqlserver", 1433); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 3ed71e8164d1..57f085220162 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -36,7 +36,7 @@ public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransfor @Override public String description() { - return baseDescription("MySQL") + inheritedDescription("MySQL", "WriteToMySql", "mysql", 3306); + return inheritedDescription("MySQL", "WriteToMySql", "mysql", 3306); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java index fc39110bc102..5b3ae2c35e9d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class WriteToOracleSchemaTransformProvider extends JdbcWriteSchemaTransfo @Override public String description() { - return baseDescription("Oracle") - + inheritedDescription("Oracle", "WriteToOracle", "oracle", 1521); + return inheritedDescription("Oracle", "WriteToOracle", "oracle", 1521); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java index e15d2794490a..c50b84311630 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java +++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTrans @Override public String description() { - return baseDescription("PostgreSQL") - + inheritedDescription("Postgres", "WriteToPostgres", "postgresql", 5432); + return inheritedDescription("Postgres", "WriteToPostgres", "postgresql", 5432); } @Override diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java index 4c9566cdf58b..9e849f4e49e2 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -36,8 +36,7 @@ public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTran @Override public String description() { - return baseDescription("Sql Server (Microsoft SQL)") - + inheritedDescription("SQL Server", "WriteToSqlServer", "sqlserver", 1433); + return inheritedDescription("SQL Server", "WriteToSqlServer", "sqlserver", 1433); } @Override From c93df87f397e6dd09e4f1629a9a0d38c873400a7 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 27 Nov 2024 18:40:35 -0500 Subject: [PATCH 4/6] fix spotless Signed-off-by: Jeffrey Kinard --- .../JdbcWriteSchemaTransformProvider.java | 4 ---- .../sdk/io/jdbc/providers/package-info.java | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index a61fa74b4ce5..64e9a44abbbe 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -315,10 +315,6 @@ public abstract static class JdbcWriteSchemaTransformConfiguration implements Se @Nullable public abstract Long getBatchSize(); - public void validate() throws IllegalArgumentException { - if (Strings.isNullOrEmpty(getJdbcUrl())) { - throw new IllegalArgumentException("JDBC URL cannot be blank"); - } @SchemaFieldDescription( "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.") @Nullable diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java new file mode 100644 index 000000000000..db5bba936596 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from JDBC. */ +package org.apache.beam.sdk.io.jdbc.providers; From bb28a32bdf0d33ead10ab96e85d8043e42d4f403 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 4 Dec 2024 11:27:07 -0500 Subject: [PATCH 5/6] fix failing tests Signed-off-by: Jeffrey Kinard --- .../io/jdbc/JdbcWriteSchemaTransformProvider.java | 5 +++-- .../io/jdbc/JdbcReadSchemaTransformProviderTest.java | 11 ----------- .../jdbc/JdbcWriteSchemaTransformProviderTest.java | 11 ----------- sdks/python/apache_beam/yaml/generate_yaml_docs.py | 12 ++++++------ 4 files changed, 9 insertions(+), 30 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 64e9a44abbbe..9d4803addf31 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -337,7 +337,7 @@ public abstract static class JdbcWriteSchemaTransformConfiguration implements Se public abstract String getWriteStatement(); public void validate() { - validate("JDBC"); + validate(""); } public void validate(String jdbcType) throws IllegalArgumentException { @@ -360,7 +360,8 @@ public void validate(String jdbcType) throws IllegalArgumentException { } if (jdbcTypePresent && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + throw new IllegalArgumentException( + "JDBC type must be one of " + JDBC_DRIVER_MAP.keySet() + " but was " + jdbcType); } boolean writeStatementPresent = diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java index 7cbdd48d1587..ca7690ac9a08 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java @@ -122,17 +122,6 @@ public void testInvalidReadSchemaOptions() { .build() .validate(); }); - assertThrows( - IllegalArgumentException.class, - () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build() - .validate(); - }); } @Test diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java index d6be4d9f89c8..a8d9162f3a8e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java @@ -122,17 +122,6 @@ public void testInvalidWriteSchemaOptions() { .build() .validate(); }); - assertThrows( - IllegalArgumentException.class, - () -> { - JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build() - .validate(); - }); } @Test diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index 076d26e2e623..693df6179a2d 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py 
+++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -203,12 +203,12 @@ def add_transform_links(transform, description, provider_list): would be converted to "Some description talking about MyTransform" - meanwhile, - ``` - - type: MyTransform - config: - ... - ``` + meanwhile:: + + type: MyTransform + config: + ... + Would remain unchanged. Avoid self-linking within a Transform page. From 1bfa3443ad6360e9fbe9f099d061c5557dce018d Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Thu, 26 Dec 2024 15:11:27 -0600 Subject: [PATCH 6/6] remove hikari testImplementation for JDBCIO Signed-off-by: Jeffrey Kinard --- .../jdbc/JdbcReadSchemaTransformProvider.java | 40 +++++++++---------- .../JdbcWriteSchemaTransformProvider.java | 40 +++++++++---------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 1be672cb9615..b4765f0392c1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -114,28 +114,28 @@ public String description() { protected String inheritedDescription( String prettyName, String transformName, String databaseSchema, int defaultJdbcPort) { return String.format( - "Read from a %s source using a SQL query or by directly accessing a single table.\n" - + "\n" + "Read from a %s source using a SQL query or by directly accessing a single table.%n" + + "%n" + "This is a special case of ReadFromJdbc that includes the " - + "necessary %s Driver and classes.\n" - + "\n" - + "An example of using %s with SQL query: ::\n" - + "\n" - + " - type: %s\n" - + " config:\n" - + " url: \"jdbc:%s://my-host:%d/database\"\n" - + " query: \"SELECT * FROM table\"\n" - + "\n" + + "necessary %s Driver and 
classes.%n" + + "%n" + + "An example of using %s with SQL query: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " query: \"SELECT * FROM table\"%n" + + "%n" + "It is also possible to read a table by specifying a table name. For example, the " - + "following configuration will perform a read on an entire table: ::\n" - + "\n" - + " - type: %s\n" - + " config:\n" - + " url: \"jdbc:%s://my-host:%d/database\"\n" - + " table: \"my-table\"\n" - + "\n" - + "#### Advanced Usage\n" - + "\n" + + "following configuration will perform a read on an entire table: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " table: \"my-table\"%n" + + "%n" + + "#### Advanced Usage%n" + + "%n" + "It might be necessary to use a custom JDBC driver that is not packaged with this " + "transform. If that is the case, see ReadFromJdbc which " + "allows for more custom configuration.", diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 9d4803addf31..6f10df56aab5 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -119,28 +119,28 @@ public String description() { protected String inheritedDescription( String prettyName, String transformName, String prefix, int port) { return String.format( - "Write to a %s sink using a SQL query or by directly accessing a single table.\n" - + "\n" + "Write to a %s sink using a SQL query or by directly accessing a single table.%n" + + "%n" + "This is a special case of WriteToJdbc that includes the " - + "necessary %s Driver and classes.\n" - + "\n" - + "An example of using %s with SQL query: ::\n" - + "\n" - + " - type: %s\n" - + " 
config:\n" - + " url: \"jdbc:%s://my-host:%d/database\"\n" - + " query: \"INSERT INTO table VALUES(?, ?)\"\n" - + "\n" + + "necessary %s Driver and classes.%n" + + "%n" + + "An example of using %s with SQL query: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " query: \"INSERT INTO table VALUES(?, ?)\"%n" + + "%n" + "It is also possible to read a table by specifying a table name. For example, the " - + "following configuration will perform a read on an entire table: ::\n" - + "\n" - + " - type: %s\n" - + " config:\n" - + " url: \"jdbc:%s://my-host:%d/database\"\n" - + " table: \"my-table\"\n" - + "\n" - + "#### Advanced Usage\n" - + "\n" + + "following configuration will perform a read on an entire table: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " table: \"my-table\"%n" + + "%n" + + "#### Advanced Usage%n" + + "%n" + "It might be necessary to use a custom JDBC driver that is not packaged with this " + "transform. If that is the case, see WriteToJdbc which " + "allows for more custom configuration.",