forked from ngaut/codis-ha
-
Notifications
You must be signed in to change notification settings - Fork 0
/
servergroup.go
134 lines (113 loc) · 3.28 KB
/
servergroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"github.com/juju/errors"
log "github.com/ngaut/logging"
"github.com/wandoulabs/codis/pkg/models"
"time"
)
// GetServerGroups asks the API server for its server groups.
//
// It issues a GET against the "/api/server_groups" URL built by genUrl from
// *apiServer and lets callHttp fill the result slice. The slice is returned
// together with callHttp's error, whatever that error is.
func GetServerGroups() ([]models.ServerGroup, error) {
	var (
		groups   []models.ServerGroup
		endpoint = genUrl(*apiServer, "/api/server_groups")
	)
	err := callHttp(&groups, endpoint, "GET", nil)
	return groups, err
}
// PingServer runs checker.CheckAlive once and reports the outcome on errCh.
//
// Exactly one value is sent per call: errCtx when CheckAlive returned an
// error, or a nil interface value when it did not. The send blocks if errCh
// has no free buffer slot and no receiver is waiting.
func PingServer(checker AliveChecker, errCtx interface{}, errCh chan<- interface{}) {
	err := checker.CheckAlive()
	log.Debugf("check %+v, result:%v, errCtx:%+v", checker, err, errCtx)

	// outcome stays a nil interface for a successful check; receivers
	// compare against nil to tell "alive" from "failed".
	var outcome interface{}
	if err != nil {
		outcome = errCtx
	}
	errCh <- outcome
}
// verifyAndUpServer runs a single alive check for checker and, if the check
// succeeds, hands errCtx to handleAddServer.
//
// errCtx must hold a *models.Server: on the success path it is converted with
// an unchecked type assertion, which panics for any other dynamic type.
// Nothing happens when the check fails.
func verifyAndUpServer(checker AliveChecker, errCtx interface{}) {
	resultCh := make(chan interface{}, 100)
	go PingServer(checker, errCtx, resultCh)

	if result := <-resultCh; result != nil {
		// PingServer echoed errCtx back: the check failed.
		return
	}
	handleAddServer(errCtx.(*models.Server))
}
// getSlave fetches the server group that master belongs to (looked up by
// master.GroupId through the API server) and returns the first server in it
// whose Type is models.SERVER_TYPE_SLAVE.
//
// It returns a traced error when the HTTP call fails, and a new error that
// includes the fetched group when the group contains no slave.
func getSlave(master *models.Server) (*models.Server, error) {
	var group models.ServerGroup
	groupURL := genUrl(*apiServer, "/api/server_group/", master.GroupId)
	if err := callHttp(&group, groupURL, "GET", nil); err != nil {
		return nil, errors.Trace(err)
	}

	for i := range group.Servers {
		if candidate := group.Servers[i]; candidate.Type == models.SERVER_TYPE_SLAVE {
			return candidate, nil
		}
	}
	return nil, errors.Errorf("can not find any slave in this group: %v", group)
}
// handleCrashedServer reacts to a server that failed its alive check,
// depending on s.Type:
//
//   - master: look up a slave of the same group with getSlave and POST it to
//     that group's "/promote" endpoint; a failure in either step is logged
//     and returned.
//   - slave: only log that the slave is down.
//   - offline: nothing to do.
//   - any other type: log.Fatalf.
//
// It returns nil on every path except a failed master promotion.
func handleCrashedServer(s *models.Server) error {
	if s.Type == models.SERVER_TYPE_OFFLINE {
		// no need to handle it
		return nil
	}
	if s.Type == models.SERVER_TYPE_SLAVE {
		log.Errorf("slave is down: %+v", s)
		return nil
	}
	if s.Type != models.SERVER_TYPE_MASTER {
		log.Fatalf("unkonwn type %+v", s)
		return nil
	}

	// A master is down: get a slave and promote it.
	replica, err := getSlave(s)
	if err != nil {
		log.Warning(errors.ErrorStack(err))
		return err
	}

	log.Infof("try promote %+v", replica)
	promoteURL := genUrl(*apiServer, "/api/server_group/", replica.GroupId, "/promote")
	if err = callHttp(nil, promoteURL, "POST", replica); err != nil {
		log.Errorf("do promote %v failed %v", replica, errors.ErrorStack(err))
		return err
	}
	return nil
}
// handleAddServer re-registers s as a slave of its group.
//
// It sets s.Type to models.SERVER_TYPE_SLAVE (mutating the caller's value)
// and PUTs s to the group's "/addServer" endpoint on the API server. The
// function returns nothing; a failed request is only logged.
func handleAddServer(s *models.Server) {
	s.Type = models.SERVER_TYPE_SLAVE
	log.Infof("try reusing slave %+v", s)

	err := callHttp(nil, genUrl(*apiServer, "/api/server_group/", s.GroupId, "/addServer"), "PUT", s)
	// Log a failure only when the request actually failed; previously this
	// error line was emitted on every call, including successful ones.
	if err != nil {
		log.Errorf("do reusing slave %v failed %v", s, errors.ErrorStack(err))
	}
}
// CheckAliveAndPromote pings every server of every group concurrently to find
// crashed codis-servers, and passes each server whose check failed to
// handleCrashedServer.
//
// Each server gets a checker from acf (built from the server's Addr and a
// 5-second duration) and its own PingServer goroutine. Results are consumed
// in arrival order. The returned slice holds copies of the servers that
// failed the check so far; processing stops at the first error returned by
// handleCrashedServer, and that error is returned together with the partial
// slice.
func CheckAliveAndPromote(groups []models.ServerGroup) ([]models.Server, error) {
	var serverCnt int
	for _, group := range groups {
		serverCnt += len(group.Servers)
	}

	// One buffer slot per ping: every PingServer goroutine can deliver its
	// result without blocking, so none of them is left stuck on the send
	// when this function returns early on an error below.
	errCh := make(chan interface{}, serverCnt)
	for _, group := range groups { //each group
		for _, s := range group.Servers { //each server
			rc := acf(s.Addr, 5*time.Second)
			// Arguments of a go statement are evaluated immediately, so the
			// loop variable can be passed as-is.
			go PingServer(rc, s, errCh)
		}
	}

	//get result
	var crashedServer []models.Server
	for i := 0; i < serverCnt; i++ {
		res := <-errCh
		if res == nil { //alive
			continue
		}

		log.Warningf("server maybe crashed %+v", res)
		srv := res.(*models.Server)
		crashedServer = append(crashedServer, *srv)
		if err := handleCrashedServer(srv); err != nil {
			return crashedServer, err
		}
	}
	return crashedServer, nil
}
// CheckOfflineAndPromoteSlave pings every codis-server whose type is offline
// and, through verifyAndUpServer, re-adds the ones that answer.
//
// A checker is obtained from acf for every server, but only servers with
// Type == models.SERVER_TYPE_OFFLINE are actually verified. Servers are
// processed one at a time. The function always returns (nil, nil).
func CheckOfflineAndPromoteSlave(groups []models.ServerGroup) ([]models.Server, error) {
	for _, grp := range groups {
		for _, srv := range grp.Servers {
			checker := acf(srv.Addr, 5*time.Second)
			if srv.Type != models.SERVER_TYPE_OFFLINE {
				continue
			}
			verifyAndUpServer(checker, srv)
		}
	}
	return nil, nil
}