diff --git a/README.md b/README.md index 6581397..129fef4 100644 --- a/README.md +++ b/README.md @@ -110,22 +110,29 @@ See `example/push` for the complete listing. HTTP/2 can send multiple requests over a single connection, but `service.Push` waits for a response before returning. Instead, you can wrap a `Service` in a queue to handle responses independently, allowing you to send multiple notifications at once. ```go +var wg sync.WaitGroup queue := push.NewQueue(service, numWorkers) // process responses (responses may be received in any order) go func() { for resp := range queue.Responses { log.Println(resp) + // done receiving and processing one response + wg.Done() } }() // send the notifications for i := 0; i < 100; i++ { + // increment count of notifications sent and queue it + wg.Add(1) queue.Push(deviceToken, nil, b) } -// done sending notifications, wait for all responses and shutdown -queue.Wait() +// wait for all responses to be processed +wg.Wait() +// shutdown the channels and workers for the queue +queue.Close() ``` It's important to set up a goroutine to handle responses before sending any notifications, otherwise Push will block waiting for room to return a Response. diff --git a/example/concurrent/main.go b/example/concurrent/main.go index afe459a..8abf9e6 100644 --- a/example/concurrent/main.go +++ b/example/concurrent/main.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "sync" "time" "github.com/RobotsAndPencils/buford/certificate" @@ -60,6 +61,7 @@ func main() { exitOnError(err) service := push.NewService(client, host) queue := push.NewQueue(service, workers) + var wg sync.WaitGroup // process responses // NOTE: Responses may be received in any order. @@ -72,6 +74,7 @@ func main() { log.Printf("(%d) device: %s, apns-id: %s", count, resp.DeviceToken, resp.ID) } count++ + wg.Done() } }() @@ -85,10 +88,12 @@ func main() { // send notifications: start := time.Now() for i := 0; i < number; i++ { + wg.Add(1) queue.Push(deviceToken, nil, b) } // done sending notifications, wait for all responses and shutdown: - queue.Wait() + wg.Wait() + queue.Close() elapsed := time.Since(start) log.Printf("Time for %d responses: %s (%s ea.)", number, elapsed, elapsed/time.Duration(number)) diff --git a/push/benchmark_test.go b/push/benchmark_test.go deleted file mode 100644 index 3431c1e..0000000 --- a/push/benchmark_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package push_test - -import ( - "encoding/json" - "flag" - "testing" - - "github.com/RobotsAndPencils/buford/certificate" - "github.com/RobotsAndPencils/buford/payload" - "github.com/RobotsAndPencils/buford/payload/badge" - "github.com/RobotsAndPencils/buford/push" -) - -var ( - deviceToken string - filename, password string - workers uint -) - -func init() { - flag.StringVar(&deviceToken, "token", "", "Device token") - flag.StringVar(&filename, "cert", "", "Path to p12 certificate file") - flag.StringVar(&password, "pwd", "", "Password for p12 file.") - flag.UintVar(&workers, "w", 20, "Workers to send notifications") - flag.Parse() -} - -// GODEBUG=http2debug=1 go test ./push -cert ../cert.p12 -token device-token -v -bench . -benchtime 10s -func BenchmarkPush(b *testing.B) { - if filename == "" || deviceToken == "" { - b.Skip("Skipping benchmark without cert file and device token.") - } - - cert, err := certificate.Load(filename, password) - if err != nil { - b.Fatal(err) - } - - client, err := push.NewClient(cert) - if err != nil { - b.Fatal(err) - } - - service := push.NewService(client, push.Development) - queue := push.NewQueue(service, workers) - - p := payload.APS{ - Alert: payload.Alert{Body: "Hello HTTP/2"}, - Badge: badge.New(42), - } - bytes, err := json.Marshal(p) - if err != nil { - b.Fatal(err) - } - - // warm up the connection - _, err = service.Push(deviceToken, nil, bytes) - if err != nil { - b.Fatal(err) - } - - // handle responses - go func() { - for resp := range queue.Responses { - if resp.Err != nil { - b.Fatal(resp.Err) - } - } - }() - - b.ResetTimer() - // this benchmark is the time to send the notifications without waiting - // for the responses - for i := 0; i < b.N; i++ { - queue.Push(deviceToken, nil, bytes) - } - queue.Wait() -} diff --git a/push/queue.go b/push/queue.go index 0a331ec..2e40dfc 100644 --- a/push/queue.go +++ b/push/queue.go @@ -1,13 +1,10 @@ package push -import "sync" - // Queue up notifications without waiting for the response. type Queue struct { service *Service notifications chan notification Responses chan Response - wg sync.WaitGroup } // notification to send. @@ -46,17 +43,15 @@ func (q *Queue) Push(deviceToken string, headers *Headers, payload []byte) { Headers: headers, Payload: payload, } - q.wg.Add(1) q.notifications <- n } -// Wait for all responses to be handled and then close channels. -func (q *Queue) Wait() { +// Close the channels for notifications and Responses and shutdown workers. +// You should only call this after all responses have been received. +func (q *Queue) Close() { // Stop accepting new notifications and shutdown workers after existing notifications // are processed: close(q.notifications) - // Wait for all responses to be handled: - q.wg.Wait() // Close responses channel to clean up: close(q.Responses) } @@ -65,6 +60,5 @@ func worker(q *Queue) { for n := range q.notifications { id, err := q.service.Push(n.DeviceToken, n.Headers, n.Payload) q.Responses <- Response{DeviceToken: n.DeviceToken, ID: id, Err: err} - q.wg.Done() } } diff --git a/push/queue_test.go b/push/queue_test.go index d46aea0..eac2fbc 100644 --- a/push/queue_test.go +++ b/push/queue_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "github.com/RobotsAndPencils/buford/push" @@ -28,6 +29,7 @@ func TestQueuePush(t *testing.T) { service := push.NewService(http.DefaultClient, server.URL) queue := push.NewQueue(service, workers) + var wg sync.WaitGroup go func() { for resp := range queue.Responses { @@ -37,11 +39,14 @@ func TestQueuePush(t *testing.T) { if resp.ID != resp.DeviceToken { t.Errorf("Expected %q == %q.", resp.ID, resp.DeviceToken) } + wg.Done() } }() for i := 0; i < number; i++ { + wg.Add(1) queue.Push(fmt.Sprintf("%04d", i), nil, payload) } - queue.Wait() + wg.Wait() + queue.Close() }