Skip to content

Commit

Permalink
fix(operator): Allow structured metadata only if V13 schema provided (#…
Browse files Browse the repository at this point in the history
…13463)

Co-authored-by: Robert Jacob <[email protected]>
  • Loading branch information
periklis and xperimental authored Jul 16, 2024
1 parent d4179aa commit 3ac130b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 30 deletions.
17 changes: 16 additions & 1 deletion operator/internal/handlers/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg
}
}

now := time.Now().UTC()
storageSchemas, err := storage.BuildSchemaConfig(
time.Now().UTC(),
now,
stack.Spec.Storage,
stack.Status.Storage,
)
Expand All @@ -59,6 +60,7 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg
}

objStore.Schemas = storageSchemas
objStore.AllowStructuredMetadata = allowStructuredMetadata(storageSchemas, now)

if stack.Spec.Storage.TLS == nil {
return objStore, nil
Expand Down Expand Up @@ -98,3 +100,16 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg

return objStore, nil
}

func allowStructuredMetadata(schemas []lokiv1.ObjectStorageSchema, now time.Time) bool {
activeVersion := lokiv1.ObjectStorageSchemaV11
for _, s := range schemas {
time, _ := s.EffectiveDate.UTCTime()
if time.Before(now) {
activeVersion = s.Version
}
}

return activeVersion != lokiv1.ObjectStorageSchemaV11 &&
activeVersion != lokiv1.ObjectStorageSchemaV12
}
122 changes: 122 additions & 0 deletions operator/internal/handlers/internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -560,3 +561,124 @@ func TestBuildOptions_WhenInvalidCAConfigMap_SetDegraded(t *testing.T) {
require.Error(t, err)
require.Equal(t, degradedErr, err)
}

func TestAllowStructuredMetadata(t *testing.T) {
testTime := time.Date(2024, 7, 1, 1, 0, 0, 0, time.UTC)
tt := []struct {
desc string
schemas []lokiv1.ObjectStorageSchema
wantAllow bool
}{
{
desc: "disallow - no schemas",
schemas: []lokiv1.ObjectStorageSchema{},
wantAllow: false,
},
{
desc: "disallow - only v12",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-07-01",
},
},
wantAllow: false,
},
{
desc: "allow - only v13",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-07-01",
},
},
wantAllow: true,
},
{
desc: "disallow - v13 in future",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-07-01",
},
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-07-02",
},
},
wantAllow: false,
},
{
desc: "disallow - v13 in past",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-06-01",
},
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-07-01",
},
},
wantAllow: false,
},
{
desc: "disallow - v13 in past and future",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-06-01",
},
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-07-01",
},
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-07-02",
},
},
wantAllow: false,
},
{
desc: "allow - v13 active",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-06-01",
},
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-07-01",
},
},
wantAllow: true,
},
{
desc: "allow - v13 active, v12 in future",
schemas: []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV13,
EffectiveDate: "2024-07-01",
},
{
Version: lokiv1.ObjectStorageSchemaV12,
EffectiveDate: "2024-08-01",
},
},
wantAllow: true,
},
}

for _, tc := range tt {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()

allow := allowStructuredMetadata(tc.schemas, testTime)
if allow != tc.wantAllow {
t.Errorf("got %v, want %v", allow, tc.wantAllow)
}
})
}
}
Loading

0 comments on commit 3ac130b

Please sign in to comment.