From c7f4728d71e4ed629c0a2c46eb7c00e239e0782b Mon Sep 17 00:00:00 2001 From: Brandon Wagner Date: Fri, 29 Jul 2022 18:30:19 -0500 Subject: [PATCH 1/2] fix pricing sync with mutex on refresh --- pkg/ec2pricing/odpricing.go | 8 ++++++++ pkg/ec2pricing/spotpricing.go | 8 ++++++++ pkg/selector/selector.go | 2 +- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/ec2pricing/odpricing.go b/pkg/ec2pricing/odpricing.go index f4ea64b..a1fd988 100644 --- a/pkg/ec2pricing/odpricing.go +++ b/pkg/ec2pricing/odpricing.go @@ -23,6 +23,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -44,6 +45,7 @@ type OnDemandPricing struct { DirectoryPath string cache *cache.Cache pricingClient pricingiface.PricingAPI + sync.RWMutex } func LoadODCacheOrNew(pricingClient pricingiface.PricingAPI, region string, fullRefreshTTL time.Duration, directoryPath string) *OnDemandPricing { @@ -111,6 +113,8 @@ func odCacheRefreshJob(odPricing *OnDemandPricing) { } func (c *OnDemandPricing) Refresh() error { + c.Lock() + defer c.Unlock() odInstanceTypeCosts, err := c.fetchOnDemandPricing("") if err != nil { return fmt.Errorf("there was a problem refreshing the on-demand instance type pricing cache: %v", err) @@ -128,6 +132,8 @@ func (c *OnDemandPricing) Get(instanceType string) (float64, error) { if cost, ok := c.cache.Get(instanceType); ok { return cost.(float64), nil } + c.RLock() + defer c.RUnlock() costs, err := c.fetchOnDemandPricing(instanceType) if err != nil { return 0, fmt.Errorf("there was a problem fetching on-demand instance type pricing for %s: %v", instanceType, err) @@ -154,6 +160,8 @@ func (c *OnDemandPricing) Save() error { } func (c *OnDemandPricing) Clear() error { + c.Lock() + defer c.Unlock() c.cache.Flush() return os.Remove(getODCacheFilePath(c.Region, c.DirectoryPath)) } diff --git a/pkg/ec2pricing/spotpricing.go b/pkg/ec2pricing/spotpricing.go index 9c09764..0d12593 100644 --- a/pkg/ec2pricing/spotpricing.go +++ b/pkg/ec2pricing/spotpricing.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -43,6 +44,7 @@ type SpotPricing struct { DirectoryPath string cache *cache.Cache ec2Client ec2iface.EC2API + sync.RWMutex } type spotPricingEntry struct { @@ -118,6 +120,8 @@ func spotCacheRefreshJob(spotPricing *SpotPricing, days int) { } func (c *SpotPricing) Refresh(days int) error { + c.Lock() + defer c.Unlock() spotInstanceTypeCosts, err := c.fetchSpotPricingTimeSeries("", days) if err != nil { return fmt.Errorf("there was a problem refreshing the spot instance type pricing cache: %v", err) @@ -139,6 +143,8 @@ func (c *SpotPricing) Get(instanceType string, zone string, days int) (float64, } } if !ok { + c.RLock() + defer c.RUnlock() zonalSpotPricing, err := c.fetchSpotPricingTimeSeries(instanceType, days) if err != nil { return -1, fmt.Errorf("there was a problem fetching spot instance type pricing for %s: %v", instanceType, err) @@ -224,6 +230,8 @@ func (c *SpotPricing) Save() error { } func (c *SpotPricing) Clear() error { + c.Lock() + defer c.Unlock() c.cache.Flush() return os.Remove(getSpotCacheFilePath(c.Region, c.DirectoryPath)) } diff --git a/pkg/selector/selector.go b/pkg/selector/selector.go index 4088d2d..953932c 100644 --- a/pkg/selector/selector.go +++ b/pkg/selector/selector.go @@ -261,7 +261,7 @@ func (itf Selector) prepareFilter(filters Filters, instanceTypeInfo instancetype if itf.EC2Pricing.OnDemandCacheCount() > 0 { price, err := itf.EC2Pricing.GetOnDemandInstanceTypeCost(instanceTypeName) if err != nil { - log.Printf("Could not retrieve instantaneous hourly on-demand price for instance type %s\n", instanceTypeName) + log.Printf("Could not retrieve instantaneous hourly on-demand price for instance type %s - %s\n", instanceTypeName, err) } else { instanceTypeHourlyPriceOnDemand = &price instanceTypeInfo.OndemandPricePerHour = instanceTypeHourlyPriceOnDemand From 19ddc3da3a9b2d345bdd5c83c23661dc9f6c6a3d Mon Sep 17 00:00:00 2001 From: Brandon Wagner Date: Fri, 29 Jul 2022 18:39:02 -0500 Subject: [PATCH 2/2] expire cache items on load --- pkg/ec2pricing/odpricing.go | 4 +++- pkg/ec2pricing/spotpricing.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/ec2pricing/odpricing.go b/pkg/ec2pricing/odpricing.go index a1fd988..1cc99a6 100644 --- a/pkg/ec2pricing/odpricing.go +++ b/pkg/ec2pricing/odpricing.go @@ -93,7 +93,9 @@ func loadODCacheFrom(itemTTL time.Duration, region string, expandedDirPath strin if err := json.Unmarshal(cacheBytes, odCache); err != nil { return nil, err } - return cache.NewFrom(itemTTL, itemTTL, *odCache), nil + c := cache.NewFrom(itemTTL, itemTTL, *odCache) + c.DeleteExpired() + return c, nil } func getODCacheFilePath(region string, directoryPath string) string { diff --git a/pkg/ec2pricing/spotpricing.go b/pkg/ec2pricing/spotpricing.go index 0d12593..6f4bc6d 100644 --- a/pkg/ec2pricing/spotpricing.go +++ b/pkg/ec2pricing/spotpricing.go @@ -100,7 +100,9 @@ func loadSpotCacheFrom(itemTTL time.Duration, region string, expandedDirPath str if err := decoder.Decode(spotTimeSeries); err != nil { return nil, err } - return cache.NewFrom(itemTTL, itemTTL, *spotTimeSeries), nil + c := cache.NewFrom(itemTTL, itemTTL, *spotTimeSeries) + c.DeleteExpired() + return c, nil } func getSpotCacheFilePath(region string, directoryPath string) string {