From 40e9bbfcf5c109455777a9a5b262f37c48da5641 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 21 Feb 2024 14:31:08 +0800 Subject: [PATCH 1/4] Add tcpproxy. --- trunk/3rdparty/srs-bench/tcpproxy/main.go | 136 ++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 trunk/3rdparty/srs-bench/tcpproxy/main.go diff --git a/trunk/3rdparty/srs-bench/tcpproxy/main.go b/trunk/3rdparty/srs-bench/tcpproxy/main.go new file mode 100644 index 0000000000..df63a66427 --- /dev/null +++ b/trunk/3rdparty/srs-bench/tcpproxy/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net" + "sync" + "time" +) + +func main() { + if err := doMain(); err != nil { + panic(err) + } +} + +func doMain() error { + hashID := buildHashID() + + listener, err := net.Listen("tcp", ":1935") + if err != nil { + return err + } + trace(hashID, "Listen at %v", listener.Addr()) + + for { + client, err := listener.Accept() + if err != nil { + return err + } + + backend, err := net.Dial("tcp", "localhost:19350") + if err != nil { + return err + } + + go serve(client, backend) + } + return nil +} + +func serve(client, backend net.Conn) { + defer client.Close() + defer backend.Close() + hashID := buildHashID() + if err := doServe(hashID, client, backend); err != nil { + trace(hashID, "Serve error %v", err) + } +} + +func doServe(hashID string, client, backend net.Conn) error { + var wg sync.WaitGroup + var r0 error + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + for { + nn, err := io.CopyN(backend, client, 128*1024) + if err != nil { + trace(hashID, "Copy from client error %v", err) + r0 = err + return + } + if nn == 0 { + trace(hashID, "Copy from client EOF") + return + } + + trace(hashID, "Copy %v bytes to RTMP backend", nn) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + for { + nn, err := io.CopyN(client, backend, 128*1024) + if err != nil { + r0 = err + return + } + if nn == 0 { + trace(hashID, "Copy from backend EOF") + return + } + + trace(hashID, "Copy %v bytes to RTMP client", nn) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + defer client.Close() + defer backend.Close() + + <-ctx.Done() + trace(hashID, "Context is done, close the connections") + }() + + trace(hashID, "Start proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr()) + wg.Wait() + trace(hashID, "Finish proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr()) + + return r0 +} + +func trace(id, msg string, a ...interface{}) { + fmt.Println(fmt.Sprintf("[%v][%v] %v", + time.Now().Format("2006-01-02 15:04:05.000"), id, + fmt.Sprintf(msg, a...), + )) +} + +func buildHashID() string { + randomData := make([]byte, 16) + if _, err := rand.Read(randomData); err != nil { + return "" + } + + hash := sha256.Sum256(randomData) + return hex.EncodeToString(hash[:])[:6] +} From fad8faffbde5a827491766ce859fed813bff18fd Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 21 Feb 2024 15:05:51 +0800 Subject: [PATCH 2/4] Set TCP NODELAY. --- trunk/3rdparty/srs-bench/pcap/main.go | 2 +- trunk/3rdparty/srs-bench/tcpproxy/main.go | 35 +++++++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/trunk/3rdparty/srs-bench/pcap/main.go b/trunk/3rdparty/srs-bench/pcap/main.go index 9409578be4..8938dcb444 100644 --- a/trunk/3rdparty/srs-bench/pcap/main.go +++ b/trunk/3rdparty/srs-bench/pcap/main.go @@ -90,7 +90,7 @@ func doMain(ctx context.Context) error { if len(payload) == 0 { continue } - if tcp.DstPort != 1935 { + if tcp.DstPort != 1935 && tcp.DstPort != 19350 { continue } diff --git a/trunk/3rdparty/srs-bench/tcpproxy/main.go b/trunk/3rdparty/srs-bench/tcpproxy/main.go index df63a66427..09dcfbf642 100644 --- a/trunk/3rdparty/srs-bench/tcpproxy/main.go +++ b/trunk/3rdparty/srs-bench/tcpproxy/main.go @@ -6,7 +6,6 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "io" "net" "sync" "time" @@ -59,20 +58,35 @@ func doServe(hashID string, client, backend net.Conn) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + if c, ok := client.(*net.TCPConn); ok { + c.SetNoDelay(true) + } + if c, ok := backend.(*net.TCPConn); ok { + c.SetNoDelay(true) + } + wg.Add(1) go func() { defer wg.Done() defer cancel() for { - nn, err := io.CopyN(backend, client, 128*1024) + buf := make([]byte, 128*1024) + nn, err := client.Read(buf) if err != nil { - trace(hashID, "Copy from client error %v", err) + trace(hashID, "Read from client error %v", err) r0 = err return } if nn == 0 { - trace(hashID, "Copy from client EOF") + trace(hashID, "Read from client EOF") + return + } + + _, err = backend.Write(buf[:nn]) + if err != nil { + trace(hashID, "Write to RTMP backend error %v", err) + r0 = err return } @@ -86,13 +100,22 @@ func doServe(hashID string, client, backend net.Conn) error { defer cancel() for { - nn, err := io.CopyN(client, backend, 128*1024) + buf := make([]byte, 128*1024) + nn, err := backend.Read(buf) if err != nil { + trace(hashID, "Read from RTMP backend error %v", err) r0 = err return } if nn == 0 { - trace(hashID, "Copy from backend EOF") + trace(hashID, "Read from RTMP backend EOF") + return + } + + _, err = client.Write(buf[:nn]) + if err != nil { + trace(hashID, "Write to client error %v", err) + r0 = err return } From 0183b0086a30de026c91f2e42783b19d30b9d91e Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 21 Feb 2024 16:24:46 +0800 Subject: [PATCH 3/4] Support pcap. --- trunk/3rdparty/srs-bench/pcap/main.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/trunk/3rdparty/srs-bench/pcap/main.go b/trunk/3rdparty/srs-bench/pcap/main.go index 8938dcb444..2464e51acb 100644 --- a/trunk/3rdparty/srs-bench/pcap/main.go +++ b/trunk/3rdparty/srs-bench/pcap/main.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "strings" "time" "github.com/google/gopacket" @@ -62,9 +63,19 @@ func doMain(ctx context.Context) error { } defer f.Close() - r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions) - if err != nil { - return errors.Wrapf(err, "new reader") + var source *gopacket.PacketSource + if strings.HasSuffix(filename, ".pcap") { + r, err := pcapgo.NewReader(f) + if err != nil { + return errors.Wrapf(err, "new reader") + } + source = gopacket.NewPacketSource(r, r.LinkType()) + } else { + r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions) + if err != nil { + return errors.Wrapf(err, "new reader") + } + source = gopacket.NewPacketSource(r, r.LinkType()) } // TODO: FIXME: Should start a goroutine to consume bytes from conn. @@ -76,7 +87,6 @@ func doMain(ctx context.Context) error { var packetNumber uint64 var previousTime *time.Time - source := gopacket.NewPacketSource(r, r.LinkType()) for packet := range source.Packets() { packetNumber++ From 75b9a58a49a2f0e9d971e190202e941fe9b2c860 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 19 Mar 2024 21:09:52 +0800 Subject: [PATCH 4/4] Update release to v6.0.117 --- trunk/doc/CHANGELOG.md | 1 + trunk/src/core/srs_core_version6.hpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 373dcccc64..7607a48bf6 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-03-19, Merge [#3958](https://github.com/ossrs/srs/pull/3958): Add a TCP proxy for debugging. v6.0.117 (#3958) * v6.0, 2024-03-20, Merge [#3964](https://github.com/ossrs/srs/pull/3964): WebRTC: Add support for A/V only WHEP/WHEP player. v6.0.116 (#3964) * v6.0, 2024-03-19, Merge [#3990](https://github.com/ossrs/srs/pull/3990): System: Disable feature that obtains versions and check features status. v6.0.115 (#3990) * v6.0, 2024-03-18, Merge [#3973](https://github.com/ossrs/srs/pull/3973): Typo: Fix some typo for #3973 #3976 #3982. v6.0.114 (#3973) diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index 5cb79c0389..d79d5849d4 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 116 +#define VERSION_REVISION 117 #endif