Skip to content

Commit

Permalink
Enhance the switch mode for GMC router service required (#152)
Browse files Browse the repository at this point in the history
* Enhance the switch mode for GMC router service required
Signed-off-by: zhlsunshine <[email protected]>
  • Loading branch information
zhlsunshine authored Jul 5, 2024
1 parent af2d0f5 commit f96b0e5
Showing 1 changed file with 190 additions and 144 deletions.
334 changes: 190 additions & 144 deletions microservices-connector/cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,41 +76,40 @@ func isSuccessFul(statusCode int) bool {
return false
}

func pickupRouteByCondition(input []byte, routes []mcv1alpha3.Step) *mcv1alpha3.Step {
func pickupRouteByCondition(rawInput []byte, condition string) bool {
//sample config supported by gjson
//"instances" : [
// {"model_id", "1"},
// ]
// sample condition support by gjson query: "instances.#(modelId==\"1\")""
if !gjson.ValidBytes(input) {
fmt.Println("the input json format is invalid")
return nil
if !gjson.ValidBytes(rawInput) {
fmt.Println("the rawInput json format is invalid")
return false
}
for _, route := range routes {
c := route.Condition
if gjson.GetBytes(input, c).Exists() {
return &route
}
// ' and # will define a gjson query
if strings.ContainsAny(c, ".") || strings.ContainsAny(c, "#") {
continue
}
// key == value without nested json
// sample config support by direct query {"model_id", "1"}
// smaple condition support by json query: "modelId==\"1\""
index := strings.Index(c, "==")
if index == -1 {
fmt.Println("No '==' found in the route.Condition")
} else {
key := strings.TrimSpace(c[:index])
value := strings.TrimSpace(c[index+2:])
v := gjson.GetBytes(input, key).String()
if v == value {
return &route
}

if gjson.GetBytes(rawInput, condition).Exists() {
return true
}
// ' and # will define a gjson query
if strings.ContainsAny(condition, ".") || strings.ContainsAny(condition, "#") {
return false
}
// key == value without nested json
// sample config support by direct query {"model_id", "1"}
// smaple condition support by json query: "modelId==\"1\""
index := strings.Index(condition, "==")
if index == -1 {
fmt.Println("No '==' found in the route with condition [", condition, "]")
return false
} else {
key := strings.TrimSpace(condition[:index])
value := strings.TrimSpace(condition[index+2:])
v := gjson.GetBytes(rawInput, key).String()
if v == value {
return true
}
}
return nil
return false
}

func prepareErrorResponse(err error, errorMessage string) []byte {
Expand Down Expand Up @@ -199,7 +198,7 @@ func executeStep(
func handleSwitchNode(
route *mcv1alpha3.Step,
graph mcv1alpha3.GMConnector,
input []byte,
request []byte,
headers http.Header,
) ([]byte, int, error) {
var statusCode int
Expand All @@ -210,7 +209,7 @@ func handleSwitchNode(
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
if responseBytes, statusCode, err = executeStep(route, graph, request, headers); err != nil {
return nil, 500, err
}

Expand All @@ -226,142 +225,189 @@ func handleSwitchNode(
return responseBytes, statusCode, nil
}

func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
func handleSwitchPipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]

if currentNode.RouterType == mcv1alpha3.Switch {
var err error
route := pickupRouteByCondition(input, currentNode.Steps)
if route == nil {
errorMessage := "None of the routes matched with the switch condition"
err = errors.New(errorMessage)
log.Error(err, errorMessage)
return nil, 404, err
var statusCode int
var responseBytes []byte
var err error
for index, route := range currentNode.Steps {
if route.InternalService.IsDownstreamService {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
currentNode.RouterType,
"stepName",
route.StepName,
)
continue
}
request := input
if route.Data == "$response" && index > 0 {
request = responseBytes
}
return handleSwitchNode(route, graph, input, headers)
if route.Condition == "" {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
} else {
if pickupRouteByCondition(input, route.Condition) {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
}
}
log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
}
return responseBytes, statusCode, err
}

if currentNode.RouterType == mcv1alpha3.Ensemble {
ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
errChan := make(chan error)
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
resultChan := make(chan EnsembleStepOutput)
ensembleRes[i] = resultChan

go func() {
output, statusCode, err := executeStep(step, graph, input, headers)
if err == nil {
var res map[string]interface{}
if err = json.Unmarshal(output, &res); err == nil {
resultChan <- EnsembleStepOutput{
StepResponse: res,
StepStatusCode: statusCode,
}
return
func handleEnsemblePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]
ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
errChan := make(chan error)
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
resultChan := make(chan EnsembleStepOutput)
ensembleRes[i] = resultChan
go func() {
output, statusCode, err := executeStep(step, graph, input, headers)
if err == nil {
var res map[string]interface{}
if err = json.Unmarshal(output, &res); err == nil {
resultChan <- EnsembleStepOutput{
StepResponse: res,
StepStatusCode: statusCode,
}
return
}
errChan <- err
}()
}
// merge responses from parallel steps
response := map[string]interface{}{}
ensembleStepOutput := EnsembleStepOutput{}
for i, resultChan := range ensembleRes {
key := currentNode.Steps[i].StepName
if key == "" {
key = strconv.Itoa(i) // Use index if no step name
}
select {
case ensembleStepOutput = <-resultChan:
if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
currentNode.Steps[i].StepName,
"statusCode",
ensembleStepOutput.StepStatusCode,
)
stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse)
return stepResponse, ensembleStepOutput.StepStatusCode, nil
} else {
response[key] = ensembleStepOutput.StepResponse
}
case err := <-errChan:
return nil, 500, err
errChan <- err
}()
}
// merge responses from parallel steps
response := map[string]interface{}{}
ensembleStepOutput := EnsembleStepOutput{}
for i, resultChan := range ensembleRes {
key := currentNode.Steps[i].StepName
if key == "" {
key = strconv.Itoa(i) // Use index if no step name
}
select {
case ensembleStepOutput = <-resultChan:
if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
currentNode.Steps[i].StepName,
"statusCode",
ensembleStepOutput.StepStatusCode,
)
stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse)
return stepResponse, ensembleStepOutput.StepStatusCode, nil
} else {
response[key] = ensembleStepOutput.StepResponse
}
case err := <-errChan:
return nil, 500, err
}
// return json.Marshal(response)
combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling
return combinedResponse, 200, nil
}
// return json.Marshal(response)
combinedResponse, _ := json.Marshal(response) // TODO check if you need err handling for Marshalling
return combinedResponse, 200, nil
}

if currentNode.RouterType == mcv1alpha3.Sequence {
var statusCode int
var responseBytes []byte
var err error
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
func handleSequencePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
input []byte,
headers http.Header,
) ([]byte, int, error) {
currentNode := graph.Spec.Nodes[nodeName]
var statusCode int
var responseBytes []byte
var err error
for i := range currentNode.Steps {
step := &currentNode.Steps[i]
stepType := ServiceURL
if step.NodeName != "" {
stepType = ServiceNode
}
if step.InternalService.IsDownstreamService {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
stepType,
"stepName",
step.StepName,
)
continue
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
request := input
if step.Data == "$response" && i > 0 {
request = responseBytes
}
if step.Condition != "" {
if !gjson.ValidBytes(responseBytes) {
return nil, 500, fmt.Errorf("invalid response")
}
if step.InternalService.IsDownstreamService {
// if the condition does not match for the step in the sequence we stop and return the response
if !gjson.GetBytes(responseBytes, step.Condition).Exists() {
return responseBytes, 500, nil
}
}
if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil {
return nil, 500, err
}
log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
/*
Only if a step is a hard dependency, we will check for its success.
*/
if step.Dependency == mcv1alpha3.Hard {
if !isSuccessFul(statusCode) {
log.Info(
"InternalService DownstreamService is true, skip the execution of step",
"type",
stepType,
"This step is a hard dependency and it is unsuccessful",
"stepName",
step.StepName,
"statusCode",
statusCode,
)
continue
// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
return responseBytes, statusCode, nil
}
log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName)
}
}
return responseBytes, statusCode, nil
}

request := input
if step.Data == "$response" && i > 0 {
request = responseBytes
}
func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
currentNode := graph.Spec.Nodes[nodeName]

if step.Condition != "" {
if !gjson.ValidBytes(responseBytes) {
return nil, 500, fmt.Errorf("invalid response")
}
// if the condition does not match for the step in the sequence we stop and return the response
if !gjson.GetBytes(responseBytes, step.Condition).Exists() {
return responseBytes, 500, nil
}
}
if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil {
return nil, 500, err
}
if currentNode.RouterType == mcv1alpha3.Switch {
return handleSwitchPipeline(nodeName, graph, input, headers)
}

log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
/*
Only if a step is a hard dependency, we will check for its success.
*/
if step.Dependency == mcv1alpha3.Hard {
if !isSuccessFul(statusCode) {
log.Info(
"This step is a hard dependency and it is unsuccessful",
"stepName",
step.StepName,
"statusCode",
statusCode,
)
// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
return responseBytes, statusCode, nil
}
}
}
if currentNode.RouterType == mcv1alpha3.Ensemble {
return handleEnsemblePipeline(nodeName, graph, input, headers)
}

return responseBytes, statusCode, nil
if currentNode.RouterType == mcv1alpha3.Sequence {
return handleSequencePipeline(nodeName, graph, input, headers)
}
log.Error(nil, "invalid route type", "type", currentNode.RouterType)
return nil, 500, fmt.Errorf("invalid route type: %v", currentNode.RouterType)
Expand Down

0 comments on commit f96b0e5

Please sign in to comment.