diff --git a/CHANGELOG.md b/CHANGELOG.md index 79118eee95f..1e7d67a78d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ - [#6252](https://github.com/influxdata/influxdb/pull/6252): Remove TSDB listener accept message @simnv - [#6202](https://github.com/influxdata/influxdb/pull/6202): Check default SHARD DURATION when recreating the same database. - [#6296](https://github.com/influxdata/influxdb/issues/6296): Allow the implicit time field to be renamed again. +- [#6383](https://github.com/influxdata/influxdb/pull/6383): Recover from a panic during query execution. ## v0.12.0 [2016-04-05] ### Release Notes diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 875c64764bf..31445d2ccb5 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -153,6 +153,7 @@ func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize in func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, closing <-chan struct{}, results chan *Result) { defer close(results) + defer e.recover(query, results) e.statMap.Add(statQueriesActive, 1) defer func(start time.Time) { @@ -269,6 +270,15 @@ loop: } } +func (e *QueryExecutor) recover(query *Query, results chan *Result) { + if err := recover(); err != nil { + results <- &Result{ + StatementID: -1, + Err: fmt.Errorf("%s [panic:%s]", query.String(), err), + } + } +} + func (e *QueryExecutor) executeKillQueryStatement(stmt *KillQueryStatement) error { return e.killQuery(stmt.QueryID) } diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 3073931f69a..d819e10a708 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -250,6 +250,29 @@ func TestQueryExecutor_Close(t *testing.T) { } } +func TestQueryExecutor_Panic(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + e := influxql.NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + panic("test error") + }, + } + + results := e.ExecuteQuery(q, "mydb", 100, nil) + result := <-results + if len(result.Series) != 0 { + t.Errorf("expected %d rows, got %d", 0, len(result.Series)) + } + if result.Err == nil || result.Err.Error() != "SELECT count(value) FROM cpu [panic:test error]" { + t.Errorf("unexpected error: %s", result.Err) + } +} + func discardOutput(results <-chan *influxql.Result) { for range results { // Read all results and discard.