Skip to content

Commit

Permalink
does not finish write futures while the channel is being closed
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 28, 2024
1 parent e9b4561 commit fde07e9
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
closing: Future[void]

func shortLog*(s: LPChannel): auto =
try:
Expand Down Expand Up @@ -99,7 +100,7 @@ proc reset*(s: LPChannel) {.async.} =
if s.isClosed:
trace "Already closed", s
return

s.closing = newFuture[void]()
s.isClosed = true
s.closedLocal = true
s.localReset = not s.remoteReset
Expand All @@ -117,10 +118,10 @@ proc reset*(s: LPChannel) {.async.} =
await s.conn.close()
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg

asyncSpawn resetMessage()
await resetMessage()

await s.closeImpl() # noraises, nocancels

s.closing.complete()
trace "Channel reset", s

method close*(s: LPChannel) {.async.} =
Expand All @@ -130,6 +131,8 @@ method close*(s: LPChannel) {.async.} =
if s.closedLocal:
trace "Already closed", s
return

s.closing = newFuture[void]()
s.closedLocal = true

trace "Closing channel", s, conn = s.conn, len = s.len
Expand All @@ -147,7 +150,7 @@ method close*(s: LPChannel) {.async.} =
trace "Cannot send close message", s, id = s.id, msg = exc.msg

await s.closeUnderlying() # maybe already eofed

s.closing.complete()
trace "Closed channel", s, len = s.len

method initStream*(s: LPChannel) =
Expand Down Expand Up @@ -200,6 +203,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
if s.remoteReset:
raise newLPStreamResetError()
if s.closedLocal:
await s.closing
raise newLPStreamClosedError()
if s.conn.closed:
raise newLPStreamConnDownError()
Expand Down Expand Up @@ -293,8 +297,9 @@ proc init*(
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In)

dir: if initiator: Direction.Out else: Direction.In,
closing: newFuture[void]())
chann.closing.complete()
chann.initStream()

when chronicles.enabledLogLevel == LogLevel.TRACE:
Expand Down

0 comments on commit fde07e9

Please sign in to comment.