Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres driver: speed up list operations #151

Closed
wants to merge 14 commits into from
54 changes: 52 additions & 2 deletions pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pgsql
import (
"context"
"database/sql"
"fmt"
"net/url"
"regexp"
"strconv"
Expand Down Expand Up @@ -37,13 +38,22 @@ var (
value bytea,
old_value bytea
);`,

// Note: indexes below assume the 'C' locale, otherwise LIKE queries on `name` will not be able to use them
// To support non-C locales, new indexes should be added for each index involving the `name` column with the
// adding the varchar_pattern_ops operator class, eg.
// `CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`
// +`CREATE INDEX IF NOT EXISTS kine_name_index_patterns ON kine (name varchar_pattern_ops)`
// See: https://www.postgresql.org/docs/15/indexes-opclass.html

`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
`CREATE INDEX IF NOT EXISTS kine_list_query_index on kine(name, id DESC, deleted)`,
}
createDB = "CREATE DATABASE "
createDB = "CREATE DATABASE %s LOCALE 'C' TEMPLATE 'template0';"
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
Expand Down Expand Up @@ -94,6 +104,46 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return err.Error()
}

listSQL := `
SELECT
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
(SELECT MAX(crkv.prev_revision) AS prev_revision FROM kine AS crkv WHERE crkv.name = 'compact_rev_key'),
maxkv.*
FROM (
SELECT DISTINCT ON (name)
kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value
FROM
kine AS kv
WHERE
kv.name LIKE ?
AND %s
AND %s
ORDER BY kv.name, theid DESC
) AS maxkv
WHERE
maxkv.deleted = 0 OR ?
ORDER BY maxkv.name, maxkv.theid DESC
`

dialect.GetCurrentSQL = q(fmt.Sprintf(listSQL, "TRUE", "TRUE"))
dialect.ListRevisionStartSQL = q(fmt.Sprintf(listSQL, "kv.id <= ?", "TRUE"))
// HACK: "AND ? > 0" is a way to ignore that parameter (it is an auto-incremented id so always positive)
dialect.GetRevisionAfterSQL = q(fmt.Sprintf(listSQL, "kv.id <= ?", "kv.name > ? AND ? > 0"))
dialect.CountSQL = q(fmt.Sprintf(`
SELECT
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
COUNT(c.theid)
FROM (
SELECT DISTINCT ON (name)
kv.id AS theid,
kv.deleted
FROM kine AS kv
WHERE
kv.name LIKE ?
ORDER BY kv.name, theid DESC
) AS c
WHERE c.deleted = 0 OR ?`))

if err := setup(dialect.DB); err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,7 +196,7 @@ func createDBIfNotExist(dataSourceName string) error {
return err
}
defer db.Close()
stmt := createDB + dbName + ";"
stmt := fmt.Sprintf(createDB, dbName)
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
Expand Down