-
Notifications
You must be signed in to change notification settings - Fork 82
/
template.go
147 lines (121 loc) · 3.54 KB
/
template.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package esutil
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"github.com/rs/zerolog"
"github.com/elastic/go-elasticsearch/v8"
)
const (
// templateVersion is the version written into templates created by this
// package. EnsureTemplate compares it with the version of an existing
// template to decide whether the template has to be (re)created.
templateVersion = 1
// defaultSettings is the fmt format for the template settings used when ILM
// is enabled. Its single %s verb receives the ILM policy name.
defaultSettings = `
{
"index.lifecycle.name": "%s"
}
`
// defaultTemplate is the fmt format for the request body sent by
// createTemplate. Its verbs receive, in order: the template version, the
// index pattern, an optional `"data_stream": { },` clause (or the empty
// string), the mappings JSON and the settings JSON.
defaultTemplate = `
{
"version": %v,
"index_patterns": [ "%s" ],
%s
"priority": 200,
"template": {
"mappings" : %s,
"settings" : %s
}
}
`
)
// Template holds the fields of an existing template that EnsureTemplate
// decodes from the get-template response.
type Template struct {
// Version is the template's "version" field; it is compared against
// templateVersion.
Version int `json:"version"`
// Settings is the template's "settings" object. It is only logged at debug
// level by EnsureTemplate.
Settings map[string]interface{} `json:"settings"`
}
// AckResponse is the response body decoded after a put-template request.
type AckResponse struct {
// Acknowledged mirrors the "acknowledged" field of the response;
// createTemplate returns an error when it is false.
Acknowledged bool `json:"acknowledged"`
}
// EnsureTemplate makes sure a template called name exists with a version of
// at least templateVersion.
//
// It fetches the current template and calls createTemplate when the template
// is missing (HTTP 404 or absent from the response body) or when its stored
// version is lower than templateVersion. When ilm is true the template
// settings are built from defaultSettings with the policy name returned by
// GetILMPolicyName(name); ilm is also forwarded to createTemplate.
//
// It returns the transport error of the lookup, an error for any lookup status
// other than 200 or 404, a JSON decoding error, or whatever createTemplate
// returns.
func EnsureTemplate(ctx context.Context, cli *elasticsearch.Client, name, mapping string, ilm bool) (err error) {
	// Query the template under the same name that is used for the map lookup
	// below and for the PUT in createTemplate, so the version check inspects
	// the template that is actually being ensured.
	//
	// NOTE(review): this reads through Indices.GetTemplate while
	// createTemplate writes through Indices.PutIndexTemplate — confirm both
	// calls address the same kind of template.
	res, err := cli.Indices.GetTemplate(
		cli.Indices.GetTemplate.WithContext(ctx),
		cli.Indices.GetTemplate.WithFlatSettings(true),
		cli.Indices.GetTemplate.WithName(name),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	settings := "{}"
	if ilm {
		settings = fmt.Sprintf(defaultSettings, GetILMPolicyName(name))
	}

	switch res.StatusCode {
	case http.StatusOK:
		// Template exists; continue with the version check below.
	case http.StatusNotFound:
		// Template not found, create a new one
		return createTemplate(ctx, cli, name, templateVersion, settings, mapping, ilm)
	default:
		// Authentication, authorization or server errors do not mean the
		// template is missing; report them instead of attempting a create.
		return fmt.Errorf("get template %q: unexpected status %d %s",
			name, res.StatusCode, http.StatusText(res.StatusCode))
	}

	// Decode template from response
	var r map[string]Template
	if err = json.NewDecoder(res.Body).Decode(&r); err != nil {
		return err
	}

	template, ok := r[name]
	if !ok {
		// Template not found, create a new one
		return createTemplate(ctx, cli, name, templateVersion, settings, mapping, ilm)
	}

	// Check settings
	zerolog.Ctx(ctx).Debug().Interface("settings", template.Settings).Msg("Found existing settings")

	if template.Version >= templateVersion {
		zerolog.Ctx(ctx).Info().
			Int("current templated version", template.Version).
			Int("new template version", templateVersion).
			Msg("Skipping template creation because upstream version")
		return nil
	}

	zerolog.Ctx(ctx).Info().
		Int("current templated version", template.Version).
		Int("new template version", templateVersion).
		Msg("Creating template")
	return createTemplate(ctx, cli, name, templateVersion, settings, mapping, ilm)
}
// createTemplate renders defaultTemplate for name and sends it with
// Indices.PutIndexTemplate.
//
// When ilm is true a `"data_stream": { },` clause is inserted into the request
// body. A response error matching ErrResourceAlreadyExists is logged and
// treated as success. Otherwise the function returns the transport error, the
// error reported by checkResponseError, a wrapped decoding error, or an error
// when the response is not acknowledged.
func createTemplate(ctx context.Context, cli *elasticsearch.Client, name string, templateVersion int, settings, mapping string, ilm bool) error {
	logger := zerolog.Ctx(ctx)
	logger.Info().Str("name", name).Msg("Create template")

	// The data_stream clause is only part of the body for ILM-managed templates.
	var dataStreamClause string
	if ilm {
		dataStreamClause = `"data_stream": { },`
	}
	payload := strings.NewReader(
		fmt.Sprintf(defaultTemplate, templateVersion, name, dataStreamClause, mapping, settings),
	)

	resp, err := cli.Indices.PutIndexTemplate(
		name,
		payload,
		cli.Indices.PutIndexTemplate.WithContext(ctx),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch respErr := checkResponseError(resp, logger); {
	case respErr == nil:
		// Request succeeded; verify the acknowledgment below.
	case errors.Is(respErr, ErrResourceAlreadyExists):
		logger.Info().Str("name", name).Msg("Index template already exists")
		return nil
	default:
		return respErr
	}

	var ack AckResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&ack); decodeErr != nil {
		return fmt.Errorf("failed to parse put template response: %v version: %v, err: %w", name, templateVersion, decodeErr)
	}
	if ack.Acknowledged {
		return nil
	}
	return fmt.Errorf("failed to receive acknowledgment for put template request: %v version: %v", name, templateVersion)
}