Skip to content

Commit

Permalink
move applyEventScheduler logic and eventscheduler to builder (#2729)
Browse files Browse the repository at this point in the history
  • Loading branch information
jycor authored Nov 1, 2024
1 parent f3be1d2 commit 2766041
Show file tree
Hide file tree
Showing 27 changed files with 133 additions and 170 deletions.
14 changes: 8 additions & 6 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (e *Engine) AnalyzeQuery(
ctx *sql.Context,
query string,
) (sql.Node, error) {
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
parsed, _, _, qFlags, err := binder.Parse(query, nil, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -237,7 +237,7 @@ func (e *Engine) PrepareParsedQuery(
statementKey, query string,
stmt sqlparser.Statement,
) (sql.Node, error) {
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
node, _, err := binder.BindOnly(stmt, query, nil)

if err != nil {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (e *Engine) BoundQueryPlan(ctx *sql.Context, query string, parsed sqlparser

query = sql.RemoveSpaceAndDelimiter(query, ';')

binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
binder.SetBindings(bindings)

// Begin a transaction if necessary (no-op if one is in flight)
Expand Down Expand Up @@ -571,7 +571,7 @@ func (e *Engine) preparedStatement(ctx *sql.Context, query string, parsed sqlpar
preparedAst, preparedDataFound = e.PreparedDataCache.GetCachedStmt(ctx.Session.ID(), query)
}

binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
if preparedDataFound {
parsed = preparedAst
binder.SetBindings(bindings)
Expand Down Expand Up @@ -804,6 +804,10 @@ func (e *Engine) EngineAnalyzer() *analyzer.Analyzer {
return e.Analyzer
}

func (e *Engine) EngineEventScheduler() sql.EventScheduler {
return e.EventScheduler
}

// InitializeEventScheduler initializes the EventScheduler for the engine with the given sql.Context
// getter function, |ctxGetterFunc, the EventScheduler |status|, and the |period| for the event scheduler
// to check for events to execute. If |period| is less than 1, then it is ignored and the default period
Expand All @@ -814,8 +818,6 @@ func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, fu
if err != nil {
return err
}

e.Analyzer.EventScheduler = e.EventScheduler
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion enginetest/engine_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestAnalyzer_Exp(t *testing.T) {
require.NoError(t, err)

ctx := enginetest.NewContext(harness)
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, sql.NewMysqlParser())
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, e.EngineEventScheduler(), nil)
parsed, _, _, _, err := b.Parse(tt.query, nil, false)
require.NoError(t, err)

Expand Down
1 change: 0 additions & 1 deletion enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ func TestQueryPlan(t *testing.T, harness Harness, e QueryEngine, tt queries.Quer
func TestQueryPlanWithName(t *testing.T, name string, harness Harness, e QueryEngine, query, expectedPlan string, options sql.DescribeOptions) {
t.Run(name, func(t *testing.T) {
ctx := NewContext(harness)

parsed, qFlags, err := planbuilder.Parse(ctx, e.EngineAnalyzer().Catalog, query)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion enginetest/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func injectBindVarsAndPrepare(
}
}

b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, sql.NewMysqlParser())
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, e.EngineEventScheduler(), nil)
b.SetParserOptions(sql.LoadSqlMode(ctx).ParserOptions())
resPlan, _, err := b.BindOnly(parsed, q, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion enginetest/plangen/cmd/plangen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func generatePlansForSuite(spec PlanSpec, w *bytes.Buffer) error {

if !tt.Skip {
ctx := enginetest.NewContext(harness)
binder := planbuilder.New(ctx, engine.EngineAnalyzer().Catalog, sql.NewMysqlParser())
binder := planbuilder.New(ctx, engine.EngineAnalyzer().Catalog, engine.EngineEventScheduler(), nil)
parsed, _, _, qFlags, err := binder.Parse(tt.Query, nil, false)
if err != nil {
exit(fmt.Errorf("%w\nfailed to parse query: %s", err, tt.Query))
Expand Down
1 change: 1 addition & 0 deletions enginetest/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type QueryEngine interface {
Query(ctx *sql.Context, query string) (sql.Schema, sql.RowIter, *sql.QueryFlags, error)
// TODO: get rid of this, should not be exposed to engine tests
EngineAnalyzer() *analyzer.Analyzer
EngineEventScheduler() sql.EventScheduler
// TODO: get rid of this, should not be exposed to engine tests
EnginePreparedDataCache() *sqle.PreparedDataCache
QueryWithBindings(ctx *sql.Context, query string, parsed sqlparser.Statement, bindings map[string]sqlparser.Expr, qFlags *sql.QueryFlags) (sql.Schema, sql.RowIter, *sql.QueryFlags, error)
Expand Down
6 changes: 5 additions & 1 deletion enginetest/server_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (s *ServerQueryEngine) EngineAnalyzer() *analyzer.Analyzer {
return s.engine.Analyzer
}

func (s *ServerQueryEngine) EngineEventScheduler() sql.EventScheduler {
return s.engine.EventScheduler
}

func (s *ServerQueryEngine) EnginePreparedDataCache() *sqle.PreparedDataCache {
return s.engine.PreparedDataCache
}
Expand Down Expand Up @@ -632,7 +636,7 @@ func convertGoSqlType(columnType *gosql.ColumnType) (sql.Type, error) {
// It cannot sort user-defined binding variables (e.g. :var, :foo)
func prepareBindingArgs(ctx *sql.Context, bindings map[string]sqlparser.Expr) ([]any, error) {
// NOTE: using binder with nil catalog and parser since we're only using it to convert SQLVal.
binder := planbuilder.New(ctx, nil, nil)
binder := planbuilder.New(ctx, nil, nil, nil)
numBindVars := len(bindings)
args := make([]any, numBindVars)
for i := 0; i < numBindVars; i++ {
Expand Down
24 changes: 24 additions & 0 deletions eventscheduler/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func InitEventScheduler(

// Close closes the EventScheduler.
func (es *EventScheduler) Close() {
if es == nil {
return
}
es.status = SchedulerOff
es.executor.shutdown()
}
Expand All @@ -99,6 +102,9 @@ func (es *EventScheduler) Close() {
// This function requires valid analyzer and sql context to evaluate all events in all databases
// to load enabled events to the EventScheduler.
func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {
if es == nil {
return nil
}
if es.status == SchedulerDisabled {
return ErrEventSchedulerDisabled
} else if es.status == SchedulerOn {
Expand All @@ -120,6 +126,9 @@ func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {

// TurnOffEventScheduler is called when user sets --event-scheduler system variable to OFF or 0.
func (es *EventScheduler) TurnOffEventScheduler() error {
if es == nil {
return nil
}
if es.status == SchedulerDisabled {
return ErrEventSchedulerDisabled
} else if es.status == SchedulerOff {
Expand All @@ -135,6 +144,9 @@ func (es *EventScheduler) TurnOffEventScheduler() error {
// loadEventsAndStartEventExecutor evaluates all events in all databases and evaluates the enabled events
// with valid schedule to load into the eventExecutor. Then, it starts the eventExecutor.
func (es *EventScheduler) loadEventsAndStartEventExecutor(ctx *sql.Context, a *analyzer.Analyzer) error {
if es == nil {
return nil
}
es.executor.catalog = a.Catalog
es.executor.loadAllEvents(ctx)
go es.executor.start()
Expand All @@ -144,6 +156,9 @@ func (es *EventScheduler) loadEventsAndStartEventExecutor(ctx *sql.Context, a *a
// AddEvent implements sql.EventScheduler interface.
// This function is called when there is an event created at runtime.
func (es *EventScheduler) AddEvent(ctx *sql.Context, edb sql.EventDatabase, details sql.EventDefinition) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -153,6 +168,9 @@ func (es *EventScheduler) AddEvent(ctx *sql.Context, edb sql.EventDatabase, deta
// UpdateEvent implements sql.EventScheduler interface.
// This function is called when there is an event altered at runtime.
func (es *EventScheduler) UpdateEvent(ctx *sql.Context, edb sql.EventDatabase, orgEventName string, details sql.EventDefinition) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -163,6 +181,9 @@ func (es *EventScheduler) UpdateEvent(ctx *sql.Context, edb sql.EventDatabase, o
// This function is called when there is an event dropped at runtime. This function
// removes the given event if it exists in the enabled events list of the EventScheduler.
func (es *EventScheduler) RemoveEvent(dbName, eventName string) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -173,6 +194,9 @@ func (es *EventScheduler) RemoveEvent(dbName, eventName string) {
// This function is called when there is a database dropped at runtime. This function
// removes all events of given database that exist in the enabled events list of the EventScheduler.
func (es *EventScheduler) RemoveSchemaEvents(dbName string) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand Down
3 changes: 0 additions & 3 deletions sql/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,6 @@ type Analyzer struct {
Coster memo.Coster
// ExecBuilder converts a sql.Node tree into an executable iterator.
ExecBuilder sql.NodeExecBuilder
// EventScheduler is used to communiate with the event scheduler
// for any EVENT related statements. It can be nil if EventScheduler is not defined.
EventScheduler sql.EventScheduler
}

// NewDefault creates a default Analyzer instance with all default Rules and configuration.
Expand Down
34 changes: 0 additions & 34 deletions sql/analyzer/apply_event_scheduler_notifier.go

This file was deleted.

2 changes: 1 addition & 1 deletion sql/analyzer/optimization_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestPushNotFilters(t *testing.T) {
ctx := sql.NewContext(context.Background(), sql.WithSession(sess))
ctx.SetCurrentDatabase("mydb")

b := planbuilder.New(ctx, cat, sql.NewMysqlParser())
b := planbuilder.New(ctx, cat, nil, nil)

for _, tt := range tests {
t.Run(tt.in, func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion sql/analyzer/rule_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const (
validateReadOnlyTransactionId // validateReadOnlyTransaction
validateDatabaseSetId // validateDatabaseSet
validatePrivilegesId // validatePrivileges
applyEventSchedulerId // applyEventScheduler

// default
flattenTableAliasesId // flattenTableAliases
Expand Down
105 changes: 52 additions & 53 deletions sql/analyzer/ruleid_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion sql/analyzer/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func init() {
var OnceBeforeDefault = []Rule{
{applyDefaultSelectLimitId, applyDefaultSelectLimit},
{replaceCountStarId, replaceCountStar},
{applyEventSchedulerId, applyEventScheduler},
{validateOffsetAndLimitId, validateOffsetAndLimit},
{validateCreateTableId, validateCreateTable},
{validateAlterTableId, validateAlterTable},
Expand Down
Loading

0 comments on commit 2766041

Please sign in to comment.