Add Stripe client telemetry to request headers #766

Merged · 18 commits · Jan 17, 2019
Changes from 14 commits
96 changes: 96 additions & 0 deletions stripe.go
@@ -67,6 +67,15 @@ var LogLevel = 2
// be overridden if a backend is created with GetBackendWithConfig.
var Logger Printfer

// EnableTelemetry is a global override for enabling client telemetry, which
// sends request performance metrics to Stripe via the `X-Stripe-Client-Telemetry`
// header. If set to true, all clients will send telemetry metrics. Defaults to
// false.
//
// Telemetry can also be enabled on a per-client basis by instead creating a
// `BackendConfig` with `EnableTelemetry: true`.
var EnableTelemetry = false

//
// Public types
//
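
For orientation, here is a minimal usage sketch of the two opt-in paths this diff adds. It is illustrative only: the import path and the mostly-empty BackendConfig are assumptions based on the tests further down in this PR, not part of the change itself.

package main

import (
	stripe "github.com/stripe/stripe-go"
)

func main() {
	// Global opt-in: every backend created afterwards sends request metrics
	// via the X-Stripe-Client-Telemetry header on subsequent requests.
	stripe.EnableTelemetry = true

	// Per-backend opt-in, mirroring the tests in stripe_test.go below.
	backend := stripe.GetBackendWithConfig(
		stripe.APIBackend,
		&stripe.BackendConfig{
			EnableTelemetry: true,
		},
	)
	_ = backend
}
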
@@ -138,6 +147,12 @@ type BackendConfig struct {
//
// If left empty, it'll be set to the default for the SupportedBackend.
URL string

// EnableTelemetry allows request metrics (request id and duration) to be sent
// to Stripe in subsequent requests via the `X-Stripe-Client-Telemetry` header.
//
// Defaults to false.
EnableTelemetry bool
Contributor
Likewise, can you put EnableTelemetry into the right place alphabetically?

}

// BackendImplementation is the internal implementation for making HTTP calls
@@ -158,6 +173,10 @@ type BackendImplementation struct {
//
// See also SetNetworkRetriesSleep.
networkRetriesSleep bool

enableTelemetry bool
requestMetricsBuffer chan requestMetrics
requestMetricsMutex *sync.Mutex
}

// Call is the Backend.Call implementation for invoking Stripe APIs.
@@ -292,8 +311,23 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte
s.Logger.Printf("Requesting %v %v%v\n", req.Method, req.URL.Host, req.URL.Path)
}

if s.enableTelemetry {
select {
case metrics := <-s.requestMetricsBuffer:
metricsJSON, err := json.Marshal(&requestTelemetry{LastRequestMetrics: metrics})
if err == nil {
req.Header.Set("X-Stripe-Client-Telemetry", string(metricsJSON))
} else if s.LogLevel > 2 {
s.Logger.Printf("Unable to encode client telemetry: %s", err)
}
default:
// no metrics available, ignore.
nit: might be worth amending this comment to point out that this default case needs to be here to enable a non-blocking receive on the channel. Wouldn't want some over-zealous refactorer to remove it, thinking that it was an innocuous block.

}
}
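
As the comment above notes, the empty default case is what makes the receive non-blocking. A tiny standalone sketch of the pattern (not code from this PR):

package main

import "fmt"

func main() {
	metrics := make(chan string, 1)

	// With a default case, select returns immediately when the channel is
	// empty instead of parking the request goroutine until a metric arrives.
	select {
	case m := <-metrics:
		fmt.Println("attach telemetry header:", m)
	default:
		fmt.Println("no buffered metrics; send the request without the header")
	}
}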

var res *http.Response
var err error
var requestDurationMS int
for retry := 0; ; {
start := time.Now()

@@ -338,8 +372,14 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte
}
}

// `requestStart` is used solely for client telemetry and, unlike `start`,
// does not account for the time spent building the request body.
requestStart := time.Now()
Contributor
Can we just use start? All that's happening between the two is some structs being initialized locally. In practice it's likely to have a negligible effect on timing, and I don't think it's worth complicating the code over.


res, err = s.HTTPClient.Do(req)

requestDurationMS = int(time.Since(requestStart) / time.Millisecond)

if s.LogLevel > 2 {
s.Logger.Printf("Request completed in %v (retry: %v)\n",
time.Since(start), retry)
@@ -387,6 +427,18 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte
return err
}

if s.enableTelemetry {
reqID := res.Header.Get("Request-Id")
if len(reqID) > 0 {
metrics := requestMetrics{
RequestID: reqID,
RequestDurationMS: requestDurationMS,
}

s.recordRequestMetrics(metrics)
}
}

defer res.Body.Close()

resBody, err := ioutil.ReadAll(res.Body)
@@ -779,6 +831,9 @@ const minNetworkRetriesDelay = 500 * time.Millisecond

const uploadsURL = "https://uploads.stripe.com"

// The number of requestMetric objects to buffer for client telemetry.
const telemetryBufferSize = 16
Contributor @brandur-stripe · Jan 17, 2019
Can you move this up a couple lines? (The constant names are ordered alphabetically so let's try to keep them that way.)

Also, would you mind amending the comment to mention that additional objects will be dropped if the buffer is full? It'll make it extra clear what the lib does when the buffer is full (as opposed to other behaviors like growing the buffer, or sending metrics synchronously, etc.)

(I know you mention that at the site where the channel is actually written to, but I think it'd be helpful to have here as well).


//
// Private types
//
@@ -805,6 +860,18 @@ type stripeClientUserAgent struct {
Uname string `json:"uname"`
}

// requestMetrics contains the id and duration of the last request sent
type requestMetrics struct {
RequestID string `json:"request_id"`
RequestDurationMS int `json:"request_duration_ms"`
}

// requestTelemetry contains the payload sent in the
// `X-Stripe-Client-Telemetry` header when BackendConfig#EnableTelemetry = true.
type requestTelemetry struct {
LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}

//
// Private variables
//
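
To make the wire format concrete, here is a self-contained sketch that mirrors the struct tags above; the request id and duration are invented values.

package main

import (
	"encoding/json"
	"fmt"
)

type requestMetrics struct {
	RequestID         string `json:"request_id"`
	RequestDurationMS int    `json:"request_duration_ms"`
}

type requestTelemetry struct {
	LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}

func main() {
	payload, err := json.Marshal(&requestTelemetry{
		LastRequestMetrics: requestMetrics{RequestID: "req_123", RequestDurationMS: 42},
	})
	if err != nil {
		panic(err)
	}
	// Prints the value carried in the X-Stripe-Client-Telemetry header:
	// {"last_request_metrics":{"request_id":"req_123","request_duration_ms":42}}
	fmt.Println(string(payload))
}
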
@@ -878,6 +945,14 @@ func isHTTPWriteMethod(method string) bool {
// The vast majority of the time you should be calling GetBackendWithConfig
// instead of this function.
func newBackendImplementation(backendType SupportedBackend, config *BackendConfig) Backend {
var requestMetricsBuffer chan requestMetrics
enableTelemetry := config.EnableTelemetry || EnableTelemetry

// only allocate the requestMetrics buffer if client telemetry is enabled.
if enableTelemetry {
requestMetricsBuffer = make(chan requestMetrics, telemetryBufferSize)
}

return &BackendImplementation{
HTTPClient: config.HTTPClient,
LogLevel: config.LogLevel,
@@ -886,6 +961,27 @@ func newBackendImplementation(backendType SupportedBackend, config *BackendConfi
Type: backendType,
URL: config.URL,
networkRetriesSleep: true,
enableTelemetry: enableTelemetry,

// requestMetricsBuffer is a circular buffer of unsent metrics from previous
// requests. You should not write to requestMetricsBuffer without holding the
// requestMetricsMutex lock.
requestMetricsBuffer: requestMetricsBuffer,
requestMetricsMutex: &sync.Mutex{},
}
}

func (s *BackendImplementation) recordRequestMetrics(r requestMetrics) {
s.requestMetricsMutex.Lock()
defer s.requestMetricsMutex.Unlock()

// treat requestMetricsBuffer as a circular buffer: if it is full, pop off the
// oldest requestMetrics struct and insert r.
select {
case s.requestMetricsBuffer <- r:
default:
<-s.requestMetricsBuffer
@bobby-stripe · Jan 16, 2019

@jameshageman-stripe I like how clever this is to avoid taking a lock when reading the buffer, but I think as-is there is the (remote but possible) possibility of a deadlock:

goroutine 1:
locks requestMetricsMutex
attempt to write to requestMetricsBuffer, but it is full, fall through to the default case on line 982
goroutine 1 descheduled

goroutine 2..n:
read from requestMetricsBuffer until it is empty (because the user's application has issued a burst of API requests, or something)

goroutine 1:
rescheduled
attempt to read from <-s.requestMetricsBuffer, which will block

goroutine 1 is now blocked waiting for a metric to appear on the channel, but nobody else will be able to write to the channel as requestMetricsMutex is held (all incoming requests will now also deadlock, waiting on that mutex).

I think this can be addressed by changing:

<-s.requestMetricsBuffer

to

select {
case _ = <-s.requestMetricsBuffer:
default:
}

So that we don't do a blocking channel read with the mutex held (along with a big fat comment), but maybe it's worth considering if this is too clever and we should unconditionally grab the mutex for all reads + writes of s.requestMetricsBuffer

Unless I'm reading this wrong and there is nothing to worry about, which is possible!

s.requestMetricsBuffer <- r
}
}
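
One way @bobby-stripe's suggestion could look, sketched here rather than taken from a later commit: the pop becomes non-blocking so the mutex is never held across a blocking channel read. The trailing send then cannot block, because the mutex guarantees a single writer and concurrent readers only free up capacity.

func (s *BackendImplementation) recordRequestMetrics(r requestMetrics) {
	s.requestMetricsMutex.Lock()
	defer s.requestMetricsMutex.Unlock()

	select {
	case s.requestMetricsBuffer <- r:
	default:
		// Buffer full: drop the oldest entry, but don't block if concurrent
		// readers have already drained the channel.
		select {
		case <-s.requestMetricsBuffer:
		default:
		}
		// Safe to send: we are the only writer (mutex held) and the buffer is
		// no longer full, so this cannot block.
		s.requestMetricsBuffer <- r
	}
}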

202 changes: 202 additions & 0 deletions stripe_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"regexp"
@@ -165,6 +166,207 @@ func TestDo_RetryOnTimeout(t *testing.T) {
assert.Equal(t, uint32(2), atomic.LoadUint32(&counter))
}

// Test that telemetry metrics are not sent by default
func TestDo_TelemetryDisabled(t *testing.T) {
type testServerResponse struct {
Message string `json:"message"`
}

message := "Hello, client."
requestNum := 0

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

Great test. 👍 I love Go's httptest package!

// none of the requests should include telemetry metrics
assert.Equal(t, r.Header.Get("X-Stripe-Client-Telemetry"), "")

response := testServerResponse{Message: message}

data, err := json.Marshal(response)
assert.NoError(t, err)

_, err = w.Write(data)
assert.NoError(t, err)

requestNum++
}))
defer testServer.Close()

backend := stripe.GetBackendWithConfig(
stripe.APIBackend,
&stripe.BackendConfig{
LogLevel: 3,
MaxNetworkRetries: 0,
URL: testServer.URL,
},
).(*stripe.BackendImplementation)

for i := 0; i < 2; i++ {

It's initially unclear why this loop executes twice. Can you add a quick comment that explains that it's because the telemetry comes from the previous completed request/response?

request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)
}

// We should have seen exactly two requests.
assert.Equal(t, 2, requestNum)
}

// Test that telemetry metrics are sent on subsequent requests when
// stripe.EnableTelemetry = true.
func TestDo_TelemetryEnabled(t *testing.T) {
type testServerResponse struct {
Message string `json:"message"`
}

type requestMetrics struct {
RequestID string `json:"request_id"`
RequestDurationMS int `json:"request_duration_ms"`
}

type requestTelemetry struct {
LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}

message := "Hello, client."
requestNum := 0

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestNum++

telemetryStr := r.Header.Get("X-Stripe-Client-Telemetry")
switch requestNum {

👍

case 1:
// the first request should not receive any metrics
assert.Equal(t, telemetryStr, "")
case 2:
assert.True(t, len(telemetryStr) > 0, "telemetryStr should not be empty")

// the telemetry should properly unmarshal into stripe.RequestTelemetry
var telemetry requestTelemetry
err := json.Unmarshal([]byte(telemetryStr), &telemetry)
assert.NoError(t, err)

// the second request should include the metrics for the first request
assert.Equal(t, telemetry.LastRequestMetrics.RequestID, "req_1")
default:
assert.Fail(t, "Should not have reached request %v", requestNum)
}

w.Header().Set("Request-Id", fmt.Sprintf("req_%d", requestNum))
response := testServerResponse{Message: message}

data, err := json.Marshal(response)
assert.NoError(t, err)

_, err = w.Write(data)
assert.NoError(t, err)
}))
defer testServer.Close()

backend := stripe.GetBackendWithConfig(
stripe.APIBackend,
&stripe.BackendConfig{
LogLevel: 3,
MaxNetworkRetries: 0,
URL: testServer.URL,
EnableTelemetry: true,
},
).(*stripe.BackendImplementation)

for i := 0; i < 2; i++ {
request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)
}

// We should have seen exactly two requests.
assert.Equal(t, 2, requestNum)
}

func TestDo_TelemetryEnabledNoDataRace(t *testing.T) {
type testServerResponse struct {
Message string `json:"message"`
}

message := "Hello, client."
var requestNum int32

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqID := atomic.AddInt32(&requestNum, 1)

w.Header().Set("Request-Id", fmt.Sprintf("req_%d", reqID))
response := testServerResponse{Message: message}

data, err := json.Marshal(response)
assert.NoError(t, err)

_, err = w.Write(data)
assert.NoError(t, err)
}))
defer testServer.Close()

backend := stripe.GetBackendWithConfig(
stripe.APIBackend,
&stripe.BackendConfig{
LogLevel: 3,
MaxNetworkRetries: 0,
URL: testServer.URL,
EnableTelemetry: true,
},
).(*stripe.BackendImplementation)

times := 20 // 20 > telemetryBufferSize, so some metrics could be discarded
done := make(chan struct{})

for i := 0; i < times; i++ {
go func() {
request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)

done <- struct{}{}
}()
}

for i := 0; i < times; i++ {
<-done
}

assert.Equal(t, int32(times), requestNum)
}

func TestFormatURLPath(t *testing.T) {
assert.Equal(t, "/v1/resources/1/subresources/2",
stripe.FormatURLPath("/v1/resources/%s/subresources/%s", "1", "2"))