Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for concurrent requests #37

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add test for single-stream cache resolution
piercefreeman committed Nov 8, 2022
commit facdec2d9bd0f5f215bd3dc34325700d5944ccad
115 changes: 63 additions & 52 deletions groove/proxy/cache.go
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ type Cache struct {

// Disk cache comes bundled with a RWMutex so we can call functions directly
// and the lock will be handled internally
cacheDiskCache *lrucache.CacheInvalidator
cacheStore *lrucache.CacheInvalidator

inflightRequests map[string]*sync.Mutex
lockGeneration *sync.Mutex
@@ -76,7 +76,7 @@ func NewCache(cacheSizeMaxMB uint64) *Cache {

return &Cache{
mode: CacheModeStandard,
cacheDiskCache: lrucache.NewCacheInvalidator(cachePath, 20, 500, 10),
cacheStore: lrucache.NewCacheInvalidator(cachePath, 20, 500, 10),
inflightRequests: map[string]*sync.Mutex{},
lockGeneration: &sync.Mutex{},
blockingLocks: make(map[string]int),
@@ -108,7 +108,7 @@ func (c *Cache) SetValidCacheContents(request *http.Request, response *http.Resp
CacheInvalidation: expires,
Value: responseToArchivedResponse(response),
}
err := c.cacheDiskCache.Set(getCacheKey(request), cacheEntry)
err := c.cacheStore.Set(getCacheKey(request), cacheEntry)
if err != nil {
log.Printf("Failed to set cache entry for key: %s %s", request.URL.String(), err.Error())
}
@@ -133,7 +133,7 @@ func (c *Cache) SetFailedCacheContents(request *http.Request, err error) {
Value: nil,
Error: err.Error(),
}
err = c.cacheDiskCache.Set(getCacheKey(request), cacheEntry)
err = c.cacheStore.Set(getCacheKey(request), cacheEntry)
if err != nil {
log.Printf("Failed to set cache entry for key: %s", request.URL.String())
}
@@ -147,12 +147,12 @@ func (c *Cache) GetCacheContents(request *http.Request) *CacheEntry {

requestKey := getCacheKey(request)

if !c.cacheDiskCache.Has(requestKey) {
if !c.cacheStore.Has(requestKey) {
return nil
}

var cache CacheEntry
err := c.cacheDiskCache.Get(requestKey, &cache)
err := c.cacheStore.Get(requestKey, &cache)

if err != nil {
log.Printf("Failed to read cache entry for key: %s (%s)", request.URL.String(), requestKey)
@@ -248,7 +248,7 @@ func (c *Cache) ReleaseRequestLock(url string) {

func (c *Cache) Clear() {
// Will erase everything from the given disk
c.cacheDiskCache.Clear()
c.cacheStore.Clear()
}

func (c *Cache) cacheEntryValid(request *http.Request, cacheEntry *CacheEntry) bool {
@@ -286,6 +286,60 @@ func (c *Cache) isModeAggressive(request *http.Request) bool {
return false
}

func (c *Cache) handleRequest(r *http.Request) (*http.Request, *http.Response) {
// Determine if we have a cache result available
cacheValue := c.GetCacheContents(r)
if cacheValue != nil {
if cacheValue.Value != nil {
return r, archivedResponseToResponse(r, cacheValue.Value)
} else {
// We should fail the request
return r, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusBadGateway, cacheValue.Error)
}
}

// If we got here, we couldn't immediately resolve the cache
// Determine if we have permission to proceed for this URL
// FIX: This causes a deadlock right now because these request handling aren't goroutines
// therefore they will run inline with the rest of the program and block each other
log.Printf("Will acquire lock: %s\n", r.URL.String())
c.AcquireRequestLock(r.URL.String())
log.Printf("Did acquire lock: %s\n", r.URL.String())

// We now have permission to access this URL and should continue until complete
return r, nil
}

func (c *Cache) handleResponse(
response *http.Response,
request *http.Request,
err error,
) *http.Response {
if err != nil {
c.SetFailedCacheContents(request, err)
c.ReleaseRequestLock(request.URL.String())
return nil
}

// Attempt to update the cache with each historical value, since there
// might have been multiple hops in this request?
requestHistory, responseHistory := getRedirectHistory(response)

// When replaying this we want to replay it in order to capture all the
// event history and test redirect handlers
for i := 0; i < len(requestHistory); i++ {
request := requestHistory[i]
response := responseHistory[i]

c.SetValidCacheContents(request, response)
log.Printf("Will release lock: %s\n", request.URL.String())
c.ReleaseRequestLock(request.URL.String())
log.Printf("Did release lock: %s\n", request.URL.String())
}

return response
}

func setupCacheMiddleware(proxy *goproxy.ProxyHttpServer, cache *Cache, recorder *Recorder) {
proxy.OnRequest().DoFunc(
/*
@@ -297,27 +351,7 @@ func setupCacheMiddleware(proxy *goproxy.ProxyHttpServer, cache *Cache, recorder
return r, nil
}

// Determine if we have a cache result available
cacheValue := cache.GetCacheContents(r)
if cacheValue != nil {
if cacheValue.Value != nil {
return r, archivedResponseToResponse(r, cacheValue.Value)
} else {
// We should fail the request
return r, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusBadGateway, cacheValue.Error)
}
}

// If we got here, we couldn't immediately resolve the cache
// Determine if we have permission to proceed for this URL
// FIX: This causes a deadlock right now because these request handling aren't goroutines
// therefore they will run inline with the rest of the program and block each other
log.Printf("Will acquire lock: %s\n", r.URL.String())
cache.AcquireRequestLock(r.URL.String())
log.Printf("Did acquire lock: %s\n", r.URL.String())

// We now have permission to access this URL and should continue until complete
return r, nil
return cache.handleRequest(r)
},
)

@@ -326,34 +360,11 @@ func setupCacheMiddleware(proxy *goproxy.ProxyHttpServer, cache *Cache, recorder
* Cache layer
*/
func(response *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
if ctx.Error != nil {
request := ctx.Req
cache.SetFailedCacheContents(request, ctx.Error)
cache.ReleaseRequestLock(request.URL.String())
return nil
}

if recorder.mode == RecorderModeRead {
return response
}

// Attempt to update the cache with each historical value, since there
// might have been multiple hops in this request?
requestHistory, responseHistory := getRedirectHistory(response)

// When replaying this we want to replay it in order to capture all the
// event history and test redirect handlers
for i := 0; i < len(requestHistory); i++ {
request := requestHistory[i]
response := responseHistory[i]

cache.SetValidCacheContents(request, response)
log.Printf("Will release lock: %s\n", request.URL.String())
cache.ReleaseRequestLock(request.URL.String())
log.Printf("Did release lock: %s\n", request.URL.String())
}

return response
return cache.handleResponse(response, ctx.Req, ctx.Error)
},
)
}
81 changes: 81 additions & 0 deletions groove/proxy/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"bytes"
"io/ioutil"
"log"
"net/http"
"net/url"
"sync"
"testing"
)

func TestBlockConcurrent(t *testing.T) {
/*
* Test if we run X concurrent requests for the same URL and then satisfy the first
* we will store the same value for the secondary ones.
*/
cache := NewCache(100)
cache.mode = CacheModeAggressive

requestWaiter := &sync.WaitGroup{}

// If the page actually fetches "outbound" there should be multiple values
pageResponses := []string{"1", "2", "3", "4", "5"}
sharedURL := "http://example.com"

// Response payloads
foundValuesLock := &sync.RWMutex{}
foundValues := make([]*http.Response, 0)

for _, value := range pageResponses {
requestWaiter.Add(1)

go func(value string) {
defer requestWaiter.Done()

request := &http.Request{
Method: "GET",
URL: &url.URL{Path: sharedURL},
}

_, cachedResponse := cache.handleRequest(request)

if cachedResponse != nil {
// Cache hit, nothing to do besides log the overall value
foundValuesLock.Lock()
defer foundValuesLock.Unlock()
foundValues = append(foundValues, cachedResponse)
return
}

// If we got here we should execute the outward "fetch", ie. create
// a response payload with our mocked value
response := &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString(value)),
Request: request,
}

finalResponse := cache.handleResponse(response, request, nil)
foundValuesLock.Lock()
defer foundValuesLock.Unlock()
foundValues = append(foundValues, finalResponse)
}(value)
}

requestWaiter.Wait()

uniqueResponses := make([]string, 0)
for _, response := range foundValues {
body, _ := ioutil.ReadAll(response.Body)
if !contains(uniqueResponses, string(body)) {
uniqueResponses = append(uniqueResponses, string(body))
}
}

// Ensure we only have one value and blocking worked successfully
if len(uniqueResponses) != 1 {
log.Fatalf("Expected only one value, got %d", len(uniqueResponses))
}
}