Skip to content

Commit

Permalink
Handle response failures + do not record custom events
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcairo committed Jan 20, 2025
1 parent 7a95594 commit dd7bc23
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 132 deletions.
95 changes: 95 additions & 0 deletions Sources/GRPCInterceptors/HookedAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

internal struct HookedRPCAsyncSequence<Wrapped: AsyncSequence & Sendable>: AsyncSequence, Sendable where Wrapped.Element: Sendable {
private let wrapped: Wrapped

private let forEachElement: @Sendable (Wrapped.Element) -> Void
private let onFinish: @Sendable () -> Void
private let onFailure: @Sendable (any Error) -> Void

init(
wrapping sequence: Wrapped,
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
onFinish: @escaping @Sendable () -> Void,
onFailure: @escaping @Sendable (any Error) -> Void
) {
self.wrapped = sequence
self.forEachElement = forEachElement
self.onFinish = onFinish
self.onFailure = onFailure
}

func makeAsyncIterator() -> HookedAsyncIterator {
HookedAsyncIterator(
self.wrapped,
forEachElement: self.forEachElement,
onFinish: self.onFinish,
onFailure: self.onFailure
)
}

struct HookedAsyncIterator: AsyncIteratorProtocol {
typealias Element = Wrapped.Element

private var wrapped: Wrapped.AsyncIterator
private let forEachElement: @Sendable (Wrapped.Element) -> Void
private let onFinish: @Sendable () -> Void
private let onFailure: @Sendable (any Error) -> Void

init(
_ sequence: Wrapped,
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
onFinish: @escaping @Sendable () -> Void,
onFailure: @escaping @Sendable (any Error) -> Void
) {
self.wrapped = sequence.makeAsyncIterator()
self.forEachElement = forEachElement
self.onFinish = onFinish
self.onFailure = onFailure
}

mutating func next(isolation actor: isolated (any Actor)?) async throws(Wrapped.Failure) -> Wrapped.Element? {
do {
if let element = try await self.wrapped.next(isolation: actor) {
self.forEachElement(element)
return element
}

self.onFinish()
return nil
} catch {
self.onFailure(error)
throw error
}
}

mutating func next() async throws -> Wrapped.Element? {
do {
if let element = try await self.wrapped.next() {
self.forEachElement(element)
return element
}

self.onFinish()
return nil
} catch {
self.onFailure(error)
throw error
}
}
}
}
56 changes: 0 additions & 56 deletions Sources/GRPCInterceptors/OnFinishAsyncSequence.swift

This file was deleted.

69 changes: 33 additions & 36 deletions Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal import Tracing
/// For more information, refer to the documentation for `swift-distributed-tracing`.
public struct ClientOTelTracingInterceptor: ClientInterceptor {
private let injector: ClientRequestInjector
private let emitEventOnEachWrite: Bool
private let traceEachMessage: Bool
private var serverHostname: String
private var networkTransportMethod: String

Expand All @@ -36,17 +36,17 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
/// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
/// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
/// `network.transport` attribute in spans.
/// - emitEventOnEachWrite: If `true`, each request part sent and response part received will be recorded as a separate
/// - traceEachMessage: If `true`, each request part sent and response part received will be recorded as a separate
/// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
public init(
serverHostname: String,
networkTransportMethod: String,
emitEventOnEachWrite: Bool = false
traceEachMessage: Bool = true
) {
self.injector = ClientRequestInjector()
self.serverHostname = serverHostname
self.networkTransportMethod = networkTransportMethod
self.emitEventOnEachWrite = emitEventOnEachWrite
self.traceEachMessage = traceEachMessage
}

/// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs
Expand Down Expand Up @@ -84,10 +84,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
) { span in
self.setOTelSpanAttributes(into: span, context: context)

span.addEvent("Request started")

let wrappedProducer = request.producer
if self.emitEventOnEachWrite {
if self.traceEachMessage {
let wrappedProducer = request.producer
request.producer = { writer in
let messageSentCounter = Atomic(1)
let eventEmittingWriter = HookedWriter(
Expand All @@ -104,54 +102,53 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
}
)
try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter))
span.addEvent("Request ended")
}
} else {
request.producer = { writer in
try await wrappedProducer(RPCWriter(wrapping: writer))
span.addEvent("Request ended")
}
}

var response = try await next(request, context)
switch response.accepted {
case .success(var success):
span.addEvent("Received response start")
span.attributes.rpc.grpcStatusCode = 0
if self.emitEventOnEachWrite {
let hookedSequence: HookedRPCAsyncSequence<
RPCAsyncSequence<StreamingClientResponse<Output>.Contents.BodyPart, any Error>
>
if self.traceEachMessage {
let messageReceivedCounter = Atomic(1)
let onEachPartRecordingSequence = success.bodyParts.map { element in
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
var event = SpanEvent(name: "rpc.message")
event.attributes.rpc.messageType = "RECEIVED"
event.attributes.rpc.messageID =
messageReceivedCounter
event.attributes.rpc.messageID = messageReceivedCounter
.wrappingAdd(1, ordering: .sequentiallyConsistent)
.oldValue
span.addEvent(event)
return element
}

let onFinishRecordingSequence = OnFinishAsyncSequence(
wrapping: onEachPartRecordingSequence
) {
span.addEvent("Received response end")
} onFinish: {
span.attributes.rpc.grpcStatusCode = 0
} onFailure: { error in
if let rpcError = error as? RPCError {
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
}
span.setStatus(SpanStatus(code: .error))
span.recordError(error)
}

success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
response.accepted = .success(success)
} else {
let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) {
span.addEvent("Received response end")
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
// Nothing to do if traceEachMessage is false
} onFinish: {
span.attributes.rpc.grpcStatusCode = 0
} onFailure: { error in
if let rpcError = error as? RPCError {
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
}
span.setStatus(SpanStatus(code: .error))
span.recordError(error)
}

success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
response.accepted = .success(success)
}

success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence)
response.accepted = .success(success)

case .failure(let error):
span.attributes.rpc.grpcStatusCode = error.code.rawValue
span.setStatus(SpanStatus(code: .error))
span.addEvent("Received error response")
span.recordError(error)
}

Expand Down
Loading

0 comments on commit dd7bc23

Please sign in to comment.