Skip to content

Commit

Permalink
Enhancement: failing concurrent transactions (#193)
Browse files Browse the repository at this point in the history
* enh(failed_queries): mark in transaction registry as failed only for specific status codes

* fix(review): nonrecoverable list to recoverable list of errors

* fix(cache/tests): cleanup of created tmp files
  • Loading branch information
gontarzpawel authored Sep 5, 2022
1 parent cd32b40 commit b459dcc
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 35 deletions.
3 changes: 3 additions & 0 deletions cache/filesystem_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func cacheAddGetHelper(t *testing.T, c Cache) {
if err != nil {
t.Fatalf("failed to get data from filesystem cache: %s", err)
}
defer cachedData.Data.Close()

// Verify trw contains valid headers.
if cachedData.Type != ct {
Expand Down Expand Up @@ -119,6 +120,7 @@ func cacheAddGetHelper(t *testing.T, c Cache) {
if err != nil {
t.Fatalf("failed to get data from filesystem cache: %s", err)
}
defer cachedData.Data.Close()
value := fmt.Sprintf("value %d", i)
//we want to test what happen we the cache handle a big value
if i == 0 {
Expand Down Expand Up @@ -196,6 +198,7 @@ func TestCacheClean(t *testing.T) {
if err != nil {
t.Fatalf("create tmp cache: %s", err)
}
defer crw.Close()

value := fmt.Sprintf("very big value %d", i)
bs := bytes.NewBufferString(value)
Expand Down
78 changes: 60 additions & 18 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func TestServe(t *testing.T) {
req.SetBasicAuth("default", "qwerty")
resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusTeapot {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusTeapot)
if resp.StatusCode != http.StatusInternalServerError {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusInternalServerError)
}
resp.Body.Close()

Expand Down Expand Up @@ -353,9 +353,11 @@ func TestServe(t *testing.T) {
t.Fatalf("unexpected amount of keys in redis: %v", len(keys))
}

resp := httpRequest(t, req, http.StatusOK)
resp, err := httpRequest(t, req, http.StatusOK)
checkErr(t, err)
checkResponse(t, resp.Body, expectedOkResp)
resp2 := httpRequest(t, req, http.StatusOK)
resp2, err := httpRequest(t, req, http.StatusOK)
checkErr(t, err)
checkResponse(t, resp2.Body, expectedOkResp)
keys = redisClient.Keys()
if len(keys) != 2 { // expected 2 because there is a record stored for transaction and a cache item
Expand Down Expand Up @@ -391,9 +393,11 @@ func TestServe(t *testing.T) {
req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil)
checkErr(t, err)

resp := httpRequest(t, req, http.StatusOK)
resp, err := httpRequest(t, req, http.StatusOK)
checkErr(t, err)
checkResponse(t, resp.Body, string(bytesWithInvalidUTFPairs))
resp2 := httpRequest(t, req, http.StatusOK)
resp2, err := httpRequest(t, req, http.StatusOK)
checkErr(t, err)
// if we do not use base64 to encode/decode the cached payload, EOF error will be thrown here.
checkResponse(t, resp2.Body, string(bytesWithInvalidUTFPairs))
keys = redisClient.Keys()
Expand Down Expand Up @@ -597,7 +601,19 @@ func TestServe(t *testing.T) {
// scenario: 1st query fails before grace_time elapsed. 2nd query fails as well.

q := "SELECT ERROR"
executeTwoConcurrentRequests(t, q, http.StatusTeapot, http.StatusInternalServerError, "DB::Exception\n", "concurrent query failed")
executeTwoConcurrentRequests(t, q, http.StatusInternalServerError, http.StatusInternalServerError, "DB::Exception\n", "concurrent query failed")
},
startHTTP,
},
{
"http concurrent transaction failure scenario - transaction completed, not failed - query is recoverable",
"testdata/http.concurrent.transaction.yml",
func(t *testing.T) {
// max_exec_time = 300 ms, grace_time = 160 ms (> max_exec_time/2)
// scenario: 1st query fails before grace_time elapsed. 2nd query fails as well.

q := "SELECT RECOVERABLE-ERROR"
executeTwoConcurrentRequests(t, q, http.StatusServiceUnavailable, http.StatusServiceUnavailable, "DB::Unavailable\n", "DB::Unavailable\n")
},
startHTTP,
},
Expand Down Expand Up @@ -746,8 +762,12 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
q := string(query)
switch {
case q == "SELECT ERROR":
w.WriteHeader(http.StatusTeapot)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "DB::Exception\n")
case q == "SELECT RECOVERABLE-ERROR":
println("called clickhouse recoverable")
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprint(w, "DB::Unavailable\n")
case q == "SELECT SLEEP":
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "foo")
Expand Down Expand Up @@ -787,27 +807,49 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
// Results are asserted according to the specified input parameters.
func executeTwoConcurrentRequests(t *testing.T, query string, firstStatusCode, secondStatusCode int, firstBody, secondBody string) {
u := fmt.Sprintf("http://127.0.0.1:9090?query=%s&user=concurrent_user", url.QueryEscape(query))
req, err := http.NewRequest("GET", u, nil)
checkErr(t, err)

var wg sync.WaitGroup
wg.Add(2)
var resp1 string
var resp2 string
errs := make(chan error, 0)
defer close(errs)
errors := make([]error, 0)
go func() {
for err := range errs {
errors = append(errors, err)
}
}()
go func() {
resp := httpRequest(t, req, firstStatusCode)
defer wg.Done()
req, err := http.NewRequest("GET", u, nil)
checkErr(t, err)
resp, err := httpRequest(t, req, firstStatusCode)
if err != nil {
errs <- err
return
}
resp1 = bbToString(t, resp.Body)
wg.Done()
}()

go func() {
defer wg.Done()
time.Sleep(20 * time.Millisecond)
resp := httpRequest(t, req, secondStatusCode)
req, err := http.NewRequest("GET", u, nil)
checkErr(t, err)
resp, err := httpRequest(t, req, secondStatusCode)
if err != nil {
errs <- err
return
}
resp2 = bbToString(t, resp.Body)
wg.Done()
}()
wg.Wait()

if len(errors) != 0 {
t.Fatalf("concurrent test scenario failed due to: %v", errors)
}

if !strings.Contains(resp1, firstBody) {
t.Fatalf("concurrent test scenario: unexpected resp body: %s, expected : %s", resp1, firstBody)
}
Expand Down Expand Up @@ -917,15 +959,15 @@ func httpGet(t *testing.T, url string, statusCode int) *http.Response {
return resp
}

func httpRequest(t *testing.T, request *http.Request, statusCode int) *http.Response {
func httpRequest(t *testing.T, request *http.Request, statusCode int) (*http.Response, error) {
t.Helper()
client := http.Client{}
resp, err := client.Do(request)
if err != nil {
t.Fatalf("unexpected erorr while doing GET request: %s", err)
return resp, fmt.Errorf("unexpected erorr while doing GET request: %s", err)
}
if resp.StatusCode != statusCode {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, statusCode)
return resp, fmt.Errorf("unexpected status code: %d; expected: %d", resp.StatusCode, statusCode)
}
return resp
return resp, nil
}
49 changes: 32 additions & 17 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
_ = RespondWithData(srw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, http.StatusOK)
return
}

// Await for potential result from concurrent query
transactionState, err := userCache.AwaitForConcurrentTransaction(key)
if err != nil {
Expand All @@ -288,8 +287,8 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
} else {
if transactionState.IsCompleted() {
cachedData, err := userCache.Get(key)
defer cachedData.Data.Close()
if err == nil {
defer cachedData.Data.Close()
_ = RespondWithData(srw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, http.StatusOK)
cacheHitFromConcurrentQueries.With(labels).Inc()
log.Debugf("%s: cache hit after awaiting concurrent query", s)
Expand Down Expand Up @@ -340,21 +339,15 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
}
contentMetadata := cache.ContentMetadata{Length: contentLength, Encoding: contentEncoding, Type: contentType}

if tmpFileRespWriter.StatusCode() != http.StatusOK || s.canceled {
statusCode := tmpFileRespWriter.StatusCode()
if statusCode != http.StatusOK || s.canceled {
// Do not cache non-200 or cancelled responses.
// Restore the original status code by proxyRequest if it was set.
if srw.statusCode != 0 {
tmpFileRespWriter.WriteHeader(srw.statusCode)
}

// mark transaction as failed
// todo: discuss if we should mark it as failed upon timeout. The rational against it would be to hope that
// partial results of the query are cached and therefore subsequent execution can succeed
if err = userCache.Fail(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}

err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, tmpFileRespWriter.StatusCode())
rp.completeTransaction(s, statusCode, userCache, key, q)
err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, statusCode)
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
Expand All @@ -367,10 +360,8 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
if err != nil {
log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q)
}
// mark transaction as completed
if err = userCache.Complete(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}
rp.completeTransaction(s, statusCode, userCache, key, q)

// we need to reset the offset since the reader of tmpFileRespWriter was already
// consumed in RespondWithData(...)
err = tmpFileRespWriter.ResetFileOffset()
Expand All @@ -379,7 +370,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
respondWith(srw, err, http.StatusInternalServerError)
return
}
err = RespondWithData(srw, reader, contentMetadata, expiration, tmpFileRespWriter.StatusCode())
err = RespondWithData(srw, reader, contentMetadata, expiration, statusCode)
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
Expand All @@ -388,6 +379,30 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
}
}

// clickhouseRecoverableStatusCodes set of recoverable http responses' status codes from Clickhouse.
// When such happens we mark transaction as completed and let concurrent query to hit another Clickhouse shard.
// possible http error codes in clickhouse (i.e: https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/HTTPHandler.cpp)
var clickhouseRecoverableStatusCodes = map[int]struct{}{http.StatusServiceUnavailable: {}}

func (rp *reverseProxy) completeTransaction(s *scope, statusCode int, userCache *cache.AsyncCache, key *cache.Key, q []byte) {
if statusCode < 300 {
if err := userCache.Complete(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}
return
}

if _, ok := clickhouseRecoverableStatusCodes[statusCode]; ok {
if err := userCache.Complete(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}
} else {
if err := userCache.Fail(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}
}
}

func calcQueryParamsHash(origParams url.Values) uint32 {
queryParams := make(map[string]string)
for param := range origParams {
Expand Down

0 comments on commit b459dcc

Please sign in to comment.