-
Notifications
You must be signed in to change notification settings - Fork 381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SQL election module #3318
base: master
Are you sure you want to change the base?
Add SQL election module #3318
Changes from all commits
219fc8d
5f4b850
4d31437
c2b24eb
05b113d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,280 @@ | ||
// Copyright 2023 Google LLC. All Rights Reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package mysql provides an implementation of leader election based on a SQL database. | ||
package mysql | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/trillian/util/election2" | ||
"k8s.io/klog/v2" | ||
) | ||
|
||
type leaderData struct { | ||
currentLeader string | ||
timestamp time.Time | ||
} | ||
|
||
// Election is an implementation of election2.Election based on a SQL database. | ||
type Election struct { | ||
db *sql.DB | ||
instanceID string | ||
resourceID string | ||
|
||
currentLeader leaderData | ||
leaderLock sync.Cond | ||
|
||
// If a channel is supplied with the cancel, it will be signalled when the election routine has exited. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In practice, cancel is always created by Await, and Resign always sends a message there? I'm not sure I understand what's conditional. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wrote this before realizing that I could re-use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha - makes sense to me to remove the comment then yes! |
||
cancel chan *chan error | ||
electionInterval time.Duration | ||
} | ||
|
||
var _ election2.Election = (*Election)(nil) | ||
|
||
// Await implements election2.Election | ||
func (e *Election) Await(ctx context.Context) error { | ||
e.leaderLock.L.Lock() | ||
defer e.leaderLock.L.Unlock() | ||
if e.cancel == nil { | ||
e.cancel = make(chan *chan error) | ||
go e.becomeLeaderLoop(context.Background(), e.cancel) | ||
} | ||
if e.currentLeader.currentLeader == e.instanceID { | ||
return nil | ||
} | ||
for e.currentLeader.currentLeader != e.instanceID { | ||
e.leaderLock.Wait() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp) | ||
} | ||
} | ||
klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp) | ||
return nil | ||
} | ||
|
||
// Close implements election2.Election | ||
func (e *Election) Close(ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given this is essentially a wrapper around Resign, this election object could be re-used for a new election, which is slightly at odds with the interface definition. In practice, I don't think anything relies on this behaviour, and there aren't any resources that can be released, so I think it's safe. Does that match your understanding? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Do you want me to make the object unusable, to catch errors that might occur by calling other methods after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you can think of a way of an elegant way of doing this, sure thing. I don't think it's super super important, it's just some extra care to make sure it's aligned with the interface definition :) |
||
if err := e.Resign(ctx); err != nil { | ||
klog.Errorf("Failed to resign leadership: %v", err) | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// Resign implements election2.Election | ||
func (e *Election) Resign(ctx context.Context) error { | ||
e.leaderLock.L.Lock() | ||
closer := e.cancel | ||
e.cancel = nil | ||
e.leaderLock.L.Unlock() | ||
if closer == nil { | ||
return nil | ||
} | ||
// Stop trying to elect ourselves | ||
done := make(chan error) | ||
closer <- &done | ||
return <-done | ||
} | ||
|
||
// WithMastership implements election2.Election | ||
func (e *Election) WithMastership(ctx context.Context) (context.Context, error) { | ||
Comment on lines
+100
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having the inclusivity and diversity in mind, may I suggest replacing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can rewrite the Otherwise, I think those are the only two instances of the word "master" rather than "leader". |
||
cctx, cancel := context.WithCancel(ctx) | ||
e.leaderLock.L.Lock() | ||
defer e.leaderLock.L.Unlock() | ||
if e.currentLeader.currentLeader != e.instanceID { | ||
// Not the leader, cancel | ||
cancel() | ||
return cctx, nil | ||
} | ||
|
||
// Start a goroutine to cancel the context when we are no longer leader | ||
go func() { | ||
e.leaderLock.L.Lock() | ||
defer e.leaderLock.L.Unlock() | ||
for e.currentLeader.currentLeader == e.instanceID { | ||
e.leaderLock.Wait() | ||
} | ||
select { | ||
case <-ctx.Done(): | ||
// Don't complain if our context already completed. | ||
return | ||
default: | ||
cancel() | ||
klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp) | ||
} | ||
}() | ||
|
||
return cctx, nil | ||
} | ||
|
||
// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel` | ||
func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) { | ||
for { | ||
select { | ||
case ch := <-closer: | ||
err := e.tearDown() | ||
klog.Infof("Election teardown for %s: %v", e.resourceID, err) | ||
if ch != nil { | ||
*ch <- err | ||
} | ||
return | ||
default: | ||
leader, err := e.tryBecomeLeader(ctx) | ||
if err != nil { | ||
klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err) | ||
} else { | ||
e.leaderLock.L.Lock() | ||
if leader != e.currentLeader { | ||
// Note: this code does not actually care _which_ instance was | ||
// elected, it sends notifications on each leadership change. | ||
e.currentLeader = leader | ||
e.leaderLock.Broadcast() | ||
} | ||
e.leaderLock.L.Unlock() | ||
} | ||
time.Sleep(e.electionInterval) | ||
} | ||
} | ||
} | ||
|
||
func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { | ||
leader := leaderData{} | ||
tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) | ||
if err != nil { | ||
return leader, fmt.Errorf("BeginTX: %w", err) | ||
} | ||
defer func() { | ||
if err := tx.Rollback(); err != nil { | ||
klog.Errorf("Rollback failed: %v", err) | ||
} | ||
}() | ||
row := tx.QueryRow( | ||
"SELECT leader, last_update FROM LeaderElection WHERE resource_id = ?", | ||
e.resourceID) | ||
if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil { | ||
return leader, fmt.Errorf("Select: %w", err) | ||
} | ||
|
||
if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) { | ||
return leader, nil // Someone else won the election | ||
} | ||
|
||
timestamp := time.Now() | ||
_, err = tx.Exec( | ||
"UPDATE LeaderElection SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", | ||
e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp) | ||
if err != nil { | ||
return leader, fmt.Errorf("Update: %w", err) | ||
} | ||
|
||
if err := tx.Commit(); err != nil { | ||
return leader, fmt.Errorf("Commit failed: %w", err) | ||
} | ||
leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp} | ||
return leader, nil | ||
} | ||
|
||
func (e *Election) tearDown() error { | ||
e.leaderLock.L.Lock() | ||
defer e.leaderLock.L.Unlock() | ||
if e.currentLeader.currentLeader != e.instanceID { | ||
return nil | ||
} | ||
e.currentLeader.currentLeader = "empty leader" | ||
e.leaderLock.Broadcast() | ||
|
||
// Reset election time to epoch to allow a faster fail-over | ||
res, err := e.db.Exec( | ||
"UPDATE LeaderElection SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", | ||
time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp) | ||
if err != nil { | ||
return fmt.Errorf("Update: %w", err) | ||
} | ||
if n, err := res.RowsAffected(); n != 1 || err != nil { | ||
return fmt.Errorf("failed to resign leadership: %d, %w", n, err) | ||
} | ||
return nil | ||
} | ||
|
||
func (e *Election) initializeLock(ctx context.Context) error { | ||
var leader string | ||
err := e.db.QueryRow( | ||
"SELECT leader FROM LeaderElection WHERE resource_id = ?", | ||
e.resourceID, | ||
).Scan(&leader) | ||
if errors.Is(err, sql.ErrNoRows) { | ||
_, err = e.db.Exec( | ||
"INSERT INTO LeaderElection (resource_id, leader, last_update) VALUES (?, ?, ?)", | ||
e.resourceID, "empty leader", time.Time{}, | ||
) | ||
} | ||
return err | ||
} | ||
|
||
type SqlFactory struct { | ||
db *sql.DB | ||
instanceID string | ||
opts []Option | ||
} | ||
|
||
var _ election2.Factory = (*SqlFactory)(nil) | ||
|
||
type Option func(*Election) *Election | ||
|
||
func NewFactory(instanceID string, database *sql.DB, opts ...Option) (*SqlFactory, error) { | ||
return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil | ||
} | ||
|
||
func WithElectionInterval(interval time.Duration) Option { | ||
return func(f *Election) *Election { | ||
f.electionInterval = interval | ||
return f | ||
} | ||
} | ||
|
||
// NewElection implements election2.Factory. | ||
func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) { | ||
// Ensure we have a database connection | ||
if f.db == nil { | ||
return nil, fmt.Errorf("no database connection") | ||
} | ||
if err := f.db.Ping(); err != nil { | ||
return nil, err | ||
} | ||
e := &Election{ | ||
db: f.db, | ||
instanceID: f.instanceID, | ||
resourceID: resourceID, | ||
leaderLock: sync.Cond{L: &sync.Mutex{}}, | ||
electionInterval: 1 * time.Second, | ||
} | ||
for _, opt := range f.opts { | ||
e = opt(e) | ||
} | ||
if err := e.initializeLock(ctx); err != nil { | ||
return nil, err | ||
} | ||
|
||
return e, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
-- MySQL / MariaDB version of the leader election schema | ||
|
||
-- We only have a single table called LeaderElection. It contains | ||
-- a row holding the current leader for each resource, as well as the | ||
-- timestamp that the election was acquired at (last_update). | ||
-- | ||
-- This is less an election than a mad scramble at the start, but once | ||
-- a leader has won the election, they remain in power until they | ||
-- resign or fail to update the last_update time for 10x the | ||
-- electionInterval, which should be coordinated across participants. | ||
-- This is extremely simple, and doesn't perform any sort of | ||
-- load-shedding or fairness at this layer. | ||
CREATE TABLE IF NOT EXISTS LeaderElection( | ||
resource_id VARCHAR(50) PRIMARY KEY, | ||
leader VARCHAR(300) NOT NULL, | ||
last_update TIMESTAMP NOT NULL | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not fantastic that we can now control this behavior with two different flags, but I also cannot thing of a better way forward for now. On the long run, we could / should probably deprecate forceMaster. No action required!