diff --git a/src/common/helpers.go b/src/common/helpers.go index 76bf3747252..0ae4a7f9a9c 100644 --- a/src/common/helpers.go +++ b/src/common/helpers.go @@ -6,10 +6,19 @@ import ( "encoding/binary" "encoding/json" "fmt" + "os" "protocol" "time" ) +func GetFileSize(path string) (int64, error) { + info, err := os.Stat(path) + if err != nil { + return 0, err + } + return info.Size(), nil +} + func StringToSeriesArray(seriesString string, args ...interface{}) ([]*protocol.Series, error) { seriesString = fmt.Sprintf(seriesString, args...) series := []*protocol.Series{} diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go index 72be6982b16..45044682c34 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/coordinator/cluster_configuration.go @@ -1,9 +1,11 @@ package coordinator import ( + "bytes" log "code.google.com/p/log4go" "common" "configuration" + "encoding/gob" "errors" "fmt" "sync" @@ -422,3 +424,71 @@ func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint defer self.createDatabaseLock.RUnlock() return self.databaseReplicationFactors[name] } + +func (self *ClusterConfiguration) Save() ([]byte, error) { + data := struct { + Databases map[string]uint8 + Admins map[string]*clusterAdmin + DbUsers map[string]map[string]*dbUser + Servers []*ClusterServer + HasRunningServers bool + LocalServerId uint32 + ClusterVersion uint32 + }{ + self.databaseReplicationFactors, + self.clusterAdmins, + self.dbUsers, + self.servers, + self.hasRunningServers, + self.localServerId, + self.ClusterVersion, + } + + b := bytes.NewBuffer(nil) + err := gob.NewEncoder(b).Encode(&data) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func (self *ClusterConfiguration) Recovery(b []byte) error { + data := struct { + Databases map[string]uint8 + Admins map[string]*clusterAdmin + DbUsers map[string]map[string]*dbUser + Servers []*ClusterServer + HasRunningServers bool + LocalServerId uint32 + ClusterVersion uint32 + }{} + + err := gob.NewDecoder(bytes.NewReader(b)).Decode(&data) + if err != nil { + return err + } + + self.databaseReplicationFactors = data.Databases + self.clusterAdmins = data.Admins + self.dbUsers = data.DbUsers + + // copy the protobuf client from the old servers + oldServers := map[string]*ProtobufClient{} + for _, server := range self.servers { + oldServers[server.ProtobufConnectionString] = server.protobufClient + } + + self.servers = data.Servers + for _, server := range self.servers { + server.protobufClient = oldServers[server.ProtobufConnectionString] + if server.protobufClient == nil { + server.Connect() + } + } + self.hasRunningServers = data.HasRunningServers + self.localServerId = data.LocalServerId + self.ClusterVersion = data.ClusterVersion + + return nil +} diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 09f28acb63c..cafa690e2f7 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -1,6 +1,7 @@ package coordinator import ( + "common" . "common" "configuration" "datastore" @@ -156,6 +157,40 @@ func (self *CoordinatorSuite) TestCanRecover(c *C) { assertConfigContains(server.port, "db1", true, c) } +func (self *CoordinatorSuite) TestCanSnapshot(c *C) { + server := startAndVerifyCluster(1, c)[0] + // defer clean(server) + + path, port := server.path, server.port + + for i := 0; i < 1000; i++ { + dbname := fmt.Sprintf("db%d", i) + server.CreateDatabase(dbname, uint8(1)) + assertConfigContains(server.port, dbname, true, c) + } + size, err := common.GetFileSize(server.raftServer.LogPath()) + c.Assert(err, IsNil) + server.forceLogCompaction() + newSize, err := common.GetFileSize(server.raftServer.LogPath()) + c.Assert(err, IsNil) + c.Assert(newSize < size, Equals, true) + fmt.Printf("size of %s shrinked from %d to %d\n", server.raftServer.LogPath(), size, newSize) + server.Close() + time.Sleep(SERVER_STARTUP_TIME) + server = newConfigAndServer(c) + // reset the path and port to the previous server and remove the + // path that was created by newConfigAndServer + os.RemoveAll(server.path) + server.path = path + server.port = port + // defer clean(server) + server.ListenAndServe() + for i := 0; i < 1000; i++ { + dbname := fmt.Sprintf("db%d", i) + assertConfigContains(server.port, dbname, true, c) + } +} + func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) { servers := startAndVerifyCluster(2, c) defer clean(servers...) diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 38249be4f00..287dac10c4b 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -3,6 +3,7 @@ package coordinator import ( "bytes" log "code.google.com/p/log4go" + "common" "configuration" "encoding/binary" "encoding/json" @@ -206,13 +207,48 @@ func (s *RaftServer) connectionString() string { return fmt.Sprintf("http://%s:%d", s.host, s.port) } +const ( + MAX_SIZE = 10 * MEGABYTE +) + +func (s *RaftServer) forceLogCompaction() { + err := s.raftServer.TakeSnapshot() + if err != nil { + log.Error("Cannot take snapshot: %s", err) + } +} + +func (s *RaftServer) CompactLog() { + checkSizeTicker := time.Tick(time.Minute) + forceCompactionTicker := time.Tick(time.Hour * 24) + + for { + select { + case <-checkSizeTicker: + log.Debug("Testing if we should compact the raft logs") + + path := s.raftServer.LogPath() + size, err := common.GetFileSize(path) + if err != nil { + log.Error("Error getting size of file '%s': %s", path, err) + } + if size < MAX_SIZE { + continue + } + s.forceLogCompaction() + case <-forceCompactionTicker: + s.forceLogCompaction() + } + } +} + func (s *RaftServer) startRaft() error { log.Info("Initializing Raft Server: %s %d", s.path, s.port) // Initialize and start Raft server. transporter := raft.NewHTTPTransporter("/raft") var err error - s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.clusterConfig, "") + s.raftServer, err = raft.NewServer(s.name, s.path, transporter, s.clusterConfig, s.clusterConfig, "") if err != nil { return err } @@ -220,6 +256,8 @@ func (s *RaftServer) startRaft() error { transporter.Install(s.raftServer, s) s.raftServer.Start() + go s.CompactLog() + if !s.raftServer.IsLogEmpty() { log.Info("Recovered from log") return nil