Skip to content

Commit

Permalink
Merge pull request #7831 from planetscale/rn-vr-log2
Browse files Browse the repository at this point in the history
VReplication: add table for logging stream errors, state changes and key workflow steps
  • Loading branch information
rohit-nayak-ps authored Apr 27, 2021
2 parents 37c09d3 + f57350c commit 01fb7e5
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 7 deletions.
14 changes: 14 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package binlogplayer

import (
"regexp"
"strings"
"testing"
"time"

Expand All @@ -35,6 +36,7 @@ type MockDBClient struct {
expect []*mockExpect
currentResult int
done chan struct{}
invariants map[string]*sqltypes.Result
}

type mockExpect struct {
Expand All @@ -50,6 +52,11 @@ func NewMockDBClient(t *testing.T) *MockDBClient {
t: t,
UName: mockClientUNameFiltered,
done: make(chan struct{}),
invariants: map[string]*sqltypes.Result{
"CREATE TABLE IF NOT EXISTS _vt.vreplication_log": {},
"select id, type, state, message from _vt.vreplication_log": {},
"insert into _vt.vreplication_log": {},
},
}
}

Expand Down Expand Up @@ -142,6 +149,13 @@ func (dc *MockDBClient) Close() {
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
dc.t.Helper()
dc.t.Logf("DBClient query: %v", query)

for q, result := range dc.invariants {
if strings.Contains(query, q) {
return result, nil
}
}

if dc.currentResult >= len(dc.expect) {
dc.t.Fatalf("DBClientMock: query: %s, no more requests are expected", query)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {

func buildSelectPlan(sel *sqlparser.Select) (*controllerPlan, error) {
switch sqlparser.String(sel.From) {
case vreplicationTableName, reshardingJournalTableName, copyStateTableName:
case vreplicationTableName, reshardingJournalTableName, copyStateTableName, vreplicationLogTableName:
return &controllerPlan{
opcode: selectQuery,
}, nil
Expand Down
12 changes: 11 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func init() {
allddls := append([]string{}, binlogplayer.CreateVReplicationTable()...)
allddls = append(allddls, binlogplayer.AlterVReplicationTable...)
allddls = append(allddls, createReshardingJournalTable, createCopyState)
allddls = append(allddls, createVReplicationLog)
withDDL = withddl.New(allddls)
}

Expand Down Expand Up @@ -342,7 +343,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
}

dbClient := vre.getDBClient(runAsAdmin)

vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
if err := dbClient.Connect(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -379,6 +380,9 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLogWithParams(vdbc, LogStreamCreate, uint32(id), params); err != nil {
return nil, err
}
}
return qr, nil
case updateQuery:
Expand Down Expand Up @@ -417,6 +421,9 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLog(vdbc, LogStateChange, uint32(id), params["state"], ""); err != nil {
return nil, err
}
}
return qr, nil
case deleteQuery:
Expand All @@ -433,6 +440,9 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
ct.Stop()
delete(vre.controllers, id)
}
if err := insertLogWithParams(vdbc, LogStreamDelete, uint32(id), nil); err != nil {
return nil, err
}
}
if err := dbClient.Begin(); err != nil {
return nil, err
Expand Down
28 changes: 26 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func TestMain(m *testing.M) {
return 1
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createVReplicationLog); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}

return m.Run()
}()
os.Exit(exitCode)
Expand Down Expand Up @@ -459,6 +464,18 @@ func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan in
}
}

// shouldIgnoreQuery reports whether a query observed by the test framework
// should be skipped when comparing against the expected query list. A query is
// skipped when it mentions the vreplication log table or when it matches
// heartbeatRe.
func shouldIgnoreQuery(query string) bool {
	// Substrings that mark a query as noise; any select, update or insert that
	// touches the log table contains the table name.
	ignoredSubstrings := [...]string{
		"_vt.vreplication_log",
	}
	for i := range ignoredSubstrings {
		if strings.Contains(query, ignoredSubstrings[i]) {
			return true
		}
	}
	return heartbeatRe.MatchString(query)
}

