Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): support graceful shutdown #459

Merged
merged 4 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ build_linux_lambda:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags 'lambda' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)' -o release/linux/lambda/$(DEPLOY_IMAGE)

docker_image:
docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f Dockerfile .
docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f ./docker/Dockerfile.linux.amd64 .

docker_release: docker_image

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f
- Support install TLS certificates from [Let's Encrypt](https://letsencrypt.org/) automatically.
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
- Support graceful shutdown that notifications workers and queue have are sent to APNs/FCM before a push notification service is shutdown.

See the default [YAML config example](config/config.yml):

Expand Down
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: '3'

services:
gorush:
image: appleboy/gorush
restart: always
ports:
- "8080:8080"
- "9000:9000"
logging:
options:
max-size: "100k"
max-file: "3"
environment:
- GORUSH_CORE_QUEUE_NUM=512
7 changes: 6 additions & 1 deletion gorush/notification_fcm_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package gorush

import (
"context"
"log"
"os"
"sync"
"testing"

"github.com/appleboy/go-fcm"
Expand All @@ -16,7 +18,10 @@ func init() {
log.Fatal(err)
}

InitWorkers(PushConf.Core.WorkerNum, PushConf.Core.QueueNum)
ctx := context.Background()
wg := &sync.WaitGroup{}
wg.Add(int(PushConf.Core.WorkerNum))
InitWorkers(ctx, wg, PushConf.Core.WorkerNum, PushConf.Core.QueueNum)

if err := InitAppStatus(); err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion gorush/server_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func RunHTTPServer() error {
return nil
}

LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")

return gateway.ListenAndServe(PushConf.Core.Address+":"+PushConf.Core.Port, routerEngine())
}
2 changes: 1 addition & 1 deletion gorush/server_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func RunHTTPServer() (err error) {
Handler: routerEngine(),
}

LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
if PushConf.Core.AutoTLS.Enabled {
return startServer(autoTLSServer())
} else if PushConf.Core.SSL {
Expand Down
3 changes: 2 additions & 1 deletion gorush/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/appleboy/gorush/storage/leveldb"
"github.com/appleboy/gorush/storage/memory"
"github.com/appleboy/gorush/storage/redis"

"github.com/gin-gonic/gin"
"github.com/thoas/stats"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ type IosStatus struct {

// InitAppStatus for initialize app status
func InitAppStatus() error {
LogAccess.Debug("Init App Status Engine as ", PushConf.Stat.Engine)
LogAccess.Info("Init App Status Engine as ", PushConf.Stat.Engine)
switch PushConf.Stat.Engine {
case "memory":
StatStorage = memory.New()
Expand Down
13 changes: 7 additions & 6 deletions gorush/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
)

// InitWorkers for initialize all workers.
func InitWorkers(workerNum int64, queueNum int64) {
LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum)
func InitWorkers(ctx context.Context, wg *sync.WaitGroup, workerNum int64, queueNum int64) {
LogAccess.Info("worker number is ", workerNum, ", queue number is ", queueNum)
QueueNotification = make(chan PushNotification, queueNum)
for i := int64(0); i < workerNum; i++ {
go startWorker()
go startWorker(ctx, wg, i)
}
}

Expand All @@ -33,11 +33,12 @@ func SendNotification(req PushNotification) {
}
}

func startWorker() {
for {
notification := <-QueueNotification
func startWorker(ctx context.Context, wg *sync.WaitGroup, num int64) {
defer wg.Done()
for notification := range QueueNotification {
SendNotification(notification)
}
LogAccess.Info("closed the worker num ", num)
}

// markFailedNotification adds failure logs for all tokens in push notification
Expand Down
48 changes: 40 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"

"github.com/appleboy/gorush/config"
Expand All @@ -18,6 +22,24 @@ import (
"golang.org/x/sync/errgroup"
)

func withContextFunc(ctx context.Context, f func()) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SA1017: the channel used with signal.Notify should be buffered (from staticcheck)

defer signal.Stop(c)

select {
case <-ctx.Done():
case <-c:
cancel()
f()
}
}()

return ctx
}

func main() {
opts := config.ConfYaml{}

Expand Down Expand Up @@ -223,19 +245,29 @@ func main() {
}

if err = gorush.InitAppStatus(); err != nil {
return
gorush.LogError.Fatal(err)
}

gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
wg := &sync.WaitGroup{}
wg.Add(int(gorush.PushConf.Core.WorkerNum))
ctx := withContextFunc(context.Background(), func() {
gorush.LogAccess.Info("close the notification queue channel")
close(gorush.QueueNotification)
wg.Wait()
gorush.LogAccess.Info("the notification queue has been clear")
})

var g errgroup.Group
gorush.InitWorkers(ctx, wg, gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)

g.Go(gorush.InitAPNSClient)
if err = gorush.InitAPNSClient(); err != nil {
gorush.LogError.Fatal(err)
}

g.Go(func() error {
_, err := gorush.InitFCMClient(gorush.PushConf.Android.APIKey)
return err
})
if _, err = gorush.InitFCMClient(gorush.PushConf.Android.APIKey); err != nil {
gorush.LogError.Fatal(err)
}

var g errgroup.Group

g.Go(gorush.RunHTTPServer) // Run httpd server
g.Go(rpc.RunGRPCServer) // Run gRPC internal server
Expand Down
2 changes: 1 addition & 1 deletion rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot
// RunGRPCServer run gorush grpc server
func RunGRPCServer() error {
if !gorush.PushConf.GRPC.Enabled {
gorush.LogAccess.Debug("gRPC server is disabled.")
gorush.LogAccess.Info("gRPC server is disabled.")
return nil
}

Expand Down