Skip to content

Commit

Permalink
Concurrency fixes (#997)
Browse files Browse the repository at this point in the history
* module/apmhttp: add locking to httptrace handlers

Add synchronisation to requestTracer. Sayeth the httptrace docs:

> Functions may be called concurrently from different goroutines and
some may be called after the request has completed or failed.

There's also a minor enhancement here to set the Connect span outcome
to "failure" if an error occurred.

* apm: fix deadlock in breakdown metrics calculation

Fix a deadlock that can occur when concurrently ending
a parent and child span, due to the parent waiting for
the child to release the transaction lock, and the child
waiting to lock the parent. Locks are now taken in a
consistent order.
  • Loading branch information
axw authored Aug 5, 2021
1 parent 089045a commit cb09aa0
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.13.0...master[View commits]
- Fix concurrency bugs in breakdown metrics and module/apmhttp.WithClientTrace {pull}997[#(997)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
69 changes: 55 additions & 14 deletions module/apmhttp/clienttrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type requestTracer struct {
Request,
Response *apm.Span

mu sync.RWMutex
mu sync.Mutex
ended bool
Connects map[connectKey]*apm.Span
}

Expand All @@ -56,52 +57,92 @@ func withClientTrace(ctx context.Context, tx *apm.Transaction, parent *apm.Span)

return httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
DNSStart: func(i httptrace.DNSStartInfo) {
r.DNS = tx.StartSpan(fmt.Sprintf("DNS %s", i.Host), "external.http.dns", parent)
r.mu.Lock()
defer r.mu.Unlock()
if !r.ended {
r.DNS = tx.StartSpan(fmt.Sprintf("DNS %s", i.Host), "external.http.dns", parent)
}
},

DNSDone: func(i httptrace.DNSDoneInfo) {
r.DNS.End()
r.mu.Lock()
defer r.mu.Unlock()
if r.DNS != nil {
r.DNS.End()
r.DNS = nil
}
},

ConnectStart: func(network, addr string) {
span := tx.StartSpan(fmt.Sprintf("Connect %s", addr), "external.http.connect", parent)
r.mu.Lock()
r.Connects[connectKey{network: network, addr: addr}] = span
r.mu.Unlock()
defer r.mu.Unlock()
if !r.ended {
key := connectKey{network: network, addr: addr}
span := tx.StartSpan(fmt.Sprintf("Connect %s", addr), "external.http.connect", parent)
r.Connects[key] = span
}
},

ConnectDone: func(network, addr string, err error) {
r.mu.RLock()
span := r.Connects[connectKey{network: network, addr: addr}]
r.mu.RUnlock()
span.End()
r.mu.Lock()
defer r.mu.Unlock()
key := connectKey{network: network, addr: addr}
if span, ok := r.Connects[key]; ok {
delete(r.Connects, key)
if err != nil {
span.Outcome = "failure"
}
span.End()
}
},

GotConn: func(info httptrace.GotConnInfo) {
r.Request = tx.StartSpan("Request", "external.http.request", parent)
r.mu.Lock()
defer r.mu.Unlock()
if !r.ended {
r.Request = tx.StartSpan("Request", "external.http.request", parent)
}
},

TLSHandshakeStart: func() {
r.TLS = tx.StartSpan("TLS", "external.http.tls", parent)
r.mu.Lock()
defer r.mu.Unlock()
if !r.ended {
r.TLS = tx.StartSpan("TLS", "external.http.tls", parent)
}
},

TLSHandshakeDone: func(_ tls.ConnectionState, _ error) {
// It is possible for TLSHandshakeDone to be called even if
// TLSHandshakeStart has not, in case a timeout occurs first.
r.mu.Lock()
defer r.mu.Unlock()
if r.TLS != nil {
r.TLS.End()
r.TLS = nil
}
},

GotFirstResponseByte: func() {
r.Request.End()
r.Response = tx.StartSpan("Response", "external.http.response", parent)
r.mu.Lock()
defer r.mu.Unlock()
if r.Request != nil {
r.Request.End()
r.Request = nil
}
if !r.ended {
r.Response = tx.StartSpan("Response", "external.http.response", parent)
}
},
}), &r
}

func (r *requestTracer) end() {
r.mu.Lock()
defer r.mu.Unlock()
if r.Response != nil {
r.Response.End()
r.Response = nil
}
r.ended = true
}
17 changes: 13 additions & 4 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions)
}
transactionID := tx.traceContext.Span

// Lock the parent first to avoid deadlocks in breakdown metrics calculation.
if opts.parent != nil {
opts.parent.mu.Lock()
defer opts.parent.mu.Unlock()
}

// Prevent tx from being ended while we're starting a span.
tx.mu.RLock()
defer tx.mu.RUnlock()
Expand Down Expand Up @@ -117,8 +123,6 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions)

if tx.breakdownMetricsEnabled {
if span.parent != nil {
span.parent.mu.Lock()
defer span.parent.mu.Unlock()
if !span.parent.ended() {
span.parent.childrenTimer.childStarted(span.timestamp)
}
Expand Down Expand Up @@ -334,6 +338,13 @@ func (s *Span) ParentID() SpanID {
func (s *Span) reportSelfTime() {
endTime := s.timestamp.Add(s.Duration)

// If this span has a parent span, lock it before proceeding to
// prevent deadlocking when concurrently ending parent and child.
if s.parent != nil {
s.parent.mu.Lock()
defer s.parent.mu.Unlock()
}

// TODO(axw) try to find a way to not lock the transaction when
// ending every span. We already lock them when starting spans.
s.tx.mu.RLock()
Expand All @@ -345,11 +356,9 @@ func (s *Span) reportSelfTime() {
s.tx.TransactionData.mu.Lock()
defer s.tx.TransactionData.mu.Unlock()
if s.parent != nil {
s.parent.mu.Lock()
if !s.parent.ended() {
s.parent.childrenTimer.childEnded(endTime)
}
s.parent.mu.Unlock()
} else {
s.tx.childrenTimer.childEnded(endTime)
}
Expand Down

0 comments on commit cb09aa0

Please sign in to comment.