Skip to content

Commit

Permalink
Merge pull request #4141 from remotion-dev/dont-use-threadpool-with-1…
Browse files Browse the repository at this point in the history
…-thread
  • Loading branch information
JonnyBurger authored Jul 31, 2024
2 parents 142f6d3 + 2b1053c commit 17d4fe5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 36 deletions.
28 changes: 21 additions & 7 deletions packages/renderer/rust/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@ use crate::compositor::draw_layer;
use crate::copy_clipboard::copy_to_clipboard;
use crate::errors::ErrorWithBacktrace;
use crate::image::{save_as_jpeg, save_as_png};
use crate::memory::is_about_to_run_out_of_memory;
use crate::opened_video_manager::OpenedVideoManager;
use crate::payloads::payloads::CliInputCommandPayload;
use crate::{ffmpeg, get_silent_parts};
use crate::{ffmpeg, get_silent_parts, max_cache_size};
use std::io::ErrorKind;

pub fn execute_command(
opts: CliInputCommandPayload,
maximum_frame_cache_size_in_bytes: Option<u128>,
) -> Result<Vec<u8>, ErrorWithBacktrace> {
match opts {
pub fn execute_command(opts: CliInputCommandPayload) -> Result<Vec<u8>, ErrorWithBacktrace> {
let current_maximum_cache_size = max_cache_size::get_instance().lock().unwrap().get_value();

if is_about_to_run_out_of_memory() && current_maximum_cache_size.is_some() {
ffmpeg::emergency_memory_free_up().unwrap();
max_cache_size::get_instance()
.lock()
.unwrap()
.set_value(Some(current_maximum_cache_size.unwrap() / 2));
}

let res = match opts {
CliInputCommandPayload::ExtractFrame(command) => {
let res = ffmpeg::extract_frame(
command.src,
command.original_src,
command.time,
command.transparent,
command.tone_mapped,
maximum_frame_cache_size_in_bytes,
max_cache_size::get_instance().lock().unwrap().get_value(),
)?;
Ok(res)
}
Expand Down Expand Up @@ -99,5 +107,11 @@ pub fn execute_command(
ffmpeg::extract_audio(&_command.input_path, &_command.output_path)?;
Ok(vec![])
}
};
if current_maximum_cache_size.is_some() {
ffmpeg::keep_only_latest_frames_and_close_videos(current_maximum_cache_size.unwrap())
.unwrap();
}

return res;
}
67 changes: 38 additions & 29 deletions packages/renderer/rust/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod get_silent_parts;
mod global_printer;
mod image;
mod logger;
mod max_cache_size;
mod memory;
mod opened_stream;
mod opened_video;
Expand All @@ -17,11 +18,12 @@ mod payloads;
mod rotation;
mod scalable_frame;
mod tone_map;

use commands::execute_command;
use errors::{error_to_json, ErrorWithBacktrace};
use global_printer::{_print_verbose, set_verbose_logging};
use memory::{get_ideal_maximum_frame_cache_size, is_about_to_run_out_of_memory};
use std::{ env, thread};
use memory::get_ideal_maximum_frame_cache_size;
use std::{env, thread};

use payloads::payloads::{parse_cli, CliInputCommand, CliInputCommandPayload};

Expand Down Expand Up @@ -57,7 +59,7 @@ fn mainfn() -> Result<(), ErrorWithBacktrace> {
start_long_running_process(payload.concurrency, max_video_cache_size)?;
}
_ => {
let data = execute_command(opts.payload, None)?;
let data = execute_command(opts.payload)?;
global_printer::synchronized_write_buf(0, &opts.nonce, &data)?;
}
}
Expand All @@ -79,6 +81,11 @@ fn start_long_running_process(
.num_threads(threads)
.build()?;

max_cache_size::get_instance()
.lock()
.unwrap()
.set_value(Some(maximum_frame_cache_size_in_bytes));

loop {
let mut input = String::new();
let matched = match std::io::stdin().read_line(&mut input) {
Expand All @@ -94,33 +101,35 @@ fn start_long_running_process(
}
let opts: CliInputCommand = parse_cli(&input)?;

let mut current_maximum_cache_size = maximum_frame_cache_size_in_bytes;

pool.install(move || {
let handle = thread::spawn(move || {
if is_about_to_run_out_of_memory() {
ffmpeg::emergency_memory_free_up().unwrap();
current_maximum_cache_size = current_maximum_cache_size / 2;
}

match execute_command(opts.payload, Some(current_maximum_cache_size)) {
Ok(res) => {
global_printer::synchronized_write_buf(0, &opts.nonce, &res).unwrap()
}
Err(err) => global_printer::synchronized_write_buf(
1,
&opts.nonce,
&error_to_json(err).unwrap().as_bytes(),
)
.unwrap(),
};

ffmpeg::keep_only_latest_frames_and_close_videos(current_maximum_cache_size)
.unwrap();
if threads > 1 {
pool.install(move || {
let handle = thread::spawn(move || {
match execute_command(opts.payload) {
Ok(res) => {
global_printer::synchronized_write_buf(0, &opts.nonce, &res).unwrap()
}
Err(err) => global_printer::synchronized_write_buf(
1,
&opts.nonce,
&error_to_json(err).unwrap().as_bytes(),
)
.unwrap(),
};
});

handle.join().unwrap();
});

handle.join().unwrap();
});
} else {
match execute_command(opts.payload) {
Ok(res) => global_printer::synchronized_write_buf(0, &opts.nonce, &res).unwrap(),
Err(err) => global_printer::synchronized_write_buf(
1,
&opts.nonce,
&error_to_json(err).unwrap().as_bytes(),
)
.unwrap(),
};
}
}

Ok(())
Expand Down
33 changes: 33 additions & 0 deletions packages/renderer/rust/max_cache_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use lazy_static::lazy_static;
use std::sync::Mutex;

pub struct MaxCacheSize {
value: Option<u128>,
}

impl MaxCacheSize {
// Private constructor
fn new() -> Self {
MaxCacheSize { value: None }
}

// Getter for the value
pub fn get_value(&self) -> Option<u128> {
self.value
}

// Setter for the value
pub fn set_value(&mut self, value: Option<u128>) {
self.value = value;
}
}

// Global static instance of MaxCacheSize
lazy_static! {
static ref INSTANCE: Mutex<MaxCacheSize> = Mutex::new(MaxCacheSize::new());
}

// Function to access the global singleton instance
pub fn get_instance() -> &'static Mutex<MaxCacheSize> {
&INSTANCE
}

0 comments on commit 17d4fe5

Please sign in to comment.