Skip to content

Commit

Permalink
fix #184. Don't let the raft log grow larger than 10MB
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Jan 16, 2014
1 parent 028f9b1 commit ae19e20
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
9 changes: 9 additions & 0 deletions src/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"os"
"protocol"
"time"
)

// GetFileSize reports the size, in bytes, of the file at path as returned by
// os.Stat. When the file cannot be stat'ed it returns 0 together with the
// error from os.Stat.
func GetFileSize(path string) (int64, error) {
	stat, statErr := os.Stat(path)
	if statErr == nil {
		return stat.Size(), nil
	}
	return 0, statErr
}

func StringToSeriesArray(seriesString string, args ...interface{}) ([]*protocol.Series, error) {
seriesString = fmt.Sprintf(seriesString, args...)
series := []*protocol.Series{}
Expand Down
70 changes: 70 additions & 0 deletions src/coordinator/cluster_configuration.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package coordinator

import (
"bytes"
log "code.google.com/p/log4go"
"common"
"configuration"
"encoding/gob"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -422,3 +424,71 @@ func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint
defer self.createDatabaseLock.RUnlock()
return self.databaseReplicationFactors[name]
}

// Save gob-encodes a snapshot of the cluster configuration and returns the
// encoded bytes. The snapshot holds the database replication factors, the
// cluster admins, the database users, the server list, the
// hasRunningServers flag, the local server id and the cluster version.
//
// The field names and order of the snapshot struct mirror the struct decoded
// by Recovery. No lock is taken inside this method.
func (self *ClusterConfiguration) Save() ([]byte, error) {
	snapshot := struct {
		Databases         map[string]uint8
		Admins            map[string]*clusterAdmin
		DbUsers           map[string]map[string]*dbUser
		Servers           []*ClusterServer
		HasRunningServers bool
		LocalServerId     uint32
		ClusterVersion    uint32
	}{
		Databases:         self.databaseReplicationFactors,
		Admins:            self.clusterAdmins,
		DbUsers:           self.dbUsers,
		Servers:           self.servers,
		HasRunningServers: self.hasRunningServers,
		LocalServerId:     self.localServerId,
		ClusterVersion:    self.ClusterVersion,
	}

	var encoded bytes.Buffer
	if encodeErr := gob.NewEncoder(&encoded).Encode(&snapshot); encodeErr != nil {
		return nil, encodeErr
	}
	return encoded.Bytes(), nil
}

// Recovery replaces the in-memory cluster configuration with the gob-encoded
// snapshot in b (the format produced by Save). It returns the decode error,
// leaving the configuration untouched, if b cannot be decoded.
//
// Servers in the snapshot reuse the protobuf client of the current server
// with the same ProtobufConnectionString; servers without such a client are
// connected via Connect.
func (self *ClusterConfiguration) Recovery(b []byte) error {
	var snapshot struct {
		Databases         map[string]uint8
		Admins            map[string]*clusterAdmin
		DbUsers           map[string]map[string]*dbUser
		Servers           []*ClusterServer
		HasRunningServers bool
		LocalServerId     uint32
		ClusterVersion    uint32
	}
	if decodeErr := gob.NewDecoder(bytes.NewReader(b)).Decode(&snapshot); decodeErr != nil {
		return decodeErr
	}

	self.databaseReplicationFactors = snapshot.Databases
	self.clusterAdmins = snapshot.Admins
	self.dbUsers = snapshot.DbUsers

	// Remember the protobuf clients of the servers we are about to replace,
	// keyed by connection string, so they can be handed to the new entries.
	clients := make(map[string]*ProtobufClient, len(self.servers))
	for _, previous := range self.servers {
		clients[previous.ProtobufConnectionString] = previous.protobufClient
	}

	self.servers = snapshot.Servers
	for _, restored := range self.servers {
		restored.protobufClient = clients[restored.ProtobufConnectionString]
		if restored.protobufClient != nil {
			continue
		}
		// No existing client for this connection string: open a new one.
		restored.Connect()
	}

	self.hasRunningServers = snapshot.HasRunningServers
	self.localServerId = snapshot.LocalServerId
	self.ClusterVersion = snapshot.ClusterVersion

	return nil
}
35 changes: 35 additions & 0 deletions src/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"common"
. "common"
"configuration"
"datastore"
Expand Down Expand Up @@ -156,6 +157,40 @@ func (self *CoordinatorSuite) TestCanRecover(c *C) {
assertConfigContains(server.port, "db1", true, c)
}

// TestCanSnapshot checks that forcing a raft log compaction shrinks the raft
// log file on disk, and that the databases created before the compaction are
// still visible after the server is closed and a new server is started on the
// same path and port.
func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
server := startAndVerifyCluster(1, c)[0]
// NOTE(review): cleanup is disabled here, so whatever clean() would remove is
// presumably left behind after the test — confirm whether that is intended.
// defer clean(server)

// Remember where the first server lives so the second one can reuse it.
path, port := server.path, server.port

// Create 1000 databases to put entries into the raft log.
for i := 0; i < 1000; i++ {
dbname := fmt.Sprintf("db%d", i)
server.CreateDatabase(dbname, uint8(1))
assertConfigContains(server.port, dbname, true, c)
}
// Measure the log file before and after a forced compaction; it must shrink.
size, err := common.GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
server.forceLogCompaction()
newSize, err := common.GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
c.Assert(newSize < size, Equals, true)
fmt.Printf("size of %s shrinked from %d to %d\n", server.raftServer.LogPath(), size, newSize)
// Stop the first server and wait before bringing up its replacement.
server.Close()
time.Sleep(SERVER_STARTUP_TIME)
server = newConfigAndServer(c)
// reset the path and port to the previous server and remove the
// path that was created by newConfigAndServer
os.RemoveAll(server.path)
server.path = path
server.port = port
// NOTE(review): cleanup is disabled for the restarted server as well.
// defer clean(server)
server.ListenAndServe()
// Every database created before the compaction must still be reported by the
// restarted server.
for i := 0; i < 1000; i++ {
dbname := fmt.Sprintf("db%d", i)
assertConfigContains(server.port, dbname, true, c)
}
}

