Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(operator): Allow structured metadata only if V13 schema provided #13463

Merged
merged 8 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions operator/internal/manifests/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

// BuildDistributor returns a list of k8s objects for Loki Distributor
Expand Down Expand Up @@ -44,6 +45,10 @@ func BuildDistributor(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage); err != nil {
return nil, err
}

if err := configureHashRingEnv(&deployment.Spec.Template.Spec, opts); err != nil {
return nil, err
}
Expand Down
56 changes: 33 additions & 23 deletions operator/internal/manifests/internal/config/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -371,7 +371,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -797,7 +797,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1155,7 +1155,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1514,7 +1514,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1911,7 +1911,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -2241,7 +2241,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -2680,7 +2680,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -3004,7 +3004,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -3501,7 +3501,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_addr: ${HASH_RING_INSTANCE_ADDR}
Expand Down Expand Up @@ -3762,7 +3762,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_addr: ${HASH_RING_INSTANCE_ADDR}
Expand Down Expand Up @@ -4024,7 +4024,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4287,7 +4287,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4586,7 +4586,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4880,7 +4880,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5124,11 +5124,12 @@ func defaultOptions() Options {

func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
for _, tc := range []struct {
name string
schemaConfig []lokiv1.ObjectStorageSchema
shippers []string
expSchemaConfig string
expStorageConfig string
name string
schemaConfig []lokiv1.ObjectStorageSchema
shippers []string
expSchemaConfig string
expStorageConfig string
expStructuredMetadata string
}{
{
name: "default_config_v11_schema",
Expand Down Expand Up @@ -5156,6 +5157,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: false`,
},
{
name: "v12_schema",
Expand Down Expand Up @@ -5183,6 +5186,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: false`,
},
{
name: "v13_schema",
Expand Down Expand Up @@ -5210,6 +5215,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: true`,
},
{
name: "multiple_schema",
Expand Down Expand Up @@ -5266,6 +5273,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: true`,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -5366,7 +5375,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
${STORAGE_STRUCTURED_METADATA}
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5416,6 +5425,7 @@ analytics:
`
expCfg = strings.Replace(expCfg, "${SCHEMA_CONFIG}", tc.expSchemaConfig, 1)
expCfg = strings.Replace(expCfg, "${STORAGE_CONFIG}", tc.expStorageConfig, 1)
expCfg = strings.Replace(expCfg, "${STORAGE_STRUCTURED_METADATA}", tc.expStructuredMetadata, 1)

opts := defaultOptions()
opts.ObjectStorage.Schemas = tc.schemaConfig
Expand Down Expand Up @@ -5540,7 +5550,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5712,7 +5722,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ limits_config:
enabled: true
desired_rate: {{ . }}MB
{{- end }}
allow_structured_metadata: true
allow_structured_metadata: {{ .ObjectStorage.AllowStructuredMetadata }}
{{- with .GossipRing }}
memberlist:
abort_if_cluster_join_fails: true
Expand Down
49 changes: 49 additions & 0 deletions operator/internal/manifests/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

func TestNewTimeoutConfig_ReturnsDefaults_WhenLimitsSpecEmpty(t *testing.T) {
Expand Down Expand Up @@ -190,3 +191,51 @@ func TestNewTimeoutConfig_ReturnsDefaults_WhenTenantQueryTimeoutParseError(t *te
_, err := NewTimeoutConfig(s.Spec.Limits)
require.Error(t, err)
}

func TestAllowStructuredMetadata_ReturnsTrue(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV13},
},
},
}

require.True(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsTrue_WithFutureVersions(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaVersion("v14")},
},
},
}

require.True(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsFalse_WithV11(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV11},
},
},
}

require.False(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsFalse_WithV12(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV12},
},
},
}

require.False(t, opts.ObjectStorage.AllowStructuredMetadata())
}
5 changes: 5 additions & 0 deletions operator/internal/manifests/query-frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

// BuildQueryFrontend returns a list of k8s objects for Loki QueryFrontend
Expand Down Expand Up @@ -44,6 +45,10 @@ func BuildQueryFrontend(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage); err != nil {
return nil, err
}

if err := configureHashRingEnv(&deployment.Spec.Template.Spec, opts); err != nil {
return nil, err
}
Expand Down
26 changes: 26 additions & 0 deletions operator/internal/manifests/storage/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func configureDeployment(d *appsv1.Deployment, opts Options) error {
return kverrors.Wrap(err, "failed to merge gcs object storage spec ")
}

p = ensureObjectStoreStructuredMetadata(&d.Spec.Template.Spec, opts)
if err := mergo.Merge(&d.Spec.Template.Spec, p, mergo.WithOverride); err != nil {
return kverrors.Wrap(err, "failed to merge object store structured metadata CLI arguments ")
}

return nil
}

Expand All @@ -98,6 +103,11 @@ func configureStatefulSet(s *appsv1.StatefulSet, opts Options) error {
return kverrors.Wrap(err, "failed to merge gcs object storage spec ")
}

p = ensureObjectStoreStructuredMetadata(&s.Spec.Template.Spec, opts)
if err := mergo.Merge(&s.Spec.Template.Spec, p, mergo.WithOverride); err != nil {
return kverrors.Wrap(err, "failed to merge object store structured metadata CLI arguments ")
}

return nil
}

Expand All @@ -116,6 +126,22 @@ func configureStatefulSetCA(s *appsv1.StatefulSet, tls *TLSConfig) error {
return nil
}

func ensureObjectStoreStructuredMetadata(p *corev1.PodSpec, opts Options) corev1.PodSpec {
if !opts.AllowStructuredMetadata() {
return *p
}

container := p.Containers[0].DeepCopy()

container.Args = append(container.Args, "-validation.allow-structured-metadata=false")
periklis marked this conversation as resolved.
Show resolved Hide resolved

return corev1.PodSpec{
Containers: []corev1.Container{
*container,
},
}
}

func ensureObjectStoreCredentials(p *corev1.PodSpec, opts Options) corev1.PodSpec {
container := p.Containers[0].DeepCopy()
volumes := p.Volumes
Expand Down
Loading
Loading