diff --git a/.gitignore b/.gitignore index a0c292d1..99fd04d1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ go.mod go.sum vendor/ coverage.txt -examples/test \ No newline at end of file +examples/test +/.vscode \ No newline at end of file diff --git a/README.md b/README.md index 477a5907..53c7880e 100644 --- a/README.md +++ b/README.md @@ -52,4 +52,4 @@ For 2.0.0 version, it supports: ---------- ## License - [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation + [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation \ No newline at end of file diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 17f5d76a..4d0b271a 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -26,13 +26,12 @@ import ( "sync" "time" - jsoniter "github.com/json-iterator/go" - "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" + jsoniter "github.com/json-iterator/go" ) type readType int @@ -101,7 +100,7 @@ func (mq *MessageQueueKey) UnmarshalText(text []byte) error { type localFileOffsetStore struct { group string path string - OffsetTable map[MessageQueueKey]int64 + OffsetTable *sync.Map // concurrent safe , map[MessageQueueKey]int64 // mutex for offset file mutex sync.Mutex } @@ -110,7 +109,7 @@ func NewLocalFileOffsetStore(clientID, group string) OffsetStore { store := &localFileOffsetStore{ group: group, path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"), - OffsetTable: make(map[MessageQueueKey]int64), + OffsetTable: new(sync.Map), } store.load() return store @@ -151,7 +150,9 @@ func (local *localFileOffsetStore) load() { } if datas != nil { - local.OffsetTable = datas + for k, v := range datas { + local.OffsetTable.Store(k, v) + } } } @@ -180,17 +181,17 @@ func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int "new_offset": offset, }) key := MessageQueueKey(*mq) - localOffset, exist := local.OffsetTable[key] + localOffset, exist := local.OffsetTable.Load(key) if !exist { - local.OffsetTable[key] = offset + local.OffsetTable.Store(key, offset) return } if increaseOnly { - if localOffset < offset { - local.OffsetTable[key] = offset + if localOffset.(int64) < offset { + local.OffsetTable.Store(key, offset) } } else { - local.OffsetTable[key] = offset + local.OffsetTable.Store(key, offset) } } @@ -201,10 +202,17 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) { local.mutex.Lock() defer local.mutex.Unlock() + datas := make(map[MessageQueueKey]int64) + local.OffsetTable.Range(func(key, value interface{}) bool { + k := key.(MessageQueueKey) + v := value.(int64) + datas[k] = v + return true + }) + wrapper := OffsetSerializeWrapper{ - OffsetTable: local.OffsetTable, + OffsetTable: datas, } - data, _ := jsoniter.Marshal(wrapper) utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data)) } @@ -384,11 +392,11 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second) } -func readFromMemory(table map[MessageQueueKey]int64, mq *primitive.MessageQueue) int64 { - localOffset, exist := table[MessageQueueKey(*mq)] +func readFromMemory(table *sync.Map, mq *primitive.MessageQueue) int64 { + localOffset, exist := table.Load(MessageQueueKey(*mq)) if !exist { return -1 } - return localOffset + return localOffset.(int64) } diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go index 27b98d94..cfa0eaac 100644 --- a/consumer/offset_store_test.go +++ b/consumer/offset_store_test.go @@ -21,12 +21,11 @@ import ( "path/filepath" "testing" - "github.com/golang/mock/gomock" - . "github.com/smartystreets/goconvey/convey" - "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" ) func TestNewLocalFileOffsetStore(t *testing.T) { @@ -136,7 +135,7 @@ func TestLocalFileOffsetStore(t *testing.T) { offset = localStore.read(mq, _ReadFromStore) So(offset, ShouldEqual, 1) - delete(localStore.(*localFileOffsetStore).OffsetTable, MessageQueueKey(*mq)) + localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq)) offset = localStore.read(mq, _ReadMemoryThenStore) So(offset, ShouldEqual, 1) })