-
Notifications
You must be signed in to change notification settings - Fork 6
/
stream_test.go
103 lines (76 loc) · 2.43 KB
/
stream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// SPDX-FileCopyrightText: 2021 Henry Bubert
//
// SPDX-License-Identifier: MIT
//go:build ignore
// +build ignore
package muxrpc
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"testing"
"github.com/pkg/errors"
"github.com/ssbc/go-luigi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ssbc/go-muxrpc/v2/codec"
)
// TestStreamDuplex exercises both directions of a stream created with
// newStream for request id 23.
//
// Inbound: a single string packet is poured into the pipe that feeds the
// stream and the pipe is closed. Next must then yield "test msg" followed by
// an error whose cause is luigi.EOS.
//
// Outbound: three strings are poured into the stream before it is closed, and
// a further Pour after Close must fail with errSinkClosed. The bytes written
// through the codec writer are compared against a fixed hex fixture and then
// decoded again to check the packet bodies and the flags of the final packet.
func TestStreamDuplex(t *testing.T) {
	const req = 23

	r := require.New(t)
	a := assert.New(t)
	ctx := context.Background()

	iSrc, iSink := luigi.NewPipe(luigi.WithBuffer(2))

	// Everything the stream writes ends up in oBuff so it can be inspected below.
	oBuff := &bytes.Buffer{}
	oWriter := codec.NewWriter(oBuff)

	str := newStream(iSrc, oWriter, req, streamCapMultiple, streamCapMultiple)

	// Inbound side: feed one string packet, then signal end of input.
	err := iSink.Pour(ctx, &codec.Packet{Req: req, Flag: codec.FlagStream | codec.FlagString, Body: []byte("test msg")})
	r.NoError(err, "error pouring packet to iSink")

	err = iSink.Close()
	r.NoError(err, "error closing iSink")

	x, err := str.Next(ctx)
	r.NoError(err, "error reading string from stream")

	s, ok := x.(string)
	r.True(ok, "read value is not a string")
	// testify expects (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	r.Equal("test msg", s, "wrong value in string")

	_, err = str.Next(ctx)
	r.Equal(luigi.EOS{}, errors.Cause(err), "expected end of stream error")

	// Outbound side: three values, then close.
	err = str.Pour(ctx, "foo")
	r.NoError(err)

	err = str.Pour(ctx, "bar")
	r.NoError(err)

	err = str.Pour(ctx, "baz")
	r.NoError(err)

	err = str.Close()
	r.NoError(err)

	// Pouring into a closed stream must be rejected. This uses assert rather
	// than require, so the output checks below still run if it fails.
	err = str.Pour(ctx, "bar")
	a.Equal(errSinkClosed, errors.Cause(err), "expected error pouring")

	// Expected wire bytes: "foo", "bar", "baz" and a closing packet whose body
	// is the text "true" (hex 74727565).
	// NOTE(review): each frame looks like a flag byte, a 4-byte length and a
	// 4-byte request id (0x17 = 23) followed by the body - confirm against the
	// codec package.
	wantHex := "090000000300000017666f6f09000000030000001762617209000000030000001762617a0e000000040000001774727565"
	wantBytes, err := hex.DecodeString(wantHex)
	r.NoError(err)

	got := oBuff.Bytes()
	r.Equal(wantBytes, got)

	// Decode the written bytes again to check them at packet level.
	gotReader := codec.NewReader(bytes.NewReader(got))
	pkts, err := codec.ReadAllPackets(gotReader)
	r.NoError(err)

	r.Len(pkts, 4)
	r.Equal("foo", string(pkts[0].Body))
	r.Equal("bar", string(pkts[1].Body))
	r.Equal("baz", string(pkts[2].Body))
	r.Equal(codec.FlagEndErr|codec.FlagStream|codec.FlagJSON, pkts[3].Flag)
}
// TestStreamAsyncErr checks that an error used to close the inbound pipe is
// what a reader of the stream gets back from Next.
//
// The sink half of the pipe is closed with "some error" before anything is
// read. Next must then return a nil value and an error whose cause carries
// that message.
func TestStreamAsyncErr(t *testing.T) {
	const req = 23

	r := require.New(t)
	ctx := context.Background()

	iSrc, iSink := luigi.NewPipe(luigi.WithBuffer(2))

	// This test never inspects the output; the buffer only backs the writer
	// that is passed to newStream.
	oBuff := &bytes.Buffer{}
	oWriter := codec.NewWriter(oBuff)

	str := newStream(iSrc, oWriter, req, streamCapOnce, streamCapNone)

	// Checked type assertion: fail the test with a clear message instead of
	// panicking if the sink cannot be closed with an error.
	errCloser, ok := iSink.(luigi.ErrorCloser)
	r.True(ok, "pipe sink does not implement luigi.ErrorCloser")

	err := errCloser.CloseWithError(fmt.Errorf("some error"))
	r.NoError(err)

	v, err := str.Next(ctx)
	r.EqualError(errors.Cause(err), "some error")
	r.Nil(v)
}