Skip to content

Commit

Permalink
fix: data race when update subversion for consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhiwen1 committed Nov 17, 2022
1 parent 6bbfc3f commit 09f5e4f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
2 changes: 1 addition & 1 deletion consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (dc *defaultConsumer) doBalance() {
func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
result := make([]*internal.SubscriptionData, 0)
dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
result = append(result, value.(*internal.SubscriptionData))
result = append(result, value.(*internal.SubscriptionData).Clone())
return true
})
return result
Expand Down
26 changes: 26 additions & 0 deletions internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,32 @@ type SubscriptionData struct {
ExpType string `json:"expressionType"`
}

func (sd *SubscriptionData) Clone() *SubscriptionData {
cloned := &SubscriptionData{
ClassFilterMode: sd.ClassFilterMode,
Topic: sd.Topic,
SubString: sd.SubString,
SubVersion: sd.SubVersion,
ExpType: sd.ExpType,
}

if sd.Tags.Items() != nil {
cloned.Tags = utils.NewSet()
for _, value := range sd.Tags.Items() {
cloned.Tags.Add(value)
}
}

if sd.Codes.Items() != nil {
cloned.Codes = utils.NewSet()
for _, value := range sd.Codes.Items() {
cloned.Codes.Add(value)
}
}

return cloned
}

type producerData struct {
GroupName string `json:"groupName"`
}
Expand Down
6 changes: 5 additions & 1 deletion internal/utils/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func NewSet() Set {
}
}

func (s *Set) Items() map[string]UniqueItem {
return s.items
}

func (s *Set) Add(v UniqueItem) {
s.items[v.UniqueID()] = v
}
Expand Down Expand Up @@ -98,6 +102,6 @@ func (s *Set) MarshalJSON() ([]byte, error) {
return buffer.Bytes(), nil
}

func (s Set) UnmarshalJSON(data []byte) (err error) {
func (s *Set) UnmarshalJSON(data []byte) (err error) {
return nil
}

0 comments on commit 09f5e4f

Please sign in to comment.