Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: TiFlash supports stale read #40048

Merged
merged 13 commits into from
Dec 27, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comment
Signed-off-by: hehechen <[email protected]>
hehechen committed Dec 26, 2022
commit 705a542f4ee137ecf106b1f8290a3c8aeaa76ca0
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
@@ -38,11 +38,11 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, rootDestinationTask *kv.MPPTask) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, rootDestinationTask)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID)
if resp == nil {
return nil, errors.New("client returns nil response")
}
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
@@ -1407,6 +1407,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
// Clean the MPP query info
sessVars.StmtCtx.MPPQueryInfo.QueryID.Store(0)
sessVars.StmtCtx.MPPQueryInfo.QueryTS.Store(0)
sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPTaskID.Store(0)

if sessVars.StmtCtx.ReadFromTableCache {
metrics.ReadFromTableCacheCounter.Inc()
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
@@ -132,8 +132,8 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *Te
isStaleness: staleread.IsStmtStaleness(ctx),
txnScope: txnManager.GetTxnScope(),
readReplicaScope: txnManager.GetReadReplicaScope(),
queryTS: uint64(time.Now().UnixNano()),
localQueryID: plannercore.AllocMPPQueryID(),
queryTS: getMPPQueryTS(ctx),
localQueryID: getMPPQueryID(ctx),
}
}

17 changes: 15 additions & 2 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
@@ -16,10 +16,12 @@ package executor

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
@@ -38,6 +40,18 @@ func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader
return ok
}

func getMPPQueryID(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryID.CompareAndSwap(0, plannercore.AllocMPPQueryID())
return mppQueryInfo.QueryID.Load()
}

func getMPPQueryTS(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryTS.CompareAndSwap(0, uint64(time.Now().UnixNano()))
return mppQueryInfo.QueryTS.Load()
}

