Skip to content

Commit

Permalink
[JdbcIO] Adding disableAutoCommit flag (#32988)
Browse files Browse the repository at this point in the history
* adding disableAutoCommit flag to ReadFn

---------

Co-authored-by: Chris Ashcraft <[email protected]>
  • Loading branch information
cwashcraft and Chris Ashcraft authored Nov 13, 2024
1 parent c03a5e0 commit 9394f85
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)).
* Added a flag to conditionally disable auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public static <T> Read<T> read() {
return new AutoValue_JdbcIO_Read.Builder<T>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -341,6 +342,7 @@ public static ReadRows readRows() {
return new AutoValue_JdbcIO_ReadRows.Builder()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setStatementPreparator(ignored -> {})
.build();
}
Expand All @@ -356,6 +358,7 @@ public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -372,6 +375,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionColumnType(partitioningColumnType)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -389,6 +393,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionsHelper(partitionsHelper)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -400,6 +405,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
private static final long DEFAULT_BATCH_SIZE = 1000L;
private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
Expand Down Expand Up @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -748,6 +757,8 @@ abstract Builder setDataSourceProviderFn(

abstract Builder setOutputParallelization(boolean outputParallelization);

abstract Builder setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadRows build();
}

Expand Down Expand Up @@ -799,6 +810,15 @@ public ReadRows withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Controls whether auto-commit is turned off on the JDBC connection before the read query runs.
 *
 * <p>The default is {@code true}. PostgreSQL needs auto-commit disabled to enable cursor
 * streaming, whereas Informix raises an error when auto-commit is changed on a non-logged
 * database, so pass {@code false} in that case.
 *
 * @param disableAutoCommit {@code true} to switch auto-commit off on the connection; {@code
 *     false} to leave the connection's auto-commit mode untouched
 * @return a transform built from this one with the flag applied
 */
public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
  Builder updated = toBuilder().setDisableAutoCommit(disableAutoCommit);
  return updated.build();
}

@Override
public PCollection<Row> expand(PBegin input) {
ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery() is required");
Expand All @@ -816,6 +836,7 @@ public PCollection<Row> expand(PBegin input) {
.withCoder(RowCoder.of(schema))
.withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit())
.withOutputParallelization(getOutputParallelization())
.withStatementPreparator(checkStateNotNull(getStatementPreparator())));
rows.setRowSchema(schema);
Expand Down Expand Up @@ -872,6 +893,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T> toBuilder();

Expand All @@ -892,6 +916,8 @@ abstract Builder<T> setDataSourceProviderFn(

abstract Builder<T> setOutputParallelization(boolean outputParallelization);

abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);

abstract Read<T> build();
}

Expand Down Expand Up @@ -958,6 +984,15 @@ public Read<T> withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Controls whether auto-commit is turned off on the JDBC connection before the read query runs.
 *
 * <p>The default is {@code true}. PostgreSQL needs auto-commit disabled to enable cursor
 * streaming, whereas Informix raises an error when auto-commit is changed on a non-logged
 * database, so pass {@code false} in that case.
 *
 * @param disableAutoCommit {@code true} to switch auto-commit off on the connection; {@code
 *     false} to leave the connection's auto-commit mode untouched
 * @return a transform built from this one with the flag applied
 */
public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
  Builder<T> updated = toBuilder().setDisableAutoCommit(disableAutoCommit);
  return updated.build();
}

