diff --git a/plugin/plugin.go b/plugin/plugin.go index a0e09989..7c942eaa 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -367,14 +367,13 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq executeSpan.End() }() - log.Printf("[TRACE] GetMatrixItem") - - // get the matrix item queryData, err := newQueryData(connectionCallId, p, queryContext, table, connectionData, executeData, outputChan) if err != nil { return err } + // get the matrix item + log.Printf("[TRACE] GetMatrixItem") var matrixItem []map[string]interface{} if table.GetMatrixItem != nil { matrixItem = table.GetMatrixItem(ctx, connectionData.Connection) @@ -389,7 +388,7 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq limit := queryContext.GetLimit() // convert qual map to type used by cache - cacheQualMap := queryData.Quals.ToProtoQualMap() + cacheQualMap := queryData.getCacheQualMap() // build cache request cacheRequest := &query_cache.CacheRequest{ Table: table.Name, diff --git a/plugin/query_data.go b/plugin/query_data.go index d7d2abe7..0f30e7e4 100644 --- a/plugin/query_data.go +++ b/plugin/query_data.go @@ -80,6 +80,8 @@ type QueryData struct { // when executing parent child list calls, we cache the parent list result in the query data passed to the child list call parentItem interface{} filteredMatrix []map[string]interface{} + // column quals which were used to filter the matrix + filteredMatrixColumns []string // ttl for the execute call cacheTtl int64 @@ -313,6 +315,10 @@ func (d *QueryData) filterMatrixItems() { // if there IS a single equals qual which DOES NOT match this matrix item, exclude the matrix item if matrixQuals.SingleEqualsQual() { includeMatrixItem = d.shouldIncludeMatrixItem(matrixQuals, val) + // store this column - we will need this when building a cache key + if !includeMatrixItem { + d.filteredMatrixColumns = append(d.filteredMatrixColumns, col) + } } } else { log.Printf("[TRACE] quals found for matrix column: %s", col) @@ -676,3 +682,17 @@ func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *p logging.DisplayProfileData(10 * time.Millisecond) close(rowChan) } + +// build a map of all quals to include in the cache key +// this will include all key column quals, and also any quals which were used to filter the matrix items +func (d *QueryData) getCacheQualMap() map[string]*proto.Quals { + res := d.Quals.ToProtoQualMap() + // now add in any additional (non-keycolumn) quals which were used to folter the matrix + for _, col := range d.filteredMatrixColumns { + if _, ok := res[col]; !ok { + log.Printf("[TRACE] getCacheQualMap - adding non-key column qual %s as it was used to filter the matrix items", col) + res[col] = d.QueryContext.UnsafeQuals[col] + } + } + return res +} diff --git a/plugin/table_fetch.go b/plugin/table_fetch.go index 49daf560..8a85f76e 100644 --- a/plugin/table_fetch.go +++ b/plugin/table_fetch.go @@ -473,7 +473,11 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra rd := newRowData(queryData, nil) - if len(queryData.Matrix) == 0 { + // if a matrix is defined, run listForEach + if len(queryData.Matrix) > 0 { + log.Printf("[TRACE] doList: matrix len %d - calling listForEach", len(queryData.Matrix)) + t.listForEach(ctx, queryData, listCall) + } else { log.Printf("[TRACE] doList: no matrix item") // we cannot retry errors in the list hydrate function after streaming has started @@ -483,8 +487,6 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra log.Printf("[WARN] doList callHydrateWithRetries (%s) returned err %s", queryData.connectionCallId, err.Error()) queryData.streamError(err) } - } else { - t.listForEach(ctx, queryData, listCall) } } @@ -500,14 +502,6 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall // NOTE - we use the filtered matrix - which means we may not actually run any hydrate calls // if the quals have filtered out all matrix items (e.g. select where region = 'invalid') for _, matrixItem := range queryData.filteredMatrix { - - // check whether there is a single equals qual for each matrix item property and if so, check whether - // the matrix item property values satisfy the conditions - if !t.matrixItemMeetsQuals(matrixItem, queryData) { - log.Printf("[INFO] listForEach: matrix item item does not meet quals, %v\n", matrixItem) - continue - } - // create a context with the matrixItem fetchContext := context.WithValue(ctx, context_key.MatrixItem, matrixItem) wg.Add(1) @@ -538,21 +532,3 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall } wg.Wait() } - -func (t *Table) matrixItemMeetsQuals(matrixItem map[string]interface{}, queryData *QueryData) bool { - // for the purposes of optimisation , assume matrix item properties correspond to column names - // if this is NOT the case, it will not fail, but this optimisation will not do anything - for columnName, metadataValue := range matrixItem { - // is there a single equals qual for this column - if qualValue, ok := queryData.Quals[columnName]; ok { - if qualValue.SingleEqualsQual() { - // get the underlying qual value - requiredValue := grpc.GetQualValue(qualValue.Quals[0].Value) - if requiredValue != metadataValue { - return false - } - } - } - } - return true -}