Skip to content

Commit

Permalink
infoschema, http: add two HTTP API for keyvis (pingcap#54608)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jul 16, 2024
1 parent 7a09434 commit 9f1d9e6
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 30 deletions.
22 changes: 22 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,34 @@
curl http://{TiDBIP}:10080/schema/{db}
```

```shell
curl http://{TiDBIP}:10080/schema/{db}?id_name_only=true
[
{
"id": 119,
"name": {
"O": "t1",
"L": "t1"
}
},
{
"id": 125,
"name": {
"O": "t2",
"L": "t2"
}
}
]
```

9. Get schema Information about db.table, and you can get schema info by tableID (tableID is the **unique** identifier of table in TiDB)

```shell
curl http://{TiDBIP}:10080/schema/{db}/{table}
curl http://{TiDBIP}:10080/schema?table_id={tableID}
curl http://{TiDBIP}:10080/schema?table_ids={tableID,...}
```

10. Get database information, table information and tidb info schema version by tableID.
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/context/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type MetaOnlyInfoSchema interface {
AllSchemas() []*model.DBInfo
AllSchemaNames() []model.CIStr
SchemaTableInfos(schema model.CIStr) []*model.TableInfo
SchemaSimpleTableInfos(schema model.CIStr) []*model.TableNameInfo
Misc
}

Expand Down
18 changes: 17 additions & 1 deletion pkg/infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,27 @@ func (is *infoSchema) FindTableInfoByPartitionID(
return getTableInfo(tbl), db, partDef
}

// SchemaTableInfos implements InfoSchema.FindTableInfoByPartitionID
// SchemaTableInfos implements MetaOnlyInfoSchema.
func (is *infoSchema) SchemaTableInfos(schema model.CIStr) []*model.TableInfo {
return getTableInfoList(is.SchemaTables(schema))
}

// SchemaSimpleTableInfos implements MetaOnlyInfoSchema.
func (is *infoSchema) SchemaSimpleTableInfos(schema model.CIStr) []*model.TableNameInfo {
schemaTables, ok := is.schemaMap[schema.L]
if !ok {
return nil
}
ret := make([]*model.TableNameInfo, 0, len(schemaTables.tables))
for _, t := range schemaTables.tables {
ret = append(ret, &model.TableNameInfo{
ID: t.Meta().ID,
Name: t.Meta().Name,
})
}
return ret
}

type tableInfoResult struct {
DBName string
TableInfos []*model.TableInfo
Expand Down
48 changes: 47 additions & 1 deletion pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (is *infoschemaV2) TableInfoByID(id int64) (*model.TableInfo, bool) {
return getTableInfo(tbl), ok
}

// SchemaTableInfos implements InfoSchema.FindTableInfoByPartitionID
// SchemaTableInfos implements MetaOnlyInfoSchema.
func (is *infoschemaV2) SchemaTableInfos(schema model.CIStr) []*model.TableInfo {
if IsSpecialDB(schema.L) {
raw, ok := is.Data.specials.Load(schema.L)
Expand Down Expand Up @@ -546,6 +546,52 @@ retry:
return tblInfos
}

// SchemaSimpleTableInfos implements MetaOnlyInfoSchema.
func (is *infoschemaV2) SchemaSimpleTableInfos(schema model.CIStr) []*model.TableNameInfo {
if IsSpecialDB(schema.L) {
raw, ok := is.Data.specials.Load(schema.L)
if ok {
schTbls := raw.(*schemaTables)
ret := make([]*model.TableNameInfo, 0, len(schTbls.tables))
for _, tbl := range schTbls.tables {
ret = append(ret, &model.TableNameInfo{
ID: tbl.Meta().ID,
Name: tbl.Meta().Name,
})
}
return ret
}
return nil // something wrong?
}

retry:
dbInfo, ok := is.SchemaByName(schema)
if !ok {
return nil
}
snapshot := is.r.Store().GetSnapshot(kv.NewVersion(is.ts))
// Using the KV timeout read feature to address the issue of potential DDL lease expiration when
// the meta region leader is slow.
snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms.
m := meta.NewSnapshotMeta(snapshot)
tblInfos, err := m.ListSimpleTables(dbInfo.ID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return nil
}
// Flashback statement could cause such kind of error.
// In theory that error should be handled in the lower layer, like client-go.
// But it's not done, so we retry here.
if strings.Contains(err.Error(), "in flashback progress") {
time.Sleep(200 * time.Millisecond)
goto retry
}
// TODO: error could happen, so do not panic!
panic(err)
}
return tblInfos
}

// FindTableInfoByPartitionID implements InfoSchema.FindTableInfoByPartitionID
func (is *infoschemaV2) FindTableInfoByPartitionID(
partitionID int64,
Expand Down
61 changes: 61 additions & 0 deletions pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"slices"
"sort"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -939,6 +942,64 @@ func TestGetSchema(t *testing.T) {
PARTITION p1 VALUES LESS THAN (5),
PARTITION p2 VALUES LESS THAN (7),
PARTITION p3 VALUES LESS THAN (9))`)
dbt.MustExec(`CREATE TABLE t2 (c INT)`)

