diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java new file mode 100644 index 000000000000..ca6ea81973a5 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType; + +public class RecordOf extends RecordPathSegment { + private final RecordPathSegment[] valuePaths; + + public RecordOf(final RecordPathSegment[] valuePaths, final boolean absolute) { + super("recordOf", null, absolute); + this.valuePaths = valuePaths; + } + + @Override + public Stream evaluate(final RecordPathEvaluationContext context) { + final List fields = new ArrayList<>(); + final Map values = new HashMap<>(); + + for (int i = 0; i + 1 < valuePaths.length; i += 2) { + final String fieldName = valuePaths[i].evaluate(context).findFirst().get().toString(); + final FieldValue fieldValueProvider = valuePaths[i + 1].evaluate(context).findFirst().get(); + + final Object fieldValue = fieldValueProvider.getValue(); + + final RecordField referencedField = fieldValueProvider.getField(); + final DataType fieldDataType = referencedField != null + ? referencedField.getDataType() : inferDataType(fieldValue, RecordFieldType.STRING.getDataType()); + + fields.add(new RecordField(fieldName, fieldDataType)); + values.put(fieldName, fieldValue); + } + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record record = new MapRecord(schema, values); + final RecordField recordField = new RecordField("recordOf", RecordFieldType.RECORD.getRecordDataType(schema)); + + final FieldValue responseValue = new StandardFieldValue(record, recordField, null); + return Stream.of(responseValue); + } +} \ No newline at end of file diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 5e484b3016bb..9e9c36b7c35e 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -48,6 +48,7 @@ import org.apache.nifi.record.path.functions.MapOf; import org.apache.nifi.record.path.functions.PadLeft; import org.apache.nifi.record.path.functions.PadRight; +import org.apache.nifi.record.path.functions.RecordOf; import org.apache.nifi.record.path.functions.Replace; import org.apache.nifi.record.path.functions.ReplaceNull; import org.apache.nifi.record.path.functions.ReplaceRegex; @@ -289,6 +290,20 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme return new MapOf(argPaths, absolute); } + case "recordOf": { + final int numArgs = argumentListTree.getChildCount(); + + if (numArgs % 2 != 0) { + throw new RecordPathException("The recordOf function requires an even number of arguments"); + } + + final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs]; + for (int i = 0; i < numArgs; i++) { + argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute); + } + + return new RecordOf(argPaths, absolute); + } case "toLowerCase": { final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); return new ToLowerCase(args[0], absolute); diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 3aa8ea1bef26..5fcf2dbeb6f1 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -26,6 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.uuid5.Uuid5Util; import org.junit.jupiter.api.AfterAll; @@ -56,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; - import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1264,6 +1265,69 @@ public void testMapOf() { assertThrows(RecordPathException.class, () -> RecordPath.compile("mapOf('firstName', /firstName, 'lastName')").evaluate(record)); } + @Test + public void testRecordOf() { + final Map embeddedMap = new HashMap<>(); + embeddedMap.put("aKey", "aValue"); + embeddedMap.put("anotherKey", "anotherValue"); + + final List embeddedRecordFields = new ArrayList<>(); + embeddedRecordFields.add(new RecordField("aField", RecordFieldType.INT.getDataType())); + embeddedRecordFields.add(new RecordField("anotherField", RecordFieldType.STRING.getDataType())); + final RecordSchema embeddedRecordSchema = new SimpleRecordSchema(embeddedRecordFields); + final Map embeddedRecordValues = new HashMap<>(); + embeddedRecordValues.put("aField", 2); + embeddedRecordValues.put("anotherField", "aRecordValue"); + final Record embeddedRecord = new MapRecord(embeddedRecordSchema, embeddedRecordValues); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("aLong", RecordFieldType.LONG.getDataType())); + fields.add(new RecordField("aDouble", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("aString", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("anArray", + RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + fields.add(new RecordField("aMap", + RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))); + fields.add(new RecordField("aRecord", RecordFieldType.RECORD.getRecordDataType(embeddedRecordSchema))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("aLong", 9876543210L); + values.put("aDouble", 2.5d); + values.put("aString", "texty"); + values.put("anArray", new String[]{"a", "b", "c"}); + values.put("aMap", embeddedMap); + values.put("aRecord", embeddedRecord); + final Record record = new MapRecord(schema, values); + + final FieldValue result = RecordPath + .compile("recordOf('mappedLong', /aLong, 'mappedDouble', /aDouble, 'mappedString', /aString, 'mappedArray', /anArray, 'mappedMap', /aMap, 'mappedRecord', /aRecord)") + .evaluate(record).getSelectedFields().findFirst().get(); + + final DataType resultDataType = result.getField().getDataType(); + assertInstanceOf(RecordDataType.class, resultDataType); + final RecordDataType recordDataType = (RecordDataType) resultDataType; + RecordSchema resultSchema = recordDataType.getChildSchema(); + assertEquals(Arrays.asList("mappedLong", "mappedDouble", "mappedString", "mappedArray", "mappedMap", "mappedRecord"), resultSchema.getFieldNames()); + assertEquals(RecordFieldType.LONG.getDataType(), resultSchema.getDataType("mappedLong").get()); + assertEquals(RecordFieldType.DOUBLE.getDataType(), resultSchema.getDataType("mappedDouble").get()); + assertEquals(RecordFieldType.STRING.getDataType(), resultSchema.getDataType("mappedString").get()); + assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), resultSchema.getDataType("mappedArray").get()); + assertEquals(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), resultSchema.getDataType("mappedMap").get()); + assertEquals(RecordFieldType.RECORD.getRecordDataType(embeddedRecordSchema), resultSchema.getDataType("mappedRecord").get()); + + Object resultValue = result.getValue(); + assertInstanceOf(MapRecord.class, resultValue); + MapRecord resultValueRecord = (MapRecord) resultValue; + assertEquals(9876543210L, resultValueRecord.getAsLong("mappedLong")); + assertEquals(2.5d, resultValueRecord.getAsDouble("mappedDouble")); + assertEquals("texty", resultValueRecord.getAsString("mappedString")); + assertArrayEquals(new String[]{"a", "b", "c"}, resultValueRecord.getAsArray("mappedArray")); + assertEquals(embeddedMap, resultValueRecord.getValue("mappedMap")); + assertEquals(embeddedRecord, resultValueRecord.getAsRecord("mappedRecord", embeddedRecordSchema)); + + assertThrows(RecordPathException.class, () -> RecordPath.compile("recordOf('firstName', /firstName, 'lastName')").evaluate(record)); + } @Test public void testCoalesce() { diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index 864dbcbfb80a..ae119b3d71d9 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -1151,6 +1151,46 @@ And that would give us something like: This function requires an even number of arguments and the record paths must represent simple field values. +=== recordOf + +Creates a nested record with the given parameters. For example, if we have the following record: + +---- +{ + "firstName": "Alice", + "lastName": "Koopa", + "age": 30, + "hobbies": ["reading", "hiking", "coding"], + "address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA" + } +} +---- + +We could use the `UpdateRecord` processor with + +---- +/profile => recordOf("name", /firstName, "location", /address/city, "hobbies", /hobbies, /age, "years old") +---- + +And that would give us something like: + +---- +{ + "name": "Alice", + "hobbies": ["reading", "hiking", "coding"], + "location": "Anytown", + "30": "years old" +} +---- + +This function requires an even number of arguments. +Each pair of arguments resembles a field in the new record. +Every odd argument, the first one of each pair, is used as field name and coerced into a String value. +Every even argument, the second one of each pair, is used as field value. + [[filter_functions]] == Filter Functions