From c98c09e3344fc4a2680f027f22cdd277b88fbda6 Mon Sep 17 00:00:00 2001 From: nate Date: Wed, 4 Nov 2020 14:45:54 -0500 Subject: [PATCH] Allow AggregationOptions to be set for a namespace. (#248) --- docs/api.md | 47 +++++++++++ pkg/apis/m3dboperator/v1alpha1/namespace.go | 38 +++++++++ .../v1alpha1/zz_generated.deepcopy.go | 80 ++++++++++++++++++- pkg/m3admin/namespace/namespace.go | 60 +++++++++++--- pkg/m3admin/namespace/namespace_test.go | 69 ++++++++++++++++ pkg/m3admin/namespace/presets.go | 20 +++++ 6 files changed, 304 insertions(+), 10 deletions(-) diff --git a/docs/api.md b/docs/api.md index 4125b05b..8a060e62 100644 --- a/docs/api.md +++ b/docs/api.md @@ -12,6 +12,10 @@ This document enumerates the Custom Resource Definitions used by the M3DB Operat * [M3DBClusterList](#m3dbclusterlist) * [M3DBStatus](#m3dbstatus) * [NodeAffinityTerm](#nodeaffinityterm) +* [AggregatedAttributes](#aggregatedattributes) +* [Aggregation](#aggregation) +* [AggregationOptions](#aggregationoptions) +* [DownsampleOptions](#downsampleoptions) * [IndexOptions](#indexoptions) * [Namespace](#namespace) * [NamespaceOptions](#namespaceoptions) @@ -145,6 +149,48 @@ NodeAffinityTerm represents a node label and a set of label values, any of which [Back to TOC](#table-of-contents) +## AggregatedAttributes + +AggregatedAttributes are attributes specifying how data points are aggregated. + +| Field | Description | Scheme | Required | +| ----- | ----------- | ------ | -------- | +| resolution | Resolution is the time range to aggregate data across. | string | false | +| downsampleOptions | DownsampleOptions stores options for downsampling data points. | *[DownsampleOptions](#downsampleoptions) | false | + +[Back to TOC](#table-of-contents) + +## Aggregation + +Aggregation describes data points within a namespace. + +| Field | Description | Scheme | Required | +| ----- | ----------- | ------ | -------- | +| aggregated | Aggregated indicates whether data points are aggregated or not. | bool | false | +| attributes | Attributes defines how data is aggregated when Aggregated is set to true. This field is ignored when aggregated is false. | [AggregatedAttributes](#aggregatedattributes) | false | + +[Back to TOC](#table-of-contents) + +## AggregationOptions + +AggregationOptions is a set of options for aggregating data within the namespace. + +| Field | Description | Scheme | Required | +| ----- | ----------- | ------ | -------- | +| aggregations | Aggregations are the aggregations for a namespace. | [][Aggregation](#aggregation) | false | + +[Back to TOC](#table-of-contents) + +## DownsampleOptions + +DownsampleOptions is a set of options related to downsampling data. + +| Field | Description | Scheme | Required | +| ----- | ----------- | ------ | -------- | +| all | All indicates whether to send data points to this namespace. If set to false, this namespace will not receive data points. In this case, data will need to be sent to the namespace via another mechanism (e.g. rollup/recording rules). | bool | false | + +[Back to TOC](#table-of-contents) + ## IndexOptions IndexOptions defines parameters for indexing. @@ -183,6 +229,7 @@ NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.gith | retentionOptions | RetentionOptions sets the retention parameters. | [RetentionOptions](#retentionoptions) | false | | indexOptions | IndexOptions sets the indexing parameters. | [IndexOptions](#indexoptions) | false | | coldWritesEnabled | ColdWritesEnabled controls whether cold writes are enabled. | bool | false | +| aggregationOptions | AggregationOptions sets the aggregation parameters. | [AggregationOptions](#aggregationoptions) | false | [Back to TOC](#table-of-contents) diff --git a/pkg/apis/m3dboperator/v1alpha1/namespace.go b/pkg/apis/m3dboperator/v1alpha1/namespace.go index f2f5ff51..3ad02973 100644 --- a/pkg/apis/m3dboperator/v1alpha1/namespace.go +++ b/pkg/apis/m3dboperator/v1alpha1/namespace.go @@ -63,6 +63,41 @@ type IndexOptions struct { BlockSize string `json:"blockSize,omitempty"` } +// AggregationOptions is a set of options for aggregating data +// within the namespace. +type AggregationOptions struct { + // Aggregations are the aggregations for a namespace. + Aggregations []Aggregation `json:"aggregations,omitempty"` +} + +// Aggregation describes data points within a namespace. +type Aggregation struct { + // Aggregated indicates whether data points are aggregated or not. + Aggregated bool `json:"aggregated,omitempty"` + + // Attributes defines how data is aggregated when Aggregated is set to true. + // This field is ignored when aggregated is false. + Attributes AggregatedAttributes `json:"attributes,omitempty"` +} + +// AggregatedAttributes are attributes specifying how data points are aggregated. +type AggregatedAttributes struct { + // Resolution is the time range to aggregate data across. + Resolution string `json:"resolution,omitempty"` + + // DownsampleOptions stores options for downsampling data points. + DownsampleOptions *DownsampleOptions `json:"downsampleOptions,omitempty"` +} + +// DownsampleOptions is a set of options related to downsampling data. +type DownsampleOptions struct { + // All indicates whether to send data points to this namespace. + // If set to false, this namespace will not receive data points. In this + // case, data will need to be sent to the namespace via another mechanism + // (e.g. rollup/recording rules). + All bool `json:"all,omitempty"` +} + // NamespaceOptions defines parameters for an M3DB namespace. See // https://m3db.github.io/m3/operational_guide/namespace_configuration/ for more // details. @@ -93,4 +128,7 @@ type NamespaceOptions struct { // ColdWritesEnabled controls whether cold writes are enabled. ColdWritesEnabled bool `json:"coldWritesEnabled,omitempty"` + + // AggregationOptions sets the aggregation parameters. + AggregationOptions AggregationOptions `json:"aggregationOptions,omitempty"` } diff --git a/pkg/apis/m3dboperator/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/m3dboperator/v1alpha1/zz_generated.deepcopy.go index ad9ca552..645ead7e 100644 --- a/pkg/apis/m3dboperator/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/m3dboperator/v1alpha1/zz_generated.deepcopy.go @@ -29,6 +29,67 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AggregatedAttributes) DeepCopyInto(out *AggregatedAttributes) { + *out = *in + if in.DownsampleOptions != nil { + in, out := &in.DownsampleOptions, &out.DownsampleOptions + *out = new(DownsampleOptions) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AggregatedAttributes. +func (in *AggregatedAttributes) DeepCopy() *AggregatedAttributes { + if in == nil { + return nil + } + out := new(AggregatedAttributes) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Aggregation) DeepCopyInto(out *Aggregation) { + *out = *in + in.Attributes.DeepCopyInto(&out.Attributes) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Aggregation. +func (in *Aggregation) DeepCopy() *Aggregation { + if in == nil { + return nil + } + out := new(Aggregation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AggregationOptions) DeepCopyInto(out *AggregationOptions) { + *out = *in + if in.Aggregations != nil { + in, out := &in.Aggregations, &out.Aggregations + *out = make([]Aggregation, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AggregationOptions. +func (in *AggregationOptions) DeepCopy() *AggregationOptions { + if in == nil { + return nil + } + out := new(AggregationOptions) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterCondition) DeepCopyInto(out *ClusterCondition) { *out = *in @@ -169,6 +230,22 @@ func (in *ClusterSpec) DeepCopy() *ClusterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DownsampleOptions) DeepCopyInto(out *DownsampleOptions) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DownsampleOptions. +func (in *DownsampleOptions) DeepCopy() *DownsampleOptions { + if in == nil { + return nil + } + out := new(DownsampleOptions) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExternalCoordinatorConfig) DeepCopyInto(out *ExternalCoordinatorConfig) { *out = *in @@ -341,7 +418,7 @@ func (in *Namespace) DeepCopyInto(out *Namespace) { if in.Options != nil { in, out := &in.Options, &out.Options *out = new(NamespaceOptions) - **out = **in + (*in).DeepCopyInto(*out) } return } @@ -361,6 +438,7 @@ func (in *NamespaceOptions) DeepCopyInto(out *NamespaceOptions) { *out = *in out.RetentionOptions = in.RetentionOptions out.IndexOptions = in.IndexOptions + in.AggregationOptions.DeepCopyInto(&out.AggregationOptions) return } diff --git a/pkg/m3admin/namespace/namespace.go b/pkg/m3admin/namespace/namespace.go index 0fbdc132..bd8fcb5e 100644 --- a/pkg/m3admin/namespace/namespace.go +++ b/pkg/m3admin/namespace/namespace.go @@ -90,16 +90,22 @@ func m3dbNamespaceOptsFromSpec(opts *myspec.NamespaceOptions) (*m3ns.NamespaceOp return nil, err } + aggOpts, err := m3dbAggregationOptsFromSpec(opts.AggregationOptions) + if err != nil { + return nil, err + } + return &m3ns.NamespaceOptions{ - BootstrapEnabled: opts.BootstrapEnabled, - FlushEnabled: opts.FlushEnabled, - WritesToCommitLog: opts.WritesToCommitLog, - CleanupEnabled: opts.CleanupEnabled, - RepairEnabled: opts.RepairEnabled, - RetentionOptions: retentionOpts, - SnapshotEnabled: opts.SnapshotEnabled, - IndexOptions: indexOpts, - ColdWritesEnabled: opts.ColdWritesEnabled, + BootstrapEnabled: opts.BootstrapEnabled, + FlushEnabled: opts.FlushEnabled, + WritesToCommitLog: opts.WritesToCommitLog, + CleanupEnabled: opts.CleanupEnabled, + RepairEnabled: opts.RepairEnabled, + RetentionOptions: retentionOpts, + SnapshotEnabled: opts.SnapshotEnabled, + IndexOptions: indexOpts, + ColdWritesEnabled: opts.ColdWritesEnabled, + AggregationOptions: aggOpts, }, nil } @@ -150,3 +156,39 @@ func m3dbIndexOptsFromSpec(opts myspec.IndexOptions) (*m3ns.IndexOptions, error) BlockSizeNanos: blockSize.Nanoseconds(), }, nil } + +func m3dbAggregationOptsFromSpec(opts myspec.AggregationOptions) (*m3ns.AggregationOptions, error) { + if len(opts.Aggregations) == 0 { + return nil, nil + } + + aggs := make([]*m3ns.Aggregation, 0, len(opts.Aggregations)) + for _, specAgg := range opts.Aggregations { + agg := &m3ns.Aggregation{Aggregated: specAgg.Aggregated} + if agg.Aggregated { + resolution, err := time.ParseDuration(specAgg.Attributes.Resolution) + if err != nil { + return nil, fmt.Errorf("failed to parse aggregation option Resolution: %w", err) + } + + agg.Attributes = &m3ns.AggregatedAttributes{ + ResolutionNanos: resolution.Nanoseconds(), + } + + if specAgg.Attributes.DownsampleOptions == nil { + agg.Attributes.DownsampleOptions = &m3ns.DownsampleOptions{All: true} + } else { + agg.Attributes.DownsampleOptions = &m3ns.DownsampleOptions{ + All: specAgg.Attributes.DownsampleOptions.All, + } + } + } + + aggs = append(aggs, agg) + } + + return &m3ns.AggregationOptions{ + Aggregations: aggs, + }, nil + +} diff --git a/pkg/m3admin/namespace/namespace_test.go b/pkg/m3admin/namespace/namespace_test.go index 90c62bb9..22f0576c 100644 --- a/pkg/m3admin/namespace/namespace_test.go +++ b/pkg/m3admin/namespace/namespace_test.go @@ -79,6 +79,11 @@ func TestRequestFromSpec(t *testing.T) { BlockSize: "1s", Enabled: true, }, + AggregationOptions: myspec.AggregationOptions{ + Aggregations: []myspec.Aggregation{ + {Aggregated: false}, + }, + }, }, }, req: &admin.NamespaceAddRequest{ @@ -97,6 +102,70 @@ func TestRequestFromSpec(t *testing.T) { BlockSizeNanos: 1000000000, Enabled: true, }, + AggregationOptions: &m3ns.AggregationOptions{ + Aggregations: []*m3ns.Aggregation{ + {Aggregated: false}, + }, + }, + }, + }, + }, + { + ns: myspec.Namespace{ + Name: "aggregated", + Options: &myspec.NamespaceOptions{ + BootstrapEnabled: true, + RetentionOptions: myspec.RetentionOptions{ + RetentionPeriod: "1s", + BlockSize: "1s", + BufferFuture: "1s", + BufferPast: "1s", + BlockDataExpiry: true, + BlockDataExpiryAfterNotAccessPeriod: "1s", + }, + IndexOptions: myspec.IndexOptions{ + BlockSize: "1s", + Enabled: true, + }, + AggregationOptions: myspec.AggregationOptions{ + Aggregations: []myspec.Aggregation{ + { + Aggregated: true, + Attributes: myspec.AggregatedAttributes{ + Resolution: "1s", + }, + }, + }, + }, + }, + }, + req: &admin.NamespaceAddRequest{ + Name: "aggregated", + Options: &m3ns.NamespaceOptions{ + BootstrapEnabled: true, + RetentionOptions: &m3ns.RetentionOptions{ + RetentionPeriodNanos: 1000000000, + BlockSizeNanos: 1000000000, + BufferFutureNanos: 1000000000, + BufferPastNanos: 1000000000, + BlockDataExpiry: true, + BlockDataExpiryAfterNotAccessPeriodNanos: 1000000000, + }, + IndexOptions: &m3ns.IndexOptions{ + BlockSizeNanos: 1000000000, + Enabled: true, + }, + AggregationOptions: &m3ns.AggregationOptions{ + Aggregations: []*m3ns.Aggregation{ + { + Aggregated: true, + Attributes: &m3ns.AggregatedAttributes{ + ResolutionNanos: 1000000000, + DownsampleOptions: &m3ns.DownsampleOptions{All: true}, + }, + }, + }, + }, }, }, }, diff --git a/pkg/m3admin/namespace/presets.go b/pkg/m3admin/namespace/presets.go index d809be47..fc348298 100644 --- a/pkg/m3admin/namespace/presets.go +++ b/pkg/m3admin/namespace/presets.go @@ -62,6 +62,16 @@ var ( BlockSize: (2 * time.Hour).String(), }, ColdWritesEnabled: false, + AggregationOptions: myspec.AggregationOptions{ + Aggregations: []myspec.Aggregation{ + { + Aggregated: true, + Attributes: myspec.AggregatedAttributes{ + Resolution: (10 * time.Second).String(), + }, + }, + }, + }, } presetOneMinuteFourtyDaysIndexed = myspec.NamespaceOptions{ @@ -84,5 +94,15 @@ var ( BlockSize: (24 * time.Hour).String(), }, ColdWritesEnabled: false, + AggregationOptions: myspec.AggregationOptions{ + Aggregations: []myspec.Aggregation{ + { + Aggregated: true, + Attributes: myspec.AggregatedAttributes{ + Resolution: (1 * time.Minute).String(), + }, + }, + }, + }, } )