Skip to content

Commit

Permalink
change ChangeStreamOptions implement
Browse files Browse the repository at this point in the history
  • Loading branch information
nktks committed Sep 18, 2023
1 parent 0ce221d commit 28aa87c
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 99 deletions.
33 changes: 27 additions & 6 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,11 +1806,10 @@ type CreateIndex struct {

// CreateChangeStream is CREATE CHANGE STREAM statement node.
//
// CREATE CHANGE STREAM {{.Name | sql}} {{.For | sql }}
// {{ if .Options }}OPTIONS {{ .Options.Exprs | sqlJoin ","}}{{end}}
// CREATE CHANGE STREAM {{.Name | sql}} {{.For | sql }} {{.Options | sql }}
type CreateChangeStream struct {
// pos = Create
// end = Options.Rparen + 1 || .For.end
// end = Options.end || For.end
Create token.Pos // position of "CREATE" keyword
Name *Ident
For ChangeStreamFor
Expand Down Expand Up @@ -1846,9 +1845,28 @@ type ChangeStreamForTable struct {
Rparen token.Pos // position of ")"
}

// ChangeStreamOptions is OPTIONS clause node in CREATE CHANGE STREAM.
//
// OPTIONS ({{.Records | sqlJoin ","}})
type ChangeStreamOptions struct {
Exprs []Expr
Rparen token.Pos // position of ")"
// pos = Options
// end = Rparen + 1

Options token.Pos // position of "OPTIONS" keyword
Rparen token.Pos // position of ")"

Records []*ChangeStreamOptionsRecord
}

// ChangeStreamOptionsRecord is OPTIONS record node.
//
// {{.Key | sql}}={{.Expr | sql}}
type ChangeStreamOptionsRecord struct {
// pos = Key
// end = Value.end

Key *Ident
Value Expr
}

// ChangeStreamAlternationSetFor is SET FOR tables node in ALTER CHANGE STREAM
Expand All @@ -1857,6 +1875,7 @@ type ChangeStreamOptions struct {
type ChangeStreamAlternationSetFor struct {
// pos = Set
// end = For.end

Set token.Pos // position of "SET" keyword
For ChangeStreamFor
}
Expand All @@ -1865,16 +1884,18 @@ type ChangeStreamAlternationSetFor struct {
type ChangeStreamAlternationDropForAll struct {
// pos = Drop
// end = All + 3

Drop token.Pos // position of "DROP" keyword
All token.Pos // position of "ALL" keyword
}

// ChangeStreamAlternationSetFor is DROP FOR ALL node in ALTER CHANGE STREAM
//
// SET OPTIONS {{ .Options.Exprs | sqlJoin "," }}
// SET {{ .Options | sql }}
type ChangeStreamAlternationSetOptions struct {
// pos = Set
// end = Options.Rparen + 1

Set token.Pos // position of "SET" keyword
Options *ChangeStreamOptions
}
Expand Down
5 changes: 4 additions & 1 deletion ast/pos.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,17 @@ func (c *CreateChangeStream) Pos() token.Pos {

func (c *CreateChangeStream) End() token.Pos {
if c.Options != nil {
return c.Options.Rparen + token.Pos(len(")"))
return c.Options.End()
}
if c.For != nil {
return c.For.End()
}
return c.Name.End()
}

func (c *ChangeStreamOptions) Pos() token.Pos { return c.Options }
func (c *ChangeStreamOptions) End() token.Pos { return c.Rparen + 1 }

func (c *ChangeStreamForAll) Pos() token.Pos { return c.For }
func (c *ChangeStreamForAll) End() token.Pos { return c.All }
func (c *ChangeStreamForTables) Pos() token.Pos { return c.For }
Expand Down
30 changes: 14 additions & 16 deletions ast/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,14 +974,7 @@ func (c *CreateChangeStream) SQL() string {
sql += " " + c.For.SQL()
}
if c.Options != nil {
sql += " OPTIONS ("
for i, expr := range c.Options.Exprs {
if i > 0 {
sql += ", "
}
sql += expr.SQL()
}
sql += ")"
sql += c.Options.SQL()
}
return sql
}
Expand All @@ -998,6 +991,18 @@ func (c *ChangeStreamForTables) SQL() string {
return sql
}

func (c *ChangeStreamOptions) SQL() string {
sql := " OPTIONS ("
for i, record := range c.Records {
if i > 0 {
sql += ", "
}
sql += record.Key.SQL() + "=" + record.Value.SQL()
}
sql += ")"
return sql
}

func (a *AlterChangeStream) SQL() string {
return "ALTER CHANGE STREAM " + a.Name.SQL() + " " + a.ChangeStreamAlternation.SQL()
}
Expand All @@ -1007,14 +1012,7 @@ func (a ChangeStreamAlternationSetFor) SQL() string {
}
func (a ChangeStreamAlternationDropForAll) SQL() string { return "DROP FOR ALL" }
func (a ChangeStreamAlternationSetOptions) SQL() string {
sql := "SET OPTIONS ("
for i, expr := range a.Options.Exprs {
if i > 0 {
sql += ", "
}
sql += expr.SQL()
}
sql += ")"
sql := "SET " + a.Options.SQL()
return sql
}
func (c *ChangeStreamForTable) SQL() string {
Expand Down
11 changes: 7 additions & 4 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,6 @@ func (p *Parser) parseCreateChangeStream(pos token.Pos) *ast.CreateChangeStream
cs.For = p.parseChangeStreamFor()
}
if p.Token.IsKeywordLike("OPTIONS") {
p.nextToken()
cs.Options = p.parseChangeStreamOptions()
}
return cs
Expand All @@ -2532,7 +2531,6 @@ func (p *Parser) parseAlterChangeStream(pos token.Pos) *ast.AlterChangeStream {
cs.ChangeStreamAlternation = csa
return cs
} else if p.Token.IsKeywordLike("OPTIONS") {
p.nextToken()
csa := &ast.ChangeStreamAlternationSetOptions{
Set: setpos,
Options: p.parseChangeStreamOptions(),
Expand Down Expand Up @@ -2610,10 +2608,15 @@ func (p *Parser) parseChangeStreamFor() ast.ChangeStreamFor {
// This is for the key, value which will supported in the future.
// We don't need to modify this code for them.
func (p *Parser) parseChangeStreamOptions() *ast.ChangeStreamOptions {
pos := p.Token.Pos
p.nextToken()
p.expect("(")
cso := &ast.ChangeStreamOptions{}
cso := &ast.ChangeStreamOptions{Options: pos}
for {
cso.Exprs = append(cso.Exprs, p.parseExpr())
key := p.parseIdent()
p.expect("=")
value := p.parseExpr()
cso.Records = append(cso.Records, &ast.ChangeStreamOptionsRecord{Key: key, Value: value})
if p.Token.Kind == "," {
p.nextToken()
continue
Expand Down
21 changes: 10 additions & 11 deletions testdata/result/ddl/alter_change_stream_options.sql.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,37 @@ ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period = '1d', val
ChangeStreamAlternation: &ast.ChangeStreamAlternationSetOptions{
Set: 39,
Options: &ast.ChangeStreamOptions{
Exprs: []ast.Expr{
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
Options: 43,
Rparen: 118,
Records: []*ast.ChangeStreamOptionsRecord{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 52,
NameEnd: 68,
Name: "retention_period",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 71,
ValueEnd: 75,
Value: "1d",
},
},
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 77,
NameEnd: 95,
Name: "value_capture_type",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 98,
ValueEnd: 118,
Value: "OLD_AND_NEW_VALUES",
},
},
},
Rparen: 118,
},
},
}

--- SQL
ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period = "1d", value_capture_type = "OLD_AND_NEW_VALUES")
ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period="1d", value_capture_type="OLD_AND_NEW_VALUES")
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,24 @@ OPTIONS(retention_period = '1d')
},
},
Options: &ast.ChangeStreamOptions{
Exprs: []ast.Expr{
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
Options: 105,
Rparen: 136,
Records: []*ast.ChangeStreamOptionsRecord{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 113,
NameEnd: 129,
Name: "retention_period",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 132,
ValueEnd: 136,
Value: "1d",
},
},
},
Rparen: 136,
},
}

