diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ef991baa858..2e963bac692 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -251,6 +251,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix event.kind for system/syslog pipeline {issue}20365[20365] {pull}20390[20390] - Clone value when copy fields in processors to avoid crash. {issue}19206[19206] {pull}20500[20500] - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696] +- Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] *Heartbeat* diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go index 16e7b14744f..39b63636e3b 100644 --- a/filebeat/registrar/migrate.go +++ b/filebeat/registrar/migrate.go @@ -31,7 +31,6 @@ import ( helper "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/paths" - "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/backend/memlog" ) @@ -214,17 +213,15 @@ func (m *Migrator) updateToVersion1(regHome string) error { registryBackend, err := memlog.New(logp.NewLogger("migration"), memlog.Settings{ Root: m.dataPath, FileMode: m.permissions, - Checkpoint: func(_ uint64) bool { return true }, + Checkpoint: func(sz uint64) bool { return false }, IgnoreVersionCheck: true, }) if err != nil { return errors.Wrap(err, "failed to create new registry backend") } + defer registryBackend.Close() - reg := statestore.NewRegistry(registryBackend) - defer reg.Close() - - store, err := reg.Get("filebeat") + store, err := registryBackend.Access("filebeat") if err != nil { return errors.Wrap(err, "failed to open filebeat registry store") } @@ -234,6 +231,13 @@ func (m *Migrator) updateToVersion1(regHome string) error { return errors.Wrap(err, "failed to migrate registry states") } + if checkpointer, ok := store.(interface{ 
Checkpoint() error }); ok { + err := checkpointer.Checkpoint() + if err != nil { + return fmt.Errorf("failed to fsync filebeat storage state: %w", err) + } + } + if err := os.Remove(origDataFile); err != nil { return errors.Wrapf(err, "migration complete but failed to remove original data file: %v", origDataFile) } diff --git a/filebeat/registrar/migrate_bench_test.go b/filebeat/registrar/migrate_bench_test.go new file mode 100644 index 00000000000..2a1ba7820cb --- /dev/null +++ b/filebeat/registrar/migrate_bench_test.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// clearDir removes every entry found directly inside path while leaving the
// directory itself in place. A failure to list the directory or to delete an
// entry aborts the calling test or benchmark through t.Fatal.
func clearDir(t testing.TB, path string) {
	t.Helper()

	entries, err := ioutil.ReadDir(path)
	if err != nil {
		t.Fatal(err)
	}
	for _, entry := range entries {
		// ReadDir reports base names only; resolve each one against path so
		// the removal targets the directory being cleared instead of a
		// same-named entry in the current working directory.
		if err := os.RemoveAll(filepath.Join(path, entry.Name())); err != nil {
			t.Fatal(err)
		}
	}
}
100644 --- a/libbeat/statestore/backend/memlog/store.go +++ b/libbeat/statestore/backend/memlog/store.go @@ -196,6 +196,16 @@ func (s *store) Remove(key string) error { return s.logOperation(&opRemove{K: key}) } +// Checkpoint triggers a state checkpoint operation. All state will be written +// to a new transaction data file and fsync'ed. The log file will be reset after +// a successful write. +func (s *store) Checkpoint() error { + s.lock.Lock() + defer s.lock.Unlock() + + return s.disk.WriteCheckpoint(s.mem.table) +} + // lopOperation ensures that the diskstore reflects the recent changes to the // in memory store by either triggering a checkpoint operations or adding the // operation type to the update log file.