Skip to content

Commit

Permalink
multiple channels for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Zach Abrahamson committed Jul 18, 2016
1 parent 99669f8 commit 50c4fa2
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type EventsCollection struct {
Entries []*Event `json:"entries"`
}

type EventChanEntry struct {
Event *Event
Err error
}

func (e *EventService) getSeedStreamPos(startTime string) string {
var respBoxEventsJSON EventsCollection
req, err := http.NewRequest("GET", e.BaseUrl.String()+"/events?stream_type=admin_logs&limit=1&created_after="+startTime, nil)
Expand All @@ -53,36 +58,47 @@ func (e *EventService) getSeedStreamPos(startTime string) string {
return respBoxEventsJSON.NextStreamPosition
}

func (e *EventService) getEvents(eventLimit string, streamPos string) *EventsCollection {
func (e *EventService) getEvents(eventLimit string, streamPos string) (*EventsCollection, error) {
var respBoxEventsJSON EventsCollection
req, err := http.NewRequest("GET", e.BaseUrl.String()+"/events?stream_type=admin_logs&limit="+eventLimit+"&stream_position="+streamPos, nil)
req.Header.Add("Authorization", "Bearer "+e.Token)
_, err = e.DoWithRetries(req, &respBoxEventsJSON, 5)
if err != nil {

fmt.Println(err)
return nil, err
}
return &respBoxEventsJSON
return &respBoxEventsJSON, nil
}

func (e *EventService) streamEvents(eventLimit string, streamPos string, tunnel chan *Event) {
func (e *EventService) streamEvents(eventLimit string, streamPos string, tunnel chan *EventChanEntry) {
for {
if streamPos == "" {
close(tunnel)
return
}
eventsCollection := e.getEvents(eventLimit, streamPos)
eventsCollection, err := e.getEvents(eventLimit, streamPos)
if err != nil {
ev := &EventChanEntry{
nil,
err,
}
tunnel <- ev
continue
}
for _, event := range eventsCollection.Entries {
tunnel <- event
ev := &EventChanEntry{
event,
nil,
}
tunnel <- ev
}
streamPos = eventsCollection.NextStreamPosition
//time.Sleep(2 * time.Second)
}
}

func (e *EventService) Channel(eventLimit string, startTime string) chan *Event {
func (e *EventService) Channel(eventLimit string, startTime string) chan *EventChanEntry {
streamPos := e.getSeedStreamPos(startTime)
eventStream := make(chan *Event)
eventStream := make(chan *EventChanEntry)
go e.streamEvents(eventLimit, streamPos, eventStream)
return eventStream
}

0 comments on commit 50c4fa2

Please sign in to comment.