From 7ca424deedf547a7f22fb971ebd54bb121d783d8 Mon Sep 17 00:00:00 2001 From: Jeff Kinard Date: Sun, 29 Dec 2024 17:40:18 -0600 Subject: [PATCH] [yaml] add mysql, oracle, postgres and sql server providers (#33124) * [yaml] add JDBC docs Signed-off-by: Jeffrey Kinard * Address comments Signed-off-by: Jeffrey Kinard * remove baseDescription Signed-off-by: Jeffrey Kinard * fix spotless Signed-off-by: Jeffrey Kinard * fix failing tests Signed-off-by: Jeffrey Kinard * remove hikari testImplementation for JDBCIO Signed-off-by: Jeffrey Kinard --------- Signed-off-by: Jeffrey Kinard --- .../SpannerReadSchemaTransformProvider.java | 54 ++--- .../SpannerWriteSchemaTransformProvider.java | 14 +- .../jdbc/JdbcReadSchemaTransformProvider.java | 201 +++++++++++++++--- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 15 +- .../JdbcWriteSchemaTransformProvider.java | 192 ++++++++++++++--- .../ReadFromMySqlSchemaTransformProvider.java | 46 ++++ ...ReadFromOracleSchemaTransformProvider.java | 46 ++++ ...adFromPostgresSchemaTransformProvider.java | 46 ++++ ...dFromSqlServerSchemaTransformProvider.java | 46 ++++ .../WriteToMySqlSchemaTransformProvider.java | 46 ++++ .../WriteToOracleSchemaTransformProvider.java | 46 ++++ ...riteToPostgresSchemaTransformProvider.java | 46 ++++ ...iteToSqlServerSchemaTransformProvider.java | 46 ++++ .../sdk/io/jdbc/providers/package-info.java | 20 ++ .../JdbcReadSchemaTransformProviderTest.java | 11 - .../JdbcWriteSchemaTransformProviderTest.java | 11 - .../apache_beam/yaml/generate_yaml_docs.py | 36 +++- sdks/python/apache_beam/yaml/standard_io.yaml | 93 +++++--- 18 files changed, 854 insertions(+), 161 deletions(-) create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java index 76440b1ebf1a..0bcf6e0c4f75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java @@ -76,40 +76,34 @@ public String description() { + "\n" + "Example configuration for performing a read using a SQL query: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " query: 'SELECT * FROM table'\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " query: 'SELECT * FROM table'\n" + "\n" + "It is also possible to read a table by specifying a table name and a list of columns. For " + "example, the following configuration will perform a read on an entire table: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" - + " columns: ['col1', 'col2']\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " columns: ['col1', 'col2']\n" + "\n" + "Additionally, to read using a " + "Secondary Index, specify the index name: ::" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" - + " index: 'my-index'\n" - + " columns: ['col1', 'col2']\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " index: 'my-index'\n" + + " columns: ['col1', 'col2']\n" + "\n" - + "### Advanced Usage\n" + + "#### Advanced Usage\n" + "\n" + "Reads by default use the " + "PartitionQuery API which enforces some limitations on the type of queries that can be used so that " @@ -118,12 +112,10 @@ public String description() { + "\n" + "For example: ::" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " batching: false\n" - + " ...\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " batching: false\n" + + " ...\n" + "\n" + "Note: See " diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 8601da09ea09..61955f448c3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -84,14 +84,12 @@ public String description() { + "\n" + "Example configuration for performing a write to a single table: ::\n" + "\n" - + " pipeline:\n" - + " transforms:\n" - + " - type: ReadFromSpanner\n" - + " config:\n" - + " project_id: 'my-project-id'\n" - + " instance_id: 'my-instance-id'\n" - + " database_id: 'my-database'\n" - + " table: 'my-table'\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " project_id: 'my-project-id'\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + "\n" + "Note: See " diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 435bfc138b5b..b4765f0392c1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -49,25 +50,131 @@ public class JdbcReadSchemaTransformProvider extends TypedSchemaTransformProvider< JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> { + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:jdbc_read:v1"; + } + + @Override + public String description() { + return "Read from a JDBC source using a SQL query or by directly accessing a single table.\n" + + "\n" + + "This transform can be used to read from a JDBC source using either a given JDBC driver jar " + + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" + + "\n" + + "#### Using a default driver\n" + + "\n" + + "This transform comes packaged with drivers for several popular JDBC distributions. The following " + + "distributions can be declared as the `jdbc_type`: " + + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "") + + ".\n" + + "\n" + + "For example, reading a MySQL source using a SQL query: ::" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " jdbc_type: mysql\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " query: \"SELECT * FROM table\"\n" + + "\n" + + "\n" + + "**Note**: See the following transforms which are built on top of this transform and simplify " + + "this logic for several popular JDBC distributions:\n\n" + + " - ReadFromMySql\n" + + " - ReadFromPostgres\n" + + " - ReadFromOracle\n" + + " - ReadFromSqlServer\n" + + "\n" + + "#### Declaring custom JDBC drivers\n" + + "\n" + + "If reading from a JDBC source not listed above, or if it is necessary to use a custom driver not " + + "packaged with Beam, one must define a JDBC driver and class name.\n" + + "\n" + + "For example, reading a MySQL source table: ::" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " driver_jars: \"path/to/some/jdbc.jar\"\n" + + " driver_class_name: \"com.mysql.jdbc.Driver\"\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Connection Properties\n" + + "\n" + + "Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, " + + "to set the character encoding to UTF-8, one could write: ::\n" + + "\n" + + " - type: ReadFromJdbc\n" + + " config:\n" + + " connectionProperties: \"characterEncoding=UTF-8;\"\n" + + " ...\n" + + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n"; + } + + protected String inheritedDescription( + String prettyName, String transformName, String databaseSchema, int defaultJdbcPort) { + return String.format( + "Read from a %s source using a SQL query or by directly accessing a single table.%n" + + "%n" + + "This is a special case of ReadFromJdbc that includes the " + + "necessary %s Driver and classes.%n" + + "%n" + + "An example of using %s with SQL query: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " query: \"SELECT * FROM table\"%n" + + "%n" + + "It is also possible to read a table by specifying a table name. For example, the " + + "following configuration will perform a read on an entire table: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " table: \"my-table\"%n" + + "%n" + + "#### Advanced Usage%n" + + "%n" + + "It might be necessary to use a custom JDBC driver that is not packaged with this " + + "transform. If that is the case, see ReadFromJdbc which " + + "allows for more custom configuration.", + prettyName, + prettyName, + transformName, + transformName, + databaseSchema, + defaultJdbcPort, + transformName, + databaseSchema, + defaultJdbcPort); + } + @Override protected @UnknownKeyFor @NonNull @Initialized Class configurationClass() { return JdbcReadSchemaTransformConfiguration.class; } + protected String jdbcType() { + return ""; + } + @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcReadSchemaTransformConfiguration configuration) { - configuration.validate(); - return new JdbcReadSchemaTransform(configuration); + configuration.validate(jdbcType()); + return new JdbcReadSchemaTransform(configuration, jdbcType()); } - static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { + protected static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { JdbcReadSchemaTransformConfiguration config; + private final String jdbcType; - public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String jdbcType) { this.config = config; + this.jdbcType = jdbcType; } protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { @@ -75,7 +182,10 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { if (Strings.isNullOrEmpty(driverClassName)) { driverClassName = - JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase()); + JDBC_DRIVER_MAP.get( + (Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType())) + .toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -109,7 +219,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } JdbcIO.ReadRows readRows = JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query); - Short fetchSize = config.getFetchSize(); + Integer fetchSize = config.getFetchSize(); if (fetchSize != null && fetchSize > 0) { readRows = readRows.withFetchSize(fetchSize); } @@ -125,11 +235,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:jdbc_read:v1"; - } - @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() { @@ -145,62 +250,91 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class JdbcReadSchemaTransformConfiguration implements Serializable { - @Nullable - public abstract String getDriverClassName(); + @SchemaFieldDescription("Connection URL for the JDBC source.") + public abstract String getJdbcUrl(); + + @SchemaFieldDescription( + "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.") @Nullable - public abstract String getJdbcType(); + public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> + getConnectionInitSql(); - public abstract String getJdbcUrl(); + @SchemaFieldDescription( + "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".") + @Nullable + public abstract String getConnectionProperties(); + @SchemaFieldDescription( + "Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.") @Nullable - public abstract String getUsername(); + public abstract Boolean getDisableAutoCommit(); + @SchemaFieldDescription( + "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".") @Nullable - public abstract String getPassword(); + public abstract String getDriverClassName(); + @SchemaFieldDescription( + "Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.") @Nullable - public abstract String getConnectionProperties(); + public abstract String getDriverJars(); + @SchemaFieldDescription( + "This method is used to override the size of the data that is going to be fetched and loaded in memory per every database call. It should ONLY be used if the default value throws memory errors.") @Nullable - public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> - getConnectionInitSql(); + public abstract Integer getFetchSize(); + @SchemaFieldDescription( + "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.") @Nullable - public abstract String getReadQuery(); + public abstract String getJdbcType(); + @SchemaFieldDescription("Name of the table to read from.") @Nullable public abstract String getLocation(); + @SchemaFieldDescription( + "Whether to reshuffle the resulting PCollection so results are distributed to all workers.") @Nullable - public abstract Short getFetchSize(); + public abstract Boolean getOutputParallelization(); + @SchemaFieldDescription("Password for the JDBC source.") @Nullable - public abstract Boolean getOutputParallelization(); + public abstract String getPassword(); + @SchemaFieldDescription("SQL query used to query the JDBC source.") @Nullable - public abstract Boolean getDisableAutoCommit(); + public abstract String getReadQuery(); + @SchemaFieldDescription("Username for the JDBC source.") @Nullable - public abstract String getDriverJars(); + public abstract String getUsername(); + + public void validate() { + validate(""); + } - public void validate() throws IllegalArgumentException { + public void validate(String jdbcType) throws IllegalArgumentException { if (Strings.isNullOrEmpty(getJdbcUrl())) { throw new IllegalArgumentException("JDBC URL cannot be blank"); } + jdbcType = !Strings.isNullOrEmpty(jdbcType) ? jdbcType : getJdbcType(); + boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType()); - if (driverClassNamePresent && jdbcTypePresent) { + boolean driverJarsPresent = !Strings.isNullOrEmpty(getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { throw new IllegalArgumentException( - "JDBC Driver class name and JDBC type are mutually exclusive configurations."); + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); } if (!driverClassNamePresent && !jdbcTypePresent) { throw new IllegalArgumentException( "One of JDBC Driver class name or JDBC type must be specified."); } if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) { + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); } @@ -208,11 +342,10 @@ public void validate() throws IllegalArgumentException { boolean locationPresent = (getLocation() != null && !"".equals(getLocation())); if (readQueryPresent && locationPresent) { - throw new IllegalArgumentException( - "ReadQuery and Location are mutually exclusive configurations"); + throw new IllegalArgumentException("Query and Table are mutually exclusive configurations"); } if (!readQueryPresent && !locationPresent) { - throw new IllegalArgumentException("Either ReadQuery or Location must be set."); + throw new IllegalArgumentException("Either Query or Table must be specified."); } } @@ -241,7 +374,7 @@ public abstract static class Builder { public abstract Builder setConnectionInitSql(List value); - public abstract Builder setFetchSize(Short value); + public abstract Builder setFetchSize(Integer value); public abstract Builder setOutputParallelization(Boolean value); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index c0f7d68899b3..503b64e4a446 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -83,20 +83,25 @@ import org.slf4j.LoggerFactory; /** Provides utility functions for working with {@link JdbcIO}. */ -class JdbcUtil { +public class JdbcUtil { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class); + public static final String MYSQL = "mysql"; + public static final String POSTGRES = "postgres"; + public static final String ORACLE = "oracle"; + public static final String MSSQL = "mssql"; + static final Map JDBC_DRIVER_MAP = new HashMap<>( ImmutableMap.of( - "mysql", + MYSQL, "com.mysql.cj.jdbc.Driver", - "postgres", + POSTGRES, "org.postgresql.Driver", - "oracle", + ORACLE, "oracle.jdbc.driver.OracleDriver", - "mssql", + MSSQL, "com.microsoft.sqlserver.jdbc.SQLServerDriver")); @VisibleForTesting diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 1f970ba0624f..6f10df56aab5 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -54,25 +55,131 @@ public class JdbcWriteSchemaTransformProvider extends TypedSchemaTransformProvider< JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> { + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:jdbc_write:v1"; + } + + @Override + public String description() { + return "Write to a JDBC sink using a SQL query or by directly accessing a single table.\n" + + "\n" + + "This transform can be used to write to a JDBC sink using either a given JDBC driver jar " + + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n" + + "\n" + + "#### Using a default driver\n" + + "\n" + + "This transform comes packaged with drivers for several popular JDBC distributions. The following " + + "distributions can be declared as the `jdbc_type`: " + + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "") + + ".\n" + + "\n" + + "For example, writing to a MySQL sink using a SQL query: ::" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " jdbc_type: mysql\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " query: \"INSERT INTO table VALUES(?, ?)\"\n" + + "\n" + + "\n" + + "**Note**: See the following transforms which are built on top of this transform and simplify " + + "this logic for several popular JDBC distributions:\n\n" + + " - WriteToMySql\n" + + " - WriteToPostgres\n" + + " - WriteToOracle\n" + + " - WriteToSqlServer\n" + + "\n" + + "#### Declaring custom JDBC drivers\n" + + "\n" + + "If writing to a JDBC sink not listed above, or if it is necessary to use a custom driver not " + + "packaged with Beam, one must define a JDBC driver and class name.\n" + + "\n" + + "For example, writing to a MySQL table: ::" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " driver_jars: \"path/to/some/jdbc.jar\"\n" + + " driver_class_name: \"com.mysql.jdbc.Driver\"\n" + + " url: \"jdbc:mysql://my-host:3306/database\"\n" + + " table: \"my-table\"\n" + + "\n" + + "#### Connection Properties\n" + + "\n" + + "Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, " + + "to set the character encoding to UTF-8, one could write: ::\n" + + "\n" + + " - type: WriteToJdbc\n" + + " config:\n" + + " connectionProperties: \"characterEncoding=UTF-8;\"\n" + + " ...\n" + + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n"; + } + + protected String inheritedDescription( + String prettyName, String transformName, String prefix, int port) { + return String.format( + "Write to a %s sink using a SQL query or by directly accessing a single table.%n" + + "%n" + + "This is a special case of WriteToJdbc that includes the " + + "necessary %s Driver and classes.%n" + + "%n" + + "An example of using %s with SQL query: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " query: \"INSERT INTO table VALUES(?, ?)\"%n" + + "%n" + + "It is also possible to read a table by specifying a table name. For example, the " + + "following configuration will perform a read on an entire table: ::%n" + + "%n" + + " - type: %s%n" + + " config:%n" + + " url: \"jdbc:%s://my-host:%d/database\"%n" + + " table: \"my-table\"%n" + + "%n" + + "#### Advanced Usage%n" + + "%n" + + "It might be necessary to use a custom JDBC driver that is not packaged with this " + + "transform. If that is the case, see WriteToJdbc which " + + "allows for more custom configuration.", + prettyName, + prettyName, + transformName, + transformName, + prefix, + port, + transformName, + prefix, + port); + } + @Override protected @UnknownKeyFor @NonNull @Initialized Class configurationClass() { return JdbcWriteSchemaTransformConfiguration.class; } + protected String jdbcType() { + return ""; + } + @Override protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcWriteSchemaTransformConfiguration configuration) { - configuration.validate(); - return new JdbcWriteSchemaTransform(configuration); + configuration.validate(jdbcType()); + return new JdbcWriteSchemaTransform(configuration, jdbcType()); } - static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { + protected static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { JdbcWriteSchemaTransformConfiguration config; + private String jdbcType; - public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config, String jdbcType) { this.config = config; + this.jdbcType = jdbcType; } protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { @@ -80,7 +187,10 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { if (Strings.isNullOrEmpty(driverClassName)) { driverClassName = - JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase()); + JDBC_DRIVER_MAP.get( + (Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType())) + .toLowerCase()); } JdbcIO.DataSourceConfiguration dsConfig = @@ -157,11 +267,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:jdbc_write:v1"; - } - @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() { @@ -178,60 +283,85 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @DefaultSchema(AutoValueSchema.class) public abstract static class JdbcWriteSchemaTransformConfiguration implements Serializable { + @SchemaFieldDescription("Connection URL for the JDBC sink.") + public abstract String getJdbcUrl(); + + @SchemaFieldDescription( + "If true, enables using a dynamically determined number of shards to write.") @Nullable - public abstract String getDriverClassName(); + public abstract Boolean getAutosharding(); + @SchemaFieldDescription( + "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.") @Nullable - public abstract String getJdbcType(); + public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> + getConnectionInitSql(); - public abstract String getJdbcUrl(); + @SchemaFieldDescription( + "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".") + @Nullable + public abstract String getConnectionProperties(); + @SchemaFieldDescription( + "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".") @Nullable - public abstract String getUsername(); + public abstract String getDriverClassName(); + @SchemaFieldDescription( + "Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.") @Nullable - public abstract String getPassword(); + public abstract String getDriverJars(); @Nullable - public abstract String getConnectionProperties(); + public abstract Long getBatchSize(); + @SchemaFieldDescription( + "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.") @Nullable - public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String> - getConnectionInitSql(); + public abstract String getJdbcType(); + @SchemaFieldDescription("Name of the table to write to.") @Nullable public abstract String getLocation(); + @SchemaFieldDescription("Password for the JDBC source.") @Nullable - public abstract String getWriteStatement(); + public abstract String getPassword(); + @SchemaFieldDescription("Username for the JDBC source.") @Nullable - public abstract Boolean getAutosharding(); + public abstract String getUsername(); + @SchemaFieldDescription("SQL query used to insert records into the JDBC sink.") @Nullable - public abstract String getDriverJars(); + public abstract String getWriteStatement(); - @Nullable - public abstract Long getBatchSize(); + public void validate() { + validate(""); + } - public void validate() throws IllegalArgumentException { + public void validate(String jdbcType) throws IllegalArgumentException { if (Strings.isNullOrEmpty(getJdbcUrl())) { throw new IllegalArgumentException("JDBC URL cannot be blank"); } + jdbcType = !Strings.isNullOrEmpty(jdbcType) ? jdbcType : getJdbcType(); + boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName()); - boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType()); - if (driverClassNamePresent && jdbcTypePresent) { + boolean driverJarsPresent = !Strings.isNullOrEmpty(getDriverJars()); + boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType); + if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) { throw new IllegalArgumentException( - "JDBC Driver class name and JDBC type are mutually exclusive configurations."); + "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified."); } if (!driverClassNamePresent && !jdbcTypePresent) { throw new IllegalArgumentException( "One of JDBC Driver class name or JDBC type must be specified."); } if (jdbcTypePresent - && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) { - throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet()); + && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) { + throw new IllegalArgumentException( + "JDBC type must be one of " + JDBC_DRIVER_MAP.keySet() + " but was " + jdbcType); } boolean writeStatementPresent = @@ -240,10 +370,10 @@ public void validate() throws IllegalArgumentException { if (writeStatementPresent && locationPresent) { throw new IllegalArgumentException( - "ReadQuery and Location are mutually exclusive configurations"); + "Write Statement and Table are mutually exclusive configurations"); } if (!writeStatementPresent && !locationPresent) { - throw new IllegalArgumentException("Either ReadQuery or Location must be set."); + throw new IllegalArgumentException("Either Write Statement or Table must be set."); } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java new file mode 100644 index 000000000000..3d0135ef8ecd --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:mysql_read:v1"; + } + + @Override + public String description() { + return inheritedDescription("MySQL", "ReadFromMySql", "mysql", 3306); + } + + @Override + protected String jdbcType() { + return MYSQL; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java new file mode 100644 index 000000000000..de18d5aa8189 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromOracleSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:oracle_read:v1"; + } + + @Override + public String description() { + return inheritedDescription("Oracle", "ReadFromOracle", "oracle", 1521); + } + + @Override + protected String jdbcType() { + return ORACLE; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java new file mode 100644 index 000000000000..62ff14c23e0a --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:postgres_read:v1"; + } + + @Override + public String description() { + return inheritedDescription("Postgres", "ReadFromPostgres", "postgresql", 5432); + } + + @Override + protected String jdbcType() { + return POSTGRES; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java new file mode 100644 index 000000000000..e4767177bb2f --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:sql_server_read:v1"; + } + + @Override + public String description() { + return inheritedDescription("SQL Server", "ReadFromSqlServer", "sqlserver", 1433); + } + + @Override + protected String jdbcType() { + return MSSQL; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java new file mode 100644 index 000000000000..57f085220162 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:mysql_write:v1"; + } + + @Override + public String description() { + return inheritedDescription("MySQL", "WriteToMySql", "mysql", 3306); + } + + @Override + protected String jdbcType() { + return MYSQL; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java new file mode 100644 index 000000000000..5b3ae2c35e9d --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToOracleSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:oracle_write:v1"; + } + + @Override + public String description() { + return inheritedDescription("Oracle", "WriteToOracle", "oracle", 1521); + } + + @Override + protected String jdbcType() { + return ORACLE; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java new file mode 100644 index 000000000000..c50b84311630 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:postgres_write:v1"; + } + + @Override + public String description() { + return inheritedDescription("Postgres", "WriteToPostgres", "postgresql", 5432); + } + + @Override + protected String jdbcType() { + return POSTGRES; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java new file mode 100644 index 000000000000..9e849f4e49e2 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoService(SchemaTransformProvider.class) +public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:sql_server_write:v1"; + } + + @Override + public String description() { + return inheritedDescription("SQL Server", "WriteToSqlServer", "sqlserver", 1433); + } + + @Override + protected String jdbcType() { + return MSSQL; + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java new file mode 100644 index 000000000000..db5bba936596 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from JDBC. */ +package org.apache.beam.sdk.io.jdbc.providers; diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java index 7cbdd48d1587..ca7690ac9a08 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java @@ -122,17 +122,6 @@ public void testInvalidReadSchemaOptions() { .build() .validate(); }); - assertThrows( - IllegalArgumentException.class, - () -> { - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build() - .validate(); - }); } @Test diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java index d6be4d9f89c8..a8d9162f3a8e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java @@ -122,17 +122,6 @@ public void testInvalidWriteSchemaOptions() { .build() .validate(); }); - assertThrows( - IllegalArgumentException.class, - () -> { - JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder() - .setJdbcUrl("JdbcUrl") - .setLocation("Location") - .setDriverClassName("ClassName") - .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0]) - .build() - .validate(); - }); } @Test diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index fe5727f3ef92..693df6179a2d 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -194,12 +194,46 @@ def io_grouping_key(transform_name): SKIP = {} +def add_transform_links(transform, description, provider_list): + """ + Convert references of Providers to urls that link to their respective pages. + + For example, + "Some description talking about MyTransform." + would be converted to + "Some description talking about MyTransform" + + meanwhile:: + + type: MyTransform + config: + ... + + Would remain unchanged. + + Avoid self-linking within a Transform page. + """ + for p in provider_list: + # Match all instances of built-in transforms within the description + # excluding the transform whose description is currently being evaluated. + # Match the entire word boundary so that partial matches do not count. + # (i.e. OtherTransform should not match Transform) + description = re.sub( + rf"(?{p}', + description or '') + return description + + def transform_docs(transform_base, transforms, providers, extra_docs=''): return '\n'.join([ f'## {transform_base}', '', longest( - lambda t: longest(lambda p: p.description(t), providers[t]), + lambda t: longest( + lambda p: add_transform_links( + t, p.description(t), providers.keys()), + providers[t]), transforms).replace('::\n', '\n\n :::yaml\n'), '', extra_docs, diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 305e6877ad90..a21782bdc603 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -194,38 +194,43 @@ transforms: 'ReadFromJdbc': 'ReadFromJdbc' 'WriteToJdbc': 'WriteToJdbc' - 'ReadFromMySql': 'ReadFromJdbc' - 'WriteToMySql': 'WriteToJdbc' - 'ReadFromPostgres': 'ReadFromJdbc' - 'WriteToPostgres': 'WriteToJdbc' - 'ReadFromOracle': 'ReadFromJdbc' - 'WriteToOracle': 'WriteToJdbc' - 'ReadFromSqlServer': 'ReadFromJdbc' - 'WriteToSqlServer': 'WriteToJdbc' + 'ReadFromMySql': 'ReadFromMySql' + 'WriteToMySql': 'WriteToMySql' + 'ReadFromPostgres': 'ReadFromPostgres' + 'WriteToPostgres': 'WriteToPostgres' + 'ReadFromOracle': 'ReadFromOracle' + 'WriteToOracle': 'WriteToOracle' + 'ReadFromSqlServer': 'ReadFromSqlServer' + 'WriteToSqlServer': 'WriteToSqlServer' config: mappings: 'ReadFromJdbc': - driver_class_name: 'driver_class_name' - type: 'jdbc_type' url: 'jdbc_url' - username: 'username' - password: 'password' - table: 'location' - query: 'read_query' - driver_jars: 'driver_jars' - connection_properties: 'connection_properties' connection_init_sql: 'connection_init_sql' - 'WriteToJdbc': + connection_properties: 'connection_properties' + disable_auto_commit: 'disable_auto_commit' driver_class_name: 'driver_class_name' + driver_jars: 'driver_jars' + fetch_size: 'fetch_size' + output_parallelization: 'output_parallelization' + password: 'password' + query: 'read_query' + table: 'location' type: 'jdbc_type' - url: 'jdbc_url' username: 'username' + 'WriteToJdbc': + url: 'jdbc_url' + auto_sharding: 'autosharding' + connection_init_sql: 'connection_init_sql' + connection_properties: 'connection_properties' + driver_class_name: 'driver_class_name' + driver_jars: 'driver_jars' password: 'password' table: 'location' - driver_jars: 'driver_jars' - connection_properties: 'connection_properties' - connection_init_sql: 'connection_init_sql' batch_size: 'batch_size' + type: 'jdbc_type' + username: 'username' + query: 'write_statement' 'ReadFromMySql': 'ReadFromJdbc' 'WriteToMySql': 'WriteToJdbc' 'ReadFromPostgres': 'ReadFromJdbc' @@ -236,26 +241,56 @@ 'WriteToSqlServer': 'WriteToJdbc' defaults: 'ReadFromMySql': - jdbc_type: 'mysql' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'WriteToMySql': - jdbc_type: 'mysql' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'ReadFromPostgres': - jdbc_type: 'postgres' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'WriteToPostgres': - jdbc_type: 'postgres' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'ReadFromOracle': - jdbc_type: 'oracle' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'WriteToOracle': - jdbc_type: 'oracle' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'ReadFromSqlServer': - jdbc_type: 'mssql' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' 'WriteToSqlServer': - jdbc_type: 'mssql' + connection_init_sql: '' + driver_class_name: '' + driver_jars: '' + jdbc_type: '' underlying_provider: type: beamJar transforms: 'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1' + 'ReadFromMySql': 'beam:schematransform:org.apache.beam:mysql_read:v1' + 'ReadFromPostgres': 'beam:schematransform:org.apache.beam:postgres_read:v1' + 'ReadFromOracle': 'beam:schematransform:org.apache.beam:oracle_read:v1' + 'ReadFromSqlServer': 'beam:schematransform:org.apache.beam:sql_server_read:v1' 'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1' + 'WriteToMySql': 'beam:schematransform:org.apache.beam:mysql_write:v1' + 'WriteToPostgres': 'beam:schematransform:org.apache.beam:postgres_write:v1' + 'WriteToOracle': 'beam:schematransform:org.apache.beam:oracle_write:v1' + 'WriteToSqlServer': 'beam:schematransform:org.apache.beam:sql_server_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'