Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL election module #3318

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cmd/trillian_log_signer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@
"github.com/google/trillian/util/election"
"github.com/google/trillian/util/election2"
etcdelect "github.com/google/trillian/util/election2/etcd"
mysqlElection "github.com/google/trillian/util/election2/mysql"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"k8s.io/klog/v2"

// Register supported storage providers.
_ "github.com/google/trillian/storage/cloudspanner"
_ "github.com/google/trillian/storage/crdb"
_ "github.com/google/trillian/storage/mysql"
"github.com/google/trillian/storage/mysql"

// Load quota providers
_ "github.com/google/trillian/quota/crdbqm"
Expand All @@ -71,6 +72,7 @@
numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel")
sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing")
forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs")
electionBackend = flag.String("election_backend", "etcd", "Election backend to use. One of: mysql, etcd, noop")
etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under")
lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path")
healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks")
Expand Down Expand Up @@ -143,13 +145,22 @@
instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid())
var electionFactory election2.Factory
switch {
case *forceMaster:
case *forceMaster || *electionBackend == "noop":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not fantastic that we can now control this behavior with two different flags, but I also cannot thing of a better way forward for now. On the long run, we could / should probably deprecate forceMaster. No action required!

klog.Warning("**** Acting as master for all logs ****")
electionFactory = election2.NoopFactory{}
case client != nil:
case client != nil && *electionBackend == "etcd":
electionFactory = etcdelect.NewFactory(instanceID, client, *lockDir)
case *storageSystem == "mysql" && *electionBackend == "mysql":
db, err := mysql.GetDatabase()
if err != nil {
klog.Exit("Failed to get MySQL database when reuested: %v", err)

Check failure on line 156 in cmd/trillian_log_signer/main.go

View workflow job for this annotation

GitHub Actions / lint

printf: k8s.io/klog/v2.Exit call has possible Printf formatting directive %v (govet)
}
electionFactory, err = mysqlElection.NewFactory(instanceID, db)
if err != nil {
klog.Exitf("Failed to create MySQL election factory: %v", err)
}
default:
klog.Exit("Either --force_master or --etcd_servers must be supplied")
klog.Exit("Either --force_master, --etcd_servers, or --election_backend=mysql must be supplied")
}

qm, err := quota.NewManager(*quotaSystem)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/letsencrypt/pkcs11key/v4 v4.0.0
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.20
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/pseudomuto/protoc-gen-doc v1.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0=
github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/miekg/pkcs11 v1.0.2/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
Expand Down
6 changes: 6 additions & 0 deletions scripts/resetdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Accepts environment variables:
(default: zaphod).
- MYSQL_USER_HOST: The host that the Trillian user will connect from; use '%' as
a wildcard (default: localhost).
- MYSQL_USE_ELECTION: If set to true, create election tables as well.
EOF
}

Expand All @@ -36,6 +37,7 @@ collect_vars() {
[ -z ${MYSQL_USER+x} ] && MYSQL_USER="test"
[ -z ${MYSQL_PASSWORD+x} ] && MYSQL_PASSWORD="zaphod"
[ -z ${MYSQL_USER_HOST+x} ] && MYSQL_USER_HOST="localhost"

FLAGS=()

# handle flags
Expand Down Expand Up @@ -85,6 +87,10 @@ main() {
die "Error: Failed to grant '${MYSQL_USER}' user all privileges on '${MYSQL_DATABASE}'."
mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/storage/mysql/schema/storage.sql || \
die "Error: Failed to create tables in '${MYSQL_DATABASE}' database."
if [[ "${MYSQL_USE_ELECTION}" = 'true' ]]; then
mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/util/election2/sql/election.sql || \
die "Error: Failed to create election tables in '${MYSQL_DATABASE}' database."
fi
echo "Reset Complete"
fi
}
Expand Down
280 changes: 280 additions & 0 deletions util/election2/mysql/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2023 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mysql provides an implementation of leader election based on a SQL database.
package mysql

import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"github.com/google/trillian/util/election2"
"k8s.io/klog/v2"
)

type leaderData struct {
currentLeader string
timestamp time.Time
}

// Election is an implementation of election2.Election based on a SQL database.
type Election struct {
db *sql.DB
instanceID string
resourceID string

currentLeader leaderData
leaderLock sync.Cond

// If a channel is supplied with the cancel, it will be signalled when the election routine has exited.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, cancel is always created by Await, and Resign always sends a message there? I'm not sure I understand what's conditional.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this before realizing that I could re-use Resign for Close, and I think one of those did not wait for the error in the first implementation. I can remove the comment, and change the type from *chan error to chan error probably...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha - makes sense to me to remove the comment then yes!

cancel chan *chan error
electionInterval time.Duration
}

var _ election2.Election = (*Election)(nil)

// Await implements election2.Election
func (e *Election) Await(ctx context.Context) error {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.cancel == nil {
e.cancel = make(chan *chan error)
go e.becomeLeaderLoop(context.Background(), e.cancel)
}
if e.currentLeader.currentLeader == e.instanceID {
return nil
}
for e.currentLeader.currentLeader != e.instanceID {
e.leaderLock.Wait()

select {
case <-ctx.Done():
return ctx.Err()
default:
klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp)
}
}
klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp)
return nil
}

// Close implements election2.Election
func (e *Election) Close(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is essentially a wrapper around Resign, this election object could be re-used for a new election, which is slightly at odds with the interface definition. In practice, I don't think anything relies on this behaviour, and there aren't any resources that can be released, so I think it's safe. Does that match your understanding?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Do you want me to make the object unusable, to catch errors that might occur by calling other methods after Close?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can think of a way of an elegant way of doing this, sure thing. I don't think it's super super important, it's just some extra care to make sure it's aligned with the interface definition :)

if err := e.Resign(ctx); err != nil {
klog.Errorf("Failed to resign leadership: %v", err)
return err
}
return nil
}

