diff --git a/sql/parser/select.go b/sql/parser/select.go
index 6a760fc3b..3315f13a6 100644
--- a/sql/parser/select.go
+++ b/sql/parser/select.go
@@ -14,6 +14,11 @@ func (p *Parser) parseSelectStatement() (*planner.Tree, error) {
 	var cfg selectConfig
 	var err error
 
+	cfg.Distinct, err = p.parseDistinct()
+	if err != nil {
+		return nil, err
+	}
+
 	// Parse path list or query.Wildcard
 	cfg.ProjectionExprs, err = p.parseResultFields()
 	if err != nil {
@@ -122,6 +127,15 @@ func (p *Parser) parseResultField() (planner.ProjectedField, error) {
 	return rf, nil
 }
 
+func (p *Parser) parseDistinct() (bool, error) {
+	if tok, _, _ := p.ScanIgnoreWhitespace(); tok != scanner.DISTINCT {
+		p.Unscan()
+		return false, nil
+	}
+
+	return true, nil
+}
+
 func (p *Parser) parseFrom() (string, bool, error) {
 	if tok, _, _ := p.ScanIgnoreWhitespace(); tok != scanner.FROM {
 		p.Unscan()
@@ -208,6 +222,7 @@ func (p *Parser) parseOffset() (expr.Expr, error) {
 // SelectConfig holds SELECT configuration.
 type selectConfig struct {
 	TableName        string
+	Distinct         bool
 	WhereExpr        expr.Expr
 	GroupByExpr      expr.Expr
 	OrderBy          expr.Path
@@ -235,6 +250,10 @@ func (cfg selectConfig) ToTree() (*planner.Tree, error) {
 
 	n = planner.NewProjectionNode(n, cfg.ProjectionExprs, cfg.TableName)
 
+	if cfg.Distinct {
+		n = planner.NewDedupNode(n, cfg.TableName)
+	}
+
 	if cfg.OrderBy != nil {
 		n = planner.NewSortNode(n, cfg.OrderBy, cfg.OrderByDirection)
 	}
diff --git a/sql/planner/distinct.go b/sql/planner/distinct.go
new file mode 100644
index 000000000..3d4b8b180
--- /dev/null
+++ b/sql/planner/distinct.go
@@ -0,0 +1,43 @@
+package planner
+
+import (
+	"github.com/genjidb/genji/database"
+	"github.com/genjidb/genji/document"
+	"github.com/genjidb/genji/sql/query/expr"
+)
+
+type dedupNode struct {
+	node
+
+	tableName string
+	indexes   map[string]database.Index
+}
+
+func NewDedupNode(n Node, tableName string) Node {
+	return &dedupNode{
+		node: node{
+			op:   Dedup,
+			left: n,
+		},
+		tableName: tableName,
+	}
+}
+
+func (n *dedupNode) Bind(tx *database.Transaction, params []expr.Param) (err error) {
+	table, err := tx.GetTable(n.tableName)
+	if err != nil {
+		return
+	}
+
+	n.indexes, err = table.Indexes()
+	return
+}
+
+func (n *dedupNode) toStream(st document.Stream) (document.Stream, error) {
+	set := newDocumentHashSet(nil) // use default hashing algorithm
+	return st.Filter(set.Filter), nil
+}
+
+func (n *dedupNode) String() string {
+	return "Dedup()"
+}
diff --git a/sql/planner/hash_set.go b/sql/planner/hash_set.go
new file mode 100644
index 000000000..ccf133fb1
--- /dev/null
+++ b/sql/planner/hash_set.go
@@ -0,0 +1,68 @@
+package planner
+
+import (
+	"hash"
+	"hash/maphash"
+
+	"github.com/genjidb/genji/document"
+	"github.com/genjidb/genji/key"
+)
+
+type documentHashSet struct {
+	hash hash.Hash64
+	set  map[uint64]struct{}
+}
+
+func newDocumentHashSet(hash hash.Hash64) *documentHashSet {
+	if hash == nil {
+		hash = &maphash.Hash{}
+	}
+
+	return &documentHashSet{
+		hash: hash,
+		set:  map[uint64]struct{}{},
+	}
+}
+
+func (s documentHashSet) generateKey(d document.Document) (uint64, error) {
+	defer s.hash.Reset()
+
+	fields, err := document.Fields(d)
+	if err != nil {
+		return 0, err
+	}
+
+	for _, field := range fields {
+		value, err := d.GetByField(field)
+		if err != nil {
+			return 0, err
+		}
+
+		buf, err := key.AppendValue(nil, value)
+		if err != nil {
+			return 0, err
+		}
+
+		_, err = s.hash.Write(buf)
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return s.hash.Sum64(), nil
+}
+
+func (s documentHashSet) Filter(d document.Document) (bool, error) {
+	k, err := s.generateKey(d)
+	if err != nil {
+		return false, err
+	}
+
+	_, ok := s.set[k]
+	if ok {
+		return false, nil
+	}
+
+	s.set[k] = struct{}{}
+	return true, nil
+}
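For context, a minimal sketch of how the hash set above is meant to be used to deduplicate a stream. dedupStream is a hypothetical helper, not part of the patch; it simply mirrors dedupNode.toStream in distinct.go and assumes the same document.Stream.Filter API that toStream already relies on.

package planner

import "github.com/genjidb/genji/document"

// dedupStream drops every document whose key-encoded field values hash to a
// value already seen earlier in the stream; only the first occurrence of each
// distinct document passes through.
func dedupStream(st document.Stream) document.Stream {
	set := newDocumentHashSet(nil) // nil selects the default maphash-based hasher
	return st.Filter(set.Filter)
}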
diff --git a/sql/planner/optimizer.go b/sql/planner/optimizer.go
index 799abe695..45ba776c7 100644
--- a/sql/planner/optimizer.go
+++ b/sql/planner/optimizer.go
@@ -11,6 +11,7 @@ var optimizerRules = []func(t *Tree) (*Tree, error){
 	SplitANDConditionRule,
 	PrecalculateExprRule,
 	RemoveUnnecessarySelectionNodesRule,
+	RemoveUnnecessaryDedupNodeRule,
 	UseIndexBasedOnSelectionNodeRule,
 }
 
@@ -248,6 +249,68 @@ func RemoveUnnecessarySelectionNodesRule(t *Tree) (*Tree, error) {
 	return t, nil
 }
 
+// RemoveUnnecessaryDedupNodeRule removes any Dedup node whose child
+// projection is already guaranteed to be unique.
+func RemoveUnnecessaryDedupNodeRule(t *Tree) (*Tree, error) {
+	n := t.Root
+	var prev Node
+
+	for n != nil {
+		if n.Operation() == Dedup {
+			d, ok := n.(*dedupNode)
+			if !ok {
+				continue
+			}
+
+			pn, ok := d.left.(*ProjectionNode)
+			if !ok {
+				continue
+			}
+
+			// if the projection is unique, we remove the node from the tree
+			if isProjectionUnique(d.indexes, pn) {
+				if prev != nil {
+					prev.SetLeft(n.Left())
+				} else {
+					t.Root = n.Left()
+				}
+			}
+		}
+
+		prev = n
+		n = n.Left()
+	}
+
+	return t, nil
+}
+
+func isProjectionUnique(indexes map[string]database.Index, pn *ProjectionNode) bool {
+	pk := pn.info.GetPrimaryKey()
+	for _, field := range pn.Expressions {
+		e, ok := field.(ProjectedExpr)
+		if !ok {
+			return false
+		}
+
+		switch v := e.Expr.(type) {
+		case expr.Path:
+			if pk != nil && pk.Path.IsEqual(document.Path(v)) {
+				continue
+			}
+
+			if idx, ok := indexes[v.String()]; ok && idx.Unique {
+				continue
+			}
+		case expr.PKFunc:
+			continue
+		}
+
+		return false // if one field is not unique, the projection is not unique either
+	}
+
+	return true
+}
+
 // UseIndexBasedOnSelectionNodeRule scans the tree for the first selection node whose condition is an
 // operator that satisfies the following criterias:
 // - implements the indexIteratorOperator interface
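To make the new rule concrete, here is a rough sketch (not part of the patch) of a tree it can simplify. Because pk() is unique by definition, once the tree is bound to a transaction with planner.Bind the Dedup node is spliced out, leaving the bare projection over the table. The helper name buildPKProjection and the table name foo are made up for illustration; the constructors are the same ones used in the tests below.

package planner_test

import (
	"github.com/genjidb/genji/sql/planner"
	"github.com/genjidb/genji/sql/query/expr"
)

// buildPKProjection returns Dedup(Projection(pk())) over table foo.
// After binding, RemoveUnnecessaryDedupNodeRule rewrites this tree so that
// the projection node becomes the root and the Dedup node disappears.
func buildPKProjection() planner.Node {
	return planner.NewDedupNode(
		planner.NewProjectionNode(
			planner.NewTableInputNode("foo"),
			[]planner.ProjectedField{planner.ProjectedExpr{
				Expr:     expr.PKFunc{},
				ExprName: "pk()",
			}},
			"foo",
		), "foo")
}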
diff --git a/sql/planner/optimizer_test.go b/sql/planner/optimizer_test.go
index b0d1001dc..58e909de9 100644
--- a/sql/planner/optimizer_test.go
+++ b/sql/planner/optimizer_test.go
@@ -206,6 +206,120 @@ func TestRemoveUnnecessarySelectionNodesRule(t *testing.T) {
 	}
 }
 
+func TestRemoveUnnecessaryDedupNodeRule(t *testing.T) {
+	tests := []struct {
+		name           string
+		root, expected planner.Node
+	}{
+		{
+			"non-unique key",
+			planner.NewDedupNode(
+				planner.NewProjectionNode(
+					planner.NewTableInputNode("foo"),
+					[]planner.ProjectedField{planner.ProjectedExpr{
+						Expr:     expr.Path{document.PathFragment{FieldName: "b"}},
+						ExprName: "b",
+					}},
+					"foo",
+				), "foo"),
+			nil,
+		},
+		{
+			"primary key",
+			planner.NewDedupNode(
+				planner.NewProjectionNode(
+					planner.NewTableInputNode("foo"),
+					[]planner.ProjectedField{planner.ProjectedExpr{
+						Expr:     expr.Path{document.PathFragment{FieldName: "a"}},
+						ExprName: "a",
+					}},
+					"foo",
+				), "foo"),
+			planner.NewProjectionNode(
+				planner.NewTableInputNode("foo"),
+				[]planner.ProjectedField{planner.ProjectedExpr{
+					Expr:     expr.Path{document.PathFragment{FieldName: "a"}},
+					ExprName: "a",
+				}},
+				"foo",
+			),
+		},
+		{
+			"unique index",
+			planner.NewDedupNode(
+				planner.NewProjectionNode(
+					planner.NewTableInputNode("foo"),
+					[]planner.ProjectedField{planner.ProjectedExpr{
+						Expr:     expr.Path{document.PathFragment{FieldName: "c"}},
+						ExprName: "c",
+					}},
+					"foo",
+				), "foo"),
+			planner.NewProjectionNode(
+				planner.NewTableInputNode("foo"),
+				[]planner.ProjectedField{planner.ProjectedExpr{
+					Expr:     expr.Path{document.PathFragment{FieldName: "c"}},
+					ExprName: "c",
+				}},
+				"foo",
+			),
+		},
+		{
+			"pk() function",
+			planner.NewDedupNode(
+				planner.NewProjectionNode(
+					planner.NewTableInputNode("foo"),
+					[]planner.ProjectedField{planner.ProjectedExpr{
+						Expr:     expr.PKFunc{},
+						ExprName: "pk()",
+					}},
+					"foo",
+				), "foo"),
+			planner.NewProjectionNode(
+				planner.NewTableInputNode("foo"),
+				[]planner.ProjectedField{planner.ProjectedExpr{
+					Expr:     expr.PKFunc{},
+					ExprName: "pk()",
+				}},
+				"foo",
+			),
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			db, err := genji.Open(":memory:")
+			require.NoError(t, err)
+			defer db.Close()
+
+			tx, err := db.Begin(true)
+			require.NoError(t, err)
+			defer tx.Rollback()
+
+			err = tx.Exec(context.Background(), `
+				CREATE TABLE foo(a integer PRIMARY KEY, b integer, c integer);
+				CREATE UNIQUE INDEX idx_foo_idx ON foo(c);
+				INSERT INTO foo (a, b, c) VALUES
+					(1, 1, 1),
+					(2, 2, 2),
+					(3, 3, 3)
+			`)
+			require.NoError(t, err)
+
+			err = planner.Bind(planner.NewTree(test.root), tx.Transaction, nil)
+			require.NoError(t, err)
+
+			res, err := planner.RemoveUnnecessaryDedupNodeRule(planner.NewTree(test.root))
+			require.NoError(t, err)
+			if test.expected != nil {
+				require.Equal(t, planner.NewTree(test.expected).String(), res.String())
+			} else {
+				require.Equal(t, test.root, res.Root)
+			}
+		})
+	}
+}
+
 func TestUseIndexBasedOnSelectionNodeRule(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/sql/planner/projection.go b/sql/planner/projection.go
index 77ef8741f..ecede40aa 100644
--- a/sql/planner/projection.go
+++ b/sql/planner/projection.go
@@ -126,14 +126,35 @@ type documentMask struct {
 
 var _ document.Document = documentMask{}
 
-func (r documentMask) GetByField(field string) (document.Value, error) {
+func (r documentMask) GetByField(field string) (v document.Value, err error) {
 	for _, rf := range r.resultFields {
 		if rf.Name() == field || rf.Name() == "*" {
-			return r.d.GetByField(field)
+			v, err = r.d.GetByField(field)
+			if err != document.ErrFieldNotFound {
+				return
+			}
+
+			stack := expr.EvalStack{
+				Document: r.d,
+				Info:     r.info,
+			}
+			var found bool
+			err = rf.Iterate(stack, func(f string, value document.Value) error {
+				if f == field {
+					v = value
+					found = true
+				}
+				return nil
+			})
+
+			if found || err != nil {
+				return
+			}
 		}
 	}
 
-	return document.Value{}, document.ErrFieldNotFound
+	err = document.ErrFieldNotFound
+	return
 }
 
 func (r documentMask) Iterate(fn func(field string, value document.Value) error) error {
diff --git a/sql/planner/tree.go b/sql/planner/tree.go
index 79764f685..85705fa9e 100644
--- a/sql/planner/tree.go
+++ b/sql/planner/tree.go
@@ -43,6 +43,9 @@ const (
 	// Unset is an operation that removes a value at a given path from every document of a stream
 	Unset
 	// Group is an operation that groups documents based on a given path.
+	_
+	// Dedup is an operation that removes duplicate documents from a stream
+	Dedup
 )
 
 // A Tree describes the flow of a stream of documents.
diff --git a/sql/query/select_test.go b/sql/query/select_test.go
index 993e904a7..6218f61f2 100644
--- a/sql/query/select_test.go
+++ b/sql/query/select_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"database/sql"
+	"strconv"
 	"testing"
 
 	"github.com/genjidb/genji"
@@ -33,6 +34,8 @@ func TestSelectStmt(t *testing.T) {
 		{"No table, wildcard", "SELECT *", true, ``, nil},
 		{"No table, document", "SELECT {a: 1, b: 2 + 1}", false, `[{"{a: 1, b: 2 + 1}":{"a":1,"b":3}}]`, nil},
 		{"No cond", "SELECT * FROM test", false, `[{"k":1,"color":"red","size":10,"shape":"square"},{"k":2,"color":"blue","size":10,"weight":100},{"k":3,"height":100,"weight":200}]`, nil},
+		{"With DISTINCT", "SELECT DISTINCT * FROM test", false, `[{"k":1,"color":"red","size":10,"shape":"square"},{"k":2,"color":"blue","size":10,"weight":100},{"k":3,"height":100,"weight":200}]`, nil},
+		{"With DISTINCT and expr", "SELECT DISTINCT 'a' FROM test", false, `[{"'a'":"a"}]`, nil},
 		{"Multiple wildcards cond", "SELECT *, *, color FROM test", false, `[{"k":1,"color":"red","size":10,"shape":"square","k":1,"color":"red","size":10,"shape":"square","color":"red"},{"k":2,"color":"blue","size":10,"weight":100,"k":2,"color":"blue","size":10,"weight":100,"color":"blue"},{"k":3,"height":100,"weight":200,"k":3,"height":100,"weight":200,"color":null}]`, nil},
 		{"With fields", "SELECT color, shape FROM test", false, `[{"color":"red","shape":"square"},{"color":"blue","shape":null},{"color":null,"shape":null}]`, nil},
 		{"With expr fields", "SELECT color, color != 'red' AS notred FROM test", false, `[{"color":"red","notred":false},{"color":"blue","notred":true},{"color":null,"notred":null}]`, nil},
@@ -222,3 +225,80 @@ func TestSelectStmt(t *testing.T) {
 		require.JSONEq(t, `[{"foo": true},{"foo": 1}, {"foo": 2},{"foo": "hello"}]`, buf.String())
 	})
 }
+
+func TestDistinct(t *testing.T) {
+	ctx := context.Background()
+
+	types := []struct {
+		name          string
+		generateValue func(i, notUniqueCount int) (unique interface{}, notunique interface{})
+	}{
+		{`integer`, func(i, notUniqueCount int) (unique interface{}, notunique interface{}) {
+			return i, i % notUniqueCount
+		}},
+		{`double`, func(i, notUniqueCount int) (unique interface{}, notunique interface{}) {
+			return float64(i), float64(i % notUniqueCount)
+		}},
+		{`text`, func(i, notUniqueCount int) (unique interface{}, notunique interface{}) {
+			return strconv.Itoa(i), strconv.Itoa(i % notUniqueCount)
+		}},
+		{`array`, func(i, notUniqueCount int) (unique interface{}, notunique interface{}) {
+			return []interface{}{i}, []interface{}{i % notUniqueCount}
+		}},
+	}
+
+	for _, typ := range types {
+		total := 100
+		notUnique := total / 10
+
+		t.Run(typ.name, func(t *testing.T) {
+			db, err := genji.Open(":memory:")
+			require.NoError(t, err)
+			defer db.Close()
+
+			tx, err := db.Begin(true)
+			require.NoError(t, err)
+			defer tx.Rollback()
+
+			err = tx.Exec(ctx, "CREATE TABLE test(a "+typ.name+" PRIMARY KEY, b "+typ.name+", doc DOCUMENT, nullable "+typ.name+");")
+			require.NoError(t, err)
+
+			err = tx.Exec(ctx, "CREATE UNIQUE INDEX test_doc_index ON test(doc);")
+			require.NoError(t, err)
+
+			for i := 0; i < total; i++ {
+				unique, nonunique := typ.generateValue(i, notUnique)
+				err = tx.Exec(ctx, `INSERT INTO test VALUES {a: ?, b: ?, doc: {a: ?, b: ?}, nullable: null}`, unique, nonunique, unique, nonunique)
+				require.NoError(t, err)
+			}
+			err = tx.Commit()
+			require.NoError(t, err)
+
+			tests := []struct {
+				name          string
+				query         string
+				expectedCount int
+			}{
+				{`unique`, `SELECT DISTINCT a FROM test`, total},
+				{`non-unique`, `SELECT DISTINCT b FROM test`, notUnique},
+				{`documents`, `SELECT DISTINCT doc FROM test`, total},
+				{`null`, `SELECT DISTINCT nullable FROM test`, 1},
+				{`wildcard`, `SELECT DISTINCT * FROM test`, total},
+				{`literal`, `SELECT DISTINCT 'a' FROM test`, 1},
+				{`pk()`, `SELECT DISTINCT pk() FROM test`, total},
+			}
+
+			for _, test := range tests {
+				t.Run(test.name, func(t *testing.T) {
+					q, err := db.Query(ctx, test.query)
+					require.NoError(t, err)
+					defer q.Close()
+
+					c, err := q.Count()
+					require.NoError(t, err)
+					require.Equal(t, test.expectedCount, c)
+				})
+			}
+		})
+	}
+}
diff --git a/sql/scanner/scanner_test.go b/sql/scanner/scanner_test.go
index 5e661cfe3..18eadacf1 100644
--- a/sql/scanner/scanner_test.go
+++ b/sql/scanner/scanner_test.go
@@ -130,6 +130,7 @@ func TestScanner_Scan(t *testing.T) {
 		{s: `DEFAULT`, tok: scanner.DEFAULT, raw: `DEFAULT`},
 		{s: `DELETE`, tok: scanner.DELETE, raw: `DELETE`},
 		{s: `DESC`, tok: scanner.DESC, raw: `DESC`},
+		{s: `DISTINCT`, tok: scanner.DISTINCT, raw: `DISTINCT`},
 		{s: `DROP`, tok: scanner.DROP, raw: `DROP`},
 		{s: `FIELD`, tok: scanner.FIELD, raw: `FIELD`},
 		{s: `FROM`, tok: scanner.FROM, raw: `FROM`},
diff --git a/sql/scanner/token.go b/sql/scanner/token.go
index 695733d1f..dee2cac73 100644
--- a/sql/scanner/token.go
+++ b/sql/scanner/token.go
@@ -85,6 +85,7 @@ const (
 	DEFAULT
 	DELETE
 	DESC
+	DISTINCT
 	DROP
 	EXISTS
 	EXPLAIN
@@ -208,6 +209,7 @@ var tokens = [...]string{
 	DEFAULT: "DEFAULT",
 	DELETE: "DELETE",
 	DESC: "DESC",
+	DISTINCT: "DISTINCT",
 	DROP: "DROP",
 	EXISTS: "EXISTS",
 	EXPLAIN: "EXPLAIN",
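Taken together, the patch enables the following end-to-end usage. This is a hedged sketch against the public genji API: the table and field names are made up for illustration, and the Begin/Exec/Query/Count calls mirror the ones already used in the tests above.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/genjidb/genji"
)

func main() {
	ctx := context.Background()

	db, err := genji.Open(":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Insert three documents, two of which share the same city.
	tx, err := db.Begin(true)
	if err != nil {
		log.Fatal(err)
	}
	err = tx.Exec(ctx, `
		CREATE TABLE user;
		INSERT INTO user (city) VALUES ('paris'), ('paris'), ('lyon')
	`)
	if err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}

	// DISTINCT removes the duplicate {"city": "paris"} document.
	res, err := db.Query(ctx, "SELECT DISTINCT city FROM user")
	if err != nil {
		log.Fatal(err)
	}
	defer res.Close()

	n, err := res.Count()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(n) // 2
}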