Skip to content

Commit

Permalink
Add data stream support
Browse files Browse the repository at this point in the history
Check whether data stream is supported by ES. If supported, at data stream
configuration to index template. Data streams work with timeseries data,
configure @timestamp as timestamp field.
  • Loading branch information
simitt committed Jul 7, 2020
1 parent bdcce73 commit eb7a7d2
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions idxmgmt/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,17 @@ func (m *manager) setupManaged(templateFeature, ilmFeature feature) error {
}
// load index template for default apm index - with disabled index pattern
composedOf := []string{baseComponentName}
if err := m.loadIndexTemplate(ilmFeature, baseComponentName, disabledPattern(), composedOf); err != nil {
if err := m.loadIndexTemplate(ilmFeature, baseComponentName, disabledPattern(), composedOf, false); err != nil {
return err
}

// load index template for onboarding and sourcemap index
for _, t := range []string{"sourcemap", "onboarding"} {
name := fmt.Sprintf("%s-%s", common.APMPrefix, t)
composedOf := []string{baseComponentName}
m.loadIndexTemplate(ilmFeature, name, name+patternSuffix, composedOf)
m.loadIndexTemplate(ilmFeature, name, name+patternSuffix, composedOf, false)
}

// load event specific templates, policies and aliases
var policiesLoaded []string
var err error
Expand All @@ -142,16 +143,20 @@ func (m *manager) setupManaged(templateFeature, ilmFeature feature) error {
}
name := ilmSupporter.Alias().Name

withDataStreams := m.clientHandler.SupportsDataStream()
// load component template per event type (with ILM settings)
m.loadEventComponentTemplate(ilmFeature, name, ilmSupporter.Policy().Name)
m.loadEventComponentTemplate(ilmFeature, name, ilmSupporter.Policy().Name, withDataStreams)

// load index template per event type
composedOf := []string{baseComponentName, name}
m.loadIndexTemplate(ilmFeature, name, name+patternSuffix, composedOf)

// load ilm write aliases AFTER template creation
if err = m.loadLegacyAlias(ilmFeature, ilmSupporter); err != nil {
return err
m.loadIndexTemplate(ilmFeature, name, name+patternSuffix, composedOf, withDataStreams)

// only load regular ILM write alias if data streams are not supported
if !withDataStreams {
// load ilm write aliases AFTER template creation
if err = m.loadLegacyAlias(ilmFeature, ilmSupporter); err != nil {
return err
}
}
}
return nil
Expand All @@ -170,12 +175,12 @@ func (m *manager) setupUnmanaged(templateFeature, ilmFeature feature) error {
// load index template for onboarding and sourcemap index with non-matching index pattern
for _, t := range []string{"sourcemap", "onboarding"} {
name := fmt.Sprintf("%s-%s", common.APMPrefix, t)
m.loadIndexTemplate(ilmFeature, name, disabledPattern(), []string{})
m.loadIndexTemplate(ilmFeature, name, disabledPattern(), []string{}, false)
}
// load event specific index templates with non-matching index pattern
for _, ilmSupporter := range m.supporter.ilmSupporters {
name := ilmSupporter.Alias().Name
m.loadIndexTemplate(ilmFeature, name, disabledPattern(), []string{})
m.loadIndexTemplate(ilmFeature, name, disabledPattern(), []string{}, false)
}

// load generic component template
Expand All @@ -192,7 +197,7 @@ func (m *manager) setupUnmanaged(templateFeature, ilmFeature feature) error {
pattern = baseComponentName + patternSuffix
}
composedOf := []string{baseComponentName}
if err := m.loadIndexTemplate(ilmFeature, baseComponentName, pattern, composedOf); err != nil {
if err := m.loadIndexTemplate(ilmFeature, baseComponentName, pattern, composedOf, false); err != nil {
return err
}

Expand Down Expand Up @@ -322,7 +327,7 @@ func (m *manager) loadGenericComponentTemplate(templateFeature feature, name str
return m.loadTemplate(cfg, m.assets.Fields(m.supporter.info.Beat))
}

func (m *manager) loadEventComponentTemplate(ilmFeature feature, name, policyName string) error {
func (m *manager) loadEventComponentTemplate(ilmFeature feature, name, policyName string, withDataStream bool) error {
if !ilmFeature.load {
return nil
}
Expand All @@ -332,8 +337,10 @@ func (m *manager) loadEventComponentTemplate(ilmFeature feature, name, policyNam
cfg.Enabled = ilmFeature.load
cfg.Overwrite = ilmFeature.overwrite
cfg.Settings.Index = map[string]interface{}{
"lifecycle.name": policyName,
"lifecycle.rollover_alias": cfg.Name,
"lifecycle.name": policyName,
}
if !withDataStream {
cfg.Settings.Index["lifecycle.rollover_alias"] = cfg.Name
}
return m.loadTemplate(cfg, nil)
}
Expand All @@ -346,14 +353,17 @@ func (m *manager) loadTemplate(config template.TemplateConfig, fields []byte) er
return nil
}

func (m *manager) loadIndexTemplate(ilmFeature feature, name, pattern string, composedOf []string) error {
func (m *manager) loadIndexTemplate(ilmFeature feature, name, pattern string, composedOf []string, withDataStream bool) error {
cfg := template.DefaultConfig()
cfg.Kind = template.KindIndex
cfg.Name = name
cfg.Pattern = pattern
cfg.ComposedOf = composedOf
cfg.Enabled = ilmFeature.load
cfg.Overwrite = ilmFeature.overwrite
if withDataStream {
cfg.DataStream = map[string]string{"timestamp_field": "@timestamp"}
}
return m.loadTemplate(cfg, nil)
}

Expand Down

0 comments on commit eb7a7d2

Please sign in to comment.