Skip to content

Commit

Permalink
Merge branch 'master' into uncacheable
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 authored Jan 17, 2023
2 parents 79fae17 + 4620df6 commit 8961afd
Show file tree
Hide file tree
Showing 20 changed files with 378 additions and 23 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
version = "v2.0.5-0.20230110071533-f313ddf58d73",
sum = "h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=",
version = "v2.0.5-0.20230112062023-fe5b35c5f5dc",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
21 changes: 20 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,9 @@ type LimitExec struct {

// columnIdxsUsedByChild keep column indexes of child executor used for inline projection
columnIdxsUsedByChild []int

// Log the close time when opentracing is enabled.
span opentracing.Span
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1470,13 +1473,29 @@ func (e *LimitExec) Open(ctx context.Context) error {
e.childResult = tryNewCacheChunk(e.children[0])
e.cursor = 0
e.meetFirstBatch = e.begin == 0
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
e.span = span
}
return nil
}

// Close implements the Executor Close interface.
func (e *LimitExec) Close() error {
start := time.Now()

e.childResult = nil
return e.baseExecutor.Close()
err := e.baseExecutor.Close()

elapsed := time.Since(start)
if elapsed > time.Millisecond {
logutil.BgLogger().Info("limit executor close takes a long time",
zap.Duration("elapsed", elapsed))
if e.span != nil {
span1 := e.span.Tracer().StartSpan("limitExec.Close", opentracing.ChildOf(e.span.Context()), opentracing.StartTime(start))
defer span1.Finish()
}
}
return err
}

