Skip to content

Commit

Permalink
When executing a query with matrix items, if non key-column quals are…
Browse files Browse the repository at this point in the history
… used which filter the matrix item list, they must be included in the cache key. Closes #402
  • Loading branch information
kaidaguerre committed Sep 2, 2022
1 parent 152e719 commit 45be502
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 33 deletions.
7 changes: 3 additions & 4 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,13 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
executeSpan.End()
}()

log.Printf("[TRACE] GetMatrixItem")

// get the matrix item
queryData, err := newQueryData(connectionCallId, p, queryContext, table, connectionData, executeData, outputChan)
if err != nil {
return err
}

// get the matrix item
log.Printf("[TRACE] GetMatrixItem")
var matrixItem []map[string]interface{}
if table.GetMatrixItem != nil {
matrixItem = table.GetMatrixItem(ctx, connectionData.Connection)
Expand All @@ -389,7 +388,7 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
limit := queryContext.GetLimit()

// convert qual map to type used by cache
cacheQualMap := queryData.Quals.ToProtoQualMap()
cacheQualMap := queryData.getCacheQualMap()
// build cache request
cacheRequest := &query_cache.CacheRequest{
Table: table.Name,
Expand Down
20 changes: 20 additions & 0 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type QueryData struct {
// when executing parent child list calls, we cache the parent list result in the query data passed to the child list call
parentItem interface{}
filteredMatrix []map[string]interface{}
// column quals which were used to filter the matrix
filteredMatrixColumns []string

// ttl for the execute call
cacheTtl int64
Expand Down Expand Up @@ -313,6 +315,10 @@ func (d *QueryData) filterMatrixItems() {
// if there IS a single equals qual which DOES NOT match this matrix item, exclude the matrix item
if matrixQuals.SingleEqualsQual() {
includeMatrixItem = d.shouldIncludeMatrixItem(matrixQuals, val)
// store this column - we will need this when building a cache key
if !includeMatrixItem {
d.filteredMatrixColumns = append(d.filteredMatrixColumns, col)
}
}
} else {
log.Printf("[TRACE] quals found for matrix column: %s", col)
Expand Down Expand Up @@ -676,3 +682,17 @@ func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *p
logging.DisplayProfileData(10 * time.Millisecond)
close(rowChan)
}

// build a map of all quals to include in the cache key
// this will include all key column quals, and also any quals which were used to filter the matrix items
func (d *QueryData) getCacheQualMap() map[string]*proto.Quals {
res := d.Quals.ToProtoQualMap()
// now add in any additional (non-keycolumn) quals which were used to folter the matrix
for _, col := range d.filteredMatrixColumns {
if _, ok := res[col]; !ok {
log.Printf("[TRACE] getCacheQualMap - adding non-key column qual %s as it was used to filter the matrix items", col)
res[col] = d.QueryContext.UnsafeQuals[col]
}
}
return res
}
34 changes: 5 additions & 29 deletions plugin/table_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,11 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra

rd := newRowData(queryData, nil)

if len(queryData.Matrix) == 0 {
// if a matrix is defined, run listForEach
if len(queryData.Matrix) > 0 {
log.Printf("[TRACE] doList: matrix len %d - calling listForEach", len(queryData.Matrix))
t.listForEach(ctx, queryData, listCall)
} else {
log.Printf("[TRACE] doList: no matrix item")

// we cannot retry errors in the list hydrate function after streaming has started
Expand All @@ -483,8 +487,6 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra
log.Printf("[WARN] doList callHydrateWithRetries (%s) returned err %s", queryData.connectionCallId, err.Error())
queryData.streamError(err)
}
} else {
t.listForEach(ctx, queryData, listCall)
}
}

Expand All @@ -500,14 +502,6 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall
// NOTE - we use the filtered matrix - which means we may not actually run any hydrate calls
// if the quals have filtered out all matrix items (e.g. select where region = 'invalid')
for _, matrixItem := range queryData.filteredMatrix {

// check whether there is a single equals qual for each matrix item property and if so, check whether
// the matrix item property values satisfy the conditions
if !t.matrixItemMeetsQuals(matrixItem, queryData) {
log.Printf("[INFO] listForEach: matrix item item does not meet quals, %v\n", matrixItem)
continue
}

// create a context with the matrixItem
fetchContext := context.WithValue(ctx, context_key.MatrixItem, matrixItem)
wg.Add(1)
Expand Down Expand Up @@ -538,21 +532,3 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall
}
wg.Wait()
}

func (t *Table) matrixItemMeetsQuals(matrixItem map[string]interface{}, queryData *QueryData) bool {
// for the purposes of optimisation , assume matrix item properties correspond to column names
// if this is NOT the case, it will not fail, but this optimisation will not do anything
for columnName, metadataValue := range matrixItem {
// is there a single equals qual for this column
if qualValue, ok := queryData.Quals[columnName]; ok {
if qualValue.SingleEqualsQual() {
// get the underlying qual value
requiredValue := grpc.GetQualValue(qualValue.Quals[0].Value)
if requiredValue != metadataValue {
return false
}
}
}
}
return true
}

0 comments on commit 45be502

Please sign in to comment.