Skip to content

Commit

Permalink
Add support for pushing down sort order. Closes #596
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre authored and pskrbasu committed Oct 4, 2024
1 parent 2342401 commit 0a7151d
Show file tree
Hide file tree
Showing 15 changed files with 768 additions and 479 deletions.
1,093 changes: 630 additions & 463 deletions grpc/proto/plugin.pb.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions grpc/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ message QueryContext {
repeated string columns = 1;
map<string, Quals> quals = 2;
NullableInt limit = 3 [deprecated = true];
repeated SortColumn sort_order = 4;

}

message NullableInt {
Expand All @@ -103,6 +105,11 @@ message ExecuteRequest {
map<string, ExecuteConnectionData> executeConnectionData = 8;
}

message SortColumn{
string column = 1;
SortOrder order = 2;
}

message ExecuteConnectionData {
NullableInt limit = 2;
bool cache_enabled = 3;
Expand Down Expand Up @@ -199,6 +206,12 @@ message TableSchema
repeated KeyColumn listCallKeyColumnList = 7;
}

enum SortOrder {
None = 0;
Asc = 1;
Desc = 2;
All = 3;
}
// a set of Key Columns, required for get/list calls
// deprecated - kept for compatibility
message KeyColumnsSet
Expand Down Expand Up @@ -264,6 +277,7 @@ message ColumnDefinition {
string description = 3;
string hydrate = 4;
Column default = 5;
SortOrder sort_order = 6;
}

enum ColumnType {
Expand Down Expand Up @@ -297,6 +311,7 @@ message IndexItem{
int64 limit = 5;
int64 page_count = 6;
google.protobuf.Timestamp insertion_time =7;
repeated SortColumn sort_order = 8;
}

message SetCacheOptionsRequest {
Expand Down
7 changes: 4 additions & 3 deletions grpc/proto/query_context.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package proto

// NewQueryContext creates a proto.QueryContext from provided columns, qualMap, and if non-nul, the limit
func NewQueryContext(columns []string, qualMap map[string]*Quals, limit int64) *QueryContext {
func NewQueryContext(columns []string, qualMap map[string]*Quals, limit int64, sortOrder []*SortColumn) *QueryContext {
var queryContext = &QueryContext{
Columns: columns,
Quals: qualMap,
Columns: columns,
Quals: qualMap,
SortOrder: sortOrder,
}
if limit != -1 {
queryContext.Limit = &NullableInt{Value: limit}
Expand Down
5 changes: 5 additions & 0 deletions grpc/proto/sort_column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package proto

func (x *SortColumn) Equals(other *SortColumn) bool {
return x.Column == other.Column && x.Order == other.Order
}
43 changes: 43 additions & 0 deletions plugin/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,48 @@ import (
"net"
)

// SortColumn is used to specify the sort order that a column supports
type SortColumn struct {
Column string
Order SortOrder
}

type SortOrder int

const (
SortNone SortOrder = iota
SortAsc
SortDesc
SortAll
)

// method to convert to proto SortOrder
func (s SortOrder) toProto() proto.SortOrder {
switch s {
case SortAsc:
return proto.SortOrder_Asc
case SortDesc:
return proto.SortOrder_Desc
case SortAll:
return proto.SortOrder_All
default:
return proto.SortOrder_None
}
}

func (s SortOrder) String() string {
switch s {
case SortAsc:
return "asc"
case SortDesc:
return "desc"
case SortAll:
return "asc or desc"
default:
return "none"
}
}

/*
Column defines a column of a table.
Expand Down Expand Up @@ -82,6 +124,7 @@ type Column struct {
Transform *transform.ColumnTransforms

namedHydrate namedHydrateFunc
Sort SortOrder
}

func (c *Column) initialise() {
Expand Down
2 changes: 2 additions & 0 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ func (p *Plugin) executeForConnection(streamContext context.Context, req *proto.
if cacheEnabled {
// get a fresh context which includes telemetry data and logger
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
}
ctx = p.buildExecuteContext(ctx, req, logger)

Expand Down Expand Up @@ -454,6 +455,7 @@ func (p *Plugin) executeForConnection(streamContext context.Context, req *proto.
ConnectionName: connectionName,
TtlSeconds: queryContext.CacheTTL,
CallId: connectionCallId,
SortOrder: queryContext.SortOrder,
StreamContext: streamContext,
}
// can we satisfy this request from the cache?
Expand Down
7 changes: 7 additions & 0 deletions plugin/query_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"log"
)

/*
Expand All @@ -21,6 +22,7 @@ type QueryContext struct {
Limit *int64
CacheEnabled bool
CacheTTL int64
SortOrder []*proto.SortColumn
}

// NewQueryContext maps from a [proto.QueryContext] to a [plugin.QueryContext].
Expand All @@ -29,6 +31,7 @@ func NewQueryContext(p *proto.QueryContext, limit *proto.NullableInt, cacheEnabl
UnsafeQuals: p.Quals,
CacheEnabled: cacheEnabled,
CacheTTL: cacheTTL,
SortOrder: p.SortOrder,
}
if limit != nil {
q.Limit = &limit.Value
Expand All @@ -43,6 +46,10 @@ func NewQueryContext(p *proto.QueryContext, limit *proto.NullableInt, cacheEnabl
q.Columns = append(q.Columns, c)
}
}

if len(q.SortOrder) > 0 {
log.Printf("[INFO] Sort order pushed down: (%d), %v:", len(q.SortOrder), q.SortOrder)
}
return q
}

Expand Down
23 changes: 19 additions & 4 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ func (d *QueryData) streamLeafListItem(ctx context.Context, items ...interface{}
// set the parent item on the row data
rd.parentItem = d.parentItem
// NOTE: add the item as the hydrate data for the list call
rd.set(d.Table.List.namedHydrate.Name, item)
// we do not expect this to fail as we just created the rowdata
// (it only fails for duplicate hydrate func values)
_ = rd.set(d.Table.List.namedHydrate.Name, item)

d.rowDataChan <- rd
}
Expand Down Expand Up @@ -693,6 +695,8 @@ func (d *QueryData) buildRowsAsync(ctx context.Context, rowChan chan *proto.Row,

// start goroutine to read items from item chan and generate row data
go func() {
// wait group used to ensure row ordering
var prevRowOrderingWg *sync.WaitGroup
for {
// wait for either an rowData or an error
select {
Expand All @@ -715,7 +719,6 @@ func (d *QueryData) buildRowsAsync(ctx context.Context, rowChan chan *proto.Row,
//log.Printf("[INFO] buildRowsAsync acquire semaphore (%s)", d.connectionCallId)
if err := rowSemaphore.Acquire(ctx, 1); err != nil {
log.Printf("[INFO] SEMAPHORE ERROR %s", err)
// TODO KAI does this quit??
d.errorChan <- err
return
}
Expand All @@ -724,7 +727,12 @@ func (d *QueryData) buildRowsAsync(ctx context.Context, rowChan chan *proto.Row,
}
}
rowWg.Add(1)
d.buildRowAsync(ctx, rowData, rowChan, &rowWg, rowSemaphore)

// increment ordering wg
rowData.orderingWg.Add(1)
d.buildRowAsync(ctx, rowData, rowChan, &rowWg, rowSemaphore, prevRowOrderingWg)
// update the wait group
prevRowOrderingWg = &rowData.orderingWg
}
}
}()
Expand Down Expand Up @@ -839,7 +847,7 @@ func (d *QueryData) streamError(err error) {

// TODO KAI this seems to get called even after cancellation
// execute necessary hydrate calls to populate row data
func (d *QueryData) buildRowAsync(ctx context.Context, rowData *rowData, rowChan chan *proto.Row, wg *sync.WaitGroup, sem *semaphore.Weighted) {
func (d *QueryData) buildRowAsync(ctx context.Context, rowData *rowData, rowChan chan *proto.Row, wg *sync.WaitGroup, sem *semaphore.Weighted, prevRowWg *sync.WaitGroup) {
go func() {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -867,7 +875,14 @@ func (d *QueryData) buildRowAsync(ctx context.Context, rowData *rowData, rowChan
// NOTE: add the Steampipecontext data to the row
d.addContextData(row, rowData)
}
// if ordering is being applied, wait until prev row is ready to ensure ordering
if len(d.QueryContext.SortOrder) > 0 && prevRowWg != nil {
prevRowWg.Wait()
}

rowChan <- row
// close our own wait group
rowData.orderingWg.Done()
}
}()
}
Expand Down
3 changes: 3 additions & 0 deletions plugin/row_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type rowData struct {

delayMapMut sync.RWMutex
hydrateConcurrencyDelay map[string]*hydrateConcurrencyDelay

// wait group to ensure correct row ordering
orderingWg sync.WaitGroup
}

// newRowData creates an empty rowData object
Expand Down
1 change: 1 addition & 0 deletions plugin/table_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (t *Table) GetSchema() (*proto.TableSchema, error) {
Name: column.Name,
Type: column.Type,
Description: column.Description,
SortOrder: column.Sort.toProto(),
}
if column.Hydrate != nil {
columnDef.Hydrate = column.namedHydrate.Name
Expand Down
14 changes: 13 additions & 1 deletion query_cache/cache_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package query_cache

import (
"context"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"fmt"
"strings"
"time"

sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
)

type CacheRequest struct {
Expand All @@ -14,6 +17,7 @@ type CacheRequest struct {
Limit int64
ConnectionName string
TtlSeconds int64
SortOrder []*sdkproto.SortColumn

resultKeyRoot string
pageCount int64
Expand All @@ -24,3 +28,11 @@ type CacheRequest struct {
func (req *CacheRequest) ttl() time.Duration {
return time.Duration(req.TtlSeconds) * time.Second
}

func (req *CacheRequest) sortOrderString() string {
var strs []string
for _, sortColumn := range req.SortOrder {
strs = append(strs, fmt.Sprintf("%s(%s)", sortColumn.Column, sortColumn.Order))
}
return strings.Join(strs, "_")
}
3 changes: 2 additions & 1 deletion query_cache/index_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (b *IndexBucket) Get(req *CacheRequest, keyColumns map[string]*proto.KeyCol
log.Printf("[TRACE] IndexBucket.Get %d items", len(b.Items))
for _, item := range b.Items {
log.Printf("[TRACE] IndexBucket.Get key %s limit %d (%s)", item.Key, item.Limit, req.CallId)
satisfiedRequest := item.satisfiesRequest(req.Columns, req.Limit, req.QualMap, keyColumns)
satisfiedRequest := item.satisfiesRequest(req.Columns, req.Limit, req.QualMap, req.SortOrder, keyColumns)
satisfiesTtl := item.satisfiesTtl(req.TtlSeconds)

log.Printf("[TRACE] satisfiedRequest: %v, satisfiesTtl: %v ttlSec: %d (%s)", satisfiedRequest, satisfiesTtl, req.TtlSeconds, req.CallId)
Expand Down Expand Up @@ -57,6 +57,7 @@ func (b *IndexBucket) AsProto() *proto.IndexBucket {
Columns: item.Columns,
Limit: item.Limit,
PageCount: item.PageCount,
SortOrder: item.SortOrder,
InsertionTime: timestamppb.New(item.InsertionTime),
}
}
Expand Down
24 changes: 20 additions & 4 deletions query_cache/index_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type IndexItem struct {
Columns []string
Key string
Limit int64
SortOrder []*proto.SortColumn
Quals map[string]*proto.Quals
InsertionTime time.Time
PageCount int64
Expand All @@ -27,24 +28,26 @@ func NewIndexItem(req *CacheRequest) *IndexItem {
Key: req.resultKeyRoot,
Limit: req.Limit,
Quals: req.QualMap,
SortOrder: req.SortOrder,
InsertionTime: time.Now(),
PageCount: req.pageCount,
}
}

func (i IndexItem) satisfiesRequest(columns []string, limit int64, qualMap map[string]*proto.Quals, keyColumns map[string]*proto.KeyColumn) bool {
func (i IndexItem) satisfiesRequest(columns []string, limit int64, qualMap map[string]*proto.Quals, sortOrder []*proto.SortColumn, keyColumns map[string]*proto.KeyColumn) bool {
satisfiedColumns := i.satisfiesColumns(columns)
satisfiesLimit := i.satisfiesLimit(limit)
satisfiesQuals := i.satisfiesQuals(qualMap, keyColumns)
satisfiesSortOrder := i.satisfiesSortOrder(sortOrder)

log.Printf("[TRACE] IndexItem satisfiesRequest: satisfiedColumns %v satisfiesLimit %v satisfiesQuals %v", satisfiedColumns, satisfiesLimit, satisfiesQuals)
return satisfiedColumns && satisfiesLimit && satisfiesQuals
log.Printf("[TRACE] IndexItem satisfiesRequest: satisfiedColumns %v satisfiesLimit %v satisfiesQuals %v satisfiesSortOrder %v", satisfiedColumns, satisfiesLimit, satisfiesQuals, satisfiesSortOrder)
return satisfiedColumns && satisfiesLimit && satisfiesQuals && satisfiesSortOrder
}

func (i IndexItem) satisfiedByRequest(req *CacheRequest, keyColumns map[string]*proto.KeyColumn) bool {
// make an index item for the request
requestIndexItem := NewIndexItem(req)
return requestIndexItem.satisfiesRequest(i.Columns, i.Limit, i.Quals, keyColumns)
return requestIndexItem.satisfiesRequest(i.Columns, i.Limit, i.Quals, i.SortOrder, keyColumns)
}

// satisfiesColumns returns whether this index item satisfies the given columns
Expand Down Expand Up @@ -151,5 +154,18 @@ func (i IndexItem) satisfiesTtl(ttlSeconds int64) bool {
log.Printf("[TRACE] satisfiesTtl: cache ttl %d has NOT expired (%fs)", ttlSeconds, timeSince.Seconds())

return true
}

// does this item satisfy the sort order
// if an order was specified, this item must implement the exact same order
func (i IndexItem) satisfiesSortOrder(sortOrder []*proto.SortColumn) bool {
if len(sortOrder) != len(i.SortOrder) {
return false
}
for idx, o := range sortOrder {
if !o.Equals(i.SortOrder[idx]) {
return false
}
}
return true
}
2 changes: 1 addition & 1 deletion query_cache/pending_index_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newPendingIndexItem(pendingSetRequest *setRequest) *pendingIndexItem {

// SatisfiesRequest returns whether our index item satisfies the given cache request
func (i *pendingIndexItem) SatisfiesRequest(req *CacheRequest, keyColumns map[string]*proto.KeyColumn) bool {
return i.item.satisfiesRequest(req.Columns, req.Limit, req.QualMap, keyColumns)
return i.item.satisfiesRequest(req.Columns, req.Limit, req.QualMap, req.SortOrder, keyColumns)
}

// SatisfiedByRequest returns whether our index item would be satisfied by the given cache request
Expand Down
5 changes: 3 additions & 2 deletions query_cache/query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,13 @@ func (c *QueryCache) buildResultKey(req *CacheRequest) string {
if len(req.QualMap) > 0 {
qualString = fmt.Sprintf("_%s", c.formatQualMapForKey(req.QualMap))
}
str := c.sanitiseKey(fmt.Sprintf("%s_%s%s_%s_%d",
str := c.sanitiseKey(fmt.Sprintf("%s_%s%s_%s_%d_%s",
req.ConnectionName,
req.Table,
qualString,
strings.Join(req.Columns, ","),
req.Limit))
req.Limit,
req.sortOrderString()))
return str
}

Expand Down

0 comments on commit 0a7151d

Please sign in to comment.