Skip to content

Commit

Permalink
Improve reply handling and add a timeout in the http client
Browse files Browse the repository at this point in the history
(cherry picked from commit 5bfe5da)
  • Loading branch information
wiardvanrij committed Oct 22, 2023
1 parent 4de30c1 commit feb53e7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 48 deletions.
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
Expand All @@ -29,8 +31,14 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
Expand All @@ -56,3 +64,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"net/http"
"sync"
"time"

"fortio.org/log"
"fortio.org/scli"
Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
metrics := NewMetrics(r)

// Initialize the app, metrics are passed along so they are accessible
app := NewApp(maxQueueSize, &http.Client{}, metrics)
app := NewApp(maxQueueSize, &http.Client{
Timeout: 10 * time.Second,
}, metrics)
// The only required flag is the token at the moment.
if tokenFlag == "" {
log.Fatalf("Missing token flag")
Expand Down
76 changes: 29 additions & 47 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"fortio.org/fortio/fhttp"
"fortio.org/fortio/jrpc"
"fortio.org/log"
)

Expand Down Expand Up @@ -45,14 +46,9 @@ func (app *App) StartServer(ctx context.Context, applicationPort string) error {
}

func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// Regardless of the outcome, we always respond as json
w.Header().Set("Content-Type", "application/json")

// "Mock" the response from Slack.
// OK is true by default, so we only need to set it to false if we want to trow an error which then could use a custom error message.
// From testing, any application only checks if OK is true. So we can ignore all other fields
fakeSlackResponse := SlackResponse{
Ok: true,
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

maxQueueSize := int(float64(cap(app.slackQueue)) * 0.9)
Expand All @@ -61,60 +57,44 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// Ideally we don't reject at 90%, but initially after some tests I got blocked. So I decided to be a bit more conservative.
// ToDo: Fix this behavior so we can reach 100% channel size without problems.
if len(app.slackQueue) >= maxQueueSize {
w.WriteHeader(http.StatusServiceUnavailable)
log.S(log.Info, "Queue is almost full, returning StatusServiceUnavailable", log.Int("queueSize", len(app.slackQueue)))

fakeSlackResponse.Ok = false
fakeSlackResponse.Error = "Queue is almost full"
responseData, err := json.Marshal(fakeSlackResponse)
if err != nil {
http.Error(w, "Failed to serialize Slack response", http.StatusInternalServerError)
return
}
err := jrpc.Reply[SlackResponse](w, http.StatusServiceUnavailable, &SlackResponse{
Ok: false,
Error: "Queue is almost full",
})

_, err = w.Write(responseData)
if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}

return
}

if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var request SlackPostMessageRequest
err := json.NewDecoder(r.Body).Decode(&request)
if err != nil {
http.Error(w, "Failed to read the request body", http.StatusInternalServerError)
return
}
requestErr := json.NewDecoder(r.Body).Decode(&request)

// Validate the request
err = validate(request)
if err != nil {
log.S(log.Error, "Invalid request", log.Any("err", err))
// TODO: jrpc.(Client)ErrorReply ?
w.WriteHeader(http.StatusBadRequest)
fakeSlackResponse.Ok = false
fakeSlackResponse.Error = err.Error()
responseData, err2 := json.Marshal(fakeSlackResponse)
_, err3 := w.Write(responseData)
if err2 != nil || err3 != nil {
log.S(log.Error, "Failed to write response", log.Any("err2", err2), log.Any("err3", err3))
}
return
// If we can't decode, we don't bother validating. In the end it's the same outcome if either one is invalid.
if requestErr == nil {
requestErr = validate(request)
}

app.metrics.RequestsReceivedTotal.WithLabelValues(request.Channel).Inc()
if requestErr != nil {
log.S(log.Error, "Invalid request", log.Any("err", requestErr))

responseData, err := json.Marshal(fakeSlackResponse)
if err != nil {
http.Error(w, "Failed to serialize Slack response", http.StatusInternalServerError)
err := jrpc.Reply[SlackResponse](w, http.StatusBadRequest, &SlackResponse{
Ok: false,
Error: requestErr.Error(),
})

if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}
return
}

// Start the logic (as we passed all our checks) to process the request.
app.metrics.RequestsReceivedTotal.WithLabelValues(request.Channel).Inc()
// Add a counter to the wait group, this is important to wait for all the messages to be processed before shutting down the server.
app.wg.Add(1)
// Send the message to the slackQueue to be processed
Expand All @@ -126,8 +106,10 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// This is the downside of having a queue which could potentially delay responses by a lot.
// We do our due diligences on the received message and can make a fair assumption we will be able to process it.
// Application should utlise this applications metrics and logs to find out if there are any issues.
w.WriteHeader(http.StatusOK)
_, err = w.Write(responseData)
err := jrpc.Reply[SlackResponse](w, http.StatusOK, &SlackResponse{
Ok: true,
})

if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}
Expand Down

0 comments on commit feb53e7

Please sign in to comment.