diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index b12d2c869f46..21d44bab3c95 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -345,6 +345,14 @@ public class PutDatabaseRecord extends AbstractProcessor { .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor USE_DATABASE_TABLE_COLUMN_DATATYPE = new Builder() + .name("put-db-record-use-database-table-column-datatype") + .displayName("Use Database Table Column Data Types") + .description("Enabling this option will cause all column values to be formatted using database table column data types.") + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor DB_TYPE; protected static final Map dbAdapters; @@ -399,6 +407,7 @@ public class PutDatabaseRecord extends AbstractProcessor { pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); pds.add(TABLE_SCHEMA_CACHE_SIZE); pds.add(MAX_BATCH_SIZE); + pds.add(USE_DATABASE_TABLE_COLUMN_DATATYPE); propDescriptors = Collections.unmodifiableList(pds); } @@ -604,6 +613,7 @@ private void executeDML(final ProcessContext context, final ProcessSession sessi final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue(); final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final boolean useDatabaseTableColumnDatatype = context.getProperty(USE_DATABASE_TABLE_COLUMN_DATATYPE).asBoolean(); // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it if (StringUtils.isEmpty(tableName)) { @@ -740,7 +750,7 @@ private void executeDML(final ProcessContext context, final ProcessSession sessi } // Convert (if necessary) from field data type to column data type - if (fieldSqlType != sqlType) { + if ((fieldSqlType != sqlType) && useDatabaseTableColumnDatatype) { try { DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType); // If sqlType is unsupported, fall back to the fieldSqlType instead