From 9394f8561d29509a6e67fcd66254197bd0d46b2e Mon Sep 17 00:00:00 2001 From: Chris Ashcraft Date: Tue, 12 Nov 2024 18:58:38 -0600 Subject: [PATCH] [JdbcIO] Adding disableAutoCommit flag (#32988) * adding disableAutoCommit flag to ReadFn --------- Co-authored-by: Chris Ashcraft --- CHANGES.md | 1 + .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 86 +++++++++++++++++-- .../jdbc/JdbcReadSchemaTransformProvider.java | 9 ++ .../sdk/io/jdbc/JdbcSchemaIOProvider.java | 11 +++ sdks/python/apache_beam/io/jdbc.py | 5 ++ 5 files changed, 106 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 261fafc024f3..c5731bcff313 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -94,6 +94,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). * (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)). +* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2f164fa3bb78..946c07f55763 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -333,6 +333,7 @@ public static Read read() { return new AutoValue_JdbcIO_Read.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .build(); } @@ -341,6 +342,7 @@ public static ReadRows readRows() { return new AutoValue_JdbcIO_ReadRows.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setStatementPreparator(ignored -> {}) .build(); } @@ -356,6 +358,7 @@ public static ReadAll readAll() { return new AutoValue_JdbcIO_ReadAll.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .build(); } @@ -372,6 +375,7 @@ public static ReadWithPartitions read .setPartitionColumnType(partitioningColumnType) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -389,6 +393,7 @@ public static ReadWithPartitions read .setPartitionsHelper(partitionsHelper) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -400,6 +405,7 @@ public static ReadWithPartitions readWithPartitions() { private static final long DEFAULT_BATCH_SIZE = 1000L; private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L; private static final int DEFAULT_FETCH_SIZE = 50_000; + private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true; // Default values used from fluent backoff. private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1); private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform expand(PBegin input) { ValueProvider query = checkStateNotNull(getQuery(), "withQuery() is required"); @@ -816,6 +836,7 @@ public PCollection expand(PBegin input) { .withCoder(RowCoder.of(schema)) .withRowMapper(SchemaUtil.BeamRowMapper.of(schema)) .withFetchSize(getFetchSize()) + .withDisableAutoCommit(getDisableAutoCommit()) .withOutputParallelization(getOutputParallelization()) .withStatementPreparator(checkStateNotNull(getStatementPreparator()))); rows.setRowSchema(schema); @@ -872,6 +893,9 @@ public abstract static class Read extends PTransform> @Pure abstract boolean getOutputParallelization(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -892,6 +916,8 @@ abstract Builder setDataSourceProviderFn( abstract Builder setOutputParallelization(boolean outputParallelization); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract Read build(); } @@ -958,6 +984,15 @@ public Read withOutputParallelization(boolean outputParallelization) { return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** + * Whether to disable auto commit on read. Defaults to true if not provided. The need for this + * config varies depending on the database platform. Informix requires this to be set to false + * while Postgres requires this to be set to true. + */ + public Read withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + @Override public PCollection expand(PBegin input) { ValueProvider query = checkArgumentNotNull(getQuery(), "withQuery() is required"); @@ -974,6 +1009,7 @@ public PCollection expand(PBegin input) { .withRowMapper(rowMapper) .withFetchSize(getFetchSize()) .withOutputParallelization(getOutputParallelization()) + .withDisableAutoCommit(getDisableAutoCommit()) .withParameterSetter( (element, preparedStatement) -> { if (getStatementPreparator() != null) { @@ -1029,6 +1065,8 @@ public abstract static class ReadAll abstract boolean getOutputParallelization(); + abstract boolean getDisableAutoCommit(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1049,6 +1087,8 @@ abstract Builder setParameterSetter( abstract Builder setOutputParallelization(boolean outputParallelization); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadAll build(); } @@ -1127,6 +1167,15 @@ public ReadAll withOutputParallelization(boolean outputPara return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** + * Whether to disable auto commit on read. Defaults to true if not provided. The need for this + * config varies depending on the database platform. Informix requires this to be set to false + * while Postgres requires this to be set to true. + */ + public ReadAll withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + private @Nullable Coder inferCoder( CoderRegistry registry, SchemaRegistry schemaRegistry) { if (getCoder() != null) { @@ -1173,7 +1222,8 @@ public PCollection expand(PCollection input) { checkStateNotNull(getQuery()), checkStateNotNull(getParameterSetter()), checkStateNotNull(getRowMapper()), - getFetchSize()))) + getFetchSize(), + getDisableAutoCommit()))) .setCoder(coder); if (getOutputParallelization()) { @@ -1254,6 +1304,9 @@ public abstract static class ReadWithPartitions @Pure abstract @Nullable JdbcReadWithPartitionsHelper getPartitionsHelper(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -1287,6 +1340,8 @@ abstract Builder setPartitionColumnType( abstract Builder setPartitionsHelper( JdbcReadWithPartitionsHelper partitionsHelper); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadWithPartitions build(); } @@ -1337,6 +1392,16 @@ public ReadWithPartitions withFetchSize(int fetchSize) { return toBuilder().setFetchSize(fetchSize).build(); } + /** + * Whether to disable auto commit on read. Defaults to true if not provided. The need for this + * config varies depending on the database platform. Informix requires this to be set to false + * while Postgres requires this to be set to true. + */ + public ReadWithPartitions withDisableAutoCommit( + boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + /** Data output type is {@link Row}, and schema is auto-inferred from the database. */ public ReadWithPartitions withRowOutput() { return toBuilder().setUseBeamSchema(true).build(); @@ -1419,7 +1484,8 @@ && getLowerBound() instanceof Comparable) { .withQuery(query) .withDataSourceProviderFn(dataSourceProviderFn) .withRowMapper(checkStateNotNull(partitionsHelper)) - .withFetchSize(getFetchSize())) + .withFetchSize(getFetchSize()) + .withDisableAutoCommit(getDisableAutoCommit())) .apply( MapElements.via( new SimpleFunction< @@ -1487,7 +1553,8 @@ public KV> apply( .withRowMapper(rowMapper) .withFetchSize(getFetchSize()) .withParameterSetter(checkStateNotNull(partitionsHelper)) - .withOutputParallelization(false); + .withOutputParallelization(false) + .withDisableAutoCommit(getDisableAutoCommit()); if (getUseBeamSchema()) { checkStateNotNull(schema); @@ -1537,6 +1604,7 @@ private static class ReadFn extends DoFn parameterSetter; private final RowMapper rowMapper; private final int fetchSize; + private final boolean disableAutoCommit; private @Nullable DataSource dataSource; private @Nullable Connection connection; @@ -1546,12 +1614,14 @@ private ReadFn( ValueProvider query, PreparedStatementSetter parameterSetter, RowMapper rowMapper, - int fetchSize) { + int fetchSize, + boolean disableAutoCommit) { this.dataSourceProviderFn = dataSourceProviderFn; this.query = query; this.parameterSetter = parameterSetter; this.rowMapper = rowMapper; this.fetchSize = fetchSize; + this.disableAutoCommit = disableAutoCommit; } @Setup @@ -1577,8 +1647,12 @@ public void processElement(ProcessContext context) throws Exception { Connection connection = getConnection(); // PostgreSQL requires autocommit to be disabled to enable cursor streaming // see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor - LOG.info("Autocommit has been disabled"); - connection.setAutoCommit(false); + // This option is configurable as Informix will error + // if calling setAutoCommit on a non-logged database + if (disableAutoCommit) { + LOG.info("Autocommit has been disabled"); + connection.setAutoCommit(false); + } try (PreparedStatement statement = connection.prepareStatement( query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 0139207235a0..435bfc138b5b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -117,6 +117,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { if (outputParallelization != null) { readRows = readRows.withOutputParallelization(outputParallelization); } + Boolean disableAutoCommit = config.getDisableAutoCommit(); + if (disableAutoCommit != null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows)); } } @@ -174,6 +178,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser @Nullable public abstract Boolean getOutputParallelization(); + @Nullable + public abstract Boolean getDisableAutoCommit(); + @Nullable public abstract String getDriverJars(); @@ -238,6 +245,8 @@ public abstract static class Builder { public abstract Builder setOutputParallelization(Boolean value); + public abstract Builder setDisableAutoCommit(Boolean value); + public abstract Builder setDriverJars(String value); public abstract JdbcReadSchemaTransformConfiguration build(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java index 4b5dc0d7e24a..30012465eb9e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java @@ -65,6 +65,7 @@ public Schema configurationSchema() { .addNullableField("readQuery", FieldType.STRING) .addNullableField("writeStatement", FieldType.STRING) .addNullableField("fetchSize", FieldType.INT16) + .addNullableField("disableAutoCommit", FieldType.BOOLEAN) .addNullableField("outputParallelization", FieldType.BOOLEAN) .addNullableField("autosharding", FieldType.BOOLEAN) // Partitioning support. If you specify a partition column we will use that instead of @@ -140,6 +141,11 @@ public PCollection expand(PBegin input) { readRows = readRows.withFetchSize(fetchSize); } + @Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit"); + if (disableAutoCommit != null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } + return input.apply(readRows); } else { @@ -163,6 +169,11 @@ public PCollection expand(PBegin input) { readRows = readRows.withOutputParallelization(outputParallelization); } + @Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit"); + if (disableAutoCommit != null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } + return input.apply(readRows); } } diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 3fef1f5fee35..d4ece0c7bc29 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -125,6 +125,7 @@ def default_io_expansion_service(classpath=None): ('read_query', typing.Optional[str]), ('write_statement', typing.Optional[str]), ('fetch_size', typing.Optional[np.int16]), + ('disable_autocommit', typing.Optional[bool]), ('output_parallelization', typing.Optional[bool]), ('autosharding', typing.Optional[bool]), ('partition_column', typing.Optional[str]), @@ -236,6 +237,7 @@ def __init__( write_statement=statement, read_query=None, fetch_size=None, + disable_autocommit=None, output_parallelization=None, autosharding=autosharding, max_connections=max_connections, @@ -286,6 +288,7 @@ def __init__( username, password, query=None, + disable_autocommit=None, output_parallelization=None, fetch_size=None, partition_column=None, @@ -305,6 +308,7 @@ def __init__( :param username: database username :param password: database password :param query: sql query to be executed + :param disable_autocommit: disable autocommit on read :param output_parallelization: is output parallelization on :param fetch_size: how many rows to fetch :param partition_column: enable partitioned reads by splitting on this @@ -350,6 +354,7 @@ def __init__( write_statement=None, read_query=query, fetch_size=fetch_size, + disable_autocommit=disable_autocommit, output_parallelization=output_parallelization, autosharding=None, max_connections=max_connections,