func expectDBClientQueries(t *testing.T, queries []string) {
t.Helper()
failed := false
Expand All @@ -473,7 +490,7 @@ func expectDBClientQueries(t *testing.T, queries []string) {
case got = <-globalDBQueries:
// We rule out heartbeat time update queries because otherwise our query list
// is indeterminable and varies with each test execution.
if heartbeatRe.MatchString(got) {
if shouldIgnoreQuery(got) {
goto retry
}

Expand All @@ -498,6 +515,9 @@ func expectDBClientQueries(t *testing.T, queries []string) {
for {
select {
case got := <-globalDBQueries:
if shouldIgnoreQuery(got) {
continue
}
t.Errorf("unexpected query: %s", got)
default:
return
Expand All @@ -520,7 +540,8 @@ func expectNontxQueries(t *testing.T, queries []string) {
retry:
select {
case got = <-globalDBQueries:
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) {
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") ||
shouldIgnoreQuery(got) {
goto retry
}

Expand All @@ -546,6 +567,9 @@ func expectNontxQueries(t *testing.T, queries []string) {
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") {
continue
}
if shouldIgnoreQuery(got) {
continue
}
t.Errorf("unexpected query: %s", got)
default:
return
Expand Down
123 changes: 123 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"encoding/json"
"fmt"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

const (
// vreplicationLogTableName is the qualified name of the VReplication log table.
vreplicationLogTableName = "_vt.vreplication_log"
// createVReplicationLog is the DDL that creates the log table when it does not
// already exist. Each row belongs to one stream (vrepl_id) and carries a type,
// a state and a message; count starts at 1 and is incremented by insertLog
// when the same entry is logged again for the stream.
createVReplicationLog = `CREATE TABLE IF NOT EXISTS _vt.vreplication_log (
id BIGINT(20) AUTO_INCREMENT,
vrepl_id INT NOT NULL,
type VARBINARY(256) NOT NULL,
state VARBINARY(100) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
message text NOT NULL,
count BIGINT(20) NOT NULL DEFAULT 1,
PRIMARY KEY (id))`
)

const (
// Values stored in the type column of _vt.vreplication_log.

// LogStreamCreate is used when a row in _vt.vreplication is inserted via VReplicationExec.
LogStreamCreate = "Stream Created"
// LogStreamUpdate is used when a row in _vt.vreplication is updated via VReplicationExec.
LogStreamUpdate = "Stream Updated"
// LogStreamDelete is used when a row in _vt.vreplication is deleted via VReplicationExec.
LogStreamDelete = "Stream Deleted"
// LogMessage is used for generic log messages.
LogMessage = "Message"
// LogCopyStart is used when the copy phase is started.
LogCopyStart = "Started Copy Phase"
// LogCopyEnd is used when the copy phase is done.
LogCopyEnd = "Ended Copy Phase"
// LogStateChange is used when the state of the stream changes.
LogStateChange = "State Changed"

// TODO: LogError is not used at the moment. Currently irrecoverable errors, resumable errors and
// informational messages are all treated the same: the message column is updated and the state is left as Running.
// Every five seconds we reset the message and retry streaming so that we can automatically resume from a temporary
// loss of connectivity or a reparent. We need to detect whether an error is recoverable and, if not, set an Error status.
// Since this device (of overloading the message) is strewn across the code, and incorrectly flagging resumable
// errors can stall workflows, we have deferred implementing it.

// LogError indicates that there is an error from which we cannot recover and the operator needs to intervene.
LogError = "Error"
)

// getLastLog returns the id, type, state and message of the newest row (highest
// id) in _vt.vreplication_log for the stream vreplID. When the query does not
// yield exactly one row of four columns, zero values and a nil error are
// returned. An error is returned only when executing the query fails.
func getLastLog(dbClient *vdbClient, vreplID uint32) (int64, string, string, string, error) {
	var (
		qr  *sqltypes.Result
		err error
	)
	query := fmt.Sprintf("select id, type, state, message from _vt.vreplication_log where vrepl_id = %d order by id desc limit 1", vreplID)
	qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch)
	if err != nil {
		return 0, "", "", "", err
	}
	if len(qr.Rows) != 1 || len(qr.Rows[0]) != 4 {
		return 0, "", "", "", nil
	}
	last := qr.Rows[0]
	// NOTE(review): the conversion error is discarded; presumably id is 0 on
	// failure, which insertLog treats as "no previous log" — confirm.
	id, _ := evalengine.ToInt64(last[0])
	return id, last[1].ToString(), last[2].ToString(), last[3].ToString(), nil
}

// insertLog records an event of the given type for stream vreplID in
// _vt.vreplication_log.
//
// If the most recent row for the stream (as returned by getLastLog) has the
// same type, state and message, that row's count is incremented instead of
// inserting a new row. This keeps a continuously repeated message from
// flooding the table.
//
// typ, state and message are all passed through encodeString before being
// placed in the INSERT statement, so none of them is spliced in raw; state in
// particular can originate from caller-supplied stream parameters.
//
// It returns the error from getLastLog unchanged, or a wrapped error that
// includes the statement when executing the insert/update fails.
func insertLog(dbClient *vdbClient, typ string, vreplID uint32, state, message string) error {
	id, currentType, currentState, currentMessage, err := getLastLog(dbClient, vreplID)
	if err != nil {
		return err
	}

	var query string
	if id > 0 && typ == currentType && state == currentState && message == currentMessage {
		query = fmt.Sprintf("update _vt.vreplication_log set count = count + 1 where id = %d", id)
	} else {
		// encodeString supplies the surrounding quotes (and presumably the
		// escaping), so the placeholders are bare %s rather than '%s'.
		query = fmt.Sprintf("insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%d, %s, %s, %s)",
			vreplID, encodeString(typ), encodeString(state), encodeString(message))
	}

	if _, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch); err != nil {
		return fmt.Errorf("could not insert into log table: %v: %w", query, err)
	}
	return nil
}

// insertLogWithParams logs a stream event whose message is the JSON encoding of
// the stream's attributes. When params is nil the message is left empty. The
// "state" entry of params (empty if absent) is logged as the state.
func insertLogWithParams(dbClient *vdbClient, action string, vreplID uint32, params map[string]string) error {
	message := ""
	if params != nil {
		// The marshalling error is discarded, as before.
		encoded, _ := json.Marshal(params)
		message = string(encoded)
	}
	return insertLog(dbClient, action, vreplID, params["state"], message)
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
if err := vc.vr.setState(binlogplayer.VReplicationCopying, ""); err != nil {
return err
}
if err := vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)",
len(plan.TargetTables))); err != nil {
return err
}
} else {
if err := vc.vr.setState(binlogplayer.BlpStopped, "There is nothing to replicate"); err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,27 @@ func TestPlayerCopyTables(t *testing.T) {
})
expectData(t, "yes", [][]string{})
validateCopyRowCountStat(t, 2)
ctx, cancel := context.WithCancel(context.Background())

type logTestCase struct {
name string
typ string
}
testCases := []logTestCase{
{name: "Check log for start of copy", typ: "LogCopyStarted"},
{name: "Check log for end of copy", typ: "LogCopyEnded"},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query = fmt.Sprintf("select count(*) from _vt.vreplication_log where type = '%s'", testCase.typ)
qr, err := env.Mysqld.FetchSuperQuery(ctx, query)
require.NoError(t, err)
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
})
}
cancel()

}

