Skip to content

Commit

Permalink
Ignore error while reading table data in Schema.Engine reload (#13421) (
Browse files Browse the repository at this point in the history
#13424)

* ignore views load error



* added unit test



---------

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Jul 7, 2023
1 parent 99d39f9 commit b6755ae
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 84 deletions.
85 changes: 85 additions & 0 deletions go/event/syslogger/fake_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syslogger

import (
"fmt"

"vitess.io/vitess/go/vt/log"
)

type loggerMsg struct {
msg string
level string
}
type TestLogger struct {
logs []loggerMsg
savedInfof func(format string, args ...any)
savedWarningf func(format string, args ...any)
savedErrorf func(format string, args ...any)
}

func NewTestLogger() *TestLogger {
tl := &TestLogger{
savedInfof: log.Infof,
savedWarningf: log.Warningf,
savedErrorf: log.Errorf,
}
log.Infof = tl.recordInfof
log.Warningf = tl.recordWarningf
log.Errorf = tl.recordErrorf
return tl
}

func (tl *TestLogger) Close() {
log.Infof = tl.savedInfof
log.Warningf = tl.savedWarningf
log.Errorf = tl.savedErrorf
}

func (tl *TestLogger) recordInfof(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "INFO"})
tl.savedInfof(msg)
}

func (tl *TestLogger) recordWarningf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "WARNING"})
tl.savedWarningf(msg)
}

func (tl *TestLogger) recordErrorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "ERROR"})
tl.savedErrorf(msg)
}

func (tl *TestLogger) getLog() loggerMsg {
if len(tl.logs) > 0 {
return tl.logs[len(tl.logs)-1]
}
return loggerMsg{"no logs!", "ERROR"}
}

func (tl *TestLogger) GetAllLogs() []string {
var logs []string
for _, l := range tl.logs {
logs = append(logs, l.level+":"+l.msg)
}
return logs
}
57 changes: 1 addition & 56 deletions go/event/syslogger/syslogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"testing"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/vt/log"
)

type TestEvent struct {
Expand Down Expand Up @@ -63,60 +62,6 @@ func (fw *fakeWriter) Info(msg string) error { return fw.write(syslog.LOG_INF
func (fw *fakeWriter) Notice(msg string) error { return fw.write(syslog.LOG_NOTICE, msg) }
func (fw *fakeWriter) Warning(msg string) error { return fw.write(syslog.LOG_WARNING, msg) }

type loggerMsg struct {
msg string
level string
}
type testLogger struct {
logs []loggerMsg
savedInfof func(format string, args ...any)
savedWarningf func(format string, args ...any)
savedErrorf func(format string, args ...any)
}

func newTestLogger() *testLogger {
tl := &testLogger{
savedInfof: log.Infof,
savedWarningf: log.Warningf,
savedErrorf: log.Errorf,
}
log.Infof = tl.recordInfof
log.Warningf = tl.recordWarningf
log.Errorf = tl.recordErrorf
return tl
}

func (tl *testLogger) Close() {
log.Infof = tl.savedInfof
log.Warningf = tl.savedWarningf
log.Errorf = tl.savedErrorf
}

func (tl *testLogger) recordInfof(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "INFO"})
tl.savedInfof(msg)
}

func (tl *testLogger) recordWarningf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "WARNING"})
tl.savedWarningf(msg)
}

func (tl *testLogger) recordErrorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
tl.logs = append(tl.logs, loggerMsg{msg, "ERROR"})
tl.savedErrorf(msg)
}

func (tl *testLogger) getLog() loggerMsg {
if len(tl.logs) > 0 {
return tl.logs[len(tl.logs)-1]
}
return loggerMsg{"no logs!", "ERROR"}
}

