From b6bb521f3fc8b4087df615ac10e37a0669f3b573 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 14:12:54 +0800 Subject: [PATCH 01/15] introduce execCloseWatcher to watch ctx and stop executor --- executor/adapter.go | 47 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/executor/adapter.go b/executor/adapter.go index 6a8443ddbd81d..d76603ec8bf16 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -220,6 +220,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { if err != nil { return nil, errors.Trace(err) } + e = wrapCloseWatcher(e) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -484,3 +485,49 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco return false, nil } } + +// execCloseWatcher used to watch the context specified in Open and +// stop the wrapped executor when this context is cancelled. +type execCloseWatcher struct { + Executor + // 0, 1, 2 represent not started, running, closed + status int32 + exit chan struct{} +} + +func wrapCloseWatcher(exec Executor) Executor { + return &execCloseWatcher{exec, 0, make(chan struct{})} +} + +func (ecw *execCloseWatcher) Open(ctx context.Context) error { + if !atomic.CompareAndSwapInt32(&ecw.status, 0, 1) { + return nil + } + + if err := ecw.Executor.Open(ctx); err != nil { + return err + } + go ecw.watch(ctx) + + return nil +} + +func (ecw *execCloseWatcher) Close() error { + if !atomic.CompareAndSwapInt32(&ecw.status, 1, 2) { + return nil + } + close(ecw.exit) + return ecw.Executor.Close() +} + +func (ecw *execCloseWatcher) watch(ctx context.Context) { + select { + case <-ctx.Done(): + if err := ecw.Close(); err != nil { + logutil.Logger(ctx).Error("cancel executor in execCloseWatcher err", zap.Error(err)) + } + return + case <-ecw.exit: + return + } +} From 0f9037a13524e7703f4172554d5da7628690726d Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:00:48 +0800 Subject: [PATCH 02/15] add UT --- executor/adapter.go | 25 ++++----- executor/adapter_test.go | 111 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 12 deletions(-) create mode 100644 executor/adapter_test.go diff --git a/executor/adapter.go b/executor/adapter.go index d76603ec8bf16..990522c17bc98 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -220,7 +220,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { if err != nil { return nil, errors.Trace(err) } - e = wrapCloseWatcher(e) + e = wrapCtxWatcher(e) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -486,25 +486,26 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco } } -// execCloseWatcher used to watch the context specified in Open and +// execCtxWatcher used to watch the context specified in Open and // stop the wrapped executor when this context is cancelled. -type execCloseWatcher struct { +type execCtxWatcher struct { Executor - // 0, 1, 2 represent not started, running, closed + // 0, 1, 2, 3 represent not started, failed in start, running, closed status int32 exit chan struct{} } -func wrapCloseWatcher(exec Executor) Executor { - return &execCloseWatcher{exec, 0, make(chan struct{})} +func wrapCtxWatcher(exec Executor) Executor { + return &execCtxWatcher{exec, 0, make(chan struct{})} } -func (ecw *execCloseWatcher) Open(ctx context.Context) error { - if !atomic.CompareAndSwapInt32(&ecw.status, 0, 1) { +func (ecw *execCtxWatcher) Open(ctx context.Context) error { + if !atomic.CompareAndSwapInt32(&ecw.status, 0, 2) { return nil } if err := ecw.Executor.Open(ctx); err != nil { + atomic.StoreInt32(&ecw.status, 1) return err } go ecw.watch(ctx) @@ -512,19 +513,19 @@ func (ecw *execCloseWatcher) Open(ctx context.Context) error { return nil } -func (ecw *execCloseWatcher) Close() error { - if !atomic.CompareAndSwapInt32(&ecw.status, 1, 2) { +func (ecw *execCtxWatcher) Close() error { + if !atomic.CompareAndSwapInt32(&ecw.status, 2, 3) { return nil } close(ecw.exit) return ecw.Executor.Close() } -func (ecw *execCloseWatcher) watch(ctx context.Context) { +func (ecw *execCtxWatcher) watch(ctx context.Context) { select { case <-ctx.Done(): if err := ecw.Close(); err != nil { - logutil.Logger(ctx).Error("cancel executor in execCloseWatcher err", zap.Error(err)) + logutil.Logger(ctx).Error("cancel executor in execCtxWatcher err", zap.Error(err)) } return case <-ecw.exit: diff --git a/executor/adapter_test.go b/executor/adapter_test.go new file mode 100644 index 0000000000000..421e601e3ffb4 --- /dev/null +++ b/executor/adapter_test.go @@ -0,0 +1,111 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + . "github.com/pingcap/check" + "github.com/pingcap/tidb/util/mock" + "github.com/pkg/errors" + "time" +) + +var _ = Suite(&adapterTestSuite{}) + +type adapterTestSuite struct{} + +type emptyExec struct { + baseExecutor + closeCnt int + openCnt int + openErr error +} + +func (ee *emptyExec) Close() error { + ee.closeCnt++ + return nil +} + +func (ee *emptyExec) Open(ctx context.Context) error { + ee.openCnt++ + return ee.openErr +} + +func newEmptyExec() *emptyExec { + sctx := mock.NewContext() + return &emptyExec{newBaseExecutor(sctx, nil, ""), 0, 0, nil} +} + +func (s *adapterTestSuite) TestCtxWatcher(c *C) { + // Open and Close multiple times + e := newEmptyExec() + we := wrapCtxWatcher(e) + ctx, cancel := context.WithCancel(context.Background()) + we.Open(ctx) + we.Open(ctx) + c.Assert(e.openCnt, Equals, 1) + we.Close() + we.Close() + c.Assert(e.closeCnt, Equals, 1) + + // Open and Cancel + e = newEmptyExec() + we = wrapCtxWatcher(e) + ctx, cancel = context.WithCancel(context.Background()) + we.Open(ctx) + c.Assert(e.openCnt, Equals, 1) + cancel() + time.Sleep(time.Millisecond * 10) + c.Assert(e.closeCnt, Equals, 1) + we.Close() + c.Assert(e.closeCnt, Equals, 1) + + // Open and Close anc Cancel + e = newEmptyExec() + we = wrapCtxWatcher(e) + ctx, cancel = context.WithCancel(context.Background()) + we.Open(ctx) + c.Assert(e.openCnt, Equals, 1) + we.Close() + c.Assert(e.closeCnt, Equals, 1) + cancel() + time.Sleep(time.Millisecond * 10) + c.Assert(e.closeCnt, Equals, 1) + + // Open error + e = newEmptyExec() + we = wrapCtxWatcher(e) + ctx, cancel = context.WithCancel(context.Background()) + e.openErr = errors.New("foo") + we.Open(ctx) + c.Assert(e.openCnt, Equals, 1) + we.Close() + c.Assert(e.closeCnt, Equals, 0) + cancel() + time.Sleep(time.Millisecond * 10) + c.Assert(e.closeCnt, Equals, 0) + + // Close and Cancel multiple times + e = newEmptyExec() + we = wrapCtxWatcher(e) + ctx, cancel = context.WithCancel(context.Background()) + we.Open(ctx) + c.Assert(e.openCnt, Equals, 1) + for i := 0; i < 10;i ++ { + we.Close() + cancel() + } + time.Sleep(time.Millisecond * 10) + c.Assert(e.closeCnt, Equals, 1) +} From 9fcf1dfaddb4b4bc556702d22f2794942b91588c Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:01:51 +0800 Subject: [PATCH 03/15] reformat --- executor/adapter_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index 421e601e3ffb4..8701fe81e0d65 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -15,10 +15,11 @@ package executor import ( "context" + "time" + . "github.com/pingcap/check" "github.com/pingcap/tidb/util/mock" "github.com/pkg/errors" - "time" ) var _ = Suite(&adapterTestSuite{}) @@ -102,7 +103,7 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { ctx, cancel = context.WithCancel(context.Background()) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) - for i := 0; i < 10;i ++ { + for i := 0; i < 10; i ++ { we.Close() cancel() } From 2a9f9d4b52e23a17af50dc29e354c1e674c4a81b Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:11:07 +0800 Subject: [PATCH 04/15] reformat --- executor/adapter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index 8701fe81e0d65..c2623d692f4f8 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -103,7 +103,7 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { ctx, cancel = context.WithCancel(context.Background()) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) - for i := 0; i < 10; i ++ { + for i := 0; i < 10; i++ { we.Close() cancel() } From 00c692357be8724ce50a7b1944b1ce01f2e7824e Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:26:27 +0800 Subject: [PATCH 05/15] fix handleNoDelayExecutor --- executor/adapter.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 990522c17bc98..34ed751a5a1b6 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -216,11 +216,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }() } - e, err := a.buildExecutor(sctx) + exec, err := a.buildExecutor(sctx) if err != nil { return nil, errors.Trace(err) } - e = wrapCtxWatcher(e) + e := wrapCtxWatcher(exec) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -247,7 +247,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { // If the executor doesn't return any result to the client, we execute it without delay. if e.Schema().Len() == 0 { return a.handleNoDelayExecutor(ctx, sctx, e) - } else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay { + } else if proj, ok := e.Executor.(*ProjectionExec); ok && proj.calculateNoDelay { // Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example: // the Projection has two expressions and two columns in the schema, but we should // not return the result of the two expressions. @@ -269,7 +269,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }, nil } -func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) { +func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execCtxWatcher) (sqlexec.RecordSet, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -277,7 +277,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. - switch e.(type) { + switch e.Executor.(type) { case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec: snapshotTS := sctx.GetSessionVars().SnapshotTS if snapshotTS != 0 { @@ -495,7 +495,7 @@ type execCtxWatcher struct { exit chan struct{} } -func wrapCtxWatcher(exec Executor) Executor { +func wrapCtxWatcher(exec Executor) *execCtxWatcher { return &execCtxWatcher{exec, 0, make(chan struct{})} } From bd39348bd5d5f9cb8dc1f885f4452908490ca2fa Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:30:52 +0800 Subject: [PATCH 06/15] update errors pkg --- executor/adapter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index c2623d692f4f8..0a58baa31b489 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -15,11 +15,11 @@ package executor import ( "context" + "errors" "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/util/mock" - "github.com/pkg/errors" ) var _ = Suite(&adapterTestSuite{}) From c41c8eb30ff74a2c1fc5e2eecf2ec01dff0d50b4 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 15:32:53 +0800 Subject: [PATCH 07/15] fix CI --- executor/adapter_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index 0a58baa31b489..cf3ec578bef39 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -52,7 +52,7 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { // Open and Close multiple times e := newEmptyExec() we := wrapCtxWatcher(e) - ctx, cancel := context.WithCancel(context.Background()) + ctx, _ := context.WithCancel(context.Background()) we.Open(ctx) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) @@ -63,7 +63,7 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { // Open and Cancel e = newEmptyExec() we = wrapCtxWatcher(e) - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) cancel() From cbe72128585458a5c54673d5c17af8a4a8b2cf67 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 19:21:39 +0800 Subject: [PATCH 08/15] fix CI --- executor/adapter_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index cf3ec578bef39..0ec3407ba76ee 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -52,18 +52,19 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { // Open and Close multiple times e := newEmptyExec() we := wrapCtxWatcher(e) - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) we.Open(ctx) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) we.Close() we.Close() c.Assert(e.closeCnt, Equals, 1) + cancel() // Open and Cancel e = newEmptyExec() we = wrapCtxWatcher(e) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(context.Background()) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) cancel() From 7dbb6fced57c1206878ccdb78e8b3dd7a27361c8 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Thu, 21 Mar 2019 19:23:03 +0800 Subject: [PATCH 09/15] fix CI --- executor/adapter_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/executor/adapter_test.go b/executor/adapter_test.go index 0ec3407ba76ee..78b79d377db63 100644 --- a/executor/adapter_test.go +++ b/executor/adapter_test.go @@ -104,10 +104,12 @@ func (s *adapterTestSuite) TestCtxWatcher(c *C) { ctx, cancel = context.WithCancel(context.Background()) we.Open(ctx) c.Assert(e.openCnt, Equals, 1) - for i := 0; i < 10; i++ { - we.Close() - cancel() - } + we.Close() + cancel() + we.Close() + cancel() + we.Close() + cancel() time.Sleep(time.Millisecond * 10) c.Assert(e.closeCnt, Equals, 1) } From 933fd0493d7d10954055a32d8167d452ecf98e92 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 27 Mar 2019 18:59:11 +0800 Subject: [PATCH 10/15] update --- executor/adapter.go | 45 ++++++--------- executor/adapter_test.go | 115 --------------------------------------- 2 files changed, 18 insertions(+), 142 deletions(-) delete mode 100644 executor/adapter_test.go diff --git a/executor/adapter.go b/executor/adapter.go index 34ed751a5a1b6..f26216ce03d7d 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -220,7 +220,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { if err != nil { return nil, errors.Trace(err) } - e := wrapCtxWatcher(exec) + e := wrapReentrant(exec) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -269,7 +269,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }, nil } -func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execCtxWatcher) (sqlexec.RecordSet, error) { +func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execReentrantWrapper) (sqlexec.RecordSet, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -486,49 +486,40 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco } } -// execCtxWatcher used to watch the context specified in Open and +const ( + reentrantWrapperNotStart = iota + reentrantWrapperStartFail + reentrantWrapperRunning + reentrantWrapperClosed +) + +// execReentrantWrapper used to watch the context specified in Open and // stop the wrapped executor when this context is cancelled. -type execCtxWatcher struct { +type execReentrantWrapper struct { Executor // 0, 1, 2, 3 represent not started, failed in start, running, closed status int32 - exit chan struct{} } -func wrapCtxWatcher(exec Executor) *execCtxWatcher { - return &execCtxWatcher{exec, 0, make(chan struct{})} +func wrapReentrant(exec Executor) *execReentrantWrapper { + return &execReentrantWrapper{exec, reentrantWrapperNotStart} } -func (ecw *execCtxWatcher) Open(ctx context.Context) error { - if !atomic.CompareAndSwapInt32(&ecw.status, 0, 2) { +func (ecw *execReentrantWrapper) Open(ctx context.Context) error { + if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperNotStart, reentrantWrapperRunning) { return nil } if err := ecw.Executor.Open(ctx); err != nil { - atomic.StoreInt32(&ecw.status, 1) + atomic.StoreInt32(&ecw.status, reentrantWrapperStartFail) return err } - go ecw.watch(ctx) - return nil } -func (ecw *execCtxWatcher) Close() error { - if !atomic.CompareAndSwapInt32(&ecw.status, 2, 3) { +func (ecw *execReentrantWrapper) Close() error { + if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperRunning, reentrantWrapperClosed) { return nil } - close(ecw.exit) return ecw.Executor.Close() } - -func (ecw *execCtxWatcher) watch(ctx context.Context) { - select { - case <-ctx.Done(): - if err := ecw.Close(); err != nil { - logutil.Logger(ctx).Error("cancel executor in execCtxWatcher err", zap.Error(err)) - } - return - case <-ecw.exit: - return - } -} diff --git a/executor/adapter_test.go b/executor/adapter_test.go deleted file mode 100644 index 78b79d377db63..0000000000000 --- a/executor/adapter_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package executor - -import ( - "context" - "errors" - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/tidb/util/mock" -) - -var _ = Suite(&adapterTestSuite{}) - -type adapterTestSuite struct{} - -type emptyExec struct { - baseExecutor - closeCnt int - openCnt int - openErr error -} - -func (ee *emptyExec) Close() error { - ee.closeCnt++ - return nil -} - -func (ee *emptyExec) Open(ctx context.Context) error { - ee.openCnt++ - return ee.openErr -} - -func newEmptyExec() *emptyExec { - sctx := mock.NewContext() - return &emptyExec{newBaseExecutor(sctx, nil, ""), 0, 0, nil} -} - -func (s *adapterTestSuite) TestCtxWatcher(c *C) { - // Open and Close multiple times - e := newEmptyExec() - we := wrapCtxWatcher(e) - ctx, cancel := context.WithCancel(context.Background()) - we.Open(ctx) - we.Open(ctx) - c.Assert(e.openCnt, Equals, 1) - we.Close() - we.Close() - c.Assert(e.closeCnt, Equals, 1) - cancel() - - // Open and Cancel - e = newEmptyExec() - we = wrapCtxWatcher(e) - ctx, cancel = context.WithCancel(context.Background()) - we.Open(ctx) - c.Assert(e.openCnt, Equals, 1) - cancel() - time.Sleep(time.Millisecond * 10) - c.Assert(e.closeCnt, Equals, 1) - we.Close() - c.Assert(e.closeCnt, Equals, 1) - - // Open and Close anc Cancel - e = newEmptyExec() - we = wrapCtxWatcher(e) - ctx, cancel = context.WithCancel(context.Background()) - we.Open(ctx) - c.Assert(e.openCnt, Equals, 1) - we.Close() - c.Assert(e.closeCnt, Equals, 1) - cancel() - time.Sleep(time.Millisecond * 10) - c.Assert(e.closeCnt, Equals, 1) - - // Open error - e = newEmptyExec() - we = wrapCtxWatcher(e) - ctx, cancel = context.WithCancel(context.Background()) - e.openErr = errors.New("foo") - we.Open(ctx) - c.Assert(e.openCnt, Equals, 1) - we.Close() - c.Assert(e.closeCnt, Equals, 0) - cancel() - time.Sleep(time.Millisecond * 10) - c.Assert(e.closeCnt, Equals, 0) - - // Close and Cancel multiple times - e = newEmptyExec() - we = wrapCtxWatcher(e) - ctx, cancel = context.WithCancel(context.Background()) - we.Open(ctx) - c.Assert(e.openCnt, Equals, 1) - we.Close() - cancel() - we.Close() - cancel() - we.Close() - cancel() - time.Sleep(time.Millisecond * 10) - c.Assert(e.closeCnt, Equals, 1) -} From 93257988143d566ac14dfd4f36f45fa6e2304212 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 27 Mar 2019 19:29:36 +0800 Subject: [PATCH 11/15] update the way to kill background executors --- executor/adapter.go | 47 ++++----------------------------------------- server/conn.go | 8 ++++++-- server/server.go | 6 ++++++ 3 files changed, 16 insertions(+), 45 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index f26216ce03d7d..6a8443ddbd81d 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -216,11 +216,10 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }() } - exec, err := a.buildExecutor(sctx) + e, err := a.buildExecutor(sctx) if err != nil { return nil, errors.Trace(err) } - e := wrapReentrant(exec) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -247,7 +246,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { // If the executor doesn't return any result to the client, we execute it without delay. if e.Schema().Len() == 0 { return a.handleNoDelayExecutor(ctx, sctx, e) - } else if proj, ok := e.Executor.(*ProjectionExec); ok && proj.calculateNoDelay { + } else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay { // Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example: // the Projection has two expressions and two columns in the schema, but we should // not return the result of the two expressions. @@ -269,7 +268,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }, nil } -func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execReentrantWrapper) (sqlexec.RecordSet, error) { +func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -277,7 +276,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. - switch e.Executor.(type) { + switch e.(type) { case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec: snapshotTS := sctx.GetSessionVars().SnapshotTS if snapshotTS != 0 { @@ -485,41 +484,3 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco return false, nil } } - -const ( - reentrantWrapperNotStart = iota - reentrantWrapperStartFail - reentrantWrapperRunning - reentrantWrapperClosed -) - -// execReentrantWrapper used to watch the context specified in Open and -// stop the wrapped executor when this context is cancelled. -type execReentrantWrapper struct { - Executor - // 0, 1, 2, 3 represent not started, failed in start, running, closed - status int32 -} - -func wrapReentrant(exec Executor) *execReentrantWrapper { - return &execReentrantWrapper{exec, reentrantWrapperNotStart} -} - -func (ecw *execReentrantWrapper) Open(ctx context.Context) error { - if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperNotStart, reentrantWrapperRunning) { - return nil - } - - if err := ecw.Executor.Open(ctx); err != nil { - atomic.StoreInt32(&ecw.status, reentrantWrapperStartFail) - return err - } - return nil -} - -func (ecw *execReentrantWrapper) Close() error { - if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperRunning, reentrantWrapperClosed) { - return nil - } - return ecw.Executor.Close() -} diff --git a/server/conn.go b/server/conn.go index dba6ef82a0ffd..4c808058f82fb 100644 --- a/server/conn.go +++ b/server/conn.go @@ -71,8 +71,8 @@ import ( const ( connStatusDispatching int32 = iota connStatusReading - connStatusShutdown // Closed by server. - connStatusWaitShutdown // Notified by server to close. + connStatusShutdown // Closed by server. + connStatusWaitShutdown // Notified by server to close. ) // newClientConn creates a *clientConn object. @@ -112,6 +112,7 @@ type clientConn struct { mu struct { sync.RWMutex cancelFunc context.CancelFunc + resultSets []ResultSet } } @@ -1047,6 +1048,9 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc() return errors.Trace(err) } + cc.mu.Lock() + cc.mu.resultSets = rs + cc.mu.Unlock() if rs != nil { if len(rs) == 1 { err = cc.writeResultset(ctx, rs[0], false, 0, 0) diff --git a/server/server.go b/server/server.go index f76b7d8504ff2..5108369e47645 100644 --- a/server/server.go +++ b/server/server.go @@ -519,6 +519,12 @@ func killConn(conn *clientConn, query bool) { } conn.mu.RLock() + for _, resultSet := range conn.mu.resultSets { + // resultSet.Close() is reentrant so it's safe to kill a same connID concurrently or multiple times + if err := resultSet.Close(); err != nil { + logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err)) + } + } cancelFunc := conn.mu.cancelFunc conn.mu.RUnlock() if cancelFunc != nil { From b4c0bfe36e4c208483d177fe57980f4a3d248a4d Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 27 Mar 2019 19:55:21 +0800 Subject: [PATCH 12/15] fmt comments --- server/conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/conn.go b/server/conn.go index 4c808058f82fb..90497674e2f38 100644 --- a/server/conn.go +++ b/server/conn.go @@ -71,8 +71,8 @@ import ( const ( connStatusDispatching int32 = iota connStatusReading - connStatusShutdown // Closed by server. - connStatusWaitShutdown // Notified by server to close. + connStatusShutdown // Closed by server. + connStatusWaitShutdown // Notified by server to close. ) // newClientConn creates a *clientConn object. From e02a2c1be4a950b006dae695961f28219a415111 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 27 Mar 2019 20:12:37 +0800 Subject: [PATCH 13/15] update comment --- server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 5108369e47645..aedfd748fcebf 100644 --- a/server/server.go +++ b/server/server.go @@ -520,7 +520,7 @@ func killConn(conn *clientConn, query bool) { conn.mu.RLock() for _, resultSet := range conn.mu.resultSets { - // resultSet.Close() is reentrant so it's safe to kill a same connID concurrently or multiple times + // resultSet.Close() is reentrant so it's safe to kill a same connID multiple times if err := resultSet.Close(); err != nil { logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err)) } From 620ab5bc2524f213b4d3b5055d33e2b6936d8be0 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Fri, 29 Mar 2019 14:58:00 +0800 Subject: [PATCH 14/15] address comments --- server/server.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/server/server.go b/server/server.go index aedfd748fcebf..2ab8800c744ee 100644 --- a/server/server.go +++ b/server/server.go @@ -508,16 +508,15 @@ func (s *Server) Kill(connectionID uint64, query bool) { return } - killConn(conn, query) -} - -func killConn(conn *clientConn, query bool) { if !query { // Mark the client connection status as WaitShutdown, when the goroutine detect // this, it will end the dispatch loop and exit. atomic.StoreInt32(&conn.status, connStatusWaitShutdown) } + killConn(conn) +} +func killConn(conn *clientConn) { conn.mu.RLock() for _, resultSet := range conn.mu.resultSets { // resultSet.Close() is reentrant so it's safe to kill a same connID multiple times @@ -541,12 +540,7 @@ func (s *Server) KillAllConnections() { for _, conn := range s.clients { atomic.StoreInt32(&conn.status, connStatusShutdown) terror.Log(errors.Trace(conn.closeWithoutLock())) - conn.mu.RLock() - cancelFunc := conn.mu.cancelFunc - conn.mu.RUnlock() - if cancelFunc != nil { - cancelFunc() - } + killConn(conn) } } From d51bbc3980235e0c5a3de8fc2e74bf30e776a3b7 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Fri, 29 Mar 2019 15:30:32 +0800 Subject: [PATCH 15/15] address comments --- server/conn.go | 6 ++++++ server/driver_tidb.go | 6 +++--- server/server.go | 7 ++++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/server/conn.go b/server/conn.go index 90497674e2f38..f2fa4264eddc8 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1050,6 +1050,12 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { } cc.mu.Lock() cc.mu.resultSets = rs + status := atomic.LoadInt32(&cc.status) + if status == connStatusShutdown || status == connStatusWaitShutdown { + cc.mu.Unlock() + killConn(cc) + return errors.New("killed by another connection") + } cc.mu.Unlock() if rs != nil { if len(rs) == 1 { diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 46e9f37f4a5e8..315387a8a1ee3 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -17,6 +17,7 @@ import ( "context" "crypto/tls" "fmt" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -354,7 +355,7 @@ type tidbResultSet struct { recordSet sqlexec.RecordSet columns []*ColumnInfo rows []chunk.Row - closed bool + closed int32 } func (trs *tidbResultSet) NewRecordBatch() *chunk.RecordBatch { @@ -377,10 +378,9 @@ func (trs *tidbResultSet) GetFetchedRows() []chunk.Row { } func (trs *tidbResultSet) Close() error { - if trs.closed { + if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) { return nil } - trs.closed = true return trs.recordSet.Close() } diff --git a/server/server.go b/server/server.go index 2ab8800c744ee..1881ab7cf2bb6 100644 --- a/server/server.go +++ b/server/server.go @@ -518,14 +518,15 @@ func (s *Server) Kill(connectionID uint64, query bool) { func killConn(conn *clientConn) { conn.mu.RLock() - for _, resultSet := range conn.mu.resultSets { + resultSets := conn.mu.resultSets + cancelFunc := conn.mu.cancelFunc + conn.mu.RUnlock() + for _, resultSet := range resultSets { // resultSet.Close() is reentrant so it's safe to kill a same connID multiple times if err := resultSet.Close(); err != nil { logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err)) } } - cancelFunc := conn.mu.cancelFunc - conn.mu.RUnlock() if cancelFunc != nil { cancelFunc() }