diff --git a/libbeat/statestore/backend/memlog/memlog.go b/libbeat/statestore/backend/memlog/memlog.go index 265b53a3df1c..fc93d85e85d9 100644 --- a/libbeat/statestore/backend/memlog/memlog.go +++ b/libbeat/statestore/backend/memlog/memlog.go @@ -54,6 +54,9 @@ type Settings struct { // Checkpoint predicate that can trigger a registry file rotation. If not // configured, memlog will automatically trigger a checkpoint every 10MB. Checkpoint CheckpointPredicate + + // If set memlog will not check the version of the meta file. + IgnoreVersionCheck bool } // CheckpointPredicate is the type for configurable checkpoint checks. @@ -62,7 +65,7 @@ type CheckpointPredicate func(fileSize uint64) bool const defaultFileMode os.FileMode = 0600 -const defaultBufferSize = 4096 +const defaultBufferSize = 4 * 1024 func defaultCheckpoint(filesize uint64) bool { const limit = 10 * 1 << 20 // set rotation limit to 10MB by default @@ -110,7 +113,7 @@ func (r *Registry) Access(name string) (backend.Store, error) { home := filepath.Join(r.settings.Root, name) fileMode := r.settings.FileMode bufSz := r.settings.BufferSize - store, err := openStore(logger, home, fileMode, bufSz, r.settings.Checkpoint) + store, err := openStore(logger, home, fileMode, bufSz, r.settings.IgnoreVersionCheck, r.settings.Checkpoint) if err != nil { return nil, err } diff --git a/libbeat/statestore/backend/memlog/memlog_test.go b/libbeat/statestore/backend/memlog/memlog_test.go new file mode 100644 index 000000000000..d8fe048f69c7 --- /dev/null +++ b/libbeat/statestore/backend/memlog/memlog_test.go @@ -0,0 +1,258 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "encoding/json" + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance" +) + +func init() { + logp.DevelopmentSetup() +} + +func TestCompliance_Default(t *testing.T) { + storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) { + return New(logp.NewLogger("test"), Settings{Root: testPath}) + }) +} + +func TestCompliance_AlwaysCheckpoint(t *testing.T) { + storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) { + return New(logp.NewLogger("test"), Settings{ + Root: testPath, + Checkpoint: func(filesize uint64) bool { + return true + }, + }) + }) +} + +func TestLoadVersion1(t *testing.T) { + dataHome := "testdata/1" + + list, err := ioutil.ReadDir(dataHome) + if err != nil { + t.Fatal(err) + } + + cases := list[:0] + for _, info := range list { + if info.IsDir() { + cases = append(cases, info) + } + } + + for _, info := range cases { + name := filepath.Base(info.Name()) + t.Run(name, func(t *testing.T) { + testLoadVersion1Case(t, filepath.Join(dataHome, info.Name())) + }) + } +} + +func testLoadVersion1Case(t *testing.T, dataPath string) { + path, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("Failed to create 
temporary test directory: %v", err) + } + defer os.RemoveAll(path) + + t.Logf("Test tmp dir: %v", path) + + if err := copyPath(path, dataPath); err != nil { + t.Fatalf("Failed to copy test file to the temporary directory: %v", err) + } + + // load expected test results + raw, err := ioutil.ReadFile(filepath.Join(path, "expected.json")) + if err != nil { + t.Fatalf("Failed to load expected.json: %v", err) + } + + expected := struct { + Txid uint64 + Datafile string + Entries map[string]interface{} + }{} + if err := json.Unmarshal(raw, &expected); err != nil { + t.Fatalf("Failed to parse expected.json: %v", err) + } + + // load store: + store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, true, func(_ uint64) bool { + return false + }) + if err != nil { + t.Fatalf("Failed to load test store: %v", err) + } + defer store.Close() + + disk := store.disk + disk.removeOldDataFiles() + + // validate store: + assert.Equal(t, expected.Txid, disk.nextTxID-1) + if expected.Datafile != "" { + assert.Equal(t, filepath.Join(path, expected.Datafile), disk.activeDataFile.path) + } + + // check all keys in expected are known and do match stored values: + func() { + for key, val := range expected.Entries { + var tmp interface{} + err := store.Get(key, &tmp) + require.NoError(t, err, "error reading entry (key=%v)", key) + + assert.Equal(t, val, tmp, "failed when checking key '%s'", key) + } + }() + + // check store does not contain any additional keys + func() { + err = store.Each(func(key string, val statestore.ValueDecoder) (bool, error) { + _, exists := expected.Entries[string(key)] + if !exists { + t.Errorf("unexpected key: %s", key) + } + return true, nil + }) + assert.NoError(t, err) + }() +} + +func TestTxIDLessEqual(t *testing.T) { + cases := map[string]struct { + a, b uint64 + want bool + }{ + "is equal": {10, 10, true}, + "is less": {8, 9, true}, + "is bigger": {9, 8, false}, + "is less 0 with integer overflow": { + math.MaxUint64 - 2, 0, true, + }, + "is less 
random value with integer overflow": { + math.MaxUint64 - 2, 10, true, + }, + "is less with large ids": { + math.MaxUint64 - 10, math.MaxUint64 - 9, true, + }, + "is bigger with large ids": { + math.MaxUint64 - 9, math.MaxUint64 - 10, false, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + got := isTxIDLessEqual(test.a, test.b) + if got != test.want { + t.Fatalf("%v <= %v should be %v", test.a, test.b, test.want) + } + }) + } +} + +func copyPath(to, from string) error { + info, err := os.Stat(from) + if err != nil { + return err + } + + if info.IsDir() { + return copyDir(to, from) + } + if info.Mode().IsRegular() { + return copyFile(to, from) + } + + // ignore other file types + return nil +} + +func copyDir(to, from string) error { + if !isDir(to) { + info, err := os.Stat(from) + if err != nil { + return err + } + + if err := os.MkdirAll(to, info.Mode()); err != nil { + return err + } + } + + list, err := ioutil.ReadDir(from) + if err != nil { + return err + } + + for _, file := range list { + name := file.Name() + err := copyPath(filepath.Join(to, name), filepath.Join(from, name)) + if err != nil { + return err + } + } + return nil +} + +func copyFile(to, from string) error { + in, err := os.Open(from) + if err != nil { + return err + } + defer in.Close() + + info, err := in.Stat() + if err != nil { + return err + } + + out, err := os.OpenFile(to, os.O_CREATE|os.O_RDWR|os.O_TRUNC, info.Mode()) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + return err +} + +func isDir(path string) bool { + info, err := os.Stat(path) + return err == nil && info.IsDir() +} + +func isFile(path string) bool { + info, err := os.Stat(path) + return err == nil && info.Mode().IsRegular() +} diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go index 68bb6124a3af..29880a6aae8b 100644 --- a/libbeat/statestore/backend/memlog/store.go +++ 
b/libbeat/statestore/backend/memlog/store.go @@ -18,9 +18,12 @@ package memlog import ( + "fmt" "os" + "path/filepath" "sync" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore/backend" @@ -56,51 +59,201 @@ type entry struct {
// If an error in the log file is detected, the store opening routine continues from the last known valid state and will trigger a checkpoint
// operation on subsequent writes, also truncating the log file.
// Old data files are scheduled for deletion later.
//
// The home directory (including its meta file) is created if it does not
// exist yet. For an existing store the permissions of the meta file, the
// active data file, all data files and the log file are aligned with mode.
// The meta file is only read and validated if ignoreVersionCheck is false.
// An error is returned if home can not be accessed or is not a directory.
func openStore(log *logp.Logger, home string, mode os.FileMode, bufSz uint, ignoreVersionCheck bool, checkpoint CheckpointPredicate) (*store, error) {
	fi, err := os.Stat(home)
	switch {
	case os.IsNotExist(err):
		// First use: create the store directory and its meta file.
		if err := os.MkdirAll(home, os.ModeDir|0770); err != nil {
			return nil, err
		}
		if err := writeMetaFile(home, mode); err != nil {
			return nil, err
		}

	case err != nil:
		// Stat failed for another reason (e.g. permission denied). fi is nil
		// in this case and must not be dereferenced.
		return nil, fmt.Errorf("failed to access store directory '%v': %w", home, err)

	case !fi.Mode().IsDir():
		return nil, fmt.Errorf("'%v' is not a directory", home)

	default:
		if err := pathEnsurePermissions(filepath.Join(home, metaFileName), mode); err != nil {
			return nil, fmt.Errorf("failed to update meta file permissions: %w", err)
		}
	}

	if !ignoreVersionCheck {
		meta, err := readMetaFile(home)
		if err != nil {
			return nil, err
		}
		if err := checkMeta(meta); err != nil {
			return nil, err
		}
	}

	if err := pathEnsurePermissions(filepath.Join(home, activeDataFileName), mode); err != nil {
		return nil, fmt.Errorf("failed to update active file permissions: %w", err)
	}

	dataFiles, err := listDataFiles(home)
	if err != nil {
		return nil, err
	}
	for _, df := range dataFiles {
		if err := pathEnsurePermissions(df.path, mode); err != nil {
			return nil, fmt.Errorf("failed to update data file permissions: %w", err)
		}
	}
	if err := pathEnsurePermissions(filepath.Join(home, logFileName), mode); err != nil {
		return nil, fmt.Errorf("failed to update log file permissions: %w", err)
	}

	// Populate the in memory table from the last data file in the list.
	tbl := map[string]entry{}
	var txid uint64
	if L := len(dataFiles); L > 0 {
		active := dataFiles[L-1]
		txid = active.txid
		if err := loadDataFile(active.path, tbl); err != nil {
			return nil, err
		}
	}

	logp.Info("Loading data file of '%v' succeeded. Active transaction id=%v", home, txid)

	// Apply the operations from the log file on top of the loaded table.
	var entries uint
	memstore := memstore{tbl}
	txid, entries, err = loadLogFile(&memstore, txid, home)
	logp.Info("Finished loading transaction log file for '%v'. Active transaction id=%v", home, txid)

	logInvalid := err != nil
	if logInvalid {
		// Error indicates the log file was incomplete or corrupted.
		// Anyways, we already have the table in a valid state and will
		// continue opening the store from here.
		logp.Warn("Incomplete or corrupted log file in %v. Continue with last known complete and consistent state. Reason: %v", home, err)
	}

	diskstore, err := newDiskStore(log, home, dataFiles, txid, mode, entries, logInvalid, bufSz, checkpoint)
	if err != nil {
		return nil, err
	}

	return &store{
		disk: diskstore,
		mem:  memstore,
	}, nil
}

// Close closes access to the update log file and clears the in memory key
// value store. Access to the store after close can lead to a panic.
func (s *store) Close() error {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.mem = memstore{}
	return s.disk.Close()
}

// Has checks if the key is known. The in memory store does not report any
// errors.
func (s *store) Has(key string) (bool, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.mem.Has(key), nil
}

// Get retrieves and decodes the key-value pair into to.
func (s *store) Get(key string, to interface{}) error { - panic("TODO: implement me") + s.lock.RLock() + defer s.lock.RUnlock() + + dec := s.mem.Get(key) + if dec == nil { + return errKeyUnknown + } + return dec.Decode(to) } // Set inserts or overwrites a key-value pair. // If encoding was successful the in-memory state will be updated and a // set-operation is logged to the diskstore. func (s *store) Set(key string, value interface{}) error { - panic("TODO: implement me") + var tmp common.MapStr + if err := typeconv.Convert(&tmp, value); err != nil { + return err + } + + s.lock.Lock() + defer s.lock.Unlock() + + s.mem.Set(key, tmp) + return s.logOperation(&opSet{K: key, V: tmp}) } // Remove removes a key from the in memory store and logs a remove operation to // the diskstore. The operation does not check if the key exists. func (s *store) Remove(key string) error { - panic("TODO: implement me") + s.lock.Lock() + defer s.lock.Unlock() + + s.mem.Remove(key) + return s.logOperation(&opRemove{K: key}) +} + +// lopOperation ensures that the diskstore reflects the recent changes to the +// in memory store by either triggering a checkpoint operations or adding the +// operation type to the update log file. +func (s *store) logOperation(op op) error { + if s.disk.mustCheckpoint() { + err := s.disk.WriteCheckpoint(s.mem.table) + if err != nil { + // if writing the new checkpoint file failed we try to fallback to + // appending the log operation. + // TODO: make append configurable and retry checkpointing with backoff. + s.disk.LogOperation(op) + } + + return err + } + + return s.disk.LogOperation(op) } // Each iterates over all key-value pairs in the store. 
func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { - panic("TODO: implement me") + s.lock.RLock() + defer s.lock.RUnlock() + + for k, entry := range s.mem.table { + cont, err := fn(k, entry) + if !cont || err != nil { + return err + } + } + + return nil +} + +func (m *memstore) Has(key string) bool { + _, exists := m.table[key] + return exists +} + +func (m *memstore) Get(key string) backend.ValueDecoder { + entry, exists := m.table[key] + if !exists { + return nil + } + return entry } -func (s *memstore) Set(key string, value interface{}) error { - panic("TODO: implement me") +func (m *memstore) Set(key string, value common.MapStr) { + m.table[key] = entry{value: value} } -func (s *memstore) Remove(key string) error { - panic("TODO: implement me") +func (m *memstore) Remove(key string) bool { + _, exists := m.table[key] + if !exists { + return false + } + delete(m.table, key) + return true } func (e entry) Decode(to interface{}) error { diff --git a/libbeat/statestore/backend/memlog/testdata/1/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/Readme.md new file mode 100644 index 000000000000..7532ab9b9425 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/Readme.md @@ -0,0 +1 @@ +Sample disk stores version 1 diff --git a/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/1.json b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/1.json new file mode 100644 index 000000000000..90bd45d5de1c --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/1.json @@ -0,0 +1,5 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/Readme.md new file mode 100644 index 000000000000..9c7b4584885d --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/Readme.md @@ -0,0 +1,2 @@ +Store file 
with transaction ID and log with with transaction id 3. +The data file 2.json is missing, which leads to all entries in the log file to be ignored. diff --git a/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/expected.json b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/expected.json new file mode 100644 index 000000000000..f691496a5b23 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/expected.json @@ -0,0 +1,9 @@ +{ + "txid": 1, + "datafile": "1.json", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/log.json b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/log.json new file mode 100644 index 000000000000..5b5d8f4e0f62 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/log.json @@ -0,0 +1,6 @@ +{"op": "set", "id": 3} +{"K":"key3","V":{"a":3}} +{"op": "set", "id": 3} +{"K":"key3","V":{"a":3}} +{"op": "set", "id": 3} +{"K":"key5","V":{"a":5}} diff --git a/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/meta.json b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/commit_wrong_id/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/data_and_log/1.json b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/1.json new file mode 100644 index 000000000000..90bd45d5de1c --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/1.json @@ -0,0 +1,5 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/data_and_log/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/Readme.md new file mode 100644 index 000000000000..5272ae569bd2 --- /dev/null +++ 
b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/Readme.md @@ -0,0 +1 @@ +Store with valid data file and valid log file with logged updates. diff --git a/libbeat/statestore/backend/memlog/testdata/1/data_and_log/expected.json b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/expected.json new file mode 100644 index 000000000000..c7a3023283ca --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/expected.json @@ -0,0 +1,12 @@ +{ + "txid": 4, + "datafile": "1.json", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2}, + "key3": {"a": 3}, + "key4": {"a": 4}, + "key5": {"a": 5} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/data_and_log/log.json b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/log.json new file mode 100644 index 000000000000..c7edd4f783b3 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/log.json @@ -0,0 +1,6 @@ +{"op":"set", "id": 2} +{"K":"key3","V":{"a":3}} +{"op":"set", "id": 3} +{"K":"key4","V":{"a":4}} +{"op":"set", "id": 4} +{"K":"key5","V":{"a":5}} diff --git a/libbeat/statestore/backend/memlog/testdata/1/data_and_log/meta.json b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/data_and_log/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/datafile_only/1.json b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/1.json new file mode 100644 index 000000000000..0460d15de490 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/1.json @@ -0,0 +1,8 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2}, +{"_key":"key3","a":3}, +{"_key":"key4","a":4}, +{"_key":"key5","a":5} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/datafile_only/Readme.md 
b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/Readme.md new file mode 100644 index 000000000000..d5cc1f554404 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/Readme.md @@ -0,0 +1,2 @@ +Valid store without log.json. All entries are read from 1.json only. + diff --git a/libbeat/statestore/backend/memlog/testdata/1/datafile_only/expected.json b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/expected.json new file mode 100644 index 000000000000..8c643ce7fda2 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/expected.json @@ -0,0 +1,12 @@ +{ + "txid": 1, + "datafile": "1.json", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2}, + "key3": {"a": 3}, + "key4": {"a": 4}, + "key5": {"a": 5} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/datafile_only/meta.json b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/datafile_only/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/1.json b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/1.json new file mode 100644 index 000000000000..90bd45d5de1c --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/1.json @@ -0,0 +1,5 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/Readme.md new file mode 100644 index 000000000000..a3df15b92838 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/Readme.md @@ -0,0 +1,2 @@ +Valid data file with incomplete log.json. The last entry in the log file has +the data part missing, which will fail the log file to be read. 
diff --git a/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/expected.json b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/expected.json new file mode 100644 index 000000000000..f691496a5b23 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/expected.json @@ -0,0 +1,9 @@ +{ + "txid": 1, + "datafile": "1.json", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/log.json b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/log.json new file mode 100644 index 000000000000..50462314cb6a --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/log.json @@ -0,0 +1 @@ +{"op":"set", "id": 2} diff --git a/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/meta.json b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/incomplete_op/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/Readme.md new file mode 100644 index 000000000000..0c8ac8f51abd --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/Readme.md @@ -0,0 +1,3 @@ +Checkpoint file does not exist yet. All entries are read from the log file. +Contents is missing with the last operation in the log file triggereing a parse error. 
+ diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/expected.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/expected.json new file mode 100644 index 000000000000..356fb55782d9 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/expected.json @@ -0,0 +1,12 @@ +{ + "txid": 6, + "datafile": "", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2}, + "key3": {"a": 3}, + "key4": {"a": 4}, + "key5": {"a": 5} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/log.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/log.json new file mode 100644 index 000000000000..a0cc0a36eb91 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/log.json @@ -0,0 +1,13 @@ +{"op":"set", "id": 1} +{"K":"key0","V":{"a":0}} +{"op":"set", "id": 2} +{"K":"key1","V":{"a":1}} +{"op":"set", "id": 3} +{"K":"key2","V":{"a":2}} +{"op":"set", "id": 4} +{"K":"key3","V":{"a":3}} +{"op":"set", "id": 5} +{"K":"key4","V":{"a":4}} +{"op":"set", "id": 6} +{"K":"key5","V":{"a":5}} +{"op":"set", diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/meta.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/meta.json new file mode 100644 index 000000000000..cd576d0b504a --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_incomplete/meta.json @@ -0,0 +1 @@ +{"version":"1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_only/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/Readme.md new file mode 100644 index 000000000000..725042043f30 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/Readme.md @@ -0,0 +1,2 @@ +Checkpoint file does not exist yet. All entries are read from the log file. 
+ diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_only/expected.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/expected.json new file mode 100644 index 000000000000..356fb55782d9 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/expected.json @@ -0,0 +1,12 @@ +{ + "txid": 6, + "datafile": "", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2}, + "key3": {"a": 3}, + "key4": {"a": 4}, + "key5": {"a": 5} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_only/log.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/log.json new file mode 100644 index 000000000000..f00679f22555 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/log.json @@ -0,0 +1,12 @@ +{"op":"set", "id": 1} +{"K":"key0","V":{"a":0}} +{"op":"set", "id": 2} +{"K":"key1","V":{"a":1}} +{"op":"set", "id": 3} +{"K":"key2","V":{"a":2}} +{"op":"set", "id": 4} +{"K":"key3","V":{"a":3}} +{"op":"set", "id": 5} +{"K":"key4","V":{"a":4}} +{"op":"set", "id": 6} +{"K":"key5","V":{"a":5}} diff --git a/libbeat/statestore/backend/memlog/testdata/1/logfile_only/meta.json b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/meta.json new file mode 100644 index 000000000000..cd576d0b504a --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/logfile_only/meta.json @@ -0,0 +1 @@ +{"version":"1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/1.json b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/1.json new file mode 100644 index 000000000000..0460d15de490 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/1.json @@ -0,0 +1,8 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2}, +{"_key":"key3","a":3}, +{"_key":"key4","a":4}, +{"_key":"key5","a":5} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/2.json 
b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/2.json new file mode 100644 index 000000000000..c3d3f12f859a --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/2.json @@ -0,0 +1,3 @@ +[ +{"_key":"key0","a":0} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/3.json b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/3.json new file mode 100644 index 000000000000..9ac6abf9862f --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/3.json @@ -0,0 +1,5 @@ +[ +{"_key":"key0","a":30}, +{"_key":"key1","a":31}, +{"_key":"key2","a":32} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/Readme.md new file mode 100644 index 000000000000..cdea11598196 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/Readme.md @@ -0,0 +1,2 @@ +Store with old data files that failed to be cleaned up. Only entries from data file 3.json should be loaded. 
+ diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/expected.json b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/expected.json new file mode 100644 index 000000000000..f671203914aa --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/expected.json @@ -0,0 +1,9 @@ +{ + "txid": 3, + "datafile": "3.json", + "entries": { + "key0": {"a": 30}, + "key1": {"a": 31}, + "key2": {"a": 32} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/meta.json b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_datafiles/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/5.json b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/5.json new file mode 100644 index 000000000000..90bd45d5de1c --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/5.json @@ -0,0 +1,5 @@ +[ +{"_key":"key0","a":0}, +{"_key":"key1","a":1}, +{"_key":"key2","a":2} +] diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/Readme.md b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/Readme.md new file mode 100644 index 000000000000..9d99a9ae95aa --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/Readme.md @@ -0,0 +1,3 @@ +Due to restart the log file was not truncated after the last checkpoint was +written. Update with ID 0 adds the removed key0 again to the store. All entries +in log.json should be ignored. 
diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/expected.json b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/expected.json new file mode 100644 index 000000000000..4e9547000e72 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/expected.json @@ -0,0 +1,9 @@ +{ + "txid": 5, + "datafile": "5.json", + "entries": { + "key0": {"a": 0}, + "key1": {"a": 1}, + "key2": {"a": 2} + } +} diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/log.json b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/log.json new file mode 100644 index 000000000000..426ce6a4f619 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/log.json @@ -0,0 +1,8 @@ +{"op":"set", "id": 1} +{"K":"key0","V":{"a":0}} +{"op":"set", "id": 2} +{"K":"key1","V":{"a":1}} +{"op":"set", "id": 3} +{"K":"key2","V":{"a":2}} +{"op":"remove", "id": 4} +{"K":"key0"} diff --git a/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/meta.json b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/meta.json new file mode 100644 index 000000000000..bcb050a28217 --- /dev/null +++ b/libbeat/statestore/backend/memlog/testdata/1/old_entries_in_log/meta.json @@ -0,0 +1 @@ +{"version": "1"} diff --git a/libbeat/statestore/backend/memlog/util.go b/libbeat/statestore/backend/memlog/util.go index f02311392b9e..2027c87adca0 100644 --- a/libbeat/statestore/backend/memlog/util.go +++ b/libbeat/statestore/backend/memlog/util.go @@ -20,6 +20,7 @@ package memlog import ( "io" "os" + "runtime" "syscall" ) @@ -76,3 +77,45 @@ func trySyncPath(path string) { defer f.Close() syncFile(f) } + +// pathEnsurePermissions checks if the file permissions for the given file match wantPerm. +// The permissions are updated using chmod if needed. +// No file will be created if the file does not yet exist. 
func pathEnsurePermissions(path string, wantPerm os.FileMode) error {
	// Use Stat+Chmod on the path instead of opening the file: opening with
	// O_RDWR fails if the current permissions deny read or write access,
	// which would make it impossible to repair them.
	fi, err := os.Stat(path)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	if runtime.GOOS == "windows" {
		return nil
	}

	mode, changed := updatedFileMode(fi.Mode(), wantPerm)
	if !changed {
		return nil
	}
	return os.Chmod(path, mode)
}

// fileEnsurePermissions checks if the file permissions for the given file
// matches wantPerm. If not fileEnsurePermissions tries to update
// the current permissions via chmod.
// The file is not created or updated if it does not exist.
// On windows the function does nothing and returns nil.
func fileEnsurePermissions(f *os.File, wantPerm os.FileMode) error {
	if runtime.GOOS == "windows" {
		return nil
	}

	fi, err := f.Stat()
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	mode, changed := updatedFileMode(fi.Mode(), wantPerm)
	if !changed {
		return nil
	}
	return f.Chmod(mode)
}

// updatedFileMode returns current with its permission bits replaced by the
// permission bits of wantPerm. All non-permission bits of wantPerm are
// ignored. The boolean result reports whether the permission bits differ.
func updatedFileMode(current, wantPerm os.FileMode) (os.FileMode, bool) {
	wantPerm &= os.ModePerm
	if current&os.ModePerm == wantPerm {
		return current, false
	}
	return (current &^ os.ModePerm) | wantPerm, true
}