var simpleTableInfos []*model.TableNameInfo
resp, err = ts.FetchStatus("/schema/test?id_name_only=true")
require.NoError(t, err)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&simpleTableInfos)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
slices.SortFunc(simpleTableInfos, func(i, j *model.TableNameInfo) int {
return strings.Compare(i.Name.L, j.Name.L)
})
require.Len(t, simpleTableInfos, 2)
require.Equal(t, "t1", simpleTableInfos[0].Name.L)
require.Equal(t, "t2", simpleTableInfos[1].Name.L)
id1 := simpleTableInfos[0].ID
id2 := simpleTableInfos[1].ID
require.NotZero(t, id1)
require.NotZero(t, id2)

// check table_ids=... happy path
ids := strings.Join([]string{strconv.FormatInt(id1, 10), strconv.FormatInt(id2, 10)}, ",")
resp, err = ts.FetchStatus(fmt.Sprintf("/schema?table_ids=%s", ids))
require.NoError(t, err)
var tis map[int]*model.TableInfo
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&tis)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, 2, len(tis))
require.Equal(t, "t1", tis[int(id1)].Name.L)
require.Equal(t, "t2", tis[int(id2)].Name.L)

// check table_ids=... partial missing
ids = ids + ",99999"
resp, err = ts.FetchStatus(fmt.Sprintf("/schema?table_ids=%s", ids))
require.NoError(t, err)
clear(tis)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&tis)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, 2, len(tis))
require.Equal(t, "t1", tis[int(id1)].Name.L)
require.Equal(t, "t2", tis[int(id2)].Name.L)

// check wrong format in table_ids
ids = ids + ",abc"
resp, err = ts.FetchStatus(fmt.Sprintf("/schema?table_ids=%s", ids))
require.NoError(t, err)
clear(tis)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&tis)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, 2, len(tis))
require.Equal(t, "t1", tis[int(id1)].Name.L)
require.Equal(t, "t2", tis[int(id2)].Name.L)

resp, err = ts.FetchStatus("/schema/test/t1")
require.NoError(t, err)
Expand Down
93 changes: 70 additions & 23 deletions pkg/server/handler/tikvhandler/tikv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,14 +916,26 @@ func (h SchemaStorageHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
}
}

// WriteDBTablesData writes all the table data in a database. The format is the marshal result of []*model.TableInfo, you can
// unmarshal it to []*model.TableInfo. In this function, we manually construct the marshal result so that the memory
// can be deallocated quickly.
// For every table in the input, we marshal them. The result such as {tb1} {tb2} {tb3}.
// Then we add some bytes to make it become [{tb1}, {tb2}, {tb3}], so we can unmarshal it to []*model.TableInfo.
// Note: It would return StatusOK even if errors occur. But if errors occur, there must be some bugs.
// WriteDBTablesData writes all the table data in a database. The format is the
// marshal result of []*model.TableInfo, you can unmarshal it to
// []*model.TableInfo.
//
// Note: It would return StatusOK even if errors occur. But if errors occur,
// there must be some bugs.
func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
if len(tbs) == 0 {
a := make([]any, 0, len(tbs))
for _, tb := range tbs {
a = append(a, tb)
}
manualWriteJSONArray(w, a)
}

