-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Draft PR Support Edit Values for JSON operator #1132 #761
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,12 +15,17 @@ import ( | |||||
|
||||||
"github.com/instill-ai/pipeline-backend/pkg/component/base" | ||||||
"github.com/instill-ai/x/errmsg" | ||||||
|
||||||
"strconv" | ||||||
"strings" | ||||||
"reflect" | ||||||
) | ||||||
|
||||||
const ( | ||||||
taskMarshal = "TASK_MARSHAL" | ||||||
taskUnmarshal = "TASK_UNMARSHAL" | ||||||
taskJQ = "TASK_JQ" | ||||||
taskEditValues= "TASK_EDIT_VALUES" | ||||||
) | ||||||
|
||||||
var ( | ||||||
|
@@ -67,6 +72,8 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, | |||||
e.execute = e.unmarshal | ||||||
case taskJQ: | ||||||
e.execute = e.jq | ||||||
case taskEditValues: | ||||||
e.execute = e.updateJson | ||||||
default: | ||||||
return nil, errmsg.AddMessage( | ||||||
fmt.Errorf("not supported task: %s", x.Task), | ||||||
|
@@ -155,3 +162,167 @@ func (e *execution) jq(in *structpb.Struct) (*structpb.Struct, error) { | |||||
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { | ||||||
return base.SequentialExecutor(ctx, jobs, e.execute) | ||||||
} | ||||||
|
||||||
func (e *execution) updateJson(in *structpb.Struct) (*structpb.Struct, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There is another function called |
||||||
data := in.Fields["data"].AsInterface() | ||||||
updates := in.Fields["updates"].GetListValue().AsSlice() | ||||||
conflictResolution := in.Fields["conflictResolution"].GetStringValue() | ||||||
supportDotNotation := in.Fields["supportDotNotation"].GetBoolValue() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In other tasks in JSON, there are only 1 or 2 fields, so they don't use Struct to convert data from schema. How about we make it as struct? It can convert data easily.
|
||||||
|
||||||
// Perform deep copy of the data before updates | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think good comments are for
The code explains itself well, so I'd remove the comments here. |
||||||
updatedData := deepCopy(data) | ||||||
|
||||||
switch data := updatedData.(type) { | ||||||
case []interface{}: | ||||||
// Process each object in the array | ||||||
for i, obj := range data { | ||||||
if err := applyUpdatesToObject(obj, updates, supportDotNotation, conflictResolution); err != nil { | ||||||
msg := fmt.Sprintf("Error in object %d: %v\n", i, err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Now, we follow the pattern of describing what it does but not error statement when error handling. |
||||||
return nil, errmsg.AddMessage(err, msg) | ||||||
} | ||||||
} | ||||||
case map[string]interface{}: | ||||||
// Process the single object | ||||||
if err := applyUpdatesToObject(data, updates, supportDotNotation, conflictResolution); err != nil { | ||||||
msg := fmt.Sprintf("Error in single object: %v\n", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return nil, errmsg.AddMessage(err, msg) | ||||||
} | ||||||
default: | ||||||
msg := fmt.Sprintf("Invalid data format") | ||||||
return nil, errmsg.AddMessage(fmt.Errorf("Error "),msg) | ||||||
} | ||||||
output := map[string]interface{}{ | ||||||
"data": updatedData, | ||||||
} | ||||||
outputStruct, err := structpb.NewStruct(output) | ||||||
if err != nil { | ||||||
msg := fmt.Sprintf("Failed to convert output to structpb.Struct:", err) | ||||||
return nil, errmsg.AddMessage(err, msg) | ||||||
} | ||||||
return outputStruct,nil | ||||||
} | ||||||
|
||||||
func applyUpdatesToObject(data interface{}, updates []interface{}, supportDotNotation bool, conflictResolution string) error { | ||||||
for _, update := range updates { | ||||||
updateMap := update.(map[string]interface{}) | ||||||
field := updateMap["field"].(string) | ||||||
newValue := updateMap["newValue"] | ||||||
|
||||||
err := applyUpdate(data, field, newValue, supportDotNotation, conflictResolution) | ||||||
if err != nil { | ||||||
// Handle the "error" conflictResolution case by stopping and returning the error | ||||||
if conflictResolution == "error" { | ||||||
return err | ||||||
} | ||||||
// Continue for other conflictResolution cases | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func applyUpdate(data interface{}, field string, newValue interface{}, supportDotNotation bool, conflictResolution string) error { | ||||||
var fieldParts []string | ||||||
if supportDotNotation { | ||||||
fieldParts = strings.Split(field, ".") | ||||||
} else { | ||||||
fieldParts = []string{field} | ||||||
} | ||||||
|
||||||
current := data | ||||||
for i, part := range fieldParts { | ||||||
if i == len(fieldParts)-1 { | ||||||
// We're at the final part of the path, apply the update | ||||||
|
||||||
existingValue, fieldExisting := getFieldValue(current, part) | ||||||
|
||||||
// Check if the field exists and compare types | ||||||
if fieldExisting { | ||||||
if !(reflect.TypeOf(existingValue)==reflect.TypeOf(newValue)){ | ||||||
return fmt.Errorf("type mismatch: existing field '%s' has type '%T' but got value of type '%T'", part, existingValue, newValue) | ||||||
} | ||||||
} | ||||||
switch conflictResolution { | ||||||
case "create": | ||||||
// Create the field if it doesn't exist | ||||||
setFieldValue(current, part, newValue) | ||||||
case "skip": | ||||||
// Skip if the field doesn't exist | ||||||
if !fieldExists(current, part) { | ||||||
return nil | ||||||
} | ||||||
setFieldValue(current, part, newValue) | ||||||
case "error": | ||||||
// Return an error if the field doesn't exist | ||||||
if !fieldExists(current, part) { | ||||||
return fmt.Errorf("Field '%s' does not exist", part) | ||||||
} | ||||||
setFieldValue(current, part, newValue) | ||||||
} | ||||||
} else { | ||||||
// Traverse to the next part of the path | ||||||
if next, ok := getFieldValue(current, part); ok { | ||||||
current = next | ||||||
} else { | ||||||
// Field doesn't exist and we're not at the final part | ||||||
if conflictResolution == "create" { | ||||||
newMap := make(map[string]interface{}) | ||||||
setFieldValue(current, part, newMap) | ||||||
current = newMap | ||||||
} else { | ||||||
return fmt.Errorf("Field '%s' does not exist", part) | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func setFieldValue(data interface{}, field string, value interface{}) { | ||||||
// Update the field in the map or array (handle different data structures) | ||||||
switch data := data.(type) { | ||||||
case map[string]interface{}: | ||||||
data[field] = value | ||||||
case []interface{}: | ||||||
idx, _ := strconv.Atoi(field) | ||||||
if idx >= 0 && idx < len(data) { | ||||||
data[idx] = value | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func getFieldValue(data interface{}, field string) (interface{}, bool) { | ||||||
// Retrieve the field value from the map or array | ||||||
switch data := data.(type) { | ||||||
case map[string]interface{}: | ||||||
val, ok := data[field] | ||||||
return val, ok | ||||||
case []interface{}: | ||||||
idx, err := strconv.Atoi(field) | ||||||
if err != nil || idx < 0 || idx >= len(data) { | ||||||
return nil, false | ||||||
} | ||||||
return data[idx], true | ||||||
} | ||||||
return nil, false | ||||||
} | ||||||
|
||||||
func fieldExists(data interface{}, field string) bool { | ||||||
// Check if the field exists in the map or array | ||||||
switch data := data.(type) { | ||||||
case map[string]interface{}: | ||||||
_, ok := data[field] | ||||||
return ok | ||||||
case []interface{}: | ||||||
idx, err := strconv.Atoi(field) | ||||||
return err == nil && idx >= 0 && idx < len(data) | ||||||
} | ||||||
return false | ||||||
} | ||||||
|
||||||
func deepCopy(data interface{}) interface{} { | ||||||
// Deep copy the data structure to avoid modifying the original input | ||||||
b, _ := json.Marshal(data) | ||||||
var copiedData interface{} | ||||||
json.Unmarshal(b, &copiedData) | ||||||
return copiedData | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I found I put the wrong JSON schema in the issue.
Could you fetch this JSON schema?
So, it means your Golang reader will change the key name.
e.g.
conflictResolution
->conflict-resolution