diff --git a/config/orchestrator/default.json b/config/orchestrator/default.json new file mode 100644 index 00000000000..c68029c540e --- /dev/null +++ b/config/orchestrator/default.json @@ -0,0 +1,12 @@ +{ + "Debug": true, + "MySQLTopologyUser": "orc_client_user", + "MySQLTopologyPassword": "orc_client_user_password", + "MySQLReplicaUser": "vt_repl", + "MySQLReplicaPassword": "", + "BackendDB": "sqlite", + "SQLite3DataFile": "/home/sougou/dev/src/vitess.io/vitess/vtdataroot/orchestrator.sqlite3", + "RecoverMasterClusterFilters": ["*"], + "RecoveryPeriodBlockSeconds": 1, + "DelayMasterPromotionIfSQLThreadNotUpToDate": true +} diff --git a/go/cmd/orchestrator/main.go b/go/cmd/orchestrator/main.go index 85b828f7ceb..cd7e70c5831 100644 --- a/go/cmd/orchestrator/main.go +++ b/go/cmd/orchestrator/main.go @@ -31,6 +31,7 @@ var AppVersion, GitCommit string // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. func main() { + // TODO(sougou): change this to use vitess servenv framework configFile := flag.String("config", "", "config file name") command := flag.String("c", "", "command, required. See full list of commands via 'orchestrator -c help'") strict := flag.Bool("strict", false, "strict mode (more checks, slower)") @@ -57,7 +58,6 @@ func main() { config.RuntimeCLIFlags.Statement = flag.String("statement", "", "Statement/hint") config.RuntimeCLIFlags.GrabElection = flag.Bool("grab-election", false, "Grab leadership (only applies to continuous mode)") config.RuntimeCLIFlags.PromotionRule = flag.String("promotion-rule", "prefer", "Promotion rule for register-andidate (prefer|neutral|prefer_not|must_not)") - config.RuntimeCLIFlags.Version = flag.Bool("version", false, "Print version and exit") config.RuntimeCLIFlags.SkipContinuousRegistration = flag.Bool("skip-continuous-registration", false, "Skip cli commands performaing continuous registration (to reduce orchestratrator backend db load") config.RuntimeCLIFlags.EnableDatabaseUpdate = flag.Bool("enable-database-update", false, "Enable database update, overrides SkipOrchestratorDatabaseUpdate") config.RuntimeCLIFlags.IgnoreRaftSetup = flag.Bool("ignore-raft-setup", false, "Override RaftEnabled for CLI invocation (CLI by default not allowed for raft setups). NOTE: operations by CLI invocation may not reflect in all raft nodes.") @@ -91,11 +91,6 @@ func main() { if *stack { log.SetPrintStackTrace(*stack) } - if *config.RuntimeCLIFlags.Version { - fmt.Println(AppVersion) - fmt.Println(GitCommit) - return - } startText := "starting orchestrator" if AppVersion != "" { diff --git a/go/cmd/orchestrator/plugin_consultopo.go b/go/cmd/orchestrator/plugin_consultopo.go new file mode 100644 index 00000000000..59d6774fdbc --- /dev/null +++ b/go/cmd/orchestrator/plugin_consultopo.go @@ -0,0 +1,23 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// This plugin imports consultopo to register the consul implementation of TopoServer. + +import ( + _ "vitess.io/vitess/go/vt/topo/consultopo" +) diff --git a/go/cmd/orchestrator/plugin_etcd2topo.go b/go/cmd/orchestrator/plugin_etcd2topo.go new file mode 100644 index 00000000000..d99ef51d4af --- /dev/null +++ b/go/cmd/orchestrator/plugin_etcd2topo.go @@ -0,0 +1,23 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// This plugin imports etcd2topo to register the etcd2 implementation of TopoServer. + +import ( + _ "vitess.io/vitess/go/vt/topo/etcd2topo" +) diff --git a/go/cmd/orchestrator/plugin_grpctmclient.go b/go/cmd/orchestrator/plugin_grpctmclient.go new file mode 100644 index 00000000000..ce554da96df --- /dev/null +++ b/go/cmd/orchestrator/plugin_grpctmclient.go @@ -0,0 +1,23 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// Imports and register the gRPC tabletmanager client + +import ( + _ "vitess.io/vitess/go/vt/vttablet/grpctmclient" +) diff --git a/go/cmd/orchestrator/plugin_kubernetestopo.go b/go/cmd/orchestrator/plugin_kubernetestopo.go new file mode 100644 index 00000000000..671d0c8321f --- /dev/null +++ b/go/cmd/orchestrator/plugin_kubernetestopo.go @@ -0,0 +1,23 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// This plugin imports k8stopo to register the kubernetes implementation of TopoServer. + +import ( + _ "vitess.io/vitess/go/vt/topo/k8stopo" +) diff --git a/go/cmd/orchestrator/plugin_zk2topo.go b/go/cmd/orchestrator/plugin_zk2topo.go new file mode 100644 index 00000000000..ebf385ec1af --- /dev/null +++ b/go/cmd/orchestrator/plugin_zk2topo.go @@ -0,0 +1,23 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// Imports and register the zk2 TopologyServer + +import ( + _ "vitess.io/vitess/go/vt/topo/zk2topo" +) diff --git a/go/vt/orchestrator/app/http.go b/go/vt/orchestrator/app/http.go index 8a81794807d..96cdf606057 100644 --- a/go/vt/orchestrator/app/http.go +++ b/go/vt/orchestrator/app/http.go @@ -109,11 +109,11 @@ func standardHttp(continuousDiscovery bool) { m.Use(gzip.All()) // Render html templates from templates directory m.Use(render.Renderer(render.Options{ - Directory: "resources", + Directory: "web/orchestrator", Layout: "templates/layout", HTMLContentType: "text/html", })) - m.Use(martini.Static("resources/public", martini.StaticOptions{Prefix: config.Config.URLPrefix})) + m.Use(martini.Static("web/orchestrator/public", martini.StaticOptions{Prefix: config.Config.URLPrefix})) if config.Config.UseMutualTLS { m.Use(ssl.VerifyOUs(config.Config.SSLValidOUs)) } diff --git a/go/vt/orchestrator/config/config.go b/go/vt/orchestrator/config/config.go index 81c14aea867..ecf2838bf03 100644 --- a/go/vt/orchestrator/config/config.go +++ b/go/vt/orchestrator/config/config.go @@ -65,6 +65,7 @@ const ( // Configuration makes for orchestrator configuration input, which can be provided by user via JSON formatted file. // Some of the parameteres have reasonable default values, and some (like database credentials) are // strictly expected from user. +// TODO(sougou): change this to yaml parsing, and possible merge with tabletenv. type Configuration struct { Debug bool // set debug mode (similar to --debug option) EnableSyslog bool // Should logs be directed (in addition) to syslog daemon? @@ -74,6 +75,8 @@ type Configuration struct { AgentsServerPort string // port orchestrator agents talk back to MySQLTopologyUser string MySQLTopologyPassword string + MySQLReplicaUser string // If set, use this credential instead of discovering from mysql. TODO(sougou): deprecate this in favor of fetching from vttablet + MySQLReplicaPassword string MySQLTopologyCredentialsConfigFile string // my.cnf style configuration file from where to pick credentials. Expecting `user`, `password` under `[client]` section MySQLTopologySSLPrivateKeyFile string // Private key file used to authenticate with a Topology mysql instance with TLS MySQLTopologySSLCertFile string // Certificate PEM file used to authenticate with a Topology mysql instance with TLS diff --git a/go/vt/orchestrator/db/generate_base.go b/go/vt/orchestrator/db/generate_base.go index ee99371f711..a2cdb3fad55 100644 --- a/go/vt/orchestrator/db/generate_base.go +++ b/go/vt/orchestrator/db/generate_base.go @@ -851,4 +851,18 @@ var generateSQLBase = []string{ ` CREATE INDEX first_seen_idx_database_instance_stale_binlog_coordinates ON database_instance_stale_binlog_coordinates (first_seen) `, + ` + CREATE TABLE IF NOT EXISTS vitess_tablet ( + hostname varchar(128) CHARACTER SET ascii NOT NULL, + port smallint(5) unsigned NOT NULL, + cell varchar(128) CHARACTER SET ascii NOT NULL, + tablet_type smallint(5) NOT NULL, + master_timestamp timestamp NOT NULL, + info varchar(512) CHARACTER SET ascii NOT NULL, + PRIMARY KEY (hostname, port) + ) ENGINE=InnoDB DEFAULT CHARSET=ascii + `, + ` + CREATE INDEX cell_idx_vitess_tablet ON vitess_tablet (cell) + `, } diff --git a/go/vt/orchestrator/external/golib/log/log.go b/go/vt/orchestrator/external/golib/log/log.go index 26d33c99736..92f8256e431 100644 --- a/go/vt/orchestrator/external/golib/log/log.go +++ b/go/vt/orchestrator/external/golib/log/log.go @@ -21,8 +21,12 @@ import ( "fmt" "log/syslog" "os" + "runtime" "runtime/debug" + "strings" "time" + + "vitess.io/vitess/go/vt/log" ) // LogLevel indicates the severity of a log entry @@ -123,6 +127,11 @@ func SetSyslogLevel(logLevel LogLevel) { // logFormattedEntry nicely formats and emits a log entry func logFormattedEntry(logLevel LogLevel, message string, args ...interface{}) string { + return logDepth(logLevel, 0, message, args...) +} + +// logFormattedEntry nicely formats and emits a log entry +func logDepth(logLevel LogLevel, depth int, message string, args ...interface{}) string { if logLevel > globalLogLevel { return "" } @@ -137,7 +146,8 @@ func logFormattedEntry(logLevel LogLevel, message string, args ...interface{}) s } msgArgs := fmt.Sprintf(message, args...) - entryString := fmt.Sprintf("%s %s %s", localizedTime.Format(TimeFormat), logLevel, msgArgs) + sourceFile, pos := callerPos(depth) + entryString := fmt.Sprintf("%s %8s %s:%d] %s", localizedTime.Format(TimeFormat), logLevel, sourceFile, pos, msgArgs) fmt.Fprintln(os.Stderr, entryString) if syslogWriter != nil { @@ -167,13 +177,27 @@ func logFormattedEntry(logLevel LogLevel, message string, args ...interface{}) s return entryString } +func callerPos(depth int) (string, int) { + _, file, line, ok := runtime.Caller(4 + depth) + if !ok { + file = "???" + line = 1 + } else { + slash := strings.LastIndex(file, "/") + if slash >= 0 { + file = file[slash+1:] + } + } + return file, line +} + // logEntry emits a formatted log entry func logEntry(logLevel LogLevel, message string, args ...interface{}) string { entryString := message for _, s := range args { entryString += fmt.Sprintf(" %s", s) } - return logFormattedEntry(logLevel, entryString) + return logDepth(logLevel, 1, entryString) } // logErrorEntry emits a log entry based on given error object @@ -199,7 +223,8 @@ func Debugf(message string, args ...interface{}) string { } func Info(message string, args ...interface{}) string { - return logEntry(INFO, message, args...) + log.Infof(message, args...) + return fmt.Sprintf(message, args...) } func Infof(message string, args ...interface{}) string { @@ -227,7 +252,8 @@ func Error(message string, args ...interface{}) error { } func Errorf(message string, args ...interface{}) error { - return errors.New(logFormattedEntry(ERROR, message, args...)) + log.Infof(message, args...) + return fmt.Errorf(message, args...) } func Errore(err error) error { diff --git a/go/vt/orchestrator/inst/analysis.go b/go/vt/orchestrator/inst/analysis.go index 3b1ea3d5b79..e0524660078 100644 --- a/go/vt/orchestrator/inst/analysis.go +++ b/go/vt/orchestrator/inst/analysis.go @@ -20,8 +20,10 @@ import ( "encoding/json" "fmt" "strings" + "time" "vitess.io/vitess/go/vt/orchestrator/config" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) type AnalysisCode string @@ -29,10 +31,17 @@ type StructureAnalysisCode string const ( NoProblem AnalysisCode = "NoProblem" + ClusterHasNoMaster AnalysisCode = "ClusterHasNoMaster" DeadMasterWithoutReplicas AnalysisCode = "DeadMasterWithoutReplicas" DeadMaster AnalysisCode = "DeadMaster" DeadMasterAndReplicas AnalysisCode = "DeadMasterAndReplicas" DeadMasterAndSomeReplicas AnalysisCode = "DeadMasterAndSomeReplicas" + MasterHasMaster AnalysisCode = "MasterHasMaster" + MasterIsReadOnly AnalysisCode = "MasterIsReadOnly" + ReplicaIsWritable AnalysisCode = "ReplicaIsWritable" + NotConnectedToMaster AnalysisCode = "NotConnectedToMaster" + ConnectedToWrongMaster AnalysisCode = "ConnectedToWrongMaster" + ReplicationStopped AnalysisCode = "ReplicationStopped" UnreachableMasterWithLaggingReplicas AnalysisCode = "UnreachableMasterWithLaggingReplicas" UnreachableMaster AnalysisCode = "UnreachableMaster" MasterSingleReplicaNotReplicating AnalysisCode = "MasterSingleReplicaNotReplicating" @@ -116,12 +125,16 @@ const ( type ReplicationAnalysis struct { AnalyzedInstanceKey InstanceKey AnalyzedInstanceMasterKey InstanceKey + TabletType topodatapb.TabletType + MasterTimeStamp time.Time + SuggestedClusterAlias string ClusterDetails ClusterInfo AnalyzedInstanceDataCenter string AnalyzedInstanceRegion string AnalyzedInstancePhysicalEnvironment string AnalyzedInstanceBinlogCoordinates BinlogCoordinates IsMaster bool + IsClusterMaster bool IsCoMaster bool LastCheckValid bool LastCheckPartialSuccess bool @@ -134,6 +147,7 @@ type ReplicationAnalysis struct { Replicas InstanceKeyMap SlaveHosts InstanceKeyMap // for backwards compatibility. Equals `Replicas` IsFailingToConnectToMaster bool + ReplicationStopped bool Analysis AnalysisCode Description string StructureAnalysis []StructureAnalysisCode diff --git a/go/vt/orchestrator/inst/analysis_dao.go b/go/vt/orchestrator/inst/analysis_dao.go index 70e14e4b123..42e6cbf1302 100644 --- a/go/vt/orchestrator/inst/analysis_dao.go +++ b/go/vt/orchestrator/inst/analysis_dao.go @@ -25,7 +25,10 @@ import ( "vitess.io/vitess/go/vt/orchestrator/db" "vitess.io/vitess/go/vt/orchestrator/process" "vitess.io/vitess/go/vt/orchestrator/util" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "github.com/golang/protobuf/proto" "github.com/patrickmn/go-cache" "github.com/rcrowley/go-metrics" "vitess.io/vitess/go/vt/orchestrator/external/golib/log" @@ -50,82 +53,24 @@ func initializeAnalysisDaoPostConfiguration() { recentInstantAnalysis = cache.New(time.Duration(config.RecoveryPollSeconds*2)*time.Second, time.Second) } +type clusterAnalysis struct { + hasClusterwideAction bool + masterKey *InstanceKey +} + // GetReplicationAnalysis will check for replication problems (dead master; unreachable master; etc) func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) { result := []ReplicationAnalysis{} + // TODO(sougou); deprecate ReduceReplicationAnalysisCount args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName) - analysisQueryReductionClause := `` - - if config.Config.ReduceReplicationAnalysisCount { - analysisQueryReductionClause = ` - HAVING - ( - MIN( - master_instance.last_checked <= master_instance.last_seen - and master_instance.last_attempted_check <= master_instance.last_seen + interval ? second - ) = 1 - /* AS is_last_check_valid */ - ) = 0 - OR ( - IFNULL( - SUM( - replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.slave_io_running = 0 - AND replica_instance.last_io_error like '%error %connecting to master%' - AND replica_instance.slave_sql_running = 1 - ), - 0 - ) - /* AS count_replicas_failing_to_connect_to_master */ - > 0 - ) - OR ( - IFNULL( - SUM( - replica_instance.last_checked <= replica_instance.last_seen - ), - 0 - ) - /* AS count_valid_replicas */ - < COUNT(replica_instance.server_id) - /* AS count_replicas */ - ) - OR ( - IFNULL( - SUM( - replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.slave_io_running != 0 - AND replica_instance.slave_sql_running != 0 - ), - 0 - ) - /* AS count_valid_replicating_replicas */ - < COUNT(replica_instance.server_id) - /* AS count_replicas */ - ) - OR ( - MIN( - master_instance.slave_sql_running = 1 - AND master_instance.slave_io_running = 0 - AND master_instance.last_io_error like '%error %connecting to master%' - ) - /* AS is_failing_to_connect_to_master */ - ) - OR ( - COUNT(replica_instance.server_id) - /* AS count_replicas */ - > 0 - ) - ` - args = append(args, ValidSecondsFromSeenToLastAttemptedCheck()) - } - // "OR count_replicas > 0" above is a recent addition, which, granted, makes some previous conditions redundant. - // It gives more output, and more "NoProblem" messages that I am now interested in for purpose of auditing in database_instance_analysis_changelog - query := fmt.Sprintf(` + query := ` SELECT - master_instance.hostname, - master_instance.port, + vitess_tablet.info AS tablet_info, + vitess_tablet.hostname, + vitess_tablet.port, + vitess_tablet.tablet_type, + vitess_tablet.master_timestamp, master_instance.read_only AS read_only, MIN(master_instance.data_center) AS data_center, MIN(master_instance.region) AS region, @@ -135,6 +80,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) MIN(master_instance.cluster_name) AS cluster_name, MIN(master_instance.binary_log_file) AS binary_log_file, MIN(master_instance.binary_log_pos) AS binary_log_pos, + MIN(master_instance.suggested_cluster_alias) AS suggested_cluster_alias, MIN( IFNULL( master_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file @@ -219,6 +165,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) AND master_instance.slave_io_running = 0 AND master_instance.last_io_error like '%%error %%connecting to master%%' ) AS is_failing_to_connect_to_master, + MIN( + master_instance.slave_sql_running = 0 + AND master_instance.slave_io_running = 0 + ) AS replication_stopped, MIN( master_downtime.downtime_active is not null and ifnull(master_downtime.end_timestamp, now()) > now() @@ -351,7 +301,11 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) AND replica_instance.log_slave_updates then replica_instance.major_version else NULL end ) AS count_distinct_logging_major_versions FROM - database_instance master_instance + vitess_tablet + LEFT JOIN database_instance master_instance ON ( + vitess_tablet.hostname = master_instance.hostname + AND vitess_tablet.port = master_instance.port + ) LEFT JOIN hostname_resolve ON ( master_instance.hostname = hostname_resolve.hostname ) @@ -391,16 +345,14 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) database_instance_maintenance.database_instance_maintenance_id IS NULL AND ? IN ('', master_instance.cluster_name) GROUP BY - master_instance.hostname, - master_instance.port - %s + vitess_tablet.hostname, + vitess_tablet.port ORDER BY - is_master DESC, - is_cluster_master DESC, - count_replicas DESC - `, - analysisQueryReductionClause) + tablet_type ASC, + master_timestamp DESC + ` + clusters := make(map[string]*clusterAnalysis) err := db.QueryOrchestrator(query, args, func(m sqlutils.RowMap) error { a := ReplicationAnalysis{ Analysis: NoProblem, @@ -408,6 +360,15 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ProcessingNodeToken: util.ProcessToken.Hash, } + tablet := &topodatapb.Tablet{} + if err := proto.UnmarshalText(m.GetString("tablet_info"), tablet); err != nil { + log.Errorf("could not read tablet %v: %v", m.GetString("tablet_info"), err) + return nil + } + + a.TabletType = tablet.Type + a.MasterTimeStamp = m.GetTime("master_timestamp") + a.IsMaster = m.GetBool("is_master") countCoMasterReplicas := m.GetUint("count_co_master_replicas") a.IsCoMaster = m.GetBool("is_co_master") || (countCoMasterReplicas > 0) @@ -422,6 +383,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) Type: BinaryLog, } isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates") + a.SuggestedClusterAlias = m.GetString("suggested_cluster_alias") a.ClusterDetails.ClusterName = m.GetString("cluster_name") a.ClusterDetails.ClusterAlias = m.GetString("cluster_alias") a.ClusterDetails.ClusterDomain = m.GetString("cluster_domain") @@ -435,6 +397,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) a.CountDowntimedReplicas = m.GetUint("count_downtimed_replicas") a.ReplicationDepth = m.GetUint("replication_depth") a.IsFailingToConnectToMaster = m.GetBool("is_failing_to_connect_to_master") + a.ReplicationStopped = m.GetBool("replication_stopped") a.IsDowntimed = m.GetBool("is_downtimed") a.DowntimeEndTimestamp = m.GetString("downtime_end_timestamp") a.DowntimeRemainingSeconds = m.GetInt("downtime_remaining_seconds") @@ -481,22 +444,69 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) log.Debugf(analysisMessage) } } - if a.IsMaster && !a.LastCheckValid && a.CountReplicas == 0 { + if clusters[a.SuggestedClusterAlias] == nil { + clusters[a.SuggestedClusterAlias] = &clusterAnalysis{} + if a.TabletType == topodatapb.TabletType_MASTER { + a.IsClusterMaster = true + clusters[a.SuggestedClusterAlias].masterKey = &a.AnalyzedInstanceKey + } + } + // ca has clusterwide info + ca := clusters[a.SuggestedClusterAlias] + if ca.hasClusterwideAction { + // We can only take one cluster level action at a time. + return nil + } + if a.IsClusterMaster && !a.LastCheckValid && a.CountReplicas == 0 { a.Analysis = DeadMasterWithoutReplicas a.Description = "Master cannot be reached by orchestrator and has no replica" + ca.hasClusterwideAction = true // - } else if a.IsMaster && !a.LastCheckValid && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0 { + } else if a.IsClusterMaster && !a.LastCheckValid && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0 { a.Analysis = DeadMaster a.Description = "Master cannot be reached by orchestrator and none of its replicas is replicating" + ca.hasClusterwideAction = true // - } else if a.IsMaster && !a.LastCheckValid && a.CountReplicas > 0 && a.CountValidReplicas == 0 && a.CountValidReplicatingReplicas == 0 { + } else if a.IsClusterMaster && !a.LastCheckValid && a.CountReplicas > 0 && a.CountValidReplicas == 0 && a.CountValidReplicatingReplicas == 0 { a.Analysis = DeadMasterAndReplicas a.Description = "Master cannot be reached by orchestrator and none of its replicas is replicating" + ca.hasClusterwideAction = true // - } else if a.IsMaster && !a.LastCheckValid && a.CountValidReplicas < a.CountReplicas && a.CountValidReplicas > 0 && a.CountValidReplicatingReplicas == 0 { + } else if a.IsClusterMaster && !a.LastCheckValid && a.CountValidReplicas < a.CountReplicas && a.CountValidReplicas > 0 && a.CountValidReplicatingReplicas == 0 { a.Analysis = DeadMasterAndSomeReplicas a.Description = "Master cannot be reached by orchestrator; some of its replicas are unreachable and none of its reachable replicas is replicating" + ca.hasClusterwideAction = true + // + } else if a.IsClusterMaster && !a.IsMaster { + a.Analysis = MasterHasMaster + a.Description = "Master is replicating from somewhere else" + ca.hasClusterwideAction = true // + } else if a.IsClusterMaster && a.IsReadOnly { + a.Analysis = MasterIsReadOnly + a.Description = "Master is read-only" + // + } else if topo.IsReplicaType(a.TabletType) && ca.masterKey == nil { + a.Analysis = ClusterHasNoMaster + a.Description = "Cluster has no master" + ca.hasClusterwideAction = true + } else if topo.IsReplicaType(a.TabletType) && !a.IsReadOnly { + a.Analysis = ReplicaIsWritable + a.Description = "Replica is writable" + // + } else if topo.IsReplicaType(a.TabletType) && a.IsMaster { + a.Analysis = NotConnectedToMaster + a.Description = "Not connected to the master" + // + } else if topo.IsReplicaType(a.TabletType) && !a.IsMaster && ca.masterKey != nil && a.AnalyzedInstanceMasterKey != *ca.masterKey { + a.Analysis = ConnectedToWrongMaster + a.Description = "Connected to wrong master" + // + } else if topo.IsReplicaType(a.TabletType) && !a.IsMaster && a.ReplicationStopped { + a.Analysis = ReplicationStopped + a.Description = "Replication is stopped" + // + // TODO(sougou): Events below here are either ignored or not possible. } else if a.IsMaster && !a.LastCheckValid && a.CountLaggingReplicas == a.CountReplicas && a.CountDelayedReplicas < a.CountReplicas && a.CountValidReplicatingReplicas > 0 { a.Analysis = UnreachableMasterWithLaggingReplicas a.Description = "Master cannot be reached by orchestrator and all of its replicas are lagging" @@ -523,7 +533,6 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) } else if a.IsMaster && a.LastCheckValid && a.CountReplicas == 1 && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0 { a.Analysis = MasterSingleReplicaNotReplicating a.Description = "Master is reachable but its single replica is not replicating" - // } else if a.IsMaster && a.LastCheckValid && a.CountReplicas == 1 && a.CountValidReplicas == 0 { a.Analysis = MasterSingleReplicaDead a.Description = "Master is reachable but its single replica is dead" diff --git a/go/vt/orchestrator/inst/candidate_database_instance_dao.go b/go/vt/orchestrator/inst/candidate_database_instance_dao.go index d0d108bd9db..acbfc701f1f 100644 --- a/go/vt/orchestrator/inst/candidate_database_instance_dao.go +++ b/go/vt/orchestrator/inst/candidate_database_instance_dao.go @@ -45,7 +45,6 @@ func RegisterCandidateInstance(candidate *CandidateDatabaseInstance) error { ` writeFunc := func() error { _, err := db.ExecOrchestrator(query, args...) - AuditOperation("register-candidate", candidate.Key(), string(candidate.PromotionRule)) return log.Errore(err) } return ExecDBWriteFunc(writeFunc) diff --git a/go/vt/orchestrator/inst/instance_dao.go b/go/vt/orchestrator/inst/instance_dao.go index e0fb3bdeacb..eaae26cb869 100644 --- a/go/vt/orchestrator/inst/instance_dao.go +++ b/go/vt/orchestrator/inst/instance_dao.go @@ -34,9 +34,12 @@ import ( "github.com/patrickmn/go-cache" "github.com/rcrowley/go-metrics" "github.com/sjmudd/stopwatch" + "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/orchestrator/external/golib/log" "vitess.io/vitess/go/vt/orchestrator/external/golib/math" "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/orchestrator/attributes" "vitess.io/vitess/go/vt/orchestrator/collection" @@ -308,12 +311,13 @@ func expectReplicationThreadsState(instanceKey *InstanceKey, expectedState Repli func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) { defer func() { if r := recover(); r != nil { - err = logReadTopologyInstanceError(instanceKey, "Unexpected, aborting", fmt.Errorf("%+v", r)) + err = logReadTopologyInstanceError(instanceKey, "Unexpected, aborting", tb.Errorf("%+v", r)) } }() var waitGroup sync.WaitGroup var serverUuidWaitGroup sync.WaitGroup + var tablet *topodatapb.Tablet readingStartTime := time.Now() instance := NewInstance() instanceFound := false @@ -342,11 +346,19 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency.Start("instance") db, err := db.OpenDiscovery(instanceKey.Hostname, instanceKey.Port) - latency.Stop("instance") if err != nil { goto Cleanup } + tablet, err = ReadTablet(*instanceKey) + if err != nil { + goto Cleanup + } + if tablet == nil { + log.Errorf("tablet alias is nil") + goto Cleanup + } + instance.Key = *instanceKey if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil { @@ -363,7 +375,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } } - latency.Start("instance") if isMaxScale { if strings.Contains(instance.Version, "1.1.0") { isMaxScale110 = true @@ -520,6 +531,8 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, goto Cleanup } go ResolveHostnameIPs(instance.Key.Hostname) + + // TODO(sougou) delete DataCenterPattern if config.Config.DataCenterPattern != "" { if pattern, err := regexp.Compile(config.Config.DataCenterPattern); err == nil { match := pattern.FindStringSubmatch(instance.Key.Hostname) @@ -737,6 +750,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, }() } + // TODO(sougou): delete DetectDataCenterQuery if config.Config.DetectDataCenterQuery != "" && !isMaxScale { waitGroup.Add(1) go func() { @@ -745,7 +759,9 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) }() } + instance.DataCenter = tablet.Alias.Cell + // TODO(sougou): use cell alias to identify regions. if config.Config.DetectRegionQuery != "" && !isMaxScale { waitGroup.Add(1) go func() { @@ -764,6 +780,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, }() } + // TODO(sougou): delete DetectInstanceAliasQuery if config.Config.DetectInstanceAliasQuery != "" && !isMaxScale { waitGroup.Add(1) go func() { @@ -772,7 +789,9 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) }() } + instance.InstanceAlias = topoproto.TabletAliasString(tablet.Alias) + // TODO(sougou): come up with a strategy for semi-sync if config.Config.DetectSemiSyncEnforcedQuery != "" && !isMaxScale { waitGroup.Add(1) go func() { @@ -816,57 +835,20 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } } - // First read the current PromotionRule from candidate_database_instance. - { - latency.Start("backend") - err = ReadInstancePromotionRule(instance) - latency.Stop("backend") - logReadTopologyInstanceError(instanceKey, "ReadInstancePromotionRule", err) - } - // Then check if the instance wants to set a different PromotionRule. - // We'll set it here on their behalf so there's no race between the first - // time an instance is discovered, and setting a rule like "must_not". - if config.Config.DetectPromotionRuleQuery != "" && !isMaxScale { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - var value string - err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) - logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) - promotionRule, err := ParseCandidatePromotionRule(value) - logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) - if err == nil { - // We need to update candidate_database_instance. - // We register the rule even if it hasn't changed, - // to bump the last_suggested time. - instance.PromotionRule = promotionRule - err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime()) - logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) - } - }() + // We need to update candidate_database_instance. + // We register the rule even if it hasn't changed, + // to bump the last_suggested time. + if tablet.Type == topodatapb.TabletType_MASTER || tablet.Type == topodatapb.TabletType_REPLICA { + instance.PromotionRule = NeutralPromoteRule + } else { + instance.PromotionRule = MustNotPromoteRule } + err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, instance.PromotionRule).WithCurrentTime()) + logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) + + // TODO(sougou): delete cluster_alias_override metadata + instance.SuggestedClusterAlias = fmt.Sprintf("%v:%v", tablet.Keyspace, tablet.Shard) - ReadClusterAliasOverride(instance) - if !isMaxScale { - if instance.SuggestedClusterAlias == "" { - // Only need to do on masters - if config.Config.DetectClusterAliasQuery != "" { - clusterAlias := "" - if err := db.QueryRow(config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil { - logReadTopologyInstanceError(instanceKey, "DetectClusterAliasQuery", err) - } else { - instance.SuggestedClusterAlias = clusterAlias - } - } - } - if instance.SuggestedClusterAlias == "" { - // Not found by DetectClusterAliasQuery... - // See if a ClusterNameToAlias configuration applies - if clusterAlias := mappedClusterNameToAlias(instance.ClusterName); clusterAlias != "" { - instance.SuggestedClusterAlias = clusterAlias - } - } - } if instance.ReplicationDepth == 0 && config.Config.DetectClusterDomainQuery != "" && !isMaxScale { // Only need to do on masters domainName := "" @@ -963,28 +945,6 @@ Cleanup: return nil, err } -// ReadClusterAliasOverride reads and applies SuggestedClusterAlias based on cluster_alias_override -func ReadClusterAliasOverride(instance *Instance) (err error) { - aliasOverride := "" - query := ` - select - alias - from - cluster_alias_override - where - cluster_name = ? - ` - err = db.QueryOrchestrator(query, sqlutils.Args(instance.ClusterName), func(m sqlutils.RowMap) error { - aliasOverride = m.GetString("alias") - - return nil - }) - if aliasOverride != "" { - instance.SuggestedClusterAlias = aliasOverride - } - return err -} - func ReadReplicationGroupPrimary(instance *Instance) (err error) { query := ` SELECT @@ -1017,7 +977,6 @@ func ReadReplicationGroupPrimary(instance *Instance) (err error) { func ReadInstanceClusterAttributes(instance *Instance) (err error) { var masterOrGroupPrimaryInstanceKey InstanceKey var masterOrGroupPrimaryClusterName string - var masterOrGroupPrimarySuggestedClusterAlias string var masterOrGroupPrimaryReplicationDepth uint var ancestryUUID string var masterOrGroupPrimaryExecutedGtidSet string @@ -1047,7 +1006,6 @@ func ReadInstanceClusterAttributes(instance *Instance) (err error) { args := sqlutils.Args(masterOrGroupPrimaryInstanceKey.Hostname, masterOrGroupPrimaryInstanceKey.Port) err = db.QueryOrchestrator(query, args, func(m sqlutils.RowMap) error { masterOrGroupPrimaryClusterName = m.GetString("cluster_name") - masterOrGroupPrimarySuggestedClusterAlias = m.GetString("suggested_cluster_alias") masterOrGroupPrimaryReplicationDepth = m.GetUint("replication_depth") masterOrGroupPrimaryInstanceKey.Hostname = m.GetString("master_host") masterOrGroupPrimaryInstanceKey.Port = m.GetInt("master_port") @@ -1089,7 +1047,6 @@ func ReadInstanceClusterAttributes(instance *Instance) (err error) { } // While the other stays "1" } instance.ClusterName = clusterName - instance.SuggestedClusterAlias = masterOrGroupPrimarySuggestedClusterAlias instance.ReplicationDepth = replicationDepth instance.IsCoMaster = isCoMaster instance.AncestryUUID = ancestryUUID @@ -1405,6 +1362,12 @@ func ReadWriteableClustersMasters() (instances [](*Instance), err error) { return instances, err } +// ReadClusterAliasInstances reads all instances of a cluster alias +func ReadClusterAliasInstances(clusterAlias string) ([](*Instance), error) { + condition := `suggested_cluster_alias = ? ` + return readInstancesByCondition(condition, sqlutils.Args(clusterAlias), "") +} + // ReadReplicaInstances reads replicas of a given master func ReadReplicaInstances(masterKey *InstanceKey) ([](*Instance), error) { condition := ` diff --git a/go/vt/orchestrator/inst/instance_topology.go b/go/vt/orchestrator/inst/instance_topology.go index f2f787c54f0..d608493cc67 100644 --- a/go/vt/orchestrator/inst/instance_topology.go +++ b/go/vt/orchestrator/inst/instance_topology.go @@ -585,7 +585,7 @@ func canReplicateAssumingOracleGTID(instance, masterInstance *Instance) (canRepl } func instancesAreGTIDAndCompatible(instance, otherInstance *Instance) (isOracleGTID bool, isMariaDBGTID, compatible bool) { - isOracleGTID = (instance.UsingOracleGTID && otherInstance.SupportsOracleGTID) + isOracleGTID = (instance.SupportsOracleGTID && otherInstance.SupportsOracleGTID) isMariaDBGTID = (instance.UsingMariaDBGTID && otherInstance.IsMariaDB()) compatible = isOracleGTID || isMariaDBGTID return isOracleGTID, isMariaDBGTID, compatible @@ -1007,16 +1007,6 @@ func MakeCoMaster(instanceKey *InstanceKey) (*Instance, error) { goto Cleanup } } - if !master.HasReplicationCredentials { - // Let's try , if possible, to get credentials from replica. Best effort. - if replicationUser, replicationPassword, credentialsErr := ReadReplicationCredentials(&instance.Key); credentialsErr == nil { - log.Debugf("Got credentials from a replica. will now apply") - _, err = ChangeMasterCredentials(&master.Key, replicationUser, replicationPassword) - if err != nil { - goto Cleanup - } - } - } if instance.AllowTLS { log.Debugf("Enabling SSL replication") @@ -2533,6 +2523,11 @@ func RegroupReplicasGTID( } } } + + if err := TabletSetMaster(candidateReplica.Key); err != nil { + return emptyReplicas, emptyReplicas, emptyReplicas, candidateReplica, err + } + moveGTIDFunc := func() error { log.Debugf("RegroupReplicasGTID: working on %d replicas", len(replicasToMove)) diff --git a/go/vt/orchestrator/inst/instance_topology_dao.go b/go/vt/orchestrator/inst/instance_topology_dao.go index 956d7b5e78d..0282512ef3a 100644 --- a/go/vt/orchestrator/inst/instance_topology_dao.go +++ b/go/vt/orchestrator/inst/instance_topology_dao.go @@ -389,10 +389,6 @@ func StopReplication(instanceKey *InstanceKey) (*Instance, error) { if err != nil { return instance, log.Errore(err) } - - if !instance.IsReplica() { - return instance, fmt.Errorf("instance is not a replica: %+v", instanceKey) - } _, err = ExecInstance(instanceKey, `stop slave`) if err != nil { // Patch; current MaxScale behavior for STOP SLAVE is to throw an error if replica already stopped. @@ -586,37 +582,6 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error { return err } -// ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=... -func ChangeMasterCredentials(instanceKey *InstanceKey, masterUser string, masterPassword string) (*Instance, error) { - instance, err := ReadTopologyInstance(instanceKey) - if err != nil { - return instance, log.Errore(err) - } - if masterUser == "" { - return instance, log.Errorf("Empty user in ChangeMasterCredentials() for %+v", *instanceKey) - } - - if instance.ReplicationThreadsExist() && !instance.ReplicationThreadsStopped() { - return instance, fmt.Errorf("ChangeMasterTo: Cannot change master on: %+v because replication is running", *instanceKey) - } - log.Debugf("ChangeMasterTo: will attempt changing master credentials on %+v", *instanceKey) - - if *config.RuntimeCLIFlags.Noop { - return instance, fmt.Errorf("noop: aborting CHANGE MASTER TO operation on %+v; signalling error but nothing went wrong.", *instanceKey) - } - _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?", - masterUser, masterPassword) - - if err != nil { - return instance, log.Errore(err) - } - - log.Infof("ChangeMasterTo: Changed master credentials on %+v", *instanceKey) - - instance, err = ReadTopologyInstance(instanceKey) - return instance, err -} - // EnableMasterSSL issues CHANGE MASTER TO MASTER_SSL=1 func EnableMasterSSL(instanceKey *InstanceKey) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) @@ -661,7 +626,9 @@ func workaroundBug83713(instanceKey *InstanceKey) { } // ChangeMasterTo changes the given instance's master according to given input. +// TODO(sougou): deprecate ReplicationCredentialsQuery, and all other credential discovery. func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinlogCoordinates *BinlogCoordinates, skipUnresolve bool, gtidHint OperationGTIDHint) (*Instance, error) { + user, password := config.Config.MySQLReplicaUser, config.Config.MySQLReplicaPassword instance, err := ReadTopologyInstance(instanceKey) if err != nil { return instance, log.Errore(err) @@ -696,16 +663,16 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl if instance.UsingMariaDBGTID && gtidHint != GTIDHintDeny { // Keep on using GTID changeMasterFunc = func() error { - _, err := ExecInstance(instanceKey, "change master to master_host=?, master_port=?", - changeToMasterKey.Hostname, changeToMasterKey.Port) + _, err := ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } changedViaGTID = true } else if instance.UsingMariaDBGTID && gtidHint == GTIDHintDeny { // Make sure to not use GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, "change master to master_host=?, master_port=?, master_log_file=?, master_log_pos=?, master_use_gtid=no", - changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) + _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?, master_log_file=?, master_log_pos=?, master_use_gtid=no", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } } else if instance.IsMariaDB() && gtidHint == GTIDHintForce { @@ -720,39 +687,39 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl mariadbGTIDHint = "current_pos" } changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint), - changeToMasterKey.Hostname, changeToMasterKey.Port) + _, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_user=?, master_password=?, master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint), + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } changedViaGTID = true } else if instance.UsingOracleGTID && gtidHint != GTIDHintDeny { // Is Oracle; already uses GTID; keep using it. changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, "change master to master_host=?, master_port=?", - changeToMasterKey.Hostname, changeToMasterKey.Port) + _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } changedViaGTID = true } else if instance.UsingOracleGTID && gtidHint == GTIDHintDeny { // Is Oracle; already uses GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, "change master to master_host=?, master_port=?, master_log_file=?, master_log_pos=?, master_auto_position=0", - changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) + _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?, master_log_file=?, master_log_pos=?, master_auto_position=0", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } } else if instance.SupportsOracleGTID && gtidHint == GTIDHintForce { // Is Oracle; not using GTID right now; turn into GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, "change master to master_host=?, master_port=?, master_auto_position=1", - changeToMasterKey.Hostname, changeToMasterKey.Port) + _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?, master_auto_position=1", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } changedViaGTID = true } else { // Normal binlog file:pos changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, "change master to master_host=?, master_port=?, master_log_file=?, master_log_pos=?", - changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) + _, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?, master_host=?, master_port=?, master_log_file=?, master_log_pos=?", + user, password, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } } @@ -980,36 +947,6 @@ func MasterPosWait(instanceKey *InstanceKey, binlogCoordinates *BinlogCoordinate return instance, err } -// Attempt to read and return replication credentials from the mysql.slave_master_info system table -func ReadReplicationCredentials(instanceKey *InstanceKey) (replicationUser string, replicationPassword string, err error) { - if config.Config.ReplicationCredentialsQuery != "" { - err = ScanInstanceRow(instanceKey, config.Config.ReplicationCredentialsQuery, &replicationUser, &replicationPassword) - if err == nil && replicationUser == "" { - err = fmt.Errorf("Empty username retrieved by ReplicationCredentialsQuery") - } - if err == nil { - return replicationUser, replicationPassword, nil - } - log.Errore(err) - } - // Didn't get credentials from ReplicationCredentialsQuery, or ReplicationCredentialsQuery doesn't exist in the first place? - // We brute force our way through mysql.slave_master_info - { - query := ` - select - ifnull(max(User_name), '') as user, - ifnull(max(User_password), '') as password - from - mysql.slave_master_info - ` - err = ScanInstanceRow(instanceKey, query, &replicationUser, &replicationPassword) - if err == nil && replicationUser == "" { - err = fmt.Errorf("Empty username found in mysql.slave_master_info") - } - } - return replicationUser, replicationPassword, log.Errore(err) -} - // SetReadOnly sets or clears the instance's global read_only variable func SetReadOnly(instanceKey *InstanceKey, readOnly bool) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) diff --git a/go/vt/orchestrator/inst/tablet_dao.go b/go/vt/orchestrator/inst/tablet_dao.go new file mode 100644 index 00000000000..7aaeb153001 --- /dev/null +++ b/go/vt/orchestrator/inst/tablet_dao.go @@ -0,0 +1,103 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inst + +import ( + "context" + "errors" + + "github.com/golang/protobuf/proto" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/orchestrator/db" + "vitess.io/vitess/go/vt/orchestrator/external/golib/log" + "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +// TopoServ is the connection to the topo server. +var TopoServ *topo.Server + +// ErrTabletAliasNil is a fixed error message. +var ErrTabletAliasNil = errors.New("tablet alias is nil") + +// TabletSetMaster designates the tablet that owns an instance as the master. +func TabletSetMaster(instanceKey InstanceKey) error { + if instanceKey.Hostname == "" { + return errors.New("can't set tablet to master: instance is unspecified") + } + tablet, err := ReadTablet(instanceKey) + if err != nil { + return err + } + tmc := tmclient.NewTabletManagerClient() + if err := tmc.ChangeType(context.TODO(), tablet, topodatapb.TabletType_MASTER); err != nil { + return err + } + ti, err := TopoServ.GetTablet(context.TODO(), tablet.Alias) + if err != nil { + return log.Errore(err) + } + if err := SaveTablet(ti.Tablet); err != nil { + log.Errore(err) + } + return nil +} + +// ReadTablet reads the vitess tablet record. +func ReadTablet(instanceKey InstanceKey) (*topodatapb.Tablet, error) { + query := ` + select + info + from + vitess_tablet + where hostname=? and port=? + ` + args := sqlutils.Args(instanceKey.Hostname, instanceKey.Port) + tablet := &topodatapb.Tablet{} + err := db.QueryOrchestrator(query, args, func(row sqlutils.RowMap) error { + return proto.UnmarshalText(row.GetString("info"), tablet) + }) + if err != nil { + return nil, err + } + if tablet.Alias == nil { + return nil, ErrTabletAliasNil + } + return tablet, nil +} + +// SaveTablet saves the tablet record against the instanceKey. +func SaveTablet(tablet *topodatapb.Tablet) error { + _, err := db.ExecOrchestrator(` + replace + into vitess_tablet ( + hostname, port, cell, tablet_type, master_timestamp, info + ) values ( + ?, ?, ?, ?, ?, ? + ) + `, + tablet.MysqlHostname, + int(tablet.MysqlPort), + tablet.Alias.Cell, + int(tablet.Type), + logutil.ProtoToTime(tablet.MasterTermStartTime), + proto.CompactTextString(tablet), + ) + return err +} diff --git a/go/vt/orchestrator/logic/orchestrator.go b/go/vt/orchestrator/logic/orchestrator.go index b44017fac2a..be70b17b04e 100644 --- a/go/vt/orchestrator/logic/orchestrator.go +++ b/go/vt/orchestrator/logic/orchestrator.go @@ -492,6 +492,7 @@ func ContinuousDiscovery() { raftCaretakingTick := time.Tick(10 * time.Minute) recoveryTick := time.Tick(time.Duration(config.RecoveryPollSeconds) * time.Second) autoPseudoGTIDTick := time.Tick(time.Duration(config.PseudoGTIDIntervalSeconds) * time.Second) + tabletTopoTick := OpenTabletDiscovery() var recoveryEntrance int64 var snapshotTopologiesTick <-chan time.Time if config.Config.SnapshotTopologiesIntervalHours > 0 { @@ -615,6 +616,8 @@ func ContinuousDiscovery() { go inst.SnapshotTopologies() } }() + case <-tabletTopoTick: + go RefreshTablets() } } } diff --git a/go/vt/orchestrator/logic/tablet_discovery.go b/go/vt/orchestrator/logic/tablet_discovery.go new file mode 100644 index 00000000000..9bcdebd16f0 --- /dev/null +++ b/go/vt/orchestrator/logic/tablet_discovery.go @@ -0,0 +1,241 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logic + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "vitess.io/vitess/go/vt/orchestrator/db" + "vitess.io/vitess/go/vt/orchestrator/external/golib/log" + "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" + "vitess.io/vitess/go/vt/orchestrator/inst" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +var ( + ts *topo.Server +) + +// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker +// channel for polling. +func OpenTabletDiscovery() <-chan time.Time { + // TODO(sougou): If there's a shutdown signal, we have to close the topo. + ts = topo.Open() + // TODO(sougou): remove ts and push some functions into inst. + inst.TopoServ = ts + // Clear existing cache and perform a new refresh. + if _, err := db.ExecOrchestrator("delete from vitess_tablet"); err != nil { + log.Errore(err) + } + refreshTabletsUsing(func(instanceKey *inst.InstanceKey) { + _ = inst.InjectSeed(instanceKey) + }) + // TODO(sougou): parameterize poll interval. + return time.Tick(15 * time.Second) //nolint SA1015: using time.Tick leaks the underlying ticker +} + +// RefreshTablets reloads the tablets from topo. +func RefreshTablets() { + refreshTabletsUsing(func(instanceKey *inst.InstanceKey) { + DiscoverInstance(*instanceKey) + }) +} + +func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey)) { + if !IsLeaderOrActive() { + return + } + cells, err := ts.GetKnownCells(context.TODO()) + if err != nil { + log.Errore(err) + return + } + + var wg sync.WaitGroup + for _, cell := range cells { + wg.Add(1) + go func(cell string) { + defer wg.Done() + refreshTabletsInCell(cell, loader) + }(cell) + } + wg.Wait() +} + +func refreshTabletsInCell(cell string, loader func(instanceKey *inst.InstanceKey)) { + latestInstances := make(map[inst.InstanceKey]bool) + tablets, err := topotools.GetAllTablets(context.TODO(), ts, cell) + if err != nil { + log.Errorf("Error fetching topo info for cell %v: %v", cell, err) + return + } + + // Discover new tablets. + // TODO(sougou): enhance this to work with multi-schema, + // where each instanceKey can have multiple tablets. + for _, tabletInfo := range tablets { + tablet := tabletInfo.Tablet + if tablet.MysqlHostname == "" { + continue + } + if tablet.Type != topodatapb.TabletType_MASTER && !topo.IsReplicaType(tablet.Type) { + continue + } + instanceKey := inst.InstanceKey{ + Hostname: tablet.MysqlHostname, + Port: int(tablet.MysqlPort), + } + latestInstances[instanceKey] = true + old, err := inst.ReadTablet(instanceKey) + if err != nil && err != inst.ErrTabletAliasNil { + log.Errore(err) + continue + } + if proto.Equal(tablet, old) { + continue + } + if err := inst.SaveTablet(tablet); err != nil { + log.Errore(err) + continue + } + loader(&instanceKey) + log.Infof("Discovered: %v", tablet) + } + + // Forget tablets that were removed. + toForget := make(map[inst.InstanceKey]*topodatapb.Tablet) + query := "select hostname, port, info from vitess_tablet where cell = ?" + err = db.QueryOrchestrator(query, sqlutils.Args(cell), func(row sqlutils.RowMap) error { + curKey := inst.InstanceKey{ + Hostname: row.GetString("hostname"), + Port: row.GetInt("port"), + } + if !latestInstances[curKey] { + tablet := &topodatapb.Tablet{} + if err := proto.UnmarshalText(row.GetString("info"), tablet); err != nil { + log.Errore(err) + return nil + } + toForget[curKey] = tablet + } + return nil + }) + if err != nil { + log.Errore(err) + } + for instanceKey, tablet := range toForget { + log.Infof("Forgeting: %v", tablet) + _, err := db.ExecOrchestrator(` + delete + from vitess_tablet + where + hostname=? and port=?`, + instanceKey.Hostname, + instanceKey.Port, + ) + if err != nil { + log.Errore(err) + } + if err := inst.ForgetInstance(&instanceKey); err != nil { + log.Errore(err) + } + } +} + +// LockShard locks the keyspace-shard preventing others from performing conflicting actions. +func LockShard(instanceKey inst.InstanceKey) (func(*error), error) { + if instanceKey.Hostname == "" { + return nil, errors.New("Can't lock shard: instance is unspecified") + } + + tablet, err := inst.ReadTablet(instanceKey) + if err != nil { + return nil, err + } + _, unlock, err := ts.LockShard(context.TODO(), tablet.Keyspace, tablet.Shard, "Orc Recovery") + return unlock, err +} + +// TabletRefresh refreshes the tablet info. +func TabletRefresh(instanceKey inst.InstanceKey) error { + tablet, err := inst.ReadTablet(instanceKey) + if err != nil { + return err + } + ti, err := ts.GetTablet(context.TODO(), tablet.Alias) + if err != nil { + return err + } + return inst.SaveTablet(ti.Tablet) +} + +// TabletDemoteMaster requests the master tablet to stop accepting transactions. +func TabletDemoteMaster(instanceKey inst.InstanceKey) error { + return tabletDemoteMaster(instanceKey, true) +} + +// TabletUndoDemoteMaster requests the master tablet to undo the demote. +func TabletUndoDemoteMaster(instanceKey inst.InstanceKey) error { + return tabletDemoteMaster(instanceKey, false) +} + +func tabletDemoteMaster(instanceKey inst.InstanceKey, forward bool) error { + if instanceKey.Hostname == "" { + return errors.New("Can't demote/undo master: instance is unspecified") + } + tablet, err := inst.ReadTablet(instanceKey) + if err != nil { + return err + } + tmc := tmclient.NewTabletManagerClient() + if forward { + _, err = tmc.DemoteMaster(context.TODO(), tablet) + } else { + err = tmc.UndoDemoteMaster(context.TODO(), tablet) + } + return err +} + +func ShardMaster(instanceKey *inst.InstanceKey) (masterKey *inst.InstanceKey, err error) { + tablet, err := inst.ReadTablet(*instanceKey) + if err != nil { + return nil, err + } + si, err := ts.GetShard(context.TODO(), tablet.Keyspace, tablet.Shard) + if err != nil { + return nil, err + } + if !si.HasMaster() { + return nil, fmt.Errorf("no master tablet for shard %v/%v", tablet.Keyspace, tablet.Shard) + } + master, err := ts.GetTablet(context.TODO(), si.MasterAlias) + if err != nil { + return nil, err + } + return &inst.InstanceKey{ + Hostname: master.MysqlHostname, + Port: int(master.MysqlPort), + }, nil +} diff --git a/go/vt/orchestrator/logic/topology_recovery.go b/go/vt/orchestrator/logic/topology_recovery.go index 27e871ff1ae..23800675dd8 100644 --- a/go/vt/orchestrator/logic/topology_recovery.go +++ b/go/vt/orchestrator/logic/topology_recovery.go @@ -564,7 +564,8 @@ func recoverDeadMaster(topologyRecovery *TopologyRecovery, candidateInstanceKey } func() error { - inst.BeginDowntime(inst.NewDowntime(failedInstanceKey, inst.GetMaintenanceOwner(), inst.DowntimeLostInRecoveryMessage, time.Duration(config.LostInRecoveryDowntimeSeconds)*time.Second)) + // TODO(sougou): Commented out: this downtime feels a little aggressive. + //inst.BeginDowntime(inst.NewDowntime(failedInstanceKey, inst.GetMaintenanceOwner(), inst.DowntimeLostInRecoveryMessage, time.Duration(config.LostInRecoveryDowntimeSeconds)*time.Second)) acknowledgeInstanceFailureDetection(&analysisEntry.AnalyzedInstanceKey) for _, replica := range lostReplicas { replica := replica @@ -826,8 +827,20 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverDeadMaster.", analysisEntry.AnalyzedInstanceKey)) return false, nil, err } + log.Infof("Analysis: %v, deadmaster %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey) // That's it! We must do recovery! + // TODO(sougou): This function gets called by GracefulMasterTakeover which may + // need to obtain shard lock before getting here. + unlock, err := LockShard(analysisEntry.AnalyzedInstanceKey) + if err != nil { + log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+ + "skipProcesses: %v: NOT detecting/recovering host, could not obtain shard lock (%v)", + analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err) + return false, nil, err + } + defer unlock(&err) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("will handle DeadMaster event on %+v", analysisEntry.ClusterDetails.ClusterName)) recoverDeadMasterCounter.Inc(1) recoveryAttempted, promotedReplica, lostReplicas, err := recoverDeadMaster(topologyRecovery, candidateInstanceKey, skipProcesses) @@ -878,30 +891,27 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: successfully promoted %+v", promotedReplica.Key)) AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: promoted server coordinates: %+v", promotedReplica.SelfBinlogCoordinates)) - if config.Config.ApplyMySQLPromotionAfterMasterFailover || analysisEntry.CommandHint == inst.GracefulMasterTakeoverCommandHint { - // on GracefulMasterTakeoverCommandHint it makes utter sense to RESET SLAVE ALL and read_only=0, and there is no sense in not doing so. - AuditTopologyRecovery(topologyRecovery, "- RecoverDeadMaster: will apply MySQL changes to promoted master") - { - _, err := inst.ResetReplicationOperation(&promotedReplica.Key) - if err != nil { - // Ugly, but this is important. Let's give it another try - _, err = inst.ResetReplicationOperation(&promotedReplica.Key) - } - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying RESET SLAVE ALL on promoted master: success=%t", (err == nil))) - if err != nil { - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: NOTE that %+v is promoted even though SHOW SLAVE STATUS may still show it has a master", promotedReplica.Key)) - } + AuditTopologyRecovery(topologyRecovery, "- RecoverDeadMaster: will apply MySQL changes to promoted master") + { + _, err := inst.ResetReplicationOperation(&promotedReplica.Key) + if err != nil { + // Ugly, but this is important. Let's give it another try + _, err = inst.ResetReplicationOperation(&promotedReplica.Key) } - { - _, err := inst.SetReadOnly(&promotedReplica.Key, false) - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=0 on promoted master: success=%t", (err == nil))) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying RESET SLAVE ALL on promoted master: success=%t", (err == nil))) + if err != nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: NOTE that %+v is promoted even though SHOW SLAVE STATUS may still show it has a master", promotedReplica.Key)) } - // Let's attempt, though we won't necessarily succeed, to set old master as read-only - go func() { - _, err := inst.SetReadOnly(&analysisEntry.AnalyzedInstanceKey, true) - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=1 on demoted master: success=%t", (err == nil))) - }() } + { + _, err := inst.SetReadOnly(&promotedReplica.Key, false) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=0 on promoted master: success=%t", (err == nil))) + } + // Let's attempt, though we won't necessarily succeed, to set old master as read-only + go func() { + _, err := inst.SetReadOnly(&analysisEntry.AnalyzedInstanceKey, true) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=1 on demoted master: success=%t", (err == nil))) + }() kvPairs := inst.GetClusterMasterKVPairs(analysisEntry.ClusterDetails.ClusterAlias, &promotedReplica.Key) AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Writing KV %+v", kvPairs)) @@ -1520,6 +1530,15 @@ func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstance } else { return checkAndRecoverLockedSemiSyncMaster, true } + // topo + case inst.ClusterHasNoMaster: + return electNewMaster, true + case inst.MasterHasMaster: + return fixClusterAndMaster, true + case inst.MasterIsReadOnly: + return fixMaster, true + case inst.NotConnectedToMaster, inst.ConnectedToWrongMaster, inst.ReplicationStopped, inst.ReplicaIsWritable: + return fixReplica, false // intermediate master case inst.DeadIntermediateMaster: return checkAndRecoverDeadIntermediateMaster, true @@ -1873,7 +1892,7 @@ func getGracefulMasterTakeoverDesignatedInstance(clusterMasterKey *inst.Instance // It expects that replica to have no siblings. // This function is graceful in that it will first lock down the master, then wait // for the designated replica to catch up with last position. -// It will point old master at the newly promoted master at the correct coordinates, but will not start replication. +// It will point old master at the newly promoted master at the correct coordinates. func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, auto bool) (topologyRecovery *TopologyRecovery, promotedMasterCoordinates *inst.BinlogCoordinates, err error) { clusterMasters, err := inst.ReadClusterMaster(clusterName) if err != nil { @@ -1945,8 +1964,6 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, } log.Infof("GracefulMasterTakeover: Will demote %+v and promote %+v instead", clusterMaster.Key, designatedInstance.Key) - replicationUser, replicationPassword, replicationCredentialsError := inst.ReadReplicationCredentials(&designatedInstance.Key) - analysisEntry, err := forceAnalysisEntry(clusterName, inst.DeadMaster, inst.GracefulMasterTakeoverCommandHint, &clusterMaster.Key) if err != nil { return nil, nil, err @@ -1959,8 +1976,8 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, return nil, nil, fmt.Errorf("Failed running PreGracefulTakeoverProcesses: %+v", err) } - log.Infof("GracefulMasterTakeover: Will set %+v as read_only", clusterMaster.Key) - if clusterMaster, err = inst.SetReadOnly(&clusterMaster.Key, true); err != nil { + log.Infof("GracefulMasterTakeover: invoking TabletDemoteMaster on %+v", clusterMaster.Key) + if err := TabletDemoteMaster(clusterMaster.Key); err != nil { return nil, nil, err } demotedMasterSelfBinlogCoordinates := &clusterMaster.SelfBinlogCoordinates @@ -1982,10 +1999,11 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, return nil, nil, fmt.Errorf("GracefulMasterTakeover: recovery attempted but with no results. This should not happen") } if topologyRecovery.SuccessorKey == nil { - // Promotion fails. - // Undo setting read-only on original master. - inst.SetReadOnly(&clusterMaster.Key, false) - return nil, nil, fmt.Errorf("GracefulMasterTakeover: Recovery attempted yet no replica promoted; err=%+v", err) + // Promotion failed. Undo. + log.Infof("GracefulMasterTakeover: Invoking tabletUndoDemoteMaster on %+v", clusterMaster.Key) + if err := TabletUndoDemoteMaster(clusterMaster.Key); err != nil { + log.Errore(err) + } } var gtidHint inst.OperationGTIDHint = inst.GTIDHintNeutral if topologyRecovery.RecoveryType == MasterRecoveryGTID { @@ -1995,17 +2013,9 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, if !clusterMaster.SelfBinlogCoordinates.Equals(demotedMasterSelfBinlogCoordinates) { log.Errorf("GracefulMasterTakeover: sanity problem. Demoted master's coordinates changed from %+v to %+v while supposed to have been frozen", *demotedMasterSelfBinlogCoordinates, clusterMaster.SelfBinlogCoordinates) } - if !clusterMaster.HasReplicationCredentials && replicationCredentialsError == nil { - _, credentialsErr := inst.ChangeMasterCredentials(&clusterMaster.Key, replicationUser, replicationPassword) - if err == nil { - err = credentialsErr - } - } - if auto { - _, startReplicationErr := inst.StartReplication(&clusterMaster.Key) - if err == nil { - err = startReplicationErr - } + _, startReplicationErr := inst.StartReplication(&clusterMaster.Key) + if err == nil { + err = startReplicationErr } if designatedInstance.AllowTLS { @@ -2018,3 +2028,159 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, return topologyRecovery, promotedMasterCoordinates, err } + +// electNewMaster elects a new master while none were present before. +func electNewMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewMaster.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + log.Infof("Analysis: %v, will elect a new master: %v", analysisEntry.Analysis, analysisEntry.SuggestedClusterAlias) + + unlock, err := LockShard(analysisEntry.AnalyzedInstanceKey) + if err != nil { + log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+ + "skipProcesses: %v: NOT detecting/recovering host, could not obtain shard lock (%v)", + analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err) + return false, topologyRecovery, err + } + defer unlock(&err) + + replicas, err := inst.ReadClusterAliasInstances(analysisEntry.SuggestedClusterAlias) + if err != nil { + return false, topologyRecovery, err + } + // TODO(sougou): this is not reliable, because of the timeout. + replicas = inst.StopReplicasNicely(replicas, time.Duration(config.Config.InstanceBulkOperationsWaitTimeoutSeconds)*time.Second) + if len(replicas) == 0 { + return false, topologyRecovery, fmt.Errorf("no instances in cluster %v", analysisEntry.SuggestedClusterAlias) + } + + // Find an initial candidate + var candidate *inst.Instance + for _, replica := range replicas { + // TODO(sougou): this needs to do more. see inst.chooseCandidateReplica + if !inst.IsBannedFromBeingCandidateReplica(replica) { + candidate = replica + break + } + } + if candidate == nil { + err := fmt.Errorf("no candidate qualifies to be a master") + AuditTopologyRecovery(topologyRecovery, err.Error()) + return true, topologyRecovery, err + } + + // Compare the current candidate with the rest to see if other instances can be + // moved under. If not, see if the other intance can become a candidate instead. + for _, replica := range replicas { + if replica == candidate { + continue + } + if err := inst.CheckMoveViaGTID(replica, candidate); err != nil { + if err := inst.CheckMoveViaGTID(candidate, replica); err != nil { + return false, topologyRecovery, fmt.Errorf("instances are not compatible: %+v %+v: %v", candidate, replica, err) + } else { + // Make sure the new candidate meets the requirements. + if !inst.IsBannedFromBeingCandidateReplica(replica) { + candidate = replica + } + } + } + } + + if err := inst.TabletSetMaster(candidate.Key); err != nil { + return true, topologyRecovery, err + } + // TODO(sougou): parallelize + for _, replica := range replicas { + if replica.Key == candidate.Key { + continue + } + if _, err := inst.MoveBelowGTID(&replica.Key, &candidate.Key); err != nil { + return false, topologyRecovery, err + } + } + if _, err := inst.SetReadOnly(&candidate.Key, false); err != nil { + return false, topologyRecovery, err + } + return true, topologyRecovery, nil +} + +// fixClusterAndMaster performs a traditional vitess PlannedReparentShard. +func fixClusterAndMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixClusterAndMaster.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + log.Infof("Analysis: %v, will fix incorrect mastership %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey) + + // Reset replication on current master. This will prevent the comaster code-path. + // TODO(sougou): this should probably done while holding a lock. + _, err = inst.ResetReplicationOperation(&analysisEntry.AnalyzedInstanceKey) + if err != nil { + return false, topologyRecovery, err + } + + altAnalysis, err := forceAnalysisEntry(analysisEntry.ClusterDetails.ClusterName, inst.DeadMaster, "", &analysisEntry.AnalyzedInstanceMasterKey) + if err != nil { + return false, topologyRecovery, err + } + recoveryAttempted, topologyRecovery, err = ForceExecuteRecovery(altAnalysis, &analysisEntry.AnalyzedInstanceKey, false) + if err != nil { + return recoveryAttempted, topologyRecovery, err + } + if err := TabletRefresh(analysisEntry.AnalyzedInstanceKey); err != nil { + log.Errore(err) + } + return recoveryAttempted, topologyRecovery, err +} + +// fixMaster sets the master as read-write. +func fixMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixMaster.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + log.Infof("Analysis: %v, will fix master to read-write %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey) + + unlock, err := LockShard(analysisEntry.AnalyzedInstanceKey) + if err != nil { + log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+ + "skipProcesses: %v: NOT detecting/recovering host, could not obtain shard lock (%v)", + analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err) + return false, topologyRecovery, err + } + defer unlock(&err) + + if err := TabletUndoDemoteMaster(analysisEntry.AnalyzedInstanceKey); err != nil { + return false, topologyRecovery, err + } + return true, topologyRecovery, nil +} + +// fixReplica sets the replica as read-only and points it at the current master. +func fixReplica(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + log.Infof("Analysis: %v, will fix replica %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey) + if _, err := inst.SetReadOnly(&analysisEntry.AnalyzedInstanceKey, true); err != nil { + return false, topologyRecovery, err + } + + masterKey, err := ShardMaster(&analysisEntry.AnalyzedInstanceKey) + if err != nil { + log.Info("Could not compute master for %+v", analysisEntry.AnalyzedInstanceKey) + return false, topologyRecovery, err + } + if _, err := inst.MoveBelowGTID(&analysisEntry.AnalyzedInstanceKey, masterKey); err != nil { + return false, topologyRecovery, err + } + return true, topologyRecovery, nil +} diff --git a/go/vt/orchestrator/ssl/ssl.go b/go/vt/orchestrator/ssl/ssl.go index 3f20a5ae610..325400b9ecd 100644 --- a/go/vt/orchestrator/ssl/ssl.go +++ b/go/vt/orchestrator/ssl/ssl.go @@ -75,7 +75,7 @@ func ReadCAFile(caFile string) (*x509.CertPool, error) { if !caCertPool.AppendCertsFromPEM(data) { return nil, errors.New("No certificates parsed") } - log.Info("Read in CA file:", caFile) + log.Infof("Read in CA file: %v", caFile) } return caCertPool, nil } @@ -166,7 +166,7 @@ func ReadPEMData(pemFile string, pemPass []byte) ([]byte, error) { if err != nil { return pemData, err } else { - log.Info("Decrypted", pemFile, "successfully") + log.Infof("Decrypted %v successfully", pemFile) } // Shove the decrypted DER bytes into a new pem Block with blank headers var newBlock pem.Block