From e03f044e10c3d7f99461921308c7c9c3aa334210 Mon Sep 17 00:00:00 2001 From: Solomon Emmanuel Thompson Date: Sun, 20 Oct 2024 13:14:09 +0530 Subject: [PATCH 1/2] draft commit for json edit values --- pkg/component/operator/json/v0/main.go | 171 +++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/pkg/component/operator/json/v0/main.go b/pkg/component/operator/json/v0/main.go index 6e0f6df21..9d33d1d56 100644 --- a/pkg/component/operator/json/v0/main.go +++ b/pkg/component/operator/json/v0/main.go @@ -15,12 +15,17 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/x/errmsg" + + "strconv" + "strings" + "reflect" ) const ( taskMarshal = "TASK_MARSHAL" taskUnmarshal = "TASK_UNMARSHAL" taskJQ = "TASK_JQ" + taskEditValues= "TASK_EDIT_VALUES" ) var ( @@ -67,6 +72,8 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, e.execute = e.unmarshal case taskJQ: e.execute = e.jq + case taskEditValues: + e.execute = e.updateJson default: return nil, errmsg.AddMessage( fmt.Errorf("not supported task: %s", x.Task), @@ -155,3 +162,167 @@ func (e *execution) jq(in *structpb.Struct) (*structpb.Struct, error) { func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { return base.SequentialExecutor(ctx, jobs, e.execute) } + +func (e *execution) updateJson(in *structpb.Struct) (*structpb.Struct, error) { + data := in.Fields["data"].AsInterface() + updates := in.Fields["updates"].GetListValue().AsSlice() + conflictResolution := in.Fields["conflictResolution"].GetStringValue() + supportDotNotation := in.Fields["supportDotNotation"].GetBoolValue() + + // Perform deep copy of the data before updates + updatedData := deepCopy(data) + + switch data := updatedData.(type) { + case []interface{}: + // Process each object in the array + for i, obj := range data { + if err := applyUpdatesToObject(obj, updates, supportDotNotation, conflictResolution); err != nil { + msg := fmt.Sprintf("Error in object %d: %v\n", i, err) + return nil, errmsg.AddMessage(err, msg) + } + } + case map[string]interface{}: + // Process the single object + if err := applyUpdatesToObject(data, updates, supportDotNotation, conflictResolution); err != nil { + msg := fmt.Sprintf("Error in single object: %v\n", err) + return nil, errmsg.AddMessage(err, msg) + } + default: + msg := fmt.Sprintf("Invalid data format") + return nil, errmsg.AddMessage(fmt.Errorf("Error "),msg) + } + output := map[string]interface{}{ + "data": updatedData, + } + outputStruct, err := structpb.NewStruct(output) + if err != nil { + msg := fmt.Sprintf("Failed to convert output to structpb.Struct:", err) + return nil, errmsg.AddMessage(err, msg) + } + return outputStruct,nil +} + +func applyUpdatesToObject(data interface{}, updates []interface{}, supportDotNotation bool, conflictResolution string) error { + for _, update := range updates { + updateMap := update.(map[string]interface{}) + field := updateMap["field"].(string) + newValue := updateMap["newValue"] + + err := applyUpdate(data, field, newValue, supportDotNotation, conflictResolution) + if err != nil { + // Handle the "error" conflictResolution case by stopping and returning the error + if conflictResolution == "error" { + return err + } + // Continue for other conflictResolution cases + } + } + return nil +} + +func applyUpdate(data interface{}, field string, newValue interface{}, supportDotNotation bool, conflictResolution string) error { + var fieldParts []string + if supportDotNotation { + fieldParts = strings.Split(field, ".") + } else { + fieldParts = []string{field} + } + + current := data + for i, part := range fieldParts { + if i == len(fieldParts)-1 { + // We're at the final part of the path, apply the update + + existingValue, fieldExisting := getFieldValue(current, part) + + // Check if the field exists and compare types + if fieldExisting { + if !(reflect.TypeOf(existingValue)==reflect.TypeOf(newValue)){ + return fmt.Errorf("type mismatch: existing field '%s' has type '%T' but got value of type '%T'", part, existingValue, newValue) + } + } + switch conflictResolution { + case "create": + // Create the field if it doesn't exist + setFieldValue(current, part, newValue) + case "skip": + // Skip if the field doesn't exist + if !fieldExists(current, part) { + return nil + } + setFieldValue(current, part, newValue) + case "error": + // Return an error if the field doesn't exist + if !fieldExists(current, part) { + return fmt.Errorf("Field '%s' does not exist", part) + } + setFieldValue(current, part, newValue) + } + } else { + // Traverse to the next part of the path + if next, ok := getFieldValue(current, part); ok { + current = next + } else { + // Field doesn't exist and we're not at the final part + if conflictResolution == "create" { + newMap := make(map[string]interface{}) + setFieldValue(current, part, newMap) + current = newMap + } else { + return fmt.Errorf("Field '%s' does not exist", part) + } + } + } + } + return nil +} + +func setFieldValue(data interface{}, field string, value interface{}) { + // Update the field in the map or array (handle different data structures) + switch data := data.(type) { + case map[string]interface{}: + data[field] = value + case []interface{}: + idx, _ := strconv.Atoi(field) + if idx >= 0 && idx < len(data) { + data[idx] = value + } + } +} + +func getFieldValue(data interface{}, field string) (interface{}, bool) { + // Retrieve the field value from the map or array + switch data := data.(type) { + case map[string]interface{}: + val, ok := data[field] + return val, ok + case []interface{}: + idx, err := strconv.Atoi(field) + if err != nil || idx < 0 || idx >= len(data) { + return nil, false + } + return data[idx], true + } + return nil, false +} + +func fieldExists(data interface{}, field string) bool { + // Check if the field exists in the map or array + switch data := data.(type) { + case map[string]interface{}: + _, ok := data[field] + return ok + case []interface{}: + idx, err := strconv.Atoi(field) + return err == nil && idx >= 0 && idx < len(data) + } + return false +} + +func deepCopy(data interface{}) interface{} { + // Deep copy the data structure to avoid modifying the original input + b, _ := json.Marshal(data) + var copiedData interface{} + json.Unmarshal(b, &copiedData) + return copiedData +} \ No newline at end of file From 2e181fe7c71bd39a8c20b138091a767869194c48 Mon Sep 17 00:00:00 2001 From: solomon Date: Sun, 20 Oct 2024 16:35:41 +0000 Subject: [PATCH 2/2] Json edit definition and tasks --- .../operator/json/v0/config/definition.json | 4 +- .../operator/json/v0/config/tasks.json | 42 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pkg/component/operator/json/v0/config/definition.json b/pkg/component/operator/json/v0/config/definition.json index 185edb259..69aa26c25 100644 --- a/pkg/component/operator/json/v0/config/definition.json +++ b/pkg/component/operator/json/v0/config/definition.json @@ -2,7 +2,8 @@ "availableTasks": [ "TASK_MARSHAL", "TASK_UNMARSHAL", - "TASK_JQ" + "TASK_JQ", + "TASK_EDIT_VALUES" ], "custom": false, "documentationUrl": "https://www.instill.tech/docs/component/operator/json", @@ -19,3 +20,4 @@ "description": "Manipulate and convert JSON entities", "releaseStage": "RELEASE_STAGE_ALPHA" } + diff --git a/pkg/component/operator/json/v0/config/tasks.json b/pkg/component/operator/json/v0/config/tasks.json index 721918136..b2c8ea83f 100644 --- a/pkg/component/operator/json/v0/config/tasks.json +++ b/pkg/component/operator/json/v0/config/tasks.json @@ -198,5 +198,47 @@ "title": "Output", "type": "object" } + }, + "TASK_EDIT_VALUES": { + "instillShortDescription": "Edit JSON values.", + "input": { + "data":{ + "type":"object", + "description":"Original data, which can be a JSON object or array of objects." + }, + "updates":{ + "type":"array", + "description": "An array of objects specifing the value to be updated.", + "items":{ + "type":"object", + "properties":{ + "field":{ + "type":"string", + "description":"The field in the original data whose value needs to be updated, supports nested paths if \"supportDotNotaion\" is true." + }, + "newValue":{ + "description": "The new value that will replace the current value at specific fields." + } + } + }, + "supportDotNotaion":{ + "type":"boolean", + "default":"true", + "description":"Determines whether to interpret the field as paths using dot notation. If false, field is treated as a literal key." + }, + "conflictResolution":{ + "type":"string", + "enum":["create","skip","error"], + "default": "skip", + "description": "Defines hot to handle cases where the field does not exist in the data." + } + } + }, + "output": { + "data":{ + "type": "object", + "description": "The modified data with the specified data updated." + } + } } }