Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #75 from RobotsAndPencils/wait
Browse files Browse the repository at this point in the history
calling code needs to ensure all responses are handled (not just received)
  • Loading branch information
nathany authored Jul 29, 2016
2 parents 122e0fb + c454670 commit 1589c17
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 91 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,29 @@ See `example/push` for the complete listing.
HTTP/2 can send multiple requests over a single connection, but `service.Push` waits for a response before returning. Instead, you can wrap a `Service` in a queue to handle responses independently, allowing you to send multiple notifications at once.

```go
var wg sync.WaitGroup
queue := push.NewQueue(service, numWorkers)

// process responses (responses may be received in any order)
go func() {
for resp := range queue.Responses {
log.Println(resp)
// done receiving and processing one response
wg.Done()
}
}()

// send the notifications
for i := 0; i < 100; i++ {
// increment count of notifications sent and queue it
wg.Add(1)
queue.Push(deviceToken, nil, b)
}

// done sending notifications, wait for all responses and shutdown
queue.Wait()
// wait for all responses to be processed
wg.Wait()
// shutdown the channels and workers for the queue
queue.Close()
```

It's important to set up a goroutine to handle responses before sending any notifications, otherwise Push will block waiting for room to return a Response.
Expand Down
7 changes: 6 additions & 1 deletion example/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"sync"
"time"

"github.com/RobotsAndPencils/buford/certificate"
Expand Down Expand Up @@ -60,6 +61,7 @@ func main() {
exitOnError(err)
service := push.NewService(client, host)
queue := push.NewQueue(service, workers)
var wg sync.WaitGroup

// process responses
// NOTE: Responses may be received in any order.
Expand All @@ -72,6 +74,7 @@ func main() {
log.Printf("(%d) device: %s, apns-id: %s", count, resp.DeviceToken, resp.ID)
}
count++
wg.Done()
}
}()

Expand All @@ -85,10 +88,12 @@ func main() {
// send notifications:
start := time.Now()
for i := 0; i < number; i++ {
wg.Add(1)
queue.Push(deviceToken, nil, b)
}
// done sending notifications, wait for all responses and shutdown:
queue.Wait()
wg.Wait()
queue.Close()
elapsed := time.Since(start)

log.Printf("Time for %d responses: %s (%s ea.)", number, elapsed, elapsed/time.Duration(number))
Expand Down
78 changes: 0 additions & 78 deletions push/benchmark_test.go

This file was deleted.

12 changes: 3 additions & 9 deletions push/queue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package push

import "sync"

// Queue up notifications without waiting for the response.
type Queue struct {
service *Service
notifications chan notification
Responses chan Response
wg sync.WaitGroup
}

// notification to send.
Expand Down Expand Up @@ -46,17 +43,15 @@ func (q *Queue) Push(deviceToken string, headers *Headers, payload []byte) {
Headers: headers,
Payload: payload,
}
q.wg.Add(1)
q.notifications <- n
}

// Wait for all responses to be handled and then close channels.
func (q *Queue) Wait() {
// Close the channels for notifications and Responses and shutdown workers.
// You should only call this after all responses have been received.
func (q *Queue) Close() {
// Stop accepting new notifications and shutdown workers after existing notifications
// are processed:
close(q.notifications)
// Wait for all responses to be handled:
q.wg.Wait()
// Close responses channel to clean up:
close(q.Responses)
}
Expand All @@ -65,6 +60,5 @@ func worker(q *Queue) {
for n := range q.notifications {
id, err := q.service.Push(n.DeviceToken, n.Headers, n.Payload)
q.Responses <- Response{DeviceToken: n.DeviceToken, ID: id, Err: err}
q.wg.Done()
}
}
7 changes: 6 additions & 1 deletion push/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/RobotsAndPencils/buford/push"
Expand All @@ -28,6 +29,7 @@ func TestQueuePush(t *testing.T) {

service := push.NewService(http.DefaultClient, server.URL)
queue := push.NewQueue(service, workers)
var wg sync.WaitGroup

go func() {
for resp := range queue.Responses {
Expand All @@ -37,11 +39,14 @@ func TestQueuePush(t *testing.T) {
if resp.ID != resp.DeviceToken {
t.Errorf("Expected %q == %q.", resp.ID, resp.DeviceToken)
}
wg.Done()
}
}()

for i := 0; i < number; i++ {
wg.Add(1)
queue.Push(fmt.Sprintf("%04d", i), nil, payload)
}
queue.Wait()
wg.Wait()
queue.Close()
}

0 comments on commit 1589c17

Please sign in to comment.