Skip to content

Commit

Permalink
Code style
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Sep 6, 2024
1 parent 65f5ee5 commit 312aa1b
Showing 1 changed file with 18 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,36 @@ private[sync] object OxSourceWebSocketProcessor:

def apply[REQ, RESP](
oxDispatcher: OxDispatcher,
pipe: OxStreams.Pipe[REQ, RESP],
processingPipe: OxStreams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[OxStreams.Pipe[REQ, RESP], REQ, RESP, ?, OxStreams],
ctx: ChannelHandlerContext
): Processor[NettyWebSocketFrame, NettyWebSocketFrame] =
def decodeFrame(f: WebSocketFrame): REQ = o.requests.decode(f) match {
case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure)
case x: DecodeResult.Value[REQ] @unchecked => x.v
}

val frame2FramePipe: OxStreams.Pipe[NettyWebSocketFrame, NettyWebSocketFrame] = ox ?=>
val closeSignal = new Semaphore(0)
(incoming: Source[NettyWebSocketFrame]) =>
val outgoing = pipe(
optionallyConcatenateFrames(o.concatenateFragmentedFrames)(
takeUntilCloseFrame(passAlongCloseFrame = o.decodeCloseRequests, closeSignal)(
incoming
.mapAsView { f =>
val sttpFrame = nettyFrameToFrame(f)
f.release()
sttpFrame
}
)
)
.mapAsView(f =>
o.requests.decode(f) match {
case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure)
case x: DecodeResult.Value[REQ] @unchecked => x.v
}
)
)
val outgoing = incoming
.mapAsView { f =>
val sttpFrame = nettyFrameToFrame(f)
f.release()
sttpFrame
}
.pipe(takeUntilCloseFrame(passAlongCloseFrame = o.decodeCloseRequests, closeSignal))
.pipe(optionallyConcatenateFrames(o.concatenateFragmentedFrames))
.mapAsView(decodeFrame)
.pipe(processingPipe)
.mapAsView(r => frameToNettyFrame(o.responses.encode(r)))

// when the client closes the connection, we need to close the outgoing channel as well - this needs to be
// done in the client's pipeline code; monitoring that this happens within a timeout after the close happens
monitorOutgoingClosedAfterClientClose(closeSignal, outgoing)

outgoing
end frame2FramePipe

// We need this kind of interceptor to make Netty reply correctly to closed channel or error
def wrapSubscriberWithNettyCallback[B](sub: Subscriber[? >: B]): Subscriber[? >: B] = new Subscriber[B] {
Expand All @@ -76,6 +74,7 @@ private[sync] object OxSourceWebSocketProcessor:
sub.onComplete()
}
new OxProcessor(oxDispatcher, frame2FramePipe, wrapSubscriberWithNettyCallback)
end apply

private def optionallyConcatenateFrames(doConcatenate: Boolean)(s: Source[WebSocketFrame])(using Ox): Source[WebSocketFrame] =
if doConcatenate then s.mapStateful(() => None: Accumulator)(accumulateFrameState).collectAsView { case Some(f: WebSocketFrame) => f }
Expand Down

0 comments on commit 312aa1b

Please sign in to comment.