Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

Commit

Permalink
Merge pull request #454 from vishh/release-0.16.0
Browse files Browse the repository at this point in the history
Cherry picks for v0.16.1
  • Loading branch information
vishh committed Aug 5, 2015
2 parents fc13ee3 + 06e59aa commit f6352ce
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
all: build

TAG = v0.14.3
TAG = v0.16.1
PREFIX = gcr.io/google_containers
FLAGS =

Expand Down
4 changes: 4 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release Notes

## 0.16.1 (8-4-2015)
- Fix metrics API to export the latest point.
- Garbage collect old elements from the cache.

## 0.16.0 (7-7-2015)
- Add new sink for GCM autoscaling.
- Force heapster to use kubernetes v1.0 API.
Expand Down
2 changes: 1 addition & 1 deletion heapster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
argPort = flag.Int("port", 8082, "port to listen to")
argIp = flag.String("listen_ip", "", "IP to listen on, defaults to all IPs")
argMaxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used simultaneously. Less than 1 for default (number of cores).")
argCacheDuration = flag.Duration("cache_duration", 10*time.Minute, "The total duration of the historical data that will be cached by heapster.")
argCacheDuration = flag.Duration("cache_duration", 5*time.Minute, "The total duration of the historical data that will be cached by heapster.")
argSources Uris
argSinks Uris
)
Expand Down
4 changes: 2 additions & 2 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewManager(sources []source_api.Source, sinkManager sinks.ExternalSinkManag
return &realManager{
sources: sources,
sinkManager: sinkManager,
cache: cache.NewCache(bufferDuration),
cache: cache.NewCache(bufferDuration, time.Minute),
lastSync: time.Now(),
resolution: res,
decoder: sink_api.NewDecoder(),
Expand Down Expand Up @@ -168,6 +168,6 @@ func trimStatsForContainers(containers []*cache.ContainerElement) []*cache.Conta
// Only keep the latest stats data point.
func onlyKeepLatestStat(cont *cache.ContainerElement) {
if len(cont.Metrics) > 1 {
cont.Metrics = cont.Metrics[len(cont.Metrics)-1:]
cont.Metrics = cont.Metrics[0:1]
}
}
2 changes: 1 addition & 1 deletion schema/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func podElementFactory() *cache.PodElement {
// The cache contains two pods, one with two containers and one without any containers.
// The cache also contains a free container and a "machine"-tagged container.
func cacheFactory() cache.Cache {
source_cache := cache.NewCache(time.Hour)
source_cache := cache.NewCache(time.Hour, time.Hour)

// Generate 4 ContainerMetricElements
cme_1 := cmeFactory()
Expand Down
1 change: 1 addition & 0 deletions sinks/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ContainerMetricElement struct {

type ContainerElement struct {
Metadata
// Data points are in reverse chronological order (most recent to oldest).
Metrics []*ContainerMetricElement
}

Expand Down
78 changes: 61 additions & 17 deletions sinks/cache/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ import (

source_api "github.com/GoogleCloudPlatform/heapster/sources/api"
"github.com/GoogleCloudPlatform/heapster/store"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

const rootContainer = "/"

type containerElement struct {
lastUpdated time.Time
Metadata
metrics store.TimeStore
}

type podElement struct {
lastUpdated time.Time
Metadata
// map of container name to container element.
containers map[string]*containerElement
Expand All @@ -50,7 +53,7 @@ type realCache struct {
pods map[string]*podElement
// Map of node hostnames to node cache entry.
nodes map[string]*nodeElement
lock sync.RWMutex
sync.RWMutex
}

func (rc *realCache) newContainerElement() *containerElement {
Expand All @@ -59,13 +62,49 @@ func (rc *realCache) newContainerElement() *containerElement {
}
}

func (rc *realCache) newpodElement() *podElement {
func (rc *realCache) isTooOld(lastUpdated time.Time) bool {
if time.Now().Sub(lastUpdated) >= rc.bufferDuration {
return true
}
return false
}

func (rc *realCache) runGC() {
rc.Lock()
defer rc.Unlock()
for podName, podElem := range rc.pods {
for contName, contElem := range podElem.containers {
if rc.isTooOld(contElem.lastUpdated) {
delete(podElem.containers, contName)
}
}
if rc.isTooOld(podElem.lastUpdated) {
delete(rc.pods, podName)
}
}

for nodeName, nodeElem := range rc.nodes {
if rc.isTooOld(nodeElem.node.lastUpdated) {
delete(rc.nodes, nodeName)
// There is nothing to do for this node, since the entire node element
// has been deleted.
continue
}
for contName, contElem := range nodeElem.freeContainers {
if rc.isTooOld(contElem.lastUpdated) {
delete(nodeElem.freeContainers, contName)
}
}
}
}

func (rc *realCache) newPodElement() *podElement {
return &podElement{
containers: make(map[string]*containerElement),
}
}

func (rc *realCache) newnodeElement() *nodeElement {
func (rc *realCache) newNodeElement() *nodeElement {
return &nodeElement{
node: rc.newContainerElement(),
freeContainers: make(map[string]*containerElement),
Expand All @@ -92,12 +131,12 @@ func storeSpecAndStats(ce *containerElement, c *source_api.Container) {
}

func (rc *realCache) StorePods(pods []source_api.Pod) error {
rc.lock.Lock()
defer rc.lock.Unlock()
rc.Lock()
defer rc.Unlock()
for _, pod := range pods {
pe, ok := rc.pods[pod.ID]
if !ok {
pe = rc.newpodElement()
pe = rc.newPodElement()
pe.Metadata = Metadata{
Name: pod.Name,
Namespace: pod.Namespace,
Expand All @@ -121,20 +160,22 @@ func (rc *realCache) StorePods(pods []source_api.Pod) error {
Hostname: cont.Hostname,
}
storeSpecAndStats(ce, cont)
ce.lastUpdated = time.Now()
}
pe.lastUpdated = time.Now()
}
return nil
}

func (rc *realCache) StoreContainers(containers []source_api.Container) error {
rc.lock.Lock()
defer rc.lock.Unlock()
rc.Lock()
defer rc.Unlock()

for idx := range containers {
cont := &containers[idx]
ne, ok := rc.nodes[cont.Hostname]
if !ok {
ne = rc.newnodeElement()
ne = rc.newNodeElement()
rc.nodes[cont.Hostname] = ne
}
var ce *containerElement
Expand All @@ -158,13 +199,14 @@ func (rc *realCache) StoreContainers(containers []source_api.Container) error {
}
}
storeSpecAndStats(ce, cont)
ce.lastUpdated = time.Now()
}
return nil
}

func (rc *realCache) GetPods(start, end time.Time) []*PodElement {
rc.lock.RLock()
defer rc.lock.RUnlock()
rc.RLock()
defer rc.RUnlock()
var result []*PodElement
for _, pe := range rc.pods {
podElement := &PodElement{
Expand All @@ -188,8 +230,8 @@ func (rc *realCache) GetPods(start, end time.Time) []*PodElement {
}

func (rc *realCache) GetNodes(start, end time.Time) []*ContainerElement {
rc.lock.RLock()
defer rc.lock.RUnlock()
rc.RLock()
defer rc.RUnlock()
var result []*ContainerElement
for _, ne := range rc.nodes {
ce := &ContainerElement{
Expand All @@ -206,8 +248,8 @@ func (rc *realCache) GetNodes(start, end time.Time) []*ContainerElement {
}

func (rc *realCache) GetFreeContainers(start, end time.Time) []*ContainerElement {
rc.lock.RLock()
defer rc.lock.RUnlock()
rc.RLock()
defer rc.RUnlock()
var result []*ContainerElement
for _, ne := range rc.nodes {
for _, ce := range ne.freeContainers {
Expand All @@ -225,10 +267,12 @@ func (rc *realCache) GetFreeContainers(start, end time.Time) []*ContainerElement
return result
}

func NewCache(bufferDuration time.Duration) Cache {
return &realCache{
func NewCache(bufferDuration, gcDuration time.Duration) Cache {
rc := &realCache{
pods: make(map[string]*podElement),
nodes: make(map[string]*nodeElement),
bufferDuration: bufferDuration,
}
go util.Until(rc.runGC, gcDuration, util.NeverStop)
return rc
}
26 changes: 24 additions & 2 deletions sinks/cache/cache_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,24 @@ import (
)

func TestFuzz(t *testing.T) {
cache := NewCache(time.Hour)
cache := NewCache(time.Hour, time.Second)
var (
pods []source_api.Pod
containers []source_api.Container
)
f := fuzz.New().NumElements(2, 10).NilChance(0)
f.Fuzz(&pods)
f.Fuzz(&containers)
assert := assert.New(t)
assert.NoError(cache.StorePods(pods))
assert.NoError(cache.StoreContainers(containers))
time.Sleep(5 * time.Second)
zeroTime := time.Time{}
assert.NotEmpty(cache.GetPods(zeroTime, zeroTime))
}

func TestGC(t *testing.T) {
cache := NewCache(time.Millisecond, time.Second)
var (
pods []source_api.Pod
containers []source_api.Container
Expand All @@ -40,6 +57,10 @@ func TestFuzz(t *testing.T) {
zeroTime := time.Time{}
assert.NotEmpty(cache.GetFreeContainers(zeroTime, zeroTime))
assert.NotEmpty(cache.GetPods(zeroTime, zeroTime))
// Expect all data to be deleted after 2 seconds.
time.Sleep(10 * time.Second)
assert.Empty(cache.GetFreeContainers(zeroTime, zeroTime))
assert.Empty(cache.GetPods(zeroTime, zeroTime))
}

func getContainer(name string) source_api.Container {
Expand Down Expand Up @@ -92,7 +113,7 @@ func TestRealCacheData(t *testing.T) {
Containers: containers,
},
}
cache := NewCache(time.Hour)
cache := NewCache(time.Hour, time.Hour)
assert := assert.New(t)
assert.NoError(cache.StorePods(pods))
assert.NoError(cache.StoreContainers(containers))
Expand All @@ -116,6 +137,7 @@ func TestRealCacheData(t *testing.T) {
for _, expectedContainer := range containers {
ce, exists := actualContainerMap[expectedContainer.Name]
assert.True(exists)
assert.NotNil(ce.Metrics)
assert.NotEmpty(ce.Metrics)
}
}
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package version
// Increment major number for new feature additions and behavioral changes.
// Increment minor number for bug fixes and performance enhancements.
// Increment patch number for critical fixes to existing releases.
const HeapsterVersion = "0.16.0"
const HeapsterVersion = "0.16.1"

0 comments on commit f6352ce

Please sign in to comment.