From 35a24342e794999d16114689ba33e38fe467ff0d Mon Sep 17 00:00:00 2001 From: Harlan Date: Fri, 3 Nov 2023 18:03:13 +0800 Subject: [PATCH 1/8] fix error: the session should not be exited when encountering unknown msg type id --- Cargo.lock | 262 ++++++++------------ library/codec/h264/src/sps.rs | 6 +- protocol/rtmp/src/messages/parser.rs | 48 ++-- protocol/rtmp/src/session/client_session.rs | 9 +- protocol/rtmp/src/session/server_session.rs | 7 +- 5 files changed, 142 insertions(+), 190 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c5f9d12..fea28020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,9 +116,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -157,7 +157,7 @@ dependencies = [ "num-traits", "rusticata-macros", "thiserror", - "time 0.3.26", + "time 0.3.30", ] [[package]] @@ -173,7 +173,7 @@ dependencies = [ "num-traits", "rusticata-macros", "thiserror", - "time 0.3.26", + "time 0.3.30", ] [[package]] @@ -219,14 +219,14 @@ checksum = "79fa67157abdfd688a259b6648808757db9347af834624f27ec646da976aee5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.38", ] [[package]] name = "atomic-waker" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" @@ -552,9 +552,9 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" dependencies = [ "libc", ] @@ -570,9 +570,9 @@ dependencies = [ [[package]] name = "crc-catalog" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +checksum = "4939f9ed1444bd8c896d37f3090012fa6e7834fe84ef8c9daa166109515732f9" [[package]] name = "cron" @@ -651,9 +651,9 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.0" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622178105f911d937a42cdb140730ba4a3ed2becd8ae6ce39c7d28b5d75d4588" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" dependencies = [ "cfg-if 1.0.0", "cpufeatures", @@ -667,13 +667,13 @@ dependencies = [ [[package]] name = "curve25519-dalek-derive" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.38", ] [[package]] @@ -802,9 +802,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +dependencies = [ + "powerfmt", +] [[package]] name = "derive_builder" @@ -865,7 +868,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.38", ] [[package]] @@ -938,12 +941,23 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.5" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", "libc", - "windows-sys 0.48.0", ] [[package]] @@ -998,9 +1012,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d" +checksum = "a481586acf778f1b1455424c343f71124b048ffa5f4fc3f8f6ae9dc432dcb3c7" [[package]] name = "fnv" @@ -1562,9 +1576,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.139" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "link-cplusplus" @@ -1608,10 +1622,11 @@ checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" [[package]] name = "md-5" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ + "cfg-if 1.0.0", "digest 0.10.7", ] @@ -1849,7 +1864,7 @@ checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" dependencies = [ "ecdsa", "elliptic-curve", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -1860,7 +1875,7 @@ checksum = "dfc8c5bf642dde52bb9e87c0ecd8ca5a76faac2eeed98dedb7c717997e1080aa" dependencies = [ "ecdsa", "elliptic-curve", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -1976,6 +1991,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "pprtmp" version = "0.1.0" @@ -1997,18 +2018,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.63" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.29" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -2098,7 +2119,7 @@ checksum = "ffbe84efe2f38dea12e9bfc1f65377fdf03e53a18cb3b995faedf7934c7e785b" dependencies = [ "pem", "ring", - "time 0.3.26", + "time 0.3.30", "x509-parser 0.14.0", "yasna", ] @@ -2313,9 +2334,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.16" +version = "0.36.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6da3636faa25820d8648e0e31c5d519bbb01f72fdf57131f0f5f7da5fed36eab" +checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644" dependencies = [ "bitflags", "errno", @@ -2432,28 +2453,28 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.18" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.147" +version = "1.0.190" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" +checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.147" +version = "1.0.190" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" +checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" dependencies = [ "proc-macro2", "quote", - "syn 1.0.103", + "syn 2.0.38", ] [[package]] @@ -2490,9 +2511,9 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if 1.0.0", "cpufeatures", @@ -2514,9 +2535,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if 1.0.0", "cpufeatures", @@ -2687,9 +2708,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.23" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -2738,22 +2759,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.38", ] [[package]] @@ -2769,12 +2790,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.26" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a79d09ac6b08c1ab3906a2f7cc2e81a0e27c7ae89c63812df75e52bef0751e07" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -2782,15 +2804,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.12" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75c65469ed6b3a4809d987a41eb1dc918e9bc1d92211cbad7ae82931846f7451" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -3080,9 +3102,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom 0.2.8", ] @@ -3246,11 +3268,11 @@ dependencies = [ "sdp", "serde", "serde_json", - "sha2 0.10.7", + "sha2 0.10.8", "smol_str", "stun", "thiserror", - "time 0.3.26", + "time 0.3.30", "tokio", "turn", "url", @@ -3309,7 +3331,7 @@ dependencies = [ "sec1", "serde", "sha1", - "sha2 0.10.7", + "sha2 0.10.8", "signature", "subtle", "thiserror", @@ -3338,7 +3360,7 @@ dependencies = [ "tokio", "turn", "url", - "uuid 1.4.1", + "uuid 1.5.0", "waitgroup", "webrtc-mdns", "webrtc-util", @@ -3468,13 +3490,13 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ - "windows_aarch64_gnullvm 0.42.1", - "windows_aarch64_msvc 0.42.1", - "windows_i686_gnu 0.42.1", - "windows_i686_msvc 0.42.1", - "windows_x86_64_gnu 0.42.1", - "windows_x86_64_gnullvm 0.42.1", - "windows_x86_64_msvc 0.42.1", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] [[package]] @@ -3483,16 +3505,7 @@ version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" dependencies = [ - "windows-targets 0.42.1", -] - -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", + "windows-targets", ] [[package]] @@ -3501,28 +3514,13 @@ version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" dependencies = [ - "windows_aarch64_gnullvm 0.42.1", - "windows_aarch64_msvc 0.42.1", - "windows_i686_gnu 0.42.1", - "windows_i686_msvc 0.42.1", - "windows_x86_64_gnu 0.42.1", - "windows_x86_64_gnullvm 0.42.1", - "windows_x86_64_msvc 0.42.1", -] - -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] [[package]] @@ -3531,84 +3529,42 @@ version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_msvc" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_i686_gnu" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_msvc" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_x86_64_gnu" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnullvm" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_msvc" version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "winreg" version = "0.10.1" @@ -3624,7 +3580,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb66477291e7e8d2b0ff1bcb900bf29489a9692816d79874bea351e7a8b6de96" dependencies = [ - "curve25519-dalek 4.1.0", + "curve25519-dalek 4.1.1", "rand_core 0.6.4", "serde", "zeroize", @@ -3645,7 +3601,7 @@ dependencies = [ "oid-registry 0.4.0", "rusticata-macros", "thiserror", - "time 0.3.26", + "time 0.3.30", ] [[package]] @@ -3664,7 +3620,7 @@ dependencies = [ "ring", "rusticata-macros", "thiserror", - "time 0.3.26", + "time 0.3.30", ] [[package]] @@ -3774,7 +3730,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" dependencies = [ - "time 0.3.26", + "time 0.3.30", ] [[package]] @@ -3794,5 +3750,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.38", ] diff --git a/library/codec/h264/src/sps.rs b/library/codec/h264/src/sps.rs index 831a3c1f..17ec787c 100644 --- a/library/codec/h264/src/sps.rs +++ b/library/codec/h264/src/sps.rs @@ -101,8 +101,10 @@ impl SpsParser { 12 }; - for i in 0..matrix_dim { - self.sps.seq_scaling_list_present_flag.push(self.bits_reader.read_bit()?); + for _ in 0..matrix_dim { + self.sps + .seq_scaling_list_present_flag + .push(self.bits_reader.read_bit()?); } } } diff --git a/protocol/rtmp/src/messages/parser.rs b/protocol/rtmp/src/messages/parser.rs index dddea625..4fd5f53a 100644 --- a/protocol/rtmp/src/messages/parser.rs +++ b/protocol/rtmp/src/messages/parser.rs @@ -1,7 +1,7 @@ use { super::{ define::{msg_type_id, RtmpMessageData}, - errors::{MessageError, MessageErrorValue}, + errors::MessageError, }, crate::{ amf0::{amf0_markers, amf0_reader::Amf0Reader}, @@ -21,7 +21,7 @@ impl MessageParser { pub fn new(chunk_info: ChunkInfo) -> Self { Self { chunk_info } } - pub fn parse(self) -> Result { + pub fn parse(self) -> Result, MessageError> { let mut reader = BytesReader::new(self.chunk_info.payload); match self.chunk_info.message_header.msg_type_id { @@ -50,12 +50,12 @@ impl MessageParser { let others = amf_reader.read_all()?; - return Ok(RtmpMessageData::Amf0Command { + return Ok(Some(RtmpMessageData::Amf0Command { command_name, transaction_id, command_object: command_obj, others, - }); + })); } msg_type_id::AUDIO => { @@ -64,18 +64,18 @@ impl MessageParser { self.chunk_info.message_header.msg_length ); - return Ok(RtmpMessageData::AudioData { + return Ok(Some(RtmpMessageData::AudioData { data: reader.extract_remaining_bytes(), - }); + })); } msg_type_id::VIDEO => { log::trace!( "receive video msg , msg length is{}\n", self.chunk_info.message_header.msg_length ); - return Ok(RtmpMessageData::VideoData { + return Ok(Some(RtmpMessageData::VideoData { data: reader.extract_remaining_bytes(), - }); + })); } msg_type_id::USER_CONTROL_EVENT => { log::trace!( @@ -83,60 +83,50 @@ impl MessageParser { self.chunk_info.message_header.msg_length ); let data = EventMessagesReader::new(reader).parse_event()?; - return Ok(data); + return Ok(Some(data)); } msg_type_id::SET_CHUNK_SIZE => { let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; - return Ok(RtmpMessageData::SetChunkSize { chunk_size }); + return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size })); } msg_type_id::ABORT => { let chunk_stream_id = ProtocolControlMessageReader::new(reader).read_abort_message()?; - return Ok(RtmpMessageData::AbortMessage { chunk_stream_id }); + return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id })); } msg_type_id::ACKNOWLEDGEMENT => { let sequence_number = ProtocolControlMessageReader::new(reader).read_acknowledgement()?; - return Ok(RtmpMessageData::Acknowledgement { sequence_number }); + return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number })); } msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { let size = ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; - return Ok(RtmpMessageData::WindowAcknowledgementSize { size }); + return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size })); } msg_type_id::SET_PEER_BANDWIDTH => { let properties = ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; - return Ok(RtmpMessageData::SetPeerBandwidth { properties }); + return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties })); } msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => { //let values = Amf0Reader::new(reader).read_all()?; - return Ok(RtmpMessageData::AmfData { + return Ok(Some(RtmpMessageData::AmfData { raw_data: reader.extract_remaining_bytes(), - }); + })); } msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} msg_type_id::AGGREGATE => {} - _ => { - log::error!( - "the msg_type_id is not supported: {}", - self.chunk_info.message_header.msg_type_id - ); - return Err(MessageError { - value: MessageErrorValue::UnknowMessageType, - }); - } + _ => {} } - log::error!( + log::warn!( "the msg_type_id is not processed: {}", self.chunk_info.message_header.msg_type_id ); - Err(MessageError { - value: MessageErrorValue::UnknowMessageType, - }) + Ok(None) } } diff --git a/protocol/rtmp/src/session/client_session.rs b/protocol/rtmp/src/session/client_session.rs index 62a07871..3ef1bcca 100644 --- a/protocol/rtmp/src/session/client_session.rs +++ b/protocol/rtmp/src/session/client_session.rs @@ -189,9 +189,12 @@ impl ClientSession { Ok(rv) => { if let UnpackResult::Chunks(chunks) = rv { for chunk_info in chunks.iter() { - let mut msg = MessageParser::new(chunk_info.clone()).parse()?; - let timestamp = chunk_info.message_header.timestamp; - self.process_messages(&mut msg, ×tamp).await?; + if let Some(mut msg) = + MessageParser::new(chunk_info.clone()).parse()? + { + let timestamp = chunk_info.message_header.timestamp; + self.process_messages(&mut msg, ×tamp).await?; + } } } } diff --git a/protocol/rtmp/src/session/server_session.rs b/protocol/rtmp/src/session/server_session.rs index d4e76c54..631664f9 100644 --- a/protocol/rtmp/src/session/server_session.rs +++ b/protocol/rtmp/src/session/server_session.rs @@ -188,9 +188,10 @@ impl ServerSession { let timestamp = chunk_info.message_header.timestamp; let msg_stream_id = chunk_info.message_header.msg_streamd_id; - let mut msg = MessageParser::new(chunk_info).parse()?; - self.process_messages(&mut msg, &msg_stream_id, ×tamp) - .await?; + if let Some(mut msg) = MessageParser::new(chunk_info).parse()? { + self.process_messages(&mut msg, &msg_stream_id, ×tamp) + .await?; + } } } } else { From f38d38ad6c6efc39eefbb83c9f6462e538da2a9b Mon Sep 17 00:00:00 2001 From: Harlan Date: Sat, 4 Nov 2023 09:05:58 +0800 Subject: [PATCH 2/8] remove no used "\n" which can add blank lines when print logs --- application/xiu/src/config/errors.rs | 2 +- application/xiu/src/service.rs | 18 ++++---- library/bytesio/src/bits_errors.rs | 10 ++--- library/bytesio/src/bytes_errors.rs | 10 ++--- library/bytesio/src/bytesio_errors.rs | 4 +- library/codec/h264/src/errors.rs | 2 +- library/container/flv/src/errors.rs | 30 ++++++------- library/container/mpegts/src/errors.rs | 14 +++--- library/streamhub/src/errors.rs | 20 ++++----- library/streamhub/src/lib.rs | 8 ++-- protocol/hls/src/errors.rs | 24 +++++----- protocol/hls/src/flv_data_receiver.rs | 2 +- protocol/httpflv/src/httpflv.rs | 2 +- protocol/httpflv/src/server.rs | 2 +- protocol/rtmp/src/amf0/errors.rs | 10 ++--- protocol/rtmp/src/cache/errors.rs | 18 ++++---- protocol/rtmp/src/chunk/errors.rs | 4 +- protocol/rtmp/src/handshake/errors.rs | 22 +++++----- protocol/rtmp/src/messages/errors.rs | 8 ++-- protocol/rtmp/src/netconnection/errors.rs | 6 +-- protocol/rtmp/src/netstream/errors.rs | 4 +- .../src/protocol_control_messages/errors.rs | 4 +- protocol/rtmp/src/relay/errors.rs | 6 +-- protocol/rtmp/src/remuxer/errors.rs | 18 ++++---- protocol/rtmp/src/remuxer/mod.rs | 2 +- protocol/rtmp/src/remuxer/rtsp2rtmp.rs | 2 +- protocol/rtmp/src/session/common.rs | 4 +- protocol/rtmp/src/session/errors.rs | 44 +++++++++---------- .../rtmp/src/user_control_messages/errors.rs | 6 +-- protocol/rtmp/src/utils/errors.rs | 2 +- protocol/rtsp/src/rtp/errors.rs | 8 ++-- protocol/rtsp/src/rtp/rtcp/errors.rs | 4 +- protocol/rtsp/src/session/errors.rs | 16 +++---- protocol/rtsp/src/session/mod.rs | 2 +- protocol/webrtc/src/errors.rs | 6 +-- protocol/webrtc/src/session/errors.rs | 22 +++++----- 36 files changed, 183 insertions(+), 183 deletions(-) diff --git a/application/xiu/src/config/errors.rs b/application/xiu/src/config/errors.rs index 3481ff44..baa6768a 100644 --- a/application/xiu/src/config/errors.rs +++ b/application/xiu/src/config/errors.rs @@ -9,7 +9,7 @@ pub struct ConfigError { #[derive(Debug, Fail)] pub enum ConfigErrorValue { - #[fail(display = "IO error: {}\n", _0)] + #[fail(display = "IO error: {}", _0)] IOError(Error), } diff --git a/application/xiu/src/service.rs b/application/xiu/src/service.rs index 74fb6e3b..298d7238 100644 --- a/application/xiu/src/service.rs +++ b/application/xiu/src/service.rs @@ -111,7 +111,7 @@ impl Service { ); tokio::spawn(async move { if let Err(err) = push_client.run().await { - log::error!("push client error {}\n", err); + log::error!("push client error {}", err); } }); @@ -135,7 +135,7 @@ impl Service { tokio::spawn(async move { if let Err(err) = pull_client.run().await { - log::error!("pull client error {}\n", err); + log::error!("pull client error {}", err); } }); @@ -149,7 +149,7 @@ impl Service { let mut rtmp_server = RtmpServer::new(address, producer, gop_num); tokio::spawn(async move { if let Err(err) = rtmp_server.run().await { - log::error!("rtmp server error: {}\n", err); + log::error!("rtmp server error: {}", err); } }); } @@ -186,7 +186,7 @@ impl Service { tokio::spawn(async move { if let Err(err) = remuxer.run().await { - log::error!("rtmp remuxer server error: {}\n", err); + log::error!("rtmp remuxer server error: {}", err); } }); Ok(()) @@ -208,7 +208,7 @@ impl Service { let mut rtsp_server = RtspServer::new(address, producer); tokio::spawn(async move { if let Err(err) = rtsp_server.run().await { - log::error!("rtsp server error: {}\n", err); + log::error!("rtsp server error: {}", err); } }); } @@ -232,7 +232,7 @@ impl Service { let mut webrtc_server = WebRTCServer::new(address, producer); tokio::spawn(async move { if let Err(err) = webrtc_server.run().await { - log::error!("webrtc server error: {}\n", err); + log::error!("webrtc server error: {}", err); } }); } @@ -252,7 +252,7 @@ impl Service { tokio::spawn(async move { if let Err(err) = httpflv_server::run(event_producer, port).await { - log::error!("httpflv server error: {}\n", err); + log::error!("httpflv server error: {}", err); } }); } @@ -278,7 +278,7 @@ impl Service { tokio::spawn(async move { if let Err(err) = hls_remuxer.run().await { - log::error!("rtmp event processor error: {}\n", err); + log::error!("rtmp event processor error: {}", err); } }); @@ -286,7 +286,7 @@ impl Service { tokio::spawn(async move { if let Err(err) = hls_server::run(port).await { - log::error!("hls server error: {}\n", err); + log::error!("hls server error: {}", err); } }); stream_hub.set_hls_enabled(true); diff --git a/library/bytesio/src/bits_errors.rs b/library/bytesio/src/bits_errors.rs index d457c73e..dbab8046 100644 --- a/library/bytesio/src/bits_errors.rs +++ b/library/bytesio/src/bits_errors.rs @@ -5,15 +5,15 @@ use std::fmt; #[derive(Debug, Fail)] pub enum BitErrorValue { - #[fail(display = "bytes read error\n")] + #[fail(display = "bytes read error")] BytesReadError(BytesReadError), - #[fail(display = "bytes write error\n")] + #[fail(display = "bytes write error")] BytesWriteError(BytesWriteError), - #[fail(display = "the size is bigger than 64\n")] + #[fail(display = "the size is bigger than 64")] TooBig, - #[fail(display = "cannot write the whole 8 bits\n")] + #[fail(display = "cannot write the whole 8 bits")] CannotWrite8Bit, - #[fail(display = "cannot read byte\n")] + #[fail(display = "cannot read byte")] CannotReadByte, } #[derive(Debug)] diff --git a/library/bytesio/src/bytes_errors.rs b/library/bytesio/src/bytes_errors.rs index b8336917..e1c32e12 100644 --- a/library/bytesio/src/bytes_errors.rs +++ b/library/bytesio/src/bytes_errors.rs @@ -11,13 +11,13 @@ pub enum BytesReadErrorValue { NotEnoughBytes, #[fail(display = "empty stream")] EmptyStream, - #[fail(display = "io error: {}\n", _0)] + #[fail(display = "io error: {}", _0)] IO(#[cause] io::Error), #[fail(display = "index out of range")] IndexOutofRange, - #[fail(display = "bytesio read error: {}\n", _0)] + #[fail(display = "bytesio read error: {}", _0)] BytesIOError(BytesIOError), - // #[fail(display = "elapsed: {}\n", _0)] + // #[fail(display = "elapsed: {}", _0)] // TimeoutError(#[cause] Elapsed), } @@ -63,9 +63,9 @@ pub struct BytesWriteError { #[derive(Debug, Fail)] pub enum BytesWriteErrorValue { - #[fail(display = "io error\n")] + #[fail(display = "io error")] IO(io::Error), - #[fail(display = "not enough bytes to write: {}\n", _0)] + #[fail(display = "bytes io error: {}", _0)] BytesIOError(BytesIOError), #[fail(display = "write time out")] Timeout, diff --git a/library/bytesio/src/bytesio_errors.rs b/library/bytesio/src/bytesio_errors.rs index 27c15ccd..73fe415d 100644 --- a/library/bytesio/src/bytesio_errors.rs +++ b/library/bytesio/src/bytesio_errors.rs @@ -9,9 +9,9 @@ pub enum BytesIOErrorValue { NotEnoughBytes, #[fail(display = "empty stream")] EmptyStream, - #[fail(display = "io error\n")] + #[fail(display = "io error")] IOError(io::Error), - #[fail(display = "time out error\n")] + #[fail(display = "time out error")] TimeoutError(tokio::time::error::Elapsed), #[fail(display = "none return")] NoneReturn, diff --git a/library/codec/h264/src/errors.rs b/library/codec/h264/src/errors.rs index 180867cf..db35b19d 100644 --- a/library/codec/h264/src/errors.rs +++ b/library/codec/h264/src/errors.rs @@ -4,7 +4,7 @@ use std::fmt; #[derive(Debug, Fail)] pub enum H264ErrorValue { - #[fail(display = "bit error\n")] + #[fail(display = "bit error")] BitError(BitError), } #[derive(Debug)] diff --git a/library/container/flv/src/errors.rs b/library/container/flv/src/errors.rs index c905fd7c..1ac6d1b5 100644 --- a/library/container/flv/src/errors.rs +++ b/library/container/flv/src/errors.rs @@ -8,11 +8,11 @@ use { #[derive(Debug, Fail)] pub enum TagParseErrorValue { - #[fail(display = "bytes read error\n")] + #[fail(display = "bytes read error")] BytesReadError(BytesReadError), - #[fail(display = "tag data length error\n")] + #[fail(display = "tag data length error")] TagDataLength, - #[fail(display = "unknow tag type error\n")] + #[fail(display = "unknow tag type error")] UnknownTagType, } #[derive(Debug)] @@ -89,13 +89,13 @@ pub struct FlvDemuxerError { pub enum DemuxerErrorValue { // #[fail(display = "server error")] // Error, - #[fail(display = "bytes write error:{}\n", _0)] + #[fail(display = "bytes write error:{}", _0)] BytesWriteError(#[cause] BytesWriteError), - #[fail(display = "bytes read error:{}\n", _0)] + #[fail(display = "bytes read error:{}", _0)] BytesReadError(#[cause] BytesReadError), - #[fail(display = "mpeg avc error:{}\n", _0)] + #[fail(display = "mpeg avc error:{}", _0)] MpegAvcError(#[cause] Mpeg4AvcHevcError), - #[fail(display = "mpeg aac error:{}\n", _0)] + #[fail(display = "mpeg aac error:{}", _0)] MpegAacError(#[cause] MpegAacError), } @@ -149,19 +149,19 @@ impl Fail for FlvDemuxerError { #[derive(Debug, Fail)] pub enum MpegErrorValue { - #[fail(display = "bytes read error:{}\n", _0)] + #[fail(display = "bytes read error:{}", _0)] BytesReadError(#[cause] BytesReadError), - #[fail(display = "bytes write error:{}\n", _0)] + #[fail(display = "bytes write error:{}", _0)] BytesWriteError(#[cause] BytesWriteError), - #[fail(display = "bits error:{}\n", _0)] + #[fail(display = "bits error:{}", _0)] BitError(#[cause] BitError), - #[fail(display = "h264 error:{}\n", _0)] + #[fail(display = "h264 error:{}", _0)] H264Error(#[cause] H264Error), - #[fail(display = "there is not enough bits to read\n")] + #[fail(display = "there is not enough bits to read")] NotEnoughBitsToRead, - #[fail(display = "should not come here\n")] + #[fail(display = "should not come here")] ShouldNotComeHere, - #[fail(display = "the sps nal unit type is not correct\n")] + #[fail(display = "the sps nal unit type is not correct")] SPSNalunitTypeNotCorrect, } #[derive(Debug)] @@ -256,7 +256,7 @@ impl Fail for MpegAacError { #[derive(Debug, Fail)] pub enum BitVecErrorValue { - #[fail(display = "not enough bits left\n")] + #[fail(display = "not enough bits left")] NotEnoughBits, } #[derive(Debug)] diff --git a/library/container/mpegts/src/errors.rs b/library/container/mpegts/src/errors.rs index 0f458e71..6602b8a4 100644 --- a/library/container/mpegts/src/errors.rs +++ b/library/container/mpegts/src/errors.rs @@ -7,25 +7,25 @@ use { #[derive(Debug, Fail)] pub enum MpegTsErrorValue { - #[fail(display = "bytes read error\n")] + #[fail(display = "bytes read error")] BytesReadError(BytesReadError), - #[fail(display = "bytes write error\n")] + #[fail(display = "bytes write error")] BytesWriteError(BytesWriteError), - #[fail(display = "io error\n")] + #[fail(display = "io error")] IOError(Error), - #[fail(display = "program number exists\n")] + #[fail(display = "program number exists")] ProgramNumberExists, - #[fail(display = "pmt count execeed\n")] + #[fail(display = "pmt count execeed")] PmtCountExeceed, - #[fail(display = "stream count execeed\n")] + #[fail(display = "stream count execeed")] StreamCountExeceed, - #[fail(display = "stream not found\n")] + #[fail(display = "stream not found")] StreamNotFound, } #[derive(Debug)] diff --git a/library/streamhub/src/errors.rs b/library/streamhub/src/errors.rs index 43e293b7..e4a0837b 100644 --- a/library/streamhub/src/errors.rs +++ b/library/streamhub/src/errors.rs @@ -4,25 +4,25 @@ use bytesio::bytes_errors::BytesWriteError; use {failure::Fail, std::fmt}; #[derive(Debug, Fail)] pub enum ChannelErrorValue { - #[fail(display = "no app name\n")] + #[fail(display = "no app name")] NoAppName, - #[fail(display = "no stream name\n")] + #[fail(display = "no stream name")] NoStreamName, - #[fail(display = "no app or stream name\n")] + #[fail(display = "no app or stream name")] NoAppOrStreamName, - #[fail(display = "exists\n")] + #[fail(display = "exists")] Exists, - #[fail(display = "send error\n")] + #[fail(display = "send error")] SendError, - #[fail(display = "send video error\n")] + #[fail(display = "send video error")] SendVideoError, - #[fail(display = "send audio error\n")] + #[fail(display = "send audio error")] SendAudioError, - #[fail(display = "bytes read error\n")] + #[fail(display = "bytes read error")] BytesReadError(BytesReadError), - #[fail(display = "bytes write error\n")] + #[fail(display = "bytes write error")] BytesWriteError(BytesWriteError), - #[fail(display = "not correct data sender type\n")] + #[fail(display = "not correct data sender type")] NotCorrectDataSenderType, } #[derive(Debug)] diff --git a/library/streamhub/src/lib.rs b/library/streamhub/src/lib.rs index c4a1d0a4..2a8a45dc 100644 --- a/library/streamhub/src/lib.rs +++ b/library/streamhub/src/lib.rs @@ -353,7 +353,7 @@ impl StreamsHub { .insert(info.id, PubSubInfo::Publish { identifier }); } Err(err) => { - log::error!("event_loop Publish err: {}\n", err); + log::error!("event_loop Publish err: {}", err); continue; } } @@ -365,7 +365,7 @@ impl StreamsHub { } => { if let Err(err) = self.unpublish(&identifier) { log::error!( - "event_loop Unpublish err: {} with identifier: {} \n", + "event_loop Unpublish err: {} with identifier: {}", err, identifier ); @@ -489,7 +489,7 @@ impl StreamsHub { PubSubInfo::Publish { identifier } => { if let Err(err) = self.unpublish(&identifier) { log::error!( - "event_loop ApiKickClient pub err: {} with identifier: {} \n", + "event_loop ApiKickClient pub err: {} with identifier: {}", err, identifier ); @@ -501,7 +501,7 @@ impl StreamsHub { } => { if let Err(err) = self.unsubscribe(&identifier, sub_info) { log::error!( - "event_loop ApiKickClient pub err: {} with identifier: {}\n", + "event_loop ApiKickClient pub err: {} with identifier: {}", err, identifier ); diff --git a/protocol/hls/src/errors.rs b/protocol/hls/src/errors.rs index 37985e72..b90563e6 100644 --- a/protocol/hls/src/errors.rs +++ b/protocol/hls/src/errors.rs @@ -28,17 +28,17 @@ pub struct MediaError { pub enum MediaErrorValue { #[fail(display = "server error")] Error, - #[fail(display = "session error:{}\n", _0)] + #[fail(display = "session error:{}", _0)] SessionError(#[cause] SessionError), - #[fail(display = "amf write error:{}\n", _0)] + #[fail(display = "amf write error:{}", _0)] Amf0WriteError(#[cause] Amf0WriteError), - #[fail(display = "metadata error:{}\n", _0)] + #[fail(display = "metadata error:{}", _0)] MetadataError(#[cause] MetadataError), - #[fail(display = "flv demuxer error:{}\n", _0)] + #[fail(display = "flv demuxer error:{}", _0)] FlvDemuxerError(#[cause] FlvDemuxerError), - #[fail(display = "mpegts error:{}\n", _0)] + #[fail(display = "mpegts error:{}", _0)] MpegTsError(#[cause] MpegTsError), - #[fail(display = "write file error:{}\n", _0)] + #[fail(display = "write file error:{}", _0)] IOError(#[cause] std::io::Error), } @@ -114,17 +114,17 @@ pub struct HlsError { pub enum HlsErrorValue { #[fail(display = "hls error")] Error, - #[fail(display = "session error:{}\n", _0)] + #[fail(display = "session error:{}", _0)] SessionError(#[cause] SessionError), - #[fail(display = "amf write error:{}\n", _0)] + #[fail(display = "amf write error:{}", _0)] Amf0WriteError(#[cause] Amf0WriteError), - #[fail(display = "metadata error:{}\n", _0)] + #[fail(display = "metadata error:{}", _0)] MetadataError(#[cause] MetadataError), - #[fail(display = "flv demuxer error:{}\n", _0)] + #[fail(display = "flv demuxer error:{}", _0)] FlvDemuxerError(#[cause] FlvDemuxerError), - #[fail(display = "media error:{}\n", _0)] + #[fail(display = "media error:{}", _0)] MediaError(#[cause] MediaError), - #[fail(display = "receive error:{}\n", _0)] + #[fail(display = "receive error:{}", _0)] RecvError(#[cause] RecvError), } impl From for HlsError { diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index c658abe2..c2038019 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -152,7 +152,7 @@ impl FlvDataReceiver { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}\n", err); + log::error!("unsubscribe_from_channels err {}", err); } Ok(()) diff --git a/protocol/httpflv/src/httpflv.rs b/protocol/httpflv/src/httpflv.rs index de07d0f7..13133776 100644 --- a/protocol/httpflv/src/httpflv.rs +++ b/protocol/httpflv/src/httpflv.rs @@ -149,7 +149,7 @@ impl HttpFlv { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}\n", err); + log::error!("unsubscribe_from_channels err {}", err); } Ok(()) diff --git a/protocol/httpflv/src/server.rs b/protocol/httpflv/src/server.rs index d31a1fe7..3cb66da7 100644 --- a/protocol/httpflv/src/server.rs +++ b/protocol/httpflv/src/server.rs @@ -42,7 +42,7 @@ async fn handle_connection( tokio::spawn(async move { if let Err(err) = flv_hanlder.run().await { - log::error!("flv handler run error {}\n", err); + log::error!("flv handler run error {}", err); } }); diff --git a/protocol/rtmp/src/amf0/errors.rs b/protocol/rtmp/src/amf0/errors.rs index 06c74f93..6a4b0906 100644 --- a/protocol/rtmp/src/amf0/errors.rs +++ b/protocol/rtmp/src/amf0/errors.rs @@ -8,11 +8,11 @@ use { #[derive(Debug, Fail)] pub enum Amf0ReadErrorValue { - #[fail(display = "Encountered unknown marker: {}\n", marker)] + #[fail(display = "Encountered unknown marker: {}", marker)] UnknownMarker { marker: u8 }, - #[fail(display = "parser string error: {}\n", _0)] + #[fail(display = "parser string error: {}", _0)] StringParseError(#[cause] string::FromUtf8Error), - #[fail(display = "bytes read error :{}\n", _0)] + #[fail(display = "bytes read error :{}", _0)] BytesReadError(BytesReadError), #[fail(display = "wrong type")] WrongType, @@ -43,9 +43,9 @@ impl From for Amf0ReadError { pub enum Amf0WriteErrorValue { #[fail(display = "normal string too long")] NormalStringTooLong, - #[fail(display = "io error\n")] + #[fail(display = "io error")] BufferWriteError(io::Error), - #[fail(display = "bytes write error\n")] + #[fail(display = "bytes write error")] BytesWriteError(BytesWriteError), } diff --git a/protocol/rtmp/src/cache/errors.rs b/protocol/rtmp/src/cache/errors.rs index 40be5ca9..fa046418 100644 --- a/protocol/rtmp/src/cache/errors.rs +++ b/protocol/rtmp/src/cache/errors.rs @@ -9,17 +9,17 @@ use { #[derive(Debug, Fail)] pub enum CacheErrorValue { - #[fail(display = "cache tag parse error\n")] + #[fail(display = "cache tag parse error")] DemuxerError(FlvDemuxerError), - #[fail(display = "mpeg aac error\n")] + #[fail(display = "mpeg aac error")] MpegAacError(MpegAacError), - #[fail(display = "mpeg avc error\n")] + #[fail(display = "mpeg avc error")] MpegAvcError(Mpeg4AvcHevcError), - #[fail(display = "pack error\n")] + #[fail(display = "pack error")] PackError(PackError), - #[fail(display = "read bytes error\n")] + #[fail(display = "read bytes error")] BytesReadError(BytesReadError), - #[fail(display = "h264 error\n")] + #[fail(display = "h264 error")] H264Error(H264Error), } @@ -93,11 +93,11 @@ impl Fail for CacheError { #[derive(Debug, Fail)] pub enum MetadataErrorValue { - #[fail(display = "metadata tag parse error\n")] + #[fail(display = "metadata tag parse error")] DemuxerError(FlvDemuxerError), - #[fail(display = "pack error\n")] + #[fail(display = "pack error")] PackError(PackError), - #[fail(display = "amf write error\n")] + #[fail(display = "amf write error")] Amf0WriteError(Amf0WriteError), } #[derive(Debug)] diff --git a/protocol/rtmp/src/chunk/errors.rs b/protocol/rtmp/src/chunk/errors.rs index e1ddb859..294a9e9c 100644 --- a/protocol/rtmp/src/chunk/errors.rs +++ b/protocol/rtmp/src/chunk/errors.rs @@ -6,7 +6,7 @@ use { #[derive(Debug, Fail)] pub enum UnpackErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), #[fail(display = "unknow read state")] UnknowReadState, @@ -40,7 +40,7 @@ pub enum PackErrorValue { NotExistHeader, #[fail(display = "unknow read state")] UnknowReadState, - #[fail(display = "bytes writer error: {}\n", _0)] + #[fail(display = "bytes writer error: {}", _0)] BytesWriteError(BytesWriteError), } diff --git a/protocol/rtmp/src/handshake/errors.rs b/protocol/rtmp/src/handshake/errors.rs index 92a43a68..dc53a269 100644 --- a/protocol/rtmp/src/handshake/errors.rs +++ b/protocol/rtmp/src/handshake/errors.rs @@ -6,19 +6,19 @@ use { #[derive(Debug, Fail)] pub enum HandshakeErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(BytesWriteError), - #[fail(display = "system time error: {}\n", _0)] + #[fail(display = "system time error: {}", _0)] SysTimeError(SystemTimeError), - #[fail(display = "digest error: {}\n", _0)] + #[fail(display = "digest error: {}", _0)] DigestError(DigestError), - #[fail(display = "Digest not found error\n")] + #[fail(display = "Digest not found error")] DigestNotFound, - #[fail(display = "s0 version not correct error\n")] + #[fail(display = "s0 version not correct error")] S0VersionNotCorrect, - #[fail(display = "io error\n")] + #[fail(display = "io error")] IOError(Error), } @@ -96,13 +96,13 @@ pub struct DigestError { #[derive(Debug, Fail)] pub enum DigestErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), - #[fail(display = "digest length not correct\n")] + #[fail(display = "digest length not correct")] DigestLengthNotCorrect, - #[fail(display = "cannot generate digest\n")] + #[fail(display = "cannot generate digest")] CannotGenerate, - #[fail(display = "unknow schema\n")] + #[fail(display = "unknow schema")] UnknowSchema, } diff --git a/protocol/rtmp/src/messages/errors.rs b/protocol/rtmp/src/messages/errors.rs index fdb60c4d..df865569 100644 --- a/protocol/rtmp/src/messages/errors.rs +++ b/protocol/rtmp/src/messages/errors.rs @@ -11,17 +11,17 @@ use { #[derive(Debug, Fail)] pub enum MessageErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), #[fail(display = "unknow read state")] UnknowReadState, - #[fail(display = "amf0 read error: {}\n", _0)] + #[fail(display = "amf0 read error: {}", _0)] Amf0ReadError(Amf0ReadError), #[fail(display = "unknown message type")] UnknowMessageType, - #[fail(display = "protocol control message read error: {}\n", _0)] + #[fail(display = "protocol control message read error: {}", _0)] ProtocolControlMessageReaderError(ProtocolControlMessageReaderError), - #[fail(display = "user control message read error: {}\n", _0)] + #[fail(display = "user control message read error: {}", _0)] EventMessagesError(EventMessagesError), } diff --git a/protocol/rtmp/src/netconnection/errors.rs b/protocol/rtmp/src/netconnection/errors.rs index 117d154b..4517fdd6 100644 --- a/protocol/rtmp/src/netconnection/errors.rs +++ b/protocol/rtmp/src/netconnection/errors.rs @@ -13,11 +13,11 @@ pub struct NetConnectionError { } #[derive(Debug, Fail)] pub enum NetConnectionErrorValue { - #[fail(display = "amf0 write error: {}\n", _0)] + #[fail(display = "amf0 write error: {}", _0)] Amf0WriteError(Amf0WriteError), - #[fail(display = "amf0 read error: {}\n", _0)] + #[fail(display = "amf0 read error: {}", _0)] Amf0ReadError(Amf0ReadError), - #[fail(display = "pack error\n")] + #[fail(display = "pack error")] PackError(PackError), } diff --git a/protocol/rtmp/src/netstream/errors.rs b/protocol/rtmp/src/netstream/errors.rs index 443a0916..e33630ef 100644 --- a/protocol/rtmp/src/netstream/errors.rs +++ b/protocol/rtmp/src/netstream/errors.rs @@ -11,11 +11,11 @@ pub struct NetStreamError { #[derive(Debug, Fail)] pub enum NetStreamErrorValue { - #[fail(display = "amf0 write error: {}\n", _0)] + #[fail(display = "amf0 write error: {}", _0)] Amf0WriteError(Amf0WriteError), #[fail(display = "invalid max chunk size")] InvalidMaxChunkSize { chunk_size: usize }, - #[fail(display = "pack error\n")] + #[fail(display = "pack error")] PackError(PackError), } diff --git a/protocol/rtmp/src/protocol_control_messages/errors.rs b/protocol/rtmp/src/protocol_control_messages/errors.rs index 4213c333..df4f207e 100644 --- a/protocol/rtmp/src/protocol_control_messages/errors.rs +++ b/protocol/rtmp/src/protocol_control_messages/errors.rs @@ -12,7 +12,7 @@ pub struct ControlMessagesError { #[derive(Debug, Fail)] pub enum ControlMessagesErrorValue { //Amf0WriteError(Amf0WriteError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(BytesWriteError), } @@ -47,7 +47,7 @@ pub struct ProtocolControlMessageReaderError { #[derive(Debug, Fail)] pub enum ProtocolControlMessageReaderErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), } diff --git a/protocol/rtmp/src/relay/errors.rs b/protocol/rtmp/src/relay/errors.rs index bae5f954..a3a44963 100644 --- a/protocol/rtmp/src/relay/errors.rs +++ b/protocol/rtmp/src/relay/errors.rs @@ -17,12 +17,12 @@ impl fmt::Display for ClientError { #[derive(Debug, Fail)] pub enum PushClientErrorValue { - #[fail(display = "receive error\n")] + #[fail(display = "receive error")] ReceiveError(RecvError), - #[fail(display = "send error\n")] + #[fail(display = "send error")] SendError, - #[fail(display = "io error\n")] + #[fail(display = "io error")] IOError(Error), } diff --git a/protocol/rtmp/src/remuxer/errors.rs b/protocol/rtmp/src/remuxer/errors.rs index 7fbe0679..1252d28e 100644 --- a/protocol/rtmp/src/remuxer/errors.rs +++ b/protocol/rtmp/src/remuxer/errors.rs @@ -18,23 +18,23 @@ pub struct RtmpRemuxerError { pub enum RtmpRemuxerErrorValue { #[fail(display = "hls error")] Error, - #[fail(display = "session error:{}\n", _0)] + #[fail(display = "session error:{}", _0)] SessionError(#[cause] SessionError), - #[fail(display = "amf write error:{}\n", _0)] + #[fail(display = "amf write error:{}", _0)] Amf0WriteError(#[cause] Amf0WriteError), - #[fail(display = "metadata error:{}\n", _0)] + #[fail(display = "metadata error:{}", _0)] MetadataError(#[cause] MetadataError), - #[fail(display = "receive error:{}\n", _0)] + #[fail(display = "receive error:{}", _0)] RecvError(#[cause] RecvError), - #[fail(display = "bytes read error:{}\n", _0)] + #[fail(display = "bytes read error:{}", _0)] BytesReadError(#[cause] BytesReadError), - #[fail(display = "bytes write error:{}\n", _0)] + #[fail(display = "bytes write error:{}", _0)] BytesWriteError(#[cause] BytesWriteError), - #[fail(display = "mpeg avc error\n")] + #[fail(display = "mpeg avc error")] MpegAvcError(#[cause] Mpeg4AvcHevcError), - #[fail(display = "flv muxer error\n")] + #[fail(display = "flv muxer error")] FlvMuxerError(#[cause] FlvMuxerError), - #[fail(display = "stream hub event send error\n")] + #[fail(display = "stream hub event send error")] StreamHubEventSendErr, } impl From for RtmpRemuxerError { diff --git a/protocol/rtmp/src/remuxer/mod.rs b/protocol/rtmp/src/remuxer/mod.rs index 7facf884..94c2cc85 100644 --- a/protocol/rtmp/src/remuxer/mod.rs +++ b/protocol/rtmp/src/remuxer/mod.rs @@ -35,7 +35,7 @@ impl RtmpRemuxer { Rtsp2RtmpRemuxerSession::new(stream_path, self.event_producer.clone()); tokio::spawn(async move { if let Err(err) = session.run().await { - log::error!("rtsp2rtmp session error: {}\n", err); + log::error!("rtsp2rtmp session error: {}", err); } }); } diff --git a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs index 160936d6..35457e73 100644 --- a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs +++ b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs @@ -161,7 +161,7 @@ impl Rtsp2RtmpRemuxerSession { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}\n", err); + log::error!("unsubscribe_from_channels err {}", err); } Ok(()) diff --git a/protocol/rtmp/src/session/common.rs b/protocol/rtmp/src/session/common.rs index 035b8f3c..0b20bd4c 100644 --- a/protocol/rtmp/src/session/common.rs +++ b/protocol/rtmp/src/session/common.rs @@ -204,7 +204,7 @@ impl Common { match self.data_sender.send(channel_data) { Ok(_) => {} Err(err) => { - log::error!("receive audio err {}\n", err); + log::error!("receive audio err {}", err); return Err(SessionError { value: SessionErrorValue::SendFrameDataErr, }); @@ -343,7 +343,7 @@ impl Common { info: self.get_subscriber_info(sub_id), }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}\n", err); + log::error!("unsubscribe_from_channels err {}", err); } Ok(()) diff --git a/protocol/rtmp/src/session/errors.rs b/protocol/rtmp/src/session/errors.rs index 54d54c3c..6ebd2fd0 100644 --- a/protocol/rtmp/src/session/errors.rs +++ b/protocol/rtmp/src/session/errors.rs @@ -22,53 +22,53 @@ pub struct SessionError { #[derive(Debug, Fail)] pub enum SessionErrorValue { - #[fail(display = "amf0 write error: {}\n", _0)] + #[fail(display = "amf0 write error: {}", _0)] Amf0WriteError(#[cause] Amf0WriteError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(#[cause] BytesWriteError), - // #[fail(display = "timeout error: {}\n", _0)] + // #[fail(display = "timeout error: {}", _0)] // TimeoutError(#[cause] Elapsed), - #[fail(display = "unpack error: {}\n", _0)] + #[fail(display = "unpack error: {}", _0)] UnPackError(#[cause] UnpackError), - #[fail(display = "message error: {}\n", _0)] + #[fail(display = "message error: {}", _0)] MessageError(#[cause] MessageError), - #[fail(display = "control message error: {}\n", _0)] + #[fail(display = "control message error: {}", _0)] ControlMessagesError(#[cause] ControlMessagesError), - #[fail(display = "net connection error: {}\n", _0)] + #[fail(display = "net connection error: {}", _0)] NetConnectionError(#[cause] NetConnectionError), - #[fail(display = "net stream error: {}\n", _0)] + #[fail(display = "net stream error: {}", _0)] NetStreamError(#[cause] NetStreamError), - #[fail(display = "event messages error: {}\n", _0)] + #[fail(display = "event messages error: {}", _0)] EventMessagesError(#[cause] EventMessagesError), - #[fail(display = "net io error: {}\n", _0)] + #[fail(display = "net io error: {}", _0)] BytesIOError(#[cause] BytesIOError), - #[fail(display = "pack error: {}\n", _0)] + #[fail(display = "pack error: {}", _0)] PackError(#[cause] PackError), - #[fail(display = "handshake error: {}\n", _0)] + #[fail(display = "handshake error: {}", _0)] HandshakeError(#[cause] HandshakeError), - #[fail(display = "cache error name: {}\n", _0)] + #[fail(display = "cache error name: {}", _0)] CacheError(#[cause] CacheError), - #[fail(display = "amf0 count not correct error\n")] + #[fail(display = "amf0 count not correct error")] Amf0ValueCountNotCorrect, - #[fail(display = "amf0 value type not correct error\n")] + #[fail(display = "amf0 value type not correct error")] Amf0ValueTypeNotCorrect, - #[fail(display = "stream hub event send error\n")] + #[fail(display = "stream hub event send error")] StreamHubEventSendErr, - #[fail(display = "none frame data sender error\n")] + #[fail(display = "none frame data sender error")] NoneFrameDataSender, - #[fail(display = "none frame data receiver error\n")] + #[fail(display = "none frame data receiver error")] NoneFrameDataReceiver, - #[fail(display = "send frame data error\n")] + #[fail(display = "send frame data error")] SendFrameDataErr, - #[fail(display = "subscribe count limit is reached.\n")] + #[fail(display = "subscribe count limit is reached.")] SubscribeCountLimitReach, - #[fail(display = "no app name error\n")] + #[fail(display = "no app name error")] NoAppName, - #[fail(display = "no media data can be received now.\n")] + #[fail(display = "no media data can be received now.")] NoMediaDataReceived, #[fail(display = "session is finished.")] diff --git a/protocol/rtmp/src/user_control_messages/errors.rs b/protocol/rtmp/src/user_control_messages/errors.rs index fe3d671b..4cb88c00 100644 --- a/protocol/rtmp/src/user_control_messages/errors.rs +++ b/protocol/rtmp/src/user_control_messages/errors.rs @@ -12,11 +12,11 @@ pub struct EventMessagesError { #[derive(Debug, Fail)] pub enum EventMessagesErrorValue { - #[fail(display = "amf0 write error: {}\n", _0)] + #[fail(display = "amf0 write error: {}", _0)] Amf0WriteError(Amf0WriteError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(BytesWriteError), - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), #[fail(display = "unknow event message type")] UnknowEventMessageType, diff --git a/protocol/rtmp/src/utils/errors.rs b/protocol/rtmp/src/utils/errors.rs index ecccde5a..766c0958 100644 --- a/protocol/rtmp/src/utils/errors.rs +++ b/protocol/rtmp/src/utils/errors.rs @@ -9,7 +9,7 @@ pub struct RtmpUrlParseError { } #[derive(Debug, Fail)] pub enum RtmpUrlParseErrorValue { - #[fail(display = "The url is not valid\n")] + #[fail(display = "The url is not valid")] Notvalid, } diff --git a/protocol/rtsp/src/rtp/errors.rs b/protocol/rtsp/src/rtp/errors.rs index 89b0d294..ed42b83c 100644 --- a/protocol/rtsp/src/rtp/errors.rs +++ b/protocol/rtsp/src/rtp/errors.rs @@ -29,9 +29,9 @@ impl fmt::Display for PackerError { #[derive(Debug, Fail)] pub enum PackerErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(#[cause] BytesWriteError), } @@ -58,9 +58,9 @@ pub struct UnPackerError { #[derive(Debug, Fail)] pub enum UnPackerErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(#[cause] BytesWriteError), } diff --git a/protocol/rtsp/src/rtp/rtcp/errors.rs b/protocol/rtsp/src/rtp/rtcp/errors.rs index 059b1c34..9827253b 100644 --- a/protocol/rtsp/src/rtp/rtcp/errors.rs +++ b/protocol/rtsp/src/rtp/rtcp/errors.rs @@ -9,9 +9,9 @@ pub struct RtcpError { #[derive(Debug, Fail)] pub enum RtcpErrorValue { - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(BytesWriteError), } diff --git a/protocol/rtsp/src/session/errors.rs b/protocol/rtsp/src/session/errors.rs index b813e8e3..eb428e2a 100644 --- a/protocol/rtsp/src/session/errors.rs +++ b/protocol/rtsp/src/session/errors.rs @@ -14,21 +14,21 @@ pub struct SessionError { #[derive(Debug, Fail)] pub enum SessionErrorValue { - #[fail(display = "net io error: {}\n", _0)] + #[fail(display = "net io error: {}", _0)] BytesIOError(#[cause] BytesIOError), - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(#[cause] BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(#[cause] BytesWriteError), - #[fail(display = "Utf8Error: {}\n", _0)] + #[fail(display = "Utf8Error: {}", _0)] Utf8Error(#[cause] Utf8Error), - #[fail(display = "UnPackerError: {}\n", _0)] + #[fail(display = "UnPackerError: {}", _0)] UnPackerError(#[cause] UnPackerError), - #[fail(display = "stream hub event send error\n")] + #[fail(display = "stream hub event send error")] StreamHubEventSendErr, - #[fail(display = "cannot receive frame data from stream hub\n")] + #[fail(display = "cannot receive frame data from stream hub")] CannotReceiveFrameData, - #[fail(display = "pack error: {}\n", _0)] + #[fail(display = "pack error: {}", _0)] PackerError(#[cause] PackerError), } diff --git a/protocol/rtsp/src/session/mod.rs b/protocol/rtsp/src/session/mod.rs index 177648a3..441b5107 100644 --- a/protocol/rtsp/src/session/mod.rs +++ b/protocol/rtsp/src/session/mod.rs @@ -536,7 +536,7 @@ impl RtspServerSession { info: self.get_subscriber_info(), }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_stream_hub err {}\n", err); + log::error!("unsubscribe_from_stream_hub err {}", err); } Ok(()) diff --git a/protocol/webrtc/src/errors.rs b/protocol/webrtc/src/errors.rs index 2ce93a95..ccb5fae8 100644 --- a/protocol/webrtc/src/errors.rs +++ b/protocol/webrtc/src/errors.rs @@ -12,11 +12,11 @@ pub struct WebRTCError { #[derive(Debug, Fail)] pub enum WebRTCErrorValue { - #[fail(display = "webrtc error: {}\n", _0)] + #[fail(display = "webrtc error: {}", _0)] RTCError(#[cause] RTCError), - #[fail(display = "webrtc util error: {}\n", _0)] + #[fail(display = "webrtc util error: {}", _0)] RTCUtilError(#[cause] RTCUtilError), - #[fail(display = "cannot get local description\n")] + #[fail(display = "cannot get local description")] CanNotGetLocalDescription, } diff --git a/protocol/webrtc/src/session/errors.rs b/protocol/webrtc/src/session/errors.rs index c367d64a..41fb3c84 100644 --- a/protocol/webrtc/src/session/errors.rs +++ b/protocol/webrtc/src/session/errors.rs @@ -14,27 +14,27 @@ pub struct SessionError { #[derive(Debug, Fail)] pub enum SessionErrorValue { - #[fail(display = "net io error: {}\n", _0)] + #[fail(display = "net io error: {}", _0)] BytesIOError(#[cause] BytesIOError), - #[fail(display = "bytes read error: {}\n", _0)] + #[fail(display = "bytes read error: {}", _0)] BytesReadError(#[cause] BytesReadError), - #[fail(display = "bytes write error: {}\n", _0)] + #[fail(display = "bytes write error: {}", _0)] BytesWriteError(#[cause] BytesWriteError), - #[fail(display = "Utf8Error: {}\n", _0)] + #[fail(display = "Utf8Error: {}", _0)] Utf8Error(#[cause] Utf8Error), - #[fail(display = "webrtc error: {}\n", _0)] + #[fail(display = "webrtc error: {}", _0)] RTCError(#[cause] RTCError), - #[fail(display = "stream hub event send error\n")] + #[fail(display = "stream hub event send error")] StreamHubEventSendErr, - #[fail(display = "cannot receive frame data from stream hub\n")] + #[fail(display = "cannot receive frame data from stream hub")] CannotReceiveFrameData, - #[fail(display = "Http Request path error\n")] + #[fail(display = "Http Request path error")] HttpRequestPathError, - #[fail(display = "Not supported\n")] + #[fail(display = "Not supported")] HttpRequestNotSupported, - #[fail(display = "Empty sdp data\n")] + #[fail(display = "Empty sdp data")] HttpRequestEmptySdp, - #[fail(display = "Cannot find Content-Length\n")] + #[fail(display = "Cannot find Content-Length")] HttpRequestNoContentLength, } From 9fce66dcb6664dd096d8e88fa70ecd1f1c4e19cf Mon Sep 17 00:00:00 2001 From: Harlan Date: Sat, 11 Nov 2023 16:29:47 +0800 Subject: [PATCH 3/8] refactor packerizer unpacketizer mod --- protocol/rtmp/src/chunk/mod.rs | 10 ++ protocol/rtmp/src/chunk/packetizer.rs | 51 ++++++--- protocol/rtmp/src/chunk/unpacketizer.rs | 138 ++++++++++++++++-------- 3 files changed, 140 insertions(+), 59 deletions(-) diff --git a/protocol/rtmp/src/chunk/mod.rs b/protocol/rtmp/src/chunk/mod.rs index 90768e3f..81b828ca 100644 --- a/protocol/rtmp/src/chunk/mod.rs +++ b/protocol/rtmp/src/chunk/mod.rs @@ -26,11 +26,21 @@ impl ChunkBasicHeader { //5.3.1.2 #[derive(Eq, PartialEq, Debug, Clone)] pub struct ChunkMessageHeader { + //save the absolute timestamp of chunk type 0. + //or save the computed absolute timestamp of chunk type 1,2,3. pub timestamp: u32, pub msg_length: u32, pub msg_type_id: u8, pub msg_streamd_id: u32, + // Save the timestamp delta of chunk type 1,2. + // For chunk type 3, this field saves the timestamp + // delta inherited from the previous chunk type 1 or 2. + // NOTE: this value should be reset to 0 when the current chunk type is 0. pub timestamp_delta: u32, + // This field will be set for type 0,1,2 .If the timestamp/timestamp delta >= 0xFFFFFF + // then set this value to true else set it to false. + // Note that when the chunk format is 3, this value will be inherited from + // the most recent chunk 0, 1, or 2 chunk.(5.3.1.3 Extended Timestamp). pub is_extended_timestamp: bool, } diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index afeb8b3a..a4f9be0c 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -23,6 +23,8 @@ pub struct ChunkPacketizer { max_chunk_size: usize, //bytes: Cursor>, writer: AsyncBytesWriter, + //save timestamp need to be write for chunk + timestamp: u32, } impl ChunkPacketizer { @@ -32,33 +34,46 @@ impl ChunkPacketizer { //chunk_info: ChunkInfo::new(), writer: AsyncBytesWriter::new(io), max_chunk_size: CHUNK_SIZE as usize, + timestamp: 0, } } fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result { chunk_info.basic_header.format = 0; - let pre_header = self + if let Some(pre_header) = self .csid_2_chunk_header - .get_mut(&chunk_info.basic_header.chunk_stream_id); - - if let Some(val) = pre_header { + .get_mut(&chunk_info.basic_header.chunk_stream_id) + { let cur_msg_header = &mut chunk_info.message_header; - let pre_msg_header = &val.message_header; + let pre_msg_header = &mut pre_header.message_header; if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { chunk_info.basic_header.format = 1; - cur_msg_header.timestamp -= pre_msg_header.timestamp; + cur_msg_header.timestamp_delta = + cur_msg_header.timestamp - pre_msg_header.timestamp; if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id && cur_msg_header.msg_length == pre_msg_header.msg_length { chunk_info.basic_header.format = 2; - if chunk_info.message_header.timestamp == pre_msg_header.timestamp { + if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta { chunk_info.basic_header.format = 3; } } } + } else { + assert_eq!(chunk_info.message_header.timestamp_delta, 0); } + + //update pre header + self.csid_2_chunk_header.insert( + chunk_info.basic_header.chunk_stream_id, + ChunkHeader { + basic_header: chunk_info.basic_header.clone(), + message_header: chunk_info.message_header.clone(), + }, + ); + Ok(PackResult::Success) } @@ -81,11 +96,18 @@ impl ChunkPacketizer { basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError> { - let timestamp = if message_header.timestamp >= 0xFFFFFF { + self.timestamp = if basic_header.format == 0 { + message_header.timestamp + } else { + message_header.timestamp_delta + }; + + let timestamp = if self.timestamp >= 0xFFFFFF { message_header.is_extended_timestamp = true; 0xFFFFFF } else { - message_header.timestamp + message_header.is_extended_timestamp = false; + self.timestamp }; match basic_header.format { @@ -101,6 +123,7 @@ impl ChunkPacketizer { self.writer.write_u24::(timestamp)?; self.writer .write_u24::(message_header.msg_length)?; + self.writer.write_u8(message_header.msg_type_id)?; } 2 => { self.writer.write_u24::(timestamp)?; @@ -120,6 +143,11 @@ impl ChunkPacketizer { pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> { self.zip_chunk_header(chunk_info)?; + log::trace!( + "write_chunk current timestamp: {}", + chunk_info.message_header.timestamp, + ); + let mut whole_payload_size = chunk_info.payload.len(); self.write_basic_header( @@ -129,11 +157,10 @@ impl ChunkPacketizer { self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(chunk_info.message_header.timestamp)?; + self.write_extened_timestamp(self.timestamp)?; } let mut cur_payload_size: usize; - while whole_payload_size > 0 { cur_payload_size = if whole_payload_size > self.max_chunk_size { self.max_chunk_size @@ -149,7 +176,7 @@ impl ChunkPacketizer { if whole_payload_size > 0 { self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(chunk_info.message_header.timestamp)?; + self.write_extened_timestamp(self.timestamp)?; } } } diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index 1dfa4789..c0403a4f 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -2,7 +2,7 @@ use { super::{ define, errors::{UnpackError, UnpackErrorValue}, - ChunkBasicHeader, ChunkHeader, ChunkInfo, ChunkMessageHeader, + ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, }, crate::messages::define::msg_type_id, byteorder::{BigEndian, LittleEndian}, @@ -67,8 +67,16 @@ pub struct ChunkUnpacketizer { //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html //https://zhuanlan.zhihu.com/p/165976086 + //We use this member to generate a complete message: + // - basic_header: the 2 fields will be updated from each chunk. + // - message_header: whose fields need to be updated for current chunk + // depends on the format id from basic header. + // Each field can inherit the value from the previous chunk. + // - payload: If the message's payload size is longger than the max chunk size, + // the whole payload will be splitted into several chunks. + // pub current_chunk_info: ChunkInfo, - chunk_headers: HashMap, + chunk_message_headers: HashMap, chunk_read_state: ChunkReadState, msg_header_read_state: MessageHeaderReadState, max_chunk_size: usize, @@ -87,7 +95,7 @@ impl ChunkUnpacketizer { Self { reader: BytesReader::new(BytesMut::new()), current_chunk_info: ChunkInfo::default(), - chunk_headers: HashMap::new(), + chunk_message_headers: HashMap::new(), chunk_read_state: ChunkReadState::ReadBasicHeader, msg_header_read_state: MessageHeaderReadState::ReadTimeStamp, max_chunk_size: define::INIT_CHUNK_SIZE as usize, @@ -280,6 +288,10 @@ impl ChunkUnpacketizer { } //todo + //Only when the csid is changed, we restore the chunk message header + //One AV message may be splitted into serval chunks, the csid + //will be updated when one av message's chunks are completely + //sent/received?? if csid != self.current_chunk_info.basic_header.chunk_stream_id { log::trace!( "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}", @@ -287,13 +299,24 @@ impl ChunkUnpacketizer { self.current_chunk_info.basic_header.chunk_stream_id, byte ); - if let Some(header) = self.chunk_headers.get_mut(&csid) { - self.current_chunk_info.basic_header = header.basic_header.clone(); - self.current_chunk_info.message_header = header.message_header.clone(); - self.print_current_basic_header(); + //If the chunk stream id is changed, then we should + //restore the cached chunk message header used for + //getting the correct message header fields. + match self.chunk_message_headers.get_mut(&csid) { + Some(header) => { + self.current_chunk_info.message_header = header.clone(); + self.print_current_basic_header(); + } + None => { + //The format id of the first chunk of a new chunk stream id must be zero. + assert_eq!(format_id, 0); + } } } - + if format_id == 0 { + self.current_message_header().timestamp_delta = 0; + } + // each chunk will read and update the csid and format id self.current_chunk_info.basic_header.chunk_stream_id = csid; self.current_chunk_info.basic_header.format = format_id; self.print_current_basic_header(); @@ -327,10 +350,14 @@ impl ChunkUnpacketizer { self.reader.len(), ); - //fix bug: the is_extended_timestamp flag should be set in the read_message_header process - //each time and should not be saved which will lead to incorrectly reading an extra 4 bytes, - //so here at the start of the read_message_header process, reset this flag. - self.current_message_header().is_extended_timestamp = false; + //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will + //inherited from the most recent chunk 0, 1, or 2. + //(This field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. 5.3.1.3) + if self.current_chunk_info.basic_header.format != 3 { + self.current_message_header().is_extended_timestamp = false; + } match self.current_chunk_info.basic_header.format { /*****************************************************************/ @@ -474,36 +501,54 @@ impl ChunkUnpacketizer { extended_timestamp = self.reader.read_u32::()?; } - match self.current_chunk_info.basic_header.format { + let cur_format_id = self.current_chunk_info.basic_header.format; + + match cur_format_id { 0 => { if self.current_message_header().is_extended_timestamp { self.current_message_header().timestamp = extended_timestamp; } } - 1 => { - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp += extended_timestamp - 0xFFFFFF; - } else { - self.current_message_header().timestamp += - self.current_message_header().timestamp_delta; - } - } - 2 => { + 1 | 2 | 3 => { + //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp = - self.current_message_header().timestamp - 0xFFFFFF + extended_timestamp; - } else { - self.current_message_header().timestamp += - self.current_message_header().timestamp_delta; + self.current_message_header().timestamp_delta = extended_timestamp; } } - 3 => { - //log::info!("format 3=============="); - } - //todo: 3 should also be processed + _ => {} } + if cur_format_id == 1 + || cur_format_id == 2 + || (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0) + { + let timestamp = self.current_message_header().timestamp; + let timestamp_delta = self.current_message_header().timestamp_delta; + + let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta); + if is_overflow { + log::warn!( + "the current timestamp is overflow, current timestamp: {}, timestamp delta: {}", + timestamp, + timestamp_delta + ); + } + self.current_message_header().timestamp = cur_abs_timestamp; + } + + let timestamp = self.current_message_header().timestamp; + let timestamp_delta = self.current_message_header().timestamp_delta; + + log::trace!( + "the current timestamp is overflow,format: {}, current timestamp: {}. timestamp delta: {}", + self.current_chunk_info.basic_header.format, + timestamp, + timestamp_delta + ); + self.chunk_read_state = ChunkReadState::ReadMessagePayload; self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp); @@ -549,26 +594,13 @@ impl ChunkUnpacketizer { if self.current_chunk_info.payload.len() == whole_msg_length { self.chunk_read_state = ChunkReadState::Finish; + //get the complete chunk and clear the current chunk payload let chunk_info = self.current_chunk_info.clone(); self.current_chunk_info.payload.clear(); let csid = self.current_chunk_info.basic_header.chunk_stream_id; - - //todo - if let Some(header) = self.chunk_headers.get_mut(&csid) { - header.basic_header = self.current_chunk_info.basic_header.clone(); - header.message_header = self.current_chunk_info.message_header.clone(); - } else { - let chunk_header = ChunkHeader { - basic_header: self.current_chunk_info.basic_header.clone(), - message_header: self.current_chunk_info.message_header.clone(), - }; - self.chunk_headers.insert(csid, chunk_header); - } - - // self.chunk_headers - // .entry(self.current_chunk_info.basic_header.chunk_stream_id) - // .or_insert(chunk_header); + self.chunk_message_headers + .insert(csid, self.current_chunk_info.message_header.clone()); return Ok(UnpackResult::ChunkInfo(chunk_info)); } @@ -617,6 +649,18 @@ mod tests { ) } + #[test] + fn test_overflow_add() { + let aa: u32 = u32::MAX; + println!("{}", aa); + + let (a, b) = aa.overflowing_add(5); + + let b = aa.wrapping_add(5); + + println!("{}", b); + } + // #[test] // fn test_window_acknowlage_size_set_peer_bandwidth() { // let mut unpacker = ChunkUnpacketizer::new(); From 89ab0f916be8d14f259d8ce77d4cfbd513533049 Mon Sep 17 00:00:00 2001 From: Harlan Date: Sun, 12 Nov 2023 07:20:39 +0800 Subject: [PATCH 4/8] refactor codes --- protocol/rtmp/src/chunk/mod.rs | 16 ++++-- protocol/rtmp/src/chunk/packetizer.rs | 69 ++++++++++++++++--------- protocol/rtmp/src/chunk/unpacketizer.rs | 49 ++++++++---------- 3 files changed, 80 insertions(+), 54 deletions(-) diff --git a/protocol/rtmp/src/chunk/mod.rs b/protocol/rtmp/src/chunk/mod.rs index 81b828ca..759b9609 100644 --- a/protocol/rtmp/src/chunk/mod.rs +++ b/protocol/rtmp/src/chunk/mod.rs @@ -23,6 +23,16 @@ impl ChunkBasicHeader { } } +#[derive(Eq, PartialEq, Debug, Clone)] +pub enum ExtendTimestampType { + //There is no extended timestamp + NONE, + //The extended timestamp field is read in format 0 chunk. + FORMAT0, + //The extended timestamp field is read in format 1 or 2 chunk. + FORMAT12, +} + //5.3.1.2 #[derive(Eq, PartialEq, Debug, Clone)] pub struct ChunkMessageHeader { @@ -38,10 +48,10 @@ pub struct ChunkMessageHeader { // NOTE: this value should be reset to 0 when the current chunk type is 0. pub timestamp_delta: u32, // This field will be set for type 0,1,2 .If the timestamp/timestamp delta >= 0xFFFFFF - // then set this value to true else set it to false. + // then set this value to FORMAT0/FORMAT12 else set it to NONE. // Note that when the chunk format is 3, this value will be inherited from // the most recent chunk 0, 1, or 2 chunk.(5.3.1.3 Extended Timestamp). - pub is_extended_timestamp: bool, + pub extended_timestamp_type: ExtendTimestampType, } impl ChunkMessageHeader { @@ -52,7 +62,7 @@ impl ChunkMessageHeader { msg_type_id, msg_streamd_id: msg_stream_id, timestamp_delta: 0, - is_extended_timestamp: false, + extended_timestamp_type: ExtendTimestampType::NONE, } } } diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index a4f9be0c..e3e79812 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -1,7 +1,7 @@ use { super::{ define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo, - ChunkMessageHeader, + ChunkMessageHeader, ExtendTimestampType, }, byteorder::{BigEndian, LittleEndian}, bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, @@ -23,18 +23,17 @@ pub struct ChunkPacketizer { max_chunk_size: usize, //bytes: Cursor>, writer: AsyncBytesWriter, - //save timestamp need to be write for chunk - timestamp: u32, + //save extended timestamp need to be write for chunk + extended_timestamp: Option, } impl ChunkPacketizer { pub fn new(io: Arc>>) -> Self { Self { csid_2_chunk_header: HashMap::new(), - //chunk_info: ChunkInfo::new(), writer: AsyncBytesWriter::new(io), max_chunk_size: CHUNK_SIZE as usize, - timestamp: 0, + extended_timestamp: None, } } fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result { @@ -96,23 +95,41 @@ impl ChunkPacketizer { basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError> { - self.timestamp = if basic_header.format == 0 { - message_header.timestamp - } else { - message_header.timestamp_delta - }; - - let timestamp = if self.timestamp >= 0xFFFFFF { - message_header.is_extended_timestamp = true; - 0xFFFFFF - } else { - message_header.is_extended_timestamp = false; - self.timestamp + let message_header_timestamp: u32; + (self.extended_timestamp, message_header_timestamp) = match basic_header.format { + 0 => { + if message_header.timestamp >= 0xFFFFFF { + message_header.extended_timestamp_type = ExtendTimestampType::FORMAT0; + (Some(message_header.timestamp), 0xFFFFFF) + } else { + (None, message_header.timestamp) + } + } + 1 | 2 => { + if message_header.timestamp_delta >= 0xFFFFFF { + //if use the format1,2's extended timestamp, there may be a problem for + //av timestamp. + log::warn!( + "Now use extended timestamp for format {}, the value is: {}", + basic_header.format, + message_header.timestamp_delta + ); + message_header.extended_timestamp_type = ExtendTimestampType::FORMAT12; + (Some(message_header.timestamp_delta), 0xFFFFFF) + } else { + (None, message_header.timestamp_delta) + } + } + _ => { + //should not be here + (None, 0) + } }; match basic_header.format { 0 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; @@ -120,13 +137,15 @@ impl ChunkPacketizer { .write_u32::(message_header.msg_streamd_id)?; } 1 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; } 2 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; } _ => {} } @@ -154,10 +173,11 @@ impl ChunkPacketizer { chunk_info.basic_header.format, chunk_info.basic_header.chunk_stream_id, )?; + self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; - if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(self.timestamp)?; + if let Some(extended_timestamp) = self.extended_timestamp { + self.write_extened_timestamp(extended_timestamp)?; } let mut cur_payload_size: usize; @@ -175,8 +195,9 @@ impl ChunkPacketizer { if whole_payload_size > 0 { self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; - if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(self.timestamp)?; + + if let Some(extended_timestamp) = self.extended_timestamp { + self.write_extened_timestamp(extended_timestamp)?; } } } diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index c0403a4f..2618f47c 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -2,7 +2,7 @@ use { super::{ define, errors::{UnpackError, UnpackErrorValue}, - ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, + ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType, }, crate::messages::define::msg_type_id, byteorder::{BigEndian, LittleEndian}, @@ -356,7 +356,7 @@ impl ChunkUnpacketizer { //1, or 2 chunk for the same chunk stream ID indicated the presence of //an extended timestamp field. 5.3.1.3) if self.current_chunk_info.basic_header.format != 3 { - self.current_message_header().is_extended_timestamp = false; + self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE; } match self.current_chunk_info.basic_header.format { @@ -410,7 +410,8 @@ impl ChunkUnpacketizer { } if self.current_message_header().timestamp >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT0; } } /*****************************************************************/ @@ -460,7 +461,8 @@ impl ChunkUnpacketizer { } if self.current_message_header().timestamp_delta >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT12; } } /************************************************/ @@ -481,7 +483,8 @@ impl ChunkUnpacketizer { self.reader.read_u24::()?; if self.current_message_header().timestamp_delta >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT12; } } @@ -495,32 +498,24 @@ impl ChunkUnpacketizer { } pub fn read_extended_timestamp(&mut self) -> Result { - let mut extended_timestamp: u32 = 0; - - if self.current_message_header().is_extended_timestamp { - extended_timestamp = self.reader.read_u32::()?; - } - - let cur_format_id = self.current_chunk_info.basic_header.format; - - match cur_format_id { - 0 => { - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp = extended_timestamp; - } + //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. + match self.current_message_header().extended_timestamp_type { + //the current fortmat type can be 0 or 3 + ExtendTimestampType::FORMAT0 => { + self.current_message_header().timestamp = self.reader.read_u32::()?; } - 1 | 2 | 3 => { - //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, - //1, or 2 chunk for the same chunk stream ID indicated the presence of - //an extended timestamp field. - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp_delta = extended_timestamp; - } + //the current fortmat type can be 1,2 or 3 + ExtendTimestampType::FORMAT12 => { + self.current_message_header().timestamp_delta = + self.reader.read_u32::()?; } - - _ => {} + ExtendTimestampType::NONE => {} } + //compute the abs timestamp + let cur_format_id = self.current_chunk_info.basic_header.format; if cur_format_id == 1 || cur_format_id == 2 || (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0) From 03737a0d6725141ce457c8592b47ee20ba4d4ef1 Mon Sep 17 00:00:00 2001 From: Harlan Date: Sun, 12 Nov 2023 07:38:31 +0800 Subject: [PATCH 5/8] cargo clippy --fix --allow-dirty --allow-no-vcs --- application/xiu/src/config/mod.rs | 2 +- library/container/mpegts/src/ts.rs | 6 +----- protocol/rtmp/src/chunk/unpacketizer.rs | 4 ++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/application/xiu/src/config/mod.rs b/application/xiu/src/config/mod.rs index 3dd8304c..a2757f54 100644 --- a/application/xiu/src/config/mod.rs +++ b/application/xiu/src/config/mod.rs @@ -181,7 +181,7 @@ fn test_toml_parse() { let path = std::env::current_dir(); match path { Ok(val) => println!("The current directory is {}\n", val.display()), - Err(err) => print!("{}\n", err), + Err(err) => println!("{}", err), } let str = fs::read_to_string( diff --git a/library/container/mpegts/src/ts.rs b/library/container/mpegts/src/ts.rs index cc9534ac..af6507af 100644 --- a/library/container/mpegts/src/ts.rs +++ b/library/container/mpegts/src/ts.rs @@ -70,11 +70,7 @@ impl TsMuxer { flags: u16, payload: BytesMut, ) -> Result<(), MpegTsError> { - if (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0 { - self.h264_h265_with_aud = true; - } else { - self.h264_h265_with_aud = false; - } + self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0; //print!("pes payload length {}\n", payload.len()); //self.packet_number += payload.len(); diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index 2618f47c..bb0f1362 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -518,7 +518,7 @@ impl ChunkUnpacketizer { let cur_format_id = self.current_chunk_info.basic_header.format; if cur_format_id == 1 || cur_format_id == 2 - || (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0) + || (cur_format_id == 3 && self.current_chunk_info.payload.is_empty()) { let timestamp = self.current_message_header().timestamp; let timestamp_delta = self.current_message_header().timestamp_delta; @@ -649,7 +649,7 @@ mod tests { let aa: u32 = u32::MAX; println!("{}", aa); - let (a, b) = aa.overflowing_add(5); + let (_a, _b) = aa.overflowing_add(5); let b = aa.wrapping_add(5); From e8c126030931aa61b12e0dbfa46d629e6229604d Mon Sep 17 00:00:00 2001 From: HarlanC Date: Tue, 14 Nov 2023 13:35:23 +0800 Subject: [PATCH 6/8] do some fix for rtmp --- protocol/rtmp/src/chunk/packetizer.rs | 9 ++++++++- protocol/rtmp/src/chunk/unpacketizer.rs | 6 +++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index e3e79812..4bf416b4 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -46,7 +46,14 @@ impl ChunkPacketizer { let cur_msg_header = &mut chunk_info.message_header; let pre_msg_header = &mut pre_header.message_header; - if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { + if cur_msg_header.timestamp < pre_msg_header.timestamp { + log::warn!( + "Chunk stream id: {}, the current timestamp:{} is smaller than pre chunk timestamp: {}", + chunk_info.basic_header.chunk_stream_id, + cur_msg_header.timestamp, + pre_msg_header.timestamp + ); + } else if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { chunk_info.basic_header.format = 1; cur_msg_header.timestamp_delta = cur_msg_header.timestamp - pre_msg_header.timestamp; diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index bb0f1362..06195896 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -309,7 +309,11 @@ impl ChunkUnpacketizer { } None => { //The format id of the first chunk of a new chunk stream id must be zero. - assert_eq!(format_id, 0); + //assert_eq!(format_id, 0); + log::warn!( + "The chunk stream id: {}'s first chunk format is not 0.", + csid + ); } } } From 625461d24c9566d60936289bdc937c0cbf1420f7 Mon Sep 17 00:00:00 2001 From: HarlanC Date: Tue, 14 Nov 2023 13:52:24 +0800 Subject: [PATCH 7/8] fix logs --- protocol/rtmp/src/chunk/unpacketizer.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index 06195896..eccb6e62 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -310,10 +310,13 @@ impl ChunkUnpacketizer { None => { //The format id of the first chunk of a new chunk stream id must be zero. //assert_eq!(format_id, 0); - log::warn!( - "The chunk stream id: {}'s first chunk format is not 0.", - csid - ); + if format_id != 0 { + log::warn!( + "The chunk stream id: {}'s first chunk format is {}.", + csid, + format_id + ); + } } } } From db9faa657eecc47e54b6391e146f4b8cf6c15e15 Mon Sep 17 00:00:00 2001 From: Harlan Date: Sat, 18 Nov 2023 18:10:07 +0800 Subject: [PATCH 8/8] try to fix parse rtmp chunk error --- protocol/rtmp/src/chunk/unpacketizer.rs | 26 ++++++++++--------------- protocol/rtmp/src/rtmp.rs | 2 +- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index eccb6e62..7771b6ad 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -362,9 +362,9 @@ impl ChunkUnpacketizer { //(This field is present in Type 3 chunks when the most recent Type 0, //1, or 2 chunk for the same chunk stream ID indicated the presence of //an extended timestamp field. 5.3.1.3) - if self.current_chunk_info.basic_header.format != 3 { - self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE; - } + //if self.current_chunk_info.basic_header.format != 3 { + self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE; + //} match self.current_chunk_info.basic_header.format { /*****************************************************************/ @@ -533,24 +533,16 @@ impl ChunkUnpacketizer { let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta); if is_overflow { log::warn!( - "the current timestamp is overflow, current timestamp: {}, timestamp delta: {}", - timestamp, - timestamp_delta + "The current timestamp is overflow, current basic header: {:?}, current message header: {:?}, payload len: {}, abs timestamp: {}", + self.current_chunk_info.basic_header, + self.current_chunk_info.message_header, + self.current_chunk_info.payload.len(), + cur_abs_timestamp ); } self.current_message_header().timestamp = cur_abs_timestamp; } - let timestamp = self.current_message_header().timestamp; - let timestamp_delta = self.current_message_header().timestamp_delta; - - log::trace!( - "the current timestamp is overflow,format: {}, current timestamp: {}. timestamp delta: {}", - self.current_chunk_info.basic_header.format, - timestamp, - timestamp_delta - ); - self.chunk_read_state = ChunkReadState::ReadMessagePayload; self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp); @@ -644,6 +636,8 @@ mod tests { let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body); + println!("{:?}, {:?}", expected.basic_header, expected.message_header); + assert_eq!( rv.unwrap(), UnpackResult::ChunkInfo(expected), diff --git a/protocol/rtmp/src/rtmp.rs b/protocol/rtmp/src/rtmp.rs index 3125f5c3..3246bb03 100644 --- a/protocol/rtmp/src/rtmp.rs +++ b/protocol/rtmp/src/rtmp.rs @@ -36,7 +36,7 @@ impl RtmpServer { ); tokio::spawn(async move { if let Err(err) = session.run().await { - log::error!( + log::info!( "session run error: session_type: {}, app_name: {}, stream_name: {}, err: {}", session.common.session_type, session.app_name,