Skip to content

Commit

Permalink
NIFI-13468 Add standalone RecordPath function recordOf
Browse files Browse the repository at this point in the history
  • Loading branch information
EndzeitBegins committed Jun 29, 2024
1 parent 903090b commit 1a197e9
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType;

/**
 * Standalone RecordPath function {@code recordOf} that assembles a new Record
 * from alternating (field name, field value) argument pairs.
 */
public class RecordOf extends RecordPathSegment {

    // Alternating name/value argument paths: even indices are field names, odd indices are field values.
    private final RecordPathSegment[] valuePaths;

    public RecordOf(final RecordPathSegment[] valuePaths, final boolean absolute) {
        super("recordOf", null, absolute);
        this.valuePaths = valuePaths;
    }

    /**
     * Evaluates every name/value pair against the context and wraps the results
     * in a single new Record whose schema is derived from the referenced fields.
     *
     * @param context the evaluation context providing the source record
     * @return a single-element stream holding the assembled Record
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        final int pairCount = valuePaths.length / 2;
        final List<RecordField> recordFields = new ArrayList<>(pairCount);
        final Map<String, Object> recordValues = new HashMap<>();

        for (int pairIndex = 0; pairIndex < pairCount; pairIndex++) {
            final RecordPathSegment namePath = valuePaths[pairIndex * 2];
            final RecordPathSegment valuePath = valuePaths[pairIndex * 2 + 1];

            // Field names are coerced to String via FieldValue#toString.
            final String fieldName = namePath.evaluate(context).findFirst().orElseThrow().toString();
            final FieldValue valueHolder = valuePath.evaluate(context).findFirst().orElseThrow();
            final Object fieldValue = valueHolder.getValue();

            // Prefer the declared type of the referenced field; fall back to inferring
            // a type from the concrete value, defaulting to STRING.
            final RecordField sourceField = valueHolder.getField();
            final DataType fieldDataType = (sourceField == null)
                    ? inferDataType(fieldValue, RecordFieldType.STRING.getDataType())
                    : sourceField.getDataType();

            recordFields.add(new RecordField(fieldName, fieldDataType));
            recordValues.put(fieldName, fieldValue);
        }

        final RecordSchema assembledSchema = new SimpleRecordSchema(recordFields);
        final Record assembledRecord = new MapRecord(assembledSchema, recordValues);
        final RecordField wrapperField =
                new RecordField("recordOf", RecordFieldType.RECORD.getRecordDataType(assembledSchema));

        return Stream.of(new StandardFieldValue(assembledRecord, wrapperField, null));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.nifi.record.path.functions.MapOf;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
import org.apache.nifi.record.path.functions.RecordOf;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
Expand Down Expand Up @@ -295,6 +296,20 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme

return new MapOf(argPaths, absolute);
}
// recordOf(name1, value1, name2, value2, ...): assembles a new record from
// alternating field-name / field-value argument pairs.
case "recordOf": {
final int numArgs = argumentListTree.getChildCount();

// Arguments must come in pairs; reject a dangling field name without a value.
if (numArgs % 2 != 0) {
throw new RecordPathException("The recordOf function requires an even number of arguments");
}

// Compile each argument into its own path segment, preserving argument order
// so RecordOf can pair them up again at evaluation time.
final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
for (int i = 0; i < numArgs; i++) {
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
}

return new RecordOf(argPaths, absolute);
}
case "toLowerCase": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new ToLowerCase(args[0], absolute);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -1412,6 +1413,66 @@ public void testMapOf() {
assertThrows(RecordPathException.class, () -> RecordPath.compile("mapOf('firstName', /firstName, 'lastName')").evaluate(record));
}

@Test
public void testRecordOf() {
// Input fixtures covering the interesting value categories: map, nested record,
// scalars, and an array — so every data-type propagation path is exercised.
final Map<String, String> embeddedMap = Map.of(
"aKey", "aValue",
"anotherKey", "anotherValue"
);

final RecordSchema embeddedRecordSchema = new SimpleRecordSchema(List.of(
recordFieldOf("aField", RecordFieldType.INT),
recordFieldOf("anotherField", RecordFieldType.STRING)
));
final Record embeddedRecord = new MapRecord(embeddedRecordSchema, Map.of(
"aField", 2,
"anotherField", "aRecordValue"
));

final RecordSchema schema = new SimpleRecordSchema(List.of(
recordFieldOf("aLong", RecordFieldType.LONG),
recordFieldOf("aDouble", RecordFieldType.DOUBLE),
recordFieldOf("aString", RecordFieldType.STRING),
recordFieldOf("anArray", arrayDataTypeFor(RecordFieldType.STRING)),
recordFieldOf("aMap", mapDataTypeFor(RecordFieldType.STRING)),
recordFieldOf("aRecord", recordDataTypeFor(embeddedRecordSchema))
));
final MapRecord record = new MapRecord(schema, Map.of(
"aLong", 9876543210L,
"aDouble", 2.5d,
"aString", "texty",
"anArray", new String[]{"a", "b", "c"},
"aMap", embeddedMap,
"aRecord", embeddedRecord
));

// The last pair (/aString, 2012) uses a record path as the NAME and a literal
// as the VALUE, verifying name coercion to String and literal value handling.
final FieldValue result = evaluateSingleFieldValue(
"recordOf('mappedLong', /aLong, 'mappedDouble', /aDouble, 'mappedString', /aString, 'mappedArray', /anArray, 'mappedMap', /aMap, 'mappedRecord', /aRecord, /aString, 2012)",
record
);

// Schema assertions: field names keep argument order and each mapped field
// inherits the data type of the field it was sourced from.
final RecordDataType recordDataType = (RecordDataType) result.getField().getDataType();
RecordSchema resultSchema = recordDataType.getChildSchema();
assertEquals(Arrays.asList("mappedLong", "mappedDouble", "mappedString", "mappedArray", "mappedMap", "mappedRecord", "texty"), resultSchema.getFieldNames());
assertDataTypesEqual(resultSchema, "mappedLong", schema, "aLong");
assertDataTypesEqual(resultSchema, "mappedDouble", schema, "aDouble");
assertDataTypesEqual(resultSchema, "mappedString", schema, "aString");
assertDataTypesEqual(resultSchema, "mappedArray", schema, "anArray");
assertDataTypesEqual(resultSchema, "mappedMap", schema, "aMap");
assertDataTypesEqual(resultSchema, "mappedRecord", schema, "aRecord");
// The literal 2012 has no source field, so its type is inferred (INT).
assertEquals(RecordFieldType.INT.getDataType(), resultSchema.getDataType("texty").get());

// Value assertions: every mapped field carries the source field's value unchanged.
final MapRecord resultValueRecord = (MapRecord) result.getValue();
assertValuesEqual(resultValueRecord, "mappedLong", record, "aLong");
assertValuesEqual(resultValueRecord, "mappedDouble", record, "aDouble");
assertValuesEqual(resultValueRecord, "mappedString", record, "aString");
assertArrayValuesEqual(resultValueRecord, "mappedArray", record, "anArray");
assertValuesEqual(resultValueRecord, "mappedMap", record, "aMap");
assertValuesEqual(resultValueRecord, "mappedRecord", record, "aRecord");
assertEquals(2012, resultValueRecord.getValue("texty"));

// An odd number of arguments (dangling field name) must be rejected at compile time.
assertThrows(RecordPathException.class, () -> RecordPath.compile("recordOf('mappedRecord', /aRecord, 'dangling')").evaluate(record));
}

@Test
public void testCoalesce() {
Expand Down Expand Up @@ -2357,4 +2418,35 @@ private static FieldValue evaluateSingleFieldValue(RecordPath recordPath, Record
// Convenience overload: compiles the textual path, then delegates to the
// RecordPath-based variant.
private static FieldValue evaluateSingleFieldValue(String path, Record record) {
    final RecordPath compiledPath = RecordPath.compile(path);
    return evaluateSingleFieldValue(compiledPath, record);
}

// Builds a RecordField using the field type's default DataType.
private static RecordField recordFieldOf(final String fieldName, final RecordFieldType fieldType) {
    return recordFieldOf(fieldName, fieldType.getDataType());
}

// Builds a RecordField with an explicit DataType.
private static RecordField recordFieldOf(final String fieldName, final DataType fieldType) {
    return new RecordField(fieldName, fieldType);
}

// ARRAY data type whose elements use the given item type's default DataType.
private static DataType arrayDataTypeFor(final RecordFieldType itemType) {
    final DataType elementType = itemType.getDataType();
    return RecordFieldType.ARRAY.getArrayDataType(elementType);
}

// MAP data type whose values use the given item type's default DataType.
private static DataType mapDataTypeFor(final RecordFieldType itemType) {
    final DataType valueType = itemType.getDataType();
    return RecordFieldType.MAP.getMapDataType(valueType);
}

// RECORD data type wrapping the given child schema.
// Fixed typo in parameter name: "childSchmea" -> "childSchema".
private static DataType recordDataTypeFor(final RecordSchema childSchema) {
    return RecordFieldType.RECORD.getRecordDataType(childSchema);
}

// Asserts that a field in the actual schema has the same DataType as a
// (possibly differently named) field in the expected schema.
// Both fields are expected to exist; Optional.get() fails loudly otherwise.
private static void assertDataTypesEqual(RecordSchema actualSchema, String actualFieldName, RecordSchema expectedSchema, String expectedFieldName) {
    final DataType expectedType = expectedSchema.getDataType(expectedFieldName).get();
    final DataType actualType = actualSchema.getDataType(actualFieldName).get();
    assertEquals(expectedType, actualType);
}

// Asserts that a field value in the actual record equals the value of a
// (possibly differently named) field in the expected record.
private static void assertValuesEqual(MapRecord actualRecord, String actualFieldName, MapRecord expectedRecord, String expectedFieldName) {
    final Object expectedValue = expectedRecord.getValue(expectedFieldName);
    final Object actualValue = actualRecord.getValue(actualFieldName);
    assertEquals(expectedValue, actualValue);
}

// Array variant of assertValuesEqual: compares element-wise via assertArrayEquals.
private static void assertArrayValuesEqual(MapRecord actualRecord, String actualFieldName, MapRecord expectedRecord, String expectedFieldName) {
    final Object[] expectedArray = expectedRecord.getAsArray(expectedFieldName);
    final Object[] actualArray = actualRecord.getAsArray(actualFieldName);
    assertArrayEquals(expectedArray, actualArray);
}
}
40 changes: 40 additions & 0 deletions nifi-docs/src/main/asciidoc/record-path-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,46 @@ And that would give us something like:

This function requires an even number of arguments and the record paths must represent simple field values.

=== recordOf

Creates a new record from the given pairs of field-name and field-value arguments. For example, if we have the following record:

----
{
"firstName": "Alice",
"lastName": "Koopa",
"age": 30,
"hobbies": ["reading", "hiking", "coding"],
"address": {
"street": "123 Main St",
"city": "Anytown",
"state": "CA"
}
}
----

We could use the `UpdateRecord` processor with

----
/profile => recordOf("name", /firstName, "location", /address/city, "hobbies", /hobbies, /age, "years old")
----

And that would give us something like:

----
{
"name": "Alice",
"location": "Anytown",
"hobbies": ["reading", "hiking", "coding"],
"30": "years old"
}
----

This function requires an even number of arguments.
Each pair of arguments defines one field in the new record.
Every odd argument, the first of each pair, is used as the field name and is coerced into a String value.
Every even argument, the second of each pair, is used as the field value.

[[filter_functions]]
== Filter Functions

Expand Down

0 comments on commit 1a197e9

Please sign in to comment.