Skip to content

Commit

Permalink
Instead of renaming _ctx column to dedupe it, implement a set of rese…
Browse files Browse the repository at this point in the history
…rved columns which may not be used. Closes #536
  • Loading branch information
kaidaguerre authored Feb 27, 2023
1 parent af3f938 commit fcc4783
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 75 deletions.
21 changes: 10 additions & 11 deletions plugin/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (d *ConnectionData) logInitAggregatorSchema(aggregatorConfig *proto.Connect
log.Printf("[INFO] ")
}

// for each table in AggregatedTablesByConnection, verify all connections have the same schema and if so,
// add to table map
// for each table in AggregatedTablesByConnection, verify all connections have the key columns, and if so,
// build a superset schema and add to table map
func (d *ConnectionData) resolveAggregatorTableMap(aggregatorConfig *proto.ConnectionConfig, logMessages map[string][]string) {
// clear table map and schema before we start
d.TableMap = make(map[string]*Table)
Expand All @@ -161,13 +161,11 @@ func (d *ConnectionData) resolveAggregatorTableMap(aggregatorConfig *proto.Conne
continue
}

// try to build a schema for this table - this will compare the schemas for all connections and
// if they are the same, or the Aggregation property if merge_*, it will build a schema
// If the tables have a different schema between connections, and the aggregation mode is not merge,
// this table will be EXCLUDED
// try to build a superset schema for this table
// If the tables have different key columns between connections, this table will be EXCLUDED
tableSchema, messages := d.buildAggregatorTableSchema(aggregatorConfig, tableName)
if tableSchema != nil {
// so we managed to b
// so we managed to build a schema
d.TableMap[tableName] = table
d.Schema.Schema[tableName] = tableSchema
} else {
Expand Down Expand Up @@ -250,15 +248,15 @@ func (d *ConnectionData) getAggregatedTables(aggregatorConfig *proto.ConnectionC
func (d *ConnectionData) buildAggregatorTableSchema(aggregatorConfig *proto.ConnectionConfig, tableName string) (*proto.TableSchema, []string) {
exemplarSchema, connectionTableDiffs, messages := d.getSchemaDiffBetweenConnections(aggregatorConfig, tableName)

// if there are no diffs, there is nothing more to do
// if there is no exemplar schema, or there are no diffs, there is nothing more to do
if exemplarSchema == nil || !connectionTableDiffs.HasDiffs() {
return exemplarSchema, messages
}

// so there are diffs between the schemas for this table for each connection

// build a superset schema
var subsetSchema = &proto.TableSchema{
var superset = &proto.TableSchema{
Description: exemplarSchema.Description,
GetCallKeyColumns: exemplarSchema.GetCallKeyColumns,
ListCallKeyColumns: exemplarSchema.ListCallKeyColumns,
Expand Down Expand Up @@ -292,12 +290,12 @@ func (d *ConnectionData) buildAggregatorTableSchema(aggregatorConfig *proto.Conn
}

// ok including this column
subsetSchema.Columns = append(subsetSchema.Columns, column)
superset.Columns = append(superset.Columns, column)
includedColumns[column.Name] = struct{}{}
}
}

return subsetSchema, messages
return superset, messages
}

func (d *ConnectionData) getSchemaDiffBetweenConnections(aggregatorConfig *proto.ConnectionConfig, tableName string) (*proto.TableSchema, *proto.TableSchemaDiff, []string) {
Expand Down Expand Up @@ -332,6 +330,7 @@ func (d *ConnectionData) getSchemaDiffBetweenConnections(aggregatorConfig *proto
// merge the diffs
connectionTableDiffs.Merge(schemaDiff)
}

return exemplarSchema, connectionTableDiffs, messages
}

Expand Down
24 changes: 22 additions & 2 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/gertd/go-pluralize"
"log"
"os"
"path"
Expand Down Expand Up @@ -561,9 +562,11 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
}

log.Printf("[INFO] queryCacheGet returned CACHE MISS (%s)", connectionCallId)

// NOTE: update the cache request to include ALL the columns which will be fetched, not just those requested
// this means subsequent queries requesting other columns from same hydrate func(s) can be served from the cache
cacheRequest.Columns = queryData.getColumnNames()

p.queryCache.StartSet(ctx, cacheRequest)
} else {
log.Printf("[INFO] Cache DISABLED connectionCallId: %s", connectionCallId)
Expand Down Expand Up @@ -659,12 +662,29 @@ func (p *Plugin) initialiseTables(ctx context.Context, connection *Connection) (

// now validate the plugin
// NOTE: must do this after calling TableMapFunc
if validationErrors := p.validate(tableMap); validationErrors != "" {
return nil, fmt.Errorf("plugin %s connection %s validation failed: \n%s", p.Name, connection.Name, validationErrors)
validationWarnings, validationErrors := p.validate(tableMap)

if len(validationWarnings) > 0 {
logValidationWarning(connection, validationWarnings)
}
if len(validationErrors) > 0 {
return nil, fmt.Errorf("plugin %s connection %s validation failed: \n%s", p.Name, connection.Name, strings.Join(validationErrors, "\n"))
}
return tableMap, nil
}

// logValidationWarning logs the table validation warnings for a connection at WARN level:
// first a summary line giving the connection name and the number of warnings,
// then one line for each warning message
func logValidationWarning(connection *Connection, warnings []string) {
	connectionName := connection.Name
	total := len(warnings)
	// the noun in the summary line is produced by pluralize for this count
	noun := pluralize.NewClient().Pluralize("warning", total, false)

	log.Printf("[WARN] connection %s, has %d table validation %s", connectionName, total, noun)

	// now log each individual warning
	for i := range warnings {
		log.Printf("[WARN] %s", warnings[i])
	}
}

func (p *Plugin) setupLogger() hclog.Logger {
// time will be provided by the plugin manager logger
logger := logging.NewLogger(&hclog.LoggerOptions{DisableTime: true})
Expand Down
35 changes: 18 additions & 17 deletions plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package plugin

import (
"context"
"strings"
"testing"

"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
)

type validateTest struct {
plugin Plugin
expected string
expected []string
}

// test hydrate functions
Expand Down Expand Up @@ -72,7 +73,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "",
expected: []string{""},
},
"get with hydrate dependency": {
plugin: Plugin{
Expand Down Expand Up @@ -104,7 +105,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' Get hydrate function 'getHydrate' has 1 dependency - Get hydrate functions cannot have dependencies",
expected: []string{"table 'table' Get hydrate function 'getHydrate' has 1 dependency - Get hydrate functions cannot have dependencies"},
},
"get with explicit hydrate config": {
plugin: Plugin{
Expand Down Expand Up @@ -136,7 +137,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' Get hydrate function 'getHydrate' also has an explicit hydrate config declared in `HydrateConfig`",
expected: []string{"table 'table' Get hydrate function 'getHydrate' also has an explicit hydrate config declared in `HydrateConfig`"},
},
"list with hydrate dependency": {
plugin: Plugin{
Expand Down Expand Up @@ -168,7 +169,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' List hydrate function 'listHydrate' has 1 dependency - List hydrate functions cannot have dependencies",
expected: []string{"table 'table' List hydrate function 'listHydrate' has 1 dependency - List hydrate functions cannot have dependencies"},
},
"list with explicit hydrate config": {
plugin: Plugin{
Expand Down Expand Up @@ -200,7 +201,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' List hydrate function 'listHydrate' also has an explicit hydrate config declared in `HydrateConfig`",
expected: []string{"table 'table' List hydrate function 'listHydrate' also has an explicit hydrate config declared in `HydrateConfig`"},
},
// non deterministic - skip
//"circular dep": {
Expand Down Expand Up @@ -241,7 +242,7 @@ var testCasesValidate = map[string]validateTest{
// },
// RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
// },
// expected: "Hydration dependencies contains cycle: : hydrate1 -> hydrate2 -> hydrate1",
// expected: []string{"Hydration dependencies contains cycle: : hydrate1 -> hydrate2 -> hydrate1",
//},
"no get key": {
plugin: Plugin{
Expand Down Expand Up @@ -272,7 +273,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' GetConfig does not specify a KeyColumn",
expected: []string{"table 'table' GetConfig does not specify a KeyColumn"},
},
"no get hydrate": {
plugin: Plugin{
Expand Down Expand Up @@ -303,7 +304,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' GetConfig does not specify a hydrate function\ntable 'table' HydrateConfig does not specify a hydrate function",
expected: []string{"table 'table' GetConfig does not specify a hydrate function\ntable 'table' HydrateConfig does not specify a hydrate function"},
},
"no list hydrate": {
plugin: Plugin{
Expand Down Expand Up @@ -333,7 +334,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' ListConfig does not specify a hydrate function",
expected: []string{"table 'table' ListConfig does not specify a hydrate function"},
},
"no list or get config": {
plugin: Plugin{
Expand All @@ -357,7 +358,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' does not have either GetConfig or ListConfig - one of these must be provided",
expected: []string{"table 'table' does not have either GetConfig or ListConfig - one of these must be provided"},
},
"required column wrong type": {
plugin: Plugin{
Expand Down Expand Up @@ -389,7 +390,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' required column 'name' should be type 'ColumnType_STRING' but is type 'ColumnType_INT'",
expected: []string{"table 'table' required column 'name' should be type 'ColumnType_STRING' but is type 'ColumnType_INT'"},
},
"missing required column": {
plugin: Plugin{
Expand Down Expand Up @@ -421,7 +422,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "missing", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' does not implement required column 'missing'",
expected: []string{"table 'table' does not implement required column 'missing'"},
},
"missing get key": {
plugin: Plugin{
Expand Down Expand Up @@ -452,7 +453,7 @@ var testCasesValidate = map[string]validateTest{
},
RequiredColumns: []*Column{{Name: "name", Type: proto.ColumnType_STRING}},
},
expected: "table 'table' GetConfig does not specify a KeyColumn",
expected: []string{"table 'table' GetConfig does not specify a KeyColumn"},
},
}

Expand All @@ -461,10 +462,10 @@ func TestValidate(t *testing.T) {
test.plugin.initialise()
test.plugin.initialiseTables(context.Background(), &Connection{Name: "test"})

validationErrors := test.plugin.validate(test.plugin.TableMap)
_, validationErrors := test.plugin.validate(test.plugin.TableMap)

if test.expected != validationErrors {
t.Errorf("Test: '%s'' FAILED. \nExpected: '%s' \nGot: '%s' ", name, test.expected, validationErrors)
if strings.Join(test.expected, "\n") != strings.Join(validationErrors, "\n") {
t.Errorf("Test: '%s'' FAILED. \nexpected: []string{'%s' \nGot: '%s' ", name, test.expected, validationErrors)
}
}
}
18 changes: 12 additions & 6 deletions plugin/plugin_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package plugin

import (
"fmt"
"github.com/gertd/go-pluralize"
"log"
"strings"
)

func (p *Plugin) validate(tableMap map[string]*Table) string {
func (p *Plugin) validate(tableMap map[string]*Table) (validationWarnings, validationErrors []string) {
log.Printf("[TRACE] validate plugin %s, required columns %v", p.Name, p.RequiredColumns)
var validationErrors []string
for tableName, table := range tableMap {
validationErrors = append(validationErrors, table.validate(tableName, p.RequiredColumns)...)
w, e := table.validate(tableName, p.RequiredColumns)
validationWarnings = append(validationWarnings, w...)
validationErrors = append(validationErrors, e...)
}
if p.ConnectionConfigSchema != nil {
validationErrors = append(validationErrors, p.ConnectionConfigSchema.Validate()...)
Expand All @@ -30,8 +31,13 @@ func (p *Plugin) validate(tableMap map[string]*Table) string {
log.Printf("[TRACE] validate table names")
validationErrors = append(validationErrors, p.validateTableNames()...)

log.Printf("[TRACE] plugin has %d validation errors", len(validationErrors))
return strings.Join(validationErrors, "\n")
log.Printf("[INFO] plugin validation result: %d %s %d %s",
len(validationWarnings),
pluralize.NewClient().Pluralize("warning", len(validationWarnings), false),
len(validationErrors),
pluralize.NewClient().Pluralize("error", len(validationErrors), false))

return validationWarnings, validationErrors
}

// validate that table names are consistent with their key in the table map
Expand Down
1 change: 0 additions & 1 deletion plugin/query_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func NewQueryContext(p *proto.QueryContext, limit *proto.NullableInt, cacheEnabl
// NOTE: only set columns which are supported by this table
// (in the case of dynamic aggregators, the query may request
// columns that this table does not provide for this connection)
contextColumnName := contextColumnName(table.columnNameMap)
for _, c := range p.Columns {
// context column is not in the table column map
if _, hasColumn := table.columnNameMap[c]; hasColumn || c == contextColumnName {
Expand Down
35 changes: 29 additions & 6 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ type QueryData struct {
freeMemInterval int64

// temp dir for the connection
tempDir string
contextColumnName string
tempDir string
reservedColumns map[string]struct{}
}

func newQueryData(connectionCallId string, p *Plugin, queryContext *QueryContext, table *Table, connectionData *ConnectionData, executeData *proto.ExecuteConnectionData, outputChan chan *proto.ExecuteResponse) (*QueryData, error) {
Expand Down Expand Up @@ -160,6 +160,7 @@ func newQueryData(connectionCallId string, p *Plugin, queryContext *QueryContext
listWg: &wg,

freeMemInterval: GetFreeMemInterval(),
reservedColumns: getReservedColumns(table),

// temporary dir for this connection
// this will only created if GetSourceFiles is used
Expand All @@ -181,11 +182,20 @@ func newQueryData(connectionCallId string, p *Plugin, queryContext *QueryContext
// populate the query status
// if a limit is set, use this to set rows required - otherwise just set to MaxInt32
d.queryStatus = newQueryStatus(d.QueryContext.Limit)
// set ctx column name
d.contextColumnName = contextColumnName(d.Table.columnNameMap)
return d, nil
}

// getReservedColumns builds the set of reserved column names for this table:
// every key of the table's columnNameMap for which IsReservedColumnName returns true
func getReservedColumns(table *Table) map[string]struct{} {
	// the table column count is used as the capacity hint
	reserved := make(map[string]struct{}, len(table.Columns))

	for name := range table.columnNameMap {
		// ignore any column which is not reserved
		if !IsReservedColumnName(name) {
			continue
		}
		reserved[name] = struct{}{}
	}

	return reserved
}

// ShallowCopy creates a shallow copy of the QueryData, i.e. most pointer properties are copied
// this is used to pass different quals to multiple list/get calls, when an 'in' clause is specified
func (d *QueryData) ShallowCopy() *QueryData {
Expand Down Expand Up @@ -322,6 +332,11 @@ func (d *QueryData) populateColumns() {
// get the column returned by the given hydrate call
func (d *QueryData) addColumnsForHydrate(hydrateName string) {
for _, columnName := range d.hydrateColumnMap[hydrateName] {
// skip reserved columns
if _, isReserved := d.reservedColumns[columnName]; isReserved {
continue
}

// get the column from the table
column := d.Table.getColumn(columnName)
d.columns[columnName] = NewQueryColumn(column, hydrateName)
Expand Down Expand Up @@ -736,6 +751,8 @@ func (d *QueryData) buildRowAsync(ctx context.Context, rowData *rowData, rowChan
log.Printf("[WARN] getRow failed with error %v", err)
d.streamError(err)
} else {
// remove reserved columns
d.removeReservedColumns(row)
// NOTE: add the Steampipecontext data to the row
d.addContextData(row)

Expand All @@ -747,7 +764,7 @@ func (d *QueryData) buildRowAsync(ctx context.Context, rowData *rowData, rowChan
func (d *QueryData) addContextData(row *proto.Row) {
jsonValue, _ := json.Marshal(map[string]string{"connection_name": d.Connection.Name})

row.Columns[d.contextColumnName] = &proto.Column{Value: &proto.Column_JsonValue{JsonValue: jsonValue}}
row.Columns[contextColumnName] = &proto.Column{Value: &proto.Column_JsonValue{JsonValue: jsonValue}}
}

func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *proto.Row) {
Expand All @@ -772,5 +789,11 @@ func (d *QueryData) getCacheQualMap() map[string]*proto.Quals {

// return the names of all columns that will be returned, adding in the _ctx column
func (d *QueryData) getColumnNames() []string {
return append(maps.Keys(d.columns), d.contextColumnName)
return append(maps.Keys(d.columns), contextColumnName)
}

// removeReservedColumns deletes from the row's Columns map every column name
// which is present in the query data reservedColumns set
func (d *QueryData) removeReservedColumns(row *proto.Row) {
	// nothing reserved for this table - leave the row untouched
	if len(d.reservedColumns) == 0 {
		return
	}

	rowColumns := row.Columns
	for reservedName := range d.reservedColumns {
		delete(rowColumns, reservedName)
	}
}
Loading

0 comments on commit fcc4783

Please sign in to comment.