Skip to content

Commit

Permalink
fix: panic.
Browse files Browse the repository at this point in the history
  • Loading branch information
foxis committed Apr 27, 2024
1 parent c31c68b commit d4b9512
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
File renamed without changes.
30 changes: 14 additions & 16 deletions network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,7 @@ type StreamContext struct {
// NewWithContext will use ReadWriter and allow for context cancellation in Read and Write methods.
func NewWithContext(ctx context.Context, localPeer, remotePeer network.Peer, rw io.ReadWriter, handlers map[types.Type]network.MessageHandler) *StreamContext {
ret := &StreamContext{
Stream: &Stream{
Base: dndm.NewBaseWithCtx(ctx),
localPeer: localPeer,
remotePeer: remotePeer,
rw: rw,
handlers: handlers,
routes: make(map[string]dndm.Route),
},
Stream: New(localPeer, remotePeer, rw, handlers),
read: contextRW{
request: make(chan contextRWRequest),
},
Expand All @@ -54,15 +47,15 @@ func NewWithContext(ctx context.Context, localPeer, remotePeer network.Peer, rw
close(ret.write.request)
})

ret.run(ret.Ctx())
ret.run()
return ret
}

func (c *StreamContext) run(ctx context.Context) {
go c.read.Run(ctx, c.rw.Read)
go c.write.Run(ctx, c.rw.Write)
func (c *StreamContext) run() {
go c.read.Run(c.Ctx(), c.rw.Read)
go c.write.Run(c.Ctx(), c.rw.Write)
go func() {
<-ctx.Done()
<-c.Ctx().Done()
c.Close()
}()
}
Expand Down Expand Up @@ -142,15 +135,20 @@ type contextRW struct {
func (c *contextRW) Run(ctx context.Context, f func([]byte) (int, error)) {
defer func() {
if err := recover(); err != nil {
slog.Error("contextRW.Run Panic TypeOf", err, reflect.TypeOf(err))
slog.Error("contextRW.Run Panic TypeOf", "err", err, "type", reflect.TypeOf(err))
panic(err)
}
}()

for {
select {
case <-ctx.Done():
return
case r := <-c.request:
case r, ok := <-c.request:
if !ok {
continue
}

n, err := f(r.data)
select {
case <-ctx.Done():
Expand All @@ -168,7 +166,7 @@ func (c *contextRW) Request(ctx context.Context, buf []byte) (int, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
case c.request <- contextRWRequest{data: buf, resultCh: ch}:
case c.request <- contextRWRequest{data: buf, resultCh: ch, ctx: ctx}:
select {
case <-ctx.Done():
return 0, ctx.Err()
Expand Down
8 changes: 8 additions & 0 deletions network/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestStreamContext_Read(t *testing.T) {

_, _, err = streamContext.Read(ctx)
assert.Equal(t, io.EOF, err)

time.Sleep(time.Millisecond)

rw.AssertExpectations(t)
}

func TestStreamContext_Write(t *testing.T) {
Expand All @@ -86,6 +90,10 @@ func TestStreamContext_Write(t *testing.T) {

err = streamContext.Write(ctx, route, &testtypes.Foo{})
assert.NoError(t, err)

time.Sleep(time.Millisecond)

rw.AssertExpectations(t)
}

func TestStream_CreateClose(t *testing.T) {
Expand Down

0 comments on commit d4b9512

Please sign in to comment.