Merge pull request #58 from youzan/feature-compact-ttl
Avoid receiving raft logs when apply is slow, to reduce memory held in raft storage
absolute8511 authored Feb 17, 2020
2 parents be9b967 + 38a4d95 commit b62ee80
Showing 15 changed files with 39 additions and 1,957 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@ PREFIX=/usr/local
 DESTDIR=
 BINDIR=${PREFIX}/bin
 PROJECT?=github.com/youzan/ZanRedisDB
-VERBINARY?= 0.7.2
+VERBINARY?= 0.8.0
 COMMIT?=$(shell git rev-parse --short HEAD)
 BUILD_TIME?=$(shell date '+%Y-%m-%d_%H:%M:%S-%Z')
 GOFLAGS=-ldflags "-X ${PROJECT}/common.VerBinary=${VERBINARY} -X ${PROJECT}/common.Commit=${COMMIT} -X ${PROJECT}/common.BuildTime=${BUILD_TIME}"
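The version bump is carried into the binary by the -X linker flags on the GOFLAGS line. A minimal sketch of the receiving side of that mechanism (a hypothetical standalone main package; in the real project the variables live in the common package, as the flag paths above show):

```go
package main

import "fmt"

// These package-level string variables are overwritten at link time, e.g.:
//   go build -ldflags "-X main.verBinary=0.8.0 -X main.commit=$(git rev-parse --short HEAD)"
// The defaults below only appear when building without those flags.
var (
	verBinary = "dev"
	commit    = "none"
	buildTime = "unknown"
)

func main() {
	fmt.Printf("version=%s commit=%s built=%s\n", verBinary, commit, buildTime)
}
```

Built through the Makefile, the same mechanism stamps common.VerBinary with 0.8.0.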
13 changes: 2 additions & 11 deletions common/type.go
@@ -95,12 +95,6 @@ const (
 	// do not need to care about the data expiration. Every node in the cluster should start the 'TTLChecker' of the storage system
 	// with this policy.
 	LocalDeletion ExpirationPolicy = iota
-
-	// ConsistencyDeletion indicates all the expired data should be deleted through Raft, the underlying storage system should
-	// not delete any data and all the expired keys should be sent to the expired channel. Only the leader should starts
-	// the 'TTLChecker' with this policy.
-	ConsistencyDeletion
-
 	//
 	PeriodicalRotation
 
@@ -111,9 +105,8 @@
 )
 
 const (
-	DefaultExpirationPolicy             = "local_deletion"
-	ConsistencyDeletionExpirationPolicy = "consistency_deletion"
-	WaitCompactExpirationPolicy         = "wait_compact"
+	DefaultExpirationPolicy     = "local_deletion"
+	WaitCompactExpirationPolicy = "wait_compact"
 )
 
 var (
@@ -125,8 +118,6 @@ func StringToExpirationPolicy(s string) (ExpirationPolicy, error) {
 	switch s {
 	case DefaultExpirationPolicy:
 		return LocalDeletion, nil
-	case ConsistencyDeletionExpirationPolicy:
-		return ConsistencyDeletion, nil
 	case WaitCompactExpirationPolicy:
 		return WaitCompact, nil
 	default:
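With consistency_deletion gone, only two policy strings still parse. A short usage sketch (assuming the module import path from the Makefile; the exact error text for unknown strings is whatever the default case returns):

```go
package main

import (
	"fmt"

	"github.com/youzan/ZanRedisDB/common"
)

func main() {
	// "consistency_deletion" was removed in this commit, so it now falls
	// through to the default case and is rejected.
	for _, s := range []string{
		common.DefaultExpirationPolicy,     // "local_deletion"
		common.WaitCompactExpirationPolicy, // "wait_compact"
		"consistency_deletion",
	} {
		if p, err := common.StringToExpirationPolicy(s); err != nil {
			fmt.Printf("%q rejected: %v\n", s, err)
		} else {
			fmt.Printf("%q -> policy %v\n", s, p)
		}
	}
}
```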
2 changes: 1 addition & 1 deletion node/keys_test.go
@@ -56,7 +56,7 @@ func getTestKVNode(t *testing.T) (*KVNode, string, chan struct{}) {
 	nsConf.Replicator = 1
 	nsConf.RaftGroupConf.GroupID = 1000
 	nsConf.RaftGroupConf.SeedNodes = append(nsConf.RaftGroupConf.SeedNodes, replica)
-	nsConf.ExpirationPolicy = "consistency_deletion"
+	nsConf.ExpirationPolicy = common.DefaultExpirationPolicy
 
 	mconf := &MachineConfig{
 		BroadcastAddr: "127.0.0.1",
7 changes: 0 additions & 7 deletions node/kvstore.go
@@ -87,13 +87,6 @@ func (s *KVStore) Destroy() error {
 	return nil
 }
 
-func (s *KVStore) CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error {
-	if s.opts.EngType == rockredis.EngType {
-		return s.RockDB.CheckExpiredData(buffer, stop)
-	}
-	return nil
-}
-
 func (s *KVStore) LocalLookup(key []byte) ([]byte, error) {
 	value, err := s.KVGet(key)
 	return value, err
6 changes: 0 additions & 6 deletions node/node.go
@@ -187,7 +187,6 @@ type KVNode struct {
 	commitC            <-chan applyInfo
 	appliedIndex       uint64
 	clusterInfo        common.IClusterInfo
-	expireHandler      *ExpireHandler
 	expirationPolicy   common.ExpirationPolicy
 	remoteSyncedStates *remoteSyncedStateMgr
 	applyWait          wait.WaitTime
@@ -255,7 +254,6 @@ func NewKVNode(kvopts *KVOptions, config *RaftConfig,
 	}
 
 	s.clusterInfo = clusterInfo
-	s.expireHandler = NewExpireHandler(s)
 
 	s.registerHandler()
 
@@ -306,7 +304,6 @@ func (nd *KVNode) Start(standalone bool) error {
 		nd.readIndexLoop()
 	}()
 
-	nd.expireHandler.Start()
 	return nil
 }
 
@@ -324,7 +321,6 @@ func (nd *KVNode) Stop() {
 	}
 	defer close(nd.stopDone)
 	close(nd.stopChan)
-	nd.expireHandler.Stop()
 	nd.wg.Wait()
 	nd.rn.StopNode()
 	nd.sm.Close()
@@ -1383,8 +1379,6 @@ func (nd *KVNode) ReportMeLeaderToCluster() {
 
 // should not block long in this
 func (nd *KVNode) OnRaftLeaderChanged() {
-	nd.expireHandler.LeaderChanged()
-
 	if nd.rn.IsLead() {
 		go nd.ReportMeLeaderToCluster()
 	}
16 changes: 15 additions & 1 deletion node/raft.go
@@ -870,7 +870,21 @@ func (rc *raftNode) serveChannels() {
 			return
 		case <-rc.node.EventNotifyCh():
 			moreEntriesToApply := cap(rc.commitC)-len(rc.commitC) > 3
-			rd, hasUpdate := rc.node.StepNode(moreEntriesToApply, rc.IsBusySnapshot())
+			// Slow down raft log receiving while applying lags; otherwise too many
+			// logs may pile up in memory when the apply loop is slow.
+			busy := rc.IsBusySnapshot()
+			if !busy {
+				// Note: if LastIndex and FirstIndex are slow, we should avoid calling them on every step.
+				last, err := rc.raftStorage.LastIndex()
+				if err == nil {
+					fi, _ := rc.raftStorage.FirstIndex()
+					fi = fi - 1
+					if last > fi && last-fi >= uint64(rc.config.SnapCatchup+rc.config.SnapCount)*10 {
+						busy = true
+					}
+				}
+			}
+			rd, hasUpdate := rc.node.StepNode(moreEntriesToApply, busy)
 			if !hasUpdate {
 				continue
 			}
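The new check treats the node as busy once the log retained in raft storage outgrows a multiple of the snapshot window, so StepNode stops accepting new entries until apply catches up; notably, it reuses the existing busy-snapshot flag as the back-pressure signal rather than adding a new code path. A standalone sketch of just that predicate (the shouldThrottle helper and the sample values are illustrative, not part of the patch):

```go
package main

import "fmt"

// shouldThrottle mirrors the condition added to serveChannels: with first
// being raftStorage.FirstIndex() and last being raftStorage.LastIndex(),
// the node backs off once the retained log exceeds
// (SnapCatchup+SnapCount)*10 entries.
func shouldThrottle(first, last, snapCount, snapCatchup uint64) bool {
	fi := first - 1 // index of the last compacted entry, as in the patch's fi = fi - 1
	if last <= fi {
		return false
	}
	return last-fi >= (snapCatchup+snapCount)*10
}

func main() {
	// With illustrative config values SnapCount=400000 and SnapCatchup=300000,
	// throttling starts once about 7,000,000 entries are retained unapplied.
	fmt.Println(shouldThrottle(1, 7_000_000, 400000, 300000)) // true
	fmt.Println(shouldThrottle(1, 1_000, 400000, 300000))     // false
}
```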
5 changes: 0 additions & 5 deletions node/state_machine.go
@@ -52,7 +52,6 @@
 	GetStats(table string) common.NamespaceStats
 	Start() error
 	Close()
-	CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error
 	GetBatchOperator() IBatchOperator
 }
 
@@ -356,10 +355,6 @@ func (kvsm *kvStoreSM) Destroy() {
 	kvsm.store.Destroy()
 }
 
-func (kvsm *kvStoreSM) CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error {
-	return kvsm.store.CheckExpiredData(buffer, stop)
-}
-
 func (kvsm *kvStoreSM) UpdateSnapshotState(term uint64, index uint64) {
 	if kvsm.store != nil {
 		kvsm.store.SetLatestSnapIndex(index)