Skip to content

Commit

Permalink
pass GRPC stream context to plugin rather than creating new one. #17
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre committed Jun 25, 2021
1 parent 9069309 commit 72db53f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
2 changes: 1 addition & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (p *Plugin) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_E
// 3) Build row spawns goroutines for any required hydrate functions.
// 4) When hydrate functions are complete, apply transforms to generate column values. When row is ready, send on rowChan
// 5) Range over rowChan - for each row, send on results stream
ctx := context.WithValue(context.Background(), context_key.Logger, p.Logger)
ctx := context.WithValue(stream.Context(), context_key.Logger, p.Logger)

var matrixItem []map[string]interface{}
var connection *Connection
Expand Down
18 changes: 5 additions & 13 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package plugin

import (
"context"
"errors"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -55,9 +54,6 @@ type QueryData struct {
listWg sync.WaitGroup
// when executing parent child list calls, we cache the parent list result in the query data passed to the child list call
parentItem interface{}

// there was an error streaming to the grpc stream
streamingError error
}

func newQueryData(queryContext *QueryContext, table *Table, stream proto.WrapperPlugin_ExecuteServer, connection *Connection, matrix []map[string]interface{}, connectionManager *connection_manager.Manager) *QueryData {
Expand Down Expand Up @@ -265,9 +261,11 @@ func (d *QueryData) verifyCallerIsListCall(callingFunction string) bool {
}

func (d *QueryData) streamLeafListItem(ctx context.Context, item interface{}) {
if d.streamingError != nil {
// if there is streaming error, panic to force exit thread - this will be recovered higher up
panic(d.streamingError)
// if the context is cancelled, panic to break out
select {
case <-d.stream.Context().Done():
panic(contextCancelledError)
default:
}

// create rowData, passing matrixItem from context
Expand All @@ -292,8 +290,6 @@ func (d *QueryData) streamRows(_ context.Context, rowChan chan *proto.Row) error
for {
// wait for either an item or an error
select {
case <-d.stream.Context().Done():
d.streamingError = errors.New(contextCancelledError)
case err := <-d.errorChan:
log.Printf("[ERROR] streamRows error chan select: %v\n", err)
return err
Expand All @@ -306,10 +302,6 @@ func (d *QueryData) streamRows(_ context.Context, rowChan chan *proto.Row) error
return nil
}
if err := d.streamRow(row); err != nil {
// if there was an error streaming, store in d.streamingError
// - this is checked by the thread streaming list items and will cause it to terminate
d.streamingError = err

return err
}
}
Expand Down

0 comments on commit 72db53f

Please sign in to comment.