Skip to content

Commit

Permalink
Fix schema sharing optimization when calling setConnectionData for mu…
Browse files Browse the repository at this point in the history
…ltiple connections. Closes #547
  • Loading branch information
kaidaguerre authored May 5, 2023
1 parent 528703c commit 9475aca
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 48 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## v5.4.1 [tbd]

* Avoid loading schema repeatedly when initializing plugin with multiple connections. ([#547](https://github.com/turbot/steampipe-plugin-sdk/issues/547))

## v5.4.0 [2023-04-27]
_What's new_
* Add SetCacheOptions to allow control of cache at server level. ([#546](https://github.com/turbot/steampipe-plugin-sdk/issues/546))
Expand Down
15 changes: 15 additions & 0 deletions plugin/connection_update_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package plugin

import "github.com/turbot/steampipe-plugin-sdk/v5/grpc"

type connectionUpdateData struct {
failedConnections map[string]error
exemplarSchema *grpc.PluginSchema
exemplarTableMap map[string]*Table
}

func NewConnectionUpdateData() *connectionUpdateData {
return &connectionUpdateData{
failedConnections: make(map[string]error),
}
}
39 changes: 20 additions & 19 deletions plugin/plugin_connection_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,51 @@ func (p *Plugin) setAggregatorSchemas() (logMessages map[string][]string, err er
if connectionData.isAggregator() {
logMessages, err := connectionData.initAggregatorSchema(connectionData.config)
if err != nil {
return logMessages, nil
return logMessages, err
}
}
}
return logMessages, err
return logMessages, nil
}

func (p *Plugin) updateConnections(ctx context.Context, changed []*proto.ConnectionConfig, failedConnections map[string]error) error {
func (p *Plugin) updateConnections(ctx context.Context, changed []*proto.ConnectionConfig, updateData *connectionUpdateData) {
if len(changed) == 0 {
return nil
return
}

// first get the existing connection data - used for the update events
existingConnections := make(map[string]*ConnectionData, len(changed))
for _, changedConnection := range changed {
connectionData, ok := p.ConnectionMap[changedConnection.Connection]
if !ok {
return fmt.Errorf("no connection config found for changed connection %s", changedConnection.Connection)
updateData.failedConnections[changedConnection.Connection] = fmt.Errorf("no connection config found for changed connection %s", changedConnection.Connection)
return
}
existingConnections[changedConnection.Connection] = connectionData
}

// now just call addConnections for the changed connections
p.addConnections(changed, failedConnections, nil, nil)
p.addConnections(changed, updateData)

// call the ConnectionConfigChanged callback function for each changed connection
for _, changedConnection := range changed {
c := changedConnection.Connection
p.ConnectionConfigChangedFunc(ctx, p, existingConnections[c].Connection, p.ConnectionMap[changedConnection.Connection].Connection)
}
return nil
return
}

// add connections for all the provided configs
// NOTE: this (may) mutate failedConnections, exemplarSchema and exemplarTableMap
func (p *Plugin) addConnections(configs []*proto.ConnectionConfig, failedConnections map[string]error, exemplarSchema *grpc.PluginSchema, exemplarTableMap map[string]*Table) {
func (p *Plugin) addConnections(configs []*proto.ConnectionConfig, updateData *connectionUpdateData) {
log.Printf("[TRACE] SetAllConnectionConfigs setting %d configs", len(configs))

for _, config := range configs {
if len(config.ChildConnections) > 0 {
log.Printf("[TRACE] connection %s is an aggregator - handle separately", config.Connection)
p.setAggregatorConnectionData(config)
} else {
p.setConnectionData(config, failedConnections, exemplarSchema, exemplarTableMap)
p.setConnectionData(config, updateData)
}
}
}
Expand All @@ -72,12 +73,12 @@ func (p *Plugin) addConnections(configs []*proto.ConnectionConfig, failedConnect
// ConnectionData contains the table map, the schema and the connection
// NOTE: this (may) mutate failedConnections, exemplarSchema and exemplarTableMap
// NOTE: this is NOT called for aggregator connections
func (p *Plugin) setConnectionData(config *proto.ConnectionConfig, failedConnections map[string]error, exemplarSchema *grpc.PluginSchema, exemplarTableMap map[string]*Table) {
func (p *Plugin) setConnectionData(config *proto.ConnectionConfig, updateData *connectionUpdateData) {
connectionName := config.Connection
connectionConfigString := config.Config
if connectionName == "" {
log.Printf("[WARN] setConnectionData failed - ConnectionConfig contained empty connection name")
failedConnections["unknown"] = fmt.Errorf("setConnectionData failed - ConnectionConfig contained empty connection name")
updateData.failedConnections["unknown"] = fmt.Errorf("setConnectionData failed - ConnectionConfig contained empty connection name")
return
}

Expand All @@ -86,13 +87,13 @@ func (p *Plugin) setConnectionData(config *proto.ConnectionConfig, failedConnect
// if config was provided, parse it
if connectionConfigString != "" {
if p.ConnectionConfigSchema == nil {
failedConnections[connectionName] = fmt.Errorf("connection config has been set for connection '%s', but plugin '%s' does not define connection config schema", connectionName, p.Name)
updateData.failedConnections[connectionName] = fmt.Errorf("connection config has been set for connection '%s', but plugin '%s' does not define connection config schema", connectionName, p.Name)
return
}
// ask plugin for a struct to deserialise the config into
config, err := p.ConnectionConfigSchema.parse(config)
if err != nil {
failedConnections[connectionName] = err
updateData.failedConnections[connectionName] = err
log.Printf("[WARN] setConnectionData failed for connection %s, config validation failed: %s", connectionName, err.Error())
return
}
Expand All @@ -101,21 +102,21 @@ func (p *Plugin) setConnectionData(config *proto.ConnectionConfig, failedConnect

var err error
// set table map ands schema exemplar
schema := exemplarSchema
tableMap := exemplarTableMap
schema := updateData.exemplarSchema
tableMap := updateData.exemplarTableMap

// if tableMap is nil, exemplar is not yet set
if tableMap == nil {
log.Printf("[TRACE] connection %s build schema and table map", connectionName)
tableMap, schema, err = p.getConnectionSchema(c)
if err != nil {
failedConnections[connectionName] = err
updateData.failedConnections[connectionName] = err
return
}

if p.SchemaMode == SchemaModeStatic {
exemplarSchema = schema
exemplarTableMap = tableMap
updateData.exemplarSchema = schema
updateData.exemplarTableMap = tableMap
}
}

Expand All @@ -129,7 +130,7 @@ func (p *Plugin) setConnectionData(config *proto.ConnectionConfig, failedConnect
err = p.updateConnectionWatchPaths(c)
if err != nil {
log.Printf("[WARN] SetAllConnectionConfigs failed to update the watched paths for connection %s: %s", c.Name, err.Error())
failedConnections[connectionName] = err
updateData.failedConnections[connectionName] = err
}
return
}
Expand Down
48 changes: 21 additions & 27 deletions plugin/plugin_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,23 @@ setAllConnectionConfigs sets the connection config for a list of connections.
This is the handler function for the setAllConnectionConfigs GRPC function.
*/
func (p *Plugin) setAllConnectionConfigs(configs []*proto.ConnectionConfig, maxCacheSizeMb int) (failedConnections map[string]error, err error) {
func (p *Plugin) setAllConnectionConfigs(configs []*proto.ConnectionConfig, maxCacheSizeMb int) (_ map[string]error, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("setAllConnectionConfigs failed: %s", helpers.ToError(r).Error())
} else {
p.Logger.Debug("setAllConnectionConfigs finished")
}
}()
failedConnections = make(map[string]error)

p.addConnections(configs, failedConnections, nil, nil)
// create a struct to populate with exemplar schema and connection failures
// this will be passed into update functions and may be mutated
updateData := NewConnectionUpdateData()
p.addConnections(configs, updateData)

// TODO report log messages back somewhere
_, err = p.setAggregatorSchemas()
if err != nil {
return failedConnections, err
return updateData.failedConnections, err
}

// if the version of the CLI does not support SetCacheOptions,
Expand All @@ -76,13 +77,13 @@ func (p *Plugin) setAllConnectionConfigs(configs []*proto.ConnectionConfig, maxC
}
err = p.ensureCache(connectionSchemaMap, opts)
if err != nil {
return failedConnections, err
return updateData.failedConnections, err
}
}

// if there are any failed connections, raise an error
err = error_helpers.CombineErrors(maps.Values(failedConnections)...)
return failedConnections, err
err = error_helpers.CombineErrors(maps.Values(updateData.failedConnections)...)
return updateData.failedConnections, err
}

/*
Expand All @@ -96,49 +97,42 @@ updateConnectionConfigs handles added, changed and deleted connections:
This is the handler function for the updateConnectionConfigs GRPC function.
*/
func (p *Plugin) updateConnectionConfigs(added []*proto.ConnectionConfig, deleted []*proto.ConnectionConfig, changed []*proto.ConnectionConfig) (failedConnections map[string]error, err error) {
func (p *Plugin) updateConnectionConfigs(added []*proto.ConnectionConfig, deleted []*proto.ConnectionConfig, changed []*proto.ConnectionConfig) (map[string]error, error) {
ctx := context.WithValue(context.Background(), context_key.Logger, p.Logger)

p.logChanges(added, deleted, changed)

// create a struct to populate with exemplar schema and connection failures
// this will be passed into update functions and may be mutated
updateData := NewConnectionUpdateData()

// if this plugin does not have dynamic config, we can share table map and schema
var exemplarSchema *grpc.PluginSchema
var exemplarTableMap map[string]*Table
if p.SchemaMode == SchemaModeStatic {
for _, connectionData := range p.ConnectionMap {
exemplarSchema = connectionData.Schema
exemplarTableMap = connectionData.TableMap
updateData.exemplarSchema = connectionData.Schema
updateData.exemplarTableMap = connectionData.TableMap
// just take the first item
break
}
}

// key track of connections which have errors
failedConnections = make(map[string]error)

// remove deleted connections
for _, deletedConnection := range deleted {
delete(p.ConnectionMap, deletedConnection.Connection)
}

// add added connections
p.addConnections(added, failedConnections, exemplarSchema, exemplarTableMap)
if err != nil {
return failedConnections, err
}
p.addConnections(added, updateData)

// update changed connections
// build map of current connection data for each changed connection
err = p.updateConnections(ctx, changed, failedConnections)
if err != nil {
return failedConnections, err
}
p.updateConnections(ctx, changed, updateData)

// if there are any added or changed connections, we need to rebuild all aggregator schemas
if len(added)+len(deleted)+len(changed) > 0 {
_, err = p.setAggregatorSchemas()
_, err := p.setAggregatorSchemas()
if err != nil {
return failedConnections, err
return updateData.failedConnections, err
}
}

Expand All @@ -147,7 +141,7 @@ func (p *Plugin) updateConnectionConfigs(added []*proto.ConnectionConfig, delete
p.queryCache.PluginSchemaMap = p.buildConnectionSchemaMap()
}

return failedConnections, nil
return updateData.failedConnections, nil
}

/*
Expand Down
4 changes: 2 additions & 2 deletions version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
var ProtocolVersion int64 = 20220201

// Version is the main version number that is being run at the moment.
var version = "5.4.0"
var version = "5.4.1"

// A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release
// such as "dev" (in development), "beta", "rc1", etc.
var prerelease = ""
var prerelease = "rc.0"

// semVer is an instance of version.Version. This has the secondary
// benefit of verifying during tests and init time that our version is a
Expand Down

0 comments on commit 9475aca

Please sign in to comment.