Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance the switch mode for GMC router service required #152

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 190 additions & 144 deletions microservices-connector/cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,41 +76,40 @@ func isSuccessFul(statusCode int) bool {
return false
}

func pickupRouteByCondition(input []byte, routes []mcv1alpha3.Step) *mcv1alpha3.Step {
func pickupRouteByCondition(rawInput []byte, condition string) bool {
//sample config supported by gjson
//"instances" : [
// {"model_id", "1"},
// ]
// sample condition support by gjson query: "instances.#(modelId==\"1\")""
if !gjson.ValidBytes(input) {
fmt.Println("the input json format is invalid")
return nil
if !gjson.ValidBytes(rawInput) {
fmt.Println("the rawInput json format is invalid")
return false
}
for _, route := range routes {
c := route.Condition
if gjson.GetBytes(input, c).Exists() {
return &route
}
// ' and # will define a gjson query
if strings.ContainsAny(c, ".") || strings.ContainsAny(c, "#") {
continue
}
// key == value without nested json
// sample config support by direct query {"model_id", "1"}
// smaple condition support by json query: "modelId==\"1\""
index := strings.Index(c, "==")
if index == -1 {
fmt.Println("No '==' found in the route.Condition")
} else {
key := strings.TrimSpace(c[:index])
value := strings.TrimSpace(c[index+2:])
v := gjson.GetBytes(input, key).String()
if v == value {
return &route
}

if gjson.GetBytes(rawInput, condition).Exists() {
return true
}
// ' and # will define a gjson query
if strings.ContainsAny(condition, ".") || strings.ContainsAny(condition, "#") {
return false
}
// key == value without nested json
// sample config support by direct query {"model_id", "1"}
// smaple condition support by json query: "modelId==\"1\""
index := strings.Index(condition, "==")
if index == -1 {
fmt.Println("No '==' found in the route with condition [", condition, "]")
return false
} else {
key := strings.TrimSpace(condition[:index])
value := strings.TrimSpace(condition[index+2:])
v := gjson.GetBytes(rawInput, key).String()
if v == value {
return true
}
}
return nil
return false
}

func prepareErrorResponse(err error, errorMessage string) []byte {
Expand Down Expand Up @@ -199,7 +198,7 @@ func executeStep(
func handleSwitchNode(
route *mcv1alpha3.Step,
graph mcv1alpha3.GMConnector,
input []byte,
request []byte,
headers http.Header,
) ([]byte, int, error) {
var statusCode int
Expand All @@ -210,7 +209,7 @@ func handleSwitchNode(
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
if responseBytes, statusCode, err = executeStep(route, graph, request, headers); err != nil {
return nil, 500, err
}

Expand All @@ -226,142 +225,189 @@ func handleSwitchNode(
return responseBytes, statusCode, nil
}

func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
func handleSwitchPipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]

if currentNode.RouterType == mcv1alpha3.Switch {
var err error
route := pickupRouteByCondition(input, currentNode.Steps)
if route == nil {
errorMessage := "None of the routes matched with the switch condition"
err = errors.New(errorMessage)
log.Error(err, errorMessage)
return nil, 404, err
var statusCode int
var responseBytes []byte
var err error
for index, route := range currentNode.Steps {
if route.InternalService.IsDownstreamService {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
currentNode.RouterType,
"stepName",
route.StepName,
)
continue
}
request := input
if route.Data == "$response" && index > 0 {
request = responseBytes
}
return handleSwitchNode(route, graph, input, headers)
if route.Condition == "" {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
} else {
if pickupRouteByCondition(input, route.Condition) {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
}
}
log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
}
return responseBytes, statusCode, err
}

if currentNode.RouterType == mcv1alpha3.Ensemble {
ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
errChan := make(chan error)
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
resultChan := make(chan EnsembleStepOutput)
ensembleRes[i] = resultChan

go func() {
output, statusCode, err := executeStep(step, graph, input, headers)
if err == nil {
var res map[string]interface{}
if err = json.Unmarshal(output, &res); err == nil {
resultChan <- EnsembleStepOutput{
StepResponse: res,
StepStatusCode: statusCode,
}
return
func handleEnsemblePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]
ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
errChan := make(chan error)
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
resultChan := make(chan EnsembleStepOutput)
ensembleRes[i] = resultChan
go func() {
output, statusCode, err := executeStep(step, graph, input, headers)
if err == nil {
var res map[string]interface{}
if err = json.Unmarshal(output, &res); err == nil {
resultChan <- EnsembleStepOutput{
StepResponse: res,
StepStatusCode: statusCode,
}
return
}
errChan <- err
}()
}
// merge responses from parallel steps
response := map[string]interface{}{}
ensembleStepOutput := EnsembleStepOutput{}
for i, resultChan := range ensembleRes {
key := currentNode.Steps[i].StepName
if key == "" {
key = strconv.Itoa(i) // Use index if no step name
}
select {
case ensembleStepOutput = <-resultChan:
if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
currentNode.Steps[i].StepName,
"statusCode",
ensembleStepOutput.StepStatusCode,
)
stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse)
return stepResponse, ensembleStepOutput.StepStatusCode, nil
} else {
response[key] = ensembleStepOutput.StepResponse
}
case err := <-errChan:
return nil, 500, err
errChan <- err
}()
}
// merge responses from parallel steps
response := map[string]interface{}{}
ensembleStepOutput := EnsembleStepOutput{}
for i, resultChan := range ensembleRes {
key := currentNode.Steps[i].StepName
if key == "" {
key = strconv.Itoa(i) // Use index if no step name
}
select {
case ensembleStepOutput = <-resultChan:
if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
currentNode.Steps[i].StepName,
"statusCode",
ensembleStepOutput.StepStatusCode,
)
stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse)
return stepResponse, ensembleStepOutput.StepStatusCode, nil
} else {
response[key] = ensembleStepOutput.StepResponse
}
case err := <-errChan:
return nil, 500, err
}
// return json.Marshal(response)
combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling
return combinedResponse, 200, nil
}
// return json.Marshal(response)
combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling
return combinedResponse, 200, nil
}

if currentNode.RouterType == mcv1alpha3.Sequence {
var statusCode int
var responseBytes []byte
var err error
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
func handleSequencePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]
var statusCode int
var responseBytes []byte
var err error
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
if step.InternalService.IsDownstreamService {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
stepType,
"stepName",
step.StepName,
)
continue
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
request := input
if step.Data == "$response" && i > 0 {
request = responseBytes
}
if step.Condition != "" {
if !gjson.ValidBytes(responseBytes) {
return nil, 500, fmt.Errorf("invalid response")
}
if step.InternalService.IsDownstreamService {
// if the condition does not match for the step in the sequence we stop and return the response
if !gjson.GetBytes(responseBytes, step.Condition).Exists() {
return responseBytes, 500, nil
}
}
if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil {
return nil, 500, err
}
log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
/*
Only if a step is a hard dependency, we will check for its success.
*/
if step.Dependency == mcv1alpha3.Hard {
if !isSuccessFul(statusCode) {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
stepType,
"This step is a hard dependency and it is unsuccessful",
"stepName",
step.StepName,
"statusCode",
statusCode,
)
continue
// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
return responseBytes, statusCode, nil
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
}
}
return responseBytes, statusCode, nil
}

request := input
if step.Data == "$response" && i > 0 {
request = responseBytes
}
func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
currentNode := graph.Spec.Nodes[nodeName]

if step.Condition != "" {
if !gjson.ValidBytes(responseBytes) {
return nil, 500, fmt.Errorf("invalid response")
}
// if the condition does not match for the step in the sequence we stop and return the response
if !gjson.GetBytes(responseBytes, step.Condition).Exists() {
return responseBytes, 500, nil
}
}
if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil {
return nil, 500, err
}
if currentNode.RouterType == mcv1alpha3.Switch {
return handleSwitchPipeline(nodeName, graph, input, headers)
}

log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
/*
Only if a step is a hard dependency, we will check for its success.
*/
if step.Dependency == mcv1alpha3.Hard {
if !isSuccessFul(statusCode) {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
step.StepName,
"statusCode",
statusCode,
)
// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
return responseBytes, statusCode, nil
}
}
}
if currentNode.RouterType == mcv1alpha3.Ensemble {
return handleEnsemblePipeline(nodeName, graph, input, headers)
}

return responseBytes, statusCode, nil
if currentNode.RouterType == mcv1alpha3.Sequence {
return handleSequencePipeline(nodeName, graph, input, headers)
}
log.Error(nil, "invalid route type", "type", currentNode.RouterType)
return nil, 500, fmt.Errorf("invalid route type: %v", currentNode.RouterType)
Expand Down
Loading