Skip to content

Commit

Permalink
Add SELECT DISTINCT queries support (#264)
Browse files Browse the repository at this point in the history
Uses hash set to make projection result unique.
  • Loading branch information
tdakkota authored Oct 31, 2020
1 parent 53dd92b commit 3f06b4a
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 3 deletions.
19 changes: 19 additions & 0 deletions sql/parser/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func (p *Parser) parseSelectStatement() (*planner.Tree, error) {
var cfg selectConfig
var err error

cfg.Distinct, err = p.parseDistinct()
if err != nil {
return nil, err
}

// Parse path list or query.Wildcard
cfg.ProjectionExprs, err = p.parseResultFields()
if err != nil {
Expand Down Expand Up @@ -122,6 +127,15 @@ func (p *Parser) parseResultField() (planner.ProjectedField, error) {
return rf, nil
}

func (p *Parser) parseDistinct() (bool, error) {
if tok, _, _ := p.ScanIgnoreWhitespace(); tok != scanner.DISTINCT {
p.Unscan()
return false, nil
}

return true, nil
}

func (p *Parser) parseFrom() (string, bool, error) {
if tok, _, _ := p.ScanIgnoreWhitespace(); tok != scanner.FROM {
p.Unscan()
Expand Down Expand Up @@ -208,6 +222,7 @@ func (p *Parser) parseOffset() (expr.Expr, error) {
// SelectConfig holds SELECT configuration.
type selectConfig struct {
TableName string
Distinct bool
WhereExpr expr.Expr
GroupByExpr expr.Expr
OrderBy expr.Path
Expand Down Expand Up @@ -235,6 +250,10 @@ func (cfg selectConfig) ToTree() (*planner.Tree, error) {

n = planner.NewProjectionNode(n, cfg.ProjectionExprs, cfg.TableName)

if cfg.Distinct {
n = planner.NewDedupNode(n, cfg.TableName)
}

if cfg.OrderBy != nil {
n = planner.NewSortNode(n, cfg.OrderBy, cfg.OrderByDirection)
}
Expand Down
43 changes: 43 additions & 0 deletions sql/planner/distinct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package planner

import (
"github.com/genjidb/genji/database"
"github.com/genjidb/genji/document"
"github.com/genjidb/genji/sql/query/expr"
)

type dedupNode struct {
node

tableName string
indexes map[string]database.Index
}

func NewDedupNode(n Node, tableName string) Node {
return &dedupNode{
node: node{
op: Dedup,
left: n,
},
tableName: tableName,
}
}

func (n *dedupNode) Bind(tx *database.Transaction, params []expr.Param) (err error) {
table, err := tx.GetTable(n.tableName)
if err != nil {
return
}

n.indexes, err = table.Indexes()
return
}

func (n *dedupNode) toStream(st document.Stream) (document.Stream, error) {
set := newDocumentHashSet(nil) // use default hashing algorithm
return st.Filter(set.Filter), nil
}

func (n *dedupNode) String() string {
return "Dedup()"
}
68 changes: 68 additions & 0 deletions sql/planner/hash_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package planner

import (
"hash"
"hash/maphash"

"github.com/genjidb/genji/document"
"github.com/genjidb/genji/key"
)

type documentHashSet struct {
hash hash.Hash64
set map[uint64]struct{}
}

func newDocumentHashSet(hash hash.Hash64) *documentHashSet {
if hash == nil {
hash = &maphash.Hash{}
}

return &documentHashSet{
hash: hash,
set: map[uint64]struct{}{},
}
}

func (s documentHashSet) generateKey(d document.Document) (uint64, error) {
defer s.hash.Reset()

fields, err := document.Fields(d)
if err != nil {
return 0, err
}

for _, field := range fields {
value, err := d.GetByField(field)
if err != nil {
return 0, err
}

buf, err := key.AppendValue(nil, value)
if err != nil {
return 0, err
}

_, err = s.hash.Write(buf)
if err != nil {
return 0, err
}
}

return s.hash.Sum64(), nil
}

func (s documentHashSet) Filter(d document.Document) (bool, error) {
k, err := s.generateKey(d)
if err != nil {
return false, err
}

_, ok := s.set[k]
if ok {
return false, nil
}

s.set[k] = struct{}{}
return true, nil
}
63 changes: 63 additions & 0 deletions sql/planner/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var optimizerRules = []func(t *Tree) (*Tree, error){
SplitANDConditionRule,
PrecalculateExprRule,
RemoveUnnecessarySelectionNodesRule,
RemoveUnnecessaryDedupNodeRule,
UseIndexBasedOnSelectionNodeRule,
}

Expand Down Expand Up @@ -248,6 +249,68 @@ func RemoveUnnecessarySelectionNodesRule(t *Tree) (*Tree, error) {
return t, nil
}

// RemoveUnnecessaryDedupNodeRule removes any Dedup nodes
// where projection is already unique.
func RemoveUnnecessaryDedupNodeRule(t *Tree) (*Tree, error) {
n := t.Root
var prev Node

for n != nil {
if n.Operation() == Dedup {
d, ok := n.(*dedupNode)
if !ok {
continue
}

pn, ok := d.left.(*ProjectionNode)
if !ok {
continue
}

// if the projection is unique, we remove the node from the tree
if isProjectionUnique(d.indexes, pn) {
if prev != nil {
prev.SetLeft(n.Left())
} else {
t.Root = n.Left()
}
}
}

prev = n
n = n.Left()
}

return t, nil
}

func isProjectionUnique(indexes map[string]database.Index, pn *ProjectionNode) bool {
pk := pn.info.GetPrimaryKey()
for _, field := range pn.Expressions {
e, ok := field.(ProjectedExpr)
if !ok {
return false
}

switch v := e.Expr.(type) {
case expr.Path:
if pk != nil && pk.Path.IsEqual(document.Path(v)) {
continue
}

if idx, ok := indexes[v.String()]; ok && idx.Unique {
continue
}
case expr.PKFunc:
continue
}

return false // if one field is not unique, so projection is not unique too.
}

return true
}

// UseIndexBasedOnSelectionNodeRule scans the tree for the first selection node whose condition is an
// operator that satisfies the following criterias:
// - implements the indexIteratorOperator interface
Expand Down
114 changes: 114 additions & 0 deletions sql/planner/optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,120 @@ func TestRemoveUnnecessarySelectionNodesRule(t *testing.T) {
}
}

func TestRemoveUnnecessaryDedupNodeRule(t *testing.T) {
tests := []struct {
name string
root, expected planner.Node
}{
{
"non-unique key",
planner.NewDedupNode(
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.Path{document.PathFragment{FieldName: "b"}},
ExprName: "b",
}},
"foo",
), "foo"),
nil,
},
{
"primary key",
planner.NewDedupNode(
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.Path{document.PathFragment{FieldName: "a"}},
ExprName: "a",
}},
"foo",
), "foo"),
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.Path{document.PathFragment{FieldName: "a"}},
ExprName: "a",
}},
"foo",
),
},
{
"unique index",
planner.NewDedupNode(
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.Path{document.PathFragment{FieldName: "c"}},
ExprName: "c",
}},
"foo",
), "foo"),
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.Path{document.PathFragment{FieldName: "c"}},
ExprName: "c",
}},
"foo",
),
},
{
"pk() function",
planner.NewDedupNode(
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.PKFunc{},
ExprName: "pk()",
}},
"foo",
), "foo"),
planner.NewProjectionNode(
planner.NewTableInputNode("foo"),
[]planner.ProjectedField{planner.ProjectedExpr{
Expr: expr.PKFunc{},
ExprName: "pk()",
}},
"foo",
),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
db, err := genji.Open(":memory:")
require.NoError(t, err)
defer db.Close()