--- SQL
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period = "1d")
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period="1d")
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,22 @@ OPTIONS(retention_period = null)
},
},
Options: &ast.ChangeStreamOptions{
Exprs: []ast.Expr{
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
Options: 105,
Rparen: 136,
Records: []*ast.ChangeStreamOptionsRecord{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 113,
NameEnd: 129,
Name: "retention_period",
},
Right: &ast.NullLiteral{
Value: &ast.NullLiteral{
Null: 132,
},
},
},
Rparen: 136,
},
}

--- SQL
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period = NULL)
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period=NULL)
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,36 @@ OPTIONS(retention_period = '1d', value_capture_type = 'NEW_ROW')
},
},
Options: &ast.ChangeStreamOptions{
Exprs: []ast.Expr{
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
Options: 105,
Rparen: 168,
Records: []*ast.ChangeStreamOptionsRecord{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 113,
NameEnd: 129,
Name: "retention_period",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 132,
ValueEnd: 136,
Value: "1d",
},
},
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 138,
NameEnd: 156,
Name: "value_capture_type",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 159,
ValueEnd: 168,
Value: "NEW_ROW",
},
},
},
Rparen: 168,
},
}

--- SQL
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period = "1d", value_capture_type = "NEW_ROW")
CREATE CHANGE STREAM change_stream_name FOR table_name1(column1, column2), table_name2(column1, column2) OPTIONS (retention_period="1d", value_capture_type="NEW_ROW")
21 changes: 10 additions & 11 deletions testdata/result/statement/alter_change_stream_options.sql.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,37 @@ ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period = '1d', val
ChangeStreamAlternation: &ast.ChangeStreamAlternationSetOptions{
Set: 39,
Options: &ast.ChangeStreamOptions{
Exprs: []ast.Expr{
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
Options: 43,
Rparen: 118,
Records: []*ast.ChangeStreamOptionsRecord{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 52,
NameEnd: 68,
Name: "retention_period",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 71,
ValueEnd: 75,
Value: "1d",
},
},
&ast.BinaryExpr{
Op: "=",
Left: &ast.Ident{
&ast.ChangeStreamOptionsRecord{
Key: &ast.Ident{
NamePos: 77,
NameEnd: 95,
Name: "value_capture_type",
},
Right: &ast.StringLiteral{
Value: &ast.StringLiteral{
ValuePos: 98,
ValueEnd: 118,
Value: "OLD_AND_NEW_VALUES",
},
},
},
Rparen: 118,
},
},
}

--- SQL
ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period = "1d", value_capture_type = "OLD_AND_NEW_VALUES")
ALTER CHANGE STREAM change_stream_name SET OPTIONS (retention_period="1d", value_capture_type="OLD_AND_NEW_VALUES")
Loading

0 comments on commit 28aa87c

Please sign in to comment.