Skip to content

Commit

Permalink
add mutex to listinfInfo (#30131)
Browse files Browse the repository at this point in the history
* add mutex to listinfInfo

* back to mutex

* add warn logging for totObject not matchig sum of storedObjects and errorObjects in listingInfo

* changelog

* fix merge
  • Loading branch information
Andrea Spacca authored Feb 8, 2022
1 parent 4c747cd commit ce2e5c2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

- tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712]
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]

*Heartbeat*

Expand Down
17 changes: 16 additions & 1 deletion x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ const (
)

type listingInfo struct {
totObjects int
totObjects int

mu sync.Mutex
storedObjects int
errorObjects int
finalCheck bool
Expand Down Expand Up @@ -105,11 +107,19 @@ func (s *states) Delete(id string) {
func (s *states) IsListingFullyStored(listingID string) bool {
info, _ := s.listingInfo.Load(listingID)
listingInfo := info.(*listingInfo)
listingInfo.mu.Lock()
defer listingInfo.mu.Unlock()
if listingInfo.finalCheck {
return false
}

listingInfo.finalCheck = (listingInfo.storedObjects + listingInfo.errorObjects) == listingInfo.totObjects

if (listingInfo.storedObjects + listingInfo.errorObjects) > listingInfo.totObjects {
s.log.Warnf("unexepected mixmatch between storedObjects (%d), errorObjects (%d) and totObjects (%d)",
listingInfo.storedObjects, listingInfo.errorObjects, listingInfo.totObjects)
}

return listingInfo.finalCheck
}

Expand Down Expand Up @@ -154,6 +164,9 @@ func (s *states) Update(newState state, listingID string) {
// here we increase the number of stored object
info, _ := s.listingInfo.Load(listingID)
listingInfo := info.(*listingInfo)

listingInfo.mu.Lock()

if newState.Stored {
listingInfo.storedObjects++
}
Expand All @@ -162,6 +175,8 @@ func (s *states) Update(newState state, listingID string) {
listingInfo.errorObjects++
}

listingInfo.mu.Unlock()

if _, ok := s.statesByListingID[listingID]; !ok {
s.statesByListingID[listingID] = make([]state, 0)
}
Expand Down

0 comments on commit ce2e5c2

Please sign in to comment.