Skip to content

Commit

Permalink
Implement gear CDC
Browse files Browse the repository at this point in the history
  • Loading branch information
dpc committed Jul 23, 2017
1 parent b1298e6 commit ef746ab
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 110 deletions.
154 changes: 75 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ name = "rdedup_lib"
path = "src/lib.rs"

[dependencies]
rollsum = "0.2.1"
rollsum = { git = "https://github.com/dpc/rsroll.git", rev="44b37b1" }
sha2 = "0.5"
argparse = "0.2.1"
sodiumoxide = { version = "0.0.14", features = ["serde"] }
Expand Down
4 changes: 3 additions & 1 deletion lib/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ impl AsyncIOThreadShared {
in_progress: Default::default(),
};

AsyncIOThreadShared { inner: Arc::new(Mutex::new(inner)) }
AsyncIOThreadShared {
inner: Arc::new(Mutex::new(inner)),
}
}

pub fn get_stats(&self) -> WriteStats {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/chunk_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl ChunkProcessor {
let ((i, sg), digests_tx, data_type) = input;

let digest = calculate_digest(&sg);
let chunk_path = self.repo
.chunk_path_by_digest(&digest, DataType::Data);
let chunk_path =
self.repo.chunk_path_by_digest(&digest, DataType::Data);
if !chunk_path.exists() {
let sg = if data_type.should_compress() {
timer.start("compress");
Expand Down
44 changes: 44 additions & 0 deletions lib/src/chunking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use rollsum;

/// Abstraction over the specific chunking algorithms being used.
///
/// Implementations maintain internal rolling state across calls, so a
/// stream can be fed in successive slices.
pub(crate) trait Chunking {
    /// Scan `data` for the next chunk boundary.
    ///
    /// Returns `Some(count)` where `count` is the number of bytes of
    /// `data` (from its start) that complete the current chunk, or
    /// `None` if no boundary was found in `data`.
    //
    // NOTE: the original declaration used a deprecated anonymous
    // parameter (`&[u8]`); naming it is backward-compatible and
    // required by the 2018 edition.
    fn find_chunk_edge(&mut self, data: &[u8]) -> Option<usize>;
}

/// Chunker using the rolling-checksum "bup" algorithm, wrapping the
/// engine provided by the `rollsum` crate.
pub(crate) struct Bup {
    // Underlying rolling-sum state; advances as data is fed in.
    engine: rollsum::Bup,
}

impl Bup {
    /// Create a bup-style chunker; `bits` is the number of edge bits,
    /// so average chunks are roughly `2^bits` bytes.
    pub fn new(bits: u32) -> Self {
        let engine = rollsum::Bup::new_with_chunk_bits(bits);
        Bup { engine: engine }
    }
}

impl Chunking for Bup {
    // Delegates edge detection to the rollsum engine; `Some(count)`
    // means the first `count` bytes of `data` finish a chunk.
    fn find_chunk_edge(&mut self, data: &[u8]) -> Option<usize> {
        self.engine.find_chunk_edge(data)
    }
}

/// Chunker using the Gear rolling-hash algorithm, wrapping the engine
/// provided by the `rollsum` crate.
pub(crate) struct Gear {
    // Underlying Gear-hash state; advances as data is fed in.
    engine: rollsum::Gear,
}


impl Gear {
    /// Create a Gear chunker; `bits` is the number of edge bits, so
    /// average chunks are roughly `2^bits` bytes.
    pub fn new(bits: u32) -> Self {
        let engine = rollsum::Gear::new_with_chunk_bits(bits);
        Gear { engine: engine }
    }
}


impl Chunking for Gear {
    // Delegates edge detection to the rollsum engine; `Some(count)`
    // means the first `count` bytes of `data` finish a chunk.
    fn find_chunk_edge(&mut self, data: &[u8]) -> Option<usize> {
        self.engine.find_chunk_edge(data)
    }
}
23 changes: 20 additions & 3 deletions lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use {serde_yaml, PassphraseFn, SGData};

use asyncio;
use chunking;
use compression;

use encryption;
Expand Down Expand Up @@ -60,19 +61,35 @@ pub enum Chunking {
/// and 30 (1KB to 1GB)
#[serde(rename = "bup")]
Bup { chunk_bits: u32 },
#[serde(rename = "gear")]
Gear { chunk_bits: u32 },
}

/// Default implementation for the `Chunking`
impl Default for Chunking {
fn default() -> Chunking {
Chunking::Bup { chunk_bits: DEFAULT_BUP_CHUNK_BITS }
Chunking::Bup {
chunk_bits: DEFAULT_BUP_CHUNK_BITS,
}
}
}

impl Chunking {
    /// Check that the configured chunk-size exponent is in the
    /// supported range of 10 to 30 bits (~1KB to ~1GB average chunks).
    pub fn valid(self) -> bool {
        // Both algorithms share the same accepted range, so one
        // or-pattern arm covers them.
        match self {
            Chunking::Bup { chunk_bits: bits } |
            Chunking::Gear { chunk_bits: bits } => bits >= 10 && bits <= 30,
        }
    }

    /// Build the boxed chunking engine corresponding to this
    /// configuration.
    pub(crate) fn to_engine(&self) -> Box<chunking::Chunking> {
        match *self {
            Chunking::Bup { chunk_bits } => {
                Box::new(chunking::Bup::new(chunk_bits))
            }
            Chunking::Gear { chunk_bits } => {
                Box::new(chunking::Gear::new(chunk_bits))
            }
        }
    }
}
Expand Down Expand Up @@ -218,8 +235,8 @@ impl Repo {

pub fn write(&self, aio: &asyncio::AsyncIO) -> super::Result<()> {

let config_str = serde_yaml::to_string(self)
.expect("yaml serialization failed");
let config_str =
serde_yaml::to_string(self).expect("yaml serialization failed");

aio.write(
CONFIG_YML_FILE.into(),
Expand Down
4 changes: 2 additions & 2 deletions lib/src/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ struct Curve25519Decrypter {
}
impl Decrypter for Curve25519Decrypter {
fn decrypt(&self, buf: SGData, digest: &[u8]) -> io::Result<SGData> {
let nonce = box_::Nonce::from_slice(&digest[0..box_::NONCEBYTES])
.unwrap();
let nonce =
box_::Nonce::from_slice(&digest[0..box_::NONCEBYTES]).unwrap();

let buf = buf.to_linear();

Expand Down
21 changes: 11 additions & 10 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ mod iterators;
use iterators::StoredChunks;

mod config;
use config::Chunking;

mod sg;
use sg::*;

mod asyncio;
use asyncio::*;

mod chunking;

mod chunk_processor;
use chunk_processor::*;

Expand Down Expand Up @@ -539,7 +540,9 @@ impl Repo {
info!(self.log, "Opening read handle");
let decrypter = self.config.encryption.decrypter(pass)?;

Ok(DecryptHandle { decrypter: decrypter })
Ok(DecryptHandle {
decrypter: decrypter,
})

}

Expand All @@ -551,7 +554,9 @@ impl Repo {
let encrypter = self.config.encryption.encrypter(pass)?;


Ok(EncryptHandle { encrypter: encrypter })
Ok(EncryptHandle {
encrypter: encrypter,
})
}

pub fn path(&self) -> &Path {
Expand Down Expand Up @@ -765,20 +770,16 @@ impl Repo {
self.log.clone(),
);

let chunk_bits = match self.config.chunking {
Chunking::Bup { chunk_bits: bits } => bits,
};

let chunker = Chunker::new(
input_data_iter.into_iter(),
BupEdgeFinder::new(chunk_bits),
BupEdgeFinder::new(self.config.chunking.to_engine()),
);

// TODO: Change to `enumerate_u64`
let mut data = chunker.enumerate();

while let Some(i_sg) = timer
.start_with("rx-and-chunking", || data.next())
while let Some(i_sg) =
timer.start_with("rx-and-chunking", || data.next())
{
timer.start("tx");
let (i, sg) = i_sg;
Expand Down
13 changes: 5 additions & 8 deletions lib/src/sg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//!

use DataType;
use chunking;
use owning_ref::ArcRef;
use rollsum;
use sgdata::SGData;
use std::{io, mem};
use std::result::Result;
Expand All @@ -15,15 +15,12 @@ pub trait EdgeFinder {

/// Finds edges using rolling sum
pub struct BupEdgeFinder {
roll: rollsum::Bup,
chunking: Box<chunking::Chunking>,
}

impl BupEdgeFinder {
pub fn new(chunk_bits: u32) -> Self {
BupEdgeFinder {
chunk_bits: chunk_bits,
roll: rollsum::Bup::new_with_chunk_bits(chunk_bits),
}
pub(crate) fn new(chunking: Box<chunking::Chunking>) -> Self {
BupEdgeFinder { chunking: chunking }
}
}

Expand All @@ -34,7 +31,7 @@ impl EdgeFinder for BupEdgeFinder {
let mut edges = vec![];

while ofs < len {
if let Some(count) = self.roll.find_chunk_edge(&buf[ofs..len]) {
if let Some(count) = self.chunking.find_chunk_edge(&buf[ofs..len]) {
ofs += count;

edges.push(ofs);
Expand Down
8 changes: 4 additions & 4 deletions lib/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn change_passphrase() {
None,
).unwrap();

let enc_handle = repo.unlock_encrypt(&|| Ok(prev_passphrase.into()))
.unwrap();
let enc_handle =
repo.unlock_encrypt(&|| Ok(prev_passphrase.into())).unwrap();

repo.write("data", &mut io::Cursor::new(&data_before), &enc_handle)
.unwrap();
Expand All @@ -271,8 +271,8 @@ fn change_passphrase() {

{
let repo = lib::Repo::open(dir_path, None).unwrap();
let dec_handle = repo.unlock_decrypt(&|| Ok(prev_passphrase.into()))
.unwrap();
let dec_handle =
repo.unlock_decrypt(&|| Ok(prev_passphrase.into())).unwrap();
let mut data_after = vec![];
repo.read("data", &mut data_after, &dec_handle).unwrap();

Expand Down

0 comments on commit ef746ab

Please sign in to comment.