Skip to content

Commit

Permalink
优化:
Browse files Browse the repository at this point in the history
将投递消息的协程池使用spinlock控制
调整重投窗口配置
调整recover机制
	modified:   conf/cluster.toml
	modified:   handler/deliver_pre.go
	modified:   server/kiteq_server.go
	modified:   server/recover_manager.go
  • Loading branch information
blackbeans committed Oct 11, 2016
1 parent 66be0c6 commit d4c3ad7
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 26 deletions.
2 changes: 1 addition & 1 deletion conf/cluster.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
db="file://.?cap=10000000&checkSeconds=10"
deliverySeconds=5
maxDeliverWorkers=8000
recoverSeconds=10
recoverSeconds=1
recievePermitsPerSecond=20000

[clusters.mysql_dev]
Expand Down
43 changes: 30 additions & 13 deletions handler/deliver_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/blackbeans/kiteq-common/store"
packet "github.com/blackbeans/turbo/packet"
p "github.com/blackbeans/turbo/pipe"
"sync/atomic"
"time"
)

Expand All @@ -17,7 +18,8 @@ type DeliverPreHandler struct {
p.BaseForwardHandler
kitestore store.IKiteStore
exchanger *exchange.BindExchanger
maxDeliverNum chan byte
maxDeliverNum int32
conditions int32
deliverTimeout time.Duration
flowstat *stat.FlowStat
deliveryRegistry *stat.DeliveryRegistry
Expand All @@ -31,9 +33,11 @@ func NewDeliverPreHandler(name string, kitestore store.IKiteStore,
phandler.BaseForwardHandler = p.NewBaseForwardHandler(name, phandler)
phandler.kitestore = kitestore
phandler.exchanger = exchanger
phandler.maxDeliverNum = make(chan byte, maxDeliverWorker)
phandler.maxDeliverNum = (int32)(maxDeliverWorker)
phandler.conditions = 0
phandler.flowstat = flowstat
phandler.deliveryRegistry = deliveryRegistry

return phandler
}

Expand Down Expand Up @@ -62,17 +66,30 @@ func (self *DeliverPreHandler) Process(ctx *p.DefaultPipelineContext, event p.IE
return nil
}

self.maxDeliverNum <- 1
self.flowstat.DeliverGo.Incr(1)
go func() {
defer func() {
<-self.maxDeliverNum
self.flowstat.DeliverGo.Incr(-1)
}()
//启动投递
self.send0(ctx, pevent)
self.flowstat.DeliverFlow.Incr(1)
}()
/**
* 尝试三次进行spinlock
**/
for i := 0; i < 3; i++ {
old := atomic.LoadInt32(&self.conditions)
if (old + 1) > self.maxDeliverNum {
continue
}
if atomic.CompareAndSwapInt32(&self.conditions, old, old+1) {
self.flowstat.DeliverGo.Incr(1)
go func() {
defer func() {
atomic.AddInt32(&self.conditions, -1)
self.flowstat.DeliverGo.Incr(-1)
}()
//启动投递
self.send0(ctx, pevent)
self.flowstat.DeliverFlow.Incr(1)
}()
break
} else {

}
}

return nil
}
Expand Down
14 changes: 7 additions & 7 deletions server/kiteq_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func NewKiteQServer(kc KiteQConfig) *KiteQServer {

//重投策略
rw := make([]handler.RedeliveryWindow, 0, 10)
rw = append(rw, handler.NewRedeliveryWindow(0, 3, 10))
rw = append(rw, handler.NewRedeliveryWindow(4, 10, 30))
rw = append(rw, handler.NewRedeliveryWindow(10, 20, 2*30))
rw = append(rw, handler.NewRedeliveryWindow(20, 30, 4*60))
rw = append(rw, handler.NewRedeliveryWindow(30, 40, 8*60))
rw = append(rw, handler.NewRedeliveryWindow(40, 50, 16*60))
rw = append(rw, handler.NewRedeliveryWindow(50, -1, 32*60))
rw = append(rw, handler.NewRedeliveryWindow(0, 3, 0))
rw = append(rw, handler.NewRedeliveryWindow(4, 10, 5))
rw = append(rw, handler.NewRedeliveryWindow(10, 20, 10))
rw = append(rw, handler.NewRedeliveryWindow(20, 30, 2*10))
rw = append(rw, handler.NewRedeliveryWindow(30, 40, 4*10))
rw = append(rw, handler.NewRedeliveryWindow(40, 50, 8*10))
rw = append(rw, handler.NewRedeliveryWindow(50, -1, 16*10))

//创建KiteqServer的流控
limiter, _ := turbo.NewBurstyLimiter(kc.so.recievePermitsPerSecond/2, kc.so.recievePermitsPerSecond)
Expand Down
9 changes: 4 additions & 5 deletions server/recover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ func (self *RecoverManager) Stop() {
}

func (self *RecoverManager) redeliverMsg(hashKey string, now time.Time) int {
var hasMore bool = true
startIdx := 0
preTimestamp := time.Now().Unix()
preTimestamp := now.Unix()
//开始分页查询未过期的消息实体
for !self.isClose && hasMore {
more, entities := self.kitestore.PageQueryEntity(hashKey, self.serverName,
for !self.isClose {
_, entities := self.kitestore.PageQueryEntity(hashKey, self.serverName,
preTimestamp, startIdx, 200)

if len(entities) <= 0 {
Expand All @@ -95,7 +94,7 @@ func (self *RecoverManager) redeliverMsg(hashKey string, now time.Time) int {
}
}
startIdx += len(entities)
hasMore = more
// hasMore = more
preTimestamp = entities[len(entities)-1].NextDeliverTime
}
return startIdx
Expand Down

0 comments on commit d4c3ad7

Please sign in to comment.