Skip to content

Commit

Permalink
Commit scan results to the datastore incrementally (#189)
Browse files Browse the repository at this point in the history
* Commit to the datastore every ~1s instead of just once at the end
* Update CHANGELOG with PR link
  • Loading branch information
bradlarsen authored May 28, 2024
1 parent c365dc7 commit 28fb51e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
Previously the default value was determined only by the number of available vCPUs.
Now the default value is additionally limited to ensure at least 4 GiB of system RAM per job.

- The `scan` command now records its results incrementally to the datastore instead of in one enormous transaction ([#189](https://github.com/praetorian-inc/noseyparker/pull/189)).
Now, results are recorded in transactions about every second.
This helps avoid complete loss of scan results in the rare event of a crash.

### Fixes

- A rare crash when parsing malformed Git commit timestamps has been fixed by updating the `gix-date` dependency ([#185](https://github.com/praetorian-inc/noseyparker/pull/185)).
Expand Down
27 changes: 15 additions & 12 deletions crates/noseyparker-cli/src/cmd_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rayon::prelude::*;
use std::path::Path;
use std::str::FromStr;
use std::sync::Mutex;
use std::time::Instant;
use std::time::{Duration, Instant};
use tracing::{debug, error, error_span, info, trace, warn};

use crate::{args, rule_loader::RuleLoader};
Expand Down Expand Up @@ -336,6 +336,7 @@ pub fn run(global_args: &args::GlobalArgs, args: &args::ScanArgs) -> Result<()>

// FIXME: expose the following as a CLI parameter
const BATCH_SIZE: usize = 16 * 1024;
const COMMIT_INTERVAL: Duration = Duration::from_secs(1);

// Create a channel pair for matcher threads to get their results to the datastore recorder.
let channel_size = std::cmp::max(args.num_jobs * BATCH_SIZE, 64 * BATCH_SIZE);
Expand All @@ -357,24 +358,30 @@ pub fn run(global_args: &args::GlobalArgs, args: &args::ScanArgs) -> Result<()>

// Big idea: read until all the senders hang up; panic if recording matches fails.
//
// Record all messages in one big transaction to maximize throughput.
// Record all messages in chunked transactions, trying to commit at least every
// COMMIT_INTERVAL.

let mut batch: Vec<DatastoreMessage> = Vec::with_capacity(BATCH_SIZE);
let mut matches_in_batch: usize = 0;

let tx = datastore.begin()?;
let mut last_commit_time = Instant::now();

for message in recv_ds {
total_messages += 1;
matches_in_batch += message.2.len();
batch.push(message);

if batch.len() >= BATCH_SIZE || matches_in_batch >= BATCH_SIZE {
if batch.len() >= BATCH_SIZE
|| matches_in_batch >= BATCH_SIZE
|| last_commit_time.elapsed() >= COMMIT_INTERVAL
{
let t1 = std::time::Instant::now();
let batch_len = batch.len();
let tx = datastore.begin()?;
let num_added = tx
.record(batch.as_slice())
.context("Failed to record batch")?;
tx.commit()?;
last_commit_time = Instant::now();
num_matches_added += num_added;
batch.clear();
matches_in_batch = 0;
Expand All @@ -391,9 +398,11 @@ pub fn run(global_args: &args::GlobalArgs, args: &args::ScanArgs) -> Result<()>
let t1 = std::time::Instant::now();

let batch_len = batch.len();
let tx = datastore.begin()?;
let num_added = tx
.record(batch.as_slice())
.context("Failed to record batch")?;
tx.commit()?;
num_matches_added += num_added;
// batch.clear();
// matches_in_batch = 0;
Expand All @@ -406,21 +415,15 @@ pub fn run(global_args: &args::GlobalArgs, args: &args::ScanArgs) -> Result<()>
total_recording_time += elapsed;
}

let t1 = std::time::Instant::now();
tx.commit()?;
let commit_elapsed = t1.elapsed();

let num_matches = datastore.get_num_matches()?;

let t1 = std::time::Instant::now();
datastore.analyze()?;
let analyzed_elapsed = t1.elapsed();

debug!(
"Summary: recorded {num_matches} matches from {total_messages} messages \
in {:.6}s; committed in {:.6}s; analyzed in {:.6}s",
in {:.6}s; analyzed in {:.6}s",
total_recording_time.as_secs_f64(),
commit_elapsed.as_secs_f64(),
analyzed_elapsed.as_secs_f64()
);

Expand Down

0 comments on commit 28fb51e

Please sign in to comment.