// TestSyslog checks that our callback works.
func TestSyslog(t *testing.T) {
writer = &fakeWriter{}
Expand All @@ -132,7 +77,7 @@ func TestSyslog(t *testing.T) {
// TestBadWriter verifies we are still triggering (to normal logs) if
// the syslog connection failed
func TestBadWriter(t *testing.T) {
tl := newTestLogger()
tl := NewTestLogger()
defer tl.Close()

writer = nil
Expand Down
9 changes: 2 additions & 7 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -66,7 +65,7 @@ type Engine struct {
tables map[string]*Table
lastChange int64
reloadTime time.Duration
//the position at which the schema was last loaded. it is only used in conjunction with ReloadAt
// the position at which the schema was last loaded. it is only used in conjunction with ReloadAt
reloadAtPos mysql.Position
notifierMu sync.Mutex
notifiers map[string]notifier
Expand Down Expand Up @@ -413,7 +412,6 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
return err
}

rec := concurrency.AllErrorRecorder{}
// curTables keeps track of tables in the new snapshot so we can detect what was dropped.
curTables := map[string]bool{"dual": true}
// changedTables keeps track of tables that have changed so we can reload their pk info.
Expand Down Expand Up @@ -456,7 +454,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
log.V(2).Infof("Reading schema for table: %s", tableName)
table, err := LoadTable(conn, se.cp.DBName(), tableName, row[3].ToString())
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "in Engine.reload(), reading table %s", tableName))
log.Warningf("Failed reading schema for the table: %s, error: %v", tableName, err)
continue
}
if includeStats {
Expand All @@ -471,9 +469,6 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
created = append(created, tableName)
}
}
if rec.HasErrors() {
return rec.Error()
}

// Compute and handle dropped tables.
var dropped []string
Expand Down
48 changes: 27 additions & 21 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package schema

import (
"context"
"errors"
"expvar"
"fmt"
"net/http"
Expand All @@ -27,20 +28,19 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema/schematest"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
)

const baseShowTablesPattern = `SELECT t\.table_name.*`
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestOpenAndReload(t *testing.T) {
assert.Equal(t, int64(0), se.tableAllocatedSizeGauge.Counts()["msg"])
assert.Equal(t, int64(0), se.tableFileSizeGauge.Counts()["msg"])

//ReloadAt tests
// ReloadAt tests
pos1, err := mysql.DecodePosition("MariaDB/0-41983-20")
require.NoError(t, err)
pos2, err := mysql.DecodePosition("MariaDB/0-41983-40")
Expand Down Expand Up @@ -423,35 +423,41 @@ func TestOpenFailedDueToExecErr(t *testing.T) {
}
}

func TestOpenFailedDueToTableErr(t *testing.T) {
// TestOpenFailedDueToLoadTableErr tests that schema engine load should not fail instead should log the failures.
func TestOpenFailedDueToLoadTableErr(t *testing.T) {
tl := syslogger.NewTestLogger()
defer tl.Close()
db := fakesqldb.New(t)
defer db.Close()
schematest.AddDefaultQueries(db)
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{
Fields: mysql.BaseShowTablesFields,
Rows: [][]sqltypes.Value{
mysql.BaseShowTablesRow("test_table", false, ""),
mysql.BaseShowTablesRow("test_view", true, "VIEW"),
},
})
db.MockQueriesForTable("test_table", &sqltypes.Result{
// this will cause NewTable error, as it expects zero rows.
Fields: []*querypb.Field{
{
Type: querypb.Type_VARCHAR,
},
},
Rows: [][]sqltypes.Value{
{sqltypes.NewVarBinary("")},
},
})
// this will cause NewTable error, as it expects zero rows.
db.MockQueriesForTable("test_table", sqltypes.MakeTestResult(sqltypes.MakeTestFields("foo", "varchar"), ""))

// adding column query for table_view
db.AddQueryPattern(fmt.Sprintf(mysql.GetColumnNamesQueryPatternForTable, "test_view"),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("column_name", "varchar"), ""))
// rejecting the impossible query
db.AddRejectedQuery("SELECT * FROM `fakesqldb`.`test_view` WHERE 1 != 1", errors.New("The user specified as a definer ('root'@'%') does not exist (errno 1449) (sqlstate HY000)"))

AddFakeInnoDBReadRowsResult(db, 0)
se := newEngine(10, 1*time.Second, 1*time.Second, db)
err := se.Open()
want := "Row count exceeded"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("se.Open: %v, want %s", err, want)
}
// failed load should not return any error, instead should be logged.
require.NoError(t, err)

logs := tl.GetAllLogs()
logOutput := strings.Join(logs, ":::")
assert.Contains(t, logOutput, "WARNING:Failed reading schema for the table: test_table")
assert.Contains(t, logOutput, "Row count exceeded")
assert.Contains(t, logOutput, "WARNING:Failed reading schema for the table: test_view")
assert.Contains(t, logOutput, "The user specified as a definer ('root'@'%') does not exist (errno 1449) (sqlstate HY000)")
}

func TestExportVars(t *testing.T) {
Expand Down

0 comments on commit b6755ae

Please sign in to comment.