Skip to content

Commit

Permalink
Return data for all columns returned by hydrate functions, not just r…
Browse files Browse the repository at this point in the history
…equested columns. Closes #156
  • Loading branch information
kaidaguerre authored Jul 20, 2021
1 parent 7ee891e commit e31b71e
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 54 deletions.
13 changes: 9 additions & 4 deletions plugin/hydrate_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,15 @@ type HydrateCall struct {
// the dependencies expressed using function name
Depends []string
Config *HydrateConfig
Name string
}

func newHydrateCall(hydrateFunc HydrateFunc, config *HydrateConfig) *HydrateCall {
res := &HydrateCall{Func: hydrateFunc, Config: config}
res := &HydrateCall{
Name: helpers.GetFunctionName(hydrateFunc),
Func: hydrateFunc,
Config: config,
}
for _, f := range config.Depends {
res.Depends = append(res.Depends, helpers.GetFunctionName(f))
}
Expand All @@ -81,14 +86,14 @@ func (h HydrateCall) CanStart(rowData *RowData, name string, concurrencyManager
}

// Start starts a hydrate call
func (h *HydrateCall) Start(ctx context.Context, r *RowData, d *QueryData, hydrateFuncName string, concurrencyManager *ConcurrencyManager) {
func (h *HydrateCall) Start(ctx context.Context, r *RowData, d *QueryData, concurrencyManager *ConcurrencyManager) {
// tell the rowdata to wait for this call to complete
r.wg.Add(1)

// call callHydrate async, ignoring return values
go func() {
r.callHydrate(ctx, d, h.Func, hydrateFuncName, h.Config.RetryConfig, h.Config.ShouldIgnoreError)
r.callHydrate(ctx, d, h.Func, h.Name, h.Config.RetryConfig, h.Config.ShouldIgnoreError)
// decrement number of hydrate functions running
concurrencyManager.Finished(hydrateFuncName)
concurrencyManager.Finished(h.Name)
}()
}
6 changes: 3 additions & 3 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"strconv"
"syscall"

connection_manager "github.com/turbot/steampipe-plugin-sdk/connection"

"github.com/hashicorp/go-hclog"
"github.com/turbot/go-kit/helpers"
connection_manager "github.com/turbot/steampipe-plugin-sdk/connection"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/logging"
"github.com/turbot/steampipe-plugin-sdk/plugin/context_key"
Expand Down Expand Up @@ -159,7 +159,7 @@ func (p *Plugin) SetConnectionConfig(connectionName, connectionConfigString stri

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("SetConnectionConfig failed: %s", ToError(r).Error())
err = fmt.Errorf("SetConnectionConfig failed: %s", helpers.ToError(r).Error())
} else {
p.Logger.Debug("SetConnectionConfig finished")
}
Expand Down
45 changes: 33 additions & 12 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const itemBufferSize = 100

// NOTE - any field added here must also be added to ShallowCopy

type QueryData struct {
// The table this query is associated with
Table *Table
Expand Down Expand Up @@ -46,14 +48,18 @@ type QueryData struct {
// event for the child list of a parent child list call
StreamLeafListItem func(ctx context.Context, item interface{})
// internal

// a list of the required hydrate calls (EXCLUDING the fetch call)
hydrateCalls []*HydrateCall
// all the columns that will be returned by this query
columns []string

concurrencyManager *ConcurrencyManager
rowDataChan chan *RowData
errorChan chan error
streamCount int
stream proto.WrapperPlugin_ExecuteServer

stream proto.WrapperPlugin_ExecuteServer
// wait group used to synchronise parent-child list fetches - each child hydrate function increments this wait group
listWg *sync.WaitGroup
// when executing parent child list calls, we cache the parent list result in the query data passed to the child list call
Expand Down Expand Up @@ -91,7 +97,10 @@ func newQueryData(queryContext *QueryContext, table *Table, stream proto.Wrapper
// NOTE: for count(*) queries, there will be no columns - add in 1 column so that we have some data to return
ensureColumns(queryContext, table)

// build list of required hydrate calls, based on requested columns
d.hydrateCalls = table.requiredHydrateCalls(queryContext.Columns, d.FetchType)
// build list of all columns returned by these hydrate calls (and the fetch call)
d.populateColumns()
d.concurrencyManager = newConcurrencyManager(table)

return d
Expand All @@ -117,6 +126,7 @@ func (d *QueryData) ShallowCopy() *QueryData {
stream: d.stream,
streamCount: d.streamCount,
listWg: d.listWg,
columns: d.columns,
}

// NOTE: we create a deep copy of the keyColumnQuals
Expand All @@ -134,6 +144,27 @@ func (d *QueryData) ShallowCopy() *QueryData {
return clone
}

// build list of all columns returned by the fetch call and required hydrate calls
func (d *QueryData) populateColumns() {
// add columns returned by fetch call
fetchName := helpers.GetFunctionName(d.Table.getFetchFunc(d.FetchType))
d.columns = append(d.columns, d.addColumnsForHydrate(fetchName)...)

// add columns returned by required hydrate calls
for _, h := range d.hydrateCalls {
d.columns = append(d.columns, d.addColumnsForHydrate(h.Name)...)
}
}

// get the column returned by the given hydrate call
func (d *QueryData) addColumnsForHydrate(hydrateName string) []string {
var cols []string
for _, columnName := range d.Table.hydrateColumnMap[hydrateName] {
cols = append(cols, columnName)
}
return cols
}

// KeyColumnQualString looks for the specified key column quals and if it exists, return the value as a string
func (d *QueryData) KeyColumnQualString(key string) string {
qualValue, ok := d.KeyColumnQuals[key]
Expand Down Expand Up @@ -442,7 +473,7 @@ func (d *QueryData) buildRows(ctx context.Context) chan *proto.Row {
func (d *QueryData) buildRow(ctx context.Context, rowData *RowData, rowChan chan *proto.Row, wg *sync.WaitGroup) {
defer func() {
if r := recover(); r != nil {
d.streamError(ToError(r))
d.streamError(helpers.ToError(r))
}
wg.Done()
}()
Expand All @@ -464,13 +495,3 @@ func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *p
log.Println("[TRACE] rowWg complete - CLOSING ROW CHANNEL")
close(rowChan)
}

// ToError is used to return an error or format the supplied value as error.
// Can be removed once go-kit version 0.2.0 is released
func ToError(val interface{}) error {
if e, ok := val.(error); ok {
return e
} else {
return fmt.Errorf("%v", val)
}
}
22 changes: 8 additions & 14 deletions plugin/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/logging"
"github.com/turbot/steampipe-plugin-sdk/plugin/context_key"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (r *RowData) startAllHydrateCalls(rowDataCtx context.Context, rowQueryData
for {
var allStarted = true
for _, call := range r.queryData.hydrateCalls {
hydrateFuncName := helpers.GetFunctionName(call.Func)
hydrateFuncName := call.Name
// if it is already started, continue to next call
if callsStarted[hydrateFuncName] {
continue
Expand All @@ -91,7 +90,7 @@ func (r *RowData) startAllHydrateCalls(rowDataCtx context.Context, rowQueryData
// so call needs to start - can it?
if call.CanStart(r, hydrateFuncName, r.queryData.concurrencyManager) {
// execute the hydrate call asynchronously
call.Start(rowDataCtx, r, rowQueryData, hydrateFuncName, r.queryData.concurrencyManager)
call.Start(rowDataCtx, r, rowQueryData, r.queryData.concurrencyManager)
callsStarted[hydrateFuncName] = true
} else {
allStarted = false
Expand Down Expand Up @@ -149,21 +148,16 @@ func (r *RowData) waitForHydrateCallsToComplete(rowDataCtx context.Context) (*pr
// generate the column values for for all requested columns
func (r *RowData) getColumnValues(ctx context.Context) (*proto.Row, error) {
row := &proto.Row{Columns: make(map[string]*proto.Column)}
// only populate columns which have been asked for
for _, columnName := range r.queryData.QueryContext.Columns {
// get columns schema
column := r.table.getColumn(columnName)
if column == nil {
// postgres asked for a non existent column. Shouldn't happen but just ignore
continue
}

var err error
row.Columns[columnName], err = r.table.getColumnValue(ctx, r, column)
// queryData.columns contains all columns returned by the hydrate calls which have been executed
for _, columnName := range r.queryData.columns {
val, err := r.table.getColumnValue(ctx, r, columnName)
if err != nil {
return nil, err
}
row.Columns[columnName] = val
}

return row, nil
}

Expand Down Expand Up @@ -223,6 +217,7 @@ func shouldRetryError(err error, d *QueryData, retryConfig *RetryConfig) bool {
if d.streamCount != 0 {
return false
}

shouldRetryErrorFunc := retryConfig.ShouldRetryError
if shouldRetryErrorFunc == nil {
return false
Expand Down Expand Up @@ -265,7 +260,6 @@ func (r *RowData) getHydrateKeys() []string {

// GetColumnData returns the root item, and, if this column has a hydrate function registered, the associated hydrate data
func (r *RowData) GetColumnData(column *Column) (interface{}, error) {

if column.resolvedHydrateName == "" {
return nil, fmt.Errorf("column %s has no resolved hydrate function name", column.Name)
}
Expand Down
55 changes: 37 additions & 18 deletions plugin/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Table struct {
// definitions of dependencies between hydrate functions
HydrateDependencies []HydrateDependencies
HydrateConfig []HydrateConfig
// map of hydrate function name to columns it provides
hydrateColumnMap map[string][]string
}

type GetConfig struct {
Expand All @@ -54,40 +56,57 @@ type ListConfig struct {
// build a list of required hydrate function calls which must be executed, based on the columns which have been requested
// NOTE: 'get' and 'list' calls are hydration functions, but they must be omitted from this list as they are called
// first. BEFORE the other hydration functions
// NOTE2: this function also populates the resolvedHydrateName for each column (used to retrieve column values),
// and the hydrateColumnMap (used to determine which columns to return)
func (t *Table) requiredHydrateCalls(colsUsed []string, fetchType fetchType) []*HydrateCall {
log.Printf("[TRACE] requiredHydrateCalls, table '%s' fetchType %s colsUsed %v\n", t.Name, fetchType, colsUsed)

// what is the name of the fetch call (i.e. the get/list call)
var fetchCallName string
if fetchType == fetchTypeList {
fetchCallName = helpers.GetFunctionName(t.List.Hydrate)
} else {
fetchCallName = helpers.GetFunctionName(t.Get.Hydrate)
}
fetchFunc := t.getFetchFunc(fetchType)
fetchCallName := helpers.GetFunctionName(fetchFunc)

// initialise hydrateColumnMap
t.hydrateColumnMap = make(map[string][]string)
requiredCallBuilder := newRequiredHydrateCallBuilder(t, fetchCallName)

// populate a map keyed by function name to ensure we only store each hydrate function once
for _, column := range t.Columns {
if helpers.StringSliceContains(colsUsed, column.Name) {

// see if this column specifies a hydrate function
hydrateFunc := column.Hydrate
if hydrateFunc != nil {
hydrateName := helpers.GetFunctionName(hydrateFunc)
column.resolvedHydrateName = hydrateName
// see if this column specifies a hydrate function

var hydrateName string
if hydrateFunc := column.Hydrate; hydrateFunc == nil {
// so there is NO hydrate call registered for the column
// the column is provided by the fetch call
// do not add to map of hydrate functions as the fetch call will always be called
hydrateFunc = fetchFunc
hydrateName = fetchCallName
} else {
// there is a hydrate call registered
hydrateName = helpers.GetFunctionName(hydrateFunc)
// if this column was requested in query, add the hydrate call to required calls
if helpers.StringSliceContains(colsUsed, column.Name) {
requiredCallBuilder.Add(hydrateFunc)
} else {
column.resolvedHydrateName = fetchCallName
// so there is no hydrate call registered for the column - the resolvedHydrateName is the fetch call
// do not add to map of hydrate functions as the fetch call will always be called
}
}
}

// now update hydrateColumnMap
t.hydrateColumnMap[hydrateName] = append(t.hydrateColumnMap[hydrateName], column.Name)
// store the hydrate name in the column object
column.resolvedHydrateName = hydrateName
}
log.Printf("[WARN] requiredHydrateCalls %v hydrateColumnMap %v", requiredCallBuilder.Get(), t.hydrateColumnMap)
return requiredCallBuilder.Get()
}

func (t *Table) getFetchFunc(fetchType fetchType) HydrateFunc {

if fetchType == fetchTypeList {
return t.List.Hydrate
}
return t.Get.Hydrate

}

func (t *Table) getHydrateDependencies(hydrateFuncName string) []HydrateFunc {
for _, d := range t.HydrateDependencies {
if helpers.GetFunctionName(d.Func) == hydrateFuncName {
Expand Down
9 changes: 8 additions & 1 deletion plugin/table_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ func (t *Table) getColumnType(columnName string) proto.ColumnType {
}

// take the raw value returned by the get/list/hydrate call, apply transforms and convert to protobuf value
func (t *Table) getColumnValue(ctx context.Context, rowData *RowData, column *Column) (*proto.Column, error) {
func (t *Table) getColumnValue(ctx context.Context, rowData *RowData, columnName string) (*proto.Column, error) {
// get columns schema
column := t.getColumn(columnName)
if column == nil {
// postgres asked for a non existent column. Shouldn't happen.
return nil, fmt.Errorf("hydrateColumnMap contains non existent column %s", columnName)
}

hydrateItem, err := rowData.GetColumnData(column)
if err != nil {
log.Printf("[ERROR] table '%s' failed to get column data: %v\n", t.Name, err)
Expand Down
3 changes: 1 addition & 2 deletions plugin/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"testing"

"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
)

Expand Down Expand Up @@ -318,7 +317,7 @@ func hydrateArrayToString(calls []*HydrateCall) string {
}

func hydrateCallToString(call *HydrateCall) string {
str := fmt.Sprintf("Func: %s", helpers.GetFunctionName(call.Func))
str := fmt.Sprintf("Func: %s", call.Name)
if len(call.Depends) > 0 {
str += "\n Depends:"
}
Expand Down

0 comments on commit e31b71e

Please sign in to comment.