-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Init package libbeat/statestore (#19117)
Initialize support for the statestore package. The addition of the statestore package is split up into multiple changeset to ease review. The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore). Once finalized, the libbeat/statestore package contains: - The statestore frontend and interface for use within Beats - Interfaces for the store backend - A common set of tests store backends need to support - a storetest package for testing new features that require a store. The testing helpers use map[string]interface{} that can be initialized or queried after the test run for validation purposes. - The default memlog backend + tests This change includes the frontend and backend interfaces only. Once merged we will add the tests and finally the memlog store.
- Loading branch information
Steffen Siering
authored
Jun 12, 2020
1 parent
eaf5e2f
commit 1fc18c2
Showing
50 changed files
with
3,778 additions
and
1,073 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package backend | ||
|
||
// Registry provides access to stores managed by the backend storage. | ||
type Registry interface { | ||
// Access opens a store. The store will be closed by the frontend, once all | ||
// accessed stores have been closed. | ||
// | ||
// The Store instance returned must be threadsafe. | ||
Access(name string) (Store, error) | ||
|
||
// Close is called on shutdown after all stores have been closed. | ||
// An implementation of Registry is not required to check for the stores to be closed. | ||
Close() error | ||
} | ||
|
||
// ValueDecoder is used to decode values into go structs or maps within a transaction. | ||
// A ValueDecoder is supposed to be invalidated by beats after the loop operations has returned. | ||
type ValueDecoder interface { | ||
Decode(to interface{}) error | ||
} | ||
|
||
// Store provides access to key value pairs. | ||
type Store interface { | ||
// Close should close the store and release all used resources. | ||
Close() error | ||
|
||
// Has checks if the key exists. No error must be returned if the key does | ||
// not exists, but the bool return must be false. | ||
// An error return value must indicate internal errors only. The store is | ||
// assumed to be in a 'bad' but recoverable state if 'Has' fails. | ||
Has(key string) (bool, error) | ||
|
||
// Get decodes the value for the given key into value. | ||
// Besides internal implementation specific errors an error is assumed | ||
// to be returned if the key does not exist or the type of the value | ||
// passed is incompatible to the actual value in the store (decoding error). | ||
Get(key string, value interface{}) error | ||
|
||
// Set inserts or overwrites a key pair in the store. | ||
// The `value` parameters can be assumed to be a struct or a map. Besides | ||
// internal implementation specific errors, an error should be returned if | ||
// the value given can not be encoded. | ||
Set(key string, value interface{}) error | ||
|
||
// Remove removes and entry from the store. | ||
Remove(string) error | ||
|
||
// Each loops over all key value pairs in the store calling fn for each pair. | ||
// The ValueDecoder is used by fn to optionally decode the value into a | ||
// custom struct or map. The decoder must be executable multiple times, but | ||
// is assumed to be invalidated once fn returns | ||
// The loop shall return if fn returns an error or false. | ||
Each(fn func(string, ValueDecoder) (bool, error)) error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package statestore | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
) | ||
|
||
// ErrorAccess indicates that an error occured when trying to open a Store. | ||
type ErrorAccess struct { | ||
name string | ||
cause error | ||
} | ||
|
||
// Store reports the name of the store that could not been accessed. | ||
func (e *ErrorAccess) Store() string { return e.name } | ||
|
||
// Unwrap returns the cause for the error or nil if the cause is unknown or has | ||
// not been reported by the backend | ||
func (e *ErrorAccess) Unwrap() error { return e.cause } | ||
|
||
// Error creates a descriptive error string. | ||
func (e *ErrorAccess) Error() string { | ||
if e.cause == nil { | ||
return fmt.Sprintf("failed to open store '%v'", e.name) | ||
} | ||
return fmt.Sprintf("failed to open store '%v': %v", e.name, e.cause) | ||
} | ||
|
||
// ErrorClosed indicates that the operation failed because the store has already been closed. | ||
type ErrorClosed struct { | ||
name string | ||
operation string | ||
} | ||
|
||
// Store reports the name of the store that has been closed. | ||
func (e *ErrorClosed) Store() string { return e.name } | ||
|
||
// Operation returns a 'readable' name for the operation that failed to access the closed store. | ||
func (e *ErrorClosed) Operation() string { return e.operation } | ||
|
||
// Error creates a descriptive error string. | ||
func (e *ErrorClosed) Error() string { | ||
return fmt.Sprintf("can not executed %v operation on closed store '%v'", e.operation, e.name) | ||
} | ||
|
||
// ErrorOperation is returned when a generic store operation failed. | ||
type ErrorOperation struct { | ||
name string | ||
operation string | ||
cause error | ||
} | ||
|
||
// Store reports the name of the store. | ||
func (e *ErrorOperation) Store() string { return e.name } | ||
|
||
// Operation returns a 'readable' name for the operation that failed. | ||
func (e *ErrorOperation) Operation() string { return e.operation } | ||
|
||
// Unwrap returns the cause of the failure. | ||
func (e *ErrorOperation) Unwrap() error { return e.cause } | ||
|
||
// Error creates a descriptive error string. | ||
func (e *ErrorOperation) Error() string { | ||
return fmt.Sprintf("failed in %v operation on store '%v': %v", e.operation, e.name, e.cause) | ||
} | ||
|
||
// IsClosed returns true if the cause for an Error is ErrorClosed. | ||
func IsClosed(err error) bool { | ||
var tmp *ErrorClosed | ||
if errors.As(err, &tmp) { | ||
return true | ||
} | ||
return false | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package statestore | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/elastic/beats/v7/libbeat/statestore/backend" | ||
) | ||
|
||
// Registry manages multiple key-value stores. | ||
// When working with a registry, one must access a store. Depending on backend | ||
// a store can be an index, a table, or a directory. All access to a store is | ||
// handled by transaction. | ||
type Registry struct { | ||
backend backend.Registry | ||
|
||
mu sync.Mutex | ||
active map[string]*sharedStore // active/open stores | ||
wg sync.WaitGroup | ||
} | ||
|
||
// ValueDecoder is used to decode retrieved from an actual store. A | ||
// ValueDecoder instance is valid for the lifetime of the transaction only. | ||
type ValueDecoder = backend.ValueDecoder | ||
|
||
// NewRegistry creates a new Registry with a configured backend. | ||
func NewRegistry(backend backend.Registry) *Registry { | ||
return &Registry{ | ||
backend: backend, | ||
active: map[string]*sharedStore{}, | ||
} | ||
} | ||
|
||
// Close closes the backend storage. Close blocks until all stores in use are closed. | ||
func (r *Registry) Close() error { | ||
r.wg.Wait() // wait for all stores being closed | ||
return r.backend.Close() | ||
} | ||
|
||
// Get opens a shared store. A store is closed and released only after all it's | ||
// users have closed the store. | ||
func (r *Registry) Get(name string) (*Store, error) { | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
shared := r.active[name] | ||
if shared == nil { | ||
backend, err := r.backend.Access(name) | ||
if err != nil { | ||
return nil, &ErrorAccess{name: name, cause: err} | ||
} | ||
|
||
shared = newSharedStore(r, name, backend) | ||
defer shared.Release() | ||
|
||
r.active[name] = shared | ||
r.wg.Add(1) | ||
} | ||
|
||
return newStore(shared), nil | ||
} | ||
|
||
func (r *Registry) unregisterStore(s *sharedStore) { | ||
_, exists := r.active[s.name] | ||
if !exists { | ||
panic("removing an unknown store") | ||
} | ||
|
||
delete(r.active, s.name) | ||
r.wg.Done() | ||
} |
Oops, something went wrong.