-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[storagetest] Build out package for more flexible test cases (#13086)
The storagetest package exists to enable testing the usage of storage extensions. This PR extends and cleans up the package so that it can be used for a far larger set of test cases.
- Loading branch information
1 parent
40d2489
commit 4830b44
Showing
23 changed files
with
523 additions
and
141 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package storagetest // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/extension/experimental/storage" | ||
) | ||
|
||
var ( | ||
errClientClosed = errors.New("client closed") | ||
) | ||
|
||
type TestClient struct { | ||
cache map[string][]byte | ||
cacheMux sync.Mutex | ||
|
||
kind component.Kind | ||
id config.ComponentID | ||
name string | ||
|
||
storageFile string | ||
|
||
closed bool | ||
} | ||
|
||
// NewInMemoryClient creates a storage.Client that functions as a map[string][]byte | ||
// This is useful for tests that do not involve collector restart behavior. | ||
func NewInMemoryClient(kind component.Kind, id config.ComponentID, name string) *TestClient { | ||
return &TestClient{ | ||
cache: make(map[string][]byte), | ||
kind: kind, | ||
id: id, | ||
name: name, | ||
} | ||
} | ||
|
||
// NewFileBackedClient creates a storage.Client that will load previous | ||
// storage contents upon creation and save storage contents when closed. | ||
// It also has metadata which may be used to validate test expectations. | ||
func NewFileBackedClient(kind component.Kind, id config.ComponentID, name string, storageDir string) *TestClient { | ||
client := NewInMemoryClient(kind, id, name) | ||
|
||
client.storageFile = filepath.Join(storageDir, fmt.Sprintf("%d_%s_%s_%s", kind, id.Type(), id.Name(), name)) | ||
|
||
// Attempt to load previous storage content | ||
contents, err := os.ReadFile(client.storageFile) | ||
if err != nil { | ||
// Assume no previous storage content | ||
return client | ||
} | ||
|
||
previousCache := make(map[string][]byte) | ||
if err := json.Unmarshal(contents, &previousCache); err != nil { | ||
// Assume no previous storage content | ||
return client | ||
} | ||
|
||
client.cache = previousCache | ||
return client | ||
} | ||
|
||
func (p *TestClient) Get(_ context.Context, key string) ([]byte, error) { | ||
p.cacheMux.Lock() | ||
defer p.cacheMux.Unlock() | ||
if p.closed { | ||
return nil, errClientClosed | ||
} | ||
|
||
return p.cache[key], nil | ||
} | ||
|
||
func (p *TestClient) Set(_ context.Context, key string, value []byte) error { | ||
p.cacheMux.Lock() | ||
defer p.cacheMux.Unlock() | ||
if p.closed { | ||
return errClientClosed | ||
} | ||
|
||
p.cache[key] = value | ||
return nil | ||
} | ||
|
||
func (p *TestClient) Delete(_ context.Context, key string) error { | ||
p.cacheMux.Lock() | ||
defer p.cacheMux.Unlock() | ||
if p.closed { | ||
return errClientClosed | ||
} | ||
|
||
delete(p.cache, key) | ||
return nil | ||
} | ||
|
||
func (p *TestClient) Batch(_ context.Context, ops ...storage.Operation) error { | ||
p.cacheMux.Lock() | ||
defer p.cacheMux.Unlock() | ||
if p.closed { | ||
return errClientClosed | ||
} | ||
|
||
for _, op := range ops { | ||
switch op.Type { | ||
case storage.Get: | ||
op.Value = p.cache[op.Key] | ||
case storage.Set: | ||
p.cache[op.Key] = op.Value | ||
case storage.Delete: | ||
delete(p.cache, op.Key) | ||
default: | ||
return errors.New("wrong operation type") | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *TestClient) Close(_ context.Context) error { | ||
p.cacheMux.Lock() | ||
defer p.cacheMux.Unlock() | ||
|
||
p.closed = true | ||
|
||
if p.storageFile == "" { | ||
return nil | ||
} | ||
|
||
contents, err := json.Marshal(p.cache) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return os.WriteFile(p.storageFile, contents, os.FileMode(0600)) | ||
} | ||
|
||
// Kind of component that is using the storage client | ||
func (p *TestClient) Kind() component.Kind { | ||
return p.kind | ||
} | ||
|
||
// ID of component that is using the storage client | ||
func (p *TestClient) ID() config.ComponentID { | ||
return p.id | ||
} | ||
|
||
// Name assigned to the storage client | ||
func (p *TestClient) Name() string { | ||
return p.name | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package storagetest // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/extension/experimental/storage" | ||
) | ||
|
||
var testStorageType config.Type = "test_storage" | ||
|
||
// TestStorage is an in memory storage extension designed for testing | ||
type TestStorage struct { | ||
config.ExtensionSettings | ||
storageDir string | ||
clients []*TestClient | ||
} | ||
|
||
// Ensure this storage extension implements the appropriate interface | ||
var _ storage.Extension = (*TestStorage)(nil) | ||
|
||
// NewInMemoryStorageExtension creates a TestStorage extension | ||
func NewInMemoryStorageExtension(name string) *TestStorage { | ||
return &TestStorage{ | ||
ExtensionSettings: config.NewExtensionSettings( | ||
config.NewComponentIDWithName(testStorageType, name), | ||
), | ||
clients: []*TestClient{}, | ||
} | ||
} | ||
|
||
// NewFileBackedStorageExtension creates a TestStorage extension | ||
func NewFileBackedStorageExtension(name string, storageDir string) *TestStorage { | ||
return &TestStorage{ | ||
ExtensionSettings: config.NewExtensionSettings( | ||
config.NewComponentIDWithName(testStorageType, name), | ||
), | ||
storageDir: storageDir, | ||
} | ||
} | ||
|
||
// Start does nothing | ||
func (s *TestStorage) Start(context.Context, component.Host) error { | ||
return nil | ||
} | ||
|
||
// Shutdown does nothing | ||
func (s *TestStorage) Shutdown(ctx context.Context) error { | ||
return nil | ||
} | ||
|
||
// GetClient returns a storage client for an individual component | ||
func (s *TestStorage) GetClient(_ context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) { | ||
if s.storageDir == "" { | ||
return NewInMemoryClient(kind, ent, name), nil | ||
} | ||
return NewFileBackedClient(kind, ent, name, s.storageDir), nil | ||
} | ||
|
||
var nonStorageType config.Type = "non_storage" | ||
|
||
// NonStorage is useful for testing expected behaviors that involve | ||
// non-storage extensions | ||
type NonStorage struct { | ||
config.ExtensionSettings | ||
} | ||
|
||
// Ensure this extension implements the appropriate interface | ||
var _ component.Extension = (*NonStorage)(nil) | ||
|
||
// NewNonStorageExtension creates a NonStorage extension | ||
func NewNonStorageExtension(name string) *NonStorage { | ||
return &NonStorage{ | ||
ExtensionSettings: config.NewExtensionSettings( | ||
config.NewComponentIDWithName(nonStorageType, name), | ||
), | ||
} | ||
} | ||
|
||
// Start does nothing | ||
func (ns *NonStorage) Start(context.Context, component.Host) error { | ||
return nil | ||
} | ||
|
||
// Shutdown does nothing | ||
func (ns *NonStorage) Shutdown(context.Context) error { | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package storagetest | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/extension/experimental/storage" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestInMemoryLifecycle(t *testing.T) { | ||
ext := NewInMemoryStorageExtension("test") | ||
require.Equal(t, config.NewComponentIDWithName(testStorageType, "test"), ext.ID()) | ||
runExtensionLifecycle(t, ext, false) | ||
} | ||
|
||
func TestFileBackedLifecycle(t *testing.T) { | ||
dir := t.TempDir() | ||
ext := NewFileBackedStorageExtension("test", dir) | ||
require.Equal(t, config.NewComponentIDWithName(testStorageType, "test"), ext.ID()) | ||
runExtensionLifecycle(t, ext, true) | ||
} | ||
|
||
func runExtensionLifecycle(t *testing.T, ext storage.Extension, expectPersistence bool) { | ||
ctx := context.Background() | ||
require.NoError(t, ext.Start(ctx, componenttest.NewNopHost())) | ||
|
||
clientOne, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one") | ||
require.NoError(t, err) | ||
|
||
// Write a value, confirm it is saved | ||
require.NoError(t, clientOne.Set(ctx, "foo", []byte("bar"))) | ||
fooVal, err := clientOne.Get(ctx, "foo") | ||
require.NoError(t, err) | ||
require.Equal(t, []byte("bar"), fooVal) | ||
|
||
// Delete the value, confirm it is deleted | ||
require.NoError(t, clientOne.Delete(ctx, "foo")) | ||
fooVal, err = clientOne.Get(ctx, "foo") | ||
require.NoError(t, err) | ||
require.Nil(t, fooVal) | ||
|
||
// Write a new value, confirm it is saved | ||
require.NoError(t, clientOne.Set(ctx, "foo2", []byte("bar2"))) | ||
fooVal, err = clientOne.Get(ctx, "foo2") | ||
require.NoError(t, err) | ||
require.Equal(t, []byte("bar2"), fooVal) | ||
|
||
// Close first client | ||
require.NoError(t, clientOne.Close(ctx)) | ||
|
||
// Create new client to test persistence | ||
clientTwo, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one") | ||
require.NoError(t, err) | ||
|
||
// Check if the value is accessible from another client | ||
fooVal, err = clientTwo.Get(ctx, "foo2") | ||
require.NoError(t, err) | ||
if expectPersistence { | ||
require.Equal(t, []byte("bar2"), fooVal) | ||
} else { | ||
require.Nil(t, fooVal) | ||
} | ||
|
||
// Perform some additional operations | ||
set := storage.SetOperation("foo3", []byte("bar3")) | ||
get := storage.GetOperation("foo3") | ||
delete := storage.DeleteOperation("foo3") | ||
getNil := storage.GetOperation("foo3") | ||
require.NoError(t, clientTwo.Batch(ctx, set, get, delete, getNil)) | ||
require.Equal(t, get.Value, []byte("bar3")) | ||
require.Nil(t, getNil.Value) | ||
|
||
// Cleanup | ||
require.NoError(t, clientTwo.Close(ctx)) | ||
require.NoError(t, ext.Shutdown(ctx)) | ||
} |
Oops, something went wrong.