func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
Expand Down
6 changes: 4 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5556,6 +5556,8 @@ func TestAdmin(t *testing.T) {
}))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("insert admin_test (c1) values (1),(2),(NULL)")
Expand Down Expand Up @@ -5680,7 +5682,7 @@ func TestAdmin(t *testing.T) {
// check that the result set has no duplication
defer wg.Done()
for i := 0; i < 10; i++ {
result := tk.MustQuery(`admin show ddl job queries 20`)
result := tk2.MustQuery(`admin show ddl job queries 20`)
rows := result.Rows()
rowIDs := make(map[string]struct{})
for _, row := range rows {
Expand Down Expand Up @@ -5711,7 +5713,7 @@ func TestAdmin(t *testing.T) {
// check that the result set has no duplication
defer wg2.Done()
for i := 0; i < 10; i++ {
result := tk.MustQuery(`admin show ddl job queries limit 3 offset 2`)
result := tk2.MustQuery(`admin show ddl job queries limit 3 offset 2`)
rows := result.Rows()
rowIDs := make(map[string]struct{})
for _, row := range rows {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI=
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
3 changes: 3 additions & 0 deletions planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ type planDigester struct {

// NormalizeFlatPlan normalizes a FlatPhysicalPlan and generates plan digest.
func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parser.Digest) {
if flat == nil {
return "", parser.NewDigest(nil)
}
selectPlan, selectPlanOffset := flat.Main.GetSelectPlan()
if len(selectPlan) == 0 || !selectPlan[0].IsPhysicalPlan {
return "", parser.NewDigest(nil)
Expand Down
15 changes: 13 additions & 2 deletions resourcemanager/pooltask/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "pooltask",
srcs = ["task.go"],
srcs = [
"task.go",
"task_manager.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)

go_test(
name = "pooltask_test",
srcs = ["task_test.go"],
embed = [":pooltask"],
deps = ["@com_github_stretchr_testify//require"],
)
41 changes: 39 additions & 2 deletions resourcemanager/pooltask/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pooltask

import (
"sync"
"sync/atomic"
)

// Context is a interface that can be used to create a context.
Expand All @@ -31,18 +32,41 @@ func (NilContext) GetContext() any {
return nil
}

// TaskBox is a box which contains all info about pooltask.
const (
// PendingTask is a task waiting to start.
PendingTask int32 = iota
// RunningTask is a task running.
RunningTask
// StopTask is a stop task.
StopTask
)

// TaskBox is a box which contains all info about pool task.
type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct {
constArgs C
contextFunc TF
wg *sync.WaitGroup
task chan Task[T]
resultCh chan U
taskID uint64
status atomic.Int32 // task manager is able to make this task stop, wait or running
}

// GetStatus is to get the status of task.
func (t *TaskBox[T, U, C, CT, TF]) GetStatus() int32 {
return t.status.Load()
}

// SetStatus is to set the status of task.
func (t *TaskBox[T, U, C, CT, TF]) SetStatus(s int32) {
t.status.Store(s)
}

// NewTaskBox is to create a task box for pool.
func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] {
// We still need to do some work after a TaskBox finishes.
// So we need to add 1 to waitgroup. After we finish the work, we need to call TaskBox.Finish()
wg.Add(1)
return TaskBox[T, U, C, CT, TF]{
constArgs: constArgs,
contextFunc: contextFunc,
Expand All @@ -54,7 +78,7 @@ func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contex
}

// TaskID is to get the task id.
func (t TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
func (t *TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

Expand Down Expand Up @@ -83,6 +107,11 @@ func (t *TaskBox[T, U, C, CT, TF]) Done() {
t.wg.Done()
}

// Finish is to set the TaskBox finish status.
func (t *TaskBox[T, U, C, CT, TF]) Finish() {
t.wg.Done()
}

// Clone is to copy the box
func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID)
Expand All @@ -92,6 +121,8 @@ func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
// GPool is a goroutine pool.
type GPool[T any, U any, C any, CT any, TF Context[CT]] interface {
Tune(size int)
DeleteTask(id uint64)
StopTask(id uint64)
}

// TaskController is a controller that can control or watch the pool.
Expand Down Expand Up @@ -119,6 +150,12 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() {
<-t.close
t.wg.Wait()
close(t.resultCh)
t.pool.DeleteTask(t.taskID)
}

// Stop is to send stop command to the task. But you still need to wait the task to stop.
func (t *TaskController[T, U, C, CT, TF]) Stop() {
t.pool.StopTask(t.TaskID())
}

// TaskID is to get the task id.
Expand Down
146 changes: 146 additions & 0 deletions resourcemanager/pooltask/task_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

import (
"container/list"
"sync"
"time"

"go.uber.org/atomic"
)

const shard int = 8

func getShardID(id uint64) uint64 {
return id % uint64(shard)
}

type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
task *TaskBox[T, U, C, CT, TF]
}

type meta struct {
stats *list.List
createTS time.Time
origin int32
running int32
}

func newStats(concurrency int32) *meta {
s := &meta{
createTS: time.Now(),
stats: list.New(),
origin: concurrency,
}
return s
}

func (m *meta) getOriginConcurrency() int32 {
return m.origin
}

// TaskStatusContainer is a container that can control or watch the pool.
type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
stats map[uint64]*meta
rw sync.RWMutex
}

// TaskManager is a manager that can control or watch the pool.
type TaskManager[T any, U any, C any, CT any, TF Context[CT]] struct {
task []TaskStatusContainer[T, U, C, CT, TF]
running atomic.Int32
concurrency int32
}

// NewTaskManager create a new pooltask manager.
func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskManager[T, U, C, CT, TF] {
task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
for i := 0; i < shard; i++ {
task[i] = TaskStatusContainer[T, U, C, CT, TF]{
stats: make(map[uint64]*meta),
}
}
return TaskManager[T, U, C, CT, TF]{
task: task,
concurrency: c,
}
}

// RegisterTask register a task to the manager.
func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) {
id := getShardID(taskID)
t.task[id].rw.Lock()
t.task[id].stats[taskID] = newStats(concurrency)
t.task[id].rw.Unlock()
}

// DeleteTask delete a task from the manager.
func (t *TaskManager[T, U, C, CT, TF]) DeleteTask(taskID uint64) {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
delete(t.task[shardID].stats, taskID)
t.task[shardID].rw.Unlock()
}

// hasTask check if the task is in the manager.
func (t *TaskManager[T, U, C, CT, TF]) hasTask(taskID uint64) bool {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
_, ok := t.task[shardID].stats[taskID]
return ok
}

// AddSubTask AddTask add a task to the manager.
func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T, U, C, CT, TF]) {
shardID := getShardID(taskID)
tc := tContainer[T, U, C, CT, TF]{
task: task,
}
t.running.Inc()
t.task[shardID].rw.Lock()
t.task[shardID].stats[taskID].stats.PushBack(tc)
t.task[shardID].stats[taskID].running++ // running job in this task
t.task[shardID].rw.Unlock()
}

// ExitSubTask is to exit a task, and it will decrease the count of running pooltask.
func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
shardID := getShardID(taskID)
t.running.Dec() // total running tasks
t.task[shardID].rw.Lock()
t.task[shardID].stats[taskID].running-- // running job in this task
t.task[shardID].rw.Unlock()
}

// Running return the count of running job in this task.
func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
return t.task[shardID].stats[taskID].running
}

// StopTask is to stop a task by TaskID.
func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
l := t.task[shardID].stats[taskID].stats
for e := l.Front(); e != nil; e = e.Next() {
e.Value.(tContainer[T, U, C, CT, TF]).task.SetStatus(StopTask)
}
}
Loading

0 comments on commit 8961afd

Please sign in to comment.