diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c4ed61a935..126c4bc030 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -17,6 +17,7 @@ env: DOCKER_PASS: ${{ secrets.DOCKER_PASS }} CI_WAIT_FOR_OK_SECONDS: 60 CI_MAX_ITERATIONS_THRESHOLD: 90 + CI_CLIENT_CONCURRENT_CONNECTIONS: 1 CI_MAX_WAIT_FOR_POD_TIME_SECONDS: 60 CI_MIN_SUCCESS_THRESHOLD: 1 ACR: ${{ secrets.ACR }} diff --git a/demo/cmd/bookbuyer/bookbuyer.go b/demo/cmd/bookbuyer/bookbuyer.go index 823e752af1..a90b22f039 100755 --- a/demo/cmd/bookbuyer/bookbuyer.go +++ b/demo/cmd/bookbuyer/bookbuyer.go @@ -6,6 +6,9 @@ import ( "html" "html/template" "net/http" + "strconv" + "sync" + "sync/atomic" "time" "github.com/gorilla/mux" @@ -20,12 +23,14 @@ const ( ) var ( - booksBought = 0 - booksBoughtV1 = 0 - booksBoughtV2 = 0 - log = logger.NewPretty(participantName) - port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") - path = flag.String("path", ".", "path to the HTML template") + wg sync.WaitGroup + booksBought int64 = 0 + booksBoughtV1 int64 = 0 + booksBoughtV2 int64 = 0 + log = logger.NewPretty(participantName) + port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") + path = flag.String("path", ".", "path to the HTML template") + numConnectionsStr = common.GetEnv("CI_CLIENT_CONCURRENT_CONNECTIONS", "1") ) type handler struct { @@ -81,20 +86,37 @@ func getHandlers() []handler { } func reset(w http.ResponseWriter, r *http.Request) { - booksBought = 0 - booksBoughtV1 = 0 - booksBoughtV2 = 0 + atomic.StoreInt64(&booksBought, 0) + atomic.StoreInt64(&booksBoughtV1, 0) + atomic.StoreInt64(&booksBoughtV2, 0) renderTemplate(w) } -func main() { - - go debugServer() +func getBooksWrapper(wg *sync.WaitGroup) { + defer wg.Done() - // This is the bookbuyer. - // When it tries to buy books from the bookstore - we expect it to see 200 responses. - // When it tries to make an egress request, we expect a 200 response with egress enabled and a 404 response with egress disabled. meshExpectedResponseCode := http.StatusOK egressExpectedResponseCode := common.GetExpectedResponseCodeFromEnvVar(common.EgressExpectedResponseCodeEnvVar, httpStatusOK) + common.GetBooks(participantName, meshExpectedResponseCode, egressExpectedResponseCode, &booksBought, &booksBoughtV1, &booksBoughtV2) } + +func main() { + + go debugServer() + + numConnections, err := strconv.Atoi(numConnectionsStr) + if err != nil { + fmt.Printf("Error: invalid value for number of bookstore connections: %s", numConnectionsStr) + numConnections = 1 + } + + // This is the bookbuyer. When it tries to buy books from the bookstore - we expect it to see 200 responses. + for i := 0; i < numConnections; i++ { + wg.Add(1) + fmt.Printf("Backpressure: starting bookbuyer connection #%d", i) + go getBooksWrapper(&wg) + } + + wg.Wait() +} diff --git a/demo/cmd/bookstore/bookstore.go b/demo/cmd/bookstore/bookstore.go index 5ab8d5a725..f074213de3 100644 --- a/demo/cmd/bookstore/bookstore.go +++ b/demo/cmd/bookstore/bookstore.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "strings" + "sync/atomic" "time" "github.com/gorilla/mux" @@ -19,11 +20,11 @@ import ( ) var ( - booksSold = 0 - log = logger.NewPretty("bookstore") - identity = flag.String("ident", "unidentified", "the identity of the container where this demo app is running (VM, K8s, etc)") - port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") - path = flag.String("path", ".", "path to the HTML template") + booksSold int64 = 0 + log = logger.NewPretty("bookstore") + identity = flag.String("ident", "unidentified", "the identity of the container where this demo app is running (VM, K8s, etc)") + port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") + path = flag.String("path", ".", "path to the HTML template") ) type handler struct { @@ -76,12 +77,12 @@ func getIndex(w http.ResponseWriter, r *http.Request) { // updateBooksSold updates the booksSold value to the one specified by the user func updateBooksSold(w http.ResponseWriter, r *http.Request) { - var updatedBooksSold int + var updatedBooksSold int64 err := json.NewDecoder(r.Body).Decode(&updatedBooksSold) if err != nil { log.Fatal().Err(err).Msg("Could not decode request body") } - booksSold = updatedBooksSold + atomic.StoreInt64(&booksSold, updatedBooksSold) setHeaders(w) renderTemplate(w) log.Info().Msgf("%s; URL: %q; %s: %d\n", getIdentity(), html.EscapeString(r.URL.Path), common.BooksBoughtHeader, booksSold) @@ -90,7 +91,8 @@ func updateBooksSold(w http.ResponseWriter, r *http.Request) { // sellBook increments the value of the booksSold func sellBook(w http.ResponseWriter, r *http.Request) { - booksSold++ + fmt.Println("Selling a book!") + atomic.AddInt64(&booksSold, 1) setHeaders(w) renderTemplate(w) log.Info().Msgf("%s; URL: %q; Count: %d\n", getIdentity(), html.EscapeString(r.URL.Path), booksSold) diff --git a/demo/cmd/bookthief/bookthief.go b/demo/cmd/bookthief/bookthief.go index 8be9609ea5..a7dec539a6 100644 --- a/demo/cmd/bookthief/bookthief.go +++ b/demo/cmd/bookthief/bookthief.go @@ -21,12 +21,12 @@ const ( ) var ( - booksStolen = 0 - booksStolenV1 = 0 - booksStolenV2 = 0 - log = logger.NewPretty(participantName) - port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") - path = flag.String("path", ".", "path to the HTML template") + booksStolen int64 = 0 + booksStolenV1 int64 = 0 + booksStolenV2 int64 = 0 + log = logger.NewPretty(participantName) + port = flag.Int("port", 80, "port on which this app is listening for incoming HTTP") + path = flag.String("path", ".", "path to the HTML template") ) func renderTemplate(w http.ResponseWriter) { diff --git a/demo/cmd/common/books.go b/demo/cmd/common/books.go index 6928b7d9d9..6b10739ca8 100644 --- a/demo/cmd/common/books.go +++ b/demo/cmd/common/books.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/openservicemesh/osm/pkg/logger" @@ -93,7 +94,7 @@ func GetEnv(envVar string, defaultValue string) string { } // GetBooks reaches out to the bookstore and buys/steals books. This is invoked by the bookbuyer and the bookthief. -func GetBooks(participantName string, meshExpectedResponseCode int, egressExpectedResponseCode int, booksCount *int, booksCountV1 *int, booksCountV2 *int) { +func GetBooks(participantName string, meshExpectedResponseCode int, egressExpectedResponseCode int, booksCount *int64, booksCountV1 *int64, booksCountV2 *int64) { minSuccessThreshold, maxIterations, sleepDurationBetweenRequests := getEnvVars(participantName) // The URLs this participant will attempt to query from the bookstore service @@ -144,12 +145,12 @@ func GetBooks(participantName string, meshExpectedResponseCode int, egressExpect if responseCode == http.StatusOK { if url == buyBook { if strings.HasPrefix(identity, "bookstore-v1") { - *booksCountV1++ - *booksCount++ + atomic.AddInt64(booksCountV1, 1) + atomic.AddInt64(booksCount, 1) log.Info().Msgf("BooksCountV1=%d", booksCountV1) } else if strings.HasPrefix(identity, "bookstore-v2") { - *booksCountV2++ - *booksCount++ + atomic.AddInt64(booksCountV2, 1) + atomic.AddInt64(booksCount, 1) log.Info().Msgf("BooksCountV2=%d", booksCountV2) } } diff --git a/demo/deploy-bookbuyer.sh b/demo/deploy-bookbuyer.sh index 8de92f9c0a..8618d3b8e0 100755 --- a/demo/deploy-bookbuyer.sh +++ b/demo/deploy-bookbuyer.sh @@ -6,6 +6,7 @@ set -aueo pipefail source .env BOOKSTORE_SVC="${BOOKSTORE_SVC:-bookstore}" CI_MAX_ITERATIONS_THRESHOLD="${CI_MAX_ITERATIONS_THRESHOLD:-0}" +CI_CLIENT_CONCURRENT_CONNECTIONS="${CI_CLIENT_CONCURRENT_CONNECTIONS:-1}" EGRESS_EXPECTED_RESPONSE_CODE="${EGRESS_EXPECTED_RESPONSE_CODE:-200}" kubectl delete deployment bookbuyer -n "$BOOKBUYER_NAMESPACE" --ignore-not-found @@ -74,6 +75,8 @@ spec: value: "$CI_MAX_ITERATIONS_THRESHOLD" - name: "EGRESS_EXPECTED_RESPONSE_CODE" value: "$EGRESS_EXPECTED_RESPONSE_CODE" + - name: "CI_CLIENT_CONCURRENT_CONNECTIONS" + value: "$CI_CLIENT_CONCURRENT_CONNECTIONS" imagePullSecrets: - name: "$CTR_REGISTRY_CREDS_NAME" diff --git a/demo/deploy-bookthief.sh b/demo/deploy-bookthief.sh index 58642be092..fbb46e04ed 100755 --- a/demo/deploy-bookthief.sh +++ b/demo/deploy-bookthief.sh @@ -8,6 +8,7 @@ source .env BOOKSTORE_SVC="${BOOKSTORE_SVC:-bookstore}" BOOKTHIEF_EXPECTED_RESPONSE_CODE="${BOOKTHIEF_EXPECTED_RESPONSE_CODE:-404}" CI_MAX_ITERATIONS_THRESHOLD="${CI_MAX_ITERATIONS_THRESHOLD:-0}" +CI_CLIENT_CONCURRENT_CONNECTIONS="${CI_CLIENT_CONCURRENT_CONNECTIONS:-1}" EGRESS_EXPECTED_RESPONSE_CODE="${EGRESS_EXPECTED_RESPONSE_CODE:-200}" kubectl delete deployment bookthief -n "$BOOKTHIEF_NAMESPACE" --ignore-not-found