Skip to content

Commit

Permalink
Check for tasks errors in batchProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent 397d95d commit cf1b2a0
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions gossip/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ func (d *BatchProcessor) wasProcessed(b *protocol.BatchSnapshots) bool {
log.Infof("Error encoding batchsnapshots to calculate its digest. Dropping batch.")
return false
}
digest := hashing.NewSha256Hasher().Do(buf.Bytes())
bb := buf.Bytes()
digest := hashing.NewSha256Hasher().Do(bb)
// batch already processed, discard it
_, err = d.a.Cache.Get(digest)
if err == nil {
log.Debugf("Batch processor found %v on cache calculated from %v", digest, bb)
return true
}
d.a.Cache.Set(digest, []byte{0x0}, 0)
d.a.Cache.Set(digest, []byte{0x1}, 0)
return false
}

Expand Down Expand Up @@ -127,7 +129,10 @@ func (d *BatchProcessor) Subscribe(id int, ch <-chan *Message) {
ctx := context.WithValue(d.ctx, "batch", batch)
for _, t := range d.tf {
log.Debugf("Batch processor creating a new task")
d.a.Tasks.Add(t.New(ctx))
err := d.a.Tasks.Add(t.New(ctx))
if err != nil {
log.Infof("BatchProcessor was unable to enqueue new task becasue %v", err)
}
}

d.a.Out.Publish(msg)
Expand Down

0 comments on commit cf1b2a0

Please sign in to comment.