From e93028fcceff2dc3b2c63a677ecb3ae5ddb5df75 Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 7 Feb 2024 14:45:58 +0800 Subject: [PATCH 01/18] unstable->3.5 (#2396) * fix: codis-dashboard uses 100% cpu(#2332) (#2393) Co-authored-by: liuchengyu * fix: The role displayed on the first Server in the Group area of the codis-fe is incorrect (#2350) (#2387) Co-authored-by: liuchengyu --------- Co-authored-by: Chengyu Liu Co-authored-by: liuchengyu --- codis/cmd/fe/assets/index.html | 5 ++++- codis/pkg/proxy/stats.go | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/codis/cmd/fe/assets/index.html b/codis/cmd/fe/assets/index.html index 023bf98c49..2524aa1fa2 100644 --- a/codis/cmd/fe/assets/index.html +++ b/codis/cmd/fe/assets/index.html @@ -538,7 +538,10 @@

Group

- S + + Master + Slave + diff --git a/codis/pkg/proxy/stats.go b/codis/pkg/proxy/stats.go index 06a2b67aa2..e157e31270 100644 --- a/codis/pkg/proxy/stats.go +++ b/codis/pkg/proxy/stats.go @@ -85,7 +85,13 @@ func init() { // Clear the accumulated maximum delay to 0 go func() { for { - time.Sleep(time.Duration(RefreshPeriod.Int64())) + refreshPeriod := RefreshPeriod.Int64() + if refreshPeriod == 0 { + time.Sleep(15 * time.Second) + } else { + time.Sleep(time.Duration(refreshPeriod)) + } + for _, s := range cmdstats.opmap { s.maxDelay.Set(0) } From f61f49af7376863a57486aacba0e06e2e1e5c01b Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 7 Feb 2024 15:40:27 +0800 Subject: [PATCH 02/18] unstable->3.5 (#2397) * fix: codis-dashboard uses 100% cpu(#2332) (#2393) Co-authored-by: liuchengyu * fix: The role displayed on the first Server in the Group area of the codis-fe is incorrect (#2350) (#2387) Co-authored-by: liuchengyu * fix: automatic fix master-slave replication relationship after master or slave service restarted (#2373, #2038, #1950, #1967, #2351)) (#2386) Co-authored-by: liuchengyu * feat:add 3.5.3 changelog (#2395) * add 3.5.3 changelog --------- Co-authored-by: chejinge --------- Co-authored-by: Chengyu Liu Co-authored-by: liuchengyu Co-authored-by: chejinge --- CHANGELOG.MD | 89 +++++++ CHANGELOG_CN.MD | 81 ++++++ codis/config/dashboard.toml | 7 +- codis/pkg/models/action.go | 3 + codis/pkg/models/group.go | 45 +++- codis/pkg/topom/config.go | 8 +- codis/pkg/topom/context.go | 2 +- codis/pkg/topom/topom.go | 18 +- codis/pkg/topom/topom_group.go | 317 ++++++++++++++++-------- codis/pkg/topom/topom_sentinel.go | 179 ++++++------- codis/pkg/topom/topom_stats.go | 20 +- codis/pkg/utils/redis/client.go | 35 ++- codis/pkg/utils/redis/client_test.go | 90 +++---- codis/pkg/utils/redis/codis_sentinel.go | 1 + codis/pkg/utils/redis/sentinel.go | 40 ++- 15 files changed, 658 insertions(+), 277 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 9170b5bb4c..003e369f5d 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,3 +1,92 @@ +# v3.5.3 + +## New features + +- Pika supports ACL[#2013](https://github.com/OpenAtomFoundation/pika/pull/2013) @[lqxhub](https://github.com/lqxhub) + +- Automatically resume service when Codis dashboard coroutine panics[#2349](https://github.com/OpenAtomFoundation/pika/pull/2349)@[chengyu-l](https://github.com/chengyu-l) + +- During the full replication process, the slave node of the pika service does not receive read traffic requests.[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[tedli](https://github.com/tedli) + +- Pika cache adds bimap data type.[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- Delete the remaining Slots in Sharing mode. There is only DB under Pika, and there are multiple DBs under one Pika.[#2251](https://github.com/OpenAtomFoundation/pika/pull/2251) @[Mixficsol](https://github.com/Mixficsol) + +- Pika exporter exposes cache-related data collection indicators.[#2318](https://github.com/OpenAtomFoundation/pika/pull/2318) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika supports separation of fast and slow commands.[#2162](https://github.com/OpenAtomFoundation/pika/pull/2162) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- After pika executes bgsave, retain the unix timepoint.[#2167](https://github.com/OpenAtomFoundation/pika/pull/2167) @[hero-heng](https://github.com/hero-heng) + +- Pika supports dynamic configuration of the disable_auto_compations parameter.[#2257](https://github.com/OpenAtomFoundation/pika/pull/2257) @[hero-heng](https://github.com/hero-heng) + +- Pika supports Redis Stream.[#1955](https://github.com/OpenAtomFoundation/pika/pull/1955) @[KKorpse](https://github.com/KKorpse) + +- Pika supports large key analysis tools[#2195](https://github.com/OpenAtomFoundation/pika/pull/2195) @[sjcsjc123](https://github.com/sjcsjc123) + +- Pika supports dynamic adjustment of Pika cache parameters[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- Updated Pika benchmark tool to support more interface stress tests.[#2222](https://github.com/OpenAtomFoundation/pika/pull/2222)@[wangshao1](https://github.com/wangshao1) + +- Pika Operator supports automatic expansion of pika clusters.[#2121](https://github.com/OpenAtomFoundation/pika/pull/2121)@[machinly](https://github.com/machinly/) + +- Add the CompactRange command to support compacting keys within a certain range.[#2163](https://github.com/OpenAtomFoundation/pika/pull/2163)@[u6th9d](https://github.com/u6th9d) + +- Add small time cost compaction policy.[#2172](https://github.com/OpenAtomFoundation/pika/pull/2172)@[u6th9d](https://github.com/u6th9d) + +- Upgrade RocksDB version to v8.7.3.[#2157](https://github.com/OpenAtomFoundation/pika/pull/2157)@[JasirVoriya](https://github.com/JasirVoriya) + +- Pika distributed cluster Codis proxy adds new observable indicators.[#2199](https://github.com/OpenAtomFoundation/pika/pull/2199)@[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika distributed cluster supports automatic failover.[#2386](https://github.com/OpenAtomFoundation/pika/pull/2386)@[chengyu-l](https://github.com/chengyu-l) + +## bugfix + +- Fixed an issue where Pika would accidentally delete dump files during full replication from the node.[#2377](https://github.com/OpenAtomFoundation/pika/pull/2377)@[wangshao1](https://github.com/wangshao1) + +- Fixed the processing logic after the slave node receives an abnormal response packet from the master during the master-slave replication process.[#2319](https://github.com/OpenAtomFoundation/pika/pull/2319)@[wangshao1](https://github.com/wangshao1) + +- Call disable compaction when pika executes the shutdown command to improve the process exit speed. [#2345](https://github.com/OpenAtomFoundation/pika/pull/2345) @[panlei-coder](https://github.com/panlei-coder) + +- Fix the problem of inaccurate Codis-dashboard Redis Memory value.[#2337](https://github.com/OpenAtomFoundation/pika/pull/2337) @[Mixficsol](https://github.com/Mixficsol) + +- The INFO command is time-consuming and optimized to reduce the frequency of disk checks. [#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- Fixed the issue where rsync deletes temporary files with incorrect paths and fails to delete them, causing rocksdb to fail to open.[#2186](https://github.com/OpenAtomFoundation/pika/pull/2186)@[wangshao1](https://github.com/wangshao1) + +- Fixed the problem that the compact, bgsave, and info keyspace commands did not specify the db name, resulting in some coredump commands.[#2194](https://github.com/OpenAtomFoundation/pika/pull/2194)@[u6th9d](https://github.com/u6th9d) + +- Codis dashboard uses info replication instead of info command to search master ip to reduce the performance impact on Pika. [#2198](https://github.com/OpenAtomFoundation/pika/pull/2198) @[chenbt-hz](https://github.com/chenbt-hz) + +- Fix Pika cache to use edge cases to solve the problem of cache and DB data inconsistency in some scenarios.[#2225](https://github.com/OpenAtomFoundation/pika/pull/2225) @[chejinge](https://github.com/chejinge) + +- Fixed the issue where Segmentation fault would be reported when the dump folder is empty.[#2265](https://github.com/OpenAtomFoundation/pika/pull/2265) @[chenbt-hz](https://github.com/chenbt-hz) + +- Fixed the problem that some command caches did not take effect due to flag calculation errors.[#2217](https://github.com/OpenAtomFoundation/pika/pull/2217) @[lqxhub](https://github.com/lqxhub) + +- Fixed the problem that in master-slave replication mode, after the master instance flushdb, the slave instance cannot be accessed due to deadlock.[#2249](https://github.com/OpenAtomFoundation/pika/pull/2249)@[ForestLH](https://github.com/ForestLH) + +- Fixed the issue where some commands did not judge the return value of RocksDB.[#2187](https://github.com/OpenAtomFoundation/pika/pull/2187)@[callme-taota](https://github.com/callme-taota) + +- Fixed the problem that some command caches did not take effect due to flag calculation errors.[#2217](https://github.com/OpenAtomFoundation/pika/pull/2217) @[lqxhub](https://github.com/lqxhub) + +- Fixed the problem that in master-slave replication mode, after the master instance flushdb, the slave instance cannot be accessed due to deadlock.[#2249](https://github.com/OpenAtomFoundation/pika/pull/2249)@[ForestLH](https://github.com/ForestLH) + +- Fixed the issue where some commands did not judge the return value of RocksDB.[#2187](https://github.com/OpenAtomFoundation/pika/pull/2187)@[callme-taota](https://github.com/callme-taota) + +- Fix the problem of info keyspace returning wrong results.[#2369](https://github.com/OpenAtomFoundation/pika/pull/2369)@[Mixficsol](https://github.com/Mixficsol) + +- Standard function return value and initial value.[#2176](https://github.com/OpenAtomFoundation/pika/pull/2176)@[Mixficsol](https://github.com/Mixficsol) + +- Fixed the problem of inaccurate network monitoring indicator statistics.[#2234](https://github.com/OpenAtomFoundation/pika/pull/2234)@[chengyu-l](https://github.com/chengyu-l) + +- Fixed an issue where some parameters in configuration file loading were abnormal.[#2218](https://github.com/OpenAtomFoundation/pika/pull/2218)@[jettcc](https://github.com/jettcc) + +- Fix Codis dashboard cpu used 100%.[#2393](https://github.com/OpenAtomFoundation/pika/pull/2393)@[chengyu-l](https://github.com/chengyu-l) + +- Fix the problem of abnormal display of master and slave roles in Codis fe of pika.[#2387](https://github.com/OpenAtomFoundation/pika/pull/2387)@[chengyu-l](https://github.com/chengyu-l) + + # v3.5.2 ## New features diff --git a/CHANGELOG_CN.MD b/CHANGELOG_CN.MD index 9c43cf2dba..95a76d0b9a 100644 --- a/CHANGELOG_CN.MD +++ b/CHANGELOG_CN.MD @@ -1,3 +1,84 @@ +# v3.5.3 + +## 新特性 + +- Pika 支持 ACL[#2013](https://github.com/OpenAtomFoundation/pika/pull/2013) @[lqxhub](https://github.com/lqxhub) + +- 在 Codis dashboard 协程 panic 时自动恢复服务[#2349](https://github.com/OpenAtomFoundation/pika/pull/2349)@[chengyu-l](https://github.com/chengyu-l) + +- 在全量复制的过程中,pika 服务的从节点不接收读流量请求 [#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[tedli](https://github.com/tedli) + +- Pika cache 新增 bimap数据类型[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 删除 Sharing 模式残留的 Slot,Pika 下只有 DB,一个 Pika 下有多个 DB[#2251](https://github.com/OpenAtomFoundation/pika/pull/2251) @[Mixficsol](https://github.com/Mixficsol) + +- Pika exporter 暴露 cache 相关的数据采集指标[#2318](https://github.com/OpenAtomFoundation/pika/pull/2318) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika 支持快慢命令分离[#2162](https://github.com/OpenAtomFoundation/pika/pull/2162) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- pika 执行完成 Bgsave后, 保留 unix timepoint[#2167](https://github.com/OpenAtomFoundation/pika/pull/2167) @[hero-heng](https://github.com/hero-heng) + +- Pika 支持动态配置 disable_auto_compations 参数[#2257](https://github.com/OpenAtomFoundation/pika/pull/2257) @[hero-heng](https://github.com/hero-heng) + +- Pika 支持 Redis Stream[#1955](https://github.com/OpenAtomFoundation/pika/pull/1955) @[KKorpse](https://github.com/KKorpse) + +- Pika 支持大 key 分析工具[#2195](https://github.com/OpenAtomFoundation/pika/pull/2195) @[sjcsjc123](https://github.com/sjcsjc123) + +- Pika 支持动态调整 Pika cache 参数[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 更新 Pika benchmark 工具支持更多的接口压测[#2222](https://github.com/OpenAtomFoundation/pika/pull/2222)@[wangshao1](https://github.com/wangshao1) + +- Pika Operator 支持 pika 集群自动扩容[#2121](https://github.com/OpenAtomFoundation/pika/pull/2121)@[machinly](https://github.com/machinly/) + +- 添加 CompactRange 命令支持对一定范围内的 key 进行 compact[#2163](https://github.com/OpenAtomFoundation/pika/pull/2163)@[u6th9d](https://github.com/u6th9d) + +- 提升 Compaction 速度减少 Compaction 耗时[#2172](https://github.com/OpenAtomFoundation/pika/pull/2172)@[u6th9d](https://github.com/u6th9d) + +- 升级 RocksDB 版本到 v8.7.3[#2157](https://github.com/OpenAtomFoundation/pika/pull/2157)@[JasirVoriya](https://github.com/JasirVoriya) + +- Pika 分布式集群 Codis proxy 新增可观测指标[#2199](https://github.com/OpenAtomFoundation/pika/pull/2199)@[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika 分布式集群支持自动 failover[#2386](https://github.com/OpenAtomFoundation/pika/pull/2386)@[chengyu-l](https://github.com/chengyu-l) + +## bugfix + +- 修复 Pika 有从节点进行全量复制期间会误删除 dump 文件的问题[#2377](https://github.com/OpenAtomFoundation/pika/pull/2377)@[wangshao1](https://github.com/wangshao1) + +- 修复主从复制过程中, slave 节点收到 master 异常回包后的处理逻辑[#2319](https://github.com/OpenAtomFoundation/pika/pull/2319)@[wangshao1](https://github.com/wangshao1) + +- 在 Pika 执行 shutdown 命令时调用 disable compaction, 提升进程退出速度 [#2345](https://github.com/OpenAtomFoundation/pika/pull/2345) @[panlei-coder](https://github.com/panlei-coder) + +- 修复 Codis-dashboard Redis Memory 值不准确的问题[#2337](https://github.com/OpenAtomFoundation/pika/pull/2337) @[Mixficsol](https://github.com/Mixficsol) + +- INFO 命令耗时优化,降低查磁盘频率 [#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 修复 Rsync 删除临时文件路径不对,删除失败,导致rocksdb打开失败的问题[#2186](https://github.com/OpenAtomFoundation/pika/pull/2186)@[wangshao1](https://github.com/wangshao1) + +- 修复 Compact ,Bgsave ,Info keyspace 命令未指定db名称,导致部分命令 coredump 的问题[#2194](https://github.com/OpenAtomFoundation/pika/pull/2194)@[u6th9d](https://github.com/u6th9d) + +- Codis dashboard 用 info replication 替代 info 命令查寻 master ip 降低对 Pika 的性能影响 [#2198](https://github.com/OpenAtomFoundation/pika/pull/2198) @[chenbt-hz](https://github.com/chenbt-hz) + +- 修复 Pika cache 使用边缘case,解决部分场景下 cache 和 DB 数据不一致的问题[#2225](https://github.com/OpenAtomFoundation/pika/pull/2225) @[chejinge](https://github.com/chejinge) + +- 修复当 dump 文件夹为空时,会启动报错 Segmentation fault 的问题[#2265](https://github.com/OpenAtomFoundation/pika/pull/2265) @[chenbt-hz](https://github.com/chenbt-hz) + +- 修复因为flag计算错误,导致的部分命令缓存没有生效问题[#2217](https://github.com/OpenAtomFoundation/pika/pull/2217) @[lqxhub](https://github.com/lqxhub) + +- 修复主从复制模式下,主实例 flushdb 后,从实例因为死锁导致的不能访问的问题[#2249](https://github.com/OpenAtomFoundation/pika/pull/2249)@[ForestLH](https://github.com/ForestLH) + +- 修复部分命令未对 RocksDB 的返回值进行判断的问题[#2187](https://github.com/OpenAtomFoundation/pika/pull/2187)@[callme-taota](https://github.com/callme-taota) + +- 规范函数的返回值及初始值[#2176](https://github.com/OpenAtomFoundation/pika/pull/2176)@[Mixficsol](https://github.com/Mixficsol) + +- 修复网络监控指标统计不准确的问题[#2234](https://github.com/OpenAtomFoundation/pika/pull/2234)@[chengyu-l](https://github.com/chengyu-l) + +- 修复配置文件加载部分参数异常的问题[#2218](https://github.com/OpenAtomFoundation/pika/pull/2218)@[jettcc](https://github.com/jettcc) + +- 修复 Codis dashboard cpu 100% 的问题[#2393](https://github.com/OpenAtomFoundation/pika/pull/2393)@[chengyu-l](https://github.com/chengyu-l) + +- 修复 Codis fe pika 主从角色显示异常的问题[#2387](https://github.com/OpenAtomFoundation/pika/pull/2387)@[chengyu-l](https://github.com/chengyu-l) + + # v3.5.2 ## 新特性 diff --git a/codis/config/dashboard.toml b/codis/config/dashboard.toml index ebd910ec62..44ef06213a 100644 --- a/codis/config/dashboard.toml +++ b/codis/config/dashboard.toml @@ -33,9 +33,10 @@ migration_async_numkeys = 500 migration_timeout = "30s" # Set configs for redis sentinel. -sentinel_check_server_state_interval = "5s" -sentinel_check_master_failover_interval = "1s" -sentinel_master_dead_check_times = 5 +sentinel_check_server_state_interval = "10s" +sentinel_check_master_failover_interval = "2s" +sentinel_master_dead_check_times = 10 +sentinel_check_offline_server_interval = "2s" sentinel_client_timeout = "10s" sentinel_quorum = 2 sentinel_parallel_syncs = 1 diff --git a/codis/pkg/models/action.go b/codis/pkg/models/action.go index 80dbe4b55c..1138df7b4b 100644 --- a/codis/pkg/models/action.go +++ b/codis/pkg/models/action.go @@ -11,4 +11,7 @@ const ( ActionMigrating = "migrating" ActionFinished = "finished" ActionSyncing = "syncing" + ActionSynced = "synced" + + ActionSyncedFailed = "synced_failed" ) diff --git a/codis/pkg/models/group.go b/codis/pkg/models/group.go index 11d6e7bf52..092ec2f117 100644 --- a/codis/pkg/models/group.go +++ b/codis/pkg/models/group.go @@ -25,6 +25,38 @@ func (g *Group) GetServersMap() map[string]*GroupServer { return results } +// SelectNewMaster choose a new master node in the group +func (g *Group) SelectNewMaster() (string, int) { + var newMasterServer *GroupServer + var newMasterIndex = -1 + + for index, server := range g.Servers { + if index == 0 || server.State != GroupServerStateNormal { + continue + } + + if newMasterServer == nil { + newMasterServer = server + newMasterIndex = index + } else if server.DbBinlogFileNum > newMasterServer.DbBinlogFileNum { + // Select the slave node with the latest offset as the master node + newMasterServer = server + newMasterIndex = index + } else if server.DbBinlogFileNum == newMasterServer.DbBinlogFileNum { + if server.DbBinlogOffset > newMasterServer.DbBinlogOffset { + newMasterServer = server + newMasterIndex = index + } + } + } + + if newMasterServer == nil { + return "", newMasterIndex + } + + return newMasterServer.Addr, newMasterIndex +} + type GroupServerState int8 const ( @@ -33,6 +65,13 @@ const ( GroupServerStateOffline ) +type GroupServerRole string + +const ( + RoleMaster GroupServerRole = "master" + RoleSlave GroupServerRole = "slave" +) + type GroupServer struct { Addr string `json:"server"` DataCenter string `json:"datacenter"` @@ -43,9 +82,11 @@ type GroupServer struct { } `json:"action"` // master or slave - Role string `json:"role"` + Role GroupServerRole `json:"role"` // If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field - ReplyOffset int `json:"reply_offset"` + DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0 + DbBinlogOffset uint64 `json:"binlog_offset"` // db0 + // Monitoring status, 0 normal, 1 subjective offline, 2 actual offline // If marked as 2 , no service is provided State GroupServerState `json:"state"` diff --git a/codis/pkg/topom/config.go b/codis/pkg/topom/config.go index 4d7234b662..d1e0d44e5f 100644 --- a/codis/pkg/topom/config.go +++ b/codis/pkg/topom/config.go @@ -50,9 +50,10 @@ migration_async_numkeys = 500 migration_timeout = "30s" # Set configs for redis sentinel. -sentinel_check_server_state_interval = "5s" -sentinel_check_master_failover_interval = "1s" -sentinel_master_dead_check_times = 5 +sentinel_check_server_state_interval = "10s" +sentinel_check_master_failover_interval = "2s" +sentinel_master_dead_check_times = 10 +sentinel_check_offline_server_interval = "2s" sentinel_client_timeout = "10s" sentinel_quorum = 2 sentinel_parallel_syncs = 1 @@ -86,6 +87,7 @@ type Config struct { SentinelCheckServerStateInterval timesize.Duration `toml:"sentinel_check_server_state_interval" json:"sentinel_client_timeout"` SentinelCheckMasterFailoverInterval timesize.Duration `toml:"sentinel_check_master_failover_interval" json:"sentinel_check_master_failover_interval"` SentinelMasterDeadCheckTimes int8 `toml:"sentinel_master_dead_check_times" json:"sentinel_master_dead_check_times"` + SentinelCheckOfflineServerInterval timesize.Duration `toml:"sentinel_check_offline_server_interval" json:"sentinel_check_offline_server_interval"` SentinelClientTimeout timesize.Duration `toml:"sentinel_client_timeout" json:"sentinel_client_timeout"` SentinelQuorum int `toml:"sentinel_quorum" json:"sentinel_quorum"` SentinelParallelSyncs int `toml:"sentinel_parallel_syncs" json:"sentinel_parallel_syncs"` diff --git a/codis/pkg/topom/context.go b/codis/pkg/topom/context.go index b765154e7c..fcec2157e3 100644 --- a/codis/pkg/topom/context.go +++ b/codis/pkg/topom/context.go @@ -40,7 +40,7 @@ func (ctx *context) getSlotMapping(sid int) (*models.SlotMapping, error) { } func (ctx *context) getSlotMappingsByGroupId(gid int) []*models.SlotMapping { - var slots = []*models.SlotMapping{} + var slots []*models.SlotMapping for _, m := range ctx.slots { if m.GroupId == gid || m.Action.TargetId == gid { slots = append(slots, m) diff --git a/codis/pkg/topom/topom.go b/codis/pkg/topom/topom.go index 67bcd7daca..f2c34f6b58 100644 --- a/codis/pkg/topom/topom.go +++ b/codis/pkg/topom/topom.go @@ -210,12 +210,12 @@ func (s *Topom) Start(routines bool) error { } }, nil, true, 0) - // Check the status of the pre-offline master every 1 second + // Check the status of the pre-offline master every 2 second // to determine whether to automatically switch master and slave gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { - w, _ := s.CheckPreOffineMastersState(5 * time.Second) + w, _ := s.CheckPreOfflineMastersState(5 * time.Second) if w != nil { w.Wait() } @@ -224,6 +224,20 @@ func (s *Topom) Start(routines bool) error { } }, nil, true, 0) + // Check the status of the offline master and slave every 30 second + // to determine whether to automatically recover to right master-slave replication relationship + gxruntime.GoUnterminated(func() { + for !s.IsClosed() { + if s.IsOnline() { + w, _ := s.CheckOfflineMastersAndSlavesState(5 * time.Second) + if w != nil { + w.Wait() + } + } + time.Sleep(s.Config().SentinelCheckOfflineServerInterval.Duration()) + } + }, nil, true, 0) + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go index 46a53c417f..517fb2da4c 100644 --- a/codis/pkg/topom/topom_group.go +++ b/codis/pkg/topom/topom_group.go @@ -302,18 +302,7 @@ func (s *Topom) GroupPromoteServer(gid int, addr string) error { if err := s.storeUpdateGroup(g); err != nil { return err } - - var ( - master = slice[0].Addr - client *redis.Client - ) - if client, err = redis.NewClient(master, s.config.ProductAuth, time.Second); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - } - defer client.Close() - if err = client.SetMaster("NO:ONE"); err != nil { - log.WarnErrorf(err, "redis %s set master to NO:ONE failed", master) - } + _ = promoteServerToNewMaster(slice[0].Addr, s.config.ProductAuth) fallthrough case models.ActionFinished: @@ -341,129 +330,243 @@ func (s *Topom) GroupPromoteServer(gid int, addr string) error { } } -func (s *Topom) trySwitchGroupMaster(gid int, cache *redis.InfoCache) error { - ctx, err := s.newContext() - if err != nil { - return err - } - g, err := ctx.getGroup(gid) - if err != nil { - return err +func (s *Topom) tryFixReplicationRelationships(ctx *context, recoveredGroupServers []*redis.ReplicationState) { + for _, state := range recoveredGroupServers { + log.Infof("group-[%d] try to fix server[%v-%v] replication relationship", state.GroupID, state.Index, state.Addr) + group, err := ctx.getGroup(state.GroupID) + if err != nil { + log.Error(err) + continue + } + + group.OutOfSync = true + err = s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue + } + + err = s.tryFixReplicationRelationship(group, state.Server, state) + if err != nil { + log.Warnf("group-[%d] fix server[%v] replication relationship failed, err: %v", group.Id, state.Addr, err) + continue + } + + // Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err = s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) + continue + } else { + group.OutOfSync = false + _ = s.storeUpdateGroup(group) + s.dirtyGroupCache(group.Id) + } } +} - master := s.selectNextMaster(g.Servers) +// tryFixReplicationRelationship +// +// master or slave have already recovered service, fix its master-slave replication relationship. +// only fix which the old state of GroupServer is GroupServerStateOffline. +// It will only update the state of GroupServer to GroupServerStateNormal, If the GroupServer have right +// master-slave replication relationship. +func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *models.GroupServer, state *redis.ReplicationState) (err error) { + curMasterAddr := group.Servers[0].Addr + if isGroupMaster(state, group) { + // current server is master, + if models.GroupServerRole(state.Replication.Role) == models.RoleMaster { + return nil + } + + // execute the command `slaveof no one` + if err = promoteServerToNewMaster(state.Addr, s.config.ProductAuth); err != nil { + return err + } + } else { + // skip if it has right replication relationship + if state.Replication.GetMasterAddr() == curMasterAddr { + return nil + } - if master == "" { - servers, _ := json.Marshal(g) - log.Errorf("group %d donn't has any slaves to switch master, %s", gid, servers) - return errors.Errorf("cann't switch slave to master") + // current server is slave, execute the command `slaveof [new master ip] [new master port]` + if err = updateMasterToNewOne(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil { + return err + } } - return s.doSwitchGroupMaster(gid, master, cache) + groupServer.State = models.GroupServerStateNormal + groupServer.ReCallTimes = 0 + groupServer.ReplicaGroup = true + groupServer.Role = models.GroupServerRole(state.Replication.Role) + groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum + groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset + groupServer.Action.State = models.ActionSynced + err = s.storeUpdateGroup(group) + // clean cache whether err is nil or not + s.dirtyGroupCache(group.Id) + return err } -// Choose to change to the next master node in the group -func (s *Topom) selectNextMaster(servers []*models.GroupServer) string { - if len(servers) == 0 { - return "" - } +func isGroupMaster(state *redis.ReplicationState, g *models.Group) bool { + return state.Index == 0 && g.Servers[0].Addr == state.Addr +} - var masterServer *models.GroupServer +func (s *Topom) updateSlaveOfflineGroups(ctx *context, offlineGroups []*models.Group) { + for _, group := range offlineGroups { + log.Infof("group-[%d] update slave offline state", group.Id) + group.OutOfSync = true + err := s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue + } - for _, server := range servers { - if server.State != models.GroupServerStateNormal { + // Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err := s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) continue } + } +} - // If there is already a master node in the group working normally, return directly - if server.Role == "master" { - return server.Addr +// trySwitchGroupsToNewMaster +// +// the master have already been offline, and it will select and switch to a new master from the Group +func (s *Topom) trySwitchGroupsToNewMaster(ctx *context, masterOfflineGroups []*models.Group) { + for _, group := range masterOfflineGroups { + log.Infof("group-[%d] try to switch new master", group.Id) + group.OutOfSync = true + err := s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue } - if masterServer == nil { - masterServer = server - } else if server.ReplyOffset > masterServer.ReplyOffset { - // Select the slave node with the latest offset as the master node - masterServer = server + // try to switch to new master + if err := s.trySwitchGroupMaster(group); err != nil { + log.Errorf("group-[%d] switch master failed, %v", group.Id, err) + continue + } + + // Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err := s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) + continue + } else { + group.OutOfSync = false + _ = s.storeUpdateGroup(group) + s.dirtyGroupCache(group.Id) } } +} - if masterServer == nil { - return "" +func (s *Topom) trySwitchGroupMaster(group *models.Group) error { + newMasterAddr, newMasterIndex := group.SelectNewMaster() + if newMasterAddr == "" { + servers, _ := json.Marshal(group) + log.Errorf("group %d don't has any slaves to switch master, %s", group.Id, servers) + return errors.Errorf("can't switch slave to master") } - return masterServer.Addr + // TODO liuchengyu check new master is available + //available := isAvailableAsNewMaster(masterServer, s.Config()) + //if !available { + // return "" + //} + + return s.doSwitchGroupMaster(group, newMasterAddr, newMasterIndex) } -func (s *Topom) doSwitchGroupMaster(gid int, master string, cache *redis.InfoCache) error { - ctx, err := s.newContext() +func isAvailableAsNewMaster(groupServer *models.GroupServer, conf *Config) bool { + rc, err := redis.NewClient(groupServer.Addr, conf.ProductAuth, 500*time.Millisecond) if err != nil { - return err + log.Warnf("connect GroupServer[%v] failed!, error:%v", groupServer.Addr, err) + return false } - g, err := ctx.getGroup(gid) + defer rc.Close() + + info, err := rc.InfoReplication() if err != nil { - return err + log.Warnf("get InfoReplication from GroupServer[%v] failed!, error:%v", groupServer.Addr, err) + return false } - var index = func() int { - for i, x := range g.Servers { - if x.Addr == master { - return i - } - } - for i, x := range g.Servers { - rid1 := cache.GetRunId(master) - rid2 := cache.GetRunId(x.Addr) - if rid1 != "" && rid1 == rid2 { - return i - } - } - return -1 - }() - if index == -1 { - return errors.Errorf("group-[%d] doesn't have server %s with runid = '%s'", g.Id, master, cache.GetRunId(master)) + if info.MasterLinkStatus == "down" { + // down state means the slave does not finished full sync from master + log.Warnf("the master_link_status of GroupServer[%v] is down state. it cannot be selected as master", groupServer.Addr) + return false } - if index == 0 { + + return true +} + +func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMasterIndex int) (err error) { + if newMasterIndex <= 0 || newMasterAddr == "" { return nil } - defer s.dirtyGroupCache(g.Id) - - log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, index, g.Servers[index].Addr) + log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, newMasterIndex, newMasterAddr) // Set the slave node as the new master node - var client *redis.Client - if client, err = redis.NewClient(master, s.config.ProductAuth, 100*time.Millisecond); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - return err + if err = promoteServerToNewMaster(newMasterAddr, s.config.ProductAuth); err != nil { + return errors.Errorf("promote server[%v] to new master failed, err:%v", newMasterAddr, err) } - defer client.Close() - if err = client.SetMaster("NO:ONE"); err != nil { - log.WarnErrorf(err, "redis %s set master to NO:ONE failed", master) - return err - } + g.Servers[newMasterIndex].Role = models.RoleMaster + g.Servers[newMasterIndex].Action.State = models.ActionSynced + g.Servers[0], g.Servers[newMasterIndex] = g.Servers[newMasterIndex], g.Servers[0] + defer func() { + err = s.storeUpdateGroup(g) + // clean cache whether err is nil or not + s.dirtyGroupCache(g.Id) + }() // Set other nodes in the group as slave nodes of the new master node for _, server := range g.Servers { - if server.State != models.GroupServerStateNormal || server.Addr == master { + if server.State != models.GroupServerStateNormal || server.Addr == newMasterAddr { continue } - var client2 *redis.Client - if client2, err = redis.NewClient(server.Addr, s.config.ProductAuth, 100*time.Millisecond); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - return err - } - defer client2.Close() - if err = client2.SetMaster(master); err != nil { - log.WarnErrorf(err, "redis %s set master to %s failed", server.Addr, master) - return err + + err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth) + if err != nil { + // skip err, and retry to update master-slave replication relationship through next heartbeat check + err = nil + server.Action.State = models.ActionSyncedFailed + server.State = models.GroupServerStateOffline + log.Warnf("group-[%d] update server[%d] replication relationship failed, new master: %s", g.Id, newMasterIndex, newMasterAddr) + } else { + server.Action.State = models.ActionSynced + server.Role = models.RoleSlave } } - g.Servers[0], g.Servers[index] = g.Servers[index], g.Servers[0] - g.Servers[0].Role = "master" - g.OutOfSync = true - return s.storeUpdateGroup(g) + return err +} + +func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) { + return setNewRedisMaster(serverAddr, masterAddr, auth, false) +} + +func promoteServerToNewMaster(serverAddr, auth string) (err error) { + return setNewRedisMaster(serverAddr, "NO:ONE", auth, false) +} + +func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) { + return setNewRedisMaster(serverAddr, masterAddr, auth, true) +} + +func setNewRedisMaster(serverAddr, masterAddr string, auth string, force bool) (err error) { + var rc *redis.Client + if rc, err = redis.NewClient(serverAddr, auth, 500*time.Millisecond); err != nil { + return errors.Errorf("create redis client to %s failed, err:%v", serverAddr, err) + } + defer rc.Close() + if err = rc.SetMaster(masterAddr, force); err != nil { + return errors.Errorf("server[%s] set master to %s failed, force:%v err:%v", serverAddr, masterAddr, force, err) + } + return err } func (s *Topom) EnableReplicaGroups(gid int, addr string, value bool) error { @@ -640,11 +743,14 @@ func (s *Topom) SyncActionComplete(addr string, failed bool) error { var state string if !failed { - state = "synced" + state = models.ActionSynced } else { - state = "synced_failed" + state = models.ActionSyncedFailed } g.Servers[index].Action.State = state + // check whether the master is offline through heartbeat, if so, select a new master + g.Servers[index].State = models.GroupServerStateOffline + return s.storeUpdateGroup(g) } @@ -665,21 +771,16 @@ func (s *Topom) newSyncActionExecutor(addr string) (func() error, error) { return nil, nil } - var master = "NO:ONE" + var masterAddr string if index != 0 { - master = g.Servers[0].Addr + masterAddr = g.Servers[0].Addr } + return func() error { - c, err := redis.NewClient(addr, s.config.ProductAuth, time.Minute*30) - if err != nil { - log.WarnErrorf(err, "create redis client to %s failed", addr) - return err + if index != 0 { + return updateMasterToNewOne(addr, masterAddr, s.config.ProductAuth) + } else { + return promoteServerToNewMaster(addr, s.config.ProductAuth) } - defer c.Close() - if err := c.SetMaster(master); err != nil { - log.WarnErrorf(err, "redis %s set master to %s failed", addr, master) - return err - } - return nil }, nil } diff --git a/codis/pkg/topom/topom_sentinel.go b/codis/pkg/topom/topom_sentinel.go index 88a20403c9..3ea8b3cd9f 100644 --- a/codis/pkg/topom/topom_sentinel.go +++ b/codis/pkg/topom/topom_sentinel.go @@ -4,14 +4,11 @@ package topom import ( - "time" - "pika/codis/v2/pkg/models" - "pika/codis/v2/pkg/utils/log" "pika/codis/v2/pkg/utils/redis" ) -func (s *Topom) CheckAndSwitchSlavesAndMasters(filter func(index int, g *models.GroupServer) bool) error { +func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *models.GroupServer) bool) error { s.mu.Lock() defer s.mu.Unlock() ctx, err := s.newContext() @@ -19,110 +16,114 @@ func (s *Topom) CheckAndSwitchSlavesAndMasters(filter func(index int, g *models. return err } - config := &redis.MonitorConfig{ - Quorum: s.config.SentinelQuorum, - ParallelSyncs: s.config.SentinelParallelSyncs, - DownAfter: s.config.SentinelDownAfter.Duration(), - FailoverTimeout: s.config.SentinelFailoverTimeout.Duration(), - NotificationScript: s.config.SentinelNotificationScript, - ClientReconfigScript: s.config.SentinelClientReconfigScript, - } - - sentinel := redis.NewCodisSentinel(s.config.ProductName, s.config.ProductAuth) - gs := make(map[int][]*models.GroupServer) - for gid, servers := range ctx.getGroupServers() { - for i, server := range servers { - if filter(i, server) { - if val, ok := gs[gid]; ok { - gs[gid] = append(val, server) - } else { - gs[gid] = []*models.GroupServer{server} - } - } - } - } - if len(gs) == 0 { + groupServers := filterGroupServer(ctx.getGroupServers(), filter) + if len(groupServers) == 0 { return nil } - states := sentinel.RefreshMastersAndSlavesClient(config.ParallelSyncs, gs) - - var pending []*models.Group - + states := checkGroupServersReplicationState(s.Config(), groupServers) + var slaveOfflineGroups []*models.Group + var masterOfflineGroups []*models.Group + var recoveredGroupServersState []*redis.ReplicationState + var group *models.Group for _, state := range states { - var g *models.Group - if g, err = ctx.getGroup(state.GroupID); err != nil { + group, err = ctx.getGroup(state.GroupID) + if err != nil { return err } - serversMap := g.GetServersMap() - if len(serversMap) == 0 { - continue - } + s.checkAndUpdateGroupServerState(s.Config(), group, state.Server, state, &slaveOfflineGroups, + &masterOfflineGroups, &recoveredGroupServersState) + } - // It was the master node before, the master node hangs up, and it is currently the master node - if state.Index == 0 && state.Err != nil && g.Servers[0].Addr == state.Addr { - if g.Servers[0].State == models.GroupServerStateNormal { - g.Servers[0].State = models.GroupServerStateSubjectiveOffline - } else { - // update retries - g.Servers[0].ReCallTimes++ - - // Retry more than config times, start election - if g.Servers[0].ReCallTimes >= s.Config().SentinelMasterDeadCheckTimes { - // Mark enters objective offline state - g.Servers[0].State = models.GroupServerStateOffline - g.Servers[0].ReplicaGroup = false - } - // Start the election master node - if g.Servers[0].State == models.GroupServerStateOffline { - pending = append(pending, g) - } - } - } + if len(slaveOfflineGroups) > 0 { + // slave has been offline, and update state + s.updateSlaveOfflineGroups(ctx, slaveOfflineGroups) + } - // Update the offset information of the state and role nodes - if val, ok := serversMap[state.Addr]; ok { - if state.Err != nil { - if val.State == models.GroupServerStateNormal { - val.State = models.GroupServerStateSubjectiveOffline - } - continue + if len(masterOfflineGroups) > 0 { + // old master offline already, auto switch to new master + s.trySwitchGroupsToNewMaster(ctx, masterOfflineGroups) + } + + if len(recoveredGroupServersState) > 0 { + // offline GroupServer's service has recovered, check and fix it's master-slave replication relationship + s.tryFixReplicationRelationships(ctx, recoveredGroupServersState) + } + + return nil +} + +func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group, groupServer *models.GroupServer, + state *redis.ReplicationState, slaveOfflineGroups *[]*models.Group, masterOfflineGroups *[]*models.Group, + recoveredGroupServers *[]*redis.ReplicationState) { + if state.Err != nil { + if groupServer.State == models.GroupServerStateNormal { + // pre offline + groupServer.State = models.GroupServerStateSubjectiveOffline + } else { + // update retries + groupServer.ReCallTimes++ + + // Retry more than config times, start election + if groupServer.ReCallTimes >= conf.SentinelMasterDeadCheckTimes { + // Mark enters objective offline state + groupServer.State = models.GroupServerStateOffline + groupServer.Action.State = models.ActionNothing + groupServer.ReplicaGroup = false } - val.State = models.GroupServerStateNormal - val.ReCallTimes = 0 - val.Role = state.Replication.Role - if val.Role == "master" { - val.ReplyOffset = state.Replication.MasterReplOffset + // Start the election master node + if groupServer.State == models.GroupServerStateOffline && isGroupMaster(state, group) { + *masterOfflineGroups = append(*masterOfflineGroups, group) } else { - val.ReplyOffset = state.Replication.SlaveReplOffset + *slaveOfflineGroups = append(*slaveOfflineGroups, group) } } + } else { + if groupServer.State == models.GroupServerStateOffline { + *recoveredGroupServers = append(*recoveredGroupServers, state) + // update GroupServer to GroupServerStateNormal state later + } else { + // Update the offset information of the state and role nodes + groupServer.State = models.GroupServerStateNormal + groupServer.ReCallTimes = 0 + groupServer.ReplicaGroup = true + groupServer.Role = models.GroupServerRole(state.Replication.Role) + groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum + groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset + groupServer.Action.State = models.ActionSynced + } } +} - if len(pending) == 0 { - return nil +func checkGroupServersReplicationState(conf *Config, gs map[int][]*models.GroupServer) []*redis.ReplicationState { + config := &redis.MonitorConfig{ + Quorum: conf.SentinelQuorum, + ParallelSyncs: conf.SentinelParallelSyncs, + DownAfter: conf.SentinelDownAfter.Duration(), + FailoverTimeout: conf.SentinelFailoverTimeout.Duration(), + NotificationScript: conf.SentinelNotificationScript, + ClientReconfigScript: conf.SentinelClientReconfigScript, } - cache := &redis.InfoCache{ - Auth: s.config.ProductAuth, Timeout: time.Millisecond * 100, - } - // Try to switch master slave - for _, g := range pending { - if err = s.trySwitchGroupMaster(g.Id, cache); err != nil { - log.Errorf("gid-[%d] switch master failed, %v", g.Id, err) - continue - } + sentinel := redis.NewCodisSentinel(conf.ProductName, conf.ProductAuth) + return sentinel.RefreshMastersAndSlavesClient(config.ParallelSyncs, gs) +} - slots := ctx.getSlotMappingsByGroupId(g.Id) - // Notify all servers to update slot information - if err = s.resyncSlotMappings(ctx, slots...); err != nil { - log.Warnf("group-[%d] resync-rollback to preparing", g.Id) - continue +func filterGroupServer(groupServers map[int][]*models.GroupServer, + filter func(index int, gs *models.GroupServer) bool) map[int][]*models.GroupServer { + filteredGroupServers := make(map[int][]*models.GroupServer) + for gid, servers := range groupServers { + for i, server := range servers { + if filter(i, server) { + if val, ok := filteredGroupServers[gid]; ok { + filteredGroupServers[gid] = append(val, server) + } else { + filteredGroupServers[gid] = []*models.GroupServer{server} + } + } } - s.dirtyGroupCache(g.Id) } - - return nil + return filteredGroupServers } diff --git a/codis/pkg/topom/topom_stats.go b/codis/pkg/topom/topom_stats.go index 9186e05a13..d9538cc7ad 100644 --- a/codis/pkg/topom/topom_stats.go +++ b/codis/pkg/topom/topom_stats.go @@ -167,7 +167,7 @@ func (s *Topom) newMastersAndSlavesStats(timeout time.Duration, filter func(inde go func() { defer close(ch) - err := s.CheckAndSwitchSlavesAndMasters(filter) + err := s.CheckStateAndSwitchSlavesAndMasters(filter) if err != nil { log.Errorf("refresh masters and slaves failed, %v", err) stats.Error = err @@ -189,19 +189,31 @@ func (s *Topom) CheckMastersAndSlavesState(timeout time.Duration) (*sync.WaitGro wg := &sync.WaitGroup{} wg.Add(1) go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { - return index != 0 || g.State == models.GroupServerStateNormal + return g.State == models.GroupServerStateNormal }, wg) return wg, nil } -func (s *Topom) CheckPreOffineMastersState(timeout time.Duration) (*sync.WaitGroup, error) { +func (s *Topom) CheckPreOfflineMastersState(timeout time.Duration) (*sync.WaitGroup, error) { s.mu.Lock() defer s.mu.Unlock() wg := &sync.WaitGroup{} wg.Add(1) go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { - return index == 0 && g.State != models.GroupServerStateNormal + return g.State == models.GroupServerStateSubjectiveOffline + }, wg) + return wg, nil +} + +func (s *Topom) CheckOfflineMastersAndSlavesState(timeout time.Duration) (*sync.WaitGroup, error) { + s.mu.Lock() + defer s.mu.Unlock() + + wg := &sync.WaitGroup{} + wg.Add(1) + go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { + return g.State == models.GroupServerStateOffline }, wg) return wg, nil } diff --git a/codis/pkg/utils/redis/client.go b/codis/pkg/utils/redis/client.go index 21ae9e83b6..5f751321db 100644 --- a/codis/pkg/utils/redis/client.go +++ b/codis/pkg/utils/redis/client.go @@ -46,7 +46,7 @@ func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout), }...) if err != nil { - return nil, errors.Trace(err) + return nil, err } return &Client{ conn: c, Addr: addr, Auth: auth, @@ -203,11 +203,16 @@ func (c *Client) InfoReplication() (*InfoReplication, error) { return nil, errors.Trace(err) } + return parseInfoReplication(text) +} + +func parseInfoReplication(text string) (*InfoReplication, error) { var ( info = make(map[string]string) slaveMap = make([]map[string]string, 0) infoReplication InfoReplication slaves []InfoSlave + err error ) for _, line := range strings.Split(text, "\n") { @@ -231,6 +236,21 @@ func (c *Client) InfoReplication() (*InfoReplication, error) { } slaveMap = append(slaveMap, slave) + } else if strings.HasPrefix(key, "db0") { + // consider only the case of having one DB (db0) + kvArray := strings.Split(kv[1], ",") + for _, kvStr := range kvArray { + subKvArray := strings.Split(kvStr, "=") + if len(subKvArray) != 2 { + continue + } + + if subKvArray[0] == "binlog_offset" { + fileNumAndOffset := strings.Split(subKvArray[1], " ") + info["binlog_file_num"] = strings.TrimSpace(fileNumAndOffset[0]) + info["binlog_offset"] = strings.TrimSpace(fileNumAndOffset[1]) + } + } } else { info[key] = strings.TrimSpace(kv[1]) } @@ -306,7 +326,7 @@ func (c *Client) InfoFullv2() (map[string]string, error) { } } -func (c *Client) SetMaster(master string) error { +func (c *Client) SetMaster(master string, force bool) error { if master == "" || strings.ToUpper(master) == "NO:ONE" { if _, err := c.Do("SLAVEOF", "NO", "ONE"); err != nil { return err @@ -319,8 +339,15 @@ func (c *Client) SetMaster(master string) error { if _, err := c.Do("CONFIG", "set", "masterauth", c.Auth); err != nil { return err } - if _, err := c.Do("SLAVEOF", host, port); err != nil { - return err + + if force { + if _, err := c.Do("SLAVEOF", host, port, "-f"); err != nil { + return err + } + } else { + if _, err := c.Do("SLAVEOF", host, port); err != nil { + return err + } } } if _, err := c.Do("CONFIG", "REWRITE"); err != nil { diff --git a/codis/pkg/utils/redis/client_test.go b/codis/pkg/utils/redis/client_test.go index db726f2b4c..f867ee9175 100644 --- a/codis/pkg/utils/redis/client_test.go +++ b/codis/pkg/utils/redis/client_test.go @@ -1,63 +1,53 @@ package redis import ( - "encoding/json" "fmt" - "regexp" - "strings" "testing" + + "github.com/stretchr/testify/assert" ) -func TestKk(t *testing.T) { - ok, err := regexp.Match("slave[0-9]+", []byte("slave_01")) +func TestMasterInfoReplication(t *testing.T) { + text := ` +# Replication(MASTER) +role:master +ReplicationID:94e8feeaf9036a77c59ad2f091f1c0b0858047f06fa1e09afa +connected_slaves:1 +slave0:ip=10.224.129.104,port=9971,conn_fd=104,lag=(db0:0) +db0:binlog_offset=2 384,safety_purge=none +` + res, err := parseInfoReplication(text) + if err != nil { + fmt.Println(err) + return + } - fmt.Sprintln(ok, err) + assert.Equal(t, res.DbBinlogFileNum, uint64(2), "db0 binlog file_num not right") + assert.Equal(t, res.DbBinlogOffset, uint64(384), "db0 binlog offset not right") + assert.Equal(t, len(res.Slaves), 1, "slaves numbers not right") + assert.Equal(t, res.Slaves[0].IP, "10.224.129.104", "slave0 IP not right") + assert.Equal(t, res.Slaves[0].Port, "9971", "slave0 Port not right") } -func TestParseInfo(t *testing.T) { - text := "# Replication\nrole:master\nconnected_slaves:1\nslave0:ip=10.174.22.228,port=9225,state=online,offset=2175592,lag=0\nmaster_repl_offset:2175592\nrepl_backlog_active:1\nrepl_backlog_size:1048576\nrepl_backlog_first_byte_offset:1127017\nrepl_backlog_histlen:1048576\n" - info := make(map[string]string) - slaveMap := make([]map[string]string, 0) - var slaves []InfoSlave - var infoReplication InfoReplication - - for _, line := range strings.Split(text, "\n") { - kv := strings.SplitN(line, ":", 2) - if len(kv) != 2 { - continue - } - - if key := strings.TrimSpace(kv[0]); key != "" { - if ok, _ := regexp.Match("slave[0-9]+", []byte(key)); ok { - slaveKvs := strings.Split(kv[1], ",") - - slave := make(map[string]string) - for _, slaveKvStr := range slaveKvs { - slaveKv := strings.Split(slaveKvStr, "=") - if len(slaveKv) != 2 { - continue - } - slave[slaveKv[0]] = slaveKv[1] - } - - slaveMap = append(slaveMap, slave) - } else { - info[key] = strings.TrimSpace(kv[1]) - } - } - } - if len(slaveMap) > 0 { - slavesStr, _ := json.Marshal(slaveMap) - err := json.Unmarshal(slavesStr, &slaves) - - _ = err - info["slaveMap"] = string(slavesStr) +func TestSlaveInfoReplication(t *testing.T) { + text := ` +# Replication(SLAVE) +role:slave +ReplicationID:94e8feeaf9036a77c59ad2f091f1c0b0858047f06fa1e09afa +master_host:10.224.129.40 +master_port:9971 +master_link_status:up +slave_priority:100 +slave_read_only:1 +db0:binlog_offset=1 284,safety_purge=none +` + res, err := parseInfoReplication(text) + if err != nil { + fmt.Println(err) + return } - str, _ := json.Marshal(info) - err := json.Unmarshal(str, &infoReplication) - infoReplication.Slaves = slaves - - _ = err - fmt.Println(err) + assert.Equal(t, res.DbBinlogFileNum, uint64(1), "db0 binlog file_num not right") + assert.Equal(t, res.DbBinlogOffset, uint64(284), "db0 binlog offset not right") + assert.Equal(t, len(res.Slaves), 0) } diff --git a/codis/pkg/utils/redis/codis_sentinel.go b/codis/pkg/utils/redis/codis_sentinel.go index 0b8b150ebd..4d1ce73bed 100644 --- a/codis/pkg/utils/redis/codis_sentinel.go +++ b/codis/pkg/utils/redis/codis_sentinel.go @@ -108,6 +108,7 @@ func (s *CodisSentinel) RefreshMastersAndSlavesClient(parallel int, groupServers Index: index, GroupID: gid, Addr: server.Addr, + Server: server, Replication: info, Err: err, } diff --git a/codis/pkg/utils/redis/sentinel.go b/codis/pkg/utils/redis/sentinel.go index e71155c065..c76c4d7f68 100644 --- a/codis/pkg/utils/redis/sentinel.go +++ b/codis/pkg/utils/redis/sentinel.go @@ -5,8 +5,11 @@ package redis import ( "encoding/json" + "net" "strconv" "time" + + "pika/codis/v2/pkg/models" ) type SentinelMaster struct { @@ -66,8 +69,9 @@ type InfoReplication struct { ConnectedSlaves int `json:"connected_slaves"` MasterHost string `json:"master_host"` MasterPort string `json:"master_port"` - SlaveReplOffset int `json:"slave_repl_offset"` - MasterReplOffset int `json:"master_repl_offset"` + MasterLinkStatus string `json:"master_link_status"` // down; up + DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0 + DbBinlogOffset uint64 `json:"binlog_offset"` // db0 Slaves []InfoSlave `json:"-"` } @@ -75,10 +79,19 @@ type ReplicationState struct { GroupID int Index int Addr string + Server *models.GroupServer Replication *InfoReplication Err error } +func (i *InfoReplication) GetMasterAddr() string { + if len(i.MasterHost) == 0 { + return "" + } + + return net.JoinHostPort(i.MasterHost, i.MasterPort) +} + func (i *InfoReplication) UnmarshalJSON(b []byte) error { var kvmap map[string]string if err := json.Unmarshal(b, &kvmap); err != nil { @@ -90,18 +103,23 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error { i.ConnectedSlaves = intval } } - if val, ok := kvmap["slave_repl_offset"]; ok { - if intval, err := strconv.Atoi(val); err == nil { - i.SlaveReplOffset = intval + + i.Role = kvmap["role"] + i.MasterPort = kvmap["master_host"] + i.MasterHost = kvmap["master_port"] + i.MasterLinkStatus = kvmap["master_link_status"] + + if val, ok := kvmap["binlog_file_num"]; ok { + if intval, err := strconv.ParseUint(val, 10, 64); err == nil { + i.DbBinlogFileNum = intval } } - if val, ok := kvmap["master_repl_offset"]; ok { - if intval, err := strconv.Atoi(val); err == nil { - i.MasterReplOffset = intval + + if val, ok := kvmap["binlog_offset"]; ok { + if intval, err := strconv.ParseUint(val, 10, 64); err == nil { + i.DbBinlogOffset = intval } } - i.Role = kvmap["role"] - i.MasterPort = kvmap["master_host"] - i.MasterHost = kvmap["master_port"] + return nil } From 00d628d2d765859ec97f59700687f7feeb3062a4 Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Fri, 23 Feb 2024 21:12:15 +0800 Subject: [PATCH 03/18] Update pika_migrate_thread.cc (#2415) --- src/pika_migrate_thread.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pika_migrate_thread.cc b/src/pika_migrate_thread.cc index fce5c6886c..dbd9d1516f 100644 --- a/src/pika_migrate_thread.cc +++ b/src/pika_migrate_thread.cc @@ -333,7 +333,8 @@ PikaParseSendThread::PikaParseSendThread(PikaMigrateThread *migrate_thread, cons timeout_ms_(3000), mgrtkeys_num_(64), should_exit_(false), - migrate_thread_(migrate_thread) {} + migrate_thread_(migrate_thread), + db_(db) {} PikaParseSendThread::~PikaParseSendThread() { if (is_running()) { From 58d31dc870636a7a3fa38a98500175f27a8ffb4a Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Mon, 26 Feb 2024 11:57:56 +0800 Subject: [PATCH 04/18] fix lock --- src/pika_migrate_thread.cc | 41 ++++++++++++++++++++++++++++++++++++++ src/pika_repl_bgworker.cc | 6 +++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/pika_migrate_thread.cc b/src/pika_migrate_thread.cc index dbd9d1516f..a8be1694de 100644 --- a/src/pika_migrate_thread.cc +++ b/src/pika_migrate_thread.cc @@ -67,6 +67,47 @@ static int doAuth(net::NetCli *cli) { return 0; } +// delete key from cache, slot and db +static int KeyDelete(char key_type, std::string key) +{ + int32_t res = 0; + std::string slotKey = GetSlotsSlotKey(SlotId(key)); + + // delete from cache + if (PIKA_CACHE_NONE != g_pika_conf->cache_model() + && PIKA_CACHE_STATUS_OK == g_pika_server->CacheStatus()) { + g_pika_server->Cache()->Del(key); + } + + // delete key from slot + std::vector members; + members.push_back(key_type + key); + rocksdb::Status s = g_pika_server->db()->SRem(slotKey, members, &res); + if (!s.ok()) { + if (s.IsNotFound()) { + LOG(INFO) << "Del key Srem key " << key << " not found"; + return 0; + } + else { + LOG(WARNING) << "Del key Srem key: " << key << " from slotKey, error: " << strerror(errno); + return -1; + } + } + + // delete key from db + members.clear(); + members.push_back(key); + std::map type_status; + int64_t del_nums = g_pika_server->db()->Del(members, &type_status); + if (0 > del_nums) { + LOG(WARNING) << "Del key: " << key << " at slot " << SlotNum(key) << " error"; + return -1; + } + + return 1; +} + + static int migrateKeyTTl(net::NetCli *cli, const std::string& key, storage::DataType data_type, const std::shared_ptr& db) { net::RedisCmdArgsType argv; diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 308e3e14fa..fc4db713f7 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -12,6 +12,8 @@ #include "include/pika_rm.h" #include "include/pika_server.h" #include "pstd/include/pstd_defer.h" +#include "src/pstd/include/scope_record_lock.h" +#include "include/pika_conf.h" extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; @@ -216,6 +218,8 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { start_us = pstd::NowMicros(); } // Add read lock for no suspend command + pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr()); + record_lock.Lock(c_ptr->current_key()); if (!c_ptr->IsSuspend()) { c_ptr->GetDB()->DbRWLockReader(); } @@ -236,7 +240,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { if (!c_ptr->IsSuspend()) { c_ptr->GetDB()->DbRWUnLock(); } - + record_lock.Unlock(c_ptr->current_key()); if (g_pika_conf->slowlog_slower_than() >= 0) { auto start_time = static_cast(start_us / 1000000); auto duration = static_cast(pstd::NowMicros() - start_us); From f7222d78ebf6cf7368dd6b07e50068b3841ce3b5 Mon Sep 17 00:00:00 2001 From: chejinge Date: Tue, 27 Feb 2024 17:35:47 +0800 Subject: [PATCH 05/18] fix lock --- src/pika_migrate_thread.cc | 41 -------------------------------------- 1 file changed, 41 deletions(-) diff --git a/src/pika_migrate_thread.cc b/src/pika_migrate_thread.cc index a8be1694de..dbd9d1516f 100644 --- a/src/pika_migrate_thread.cc +++ b/src/pika_migrate_thread.cc @@ -67,47 +67,6 @@ static int doAuth(net::NetCli *cli) { return 0; } -// delete key from cache, slot and db -static int KeyDelete(char key_type, std::string key) -{ - int32_t res = 0; - std::string slotKey = GetSlotsSlotKey(SlotId(key)); - - // delete from cache - if (PIKA_CACHE_NONE != g_pika_conf->cache_model() - && PIKA_CACHE_STATUS_OK == g_pika_server->CacheStatus()) { - g_pika_server->Cache()->Del(key); - } - - // delete key from slot - std::vector members; - members.push_back(key_type + key); - rocksdb::Status s = g_pika_server->db()->SRem(slotKey, members, &res); - if (!s.ok()) { - if (s.IsNotFound()) { - LOG(INFO) << "Del key Srem key " << key << " not found"; - return 0; - } - else { - LOG(WARNING) << "Del key Srem key: " << key << " from slotKey, error: " << strerror(errno); - return -1; - } - } - - // delete key from db - members.clear(); - members.push_back(key); - std::map type_status; - int64_t del_nums = g_pika_server->db()->Del(members, &type_status); - if (0 > del_nums) { - LOG(WARNING) << "Del key: " << key << " at slot " << SlotNum(key) << " error"; - return -1; - } - - return 1; -} - - static int migrateKeyTTl(net::NetCli *cli, const std::string& key, storage::DataType data_type, const std::shared_ptr& db) { net::RedisCmdArgsType argv; From ab103b65ca67c61ff76ab7a4d3d107c0672a6450 Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Mon, 4 Mar 2024 09:43:56 +0800 Subject: [PATCH 06/18] Update pika_version.h (#2443) --- include/pika_version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pika_version.h b/include/pika_version.h index fc71609547..f490739b1f 100644 --- a/include/pika_version.h +++ b/include/pika_version.h @@ -8,6 +8,6 @@ #define PIKA_MAJOR 3 #define PIKA_MINOR 5 -#define PIKA_PATCH 2 +#define PIKA_PATCH 3 #endif // INCLUDE_PIKA_VERSION_H_ From 1bbd402c34b0728b6dc16a7992548c20b92f2e52 Mon Sep 17 00:00:00 2001 From: Qx Date: Tue, 5 Mar 2024 10:36:52 +0800 Subject: [PATCH 07/18] fix: ACL user authentication errors --- src/pika_client_conn.cc | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index ea5244067e..12002a94b9 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -3,19 +3,17 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "include/pika_client_conn.h" - #include -#include +#include #include #include -#include - #include "include/pika_admin.h" +#include "include/pika_client_conn.h" #include "include/pika_cmd_table_manager.h" #include "include/pika_command.h" #include "include/pika_conf.h" +#include "include/pika_conf.h" #include "include/pika_define.h" #include "include/pika_rm.h" #include "include/pika_server.h" @@ -449,21 +447,19 @@ void PikaClientConn::DoAuth(const std::shared_ptr& user) { void PikaClientConn::UnAuth(const std::shared_ptr& user) { user_ = user; - authenticated_ = false; + // If the user does not have a password, and the user is valid, then the user does not need authentication + authenticated_ = user_->HasFlags(static_cast(AclUserFlag::NO_PASS)) && + !user_->HasFlags(static_cast(AclUserFlag::DISABLED)); } bool PikaClientConn::IsAuthed() const { return authenticated_; } bool PikaClientConn::AuthRequired() const { - if (IsAuthed()) { // the user is authed, not required - return false; - } - - if (user_->HasFlags(static_cast(AclUserFlag::NO_PASS))) { // the user is no password - return false; - } - - return user_->HasFlags(static_cast(AclUserFlag::DISABLED)); // user disabled + // If the user does not have a password, and the user is valid, then the user does not need authentication + // Otherwise, you need to determine whether go has been authenticated + return (!user_->HasFlags(static_cast(AclUserFlag::NO_PASS)) || + user_->HasFlags(static_cast(AclUserFlag::DISABLED))) && + !IsAuthed(); } std::string PikaClientConn::UserName() const { return user_->Name(); } From e9501562b9ac72e7520c7d31f3488d0612ee27c7 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Wed, 6 Mar 2024 20:48:44 +0800 Subject: [PATCH 08/18] fix acl --- include/acl.h | 4 ++++ include/pika_conf.h | 11 +++++++++++ src/acl.cc | 32 +++++++++++++++++++++++++++++++- src/pika_admin.cc | 15 ++++++++++++--- src/pika_conf.cc | 8 +++++--- 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/include/acl.h b/include/acl.h index 1363732352..e66dbd17a5 100644 --- a/include/acl.h +++ b/include/acl.h @@ -365,6 +365,9 @@ class Acl { void UpdateDefaultUserPassword(const std::string& pass); + void InitAdminUser(); + void InitDefaultUser(const std::string& bl); + // After the user channel is modified, determine whether the current channel needs to be disconnected void KillPubsubClientsIfNeeded(const std::shared_ptr& origin, const std::shared_ptr& newUser); @@ -380,6 +383,7 @@ class Acl { static std::vector GetAllCategoryName(); static const std::string DefaultUser; + static const std::string Admin; static const int64_t LogGroupingMaxTimeDelta; // Adds a new entry in the ACL log, making sure to delete the old entry diff --git a/include/pika_conf.h b/include/pika_conf.h index 32df043bca..ac65a5606c 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -168,6 +168,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return masterauth_; } + std::string userpass() { + std::shared_lock l(rwlock_); + return userpass_; + } std::string bgsave_path() { std::shared_lock l(rwlock_); return bgsave_path_; @@ -367,6 +371,11 @@ class PikaConf : public pstd::BaseConf { return pstd::Set2String(slow_cmd_set_, ','); } + const std::string GetUserBlackList() { + std::shared_lock l(rwlock_); + return userblacklist_; + } + bool is_slow_cmd(const std::string& cmd) { std::shared_lock l(rwlock_); return slow_cmd_set_.find(cmd) != slow_cmd_set_.end(); @@ -689,6 +698,7 @@ class PikaConf : public pstd::BaseConf { std::string replication_id_; std::string requirepass_; std::string masterauth_; + std::string userpass_; std::atomic classic_mode_; int databases_ = 0; int default_slot_num_ = 1; @@ -740,6 +750,7 @@ class PikaConf : public pstd::BaseConf { std::string network_interface_; + std::string userblacklist_; std::vector users_; // acl user rules std::string aclFile_; diff --git a/src/acl.cc b/src/acl.cc index 3c78adf0e8..55d3491e3d 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -293,11 +293,15 @@ std::vector User::AllChannelKey() { // class Acl pstd::Status Acl::Initialization() { AddUser(CreateDefaultUser()); - UpdateDefaultUserPassword(g_pika_conf->requirepass()); + UpdateDefaultUserPassword(g_pika_conf->userpass()); + + AddUser(CreatedUser(Admin)); + InitAdminUser(); auto status = LoadUsersAtStartup(); if (!status.ok()) { return status; } + InitDefaultUser(g_pika_conf->GetUserBlackList()); return status; } @@ -472,6 +476,31 @@ void Acl::UpdateDefaultUserPassword(const std::string& pass) { } } +void Acl::InitAdminUser() { + auto pass = g_pika_conf->requirepass(); + std::unique_lock wl(mutex_); + auto u = GetUser(Admin); + if (pass.empty()) { + u->SetUser("nopass"); + } else { + u->SetUser(">"+pass); + } + u->SetUser("+@all"); + u->SetUser("~*"); + u->SetUser("&*"); + u->SetUser("on"); +} + +void Acl::InitDefaultUser(const std::string& bl) { + std::unique_lock wl(mutex_); + auto defaultUser = GetUser(DefaultUser); + std::vector blacklist; + pstd::StringSplit(bl, ',', blacklist); + for(auto& i : blacklist) { + defaultUser->SetUser("-"+i); + } +} + // bool Acl::CheckUserCanExec(const std::shared_ptr& cmd, const PikaCmdArgsType& argv) { cmd->name(); } std::shared_ptr Acl::CreateDefaultUser() { @@ -725,6 +754,7 @@ std::array, 3> Acl::SelectorFlags = {{ }}; const std::string Acl::DefaultUser = "default"; +const std::string Acl::Admin = "admin"; const int64_t Acl::LogGroupingMaxTimeDelta = 60000; void Acl::AddLogEntry(int32_t reason, int32_t context, const std::string& username, const std::string& object, diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 772f61feab..8c8a49662e 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -269,15 +269,24 @@ void AuthCmd::Do() { std::string pwd = ""; bool defaultAuth = false; if (argv_.size() == 2) { - userName = Acl::DefaultUser; pwd = argv_[1]; - defaultAuth = true; +// defaultAuth = true; } else { userName = argv_[1]; pwd = argv_[2]; } - auto authResult = AuthenticateUser(name(), userName, pwd, conn, defaultAuth); + AuthResult authResult; + if (userName == "") { + // admin + authResult = AuthenticateUser(name(), Acl::Admin, pwd, conn, defaultAuth); + if (authResult != AuthResult::OK) { + // default。 + authResult = AuthenticateUser(name(), Acl::DefaultUser, pwd, conn, true); + } + } else { + authResult = AuthenticateUser(name(), userName, pwd, conn, defaultAuth); + } switch (authResult) { case AuthResult::INVALID_CONN: diff --git a/src/pika_conf.cc b/src/pika_conf.cc index c7234e145f..b6796f8fe7 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -49,7 +49,7 @@ int PikaConf::Load() { GetConfStr("replication-id", &replication_id_); GetConfStr("requirepass", &requirepass_); GetConfStr("masterauth", &masterauth_); - // GetConfStr("userpass", &userpass_); + GetConfStr("userpass", &userpass_); GetConfInt("maxclients", &maxclients_); if (maxclients_ <= 0) { maxclients_ = 20000; @@ -461,6 +461,8 @@ int PikaConf::Load() { network_interface_ = ""; GetConfStr("network-interface", &network_interface_); + // userblacklist + GetConfStr("userblacklist", &userblacklist_); // acl users GetConfStrMulti("user", &users_); @@ -623,8 +625,8 @@ int PikaConf::ConfigRewrite() { SetConfInt("timeout", timeout_); SetConfStr("requirepass", requirepass_); SetConfStr("masterauth", masterauth_); - // SetConfStr("userpass", userpass_); - // SetConfStr("userblacklist", userblacklist); + SetConfStr("userpass", userpass_); +// SetConfStr("userblacklist", userblacklist_); SetConfStr("dump-prefix", bgsave_prefix_); SetConfInt("maxclients", maxclients_); SetConfInt("dump-expire", expire_dump_days_); From 75358e65c37d7fc3cfaa5d8cb595939af7cc48fd Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Thu, 7 Mar 2024 10:04:14 +0800 Subject: [PATCH 09/18] default & limit --- conf/pika.conf | 2 +- include/acl.h | 5 ++--- src/acl.cc | 29 ++++++++++++----------------- src/pika_admin.cc | 8 ++++---- 4 files changed, 19 insertions(+), 25 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 09c48018c0..508c10d86a 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -80,7 +80,7 @@ masterauth : # the value of this parameter will be ignored and all users are considered as administrators, # in this scenario, users are not subject to the restrictions imposed by the userblacklist. # PS: "admin password" refers to value of the parameter above: requirepass. -# userpass : +userpass : # The blacklist of commands for users that logged in by userpass, # the commands that added to this list will not be available for users except for administrator. diff --git a/include/acl.h b/include/acl.h index e66dbd17a5..329dbadbf8 100644 --- a/include/acl.h +++ b/include/acl.h @@ -365,8 +365,7 @@ class Acl { void UpdateDefaultUserPassword(const std::string& pass); - void InitAdminUser(); - void InitDefaultUser(const std::string& bl); + void InitLimitUser(const std::string& bl); // After the user channel is modified, determine whether the current channel needs to be disconnected void KillPubsubClientsIfNeeded(const std::shared_ptr& origin, const std::shared_ptr& newUser); @@ -383,7 +382,7 @@ class Acl { static std::vector GetAllCategoryName(); static const std::string DefaultUser; - static const std::string Admin; + static const std::string Limit; static const int64_t LogGroupingMaxTimeDelta; // Adds a new entry in the ACL log, making sure to delete the old entry diff --git a/src/acl.cc b/src/acl.cc index 55d3491e3d..94adbd804f 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -293,15 +293,14 @@ std::vector User::AllChannelKey() { // class Acl pstd::Status Acl::Initialization() { AddUser(CreateDefaultUser()); - UpdateDefaultUserPassword(g_pika_conf->userpass()); + UpdateDefaultUserPassword(g_pika_conf->requirepass()); - AddUser(CreatedUser(Admin)); - InitAdminUser(); + AddUser(CreatedUser(Limit)); + InitLimitUser(g_pika_conf->GetUserBlackList()); auto status = LoadUsersAtStartup(); if (!status.ok()) { return status; } - InitDefaultUser(g_pika_conf->GetUserBlackList()); return status; } @@ -476,31 +475,27 @@ void Acl::UpdateDefaultUserPassword(const std::string& pass) { } } -void Acl::InitAdminUser() { - auto pass = g_pika_conf->requirepass(); +void Acl::InitLimitUser(const std::string& bl) { + auto pass = g_pika_conf->userpass(); + std::vector blacklist; + pstd::StringSplit(bl, ',', blacklist); std::unique_lock wl(mutex_); - auto u = GetUser(Admin); + auto u = GetUser(Limit); if (pass.empty()) { u->SetUser("nopass"); } else { u->SetUser(">"+pass); } + u->SetUser("on"); u->SetUser("+@all"); u->SetUser("~*"); u->SetUser("&*"); - u->SetUser("on"); -} -void Acl::InitDefaultUser(const std::string& bl) { - std::unique_lock wl(mutex_); - auto defaultUser = GetUser(DefaultUser); - std::vector blacklist; - pstd::StringSplit(bl, ',', blacklist); for(auto& i : blacklist) { - defaultUser->SetUser("-"+i); + u->SetUser("-"+i); } -} +} // bool Acl::CheckUserCanExec(const std::shared_ptr& cmd, const PikaCmdArgsType& argv) { cmd->name(); } std::shared_ptr Acl::CreateDefaultUser() { @@ -754,7 +749,7 @@ std::array, 3> Acl::SelectorFlags = {{ }}; const std::string Acl::DefaultUser = "default"; -const std::string Acl::Admin = "admin"; +const std::string Acl::Limit = "limit"; const int64_t Acl::LogGroupingMaxTimeDelta = 60000; void Acl::AddLogEntry(int32_t reason, int32_t context, const std::string& username, const std::string& object, diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 8c8a49662e..5da761de21 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -278,11 +278,11 @@ void AuthCmd::Do() { AuthResult authResult; if (userName == "") { - // admin - authResult = AuthenticateUser(name(), Acl::Admin, pwd, conn, defaultAuth); + // default + authResult = AuthenticateUser(name(), Acl::DefaultUser, pwd, conn, true); if (authResult != AuthResult::OK) { - // default。 - authResult = AuthenticateUser(name(), Acl::DefaultUser, pwd, conn, true); + // Limit + authResult = AuthenticateUser(name(), Acl::Limit, pwd, conn, defaultAuth); } } else { authResult = AuthenticateUser(name(), userName, pwd, conn, defaultAuth); From dae620d85e1ae874de2a4a287d89466c7d59c028 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Thu, 7 Mar 2024 11:19:24 +0800 Subject: [PATCH 10/18] blacklist instead of acl user --- include/acl.h | 2 +- src/acl.cc | 42 ++++++++++++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/include/acl.h b/include/acl.h index 329dbadbf8..f7236d3fa1 100644 --- a/include/acl.h +++ b/include/acl.h @@ -365,7 +365,7 @@ class Acl { void UpdateDefaultUserPassword(const std::string& pass); - void InitLimitUser(const std::string& bl); + void InitLimitUser(const std::string& bl, bool limit_exist); // After the user channel is modified, determine whether the current channel needs to be disconnected void KillPubsubClientsIfNeeded(const std::shared_ptr& origin, const std::shared_ptr& newUser); diff --git a/src/acl.cc b/src/acl.cc index 94adbd804f..c55216e94e 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -295,9 +295,15 @@ pstd::Status Acl::Initialization() { AddUser(CreateDefaultUser()); UpdateDefaultUserPassword(g_pika_conf->requirepass()); - AddUser(CreatedUser(Limit)); - InitLimitUser(g_pika_conf->GetUserBlackList()); auto status = LoadUsersAtStartup(); + auto u = GetUser(Limit); + bool limit_exist = true; + if (nullptr == u) { + AddUser(CreatedUser(Limit)); + limit_exist = false; + } + InitLimitUser(g_pika_conf->GetUserBlackList(), limit_exist); + if (!status.ok()) { return status; } @@ -475,26 +481,34 @@ void Acl::UpdateDefaultUserPassword(const std::string& pass) { } } -void Acl::InitLimitUser(const std::string& bl) { +void Acl::InitLimitUser(const std::string& bl, bool limit_exist) { auto pass = g_pika_conf->userpass(); std::vector blacklist; pstd::StringSplit(bl, ',', blacklist); std::unique_lock wl(mutex_); auto u = GetUser(Limit); - if (pass.empty()) { - u->SetUser("nopass"); + if (limit_exist) { + if (!bl.empty()) { + u->SetUser("+@all"); + for(auto& i : blacklist) { + u->SetUser("-"+i); + } + } } else { - u->SetUser(">"+pass); - } - u->SetUser("on"); - u->SetUser("+@all"); - u->SetUser("~*"); - u->SetUser("&*"); + if (pass.empty()) { + u->SetUser("nopass"); + } else { + u->SetUser(">"+pass); + } + u->SetUser("on"); + u->SetUser("+@all"); + u->SetUser("~*"); + u->SetUser("&*"); - for(auto& i : blacklist) { - u->SetUser("-"+i); + for(auto& i : blacklist) { + u->SetUser("-"+i); + } } - } // bool Acl::CheckUserCanExec(const std::shared_ptr& cmd, const PikaCmdArgsType& argv) { cmd->name(); } From 469fce0b70c17031db3d4a061a99a0a46369a3ef Mon Sep 17 00:00:00 2001 From: Mixficsol <838844609@qq.com> Date: Thu, 7 Mar 2024 17:06:38 +0800 Subject: [PATCH 11/18] add rename command (#2462) --- conf/pika.conf | 15 +++++++++++++++ include/pika_cmd_table_manager.h | 1 + include/pika_conf.h | 2 +- src/pika.cc | 4 ++-- src/pika_client_conn.cc | 2 +- src/pika_cmd_table_manager.cc | 12 ++++++++++++ src/pika_conf.cc | 21 ++++++++++++++++++++- 7 files changed, 52 insertions(+), 5 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 09c48018c0..1cb064202f 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -508,3 +508,18 @@ cache-lfu-decay-time: 1 # # aclfile : ../conf/users.acl +# It is possible to change the name of dangerous commands in a shared environment. +# For instance the CONFIG command may be renamed into something Warning: To prevent +# data inconsistency caused by different configuration files, do not use the rename +# command to modify write commands on the primary and secondary servers. If necessary, +# ensure that the configuration files of the primary and secondary servers are consistent +# In addition, when using the command rename, you must not use "" to modify the command, +# for example, rename-command: FLUSHALL "360flushall" is incorrect; instead, use +# rename-command: FLUSHALL 360flushall is correct. After the rename command is executed, +# it is most appropriate to use a numeric string with uppercase or lowercase letters +# for example: rename-command : FLUSHALL joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR +# +# Example: +# +# rename-command : FLUSHALL 360flushall +# rename-command : FLUSHDB 360flushdb diff --git a/include/pika_cmd_table_manager.h b/include/pika_cmd_table_manager.h index 1b0c162807..27393da4ec 100644 --- a/include/pika_cmd_table_manager.h +++ b/include/pika_cmd_table_manager.h @@ -30,6 +30,7 @@ class PikaCmdTableManager { PikaCmdTableManager(); virtual ~PikaCmdTableManager() = default; void InitCmdTable(void); + void RenameCommand(const std::string before, const std::string after); std::shared_ptr GetCmd(const std::string& opt); bool CmdExist(const std::string& cmd) const; CmdTable* GetCmdTable(); diff --git a/include/pika_conf.h b/include/pika_conf.h index 32df043bca..0fb855a71c 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -743,7 +743,7 @@ class PikaConf : public pstd::BaseConf { std::vector users_; // acl user rules std::string aclFile_; - + std::vector cmds_; std::atomic acl_pubsub_default_ = 0; // default channel pub/sub permission std::atomic acl_Log_max_len_ = 0; // default acl log max len diff --git a/src/pika.cc b/src/pika.cc index 2d62d3d6b9..2ebd2573f5 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -174,6 +174,8 @@ int main(int argc, char* argv[]) { usage(); exit(-1); } + g_pika_cmd_table_manager = std::make_unique(); + g_pika_cmd_table_manager->InitCmdTable(); PikaConfInit(path); rlimit limit; @@ -205,8 +207,6 @@ int main(int argc, char* argv[]) { InitCRC32Table(); LOG(INFO) << "Server at: " << path; - g_pika_cmd_table_manager = std::make_unique(); - g_pika_cmd_table_manager->InitCmdTable(); g_pika_server = new PikaServer(); g_pika_rm = std::make_unique(); g_network_statistic = std::make_unique(); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 12002a94b9..b4bbeccff7 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include #include #include #include @@ -13,7 +14,6 @@ #include "include/pika_cmd_table_manager.h" #include "include/pika_command.h" #include "include/pika_conf.h" -#include "include/pika_conf.h" #include "include/pika_define.h" #include "include/pika_rm.h" #include "include/pika_server.h" diff --git a/src/pika_cmd_table_manager.cc b/src/pika_cmd_table_manager.cc index 567c120a18..2be143a84e 100644 --- a/src/pika_cmd_table_manager.cc +++ b/src/pika_cmd_table_manager.cc @@ -50,6 +50,18 @@ void PikaCmdTableManager::InitCmdTable(void) { } } +void PikaCmdTableManager::RenameCommand(const std::string before, const std::string after) { + auto it = cmds_->find(before); + if (it != cmds_->end()) { + if (after.length() > 0) { + cmds_->insert(std::pair>(after, std::move(it->second))); + } else { + LOG(ERROR) << "The value of rename-command is null"; + } + cmds_->erase(it); + } +} + std::unordered_map* PikaCmdTableManager::GetCommandStatMap() { return &cmdstat_map_; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index c7234e145f..d151e93331 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -15,9 +15,12 @@ #include "cache/include/config.h" #include "include/acl.h" +#include "include/pika_cmd_table_manager.h" +#include "include/pika_conf.h" #include "include/pika_define.h" using pstd::Status; +extern std::unique_ptr g_pika_cmd_table_manager; PikaConf::PikaConf(const std::string& path) : pstd::BaseConf(path), conf_path_(path), local_meta_(std::make_unique()) {} @@ -465,7 +468,23 @@ int PikaConf::Load() { GetConfStrMulti("user", &users_); GetConfStr("aclfile", &aclFile_); - + GetConfStrMulti("rename-command", &cmds_); + for (const auto & i : cmds_) { + std::string before, after; + std::istringstream iss(i); + iss >> before; + if (iss) { + iss >> after; + pstd::StringToLower(before); + pstd::StringToLower(after); + std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(before); + if (!c_ptr) { + LOG(ERROR) << "No such " << before << " command in pika-command"; + return -1; + } + g_pika_cmd_table_manager->RenameCommand(before, after); + } + } std::string acl_pubsub_default; GetConfStr("acl-pubsub-default", &acl_pubsub_default); if (acl_pubsub_default == "allchannels") { From e097388b9a98180cadc612929dce3f66d0d3253f Mon Sep 17 00:00:00 2001 From: liuyuecai Date: Thu, 7 Mar 2024 16:34:14 +0800 Subject: [PATCH 12/18] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/acl.h | 2 +- src/acl.cc | 16 ++++++++++------ src/pika_admin.cc | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/include/acl.h b/include/acl.h index f7236d3fa1..5aa83d408d 100644 --- a/include/acl.h +++ b/include/acl.h @@ -382,7 +382,7 @@ class Acl { static std::vector GetAllCategoryName(); static const std::string DefaultUser; - static const std::string Limit; + static const std::string DefaultLimitUser; static const int64_t LogGroupingMaxTimeDelta; // Adds a new entry in the ACL log, making sure to delete the old entry diff --git a/src/acl.cc b/src/acl.cc index c55216e94e..dbacedddfd 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -296,10 +296,10 @@ pstd::Status Acl::Initialization() { UpdateDefaultUserPassword(g_pika_conf->requirepass()); auto status = LoadUsersAtStartup(); - auto u = GetUser(Limit); + auto u = GetUser(DefaultLimitUser); bool limit_exist = true; if (nullptr == u) { - AddUser(CreatedUser(Limit)); + AddUser(CreatedUser(DefaultLimitUser)); limit_exist = false; } InitLimitUser(g_pika_conf->GetUserBlackList(), limit_exist); @@ -486,13 +486,16 @@ void Acl::InitLimitUser(const std::string& bl, bool limit_exist) { std::vector blacklist; pstd::StringSplit(bl, ',', blacklist); std::unique_lock wl(mutex_); - auto u = GetUser(Limit); + auto u = GetUser(DefaultLimitUser); if (limit_exist) { if (!bl.empty()) { u->SetUser("+@all"); for(auto& i : blacklist) { u->SetUser("-"+i); } + if (!pass.empty()) { + u->SetUser(">"+pass); + } } } else { if (pass.empty()) { @@ -505,8 +508,9 @@ void Acl::InitLimitUser(const std::string& bl, bool limit_exist) { u->SetUser("~*"); u->SetUser("&*"); - for(auto& i : blacklist) { - u->SetUser("-"+i); + for(auto& cmd : blacklist) { + cmd = pstd::StringTrim(cmd, " "); + u->SetUser("-" + cmd); } } } @@ -763,7 +767,7 @@ std::array, 3> Acl::SelectorFlags = {{ }}; const std::string Acl::DefaultUser = "default"; -const std::string Acl::Limit = "limit"; +const std::string Acl::DefaultLimitUser = "limit"; const int64_t Acl::LogGroupingMaxTimeDelta = 60000; void Acl::AddLogEntry(int32_t reason, int32_t context, const std::string& username, const std::string& object, diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 5da761de21..bf066fb187 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -282,7 +282,7 @@ void AuthCmd::Do() { authResult = AuthenticateUser(name(), Acl::DefaultUser, pwd, conn, true); if (authResult != AuthResult::OK) { // Limit - authResult = AuthenticateUser(name(), Acl::Limit, pwd, conn, defaultAuth); + authResult = AuthenticateUser(name(), Acl::DefaultLimitUser, pwd, conn, defaultAuth); } } else { authResult = AuthenticateUser(name(), userName, pwd, conn, defaultAuth); From bdc2b831137e9dcfcf2aaa82bacafd8fe6115906 Mon Sep 17 00:00:00 2001 From: liuyuecai Date: Thu, 7 Mar 2024 17:15:25 +0800 Subject: [PATCH 13/18] support config get userblacklist --- src/pika_admin.cc | 6 +++++- src/pika_conf.cc | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pika_admin.cc b/src/pika_admin.cc index bf066fb187..ddfd4987f3 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1587,7 +1587,11 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slow-cmd-thread-pool-size"); EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size()); } - + if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) { + elements += 2; + EncodeString(&config_body, "userblacklist"); + EncodeString(&config_body, g_pika_conf -> GetUserBlackList()); + } if (pstd::stringmatch(pattern.data(), "slow-cmd-list", 1) != 0) { elements += 2; EncodeString(&config_body, "slow-cmd-list"); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index f7f75b1a7a..d29d866434 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -645,7 +645,7 @@ int PikaConf::ConfigRewrite() { SetConfStr("requirepass", requirepass_); SetConfStr("masterauth", masterauth_); SetConfStr("userpass", userpass_); -// SetConfStr("userblacklist", userblacklist_); + SetConfStr("userblacklist", userblacklist_); SetConfStr("dump-prefix", bgsave_prefix_); SetConfInt("maxclients", maxclients_); SetConfInt("dump-expire", expire_dump_days_); From 572a2d2378412cc3d38e6b3d56148cd1fb956f77 Mon Sep 17 00:00:00 2001 From: liuyuecai Date: Thu, 7 Mar 2024 18:15:59 +0800 Subject: [PATCH 14/18] fix acl --- tests/unit/acl.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 20900905fc..b7e3c51cab 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -9,7 +9,7 @@ start_server {tags {"acl external:skip"}} { test {Coverage: ACL USERS} { r ACL USERS - } {default newuser} + } {default limit newuser} test {Usernames can not contain spaces or null characters} { catch {r ACL setuser "a a"} err From bc12a03cfb96283e4ce5c736c676db2d35209eef Mon Sep 17 00:00:00 2001 From: liuyuecai Date: Thu, 7 Mar 2024 21:45:06 +0800 Subject: [PATCH 15/18] fix auth --- src/pika_admin.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pika_admin.cc b/src/pika_admin.cc index ddfd4987f3..fe351f7bbc 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -280,7 +280,7 @@ void AuthCmd::Do() { if (userName == "") { // default authResult = AuthenticateUser(name(), Acl::DefaultUser, pwd, conn, true); - if (authResult != AuthResult::OK) { + if (authResult != AuthResult::OK && authResult != AuthResult::NO_REQUIRE_PASS) { // Limit authResult = AuthenticateUser(name(), Acl::DefaultLimitUser, pwd, conn, defaultAuth); } From ae56bd3a1af60fe03cbc0b470df59fa0a897c11b Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Fri, 8 Mar 2024 11:10:56 +0800 Subject: [PATCH 16/18] auth with username default --- tests/integration/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/server_test.go b/tests/integration/server_test.go index 95685d2fdf..c836ff1c55 100644 --- a/tests/integration/server_test.go +++ b/tests/integration/server_test.go @@ -49,7 +49,7 @@ var _ = Describe("Server", func() { r = client.Do(ctx, "config", "set", "requirepass", "foobar") Expect(r.Val()).To(Equal("OK")) - r = client.Do(ctx, "AUTH", "wrong!") + r = client.Do(ctx, "AUTH", "default", "wrong!") Expect(r.Err()).To(MatchError("WRONGPASS invalid username-password pair or user is disabled.")) // r = client.Do(ctx, "AUTH", "foo", "bar") From e7a2f5b9b61f4ca5f823b658529f96a027019bdb Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Fri, 8 Mar 2024 17:11:58 +0800 Subject: [PATCH 17/18] qx comments --- src/acl.cc | 6 ++++-- src/pika_acl.cc | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/acl.cc b/src/acl.cc index dbacedddfd..bf0f4758f2 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -490,9 +490,11 @@ void Acl::InitLimitUser(const std::string& bl, bool limit_exist) { if (limit_exist) { if (!bl.empty()) { u->SetUser("+@all"); - for(auto& i : blacklist) { - u->SetUser("-"+i); + for(auto& cmd : blacklist) { + cmd = pstd::StringTrim(cmd, " "); + u->SetUser("-" + cmd); } + u->SetUser("on"); if (!pass.empty()) { u->SetUser(">"+pass); } diff --git a/src/pika_acl.cc b/src/pika_acl.cc index 296cbe4206..b6fe3375b7 100644 --- a/src/pika_acl.cc +++ b/src/pika_acl.cc @@ -106,6 +106,10 @@ void PikaAclCmd::DelUser() { res().SetRes(CmdRes::kErrOther, "The 'default' user cannot be removed"); return; } + if (it->data() == Acl::DefaultLimitUser) { + res().SetRes(CmdRes::kErrOther, "The 'limit' user cannot be removed"); + return; + } } std::vector userNames(argv_.begin() + 2, argv_.end()); From 34727aaa094a0845c034e2861af7ac99446d243b Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Fri, 8 Mar 2024 17:57:53 +0800 Subject: [PATCH 18/18] fix config file --- conf/pika.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 332152bfe6..eabd8a285b 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -79,19 +79,19 @@ requirepass : # [NOTICE] The value of this parameter must match the "requirepass" setting on the master. masterauth : -# The [password of user], which is empty by default.(Deprecated) +# The [password of user], which is empty by default. # [NOTICE] If this user password is the same as admin password (including both being empty), # the value of this parameter will be ignored and all users are considered as administrators, # in this scenario, users are not subject to the restrictions imposed by the userblacklist. # PS: "admin password" refers to value of the parameter above: requirepass. -userpass : +# userpass : # The blacklist of commands for users that logged in by userpass, # the commands that added to this list will not be available for users except for administrator. # [Advice] It's recommended to add high-risk commands to this list. # [Format] Commands should be separated by ",". For example: FLUSHALL, SHUTDOWN, KEYS, CONFIG # By default, this list is empty. -userblacklist : +# userblacklist : # Running Mode of Pika, The current version only supports running in "classic mode". # If set to 'classic', Pika will create multiple DBs whose number is the value of configure item "databases".