// TestPlayerCopyBigTable ensures the copy-catchup back-and-forth loop works correctly.
Expand Down
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2456,6 +2456,32 @@ func TestPlayerJSONDocs(t *testing.T) {
}
}

// TestVReplicationLogs logs the same message twice for stream 1 and checks the
// newest row of _vt.vreplication_log after each call: the first call must
// produce a row with count 1, the second must bump that row's count to 2.
func TestVReplicationLogs(t *testing.T) {
	defer deleteTablet(addTablet(100))

	dbClient := playerEngine.dbClientFactoryDba()
	require.NoError(t, dbClient.Connect())
	defer dbClient.Close()

	vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
	const query = "select vrepl_id, state, message, count from _vt.vreplication_log order by id desc limit 1"

	// Expected newest row after the first and the second identical insertLog call.
	wantRows := []string{
		"[[INT32(1) VARBINARY(\"Running\") TEXT(\"message1\") INT64(1)]]",
		"[[INT32(1) VARBINARY(\"Running\") TEXT(\"message1\") INT64(2)]]",
	}

	for i := range wantRows {
		want := wantRows[i]
		t.Run("", func(t *testing.T) {
			require.NoError(t, insertLog(vdbc, LogMessage, 1, "Running", "message1"))

			qr, err := env.Mysqld.FetchSuperQuery(context.Background(), query)
			require.NoError(t, err)
			require.Equal(t, want, fmt.Sprintf("%v", qr.Rows))
		})
	}
}

func expectJSON(t *testing.T, table string, values [][]string, id int, exec func(ctx context.Context, query string) (*sqltypes.Result, error)) {
t.Helper()

Expand Down
Loading

0 comments on commit 01fb7e5

Please sign in to comment.