Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] - Added missing locks for safe concurrency #34914

Merged
merged 10 commits into from
Mar 31, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748]
- Fix errors and panics due to re-used processors {pull}34761[34761]
- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689]
- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914]

*Heartbeat*

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

Expand All @@ -27,6 +28,8 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
Expand Down Expand Up @@ -107,9 +110,12 @@ func (j *job) do(ctx context.Context, id string) {
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
}

Expand Down Expand Up @@ -217,9 +223,12 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (l *limiter) wait() {
l.wg.Wait()
}

// release puts pack a worker thread.
// release puts back a worker thread.
func (l *limiter) release() {
<-l.limit
l.wg.Done()
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato
}

// moveToLastSeenJob, moves to the latest job position past the last seen job
// Jobs are stored in lexicographical order always , hence the latest position can be found either on the basis of job name or timestamp
// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
var latestJobs []*job
jobsToReturn := make([]*job, 0)
Expand Down
17 changes: 16 additions & 1 deletion x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ func (s *state) save(name string, lastModifiedOn time.Time) {

// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
s.mu.Lock()
s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

// savePartial, partially saves/updates the current state for cursor checkpoint
func (s *state) savePartial(name string, offset int64) {
s.mu.Lock()
s.cp.LastProcessedOffset[name] = offset
s.mu.Unlock()
}

// updateFailedJobs, adds a job name to a failedJobs map, which helps
Expand All @@ -87,11 +91,11 @@ func (s *state) savePartial(name string, offset int64) {
// A failed job will be re-tried a maximum of 3 times after which the
// entry is removed from the map
func (s *state) updateFailedJobs(jobName string) {
s.mu.Lock()
// we do not store partially processed jobs as failed jobs
if _, ok := s.cp.LastProcessedOffset[jobName]; ok {
return
}
s.mu.Lock()
s.cp.FailedJobs[jobName]++
if s.cp.FailedJobs[jobName] > maxFailedJobRetries {
delete(s.cp.FailedJobs, jobName)
Expand All @@ -100,7 +104,18 @@ func (s *state) updateFailedJobs(jobName string) {
}

// setCheckpoint, sets checkpoint from source to current state instance
// If for some reason the current state is empty, assigns new states as
// a fail safe mechanism
func (s *state) setCheckpoint(chkpt *Checkpoint) {
if chkpt.FailedJobs == nil {
chkpt.FailedJobs = make(map[string]int)
}
if chkpt.IsRootArray == nil {
chkpt.IsRootArray = make(map[string]bool)
}
if chkpt.LastProcessedOffset == nil {
chkpt.LastProcessedOffset = make(map[string]int64)
}
s.cp = chkpt
}

Expand Down