Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefetch syncs to redis #1446

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added ToTTLDuration
kwitsch committed Apr 17, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 66a7155650f4eac2c6dd7ab56c60569653baa39d
4 changes: 2 additions & 2 deletions resolver/caching_resolver.go
Original file line number Diff line number Diff line change
@@ -116,28 +116,28 @@

response, err := r.next.Resolve(ctx, req)
if err != nil {
logger.WithError(err).Warn("cache prefetch failed")

Check warning on line 119 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L119

Added line #L119 was not covered by tests

return nil, 0

Check warning on line 121 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L121

Added line #L121 was not covered by tests
}

cacheCopy, ttl := r.createCacheEntry(logger, response.Res)
if cacheCopy == nil || ttl == noCacheTTL {
return nil, 0

Check warning on line 126 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L126

Added line #L126 was not covered by tests
}

packed, err := cacheCopy.Pack()
if err != nil {
logger.WithError(err).WithError(err).Warn("response packing failed")

Check warning on line 131 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L131

Added line #L131 was not covered by tests

return nil, 0

Check warning on line 133 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L133

Added line #L133 was not covered by tests
}

if r.redisClient != nil {
r.redisClient.PublishCache(cacheKey, cacheCopy)

Check warning on line 137 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L137

Added line #L137 was not covered by tests
}

return &packed, time.Duration(ttl) * time.Second
return &packed, util.ToTTLDuration(ttl)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the biggest cause of dupe code is that here we need to return the new cache entry,
but in r.Resolver we need to call r.resultCache.Put.
So we could change the prefetch API to be more like the normal caching flow:
PrefetchingOptions.ReloadFn func(ctx context.Context, key string) (*T, time.Duration)
could be something like OnAboutToExpire func(ctx context.Context, key string),
where the callback is expected to call Put if they want to update the value.
That way reloadCacheEntry could become a tiny wrapper around r.Resolve (or a new resolve method they both call).

From a quick check, ReloadFn is only used by this code (and the cache tests),
and Put allows replacing a value, so I think that should be doable!

From there I think most things will fall into place :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the input I think that was the part which caused me the most headaches. 😅

I also try to refactor the caches to streamline the resolver behavior. 👍

}