@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<String> query = checkArgumentNotNull(getQuery(), "withQuery() is required");
Expand All @@ -974,6 +1009,7 @@ public PCollection<T> expand(PBegin input) {
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withOutputParallelization(getOutputParallelization())
.withDisableAutoCommit(getDisableAutoCommit())
.withParameterSetter(
(element, preparedStatement) -> {
if (getStatementPreparator() != null) {
Expand Down Expand Up @@ -1029,6 +1065,8 @@ public abstract static class ReadAll<ParameterT, OutputT>

abstract boolean getOutputParallelization();

abstract boolean getDisableAutoCommit();

abstract Builder<ParameterT, OutputT> toBuilder();

@AutoValue.Builder
Expand All @@ -1049,6 +1087,8 @@ abstract Builder<ParameterT, OutputT> setParameterSetter(

abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);

abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadAll<ParameterT, OutputT> build();
}

Expand Down Expand Up @@ -1127,6 +1167,15 @@ public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputPara
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Controls whether auto-commit is turned off on the JDBC connection before the read query runs.
 *
 * <p>The default is {@code true}. PostgreSQL needs auto-commit disabled to enable cursor
 * streaming, whereas Informix raises an error when auto-commit is changed on a non-logged
 * database, so pass {@code false} in that case.
 *
 * @param disableAutoCommit {@code true} to switch auto-commit off on the connection; {@code
 *     false} to leave the connection's auto-commit mode untouched
 * @return a transform built from this one with the flag applied
 */
public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean disableAutoCommit) {
  Builder<ParameterT, OutputT> updated = toBuilder().setDisableAutoCommit(disableAutoCommit);
  return updated.build();
}

private @Nullable Coder<OutputT> inferCoder(
CoderRegistry registry, SchemaRegistry schemaRegistry) {
if (getCoder() != null) {
Expand Down Expand Up @@ -1173,7 +1222,8 @@ public PCollection<OutputT> expand(PCollection<ParameterT> input) {
checkStateNotNull(getQuery()),
checkStateNotNull(getParameterSetter()),
checkStateNotNull(getRowMapper()),
getFetchSize())))
getFetchSize(),
getDisableAutoCommit())))
.setCoder(coder);

