Skip to content

Commit

Permalink
perf: skip index creation for replicated table (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Dec 9, 2024
1 parent d8a990d commit 2950610
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 40 deletions.
49 changes: 28 additions & 21 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/configuration"
"github.com/apecloud/myduckserver/mycontext"
"github.com/dolthub/go-mysql-server/sql"
)

Expand Down Expand Up @@ -72,7 +74,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
return nil, err
}
for _, t := range tables {
t.WithSchema(ctx)
t.withSchema(ctx)
}
return tables, nil
}
Expand All @@ -91,7 +93,7 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
if err := rows.Scan(&tblName, &comment); err != nil {
return nil, ErrDuckDB.New(err)
}
t := NewTable(tblName, d).WithComment(DecodeComment[any](comment.String))
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
tbls = append(tbls, t)
}
if err := rows.Err(); err != nil {
Expand All @@ -106,13 +108,13 @@ func (d *Database) Name() string {
return d.name
}

func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, is_temp bool) error {
func (d *Database) createAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, temporary bool) error {

var columns []string
var columnCommentSQLs []string
var fullTableName string

if is_temp {
if temporary {
fullTableName = FullTableName("temp", "main", name)
} else {
fullTableName = FullTableName(d.catalog, d.name, name)
Expand Down Expand Up @@ -142,7 +144,7 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim

var fullColumnName string

if is_temp {
if temporary {
fullColumnName = FullColumnName("temp", "main", name, col.Name)
} else {
fullColumnName = FullColumnName(d.catalog, d.name, name, col.Name)
Expand All @@ -151,41 +153,46 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim
if col.Comment != "" || typ.mysql.Name != "" || col.Default != nil {
columnCommentSQLs = append(columnCommentSQLs,
fmt.Sprintf(`COMMENT ON COLUMN %s IS '%s'`, fullColumnName,
NewCommentWithMeta[MySQLType](col.Comment, typ.mysql).Encode()))
NewCommentWithMeta(col.Comment, typ.mysql).Encode()))
}
}

var sqlsBuild strings.Builder
var b strings.Builder

if is_temp {
sqlsBuild.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", ")))
if temporary {
b.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", ")))
} else {
sqlsBuild.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", ")))
b.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", ")))
}

var primaryKeys []string
for _, pkord := range schema.PkOrdinals {
primaryKeys = append(primaryKeys, schema.Schema[pkord].Name)
}

if len(primaryKeys) > 0 {
sqlsBuild.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", ")))
// https://github.com/apecloud/myduckserver/issues/272
if !(mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex()) {
if len(primaryKeys) > 0 {
b.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", ")))
}
}

sqlsBuild.WriteString(")")
b.WriteString(")")

// Add comment to the table
if comment != "" {
sqlsBuild.WriteString(fmt.Sprintf("; COMMENT ON TABLE %s IS '%s'", fullTableName, NewComment[any](comment).Encode()))
}
b.WriteString(fmt.Sprintf(
"; COMMENT ON TABLE %s IS '%s'",
fullTableName,
NewCommentWithMeta(comment, ExtraTableInfo{schema.PkOrdinals}).Encode(),
))

// Add column comments
for _, s := range columnCommentSQLs {
sqlsBuild.WriteString(";")
sqlsBuild.WriteString(s)
b.WriteString(";")
b.WriteString(s)
}

_, err := adapter.Exec(ctx, sqlsBuild.String())
_, err := adapter.Exec(ctx, b.String())
if err != nil {
if IsDuckDBTableAlreadyExistsError(err) {
return sql.ErrTableAlreadyExists.New(name)
Expand All @@ -202,14 +209,14 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim
func (d *Database) CreateTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string) error {
d.mu.Lock()
defer d.mu.Unlock()
return d.CreateAllTable(ctx, name, schema, collation, comment, false)
return d.createAllTable(ctx, name, schema, collation, comment, false)
}

// CreateTemporaryTable implements sql.CreateTemporaryTable.
func (d *Database) CreateTemporaryTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID) error {
d.mu.Lock()
defer d.mu.Unlock()
return d.CreateAllTable(ctx, name, schema, collation, "", true)
return d.createAllTable(ctx, name, schema, collation, "", true)
}

// DropTable implements sql.TableDropper.
Expand Down
34 changes: 28 additions & 6 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/configuration"
"github.com/apecloud/myduckserver/mycontext"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/marcboeker/go-duckdb"
Expand All @@ -17,10 +19,14 @@ type Table struct {
mu *sync.RWMutex
name string
db *Database
comment *Comment[any] // save the comment to avoid querying duckdb everytime
comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb everytime
schema sql.PrimaryKeySchema
}

type ExtraTableInfo struct {
PkOrdinals []int
}

type ColumnInfo struct {
ColumnName string
ColumnIndex int
Expand Down Expand Up @@ -53,19 +59,30 @@ func NewTable(name string, db *Database) *Table {
}
}

func (t *Table) WithComment(comment *Comment[any]) *Table {
func (t *Table) withComment(comment *Comment[ExtraTableInfo]) *Table {
t.comment = comment
return t
}

func (t *Table) WithSchema(ctx *sql.Context) *Table {
t.mu.Lock()
defer t.mu.Unlock()

func (t *Table) withSchema(ctx *sql.Context) *Table {
t.schema = getPKSchema(ctx, t.db.catalog, t.db.name, t.name)

// https://github.com/apecloud/myduckserver/issues/272
if len(t.schema.PkOrdinals) == 0 && configuration.IsReplicationWithoutIndex() {
// Pretend that the primary key exists
for _, idx := range t.comment.Meta.PkOrdinals {
t.schema.Schema[idx].PrimaryKey = true
}
t.schema = sql.NewPrimaryKeySchema(t.schema.Schema, t.comment.Meta.PkOrdinals...)
}

return t
}

func (t *Table) ExtraTableInfo() ExtraTableInfo {
return t.comment.Meta
}

// Collation implements sql.Table.
func (t *Table) Collation() sql.CollationID {
return sql.Collation_Default
Expand Down Expand Up @@ -333,6 +350,11 @@ func (t *Table) CreateIndex(ctx *sql.Context, indexDef sql.IndexDef) error {
t.mu.Lock()
defer t.mu.Unlock()

// https://github.com/apecloud/myduckserver/issues/272
if mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex() {
return nil
}

if indexDef.IsPrimary() {
return fmt.Errorf("primary key cannot be created with CreateIndex, use ALTER TABLE ... ADD PRIMARY KEY instead")
}
Expand Down
18 changes: 18 additions & 0 deletions configuration/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package configuration

import (
"os"
"strings"
)

const (
replicationWithoutIndex = "REPLICATION_WITHOUT_INDEX"
)

func IsReplicationWithoutIndex() bool {
switch strings.ToLower(os.Getenv(replicationWithoutIndex)) {
case "", "t", "1", "true":
return true
}
return false
}
Loading

0 comments on commit 2950610

Please sign in to comment.