diff --git a/microservices-connector/cmd/router/main.go b/microservices-connector/cmd/router/main.go index 2537ec3a..2bd746eb 100644 --- a/microservices-connector/cmd/router/main.go +++ b/microservices-connector/cmd/router/main.go @@ -76,41 +76,40 @@ func isSuccessFul(statusCode int) bool { return false } -func pickupRouteByCondition(input []byte, routes []mcv1alpha3.Step) *mcv1alpha3.Step { +func pickupRouteByCondition(rawInput []byte, condition string) bool { //sample config supported by gjson //"instances" : [ // {"model_id", "1"}, // ] // sample condition support by gjson query: "instances.#(modelId==\"1\")"" - if !gjson.ValidBytes(input) { - fmt.Println("the input json format is invalid") - return nil + if !gjson.ValidBytes(rawInput) { + fmt.Println("the rawInput json format is invalid") + return false } - for _, route := range routes { - c := route.Condition - if gjson.GetBytes(input, c).Exists() { - return &route - } - // ' and # will define a gjson query - if strings.ContainsAny(c, ".") || strings.ContainsAny(c, "#") { - continue - } - // key == value without nested json - // sample config support by direct query {"model_id", "1"} - // smaple condition support by json query: "modelId==\"1\"" - index := strings.Index(c, "==") - if index == -1 { - fmt.Println("No '==' found in the route.Condition") - } else { - key := strings.TrimSpace(c[:index]) - value := strings.TrimSpace(c[index+2:]) - v := gjson.GetBytes(input, key).String() - if v == value { - return &route - } + + if gjson.GetBytes(rawInput, condition).Exists() { + return true + } + // ' and # will define a gjson query + if strings.ContainsAny(condition, ".") || strings.ContainsAny(condition, "#") { + return false + } + // key == value without nested json + // sample config support by direct query {"model_id", "1"} + // smaple condition support by json query: "modelId==\"1\"" + index := strings.Index(condition, "==") + if index == -1 { + fmt.Println("No '==' found in the route with condition [", condition, "]") + return false + } else { + key := strings.TrimSpace(condition[:index]) + value := strings.TrimSpace(condition[index+2:]) + v := gjson.GetBytes(rawInput, key).String() + if v == value { + return true } } - return nil + return false } func prepareErrorResponse(err error, errorMessage string) []byte { @@ -199,7 +198,7 @@ func executeStep( func handleSwitchNode( route *mcv1alpha3.Step, graph mcv1alpha3.GMConnector, - input []byte, + request []byte, headers http.Header, ) ([]byte, int, error) { var statusCode int @@ -210,7 +209,7 @@ func handleSwitchNode( stepType = ServiceNode } log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName) - if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil { + if responseBytes, statusCode, err = executeStep(route, graph, request, headers); err != nil { return nil, 500, err } @@ -226,142 +225,189 @@ func handleSwitchNode( return responseBytes, statusCode, nil } -func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) { - defer timeTrack(time.Now(), "node", nodeName) +func handleSwitchPipeline(nodeName string, + graph mcv1alpha3.GMConnector, + input []byte, + headers http.Header, +) ([]byte, int, error) { currentNode := graph.Spec.Nodes[nodeName] - - if currentNode.RouterType == mcv1alpha3.Switch { - var err error - route := pickupRouteByCondition(input, currentNode.Steps) - if route == nil { - errorMessage := "None of the routes matched with the switch condition" - err = errors.New(errorMessage) - log.Error(err, errorMessage) - return nil, 404, err + var statusCode int + var responseBytes []byte + var err error + for index, route := range currentNode.Steps { + if route.InternalService.IsDownstreamService { + log.Info( + "InternalService DownstreamService is true, skip the execution of step", + "type", + currentNode.RouterType, + "stepName", + route.StepName, + ) + continue + } + request := input + if route.Data == "$response" && index > 0 { + request = responseBytes } - return handleSwitchNode(route, graph, input, headers) + if route.Condition == "" { + responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers) + if err != nil { + return responseBytes, statusCode, err + } + } else { + if pickupRouteByCondition(input, route.Condition) { + responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers) + if err != nil { + return responseBytes, statusCode, err + } + } + } + log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode) } + return responseBytes, statusCode, err +} - if currentNode.RouterType == mcv1alpha3.Ensemble { - ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps)) - errChan := make(chan error) - for i := range currentNode.Steps { - step := ¤tNode.Steps[i] - stepType := ServiceURL - if step.NodeName != "" { - stepType = ServiceNode - } - log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName) - resultChan := make(chan EnsembleStepOutput) - ensembleRes[i] = resultChan - - go func() { - output, statusCode, err := executeStep(step, graph, input, headers) - if err == nil { - var res map[string]interface{} - if err = json.Unmarshal(output, &res); err == nil { - resultChan <- EnsembleStepOutput{ - StepResponse: res, - StepStatusCode: statusCode, - } - return +func handleEnsemblePipeline(nodeName string, + graph mcv1alpha3.GMConnector, + input []byte, + headers http.Header, +) ([]byte, int, error) { + currentNode := graph.Spec.Nodes[nodeName] + ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps)) + errChan := make(chan error) + for i := range currentNode.Steps { + step := ¤tNode.Steps[i] + stepType := ServiceURL + if step.NodeName != "" { + stepType = ServiceNode + } + log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName) + resultChan := make(chan EnsembleStepOutput) + ensembleRes[i] = resultChan + go func() { + output, statusCode, err := executeStep(step, graph, input, headers) + if err == nil { + var res map[string]interface{} + if err = json.Unmarshal(output, &res); err == nil { + resultChan <- EnsembleStepOutput{ + StepResponse: res, + StepStatusCode: statusCode, } + return } - errChan <- err - }() - } - // merge responses from parallel steps - response := map[string]interface{}{} - ensembleStepOutput := EnsembleStepOutput{} - for i, resultChan := range ensembleRes { - key := currentNode.Steps[i].StepName - if key == "" { - key = strconv.Itoa(i) // Use index if no step name } - select { - case ensembleStepOutput = <-resultChan: - if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard { - log.Info( - "This step is a hard dependency and it is unsuccessful", - "stepName", - currentNode.Steps[i].StepName, - "statusCode", - ensembleStepOutput.StepStatusCode, - ) - stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse) - return stepResponse, ensembleStepOutput.StepStatusCode, nil - } else { - response[key] = ensembleStepOutput.StepResponse - } - case err := <-errChan: - return nil, 500, err + errChan <- err + }() + } + // merge responses from parallel steps + response := map[string]interface{}{} + ensembleStepOutput := EnsembleStepOutput{} + for i, resultChan := range ensembleRes { + key := currentNode.Steps[i].StepName + if key == "" { + key = strconv.Itoa(i) // Use index if no step name + } + select { + case ensembleStepOutput = <-resultChan: + if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard { + log.Info( + "This step is a hard dependency and it is unsuccessful", + "stepName", + currentNode.Steps[i].StepName, + "statusCode", + ensembleStepOutput.StepStatusCode, + ) + stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse) + return stepResponse, ensembleStepOutput.StepStatusCode, nil + } else { + response[key] = ensembleStepOutput.StepResponse } + case err := <-errChan: + return nil, 500, err } - // return json.Marshal(response) - combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling - return combinedResponse, 200, nil } + // return json.Marshal(response) + combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling + return combinedResponse, 200, nil +} - if currentNode.RouterType == mcv1alpha3.Sequence { - var statusCode int - var responseBytes []byte - var err error - for i := range currentNode.Steps { - step := ¤tNode.Steps[i] - stepType := ServiceURL - if step.NodeName != "" { - stepType = ServiceNode +func handleSequencePipeline(nodeName string, + graph mcv1alpha3.GMConnector, + input []byte, + headers http.Header, +) ([]byte, int, error) { + currentNode := graph.Spec.Nodes[nodeName] + var statusCode int + var responseBytes []byte + var err error + for i := range currentNode.Steps { + step := ¤tNode.Steps[i] + stepType := ServiceURL + if step.NodeName != "" { + stepType = ServiceNode + } + if step.InternalService.IsDownstreamService { + log.Info( + "InternalService DownstreamService is true, skip the execution of step", + "type", + stepType, + "stepName", + step.StepName, + ) + continue + } + log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName) + request := input + if step.Data == "$response" && i > 0 { + request = responseBytes + } + if step.Condition != "" { + if !gjson.ValidBytes(responseBytes) { + return nil, 500, fmt.Errorf("invalid response") } - if step.InternalService.IsDownstreamService { + // if the condition does not match for the step in the sequence we stop and return the response + if !gjson.GetBytes(responseBytes, step.Condition).Exists() { + return responseBytes, 500, nil + } + } + if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil { + return nil, 500, err + } + log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode) + /* + Only if a step is a hard dependency, we will check for its success. + */ + if step.Dependency == mcv1alpha3.Hard { + if !isSuccessFul(statusCode) { log.Info( - "InternalService DownstreamService is true, skip the execution of step", - "type", - stepType, + "This step is a hard dependency and it is unsuccessful", "stepName", step.StepName, + "statusCode", + statusCode, ) - continue + // Stop the execution of sequence right away if step is a hard dependency and is unsuccessful + return responseBytes, statusCode, nil } - log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName) + } + } + return responseBytes, statusCode, nil +} - request := input - if step.Data == "$response" && i > 0 { - request = responseBytes - } +func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) { + defer timeTrack(time.Now(), "node", nodeName) + currentNode := graph.Spec.Nodes[nodeName] - if step.Condition != "" { - if !gjson.ValidBytes(responseBytes) { - return nil, 500, fmt.Errorf("invalid response") - } - // if the condition does not match for the step in the sequence we stop and return the response - if !gjson.GetBytes(responseBytes, step.Condition).Exists() { - return responseBytes, 500, nil - } - } - if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil { - return nil, 500, err - } + if currentNode.RouterType == mcv1alpha3.Switch { + return handleSwitchPipeline(nodeName, graph, input, headers) + } - log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode) - /* - Only if a step is a hard dependency, we will check for its success. - */ - if step.Dependency == mcv1alpha3.Hard { - if !isSuccessFul(statusCode) { - log.Info( - "This step is a hard dependency and it is unsuccessful", - "stepName", - step.StepName, - "statusCode", - statusCode, - ) - // Stop the execution of sequence right away if step is a hard dependency and is unsuccessful - return responseBytes, statusCode, nil - } - } - } + if currentNode.RouterType == mcv1alpha3.Ensemble { + return handleEnsemblePipeline(nodeName, graph, input, headers) + } - return responseBytes, statusCode, nil + if currentNode.RouterType == mcv1alpha3.Sequence { + return handleSequencePipeline(nodeName, graph, input, headers) } log.Error(nil, "invalid route type", "type", currentNode.RouterType) return nil, 500, fmt.Errorf("invalid route type: %v", currentNode.RouterType)