From 7fcc3a02c3216851bc169c9fef2bf7fafbe8a818 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Sat, 21 Sep 2024 11:28:59 +0530 Subject: [PATCH 1/9] simplified state checkpoint calculation --- .../input/azureblobstorage/scheduler.go | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index ba433b78f416..1dfc6cd70bc2 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -192,35 +192,20 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { var latestJobs []*job jobsToReturn := make([]*job, 0) - counter := 0 - flag := false - ignore := false for _, job := range jobs { switch { case job.timestamp().After(s.state.checkpoint().LatestEntryTime): latestJobs = append(latestJobs, job) - case job.name() == s.state.checkpoint().BlobName: - flag = true case job.name() > s.state.checkpoint().BlobName: - flag = true - counter-- - case job.name() <= s.state.checkpoint().BlobName && (!ignore): - ignore = true + jobsToReturn = append(jobsToReturn, job) } - counter++ - } - - if flag && (counter < len(jobs)-1) { - jobsToReturn = jobs[counter+1:] - } else if !flag && !ignore { - jobsToReturn = jobs } // in a senario where there are some jobs which have a greater timestamp // but lesser alphanumeric order and some jobs have greater alphanumeric order // than the current checkpoint blob name, then we append the latest jobs - if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { + if len(latestJobs) > 0 { jobsToReturn = append(latestJobs, jobsToReturn...) } From ea112de32f3bf2d6bdcd49fde7d7cc26576839d4 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Sat, 21 Sep 2024 11:37:36 +0530 Subject: [PATCH 2/9] updated changelog --- CHANGELOG-developer.next.asciidoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index abe0164fe70d..ad9694066449 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -205,9 +205,10 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623] - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651] - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775] +- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}35705[35705] {pull}35724[35724] ==== Deprecated - Deprecated the `common.Float` type. {issue}28279[28279] {pull}28280[28280] - Deprecate Beat generators. {pull}28814[28814] -- Remove garbled PE executable test from auditbeat FIM module testing. {issue}35705[35705] {pull}35724[35724] +- Remove garbled PE executable test from auditbeat FIM module testing. {issue}40674[40674] {pull}40936[40936] From d4cffd171a72e976b8dc1dfb77564141ce896be0 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Sat, 21 Sep 2024 11:40:09 +0530 Subject: [PATCH 3/9] updated changelog --- CHANGELOG-developer.next.asciidoc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index ad9694066449..8dfab2cd80df 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -205,10 +205,9 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623] - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651] - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775] -- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}35705[35705] {pull}35724[35724] - +- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936] ==== Deprecated - Deprecated the `common.Float` type. {issue}28279[28279] {pull}28280[28280] - Deprecate Beat generators. {pull}28814[28814] -- Remove garbled PE executable test from auditbeat FIM module testing. {issue}40674[40674] {pull}40936[40936] +- Remove garbled PE executable test from auditbeat FIM module testing. {issue}35705[35705] {pull}35724[35724] From 095dacd0437ece94e76d21915bf59f424a7389a1 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Sat, 21 Sep 2024 11:40:37 +0530 Subject: [PATCH 4/9] updated changelog --- CHANGELOG-developer.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8dfab2cd80df..685d641ad0c6 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -206,6 +206,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651] - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775] - Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936] + ==== Deprecated - Deprecated the `common.Float` type. {issue}28279[28279] {pull}28280[28280] From 118658d9886b22cea53ae4f78b09e41b788714ea Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 24 Sep 2024 19:08:08 +0530 Subject: [PATCH 5/9] addressed PR comments --- x-pack/filebeat/input/azureblobstorage/scheduler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index 1dfc6cd70bc2..b95467eb0a43 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -190,8 +190,8 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List // moveToLastSeenJob, moves to the latest job position past the last seen job // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { - var latestJobs []*job - jobsToReturn := make([]*job, 0) + latestJobs := make([]*job, 0, len(jobs)) + jobsToReturn := make([]*job, 0, len(jobs)) for _, job := range jobs { switch { @@ -204,7 +204,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { // in a senario where there are some jobs which have a greater timestamp // but lesser alphanumeric order and some jobs have greater alphanumeric order - // than the current checkpoint blob name, then we append the latest jobs + // than the current checkpoint blob name, we then append to the latest jobs if len(latestJobs) > 0 { jobsToReturn = append(latestJobs, jobsToReturn...) } From 92d6344dc38949b68c7ce1981a0a9c2bceba47de Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 25 Sep 2024 14:28:25 +0530 Subject: [PATCH 6/9] updated with PR suggestions --- .../input/azureblobstorage/scheduler.go | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index b95467eb0a43..9c2e09c2b7d3 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -7,6 +7,8 @@ package azureblobstorage import ( "context" "fmt" + "slices" + "sort" "sync" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" @@ -190,26 +192,18 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List // moveToLastSeenJob, moves to the latest job position past the last seen job // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { - latestJobs := make([]*job, 0, len(jobs)) - jobsToReturn := make([]*job, 0, len(jobs)) - - for _, job := range jobs { - switch { - case job.timestamp().After(s.state.checkpoint().LatestEntryTime): - latestJobs = append(latestJobs, job) - case job.name() > s.state.checkpoint().BlobName: - jobsToReturn = append(jobsToReturn, job) - } - } + cp := s.state.checkpoint() + jobs = slices.DeleteFunc(jobs, func(j *job) bool { + return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName) + }) // in a senario where there are some jobs which have a greater timestamp - // but lesser alphanumeric order and some jobs have greater alphanumeric order - // than the current checkpoint blob name, we then append to the latest jobs - if len(latestJobs) > 0 { - jobsToReturn = append(latestJobs, jobsToReturn...) - } - - return jobsToReturn + // but lesser lexicographic order and some jobs have greater lexicographic order + // than the current checkpoint blob name, we then sort around the pivot checkpoint timestamp + sort.Slice(jobs, func(i, _ int) bool { + return jobs[i].timestamp().After(cp.LatestEntryTime) + }) + return jobs } func (s *scheduler) isFileSelected(name string) bool { From 0cfb468ef55aad6722c3b93b04435530792b52be Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 25 Sep 2024 14:39:09 +0530 Subject: [PATCH 7/9] updated comment --- x-pack/filebeat/input/azureblobstorage/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index 9c2e09c2b7d3..c3cf7a47a00b 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -197,7 +197,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName) }) - // in a senario where there are some jobs which have a greater timestamp + // in a scenario where there are some jobs which have a greater timestamp // but lesser lexicographic order and some jobs have greater lexicographic order // than the current checkpoint blob name, we then sort around the pivot checkpoint timestamp sort.Slice(jobs, func(i, _ int) bool { From 98ee3e8cf50c768deb0d11c5c9dd056955a4da7e Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 27 Sep 2024 15:59:09 +0530 Subject: [PATCH 8/9] updated comment --- x-pack/filebeat/input/azureblobstorage/scheduler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index c3cf7a47a00b..bb7c708661a9 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -197,9 +197,10 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName) }) - // in a scenario where there are some jobs which have a greater timestamp + // In a scenario where there are some jobs which have a greater timestamp // but lesser lexicographic order and some jobs have greater lexicographic order - // than the current checkpoint blob name, we then sort around the pivot checkpoint timestamp + // than the current checkpoint blob name, we then sort around the pivot checkpoint + // timestamp. sort.Slice(jobs, func(i, _ int) bool { return jobs[i].timestamp().After(cp.LatestEntryTime) }) From 334fb4a5771eb2fd00639def0ae9841102979085 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 30 Sep 2024 12:11:29 +0530 Subject: [PATCH 9/9] changed sort.Slice to sort.SliceStable as suggested --- x-pack/filebeat/input/azureblobstorage/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index bb7c708661a9..b44cefdcf4eb 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -201,7 +201,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { // but lesser lexicographic order and some jobs have greater lexicographic order // than the current checkpoint blob name, we then sort around the pivot checkpoint // timestamp. - sort.Slice(jobs, func(i, _ int) bool { + sort.SliceStable(jobs, func(i, _ int) bool { return jobs[i].timestamp().After(cp.LatestEntryTime) }) return jobs