// MPPGather dispatch MPP tasks and read data from root tasks.
type MPPGather struct {
// following fields are construct needed
@@ -119,7 +133,6 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
if err != nil {
return errors.Trace(err)
}
rootDestinationTask := frags[0].ExchangeSender.TargetTasks[0]
for _, frag := range frags {
err = e.appendMPPDispatchReq(frag)
if err != nil {
@@ -131,7 +144,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, rootDestinationTask)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, kv.MPPQueryID{QueryTs: e.queryTS, LocalQueryID: e.localQueryID, ServerID: domain.GetDomain(e.ctx).ServerID()})
if err != nil {
return errors.Trace(err)
}
4 changes: 3 additions & 1 deletion executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/testkit"
@@ -267,7 +268,8 @@ func TestMppExecution(t *testing.T) {
tk.MustExec("begin")
tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4"))

taskID := plannercore.AllocMPPTaskID(tk.Session())
require.Equal(t, int64(1), taskID)
tk.MustExec("commit")

failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`)
8 changes: 7 additions & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,12 @@ type MPPTaskMeta interface {
GetAddress() string
}

type MPPQueryID struct {
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64
}

// MPPTask means the minimum execution unit of a mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
@@ -93,7 +99,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, rootDestinationTask *MPPTask) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
18 changes: 9 additions & 9 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
@@ -103,16 +103,16 @@ func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, queryTs uint64
return g.generateMPPTasks(sender)
}

var mppTaskID int64 = 1

// AllocMPPTaskID allocates task id for mpp tasks.
func AllocMPPTaskID() int64 {
return atomic.AddInt64(&mppTaskID, 1)
func AllocMPPTaskID(ctx sessionctx.Context) int64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
return mppQueryInfo.AllocatedMPPTaskID.Add(1)
}

// AllocMPPQueryID allocates local query id for mpp queries, just reuse mppTaskID.
var mppQueryID uint64 = 1

// AllocMPPQueryID allocates local query id for mpp queries.
func AllocMPPQueryID() uint64 {
return uint64(atomic.AddInt64(&mppTaskID, 1))
return atomic.AddUint64(&mppQueryID, 1)
}

func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) {
@@ -154,7 +154,7 @@ func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask)
if !ok {
mppTask := &kv.MPPTask{
Meta: &mppAddr{addr: addr},
ID: AllocMPPTaskID(),
ID: AllocMPPTaskID(e.ctx),
QueryTs: e.queryTS,
LocalQueryID: e.localQueryID,
ServerID: domain.GetDomain(e.ctx).ServerID(),
@@ -410,7 +410,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
tasks := make([]*kv.MPPTask, 0, len(metas))
for _, meta := range metas {
task := &kv.MPPTask{Meta: meta,
ID: AllocMPPTaskID(),
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
QueryTs: e.queryTS,
LocalQueryID: e.localQueryID,
7 changes: 7 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
@@ -374,6 +374,13 @@ type StatementContext struct {
HasFKCascades bool
}

// MPPQueryInfo stores some id and timestamp of current MPP query statement.
MPPQueryInfo struct {
QueryID atomic2.Uint64
QueryTS atomic2.Uint64
AllocatedMPPTaskID atomic2.Int64
}

// TableStats stores the visited runtime table stats by table id during query
TableStats map[int64]interface{}
// useChunkAlloc indicates whether statement use chunk alloc
7 changes: 4 additions & 3 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
@@ -129,9 +129,10 @@ func TestSession(t *testing.T) {
}

func TestAllocMPPID(t *testing.T) {
require.Equal(t, int64(2), plannercore.AllocMPPTaskID())
require.Equal(t, int64(3), plannercore.AllocMPPTaskID())
require.Equal(t, int64(4), plannercore.AllocMPPTaskID())
ctx := mock.NewContext()
require.Equal(t, int64(1), plannercore.AllocMPPTaskID(ctx))
require.Equal(t, int64(2), plannercore.AllocMPPTaskID(ctx))
require.Equal(t, int64(3), plannercore.AllocMPPTaskID(ctx))
}

func TestSlowLogFormat(t *testing.T) {
16 changes: 8 additions & 8 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
@@ -143,8 +143,8 @@ type mppIterator struct {
tasks []*kv.MPPDispatchRequest
finishCh chan struct{}

startTs uint64
rootDestinationTask *kv.MPPTask
startTs uint64
mppQueryID kv.MPPQueryID

respChan chan *mppResponse

@@ -336,7 +336,7 @@ func (m *mppIterator) cancelMppTasks() {
m.mu.Lock()
defer m.mu.Unlock()
killReq := &mpp.CancelTaskRequest{
Meta: &mpp.TaskMeta{StartTs: m.startTs, QueryTs: m.rootDestinationTask.QueryTs, LocalQueryId: m.rootDestinationTask.LocalQueryID, ServerId: m.rootDestinationTask.ServerID},
Meta: &mpp.TaskMeta{StartTs: m.startTs, QueryTs: m.mppQueryID.QueryTs, LocalQueryId: m.mppQueryID.LocalQueryID, ServerId: m.mppQueryID.ServerID},
}

disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
@@ -377,9 +377,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
SenderMeta: taskMeta,
ReceiverMeta: &mpp.TaskMeta{
StartTs: req.StartTs,
QueryTs: m.rootDestinationTask.QueryTs,
LocalQueryId: m.rootDestinationTask.LocalQueryID,
ServerId: m.rootDestinationTask.ServerID,
QueryTs: m.mppQueryID.QueryTs,
LocalQueryId: m.mppQueryID.LocalQueryID,
ServerId: m.mppQueryID.ServerID,
TaskId: -1,
},
}
@@ -533,7 +533,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
}

// DispatchMPPTasks dispatches all the mpp task and waits for the responses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool, startTs uint64, rootDestinationTask *kv.MPPTask) kv.Response {
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID kv.MPPQueryID) kv.Response {
vars := variables.(*tikv.Variables)
ctxChild, cancelFunc := context.WithCancel(ctx)
iter := &mppIterator{
@@ -544,7 +544,7 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{},
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse),
startTs: startTs,
rootDestinationTask: rootDestinationTask,
mppQueryID: mppQueryID,
vars: vars,
needTriggerFallback: needTriggerFallback,
enableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),