From 348c4616326214a2fcc5772bf2519c44ee793312 Mon Sep 17 00:00:00 2001 From: kaidaguerre Date: Thu, 12 May 2022 12:04:38 +0100 Subject: [PATCH] Fix transform crash for empty hydrate items. Fix plugin and table level error handling defaults. Respect retry config in DefaultGetConfig. Closes #325 If a hydrate function fails, instead of passing an empty struct to transform functions (which could cause casting errors), do not call the transform function at all. --- plugin/get_config.go | 5 +- plugin/hydrate.go | 60 ---------------- plugin/hydrate_config.go | 4 +- plugin/hydrate_error.go | 85 ++++++++++++++++++++++ plugin/ignore_error_config.go | 11 ++- plugin/list_config.go | 4 +- plugin/plugin.go | 22 ++++-- plugin/query_data.go | 2 +- plugin/required_hydrate_calls.go | 2 +- plugin/retry_config.go | 36 +++++++++- plugin/row.go | 37 ++++------ plugin/table.go | 116 +++++++++++++++++++------------ plugin/table_column.go | 37 +++++----- plugin/table_fetch.go | 13 +++- plugin/table_test.go | 6 +- plugin/table_validate.go | 60 +++++++++++++--- 16 files changed, 319 insertions(+), 181 deletions(-) create mode 100644 plugin/hydrate_error.go diff --git a/plugin/get_config.go b/plugin/get_config.go index ffdf21f5..c26a9ad0 100644 --- a/plugin/get_config.go +++ b/plugin/get_config.go @@ -22,8 +22,7 @@ type GetConfig struct { } func (c *GetConfig) initialise(table *Table) { - log.Printf("[TRACE] GetConfig initialise table %s", table.Name) - + log.Printf("[TRACE] GetConfig.initialise table %s", table.Name) // create RetryConfig if needed if c.RetryConfig == nil { c.RetryConfig = &RetryConfig{} @@ -42,7 +41,7 @@ func (c *GetConfig) initialise(table *Table) { c.RetryConfig.DefaultTo(table.DefaultRetryConfig) c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig) - log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String()) + log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String()) } func (c *GetConfig) Validate(table *Table) []string { diff --git a/plugin/hydrate.go b/plugin/hydrate.go index dbe30db5..b0736c3a 100644 --- a/plugin/hydrate.go +++ b/plugin/hydrate.go @@ -1,61 +1 @@ package plugin - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/sethvargo/go-retry" - "github.com/turbot/go-kit/helpers" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// RetryHydrate function invokes the hydrate function with retryable errors and retries the function until the maximum attemptes before throwing error -func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrateFunc HydrateFunc, retryConfig *RetryConfig) (interface{}, error) { - backoff, err := retry.NewFibonacci(100 * time.Millisecond) - if err != nil { - return nil, err - } - var hydrateResult interface{} - shouldRetryErrorFunc := retryConfig.ShouldRetryError - - err = retry.Do(ctx, retry.WithMaxRetries(10, backoff), func(ctx context.Context) error { - hydrateResult, err = hydrateFunc(ctx, d, hydrateData) - if err != nil && shouldRetryErrorFunc(err) { - err = retry.RetryableError(err) - } - return err - }) - return hydrateResult, err -} - -// WrapHydrate is a higher order function which returns a HydrateFunc which handles Ignorable errors -func WrapHydrate(hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig) HydrateFunc { - return func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) { - defer func() { - if r := recover(); r != nil { - log.Printf("[WARN] recovered a panic from a wrapped hydrate function: %v\n", r) - err = status.Error(codes.Internal, fmt.Sprintf("hydrate function %s failed with panic %v", helpers.GetFunctionName(hydrateFunc), r)) - } - }() - // call the underlying get function - item, err = hydrateFunc(ctx, d, h) - if err != nil { - log.Printf("[TRACE] wrapped hydrate call %s returned error %v\n", helpers.GetFunctionName(hydrateFunc), err) - // see if the ignoreConfig defines a should ignore function - if ignoreConfig.ShouldIgnoreError != nil && ignoreConfig.ShouldIgnoreError(err) { - log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err) - return nil, nil - } - if ignoreConfig.ShouldIgnoreErrorFunc != nil && ignoreConfig.ShouldIgnoreErrorFunc(ctx, d, h, err) { - log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err) - return nil, nil - } - // pass any other error on - return nil, err - } - return item, nil - } -} diff --git a/plugin/hydrate_config.go b/plugin/hydrate_config.go index f5a46456..45d717bf 100644 --- a/plugin/hydrate_config.go +++ b/plugin/hydrate_config.go @@ -38,7 +38,7 @@ Depends: %s`, } func (c *HydrateConfig) initialise(table *Table) { - log.Printf("[TRACE] HydrateConfig initialise func %s, table %s", helpers.GetFunctionName(c.Func), table.Name) + log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", helpers.GetFunctionName(c.Func), table.Name) // create RetryConfig if needed if c.RetryConfig == nil { @@ -58,7 +58,7 @@ func (c *HydrateConfig) initialise(table *Table) { c.RetryConfig.DefaultTo(table.DefaultRetryConfig) c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig) - log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String()) + log.Printf("[TRACE] HydrateConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String()) } func (c *HydrateConfig) Validate(table *Table) []string { diff --git a/plugin/hydrate_error.go b/plugin/hydrate_error.go new file mode 100644 index 00000000..eb5fbdea --- /dev/null +++ b/plugin/hydrate_error.go @@ -0,0 +1,85 @@ +package plugin + +import ( + "context" + "fmt" + "github.com/sethvargo/go-retry" + "github.com/turbot/go-kit/helpers" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "log" + "time" +) + +// RetryHydrate function invokes the hydrate function with retryable errors and retries the function until the maximum attemptes before throwing error +func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrateFunc HydrateFunc, retryConfig *RetryConfig) (interface{}, error) { + backoff, err := retry.NewFibonacci(100 * time.Millisecond) + if err != nil { + return nil, err + } + var hydrateResult interface{} + + err = retry.Do(ctx, retry.WithMaxRetries(10, backoff), func(ctx context.Context) error { + hydrateResult, err = hydrateFunc(ctx, d, hydrateData) + if err != nil { + if shouldRetryError(ctx, d, hydrateData, err, retryConfig) { + err = retry.RetryableError(err) + } + } + return err + }) + + return hydrateResult, err +} + +// WrapHydrate is a higher order function which returns a HydrateFunc which handles Ignorable errors +func WrapHydrate(hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig) HydrateFunc { + log.Printf("[TRACE] WrapHydrate %s, ignore config %s\n", helpers.GetFunctionName(hydrateFunc), ignoreConfig.String()) + + return func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("[WARN] recovered a panic from a wrapped hydrate function: %v\n", r) + err = status.Error(codes.Internal, fmt.Sprintf("hydrate function %s failed with panic %v", helpers.GetFunctionName(hydrateFunc), r)) + } + }() + // call the underlying get function + item, err = hydrateFunc(ctx, d, h) + if err != nil { + log.Printf("[TRACE] wrapped hydrate call %s returned error %v, ignore config %s\n", helpers.GetFunctionName(hydrateFunc), err, ignoreConfig.String()) + // see if the ignoreConfig defines a should ignore function + if ignoreConfig.ShouldIgnoreError != nil && ignoreConfig.ShouldIgnoreError(err) { + log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err) + return nil, nil + } + if ignoreConfig.ShouldIgnoreErrorFunc != nil && ignoreConfig.ShouldIgnoreErrorFunc(ctx, d, h, err) { + log.Printf("[TRACE] wrapped hydrate call %s returned error but we are ignoring it: %v", helpers.GetFunctionName(hydrateFunc), err) + return nil, nil + } + // pass any other error on + return nil, err + } + return item, nil + } +} + +func shouldRetryError(ctx context.Context, d *QueryData, h *HydrateData, err error, retryConfig *RetryConfig) bool { + log.Printf("[TRACE] shouldRetryError err: %v, retryConfig: %s", err, retryConfig.String()) + + if retryConfig == nil { + log.Printf("[TRACE] shouldRetryError nil retry config - return false") + return false + } + + if retryConfig.ShouldRetryError != nil { + log.Printf("[TRACE] shouldRetryError - calling legacy ShouldRetryError") + return retryConfig.ShouldRetryError(err) + } + + if retryConfig.ShouldRetryErrorFunc != nil { + log.Printf("[TRACE] shouldRetryError - calling ShouldRetryFunc") + return retryConfig.ShouldRetryErrorFunc(ctx, d, h, err) + } + + return false +} diff --git a/plugin/ignore_error_config.go b/plugin/ignore_error_config.go index 27dc3912..e4534d07 100644 --- a/plugin/ignore_error_config.go +++ b/plugin/ignore_error_config.go @@ -9,8 +9,9 @@ import ( ) type IgnoreConfig struct { - ShouldIgnoreError ErrorPredicate ShouldIgnoreErrorFunc ErrorPredicateWithContext + // deprecated, used ShouldIgnoreErrorFunc + ShouldIgnoreError ErrorPredicate } func (c *IgnoreConfig) String() interface{} { @@ -25,7 +26,7 @@ func (c *IgnoreConfig) String() interface{} { } func (c *IgnoreConfig) Validate(table *Table) []string { - log.Printf("[TRACE] IgnoreConfig validate c.ShouldIgnoreError %p, c.ShouldIgnoreErrorFunc %p", c.ShouldIgnoreError, c.ShouldIgnoreErrorFunc) + log.Printf("[TRACE] IgnoreConfig validate: ShouldIgnoreError %p, ShouldIgnoreErrorFunc: %p", c.ShouldIgnoreError, c.ShouldIgnoreErrorFunc) if c.ShouldIgnoreError != nil && c.ShouldIgnoreErrorFunc != nil { log.Printf("[TRACE] IgnoreConfig validate failed - both ShouldIgnoreError and ShouldIgnoreErrorFunc are defined") @@ -35,6 +36,12 @@ func (c *IgnoreConfig) Validate(table *Table) []string { } func (c *IgnoreConfig) DefaultTo(other *IgnoreConfig) { + // if either ShouldIgnoreError or ShouldIgnoreErrorFunc are set, do not default to other + if c.ShouldIgnoreError != nil || c.ShouldIgnoreErrorFunc != nil { + log.Printf("[TRACE] IgnoreConfig DefaultTo: config defines a should ignore function so not defaulting to base") + return + } + // legacy func if c.ShouldIgnoreError == nil && other.ShouldIgnoreError != nil { log.Printf("[TRACE] IgnoreConfig DefaultTo: using base ShouldIgnoreError: %s", helpers.GetFunctionName(other.ShouldIgnoreError)) diff --git a/plugin/list_config.go b/plugin/list_config.go index 1a6107b8..3b5e9938 100644 --- a/plugin/list_config.go +++ b/plugin/list_config.go @@ -22,7 +22,7 @@ type ListConfig struct { } func (c *ListConfig) initialise(table *Table) { - log.Printf("[TRACE] ListConfig initialise table %s", table.Name) + log.Printf("[TRACE] ListConfig.initialise table %s", table.Name) // create RetryConfig if needed if c.RetryConfig == nil { @@ -42,7 +42,7 @@ func (c *ListConfig) initialise(table *Table) { c.RetryConfig.DefaultTo(table.DefaultRetryConfig) c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig) - log.Printf("[TRACE] RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String()) + log.Printf("[TRACE] ListConfig.initialise complete: RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String()) } func (c *ListConfig) Validate(table *Table) []string { diff --git a/plugin/plugin.go b/plugin/plugin.go index f46a07e2..bf6cee40 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -45,12 +45,15 @@ type Plugin struct { // TableMapFunc is a callback function which can be used to populate the table map // this con optionally be provided by the plugin, and allows the connection config to be used in the table creation // (connection config is not available at plugin creation time) - TableMapFunc func(ctx context.Context, p *Plugin) (map[string]*Table, error) - DefaultTransform *transform.ColumnTransforms - DefaultGetConfig *GetConfig - DefaultConcurrency *DefaultConcurrencyConfig - DefaultRetryConfig *RetryConfig - DefaultIgnoreConfig *IgnoreConfig + TableMapFunc func(ctx context.Context, p *Plugin) (map[string]*Table, error) + DefaultTransform *transform.ColumnTransforms + DefaultConcurrency *DefaultConcurrencyConfig + DefaultRetryConfig *RetryConfig + DefaultIgnoreConfig *IgnoreConfig + + // deprecated - use DefaultRetryConfig and DefaultIgnoreConfig + DefaultGetConfig *GetConfig + // deprecated - use DefaultIgnoreConfig DefaultShouldIgnoreError ErrorPredicate // every table must implement these columns RequiredColumns []*Column @@ -76,20 +79,25 @@ func (p *Plugin) Initialise() { p.Logger = p.setupLogger() // default the schema mode to static if p.SchemaMode == "" { + log.Println("[TRACE] defaulting SchemaMode to SchemaModeStatic") p.SchemaMode = SchemaModeStatic } // create DefaultRetryConfig if needed if p.DefaultRetryConfig == nil { + log.Printf("[TRACE] no DefaultRetryConfig defined - creating empty") p.DefaultRetryConfig = &RetryConfig{} } // create DefaultIgnoreConfig if needed if p.DefaultIgnoreConfig == nil { + log.Printf("[TRACE] no DefaultIgnoreConfig defined - creating empty") p.DefaultIgnoreConfig = &IgnoreConfig{} } // copy the (deprecated) top level ShouldIgnoreError property into the ignore config - p.DefaultIgnoreConfig.ShouldIgnoreError = p.DefaultShouldIgnoreError + if p.DefaultShouldIgnoreError != nil && p.DefaultIgnoreConfig.ShouldIgnoreError == nil { + p.DefaultIgnoreConfig.ShouldIgnoreError = p.DefaultShouldIgnoreError + } // set file limit p.setuLimit() diff --git a/plugin/query_data.go b/plugin/query_data.go index 5888dc0c..95c93e80 100644 --- a/plugin/query_data.go +++ b/plugin/query_data.go @@ -361,7 +361,7 @@ func (d *QueryData) streamListItem(ctx context.Context, item interface{}) { return } - // if this table has no parent hydrate function, just call steramLeafListItem directly + // if this table has no parent hydrate function, just call streamLeafListItem directly parentListHydrate := d.Table.List.ParentHydrate if parentListHydrate == nil { d.streamLeafListItem(ctx, item) diff --git a/plugin/required_hydrate_calls.go b/plugin/required_hydrate_calls.go index d7f08b88..e463307d 100644 --- a/plugin/required_hydrate_calls.go +++ b/plugin/required_hydrate_calls.go @@ -29,7 +29,7 @@ func (c requiredHydrateCallBuilder) Add(hydrateFunc HydrateFunc) { } // get the config for this hydrate function - config := c.table.getHydrateConfig(hydrateName) + config := c.table.hydrateConfigMap[hydrateName] c.requiredHydrateCalls[hydrateName] = newHydrateCall(hydrateFunc, config) diff --git a/plugin/retry_config.go b/plugin/retry_config.go index 7b4d38de..2e3ee8f4 100644 --- a/plugin/retry_config.go +++ b/plugin/retry_config.go @@ -1,6 +1,7 @@ package plugin import ( + "context" "fmt" "log" @@ -8,8 +9,9 @@ import ( ) type RetryConfig struct { - ShouldRetryError ErrorPredicate ShouldRetryErrorFunc ErrorPredicateWithContext + // deprecated use ShouldRetryErrorFunc + ShouldRetryError ErrorPredicate } func (c *RetryConfig) String() interface{} { @@ -36,6 +38,12 @@ func (c *RetryConfig) Validate(table *Table) []string { } func (c *RetryConfig) DefaultTo(other *RetryConfig) { + // if either ShouldIgnoreError or ShouldIgnoreErrorFunc are set, do not default to other + if c.ShouldRetryError != nil || c.ShouldRetryErrorFunc != nil { + log.Printf("[TRACE] RetryConfig DefaultTo: config defines a should retry function so not defaulting to base") + return + } + // legacy func if c.ShouldRetryError == nil && other.ShouldRetryError != nil { log.Printf("[TRACE] RetryConfig DefaultTo: using base ShouldRetryError: %s", helpers.GetFunctionName(other.ShouldRetryError)) @@ -46,3 +54,29 @@ func (c *RetryConfig) DefaultTo(other *RetryConfig) { c.ShouldRetryErrorFunc = other.ShouldRetryErrorFunc } } + +// GetListRetryConfig wraps the ShouldRetry function with an additional check of the rows streamed +// (as we cannot retry errors in the list hydrate function after streaming has started) +func (c *RetryConfig) GetListRetryConfig() *RetryConfig { + listRetryConfig := &RetryConfig{} + if c.ShouldRetryErrorFunc != nil { + listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool { + if d.QueryStatus.rowsStreamed != 0 { + log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.QueryStatus.rowsStreamed) + return false + } + res := c.ShouldRetryErrorFunc(ctx, d, h, err) + return res + } + } else if c.ShouldRetryError != nil { + listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool { + if d.QueryStatus.rowsStreamed != 0 { + log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.QueryStatus.rowsStreamed) + return false + } + // call the legacy function + return c.ShouldRetryError(err) + } + } + return listRetryConfig +} diff --git a/plugin/row.go b/plugin/row.go index bc19b240..aa337984 100644 --- a/plugin/row.go +++ b/plugin/row.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe-plugin-sdk/v3/grpc/proto" "github.com/turbot/steampipe-plugin-sdk/v3/logging" "github.com/turbot/steampipe-plugin-sdk/v3/plugin/context_key" @@ -173,58 +174,44 @@ func (r *RowData) callHydrate(ctx context.Context, d *QueryData, hydrateFunc Hyd logging.LogTime(hydrateKey + " start") + log.Printf("[TRACE] callHydrate %s, hydrateConfig %s\n", helpers.GetFunctionName(hydrateFunc), hydrateConfig.String()) + // now call the hydrate function, passing the item and hydrate results so far hydrateData, err := r.callHydrateWithRetries(ctx, d, hydrateFunc, hydrateConfig.IgnoreConfig, hydrateConfig.RetryConfig) if err != nil { log.Printf("[ERROR] callHydrate %s finished with error: %v\n", hydrateKey, err) r.setError(hydrateKey, err) r.errorChan <- err - } else if hydrateData != nil { - r.set(hydrateKey, hydrateData) } else { - // the the hydrate results to an empty data object - r.set(hydrateKey, emptyHydrateResults{}) + // set the hydrate data, even if it is nil + // (it may legitimately be nil if the hydrate function returned an ignored error) + // if we do not set it for nil values, we will get error that required hydrate functions hav enot been called + r.set(hydrateKey, hydrateData) } - logging.LogTime(hydrateKey + " end") } // invoke a hydrate function, retrying as required based on the retry config, and return the result and/or error func (r *RowData) callHydrateWithRetries(ctx context.Context, d *QueryData, hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig, retryConfig *RetryConfig) (interface{}, error) { - hydrateData := &HydrateData{Item: r.Item, ParentItem: r.ParentItem, HydrateResults: r.hydrateResults} + log.Printf("[TRACE] callHydrateWithRetries: %s", helpers.GetFunctionName(hydrateFunc)) + h := &HydrateData{Item: r.Item, ParentItem: r.ParentItem, HydrateResults: r.hydrateResults} // WrapHydrate function returns a HydrateFunc which handles Ignorable errors var hydrateWithIgnoreError = WrapHydrate(hydrateFunc, ignoreConfig) - hydrateResult, err := hydrateWithIgnoreError(ctx, d, hydrateData) + hydrateResult, err := hydrateWithIgnoreError(ctx, d, h) if err != nil { log.Printf("[TRACE] hydrateWithIgnoreError returned error %v", err) - if shouldRetryError(err, d, retryConfig) { + if shouldRetryError(ctx, d, h, err, retryConfig) { log.Printf("[TRACE] retrying hydrate") hydrateData := &HydrateData{Item: r.Item, ParentItem: r.ParentItem, HydrateResults: r.hydrateResults} hydrateResult, err = RetryHydrate(ctx, d, hydrateData, hydrateFunc, retryConfig) + log.Printf("[TRACE] back from retry") } } return hydrateResult, err } -func shouldRetryError(err error, d *QueryData, retryConfig *RetryConfig) bool { - if retryConfig == nil { - return false - } - // if we have started streaming, we cannot retry - if d.QueryStatus.rowsStreamed != 0 { - return false - } - - shouldRetryErrorFunc := retryConfig.ShouldRetryError - if shouldRetryErrorFunc == nil { - return false - } - // a shouldRetryErrorFunc is declared in the retry config - call it to see if we should retry - return shouldRetryErrorFunc(err) -} - func (r *RowData) set(key string, item interface{}) error { r.mut.Lock() defer r.mut.Unlock() diff --git a/plugin/table.go b/plugin/table.go index 766b6928..e694b085 100644 --- a/plugin/table.go +++ b/plugin/table.go @@ -26,10 +26,12 @@ type Table struct { // default transform applied to all columns DefaultTransform *transform.ColumnTransforms // function controlling default error handling behaviour + DefaultIgnoreConfig *IgnoreConfig + DefaultRetryConfig *RetryConfig + // deprecated - use DefaultIgnoreConfig DefaultShouldIgnoreError ErrorPredicate - DefaultIgnoreConfig *IgnoreConfig - DefaultRetryConfig *RetryConfig + // the parent plugin object Plugin *Plugin // Deprecated: used HydrateConfig @@ -40,6 +42,7 @@ type Table struct { // map of hydrate function name to columns it provides hydrateColumnMap map[string][]string + hydrateConfigMap map[string]*HydrateConfig } func (t *Table) initialise(p *Plugin) { @@ -50,33 +53,95 @@ func (t *Table) initialise(p *Plugin) { // create DefaultRetryConfig if needed if t.DefaultRetryConfig == nil { + log.Printf("[TRACE] no DefaultRetryConfig defined - creating empty") t.DefaultRetryConfig = &RetryConfig{} } // create DefaultIgnoreConfig if needed if t.DefaultIgnoreConfig == nil { + log.Printf("[TRACE] no DefaultIgnoreConfig defined - creating empty") t.DefaultIgnoreConfig = &IgnoreConfig{} } - // copy the (deprecated) top level ShouldIgnoreError property into the ignore config - t.DefaultIgnoreConfig.ShouldIgnoreError = t.DefaultShouldIgnoreError + + if t.DefaultShouldIgnoreError != nil && t.DefaultIgnoreConfig.ShouldIgnoreError == nil { + // copy the (deprecated) top level ShouldIgnoreError property into the ignore config + t.DefaultIgnoreConfig.ShouldIgnoreError = t.DefaultShouldIgnoreError + log.Printf("[TRACE] legacy DefaultShouldIgnoreError defined - copying into DefaultIgnoreConfig") + } // apply plugin defaults for retry and ignore config + log.Printf("[TRACE] apply plugin defaults for DefaultRetryConfig") t.DefaultRetryConfig.DefaultTo(t.Plugin.DefaultRetryConfig) + log.Printf("[TRACE] apply plugin defaults for DefaultIgnoreConfig, table %v plugin %v", t.DefaultIgnoreConfig, t.Plugin.DefaultIgnoreConfig) t.DefaultIgnoreConfig.DefaultTo(t.Plugin.DefaultIgnoreConfig) log.Printf("[TRACE] DefaultRetryConfig: %s", t.DefaultRetryConfig.String()) log.Printf("[TRACE] DefaultIgnoreConfig: %s", t.DefaultIgnoreConfig.String()) - for _, h := range t.HydrateConfig { - h.initialise(t) - } + // HydrateConfig contains explicit config for hydrate functions but there may be other hydrate functions + // declared for specific columns which do not have config defined + // build a map of all hydrate functions, with empty config if needed + // NOTE: this map also includes information from the legacy HydrateDependencies property + t.initialiseHydrateConfigs() + + log.Printf("[TRACE] back from initialiseHydrateConfigs") + + // get and list configs are similar to hydrate configs but they cannot specify dependencies + // we initialise them separately if t.Get != nil { + log.Printf("[TRACE] t.Get.initialise") t.Get.initialise(t) + log.Printf("[TRACE] back from t.Get.initialise") } if t.List != nil { + log.Printf("[TRACE] t.List.initialise") t.List.initialise(t) + log.Printf("[TRACE] back from t.List.initialise") + } + log.Printf("[TRACE] initialise table %s COMPLETE", t.Name) +} + +// build map of all hydrate configs, and initialise them +func (t *Table) initialiseHydrateConfigs() { + // first build a map of all hydrate functions + t.buildHydrateConfigMap() + + // initialise all hydrate configs in map + for _, h := range t.hydrateConfigMap { + h.initialise(t) } +} +// build map of all hydrate configs, including those specified in the legacy HydrateDependencies, +// and those mentioned only in column config +func (t *Table) buildHydrateConfigMap() { + t.hydrateConfigMap = make(map[string]*HydrateConfig) + for i := range t.HydrateConfig { + // as we are converting into a pointer, we cannot use the array value direct from the range as + // this was causing incorrect values - go must be reusing memory addresses for successive items + h := &t.HydrateConfig[i] + funcName := helpers.GetFunctionName(h.Func) + t.hydrateConfigMap[funcName] = h + } + // add in hydrate config for all hydrate dependencies declared using legacy property HydrateDependencies + for _, d := range t.HydrateDependencies { + hydrateName := helpers.GetFunctionName(d.Func) + // if there is already a hydrate config, do nothing here + // (this is a validation error that will be picked up by the validation check later) + if _, ok := t.hydrateConfigMap[hydrateName]; !ok { + t.hydrateConfigMap[hydrateName] = &HydrateConfig{Func: d.Func, Depends: d.Depends} + } + } + // now add all hydrate functions with no explicit config + for _, c := range t.Columns { + if c.Hydrate == nil { + continue + } + hydrateName := helpers.GetFunctionName(c.Hydrate) + if _, ok := t.hydrateConfigMap[hydrateName]; !ok { + t.hydrateConfigMap[hydrateName] = &HydrateConfig{Func: c.Hydrate} + } + } } // build a list of required hydrate function calls which must be executed, based on the columns which have been requested @@ -120,45 +185,8 @@ func (t *Table) requiredHydrateCalls(colsUsed []string, fetchType fetchType) []* } func (t *Table) getFetchFunc(fetchType fetchType) HydrateFunc { - if fetchType == fetchTypeList { return t.List.Hydrate } return t.Get.Hydrate } - -// search through HydrateConfig and HydrateDependencies finding a function with the given name -// if found return its dependencies -func (t *Table) getHydrateDependencies(hydrateFuncName string) []HydrateFunc { - for _, d := range t.HydrateConfig { - if helpers.GetFunctionName(d.Func) == hydrateFuncName { - return d.Depends - } - } - for _, d := range t.HydrateDependencies { - if helpers.GetFunctionName(d.Func) == hydrateFuncName { - return d.Depends - } - } - return []HydrateFunc{} -} - -func (t *Table) getHydrateConfig(hydrateFuncName string) *HydrateConfig { - config := &HydrateConfig{} - // if a hydrate config is defined see whether this call exists in it - for _, d := range t.HydrateConfig { - if helpers.GetFunctionName(d.Func) == hydrateFuncName { - config = &d - break - } - } - // initialise the config to ensure retry and ignore config are populated, using table defaults if necessary - config.initialise(t) - - // if no hydrate dependencies are specified in the hydrate config, check the deprecated "HydrateDependencies" property - if config.Depends == nil { - config.Depends = t.getHydrateDependencies(hydrateFuncName) - } - - return config -} diff --git a/plugin/table_column.go b/plugin/table_column.go index b0453db0..c2384ca3 100644 --- a/plugin/table_column.go +++ b/plugin/table_column.go @@ -39,22 +39,27 @@ func (t *Table) getColumnValue(ctx context.Context, rowData *RowData, column *Qu log.Printf("[ERROR] table '%s' failed to get column data, callId %s: %v", t.Name, rowData.queryData.callId, err) return nil, err } - // are there any generate transforms defined? if not apply default generate - // NOTE: we must call getColumnTransforms to ensure the default is used if none is defined - columnTransforms := t.getColumnTransforms(column) - defaultTransform := t.getDefaultColumnTransform(column) - - qualValueMap := rowData.queryData.Quals.ToQualMap() - transformData := &transform.TransformData{ - HydrateItem: hydrateItem, - HydrateResults: rowData.hydrateResults, - ColumnName: column.Name, - KeyColumnQuals: qualValueMap, - } - value, err := columnTransforms.Execute(ctx, transformData, defaultTransform) - if err != nil { - log.Printf("[ERROR] failed to populate column '%s': %v\n", column.Name, err) - return nil, fmt.Errorf("failed to populate column '%s': %v", column.Name, err) + + var value interface{} = nil + // only call transforms if the hydrate item is non nil + if !helpers.IsNil(hydrateItem) { + // are there any generate transforms defined? if not apply default generate + // NOTE: we must call getColumnTransforms to ensure the default is used if none is defined + columnTransforms := t.getColumnTransforms(column) + defaultTransform := t.getDefaultColumnTransform(column) + + qualValueMap := rowData.queryData.Quals.ToQualMap() + transformData := &transform.TransformData{ + HydrateItem: hydrateItem, + HydrateResults: rowData.hydrateResults, + ColumnName: column.Name, + KeyColumnQuals: qualValueMap, + } + value, err = columnTransforms.Execute(ctx, transformData, defaultTransform) + if err != nil { + log.Printf("[ERROR] failed to populate column '%s': %v\n", column.Name, err) + return nil, fmt.Errorf("failed to populate column '%s': %v", column.Name, err) + } } // now convert the value to a protobuf column value c, err := t.interfaceToColumnValue(column, value) diff --git a/plugin/table_fetch.go b/plugin/table_fetch.go index 4173f76f..eabf0e5e 100644 --- a/plugin/table_fetch.go +++ b/plugin/table_fetch.go @@ -364,7 +364,7 @@ func (t *Table) executeListCall(ctx context.Context, queryData *QueryData) { // invoke list call - hydrateResults is nil as list call does not use it (it must comply with HydrateFunc signature) listCall := t.List.Hydrate // if there is a parent hydrate function, call that - // - the child 'Hydrate' function will be called by QueryData.StreamListIte, + // - the child 'Hydrate' function will be called by QueryData.StreamListItem, if t.List.ParentHydrate != nil { listCall = t.List.ParentHydrate } @@ -453,7 +453,11 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra if len(queryData.Matrix) == 0 { log.Printf("[TRACE] doList: no matrix item") - if _, err := rd.callHydrateWithRetries(ctx, queryData, listCall, t.List.IgnoreConfig, t.List.RetryConfig); err != nil { + + // we cannot retry errors in the list hydrate function after streaming has started + listRetryConfig := t.List.RetryConfig.GetListRetryConfig() + + if _, err := rd.callHydrateWithRetries(ctx, queryData, listCall, t.List.IgnoreConfig, listRetryConfig); err != nil { queryData.streamError(err) } } else { @@ -497,7 +501,10 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall log.Printf("[TRACE] callHydrateWithRetries for matrixItem %v, key columns %v", matrixItem, matrixQueryData.KeyColumnQuals) - _, err := rd.callHydrateWithRetries(fetchContext, matrixQueryData, listCall, t.List.IgnoreConfig, t.List.RetryConfig) + // we cannot retry errors in the list hydrate function after streaming has started + listRetryConfig := t.List.RetryConfig.GetListRetryConfig() + + _, err := rd.callHydrateWithRetries(fetchContext, matrixQueryData, listCall, t.List.IgnoreConfig, listRetryConfig) if err != nil { log.Printf("[TRACE] callHydrateWithRetries returned error %v", err) queryData.streamError(err) diff --git a/plugin/table_test.go b/plugin/table_test.go index a09a350f..050bd972 100644 --- a/plugin/table_test.go +++ b/plugin/table_test.go @@ -224,7 +224,7 @@ var testCasesRequiredHydrateCalls = map[string]requiredHydrateCallsTest{ Name: "table", Columns: []*Column{ {Name: "c1", Hydrate: hydrate1}, - {Name: "c2"}, + {Name: "c2", Hydrate: hydrate2}, }, List: &ListConfig{Hydrate: listHydrate}, Get: &GetConfig{Hydrate: getHydrate}, @@ -298,7 +298,7 @@ var testCasesRequiredHydrateCalls = map[string]requiredHydrateCallsTest{ Name: "table", Columns: []*Column{ {Name: "c1", Hydrate: hydrate1}, - {Name: "c2"}, + {Name: "c2", Hydrate: hydrate2}, }, List: &ListConfig{Hydrate: listHydrate}, Get: &GetConfig{Hydrate: getHydrate}, @@ -766,7 +766,7 @@ func TestGetHydrateConfig(t *testing.T) { test.table.Plugin.Initialise() test.table.initialise(test.table.Plugin) - result := test.table.getHydrateConfig(test.funcName) + result := test.table.hydrateConfigMap[test.funcName] actualString := result.String() expectedString := test.expected.String() if expectedString != actualString { diff --git a/plugin/table_validate.go b/plugin/table_validate.go index e6b7ba20..b5a86830 100644 --- a/plugin/table_validate.go +++ b/plugin/table_validate.go @@ -5,6 +5,7 @@ import ( "log" "strings" + "github.com/gertd/go-pluralize" "github.com/stevenle/topsort" "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe-plugin-sdk/v3/grpc/proto" @@ -19,22 +20,31 @@ func (t *Table) validate(name string, requiredColumns []*Column) []string { if t.Name == "" { validationErrors = append(validationErrors, fmt.Sprintf("table with key '%s' in plugin table map does not have a name property set", name)) } + + log.Printf("[TRACE] validateRequiredColumns") // verify all required columns exist validationErrors = t.validateRequiredColumns(requiredColumns) + log.Printf("[TRACE] validateListAndGetConfig") // validated list and get config // NOTE: this also sets key column require and operators to default value if not specified validationErrors = append(validationErrors, t.validateListAndGetConfig()...) + log.Printf("[TRACE] validateHydrateDependencies") // verify hydrate dependencies are valid // the map entries are strings - ensure they correspond to actual functions validationErrors = append(validationErrors, t.validateHydrateDependencies()...) + log.Printf("[TRACE] DefaultRetryConfig") validationErrors = append(validationErrors, t.DefaultRetryConfig.Validate(t)...) + log.Printf("[TRACE] DefaultIgnoreConfig") validationErrors = append(validationErrors, t.DefaultIgnoreConfig.Validate(t)...) - for _, h := range t.HydrateConfig { + log.Printf("[TRACE] validate hydrate configs") + + for _, h := range t.hydrateConfigMap { + log.Printf("[TRACE] validate hydrate config for '%s'", helpers.GetFunctionName(h.Func)) validationErrors = append(validationErrors, h.Validate(t)...) } @@ -120,15 +130,46 @@ func (t *Table) validateHydrateDependencies() []string { var validationErrors []string if len(t.HydrateDependencies)+len(t.HydrateConfig) != 0 { if t.List != nil { - deps := t.getHydrateDependencies(helpers.GetFunctionName(t.List.Hydrate)) - if len(deps) > 0 { - validationErrors = append(validationErrors, fmt.Sprintf("table '%s' List hydrate function '%s' has %d dependencies - List hydrate functions cannot have dependencies", t.Name, helpers.GetFunctionName(t.List.Hydrate), len(deps))) + // there should be no config in the hydrateConfigMap matching the list or get config + if invalidListHydrateConfig, ok := t.hydrateConfigMap[helpers.GetFunctionName(t.List.Hydrate)]; ok { + // so there is a hydrate config for the list call - this is invalid + + // is it because hydrate dependencies were declared for the list call? + if len(invalidListHydrateConfig.Depends) > 0 { + numDeps := len(invalidListHydrateConfig.Depends) + validationErrors = append(validationErrors, fmt.Sprintf("table '%s' List hydrate function '%s' has %d %s - List hydrate functions cannot have dependencies", + t.Name, + helpers.GetFunctionName(t.List.Hydrate), + numDeps, + pluralize.NewClient().Pluralize("dependency", numDeps, false))) + } else { + // otherwise, show general error + validationErrors = append(validationErrors, fmt.Sprintf("table '%s' List hydrate function '%s' has a hydrate config declared - this is invalid, this function must be configured using the ListConfig only", + t.Name, + helpers.GetFunctionName(t.List.Hydrate), + )) + } } } if t.Get != nil { - deps := t.getHydrateDependencies(helpers.GetFunctionName(t.Get.Hydrate)) - if len(deps) > 0 { - validationErrors = append(validationErrors, fmt.Sprintf("table '%s' Get hydrate function '%s' has %d dependencies - Get hydrate functions cannot have dependencies", t.Name, helpers.GetFunctionName(t.Get.Hydrate), len(deps))) + if invalidGetHydrateConfig, ok := t.hydrateConfigMap[helpers.GetFunctionName(t.Get.Hydrate)]; ok { + // so there is a hydrate config for the get call - this is invalid + + // is it because hydrate dependencies were declared for the get call? + if len(invalidGetHydrateConfig.Depends) > 0 { + numDeps := len(invalidGetHydrateConfig.Depends) + validationErrors = append(validationErrors, fmt.Sprintf("table '%s' Get hydrate function '%s' has %d %s - Get hydrate functions cannot have dependencies", + t.Name, + helpers.GetFunctionName(t.Get.Hydrate), + numDeps, + pluralize.NewClient().Pluralize("dependency", numDeps, false))) + } else { + // otherwise, show general error + validationErrors = append(validationErrors, fmt.Sprintf("table '%s' Get hydrate function '%s' has a hydrate config declared - this is invalid, this function must be configured using the GetConfig only", + t.Name, + helpers.GetFunctionName(t.Get.Hydrate), + )) + } } } } @@ -157,10 +198,7 @@ func (t *Table) detectCyclicHydrateDependencies() string { } } - for _, hydrateDeps := range t.HydrateDependencies { - updateDependencyGraph(hydrateDeps.Func, hydrateDeps.Depends) - } - for _, hydrateConfig := range t.HydrateConfig { + for _, hydrateConfig := range t.hydrateConfigMap { updateDependencyGraph(hydrateConfig.Func, hydrateConfig.Depends) }