tx, err := db.Begin(true)
require.NoError(t, err)
defer tx.Rollback()

err = tx.Exec(context.Background(), `
CREATE TABLE foo(a integer PRIMARY KEY, b integer, c integer);
CREATE UNIQUE INDEX idx_foo_idx ON foo(c);
INSERT INTO foo (a, b, c) VALUES
(1, 1, 1),
(2, 2, 2),
(3, 3, 3)
`)
require.NoError(t, err)

err = planner.Bind(planner.NewTree(test.root), tx.Transaction, nil)
require.NoError(t, err)

res, err := planner.RemoveUnnecessaryDedupNodeRule(planner.NewTree(test.root))
require.NoError(t, err)
if test.expected != nil {
require.Equal(t, planner.NewTree(test.expected).String(), res.String())
} else {
require.Equal(t, test.root, res.Root)
}
})
}
}

func TestUseIndexBasedOnSelectionNodeRule(t *testing.T) {
tests := []struct {
name string
Expand Down
27 changes: 24 additions & 3 deletions sql/planner/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,35 @@ type documentMask struct {

var _ document.Document = documentMask{}

func (r documentMask) GetByField(field string) (document.Value, error) {
func (r documentMask) GetByField(field string) (v document.Value, err error) {
for _, rf := range r.resultFields {
if rf.Name() == field || rf.Name() == "*" {
return r.d.GetByField(field)
v, err = r.d.GetByField(field)
if err != document.ErrFieldNotFound {
return
}

stack := expr.EvalStack{
Document: r.d,
Info: r.info,
}
var found bool
err = rf.Iterate(stack, func(f string, value document.Value) error {
if f == field {
v = value
found = true
}
return nil
})

if found || err != nil {
return
}
}
}

return document.Value{}, document.ErrFieldNotFound
err = document.ErrFieldNotFound
return
}

func (r documentMask) Iterate(fn func(field string, value document.Value) error) error {
Expand Down
Loading

0 comments on commit 3f06b4a

Please sign in to comment.