Skip to content

Commit

Permalink
Fix transform crash for empty hydrate items. Fix plugin and table lev…
Browse files Browse the repository at this point in the history
…el error handling defaults. Respect retry config in DefaultGetConfig. Closes #325

If a hydrate function fails, instead of passing an empty struct to transform functions (which could cause casting errors), do not call the transform function at all.
  • Loading branch information
kaidaguerre authored May 12, 2022
1 parent 6ddc378 commit 348c461
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 181 deletions.
5 changes: 2 additions & 3 deletions plugin/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ type GetConfig struct {
}

func (c *GetConfig) initialise(table *Table) {
log.Printf("[TRACE] GetConfig initialise table %s", table.Name)

log.Printf("[TRACE] GetConfig.initialise table %s", table.Name)
// create RetryConfig if needed
if c.RetryConfig == nil {
c.RetryConfig = &RetryConfig{}
Expand All @@ -42,7 +41,7 @@ func (c *GetConfig) initialise(table *Table) {
c.RetryConfig.DefaultTo(table.DefaultRetryConfig)
c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig)

log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())
}

func (c *GetConfig) Validate(table *Table) []string {
Expand Down
60 changes: 0 additions & 60 deletions plugin/hydrate.go
Original file line number Diff line number Diff line change
@@ -1,61 +1 @@
package plugin

import (
"context"
"fmt"
"log"
"time"

"github.com/sethvargo/go-retry"
"github.com/turbot/go-kit/helpers"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// RetryHydrate function invokes the hydrate function with retryable errors and retries the function until the maximum attemptes before throwing error
func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrateFunc HydrateFunc, retryConfig *RetryConfig) (interface{}, error) {
backoff, err := retry.NewFibonacci(100 * time.Millisecond)
if err != nil {
return nil, err
}
var hydrateResult interface{}
shouldRetryErrorFunc := retryConfig.ShouldRetryError

err = retry.Do(ctx, retry.WithMaxRetries(10, backoff), func(ctx context.Context) error {
hydrateResult, err = hydrateFunc(ctx, d, hydrateData)
if err != nil && shouldRetryErrorFunc(err) {
err = retry.RetryableError(err)
}
return err
})
return hydrateResult, err
}

// WrapHydrate is a higher order function which returns a HydrateFunc which handles Ignorable errors
func WrapHydrate(hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig) HydrateFunc {
return func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[WARN] recovered a panic from a wrapped hydrate function: %v\n", r)
err = status.Error(codes.Internal, fmt.Sprintf("hydrate function %s failed with panic %v", helpers.GetFunctionName(hydrateFunc), r))
}
}()
// call the underlying get function
item, err = hydrateFunc(ctx, d, h)
if err != nil {
log.Printf("[TRACE] wrapped hydrate call %s returned error %v\n", helpers.GetFunctionName(hydrateFunc), err)
// see if the ignoreConfig defines a should ignore function
if ignoreConfig.ShouldIgnoreError != nil && ignoreConfig.ShouldIgnoreError(err) {
log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err)
return nil, nil
}
if ignoreConfig.ShouldIgnoreErrorFunc != nil && ignoreConfig.ShouldIgnoreErrorFunc(ctx, d, h, err) {
log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err)
return nil, nil
}
// pass any other error on
return nil, err
}
return item, nil
}
}
4 changes: 2 additions & 2 deletions plugin/hydrate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Depends: %s`,
}

func (c *HydrateConfig) initialise(table *Table) {
log.Printf("[TRACE] HydrateConfig initialise func %s, table %s", helpers.GetFunctionName(c.Func), table.Name)
log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", helpers.GetFunctionName(c.Func), table.Name)

// create RetryConfig if needed
if c.RetryConfig == nil {
Expand All @@ -58,7 +58,7 @@ func (c *HydrateConfig) initialise(table *Table) {
c.RetryConfig.DefaultTo(table.DefaultRetryConfig)
c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig)

log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
log.Printf("[TRACE] HydrateConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())
}

func (c *HydrateConfig) Validate(table *Table) []string {
Expand Down
85 changes: 85 additions & 0 deletions plugin/hydrate_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package plugin

import (
"context"
"fmt"
"github.com/sethvargo/go-retry"
"github.com/turbot/go-kit/helpers"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"time"
)

// RetryHydrate function invokes the hydrate function with retryable errors and retries the function until the maximum attemptes before throwing error
func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrateFunc HydrateFunc, retryConfig *RetryConfig) (interface{}, error) {
backoff, err := retry.NewFibonacci(100 * time.Millisecond)
if err != nil {
return nil, err
}
var hydrateResult interface{}

err = retry.Do(ctx, retry.WithMaxRetries(10, backoff), func(ctx context.Context) error {
hydrateResult, err = hydrateFunc(ctx, d, hydrateData)
if err != nil {
if shouldRetryError(ctx, d, hydrateData, err, retryConfig) {
err = retry.RetryableError(err)
}
}
return err
})

return hydrateResult, err
}

// WrapHydrate is a higher order function which returns a HydrateFunc which handles Ignorable errors
func WrapHydrate(hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig) HydrateFunc {
log.Printf("[TRACE] WrapHydrate %s, ignore config %s\n", helpers.GetFunctionName(hydrateFunc), ignoreConfig.String())

return func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[WARN] recovered a panic from a wrapped hydrate function: %v\n", r)
err = status.Error(codes.Internal, fmt.Sprintf("hydrate function %s failed with panic %v", helpers.GetFunctionName(hydrateFunc), r))
}
}()
// call the underlying get function
item, err = hydrateFunc(ctx, d, h)
if err != nil {
log.Printf("[TRACE] wrapped hydrate call %s returned error %v, ignore config %s\n", helpers.GetFunctionName(hydrateFunc), err, ignoreConfig.String())
// see if the ignoreConfig defines a should ignore function
if ignoreConfig.ShouldIgnoreError != nil && ignoreConfig.ShouldIgnoreError(err) {
log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err)
return nil, nil
}
if ignoreConfig.ShouldIgnoreErrorFunc != nil && ignoreConfig.ShouldIgnoreErrorFunc(ctx, d, h, err) {
log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err)
return nil, nil
}
// pass any other error on
return nil, err
}
return item, nil
}
}

func shouldRetryError(ctx context.Context, d *QueryData, h *HydrateData, err error, retryConfig *RetryConfig) bool {
log.Printf("[TRACE] shouldRetryError err: %v, retryConfig: %s", err, retryConfig.String())

if retryConfig == nil {
log.Printf("[TRACE] shouldRetryError nil retry config - return false")
return false
}

if retryConfig.ShouldRetryError != nil {
log.Printf("[TRACE] shouldRetryError - calling legacy ShouldRetryError")
return retryConfig.ShouldRetryError(err)
}

if retryConfig.ShouldRetryErrorFunc != nil {
log.Printf("[TRACE] shouldRetryError - calling ShouldRetryFunc")
return retryConfig.ShouldRetryErrorFunc(ctx, d, h, err)
}

return false
}
11 changes: 9 additions & 2 deletions plugin/ignore_error_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type IgnoreConfig struct {
ShouldIgnoreError ErrorPredicate
ShouldIgnoreErrorFunc ErrorPredicateWithContext
// deprecated, used ShouldIgnoreErrorFunc
ShouldIgnoreError ErrorPredicate
}

func (c *IgnoreConfig) String() interface{} {
Expand All @@ -25,7 +26,7 @@ func (c *IgnoreConfig) String() interface{} {
}

func (c *IgnoreConfig) Validate(table *Table) []string {
log.Printf("[TRACE] IgnoreConfig validate c.ShouldIgnoreError %p, c.ShouldIgnoreErrorFunc %p", c.ShouldIgnoreError, c.ShouldIgnoreErrorFunc)
log.Printf("[TRACE] IgnoreConfig validate: ShouldIgnoreError %p, ShouldIgnoreErrorFunc: %p", c.ShouldIgnoreError, c.ShouldIgnoreErrorFunc)

if c.ShouldIgnoreError != nil && c.ShouldIgnoreErrorFunc != nil {
log.Printf("[TRACE] IgnoreConfig validate failed - both ShouldIgnoreError and ShouldIgnoreErrorFunc are defined")
Expand All @@ -35,6 +36,12 @@ func (c *IgnoreConfig) Validate(table *Table) []string {
}

func (c *IgnoreConfig) DefaultTo(other *IgnoreConfig) {
// if either ShouldIgnoreError or ShouldIgnoreErrorFunc are set, do not default to other
if c.ShouldIgnoreError != nil || c.ShouldIgnoreErrorFunc != nil {
log.Printf("[TRACE] IgnoreConfig DefaultTo: config defines a should ignore function so not defaulting to base")
return
}

// legacy func
if c.ShouldIgnoreError == nil && other.ShouldIgnoreError != nil {
log.Printf("[TRACE] IgnoreConfig DefaultTo: using base ShouldIgnoreError: %s", helpers.GetFunctionName(other.ShouldIgnoreError))
Expand Down
4 changes: 2 additions & 2 deletions plugin/list_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ListConfig struct {
}

func (c *ListConfig) initialise(table *Table) {
log.Printf("[TRACE] ListConfig initialise table %s", table.Name)
log.Printf("[TRACE] ListConfig.initialise table %s", table.Name)

// create RetryConfig if needed
if c.RetryConfig == nil {
Expand All @@ -42,7 +42,7 @@ func (c *ListConfig) initialise(table *Table) {
c.RetryConfig.DefaultTo(table.DefaultRetryConfig)
c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig)

log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
log.Printf("[TRACE] ListConfig.initialise complete: RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
}

func (c *ListConfig) Validate(table *Table) []string {
Expand Down
22 changes: 15 additions & 7 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ type Plugin struct {
// TableMapFunc is a callback function which can be used to populate the table map
// this con optionally be provided by the plugin, and allows the connection config to be used in the table creation
// (connection config is not available at plugin creation time)
TableMapFunc func(ctx context.Context, p *Plugin) (map[string]*Table, error)
DefaultTransform *transform.ColumnTransforms
DefaultGetConfig *GetConfig
DefaultConcurrency *DefaultConcurrencyConfig
DefaultRetryConfig *RetryConfig
DefaultIgnoreConfig *IgnoreConfig
TableMapFunc func(ctx context.Context, p *Plugin) (map[string]*Table, error)
DefaultTransform *transform.ColumnTransforms
DefaultConcurrency *DefaultConcurrencyConfig
DefaultRetryConfig *RetryConfig
DefaultIgnoreConfig *IgnoreConfig

// deprecated - use DefaultRetryConfig and DefaultIgnoreConfig
DefaultGetConfig *GetConfig
// deprecated - use DefaultIgnoreConfig
DefaultShouldIgnoreError ErrorPredicate
// every table must implement these columns
RequiredColumns []*Column
Expand All @@ -76,20 +79,25 @@ func (p *Plugin) Initialise() {
p.Logger = p.setupLogger()
// default the schema mode to static
if p.SchemaMode == "" {
log.Println("[TRACE] defaulting SchemaMode to SchemaModeStatic")
p.SchemaMode = SchemaModeStatic
}

// create DefaultRetryConfig if needed
if p.DefaultRetryConfig == nil {
log.Printf("[TRACE] no DefaultRetryConfig defined - creating empty")
p.DefaultRetryConfig = &RetryConfig{}
}

// create DefaultIgnoreConfig if needed
if p.DefaultIgnoreConfig == nil {
log.Printf("[TRACE] no DefaultIgnoreConfig defined - creating empty")
p.DefaultIgnoreConfig = &IgnoreConfig{}
}
// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
p.DefaultIgnoreConfig.ShouldIgnoreError = p.DefaultShouldIgnoreError
if p.DefaultShouldIgnoreError != nil && p.DefaultIgnoreConfig.ShouldIgnoreError == nil {
p.DefaultIgnoreConfig.ShouldIgnoreError = p.DefaultShouldIgnoreError
}

// set file limit
p.setuLimit()
Expand Down
2 changes: 1 addition & 1 deletion plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (d *QueryData) streamListItem(ctx context.Context, item interface{}) {
return
}

// if this table has no parent hydrate function, just call steramLeafListItem directly
// if this table has no parent hydrate function, just call streamLeafListItem directly
parentListHydrate := d.Table.List.ParentHydrate
if parentListHydrate == nil {
d.streamLeafListItem(ctx, item)
Expand Down
2 changes: 1 addition & 1 deletion plugin/required_hydrate_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c requiredHydrateCallBuilder) Add(hydrateFunc HydrateFunc) {
}

// get the config for this hydrate function
config := c.table.getHydrateConfig(hydrateName)
config := c.table.hydrateConfigMap[hydrateName]

c.requiredHydrateCalls[hydrateName] = newHydrateCall(hydrateFunc, config)

Expand Down
36 changes: 35 additions & 1 deletion plugin/retry_config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package plugin

import (
"context"
"fmt"
"log"

"github.com/turbot/go-kit/helpers"
)

type RetryConfig struct {
ShouldRetryError ErrorPredicate
ShouldRetryErrorFunc ErrorPredicateWithContext
// deprecated use ShouldRetryErrorFunc
ShouldRetryError ErrorPredicate
}

func (c *RetryConfig) String() interface{} {
Expand All @@ -36,6 +38,12 @@ func (c *RetryConfig) Validate(table *Table) []string {
}

func (c *RetryConfig) DefaultTo(other *RetryConfig) {
// if either ShouldIgnoreError or ShouldIgnoreErrorFunc are set, do not default to other
if c.ShouldRetryError != nil || c.ShouldRetryErrorFunc != nil {
log.Printf("[TRACE] RetryConfig DefaultTo: config defines a should retry function so not defaulting to base")
return
}

// legacy func
if c.ShouldRetryError == nil && other.ShouldRetryError != nil {
log.Printf("[TRACE] RetryConfig DefaultTo: using base ShouldRetryError: %s", helpers.GetFunctionName(other.ShouldRetryError))
Expand All @@ -46,3 +54,29 @@ func (c *RetryConfig) DefaultTo(other *RetryConfig) {
c.ShouldRetryErrorFunc = other.ShouldRetryErrorFunc
}
}

// GetListRetryConfig wraps the ShouldRetry function with an additional check of the rows streamed
// (as we cannot retry errors in the list hydrate function after streaming has started)
func (c *RetryConfig) GetListRetryConfig() *RetryConfig {
listRetryConfig := &RetryConfig{}
if c.ShouldRetryErrorFunc != nil {
listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool {
if d.QueryStatus.rowsStreamed != 0 {
log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.QueryStatus.rowsStreamed)
return false
}
res := c.ShouldRetryErrorFunc(ctx, d, h, err)
return res
}
} else if c.ShouldRetryError != nil {
listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool {
if d.QueryStatus.rowsStreamed != 0 {
log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.QueryStatus.rowsStreamed)
return false
}
// call the legacy function
return c.ShouldRetryError(err)
}
}
return listRetryConfig
}
Loading

0 comments on commit 348c461

Please sign in to comment.