From e80d5aa5382c5393db78429aed0a14a93f3a3627 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 29 Dec 2021 18:02:44 +0800 Subject: [PATCH] encoder: move encoder to a independent mod --- Cargo.lock | 66 ++++++++++++++-------------- components/br-stream/Cargo.toml | 7 ++- components/br-stream/src/codec.rs | 55 +++++++++++++++++++++++ components/br-stream/src/endpoint.rs | 11 ----- components/br-stream/src/lib.rs | 2 +- components/br-stream/src/router.rs | 4 +- 6 files changed, 96 insertions(+), 49 deletions(-) create mode 100644 components/br-stream/src/codec.rs diff --git a/Cargo.lock b/Cargo.lock index 7956cae99f6..1f4d9b148c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -# version = 3 [[package]] @@ -215,7 +214,7 @@ name = "aws" version = "0.0.1" dependencies = [ "async-trait", - "bytes 1.0.1", + "bytes 1.1.0", "cloud", "fail", "futures 0.3.15", @@ -427,6 +426,7 @@ dependencies = [ "protobuf", "raft", "raftstore", + "rand 0.8.3", "regex", "slog", "slog-global", @@ -477,9 +477,9 @@ dependencies = [ [[package]] name = "bytes" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" dependencies = [ "serde", ] @@ -691,7 +691,7 @@ name = "codec" version = "0.0.1" dependencies = [ "byteorder", - "bytes 1.0.1", + "bytes 1.1.0", "error_code", "libc 0.2.106", "panic_hook", @@ -1115,7 +1115,7 @@ version = "0.0.1" dependencies = [ "async-trait", "byteorder", - "bytes 1.0.1", + "bytes 1.1.0", "crc32fast", "crossbeam", "derive_more", @@ -1312,7 +1312,7 @@ name = "external_storage" version = "0.0.1" dependencies = [ "async-trait", - "bytes 1.0.1", + "bytes 1.1.0", "encryption", "engine_traits", "fail", @@ -1902,7 +1902,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "fnv", "futures-core", "futures-sink", @@ -1993,7 +1993,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "fnv", "itoa", ] @@ -2004,7 +2004,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "http", "pin-project-lite", ] @@ -2027,7 +2027,7 @@ version = "0.14.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "futures-channel", "futures-core", "futures-util", @@ -2081,7 +2081,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "hyper", "native-tls", "tokio", @@ -3426,7 +3426,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "prost-derive 0.8.0", ] @@ -3436,7 +3436,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "prost-derive 0.9.0", ] @@ -3446,7 +3446,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "heck", "itertools 0.10.0", "log", @@ -3464,7 +3464,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "heck", "itertools 0.10.0", "lazy_static", @@ -3510,7 +3510,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "prost 0.8.0", ] @@ -3520,7 +3520,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "prost 0.9.0", ] @@ -3529,7 +3529,7 @@ name = "protobuf" version = "2.8.0" source = "git+https://github.com/pingcap/rust-protobuf?branch=v2.8#6642ebaae4352ea01bf00e160480d8da966d3109" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "heck", "hex 0.3.2", ] @@ -3579,7 +3579,7 @@ name = "raft" version = "0.6.0" source = "git+https://github.com/tikv/raft-rs?branch=master#e6d28ef0f509c072e13288409835d5764a3b175e" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "fxhash", "getset", "protobuf", @@ -3622,7 +3622,7 @@ name = "raft-proto" version = "0.6.0" source = "git+https://github.com/tikv/raft-rs?branch=master#e6d28ef0f509c072e13288409835d5764a3b175e" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "protobuf", "protobuf-build", ] @@ -3656,7 +3656,7 @@ dependencies = [ "batch-system", "bitflags", "byteorder", - "bytes 1.0.1", + "bytes 1.1.0", "collections", "concurrency_manager", "crc32fast", @@ -3962,7 +3962,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0460542b551950620a3648c6aa23318ac6b3cd779114bd873209e6e8b5eb1c34" dependencies = [ "base64", - "bytes 1.0.1", + "bytes 1.1.0", "encoding_rs", "futures-core", "futures-util", @@ -4096,7 +4096,7 @@ source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#5fcf2d dependencies = [ "async-trait", "base64", - "bytes 1.0.1", + "bytes 1.1.0", "crc32fast", "futures 0.3.15", "http", @@ -4136,7 +4136,7 @@ version = "0.46.0" source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#5fcf2d1c36b93d0146cc49f257dd850e01b6e4db" dependencies = [ "async-trait", - "bytes 1.0.1", + "bytes 1.1.0", "futures 0.3.15", "rusoto_core", "serde", @@ -4163,7 +4163,7 @@ version = "0.46.0" source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#5fcf2d1c36b93d0146cc49f257dd850e01b6e4db" dependencies = [ "async-trait", - "bytes 1.0.1", + "bytes 1.1.0", "futures 0.3.15", "rusoto_core", "xml-rs", @@ -4175,7 +4175,7 @@ version = "0.46.0" source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#5fcf2d1c36b93d0146cc49f257dd850e01b6e4db" dependencies = [ "base64", - "bytes 1.0.1", + "bytes 1.1.0", "chrono", "digest", "futures 0.3.15", @@ -4200,7 +4200,7 @@ version = "0.46.0" source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#5fcf2d1c36b93d0146cc49f257dd850e01b6e4db" dependencies = [ "async-trait", - "bytes 1.0.1", + "bytes 1.1.0", "chrono", "futures 0.3.15", "rusoto_core", @@ -4881,7 +4881,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d20ec2d6525a66afebdff9e1d8ef143c9deae9a3b040c61d3cfa9ae6fda80060" dependencies = [ "base64", - "bytes 1.0.1", + "bytes 1.1.0", "chrono", "futures-util", "http", @@ -5633,7 +5633,7 @@ dependencies = [ "async-speed-limit", "backtrace", "byteorder", - "bytes 1.0.1", + "bytes 1.1.0", "chrono", "codec", "collections", @@ -5734,7 +5734,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" dependencies = [ "autocfg", - "bytes 1.0.1", + "bytes 1.1.0", "libc 0.2.106", "memchr", "mio 0.7.11", @@ -5829,7 +5829,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "futures-core", "futures-io", "futures-sink", @@ -5856,7 +5856,7 @@ dependencies = [ "async-stream 0.3.2", "async-trait", "base64", - "bytes 1.0.1", + "bytes 1.1.0", "futures-core", "futures-util", "h2", diff --git a/components/br-stream/Cargo.toml b/components/br-stream/Cargo.toml index df2465832a8..10032606417 100644 --- a/components/br-stream/Cargo.toml +++ b/components/br-stream/Cargo.toml @@ -18,7 +18,7 @@ tokio = { version = "1.5", features = ["rt-multi-thread", "macros", "time", "syn prometheus = { version = "0.13", default-features = false, features = ["nightly"] } async-trait = { version = "0.1" } thiserror = "1" -bytes = "0.4" +bytes = "0.4.12" etcd-client = { version = "0.7", features = ["pub-response-field"] } kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "br-stream"} tokio-stream = "0.1" @@ -43,4 +43,7 @@ engine_traits = { path = "../engine_traits", default-features = false } raft = { version = "0.6.0-alpha", default-features = false, features = ["protobuf-codec"] } raftstore = { path = "../raftstore", default-features = false } tikv = { path = "../../", default-features = false } -online_config = { path = "../online_config" } \ No newline at end of file +online_config = { path = "../online_config" } + +[dev-dependencies] +rand = "0.8.0" \ No newline at end of file diff --git a/components/br-stream/src/codec.rs b/components/br-stream/src/codec.rs new file mode 100644 index 00000000000..1ed9c4ca577 --- /dev/null +++ b/components/br-stream/src/codec.rs @@ -0,0 +1,55 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use bytes::{Buf, Bytes}; +use std::io::prelude::*; +use std::io::Cursor; + +pub struct Encoder; + +impl Encoder { + // TODO move this function to a indepentent module. + pub fn encode_event<'e>(key: &'e [u8], value: &'e [u8]) -> [impl AsRef<[u8]> + 'e; 4] { + let key_len = (key.len() as u32).to_le_bytes(); + let val_len = (value.len() as u32).to_le_bytes(); + [ + Either::Left(key_len), + Either::Right(key), + Either::Left(val_len), + Either::Right(value), + ] + } + + #[allow(dead_code)] + pub fn decode_event(e: &[u8]) -> (Vec, Vec) { + let mut buf = Cursor::new(Bytes::from(e)); + let len = buf.get_u32_le() as usize; + let mut key = vec![0; len]; + buf.read_exact(key.as_mut_slice()).unwrap(); + let len = buf.get_u32_le() as usize; + let mut val = vec![0; len]; + buf.read_exact(buffer.as_mut_slice()).unwrap(); + (key, val) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::Rng; + + #[test] + fn test_encode_decode() { + let mut rng = rand::thread_rng(); + for _i in 0..10 { + let key: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let val: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let e = Encoder::encode_event(&key, &val); + let mut event = vec![]; + for s in e { + event.push(s.into()); + } + let (decoded_key, decoded_val) = Encoder::decode_event(&event); + assert_eq!(key, decoded_key); + assert_eq!(val, decoded_val); + } + } +} diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs index 3f7f7ab795a..969e356c962 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/br-stream/src/endpoint.rs @@ -138,17 +138,6 @@ where } } } - // TODO move this function to a indepentent module. - pub fn encode_event<'e>(key: &'e [u8], value: &'e [u8]) -> [impl AsRef<[u8]> + 'e; 4] { - let key_len = (key.len() as u32).to_le_bytes(); - let val_len = (value.len() as u32).to_le_bytes(); - [ - Either::Left(key_len), - Either::Right(key), - Either::Left(val_len), - Either::Right(value), - ] - } fn backup_batch(&self, batch: CmdBatch) { let mut sw = StopWatch::new(); diff --git a/components/br-stream/src/lib.rs b/components/br-stream/src/lib.rs index 19f4ca84524..2da58a05499 100644 --- a/components/br-stream/src/lib.rs +++ b/components/br-stream/src/lib.rs @@ -1,6 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. #![feature(inherent_ascii_escape)] - +mod codec; pub mod config; mod endpoint; pub mod errors; diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs index 067ad6c268e..c80145f1e3d 100644 --- a/components/br-stream/src/router.rs +++ b/components/br-stream/src/router.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use crate::{endpoint::Task, errors::Error, metadata::store::EtcdStore, utils::SlotMap}; +use crate::{codec::Encoder, endpoint::Task, errors::Error}; use super::errors::Result; use engine_traits::{CF_DEFAULT, CF_WRITE}; @@ -538,7 +538,7 @@ impl DataFile { /// Add a new KV pair to the file, returning its size. async fn on_event(&mut self, mut kv: ApplyEvent) -> Result { - let encoded = super::endpoint::Endpoint::::encode_event(&kv.key, &kv.value); + let encoded = Encoder::encode_event(&kv.key, &kv.value); let mut size = 0; for slice in encoded { let slice = slice.as_ref();