Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-11858 Configurable Column Name Normalization in PutDatabaseRecord and UpdateDatabaseTable #9382

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableNotFoundException;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;

Expand Down Expand Up @@ -177,6 +181,26 @@ public class UpdateDatabaseTable extends AbstractProcessor {
.defaultValue("true")
.build();

public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("Column Name Translation Strategy")
.description("The strategy used to normalize table column name")
.allowableValues(TranslationStrategy.class)
.defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.build();

public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder()
.name("Column Name Translation Pattern")
.displayName("Column Name Translation Pattern")
.description("Column name will be normalized with this regular expression")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue())
.dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue())
.build();

static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("updatedatabasetable-update-field-names")
.displayName("Update Field Names")
Expand Down Expand Up @@ -282,6 +306,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
CREATE_TABLE,
PRIMARY_KEY_FIELDS,
TRANSLATE_FIELD_NAMES,
TRANSLATION_STRATEGY,
TRANSLATION_PATTERN,
UPDATE_FIELD_NAMES,
RECORD_WRITER_FACTORY,
QUOTE_TABLE_IDENTIFIER,
Expand Down Expand Up @@ -371,6 +397,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final TranslationStrategy translationStrategy = TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
final String translationRegex = context.getProperty(TRANSLATION_PATTERN).getValue();
final Pattern translationPattern = translationRegex == null ? null : Pattern.compile(translationRegex);
NameNormalizer normalizer = null;
if (translateFieldNames) {
normalizer = NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern);
}
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
}
Expand All @@ -393,7 +426,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
primaryKeyColumnNames = null;
}
final OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema,
catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames);
catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, normalizer,
updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
try {
Expand Down Expand Up @@ -457,15 +491,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema schema,
final String catalogName, final String schemaName, final String tableName,
final boolean createIfNotExists, final boolean translateFieldNames, final boolean updateFieldNames,
final Set<String> primaryKeyColumnNames, final boolean quoteTableName, final boolean quoteColumnNames) throws IOException {
final boolean createIfNotExists, final boolean translateFieldNames, final NameNormalizer normalizer,
final boolean updateFieldNames, final Set<String> primaryKeyColumnNames, final boolean quoteTableName,
final boolean quoteColumnNames) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (final Statement s = conn.createStatement()) {
// Determine whether the table exists
TableSchema tableSchema = null;
try {
tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, null, getLogger());
tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, normalizer, null, getLogger());
} catch (TableNotFoundException tnfe) {
// Do nothing, the value will be populated if necessary
}
Expand All @@ -483,7 +518,7 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connec
getLogger().debug("Adding column {} to table {}", recordFieldName, tableName);
}

tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());
tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());

final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames);

Expand All @@ -502,7 +537,7 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connec

final List<String> dbColumns = new ArrayList<>();
for (final ColumnDescription columnDescription : tableSchema.getColumnsAsList()) {
dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(), translateFieldNames));
dbColumns.add(TableSchema.normalizedName(columnDescription.getColumnName(), translateFieldNames, normalizer));
}

final List<ColumnDescription> columnsToAdd = new ArrayList<>();
Expand All @@ -511,7 +546,7 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connec
// Handle new columns
for (RecordField recordField : schema.getFields()) {
final String recordFieldName = recordField.getFieldName();
final String normalizedFieldName = ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames);
final String normalizedFieldName = TableSchema.normalizedName(recordFieldName, translateFieldNames, normalizer);
if (!dbColumns.contains(normalizedFieldName)) {
// The field does not exist in the table, add it
ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard.db;

public interface NameNormalizer {

String getNormalizedName(String colName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db;

import org.apache.nifi.processors.standard.db.impl.PatternNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveAllSpecialCharNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveSpaceNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreAndSpaceNormalizer;
import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreNormalizer;

import java.util.regex.Pattern;

public class NameNormalizerFactory {
public static NameNormalizer getNormalizer(TranslationStrategy strategy, Pattern regex) {

return switch (strategy) {
case REMOVE_UNDERSCORE -> new RemoveUnderscoreNormalizer();
case REMOVE_SPACE -> new RemoveSpaceNormalizer();
case REMOVE_UNDERSCORE_AND_SPACE -> new RemoveUnderscoreAndSpaceNormalizer();
case REMOVE_ALL_SPECIAL_CHAR -> new RemoveAllSpecialCharNormalizer();
case PATTERN -> new PatternNormalizer(regex);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;


public class TableSchema {
private final List<String> requiredColumnNames;
private final Set<String> primaryKeyColumnNames;
Expand All @@ -38,18 +39,20 @@ public class TableSchema {
private final String schemaName;
private final String tableName;

public TableSchema(final String catalogName, final String schemaName, final String tableName, final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
public TableSchema(final String catalogName, final String schemaName, final String tableName,
final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final NameNormalizer normalizer,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;

this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(), translateColumnNames), desc);
final String colName = normalizedName(desc.getColumnName(), translateColumnNames, normalizer);
columns.put(colName, desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.getColumnName());
}
Expand Down Expand Up @@ -89,7 +92,8 @@ public String getQuotedIdentifierString() {
}

public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException {
final boolean translateColumnNames, final NameNormalizer normalizer,
final String updateKeys, ComponentLog log) throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();

try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
Expand Down Expand Up @@ -136,12 +140,21 @@ public static TableSchema from(final Connection conn, final String catalog, fina
} else {
// Parse the Update Keys field and normalize the column names
for (final String updateKey : updateKeys.split(",")) {
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(), translateColumnNames));
final String colName = normalizedName(updateKey, translateColumnNames, normalizer);
primaryKeyColumns.add(colName);

}
}

return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, normalizer, primaryKeyColumns, dmd.getIdentifierQuoteString());
}
}

public static String normalizedName(final String name, final boolean translateNames, final NameNormalizer normalizer) {
if (translateNames && normalizer != null) {
return normalizer.getNormalizedName(name).trim().toUpperCase();
}
return name;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db;

import org.apache.nifi.components.DescribedValue;

/**
* Enumeration of supported Database column name Translation Strategies
*/
public enum TranslationStrategy implements DescribedValue {
REMOVE_UNDERSCORE("Remove Underscore",
"Underscores '_' will be removed from column names Ex: 'Pics_1_23' becomes 'PICS123'"),
REMOVE_SPACE("Remove Space",
"Spaces will be removed from column names Ex. 'User Name' becomes 'USERNAME'"),
REMOVE_UNDERSCORE_AND_SPACE("Remove Underscores and Spaces",
"Spaces and Underscores will be removed from column names Ex. 'User_1 Name' becomes 'USER1NAME'"),
REMOVE_ALL_SPECIAL_CHAR("Remove Regular Expression Characters",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include examples for the two Regex choices? They are the most complex so the more and clearer the documentation the better. After that I'm good to merge!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @mattyb149
I have added example . please have a look

"Remove Regular Expression Characters"),
PATTERN("Regular Expression",
"Remove characters matching this Regular Expression from the column names");
private final String displayName;
private final String description;

TranslationStrategy(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}

@Override
public String getValue() {
return name();
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard.db.impl;

import org.apache.nifi.processors.standard.db.NameNormalizer;

import java.util.regex.Pattern;

public class PatternNormalizer implements NameNormalizer {
private final Pattern pattern;

public PatternNormalizer(Pattern pattern) {
this.pattern = pattern;
}

@Override
public String getNormalizedName(String colName) {

if (pattern == null) {
return colName;
} else {
return pattern.matcher(colName).replaceAll("");
}
}

}
Loading