func (r *CachingResolver) redisSubscriber(ctx context.Context) {
@@ -228,7 +228,7 @@
if err != nil {
logger.Error("can't unpack cached entry. Cache malformed?", err)

return nil

Check warning on line 231 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L231

Added line #L231 was not covered by tests
}

// Adjust TTL
@@ -253,17 +253,17 @@
func (r *CachingResolver) putInCache(logger *logrus.Entry, cacheKey string, response *model.Response) *dns.Msg {
cacheCopy, ttl := r.createCacheEntry(logger, response.Res)
if cacheCopy == nil || ttl == noCacheTTL {
return nil

Check warning on line 256 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L256

Added line #L256 was not covered by tests
}

packed, err := cacheCopy.Pack()
if err != nil {
logger.WithError(err).Warn("response packing failed")

Check warning on line 261 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L261

Added line #L261 was not covered by tests

return nil

Check warning on line 263 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L263

Added line #L263 was not covered by tests
}

r.resultCache.Put(cacheKey, &packed, time.Duration(ttl)*time.Second)
r.resultCache.Put(cacheKey, &packed, util.ToTTLDuration(ttl))

return cacheCopy
}
@@ -281,7 +281,7 @@

// if response is not successful, return noCacheTTL since we don't cache these responses
if response.Rcode != dns.RcodeSuccess {
return 0

Check warning on line 284 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L284

Added line #L284 was not covered by tests
}

// adjust TTLs of all answers to match the configured min and max caching times
@@ -296,9 +296,9 @@

ttl := r.modifyResponseTTL(response)
if ttl == noCacheTTL {
logger.Debug("response is not cacheable")

Check warning on line 299 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L299

Added line #L299 was not covered by tests

return nil, 0

Check warning on line 301 in resolver/caching_resolver.go

Codecov / codecov/patch

resolver/caching_resolver.go#L301

Added line #L301 was not covered by tests
}

// don't cache any EDNS OPT records
10 changes: 10 additions & 0 deletions util/dns.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import (
"math"
"sync/atomic"
"time"

"github.com/miekg/dns"
)
@@ -20,90 +21,99 @@
// If the input is negative, the TTL is set to 0.
//
// If the input is greater than the maximum value of uint32, the TTL is set to math.MaxUint32.
func ToTTL[T TTLInput](input T) uint32 {

Check warning on line 24 in util/dns.go

Codecov / codecov/patch

util/dns.go#L24

Added line #L24 was not covered by tests
// use int64 as the intermediate type
res := int64(input)

Check warning on line 26 in util/dns.go

Codecov / codecov/patch

util/dns.go#L26

Added line #L26 was not covered by tests

// check if the input is of underlying type time.Duration
if durType, ok := any(input).(interface{ Seconds() float64 }); ok {
res = int64(durType.Seconds())

Check warning on line 30 in util/dns.go

Codecov / codecov/patch

util/dns.go#L29-L30

Added lines #L29 - L30 were not covered by tests
}

// check if the value is negative or greater than the maximum value of uint32
if res < 0 {

Check warning on line 34 in util/dns.go

Codecov / codecov/patch

util/dns.go#L34

Added line #L34 was not covered by tests
// there is no negative TTL
return 0
} else if res > math.MaxUint32 {

Check warning on line 37 in util/dns.go

Codecov / codecov/patch

util/dns.go#L36-L37

Added lines #L36 - L37 were not covered by tests
// since TTL is a 32-bit unsigned integer, the maximum value is math.MaxUint32
return math.MaxUint32

Check warning on line 39 in util/dns.go

Codecov / codecov/patch

util/dns.go#L39

Added line #L39 was not covered by tests
}

// return the value as uint32
return uint32(res)

Check warning on line 43 in util/dns.go

Codecov / codecov/patch

util/dns.go#L43

Added line #L43 was not covered by tests
}

// ToTTLDuration converts the input to a time.Duration.
//
// If the input is of underlying type time.Duration, the value is returned as is.
//
// Otherwise the value is converted to seconds and returned as time.Duration.
func ToTTLDuration[T TTLInput](input T) time.Duration {
return time.Duration(ToTTL(input)) * time.Second

Check warning on line 52 in util/dns.go

Codecov / codecov/patch

util/dns.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

// SetAnswerMinTTL sets the TTL of all answers in the message that are less than the specified minimum TTL to
// the minimum TTL.
func SetAnswerMinTTL[T TTLInput](msg *dns.Msg, min T) {
minTTL := ToTTL(min)
for _, answer := range msg.Answer {
if atomic.LoadUint32(&answer.Header().Ttl) < minTTL {
atomic.StoreUint32(&answer.Header().Ttl, minTTL)

Check warning on line 61 in util/dns.go

Codecov / codecov/patch

util/dns.go#L57-L61

Added lines #L57 - L61 were not covered by tests
}
}
}

// SetAnswerMaxTTL sets the TTL of all answers in the message that are greater than the specified maximum TTL
// to the maximum TTL.
func SetAnswerMaxTTL[T TTLInput](msg *dns.Msg, max T) {
maxTTL := ToTTL(max)
for _, answer := range msg.Answer {
if atomic.LoadUint32(&answer.Header().Ttl) > maxTTL && maxTTL != 0 {
atomic.StoreUint32(&answer.Header().Ttl, maxTTL)

Check warning on line 72 in util/dns.go

Codecov / codecov/patch

util/dns.go#L68-L72

Added lines #L68 - L72 were not covered by tests
}
}
}

// SetAnswerMinMaxTTL sets the TTL of all answers in the message that are less than the specified minimum TTL
// to the minimum TTL and the TTL of all answers that are greater than the specified maximum TTL to the maximum TTL.
func SetAnswerMinMaxTTL[T TTLInput, TT TTLInput](msg *dns.Msg, min T, max TT) {
minTTL := ToTTL(min)
maxTTL := ToTTL(max)

Check warning on line 81 in util/dns.go

Codecov / codecov/patch

util/dns.go#L79-L81

Added lines #L79 - L81 were not covered by tests

for _, answer := range msg.Answer {
headerTTL := atomic.LoadUint32(&answer.Header().Ttl)
if headerTTL < minTTL {
atomic.StoreUint32(&answer.Header().Ttl, minTTL)
} else if headerTTL > maxTTL && maxTTL != 0 {
atomic.StoreUint32(&answer.Header().Ttl, maxTTL)

Check warning on line 88 in util/dns.go

Codecov / codecov/patch

util/dns.go#L83-L88

Added lines #L83 - L88 were not covered by tests
}
}
}

// GetMinAnswerTTL returns the lowest TTL of all answers in the message.
func GetAnswerMinTTL(msg *dns.Msg) uint32 {
var minTTL atomic.Uint32

Check warning on line 95 in util/dns.go

Codecov / codecov/patch

util/dns.go#L94-L95

Added lines #L94 - L95 were not covered by tests
// initialize minTTL with the maximum value of uint32
minTTL.Store(math.MaxUint32)

Check warning on line 97 in util/dns.go

Codecov / codecov/patch

util/dns.go#L97

Added line #L97 was not covered by tests

for _, answer := range msg.Answer {
headerTTL := atomic.LoadUint32(&answer.Header().Ttl)
if headerTTL < minTTL.Load() {
minTTL.Store(headerTTL)

Check warning on line 102 in util/dns.go

Codecov / codecov/patch

util/dns.go#L99-L102

Added lines #L99 - L102 were not covered by tests
}
}

return minTTL.Load()

Check warning on line 106 in util/dns.go

Codecov / codecov/patch

util/dns.go#L106

Added line #L106 was not covered by tests
}

// AdjustAnswerTTL adjusts the TTL of all answers in the message by the difference between the lowest TTL
// and the answer's TTL plus the specified adjustment.
func AdjustAnswerTTL[T TTLInput](msg *dns.Msg, adjustment T) {
minTTL := GetAnswerMinTTL(msg)
adjustmentTTL := ToTTL(adjustment)

Check warning on line 113 in util/dns.go

Codecov / codecov/patch

util/dns.go#L111-L113

Added lines #L111 - L113 were not covered by tests

for _, answer := range msg.Answer {
headerTTL := atomic.LoadUint32(&answer.Header().Ttl)
atomic.StoreUint32(&answer.Header().Ttl, headerTTL-minTTL+adjustmentTTL)

Check warning on line 117 in util/dns.go

Codecov / codecov/patch

util/dns.go#L115-L117

Added lines #L115 - L117 were not covered by tests
}
}