Skip to content

Commit

Permalink
change ringBuffer to a single channel + a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshageman-stripe committed Jan 15, 2019
1 parent 057c805 commit 23f64cd
Showing 1 changed file with 30 additions and 47 deletions.
77 changes: 30 additions & 47 deletions stripe.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ type BackendImplementation struct {
networkRetriesSleep bool

enableTelemetry bool
requestMetricsBuffer *ringBuffer
requestMetricsBuffer chan requestMetrics
requestMetricsMutex *sync.Mutex
}

// Call is the Backend.Call implementation for invoking Stripe APIs.
Expand Down Expand Up @@ -303,7 +304,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte

if s.enableTelemetry {
select {
case metrics := <-s.requestMetricsBuffer.outputChannel:
case metrics := <-s.requestMetricsBuffer:
metricsJSON, err := json.Marshal(&requestTelemetry{LastRequestMetrics: metrics})
if err == nil {
req.Header.Set("X-Stripe-Client-Telemetry", string(metricsJSON))
Expand Down Expand Up @@ -425,8 +426,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte
RequestDurationMS: requestDurationMS,
}

s.requestMetricsBuffer.inputChannel <- metrics
<-s.requestMetricsBuffer.done // wait for object to be placed in the circular buffer
s.recordRequestMetrics(metrics)
}
}

Expand Down Expand Up @@ -863,14 +863,6 @@ type requestTelemetry struct {
LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}

// ringBuffer is a circular buffer of requestMetrics.
type ringBuffer struct {
inputChannel chan requestMetrics
outputChannel chan requestMetrics
done chan struct{}
logger Printfer
}

//
// Private variables
//
Expand Down Expand Up @@ -944,52 +936,43 @@ func isHTTPWriteMethod(method string) bool {
// The vast majority of the time you should be calling GetBackendWithConfig
// instead of this function.
func newBackendImplementation(backendType SupportedBackend, config *BackendConfig) Backend {
var requestMetricsBuffer *ringBuffer
var requestMetricsBuffer chan requestMetrics

// only allocate the requestMetrics buffer if client telemetry is enabled.
if config.EnableTelemetry {
requestMetricsBuffer = &ringBuffer{
// inputChannel does not need to be buffered because ringBuffer#run()
// is always pulling objects off inputChannel and putting them in
// outputChannel.
inputChannel: make(chan requestMetrics),
outputChannel: make(chan requestMetrics, telemetryBufferSize),
done: make(chan struct{}),
logger: config.Logger,
}

go requestMetricsBuffer.run()
requestMetricsBuffer = make(chan requestMetrics, telemetryBufferSize)
}

return &BackendImplementation{
HTTPClient: config.HTTPClient,
LogLevel: config.LogLevel,
Logger: config.Logger,
MaxNetworkRetries: config.MaxNetworkRetries,
Type: backendType,
URL: config.URL,
networkRetriesSleep: true,
enableTelemetry: config.EnableTelemetry,
HTTPClient: config.HTTPClient,
LogLevel: config.LogLevel,
Logger: config.Logger,
MaxNetworkRetries: config.MaxNetworkRetries,
Type: backendType,
URL: config.URL,
networkRetriesSleep: true,
enableTelemetry: config.EnableTelemetry,

// requestMetricsBuffer is a circular buffer of unsent metrics from previous
// requests. You should not write to requestMetricsBuffer without holding the
// requestMetricsMutex lock.
requestMetricsBuffer: requestMetricsBuffer,
requestMetricsMutex: &sync.Mutex{},
}
}

// This is the bookkeeping method for ringBuffer, which should be run as a
// seperate goroutine.
func (r *ringBuffer) run() {
for v := range r.inputChannel {
select {
case r.outputChannel <- v:
default:
dropped := <-r.outputChannel
r.logger.Printf("Dropped message: %#v", dropped)
r.outputChannel <- v
}
r.logger.Printf("Enqueued message: %#v", v)
r.done <- struct{}{}
}
func (s *BackendImplementation) recordRequestMetrics(r requestMetrics) {
s.requestMetricsMutex.Lock()
defer s.requestMetricsMutex.Unlock()

close(r.outputChannel)
// treat requestMetricsBuffer as a circular buffer: if it is full, pop off the
// oldest requestMetrics struct and insert r.
select {
case s.requestMetricsBuffer <- r:
default:
<-s.requestMetricsBuffer
s.requestMetricsBuffer <- r
}
}

func normalizeURL(url string) string {
Expand Down

0 comments on commit 23f64cd

Please sign in to comment.