Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

making the database session autocommit value a configurable property #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -74,6 +77,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import static java.lang.String.format;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;

@SupportsBatching
Expand Down Expand Up @@ -134,6 +138,14 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
.name("database-session-autocommit")
.displayName("Database session autocommit value")
.description("The autocommit mode to set on the database connection being used.")
.allowableValues("true", "false")
.defaultValue("false")
.build();

static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
Expand Down Expand Up @@ -189,13 +201,40 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(CONNECTION_POOL);
properties.add(SQL_STATEMENT);
properties.add(SUPPORT_TRANSACTIONS);
properties.add(AUTO_COMMIT);
properties.add(TRANSACTION_TIMEOUT);
properties.add(BATCH_SIZE);
properties.add(OBTAIN_GENERATED_KEYS);
properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
return properties;
}

@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String support_transactions = context.getProperty(SUPPORT_TRANSACTIONS).getValue();
final String rollback_on_failure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue();
final String auto_commit = context.getProperty(AUTO_COMMIT).getValue();

if(auto_commit.equalsIgnoreCase("true")) {
if(support_transactions.equalsIgnoreCase("true")) {
results.add(new ValidationResult.Builder()
.subject(SUPPORT_TRANSACTIONS.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'."
+ "Transactions for batch updates cannot be supported when auto commit is set to 'true'", SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
if(rollback_on_failure.equalsIgnoreCase("true")) {
results.add(new ValidationResult.Builder()
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'."
+ "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'", RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
}
return results;
}

@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
Expand Down Expand Up @@ -239,7 +278,10 @@ private boolean isSupportBatching() {
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
try {
fc.originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if(fc.originalAutoCommit != autocommit) {
connection.setAutoCommit(autocommit);
}
} catch (SQLException e) {
throw new ProcessException("Failed to disable auto commit due to " + e, e);
}
Expand Down Expand Up @@ -521,9 +563,10 @@ public void constructProcess() {

process.cleanup((c, s, fc, conn) -> {
// make sure that we try to set the auto commit back to whatever it was.
if (fc.originalAutoCommit) {
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if (fc.originalAutoCommit != autocommit) {
try {
conn.setAutoCommit(true);
conn.setAutoCommit(fc.originalAutoCommit);
} catch (final SQLException se) {
getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
}
Expand Down Expand Up @@ -670,7 +713,7 @@ boolean isFragmentedTransactionReady(final List<FlowFile> flowFiles, final Long
int selectedNumFragments = 0;
final BitSet bitSet = new BitSet();

BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects));
BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(format(s, objects));

for (final FlowFile flowFile : flowFiles) {
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
Expand Down