diff --git a/config/config.go b/config/config.go index e44cf3a..a513319 100644 --- a/config/config.go +++ b/config/config.go @@ -78,6 +78,7 @@ type SectionFCMv1 struct { Enabled bool ProjectID string TokenSource oauth2.TokenSource + Endpoint string } // DefaultLoadConfig loads default /etc/gunfish.toml diff --git a/const.go b/const.go index 50ef4c7..2d2e52d 100644 --- a/const.go +++ b/const.go @@ -78,3 +78,5 @@ var ( OutputHookStdout bool OutputHookStderr bool ) + +var RetryBackoff = true diff --git a/fcmv1/client.go b/fcmv1/client.go index 462838e..21aa927 100644 --- a/fcmv1/client.go +++ b/fcmv1/client.go @@ -46,14 +46,14 @@ func (c *Client) Send(p Payload) ([]Result, error) { if body.Error == nil && body.Name != "" { return []Result{ - Result{ + { StatusCode: res.StatusCode, Token: p.Message.Token, }, }, nil } else if body.Error != nil { return []Result{ - Result{ + { StatusCode: res.StatusCode, Token: p.Message.Token, Error: body.Error, @@ -70,22 +70,28 @@ func (c *Client) NewRequest(p Payload) (*http.Request, error) { if err != nil { return nil, err } - token, err := c.tokenSource.Token() - if err != nil { - return nil, err + var bearer string + if ts := c.tokenSource; ts != nil { + token, err := c.tokenSource.Token() + if err != nil { + return nil, err + } + bearer = token.AccessToken + } else { + bearer = p.Message.Token } req, err := http.NewRequest("POST", c.endpoint.String(), bytes.NewReader(data)) if err != nil { return nil, err } - req.Header.Set("Authorization", "Bearer "+token.AccessToken) + req.Header.Set("Authorization", "Bearer "+bearer) req.Header.Set("Content-Type", "application/json") return req, nil } // NewClient establishes a http connection with fcm v1 -func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.URL, timeout time.Duration) (*Client, error) { +func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint string, timeout time.Duration) (*Client, error) { client := &http.Client{ Timeout: timeout, } @@ -94,16 +100,15 @@ func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.U tokenSource: tokenSource, } - if endpoint != nil { - c.endpoint = endpoint - } else { - ep, err := url.Parse(DefaultFCMEndpoint) - if err != nil { - return nil, err - } - ep.Path = path.Join(ep.Path, projectID, "messages:send") - c.endpoint = ep + if endpoint == "" { + endpoint = DefaultFCMEndpoint + } + ep, err := url.Parse(endpoint) + if err != nil { + return nil, err } + ep.Path = path.Join(ep.Path, projectID, "messages:send") + c.endpoint = ep return c, nil } diff --git a/fcmv1/error.go b/fcmv1/error.go index 9f4b094..fe0ad86 100644 --- a/fcmv1/error.go +++ b/fcmv1/error.go @@ -9,6 +9,9 @@ const ( InvalidArgument = "INVALID_ARGUMENT" Unregistered = "UNREGISTERED" NotFound = "NOT_FOUND" + Internal = "INTERNAL" + Unavailable = "UNAVAILABLE" + QuotaExceeded = "QUOTA_EXCEEDED" ) type Error struct { diff --git a/mock/fcmv1_server.go b/mock/fcmv1_server.go new file mode 100644 index 0000000..c1defed --- /dev/null +++ b/mock/fcmv1_server.go @@ -0,0 +1,64 @@ +package mock + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "net/http" + "strings" + "time" + + "github.com/kayac/Gunfish/fcmv1" +) + +func FCMv1MockServer(projectID string, verbose bool) *http.ServeMux { + mux := http.NewServeMux() + p := fmt.Sprintf("/v1/projects/%s/messages:send", projectID) + log.Println("fcmv1 mock server path:", p) + mux.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + defer func() { + if verbose { + log.Printf("reqtime:%f proto:%s method:%s path:%s host:%s", reqtime(start), r.Proto, r.Method, r.URL.Path, r.RemoteAddr) + } + }() + + // sets the response time from FCM server + time.Sleep(time.Millisecond*200 + time.Millisecond*(time.Duration(rand.Int63n(200)-100))) + token := r.Header.Get("Authorization") + token = strings.TrimPrefix(token, "Bearer ") + + w.Header().Set("Content-Type", ApplicationJSON) + switch token { + case fcmv1.InvalidArgument: + createFCMv1ErrorResponse(w, http.StatusBadRequest, fcmv1.InvalidArgument) + case fcmv1.Unregistered: + createFCMv1ErrorResponse(w, http.StatusNotFound, fcmv1.Unregistered) + case fcmv1.Unavailable: + createFCMv1ErrorResponse(w, http.StatusServiceUnavailable, fcmv1.Unavailable) + case fcmv1.Internal: + createFCMv1ErrorResponse(w, http.StatusInternalServerError, fcmv1.Internal) + case fcmv1.QuotaExceeded: + createFCMv1ErrorResponse(w, http.StatusTooManyRequests, fcmv1.QuotaExceeded) + default: + enc := json.NewEncoder(w) + enc.Encode(fcmv1.ResponseBody{ + Name: "ok", + }) + } + }) + + return mux +} + +func createFCMv1ErrorResponse(w http.ResponseWriter, code int, status string) error { + w.WriteHeader(code) + enc := json.NewEncoder(w) + return enc.Encode(fcmv1.ResponseBody{ + Error: &fcmv1.FCMError{ + Status: status, + Message: "mock error:" + status, + }, + }) +} diff --git a/supervisor.go b/supervisor.go index 80fec60..b26f64b 100644 --- a/supervisor.go +++ b/supervisor.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "io" - "net/http" + "math" "os" "os/exec" "sync" @@ -33,15 +33,13 @@ type Supervisor struct { // Worker sends notification to apns. type Worker struct { - ac *apns.Client - fcv1 *fcmv1.Client - queue chan Request - respq chan SenderResponse - wgrp *sync.WaitGroup - sn int - id int - errorHandler func(Request, *http.Response, error) - successHandler func(Request, *http.Response) + ac *apns.Client + fcv1 *fcmv1.Client + queue chan Request + respq chan SenderResponse + wgrp *sync.WaitGroup + sn int + id int } // SenderResponse is responses to worker from sender. @@ -113,17 +111,22 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { for cnt := 0; cnt < RetryOnceCount; cnt++ { select { case req := <-s.retryq: - reqs := &[]Request{req} - select { - case s.queue <- reqs: - LogWithFields(logrus.Fields{"type": "retry", "resend_cnt": req.Tries}). - Debugf("Enqueue to retry to send notification.") - default: - LogWithFields(logrus.Fields{"type": "retry"}). - Infof("Could not retry to enqueue because the supervisor queue is full.") + var delay time.Duration + if RetryBackoff { + delay = time.Duration(math.Pow(float64(req.Tries), 2)) * 100 * time.Millisecond } + time.AfterFunc(delay, func() { + reqs := &[]Request{req} + select { + case s.queue <- reqs: + LogWithFields(logrus.Fields{"delay": delay, "type": "retry", "resend_cnt": req.Tries}). + Debugf("Enqueue to retry to send notification.") + default: + LogWithFields(logrus.Fields{"delay": delay, "type": "retry"}). + Infof("Could not retry to enqueue because the supervisor queue is full.") + } + }) default: - break } } case <-s.exit: @@ -172,7 +175,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { return Supervisor{}, errors.New("FCM legacy is not supported") } if conf.FCMv1.Enabled { - fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, nil, fcmv1.ClientTimeout) + fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, conf.FCMv1.Endpoint, fcmv1.ClientTimeout) if err != nil { LogWithFields(logrus.Fields{ "type": "supervisor", @@ -192,7 +195,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { s.workers = append(s.workers, &worker) s.wgrp.Add(1) - go s.spawnWorker(worker, conf) + go s.spawnWorker(worker) LogWithFields(logrus.Fields{ "type": "worker", "worker_id": i, @@ -245,7 +248,7 @@ func (s *Supervisor) Shutdown() { }).Infoln("Stoped supervisor.") } -func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) { +func (s *Supervisor) spawnWorker(w Worker) { atomic.AddInt64(&(srvStats.Workers), 1) defer func() { atomic.AddInt64(&(srvStats.Workers), -1) @@ -367,23 +370,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com if resp.Err != nil { req := resp.Req LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error()) - if req.Tries < SendRetryCount { - req.Tries++ - atomic.AddInt64(&(srvStats.RetryCount), 1) - logf["resend_cnt"] = req.Tries - - select { - case retryq <- req: - LogWithFields(logf). - Debugf("Retry to enqueue into retryq because of http connection error with FCM.") - default: - LogWithFields(logf). - Warnf("Supervisor retry queue is full.") - } - } else { - LogWithFields(logf). - Warnf("Retry count is over than %d. Could not deliver notification.", SendRetryCount) - } + retry(retryq, req, resp.Err, logf) return } @@ -395,12 +382,17 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com LogWithFields(logf).Info("Succeeded to send a notification") continue } - // handle error response each registration_id - atomic.AddInt64(&(srvStats.ErrCount), 1) switch err.Error() { + case fcmv1.Internal, fcmv1.Unavailable: + LogWithFields(logf).Warn("retrying:", err) + retry(retryq, resp.Req, err, logf) + case fcmv1.QuotaExceeded: + LogWithFields(logf).Warn("retrying after 1 min:", err) + time.AfterFunc(time.Minute, func() { retry(retryq, resp.Req, err, logf) }) case fcmv1.Unregistered, fcmv1.InvalidArgument, fcmv1.NotFound: + LogWithFields(logf).Errorf("calling error hook: %s", err) + atomic.AddInt64(&(srvStats.ErrCount), 1) onResponse(result, errorResponseHandler.HookCmd(), cmdq) - LogWithFields(logf).Errorf("%s", err) default: LogWithFields(logf).Errorf("Unknown error message: %s", err) } @@ -438,7 +430,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait no := req.Notification.(apns.Notification) start := time.Now() results, err := ac.Send(no) - respTime := time.Now().Sub(start).Seconds() + respTime := time.Since(start).Seconds() rs := make([]Result, 0, len(results)) for _, v := range results { rs = append(rs, v) @@ -459,7 +451,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait p := req.Notification.(fcmv1.Payload) start := time.Now() results, err := fcv1.Send(p) - respTime := time.Now().Sub(start).Seconds() + respTime := time.Since(start).Seconds() rs := make([]Result, 0, len(results)) for _, v := range results { rs = append(rs, v) diff --git a/supervisor_test.go b/supervisor_test.go index 708d914..205e0b2 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -58,6 +58,7 @@ func (tr TestResponseHandler) HookCmd() string { func init() { logrus.SetLevel(logrus.WarnLevel) conf.Apns.Host = gunfish.MockServer + gunfish.RetryBackoff = false // for testing } func TestEnqueuRequestToSupervisor(t *testing.T) { diff --git a/test/gunfish_test.toml b/test/gunfish_test.toml index 9724b83..9178543 100644 --- a/test/gunfish_test.toml +++ b/test/gunfish_test.toml @@ -11,3 +11,9 @@ cert_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.crt" key_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.key" request_per_sec = 2000 sender_num = 50 + +[fcm_v1] +# google_application_credentials = "{{ env `PROJECT_ROOT` `.` }}/credentials.json" +enabled = true +endpoint = "http://localhost:8888/v1/projects" +projectid = "test" diff --git a/test/tools/fcmv1mock/fcmv1mock.go b/test/tools/fcmv1mock/fcmv1mock.go new file mode 100644 index 0000000..f226db1 --- /dev/null +++ b/test/tools/fcmv1mock/fcmv1mock.go @@ -0,0 +1,29 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + + "github.com/kayac/Gunfish/mock" +) + +func main() { + var ( + port int + projectID string + verbose bool + ) + + flag.IntVar(&port, "port", 8888, "fcmv1 mock server port") + flag.StringVar(&projectID, "project-id", "test", "fcmv1 mock project id") + flag.BoolVar(&verbose, "verbose", false, "verbose flag") + flag.Parse() + + mux := mock.FCMv1MockServer(projectID, verbose) + log.Println("start fcmv1mock server port:", port, "project_id:", projectID) + if err := http.ListenAndServe(fmt.Sprintf(":%d", port), mux); err != nil { + log.Fatal(err) + } +}