Skip to content

Commit

Permalink
Ensure kubernetes caches don't expire if they are being read (elastic…
Browse files Browse the repository at this point in the history
…#10946)

Some metrics in metricbeat kubernetes module are cached during a time,
if they are not updated they are removed. But it is usual to have pods or
containers that are not updated during more time that the expiration cache.
Current implementation was not renovating expiration times for cache
entries so all were eventually removed if updates for them are not received.
Replace it with the cache implementation available in libbeat, but keeping
the existing interface.

Also, use slashes instead of dashes to generate unique container uids.
Dashes can be used by kubernetes names, what could lead to ambiguous
keys for the caches.

Fix elastic#10658
  • Loading branch information
jsoriano authored Mar 4, 2019
1 parent b6c56e1 commit db4b4c2
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added function to close sql database connection. {pull}10355[10355]
- Fix issue with `elasticsearch/node_stats` metricset (x-pack) not indexing `source_node` field. {pull}10639[10639]
- Migrate docker autodiscover to ECS. {issue}10757[10757] {pull}10862[10862]
- Fix issue in kubernetes module preventing usage percentages to be properly calculated. {pull}10946[10946]

*Packetbeat*

Expand Down
95 changes: 44 additions & 51 deletions metricbeat/module/kubernetes/util/metrics_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
package util

import (
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
)

// PerfMetrics stores known metrics from Kubernetes nodes and containers
var PerfMetrics = NewPerfMetricsCache()

const defaultTimeout = 120 * time.Second
func init() {
PerfMetrics.Start()
}

var now = time.Now
var sleep = time.Sleep
const defaultTimeout = 120 * time.Second

// NewPerfMetricsCache initializes and returns a new PerfMetricsCache
func NewPerfMetricsCache() *PerfMetricsCache {
Expand All @@ -43,80 +45,71 @@ func NewPerfMetricsCache() *PerfMetricsCache {

// PerfMetricsCache stores known metrics from Kubernetes nodes and containers
type PerfMetricsCache struct {
mutex sync.RWMutex
NodeMemAllocatable *valueMap
NodeCoresAllocatable *valueMap

ContainerMemLimit *valueMap
ContainerCoresLimit *valueMap
}

func newValueMap(timeout time.Duration) *valueMap {
return &valueMap{
values: map[string]value{},
timeout: timeout,
}
// Start cache workers
func (c *PerfMetricsCache) Start() {
c.NodeMemAllocatable.Start()
c.NodeCoresAllocatable.Start()
c.ContainerMemLimit.Start()
c.ContainerCoresLimit.Start()
}

type valueMap struct {
sync.RWMutex
running bool
timeout time.Duration
values map[string]value
// Stop cache workers
func (c *PerfMetricsCache) Stop() {
c.NodeMemAllocatable.Stop()
c.NodeCoresAllocatable.Stop()
c.ContainerMemLimit.Stop()
c.ContainerCoresLimit.Stop()
}

type value struct {
value float64
expires int64
type valueMap struct {
cache *common.Cache
timeout time.Duration
}

// ContainerUID creates an unique ID for from namespace, pod name and container name
func ContainerUID(namespace, pod, container string) string {
return namespace + "-" + pod + "-" + container
func newValueMap(timeout time.Duration) *valueMap {
return &valueMap{
cache: common.NewCache(timeout, 0),
timeout: timeout,
}
}

// Get value
func (m *valueMap) Get(name string) float64 {
m.RLock()
defer m.RUnlock()
return m.values[name].value
return m.GetWithDefault(name, 0.0)
}

// Get value
func (m *valueMap) GetWithDefault(name string, def float64) float64 {
m.RLock()
defer m.RUnlock()
val, ok := m.values[name]
if ok {
return val.value
v := m.cache.Get(name)
if v, ok := v.(float64); ok {
return v
}
return def
}

// Set value
func (m *valueMap) Set(name string, val float64) {
m.Lock()
defer m.Unlock()
m.ensureCleanupWorker()
m.values[name] = value{val, now().Add(m.timeout).Unix()}
m.cache.PutWithTimeout(name, val, m.timeout)
}

func (m *valueMap) ensureCleanupWorker() {
if !m.running {
// Run worker to cleanup expired entries
m.running = true
go func() {
for {
sleep(m.timeout)
m.Lock()
now := now().Unix()
for name, val := range m.values {
if now > val.expires {
delete(m.values, name)
}
}
m.Unlock()
}
}()
}
// Start cache workers
func (m *valueMap) Start() {
m.cache.StartJanitor(m.timeout)
}

// Stop cache workers
func (m *valueMap) Stop() {
m.cache.StopJanitor()
}

// ContainerUID creates an unique ID for from namespace, pod name and container name
func ContainerUID(namespace, pod, container string) string {
return namespace + "/" + pod + "/" + container
}
37 changes: 1 addition & 36 deletions metricbeat/module/kubernetes/util/metrics_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,10 @@ package util

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTimeout(t *testing.T) {
// Mock monotonic time:
fakeTimeCh := make(chan int64)
go func() {
fakeTime := time.Now().Unix()
for {
fakeTime++
fakeTimeCh <- fakeTime
}
}()

now = func() time.Time {
return time.Unix(<-fakeTimeCh, 0)
}

// Blocking sleep:
sleepCh := make(chan struct{})
sleep = func(time.Duration) {
<-sleepCh
}

test := newValueMap(1 * time.Second)

test.Set("foo", 3.14)

// Let cleanup do its job
sleepCh <- struct{}{}
sleepCh <- struct{}{}
sleepCh <- struct{}{}

// Check it expired
assert.Equal(t, 0.0, test.Get("foo"))
}

func TestValueMap(t *testing.T) {
test := newValueMap(defaultTimeout)

Expand All @@ -82,5 +47,5 @@ func TestGetWithDefault(t *testing.T) {
}

func TestContainerUID(t *testing.T) {
assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c"))
assert.Equal(t, "a/b/c", ContainerUID("a", "b", "c"))
}

0 comments on commit db4b4c2

Please sign in to comment.