Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Gracefully handle panics, add --log flag #8

Merged
merged 2 commits into from
Nov 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/ingester/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)

var (
logLevel uint32
listenAddr string
authorizationToken string
enableInfluxDB bool
Expand All @@ -13,6 +14,7 @@ var (

func init() {
pflag.StringVar(&listenAddr, "http.listenAddr", ":8080", "Address to listen on.")
pflag.Uint32Var(&logLevel, "log", 0, "Log level to use, defaults to 4 (INFO).")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it defaults to 0 (PANIC) to me.... :trollface:
Also, maybe document the log levels or link to logrus?

pflag.StringVar(&authorizationToken, "http.authToken", "",
"Optional authorization token that will be used to authenticate incoming requests.")
pflag.BoolVar(&enableInfluxDB, "backend.influxdb", false, "Enable the InfluxDB storage backend.")
Expand Down
7 changes: 7 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ func main() {
pflag.Parse()
mux := http.NewServeMux()

// Set log level
if logLevel > 0 {
level := log.Level(logLevel)
log.WithField("log_level", level).Info("setting log level")
log.SetLevel(level)
}

// Add middlewares
middlewares := []Middleware{
createLoggingHandler(log.StandardLogger()),
Expand Down
9 changes: 9 additions & 0 deletions pkg/backends/influxdb/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func (b *Backend) Name() string {
}

func (b *Backend) Write(payload *healthautoexport.Payload, targetName string) error {
// Properly handle nil data.
if payload.Data == nil {
log.WithFields(log.Fields{
"backend": b.Name(),
"target": targetName,
}).Warn("empty payload data received, skipping")
return nil
}

// Write metrics.
if err := b.writeMetrics(payload.Data.Metrics, targetName); err != nil {
return errors.Wrapf(err, "write metrics error")
Expand Down
12 changes: 10 additions & 2 deletions pkg/backends/influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"

apierrors "github.com/irvinlim/apple-health-ingester/pkg/errors"
)

// Client knows how to write to an InfluxDB database.
Expand Down Expand Up @@ -40,11 +42,17 @@ func NewClient() (Client, error) {
}

func (c *clientImpl) WriteMetrics(ctx context.Context, point ...*write.Point) error {
return c.client.WriteAPIBlocking(c.orgName, c.metricsBucketName).WritePoint(ctx, point...)
if err := c.client.WriteAPIBlocking(c.orgName, c.metricsBucketName).WritePoint(ctx, point...); err != nil {
return apierrors.WrapRetryableWrite(err)
}
return nil
}

func (c *clientImpl) WriteWorkouts(ctx context.Context, point ...*write.Point) error {
return c.client.WriteAPIBlocking(c.orgName, c.workoutsBucketName).WritePoint(ctx, point...)
if err := c.client.WriteAPIBlocking(c.orgName, c.workoutsBucketName).WritePoint(ctx, point...); err != nil {
return apierrors.WrapRetryableWrite(err)
}
return nil
}

// MockClient is a mock implementation of Client.
Expand Down
9 changes: 6 additions & 3 deletions pkg/backends/noop/backend.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package noop

import (
"errors"

"github.com/irvinlim/apple-health-ingester/pkg/backends"
apierrors "github.com/irvinlim/apple-health-ingester/pkg/errors"
"github.com/irvinlim/apple-health-ingester/pkg/healthautoexport"
)

type Backend struct {
Writes []*healthautoexport.Payload
ShouldError bool
ShouldPanic bool
}

var _ backends.Backend = &Backend{}
Expand All @@ -23,8 +23,11 @@ func (b *Backend) Name() string {
}

func (b *Backend) Write(payload *healthautoexport.Payload, _ string) error {
if b.ShouldPanic {
panic("backend panic during write")
}
if b.ShouldError {
return errors.New("noop error")
return apierrors.NewRetryableWriteError()
}
b.Writes = append(b.Writes, payload)
return nil
Expand Down
83 changes: 83 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package errors

import (
"fmt"

"github.com/pkg/errors"
)

type Reason string

const (
ReasonRetryableWrite Reason = "RetryableWrite"
ReasonUnknown Reason = "Unknown"
)

var (
// baseError is used when there is no error being wrapped and the reason refers to the error itself.
baseError = errors.New("base error")
)

// Error implements custom error types.
type Error interface {
error
GetReason() Reason
}

type wrappedError struct {
error
Reason Reason
Message string
}

var _ Error = (*wrappedError)(nil)

func (w *wrappedError) Error() string {
if w.error == baseError {
return w.error.Error()
}
return fmt.Sprintf("%v: %v", w.Message, w.error)
}

func (w *wrappedError) GetReason() Reason {
return w.Reason
}

// NewRetryableWriteError returns a new RetryableWriteError.
func NewRetryableWriteError() error {
return WrapRetryableWrite(baseError)
}

// WrapRetryableWrite wraps an error as a RetryableWriteError.
func WrapRetryableWrite(err error) error {
return WrapfRetryableWrite(err, "")
}

// WrapfRetryableWrite wraps an error as a RetryableWriteError.
func WrapfRetryableWrite(err error, message string) error {
return &wrappedError{
error: err,
Reason: ReasonRetryableWrite,
Message: wrapMessage("temporary error writing to backend, will retry again later", message),
}
}

// IsRetryableWrite tests if err is a RetryableWriteError.
func IsRetryableWrite(err error) bool {
return GetReason(err) == ReasonRetryableWrite
}

func GetReason(err error) Reason {
if wrappedErr := Error(nil); errors.As(err, &wrappedErr) {
return wrappedErr.GetReason()
}
return ReasonUnknown
}

func wrapMessage(base string, message string) string {
wrapped := base
if len(message) > 0 {
wrapped = fmt.Sprintf("%v: %v", message, wrapped)
}
return wrapped
}
26 changes: 21 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"fmt"
"io"
"runtime/debug"
"sync"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/irvinlim/apple-health-ingester/pkg/backends"
apierrors "github.com/irvinlim/apple-health-ingester/pkg/errors"
"github.com/irvinlim/apple-health-ingester/pkg/healthautoexport"
)

Expand Down Expand Up @@ -137,10 +139,12 @@ func (i *Ingester) processQueue(backend *backends.BackendQueue) {
logger = logger.WithField("elapsed", time.Since(startTime))

if err != nil {
backend.Queue.AddRateLimited(item)
logger.WithError(err).
WithField("retries", backend.Queue.NumRequeues(item)).
Error("write data error")
if apierrors.IsRetryableWrite(err) {
backend.Queue.AddRateLimited(item)
logger = logger.WithField("retries", backend.Queue.NumRequeues(item))
}

logger.WithError(err).Error("write data error")
} else {
logger.Info("write data success")
}
Expand All @@ -149,11 +153,23 @@ func (i *Ingester) processQueue(backend *backends.BackendQueue) {
}
}

func (i *Ingester) processWriteItem(item interface{}, backend backends.Backend) error {
func (i *Ingester) processWriteItem(item interface{}, backend backends.Backend) (err error) {
payload, ok := item.(*PayloadWithTarget)
if !ok {
return fmt.Errorf("cannot convert to *Payload")
}

// Handle panics in backend implementations.
defer func() {
if r := recover(); r != nil {
log.WithFields(log.Fields{
"backend": backend.Name(),
"payload": payload,
}).Error("recovered from panic in backend:\n" + string(debug.Stack()))
err = errors.New("recovered from panic")
}
}()

if err := backend.Write(payload.Payload, payload.TargetName); err != nil {
return errors.Wrapf(err, "cannot write payload to database")
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,25 @@ tickerLoop:
ingest.Shutdown()
assert.Equal(t, expectedWrites, len(backend.Writes))
}

func TestIngester_BackendPanic(t *testing.T) {
ingest := ingester.NewIngester()
backend := noop.NewBackend()
assert.NoError(t, ingest.AddBackend(backend))
ingest.Start()

var expectedWrites int

// Backend should panic and throw an error, but will recover
backend.ShouldPanic = true
assert.NoError(t, ingest.IngestFromString(payload, backend.Name(), ""))
time.Sleep(processingDelay)
assert.Equal(t, expectedWrites, len(backend.Writes))

// Backend will no longer panic and writes should still succeed
backend.ShouldPanic = false
assert.NoError(t, ingest.IngestFromString(payload, backend.Name(), ""))
time.Sleep(processingDelay)
expectedWrites++
assert.Equal(t, expectedWrites, len(backend.Writes))
}