Skip to content

Commit

Permalink
add mutex to listinfInfo (#30131) (#30362)
Browse files Browse the repository at this point in the history
* add mutex to listinfInfo

* back to mutex

* add warn logging for totObject not matchig sum of storedObjects and errorObjects in listingInfo

* changelog

* fix merge

(cherry picked from commit ce2e5c2)

Co-authored-by: Andrea Spacca <[email protected]>
  • Loading branch information
mergify[bot] and Andrea Spacca authored Feb 12, 2022
1 parent cd57bd4 commit 9ea2e2b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Filebeat*
- Fix broken Kafka input {issue}29746[29746] {pull}30277[30277]
- cisco module: Fix change the broke ASA and FTD configs that used `var.input: syslog`. {pull}30072[30072]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]

*Heartbeat*
- Fix missing mapping for `service.name`. {pull}30324[30324]
Expand Down
17 changes: 16 additions & 1 deletion x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ const (
)

type listingInfo struct {
totObjects int
totObjects int

mu sync.Mutex
storedObjects int
errorObjects int
finalCheck bool
Expand Down Expand Up @@ -105,11 +107,19 @@ func (s *states) Delete(id string) {
func (s *states) IsListingFullyStored(listingID string) bool {
info, _ := s.listingInfo.Load(listingID)
listingInfo := info.(*listingInfo)
listingInfo.mu.Lock()
defer listingInfo.mu.Unlock()
if listingInfo.finalCheck {
return false
}

listingInfo.finalCheck = (listingInfo.storedObjects + listingInfo.errorObjects) == listingInfo.totObjects

if (listingInfo.storedObjects + listingInfo.errorObjects) > listingInfo.totObjects {
s.log.Warnf("unexepected mixmatch between storedObjects (%d), errorObjects (%d) and totObjects (%d)",
listingInfo.storedObjects, listingInfo.errorObjects, listingInfo.totObjects)
}

return listingInfo.finalCheck
}

Expand Down Expand Up @@ -154,6 +164,9 @@ func (s *states) Update(newState state, listingID string) {
// here we increase the number of stored object
info, _ := s.listingInfo.Load(listingID)
listingInfo := info.(*listingInfo)

listingInfo.mu.Lock()

if newState.Stored {
listingInfo.storedObjects++
}
Expand All @@ -162,6 +175,8 @@ func (s *states) Update(newState state, listingID string) {
listingInfo.errorObjects++
}

listingInfo.mu.Unlock()

if _, ok := s.statesByListingID[listingID]; !ok {
s.statesByListingID[listingID] = make([]state, 0)
}
Expand Down

0 comments on commit 9ea2e2b

Please sign in to comment.