diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index abe0164fe70d..685d641ad0c6 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -205,6 +205,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
 - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
 - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
+- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]
 
 ==== Deprecated
 
diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go
index ba433b78f416..b44cefdcf4eb 100644
--- a/x-pack/filebeat/input/azureblobstorage/scheduler.go
+++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go
@@ -7,6 +7,8 @@ package azureblobstorage
 import (
 	"context"
 	"fmt"
+	"slices"
+	"sort"
 	"sync"
 
 	azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
@@ -190,41 +192,19 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List
 // moveToLastSeenJob, moves to the latest job position past the last seen job
 // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
 func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
-	var latestJobs []*job
-	jobsToReturn := make([]*job, 0)
-	counter := 0
-	flag := false
-	ignore := false
-
-	for _, job := range jobs {
-		switch {
-		case job.timestamp().After(s.state.checkpoint().LatestEntryTime):
-			latestJobs = append(latestJobs, job)
-		case job.name() == s.state.checkpoint().BlobName:
-			flag = true
-		case job.name() > s.state.checkpoint().BlobName:
-			flag = true
-			counter--
-		case job.name() <= s.state.checkpoint().BlobName && (!ignore):
-			ignore = true
-		}
-		counter++
-	}
-
-	if flag && (counter < len(jobs)-1) {
-		jobsToReturn = jobs[counter+1:]
-	} else if !flag && !ignore {
-		jobsToReturn = jobs
-	}
-
-	// in a senario where there are some jobs which have a greater timestamp
-	// but lesser alphanumeric order and some jobs have greater alphanumeric order
-	// than the current checkpoint blob name, then we append the latest jobs
-	if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 {
-		jobsToReturn = append(latestJobs, jobsToReturn...)
-	}
+	cp := s.state.checkpoint()
+	jobs = slices.DeleteFunc(jobs, func(j *job) bool {
+		return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName)
+	})
 
-	return jobsToReturn
+	// In a scenario where there are some jobs which have a greater timestamp
+	// but lesser lexicographic order and some jobs have greater lexicographic order
+	// than the current checkpoint blob name, we then sort around the pivot checkpoint
+	// timestamp.
+	sort.SliceStable(jobs, func(i, _ int) bool {
+		return jobs[i].timestamp().After(cp.LatestEntryTime)
+	})
+	return jobs
 }
 
 func (s *scheduler) isFileSelected(name string) bool {
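
For reference, below is a minimal, self-contained sketch of the filter-then-partition technique this diff introduces. The `job` and `checkpoint` types here are simplified stand-ins for the input's real types (assumptions made for illustration only); they model just the `name()`/`timestamp()` accessors and the two checkpoint fields the new logic reads.

```go
package main

import (
	"fmt"
	"slices"
	"sort"
	"time"
)

// job is a stand-in for the input's job type (assumption): only the
// name and timestamp accessors used by moveToLastSeenJob are modeled.
type job struct {
	blobName string
	modTime  time.Time
}

func (j *job) name() string         { return j.blobName }
func (j *job) timestamp() time.Time { return j.modTime }

// checkpoint mirrors the two fields the new logic reads (assumption).
type checkpoint struct {
	BlobName        string
	LatestEntryTime time.Time
}

// moveToLastSeenJob mirrors the new logic: drop every job that is
// neither newer than the checkpoint timestamp nor lexicographically
// after the checkpoint blob name, then stably move the newer-timestamp
// jobs to the front.
func moveToLastSeenJob(jobs []*job, cp checkpoint) []*job {
	jobs = slices.DeleteFunc(jobs, func(j *job) bool {
		return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName)
	})
	sort.SliceStable(jobs, func(i, _ int) bool {
		return jobs[i].timestamp().After(cp.LatestEntryTime)
	})
	return jobs
}

func main() {
	base := time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC)
	cp := checkpoint{BlobName: "logs-0003", LatestEntryTime: base}

	jobs := []*job{
		{"logs-0001", base.Add(time.Hour)},  // older name, newer timestamp: kept, moved to front
		{"logs-0002", base.Add(-time.Hour)}, // older name, older timestamp: dropped
		{"logs-0003", base},                 // the checkpoint blob itself: dropped
		{"logs-0004", base.Add(-time.Hour)}, // newer name, older timestamp: kept
	}

	for _, j := range moveToLastSeenJob(jobs, cp) {
		fmt.Println(j.name())
	}
	// Output:
	// logs-0001
	// logs-0004
}
```

Note the design choice in the second step: `sort.SliceStable` with a comparator that inspects only its first argument acts as a stable partition, so jobs whose timestamp is newer than the checkpoint move ahead of the rest while the existing lexicographic order within each group is preserved.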