// manualWriteJSONArray manually construct the marshal result so that the memory
// can be deallocated quickly. For every item in the input, we marshal them. The
// result such as {tb1} {tb2} {tb3}. Then we add some bytes to make it become
// [{tb1}, {tb2}, {tb3}] to build a valid JSON array.
func manualWriteJSONArray(w http.ResponseWriter, array []any) {
if len(array) == 0 {
handler.WriteData(w, []*model.TableInfo{})
return
}
Expand All @@ -936,7 +948,7 @@ func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
return
}
init := false
for _, tb := range tbs {
for _, item := range array {
if init {
_, err = w.Write(hack.Slice(",\n"))
if err != nil {
Expand All @@ -946,7 +958,7 @@ func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
} else {
init = true
}
js, err := json.MarshalIndent(tb, "", " ")
js, err := json.MarshalIndent(item, "", " ")
if err != nil {
terror.Log(errors.Trace(err))
return
Expand All @@ -961,6 +973,14 @@ func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
terror.Log(errors.Trace(err))
}

func writeDBSimpleTablesData(w http.ResponseWriter, tbs []*model.TableNameInfo) {
a := make([]any, 0, len(tbs))
for _, tb := range tbs {
a = append(a, tb)
}
manualWriteJSONArray(w, a)
}

// ServeHTTP handles request of list a database or table's schemas.
func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
schema, err := h.Schema()
Expand All @@ -987,6 +1007,11 @@ func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// all table schemas in a specified database
if schema.SchemaExists(cDBName) {
if a := req.FormValue(handler.IDNameOnly); a == "true" {
tbs := schema.SchemaSimpleTableInfos(cDBName)
writeDBSimpleTablesData(w, tbs)
return
}
tbs := schema.SchemaTableInfos(cDBName)
WriteDBTablesData(w, tbs)
return
Expand All @@ -997,33 +1022,55 @@ func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

if tableID := req.FormValue(handler.TableIDQuery); len(tableID) > 0 {
// table schema of a specified tableID
tid, err := strconv.Atoi(tableID)
data, err := getTableByIDStr(schema, tableID)
if err != nil {
handler.WriteError(w, err)
return
}
if tid < 0 {
handler.WriteError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
return
}
if data, ok := schema.TableByID(int64(tid)); ok {
handler.WriteData(w, data.Meta())
return
handler.WriteData(w, data)
return
}

if tableIDsStr := req.FormValue(handler.TableIDsQuery); len(tableIDsStr) > 0 {
tableIDs := strings.Split(tableIDsStr, ",")
data := make(map[int64]*model.TableInfo, len(tableIDs))
for _, tableID := range tableIDs {
tbl, err := getTableByIDStr(schema, tableID)
if err == nil {
data[tbl.ID] = tbl
}
}
// The tid maybe a partition ID of the partition-table.
tbl, _, _ := schema.FindTableByPartitionID(int64(tid))
if tbl == nil {
handler.WriteError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
return
if len(data) > 0 {
handler.WriteData(w, data)
} else {
handler.WriteError(w, errors.New("All tables are not found"))
}
handler.WriteData(w, tbl)
return
}

// all databases' schemas
handler.WriteData(w, schema.AllSchemas())
}

func getTableByIDStr(schema infoschema.InfoSchema, tableID string) (*model.TableInfo, error) {
tid, err := strconv.Atoi(tableID)
if err != nil {
return nil, err
}
if tid < 0 {
return nil, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID)
}
if data, ok := schema.TableByID(int64(tid)); ok {
return data.Meta(), nil
}
// The tid maybe a partition ID of the partition-table.
tbl, _, _ := schema.FindTableByPartitionID(int64(tid))
if tbl == nil {
return nil, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID)
}
return tbl.Meta(), nil
}

// ServeHTTP handles table related requests, such as table's region information, disk usage.
func (h *TableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// parse params
Expand Down
12 changes: 7 additions & 5 deletions pkg/server/handler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ const (

// For query string
const (
TableIDQuery = "table_id"
Limit = "limit"
JobID = "start_job_id"
Operation = "op"
Seconds = "seconds"
TableIDQuery = "table_id"
TableIDsQuery = "table_ids"
IDNameOnly = "id_name_only"
Limit = "limit"
JobID = "start_job_id"
Operation = "op"
Seconds = "seconds"
)

const (
Expand Down

0 comments on commit 9f1d9e6

Please sign in to comment.