func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
servers := startAndVerifyCluster(2, c)
defer clean(servers...)
Expand Down
40 changes: 39 additions & 1 deletion src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coordinator
import (
"bytes"
log "code.google.com/p/log4go"
"common"
"configuration"
"encoding/binary"
"encoding/json"
Expand Down Expand Up @@ -206,20 +207,57 @@ func (s *RaftServer) connectionString() string {
return fmt.Sprintf("http://%s:%d", s.host, s.port)
}

// MAX_SIZE is the raft log file size at or above which CompactLog forces a
// snapshot on its periodic size check. It is expressed as a multiple of
// MEGABYTE, which is declared elsewhere in the package.
const MAX_SIZE = 10 * MEGABYTE

// forceLogCompaction asks the underlying raft server to take a snapshot.
// A failure is logged and otherwise ignored; nothing is returned to the caller.
func (s *RaftServer) forceLogCompaction() {
	if snapshotErr := s.raftServer.TakeSnapshot(); snapshotErr != nil {
		log.Error("Cannot take snapshot: %s", snapshotErr)
	}
}

// CompactLog runs forever, compacting the raft log on two schedules:
//
//   - every minute it checks the size of the raft log file and forces a
//     compaction when the file is at least MAX_SIZE;
//   - every 24 hours it forces a compaction regardless of the file size.
//
// It never returns, so callers are expected to run it in its own goroutine.
func (s *RaftServer) CompactLog() {
	checkSizeTicker := time.Tick(time.Minute)
	forceCompactionTicker := time.Tick(time.Hour * 24)

	for {
		select {
		case <-checkSizeTicker:
			log.Debug("Testing if we should compact the raft logs")

			path := s.raftServer.LogPath()
			size, err := common.GetFileSize(path)
			if err != nil {
				log.Error("Error getting size of file '%s': %s", path, err)
				// The size is meaningless when the stat failed; skip this
				// round explicitly instead of relying on the helper's
				// zero value comparing below MAX_SIZE.
				continue
			}
			if size < MAX_SIZE {
				continue
			}
			s.forceLogCompaction()
		case <-forceCompactionTicker:
			s.forceLogCompaction()
		}
	}
}

func (s *RaftServer) startRaft() error {
log.Info("Initializing Raft Server: %s %d", s.path, s.port)

// Initialize and start Raft server.
transporter := raft.NewHTTPTransporter("/raft")
var err error
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.clusterConfig, "")
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, s.clusterConfig, s.clusterConfig, "")
if err != nil {
return err
}

transporter.Install(s.raftServer, s)
s.raftServer.Start()

go s.CompactLog()

if !s.raftServer.IsLogEmpty() {
log.Info("Recovered from log")
return nil
Expand Down

0 comments on commit ae19e20

Please sign in to comment.