Skip to content

Commit

Permalink
allow cancelation of long queries
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeljesus committed Mar 6, 2018
1 parent 4c3f424 commit 957a013
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 23 deletions.
48 changes: 36 additions & 12 deletions pkg/reader/generic/sql.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package generic

import (
"context"
"database/sql"
"fmt"
"sync"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/hellofresh/klepto/pkg/database"
Expand All @@ -20,6 +22,8 @@ type (
tables []string
// columns is a cache variable for tables and there columns in the db
columns sync.Map
// timeout is the sql read operation timeout
timeout time.Duration
}

SqlEngine interface {
Expand All @@ -44,8 +48,8 @@ type (
)

// NewSqlReader creates a new sql reader
func NewSqlReader(se SqlEngine) *SqlReader {
return &SqlReader{SqlEngine: se}
func NewSqlReader(se SqlEngine, t time.Duration) *SqlReader {
return &SqlReader{SqlEngine: se, timeout: t}
}

// GetTables gets a list of all tables in the database
Expand Down Expand Up @@ -93,20 +97,40 @@ func (s *SqlReader) ReadTable(tableName string, rowChan chan<- database.Row, opt
opts.Columns = s.formatColumns(tableName, columns)
}

query, err := s.buildQuery(tableName, opts)
var (
query sq.SelectBuilder
err error
)
query, err = s.buildQuery(tableName, opts)
if err != nil {
return errors.Wrapf(err, "failed to build query for %s", tableName)
}

rows, err := query.RunWith(s.GetConnection()).Query()
if err != nil {
querySQL, queryParams, _ := query.ToSql()
logger.WithFields(log.Fields{
"query": querySQL,
"params": queryParams,
}).Warn("failed to query rows")

return errors.Wrap(err, "failed to query rows")
var rows *sql.Rows
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()

errchan := make(chan error)
defer close(errchan)
go func() {
rows, err = query.RunWith(s.GetConnection()).QueryContext(ctx)
errchan <- err
}()

select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), fmt.Sprintf("timeout during read %s table", tableName))
case err := <-errchan:
if err != nil {
querySQL, queryParams, _ := query.ToSql()
logger.WithError(err).
WithFields(log.Fields{
"query": querySQL,
"params": queryParams,
}).Warn("failed to query rows")
return errors.Wrap(err, "failed to query rows")
}
break
}

return s.publishRows(rows, rowChan, tableName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reader/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *driver) NewConnection(opts reader.ConnOpts) (reader.Reader, error) {
conn.SetMaxIdleConns(opts.MaxIdleConns)
conn.SetConnMaxLifetime(opts.MaxConnLifetime)

return NewStorage(conn), nil
return NewStorage(conn, opts.Timeout), nil
}

func init() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/reader/mysql/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type storage struct {
}

// NewStorage ...
func NewStorage(conn *sql.DB) reader.Reader {
return generic.NewSqlReader(&storage{conn})
func NewStorage(conn *sql.DB, t time.Duration) reader.Reader {
return generic.NewSqlReader(&storage{conn}, t)
}

// GetConnection return the connection
Expand Down
2 changes: 1 addition & 1 deletion pkg/reader/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (m *driver) NewConnection(opts reader.ConnOpts) (reader.Reader, error) {
return nil, err
}

return NewStorage(conn, dump), nil
return NewStorage(conn, dump, opts.Timeout), nil
}

func init() {
Expand Down
14 changes: 7 additions & 7 deletions pkg/reader/postgres/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgres
import (
"database/sql"
"strconv"
"time"

"github.com/hellofresh/klepto/pkg/reader"
"github.com/hellofresh/klepto/pkg/reader/generic"
Expand All @@ -17,13 +18,12 @@ type storage struct {
}

// NewStorage ...
func NewStorage(conn *sql.DB, dumper PgDump) reader.Reader {
return generic.NewSqlReader(
&storage{
PgDump: dumper,
connection: conn,
},
)
func NewStorage(conn *sql.DB, dumper PgDump, t time.Duration) reader.Reader {
s := &storage{
PgDump: dumper,
connection: conn,
}
return generic.NewSqlReader(s, t)
}

func (s *storage) GetConnection() *sql.DB {
Expand Down

0 comments on commit 957a013

Please sign in to comment.