// Resign implements election2.Election
func (e *Election) Resign(ctx context.Context) error {
e.leaderLock.L.Lock()
closer := e.cancel
e.cancel = nil
e.leaderLock.L.Unlock()
if closer == nil {
return nil
}
// Stop trying to elect ourselves
done := make(chan error)
closer <- &done
return <-done
}

// WithMastership implements election2.Election
func (e *Election) WithMastership(ctx context.Context) (context.Context, error) {
Comment on lines +100 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the inclusivity and diversity in mind, may I suggest replacing master with another inclusive term?

https://developers.google.com/style/word-list#master

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rewrite the election2.Election interface if you'd like. 😁

Otherwise, I think those are the only two instances of the word "master" rather than "leader".

cctx, cancel := context.WithCancel(ctx)
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.currentLeader.currentLeader != e.instanceID {
// Not the leader, cancel
cancel()
return cctx, nil
}

// Start a goroutine to cancel the context when we are no longer leader
go func() {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
for e.currentLeader.currentLeader == e.instanceID {
e.leaderLock.Wait()
}
select {
case <-ctx.Done():
// Don't complain if our context already completed.
return
default:
cancel()
klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp)
}
}()

return cctx, nil
}

// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel`
func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) {
for {
select {
case ch := <-closer:
err := e.tearDown()
klog.Infof("Election teardown for %s: %v", e.resourceID, err)
if ch != nil {
*ch <- err
}
return
default:
leader, err := e.tryBecomeLeader(ctx)
if err != nil {
klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err)
} else {
e.leaderLock.L.Lock()
if leader != e.currentLeader {
// Note: this code does not actually care _which_ instance was
// elected, it sends notifications on each leadership change.
e.currentLeader = leader
e.leaderLock.Broadcast()
}
e.leaderLock.L.Unlock()
}
time.Sleep(e.electionInterval)
}
}
}

func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) {
leader := leaderData{}
tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return leader, fmt.Errorf("BeginTX: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil {
klog.Errorf("Rollback failed: %v", err)
}
}()
row := tx.QueryRow(
"SELECT leader, last_update FROM LeaderElection WHERE resource_id = ?",
e.resourceID)
if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil {
return leader, fmt.Errorf("Select: %w", err)
}

if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) {
return leader, nil // Someone else won the election
}

timestamp := time.Now()
_, err = tx.Exec(
"UPDATE LeaderElection SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?",
e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp)
if err != nil {
return leader, fmt.Errorf("Update: %w", err)
}

if err := tx.Commit(); err != nil {
return leader, fmt.Errorf("Commit failed: %w", err)
}
leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp}
return leader, nil
}

func (e *Election) tearDown() error {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.currentLeader.currentLeader != e.instanceID {
return nil
}
e.currentLeader.currentLeader = "empty leader"
e.leaderLock.Broadcast()

// Reset election time to epoch to allow a faster fail-over
res, err := e.db.Exec(
"UPDATE LeaderElection SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?",
time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp)
if err != nil {
return fmt.Errorf("Update: %w", err)
}
if n, err := res.RowsAffected(); n != 1 || err != nil {
return fmt.Errorf("failed to resign leadership: %d, %w", n, err)
}
return nil
}

func (e *Election) initializeLock(ctx context.Context) error {
var leader string
err := e.db.QueryRow(
"SELECT leader FROM LeaderElection WHERE resource_id = ?",
e.resourceID,
).Scan(&leader)
if errors.Is(err, sql.ErrNoRows) {
_, err = e.db.Exec(
"INSERT INTO LeaderElection (resource_id, leader, last_update) VALUES (?, ?, ?)",
e.resourceID, "empty leader", time.Time{},
)
}
return err
}

type SqlFactory struct {
db *sql.DB
instanceID string
opts []Option
}

var _ election2.Factory = (*SqlFactory)(nil)

type Option func(*Election) *Election

func NewFactory(instanceID string, database *sql.DB, opts ...Option) (*SqlFactory, error) {
return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil
}

func WithElectionInterval(interval time.Duration) Option {
return func(f *Election) *Election {
f.electionInterval = interval
return f
}
}

// NewElection implements election2.Factory.
func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) {
// Ensure we have a database connection
if f.db == nil {
return nil, fmt.Errorf("no database connection")
}
if err := f.db.Ping(); err != nil {
return nil, err
}
e := &Election{
db: f.db,
instanceID: f.instanceID,
resourceID: resourceID,
leaderLock: sync.Cond{L: &sync.Mutex{}},
electionInterval: 1 * time.Second,
}
for _, opt := range f.opts {
e = opt(e)
}
if err := e.initializeLock(ctx); err != nil {
return nil, err
}

return e, nil
}
17 changes: 17 additions & 0 deletions util/election2/mysql/election.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- MySQL / MariaDB version of the leader election schema

-- We only have a single table called LeaderElection. It contains
-- a row holding the current leader for each resource, as well as the
-- timestamp that the election was acquired at (last_update).
--
-- This is less an election than a mad scramble at the start, but once
-- a leader has won the election, they remain in power until they
-- resign or fail to update the last_update time for 10x the
-- electionInterval, which should be coordinated across participants.
-- This is extremely simple, and doesn't perform any sort of
-- load-shedding or fairness at this layer.
CREATE TABLE IF NOT EXISTS LeaderElection(
resource_id VARCHAR(50) PRIMARY KEY,
leader VARCHAR(300) NOT NULL,
last_update TIMESTAMP NOT NULL
);
Loading
Loading