if (getOutputParallelization()) {
Expand Down Expand Up @@ -1254,6 +1304,9 @@ public abstract static class ReadWithPartitions<T, PartitionColumnT>
@Pure
abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T, PartitionColumnT> toBuilder();

Expand Down Expand Up @@ -1287,6 +1340,8 @@ abstract Builder<T, PartitionColumnT> setPartitionColumnType(
abstract Builder<T, PartitionColumnT> setPartitionsHelper(
JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);

abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadWithPartitions<T, PartitionColumnT> build();
}

Expand Down Expand Up @@ -1337,6 +1392,16 @@ public ReadWithPartitions<T, PartitionColumnT> withFetchSize(int fetchSize) {
return toBuilder().setFetchSize(fetchSize).build();
}

/**
 * Controls whether auto-commit is turned off on the JDBC connection before the read query runs.
 *
 * <p>The default is {@code true}. PostgreSQL needs auto-commit disabled to enable cursor
 * streaming, whereas Informix raises an error when auto-commit is changed on a non-logged
 * database, so pass {@code false} in that case.
 *
 * @param disableAutoCommit {@code true} to switch auto-commit off on the connection; {@code
 *     false} to leave the connection's auto-commit mode untouched
 * @return a transform built from this one with the flag applied
 */
public ReadWithPartitions<T, PartitionColumnT> withDisableAutoCommit(
    boolean disableAutoCommit) {
  Builder<T, PartitionColumnT> updated = toBuilder().setDisableAutoCommit(disableAutoCommit);
  return updated.build();
}

/** Data output type is {@link Row}, and schema is auto-inferred from the database. */
public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
return toBuilder().setUseBeamSchema(true).build();
Expand Down Expand Up @@ -1419,7 +1484,8 @@ && getLowerBound() instanceof Comparable<?>) {
.withQuery(query)
.withDataSourceProviderFn(dataSourceProviderFn)
.withRowMapper(checkStateNotNull(partitionsHelper))
.withFetchSize(getFetchSize()))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit()))
.apply(
MapElements.via(
new SimpleFunction<
Expand Down Expand Up @@ -1487,7 +1553,8 @@ public KV<Long, KV<PartitionColumnT, PartitionColumnT>> apply(
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withParameterSetter(checkStateNotNull(partitionsHelper))
.withOutputParallelization(false);
.withOutputParallelization(false)
.withDisableAutoCommit(getDisableAutoCommit());

if (getUseBeamSchema()) {
checkStateNotNull(schema);
Expand Down Expand Up @@ -1537,6 +1604,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final PreparedStatementSetter<ParameterT> parameterSetter;
private final RowMapper<OutputT> rowMapper;
private final int fetchSize;
private final boolean disableAutoCommit;

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
Expand All @@ -1546,12 +1614,14 @@ private ReadFn(
ValueProvider<String> query,
PreparedStatementSetter<ParameterT> parameterSetter,
RowMapper<OutputT> rowMapper,
int fetchSize) {
int fetchSize,
boolean disableAutoCommit) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.parameterSetter = parameterSetter;
this.rowMapper = rowMapper;
this.fetchSize = fetchSize;
this.disableAutoCommit = disableAutoCommit;
}

@Setup
Expand All @@ -1577,8 +1647,12 @@ public void processElement(ProcessContext context) throws Exception {
Connection connection = getConnection();
// PostgreSQL requires autocommit to be disabled to enable cursor streaming
// see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
// This option is configurable as Informix will error
// if calling setAutoCommit on a non-logged database
if (disableAutoCommit) {
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
}
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (outputParallelization != null) {
readRows = readRows.withOutputParallelization(outputParallelization);
}
Boolean disableAutoCommit = config.getDisableAutoCommit();
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}
return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
}
}
Expand Down Expand Up @@ -174,6 +178,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
@Nullable
public abstract Boolean getOutputParallelization();

@Nullable
public abstract Boolean getDisableAutoCommit();

@Nullable
public abstract String getDriverJars();

Expand Down Expand Up @@ -238,6 +245,8 @@ public abstract static class Builder {

public abstract Builder setOutputParallelization(Boolean value);

public abstract Builder setDisableAutoCommit(Boolean value);

public abstract Builder setDriverJars(String value);

public abstract JdbcReadSchemaTransformConfiguration build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Schema configurationSchema() {
.addNullableField("readQuery", FieldType.STRING)
.addNullableField("writeStatement", FieldType.STRING)
.addNullableField("fetchSize", FieldType.INT16)
.addNullableField("disableAutoCommit", FieldType.BOOLEAN)
.addNullableField("outputParallelization", FieldType.BOOLEAN)
.addNullableField("autosharding", FieldType.BOOLEAN)
// Partitioning support. If you specify a partition column we will use that instead of
Expand Down Expand Up @@ -140,6 +141,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withFetchSize(fetchSize);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
} else {

Expand All @@ -163,6 +169,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withOutputParallelization(outputParallelization);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
}
}
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def default_io_expansion_service(classpath=None):
('read_query', typing.Optional[str]),
('write_statement', typing.Optional[str]),
('fetch_size', typing.Optional[np.int16]),
('disable_autocommit', typing.Optional[bool]),
('output_parallelization', typing.Optional[bool]),
('autosharding', typing.Optional[bool]),
('partition_column', typing.Optional[str]),
Expand Down Expand Up @@ -236,6 +237,7 @@ def __init__(
write_statement=statement,
read_query=None,
fetch_size=None,
disable_autocommit=None,
output_parallelization=None,
autosharding=autosharding,
max_connections=max_connections,
Expand Down Expand Up @@ -286,6 +288,7 @@ def __init__(
username,
password,
query=None,
disable_autocommit=None,
output_parallelization=None,
fetch_size=None,
partition_column=None,
Expand All @@ -305,6 +308,7 @@ def __init__(
:param username: database username
:param password: database password
:param query: sql query to be executed
:param disable_autocommit: disable autocommit on read
:param output_parallelization: is output parallelization on
:param fetch_size: how many rows to fetch
:param partition_column: enable partitioned reads by splitting on this
Expand Down Expand Up @@ -350,6 +354,7 @@ def __init__(
write_statement=None,
read_query=query,
fetch_size=fetch_size,
disable_autocommit=disable_autocommit,
output_parallelization=output_parallelization,
autosharding=None,
max_connections=max_connections,
Expand Down

0 comments on commit 9394f85

Please sign in to comment.