Added --one-shot-resume for distributed Av1an usage

This commit is contained in:
DataHoarder 2022-07-01 22:38:52 +02:00
parent d9a7418ccd
commit a33040fd85
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 57 additions and 5 deletions

View file

@ -50,6 +50,9 @@ Or with your own parameters:
-r --resume Resumes encoding.
--one-shot-resume One-shot resume. Each worker will process one chunk, then quit.
No concat will be done.
--keep Doesn't delete temporary folders after encode has finished.
-q --quiet Do not print a progress bar to the terminal.

View file

@ -157,6 +157,10 @@ pub struct CliOpts {
#[clap(short, long)]
pub resume: bool,
/// One-shot resume. Each worker will process one chunk, then quit.
#[clap(short, long)]
pub one_shot_resume: bool,
/// Do not delete the temporary folder after encoding has finished
#[clap(short, long)]
pub keep: bool,
@ -654,6 +658,7 @@ pub fn parse_cli(args: CliOpts) -> anyhow::Result<Vec<EncodeArgs>> {
probes: args.probes,
probing_rate: args.probing_rate,
resume: args.resume,
one_shot_resume: args.one_shot_resume,
scenes: args.scenes.clone(),
split_method: args.split_method.clone(),
sc_method: args.sc_method,

View file

@ -251,6 +251,7 @@ struct DoneChunk {
struct DoneJson {
frames: AtomicUsize,
done: DashMap<String, DoneChunk>,
locked: DashMap<String, bool>,
audio_done: AtomicBool,
}

View file

@ -1,5 +1,5 @@
use std::borrow::{Borrow, Cow};
use std::cmp::{Ordering, Reverse};
use std::cmp::{min, Ordering, Reverse};
use std::collections::{BTreeSet, HashSet};
use std::convert::TryInto;
use std::ffi::OsString;
@ -93,6 +93,7 @@ pub struct EncodeArgs {
pub verbosity: Verbosity,
pub log_file: PathBuf,
pub resume: bool,
pub one_shot_resume: bool,
pub keep: bool,
pub force: bool,
@ -180,6 +181,7 @@ impl EncodeArgs {
init_done(DoneJson {
frames: AtomicUsize::new(0),
done: DashMap::new(),
locked: DashMap::new(),
audio_done: AtomicBool::new(false),
});
@ -1038,7 +1040,22 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
/// Returns unfinished chunks and number of total chunks
fn load_or_gen_chunk_queue(&mut self, splits: &[Scene]) -> anyhow::Result<(Vec<Chunk>, usize)> {
if self.resume {
if self.resume && self.one_shot_resume {
let mut chunks = read_chunk_queue(self.temp.as_ref())?;
let mut num_chunks = chunks.len();
let done = get_done();
// only keep the chunks that are not done
chunks.retain(|chunk| !done.done.contains_key(&chunk.name()) && !done.locked.contains_key(&chunk.name()));
num_chunks = min(num_chunks, self.workers);
//only take as many chunks as workers
chunks = chunks[0..num_chunks].to_vec();
Ok((chunks, num_chunks))
} else if self.resume {
let mut chunks = read_chunk_queue(self.temp.as_ref())?;
let num_chunks = chunks.len();
@ -1126,6 +1143,10 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
exit(0);
}
if self.workers == 0 {
self.workers = determine_workers(self.encoder) as usize;
}
let (mut chunk_queue, total_chunks) = self.load_or_gen_chunk_queue(&splits)?;
if self.resume {
@ -1138,6 +1159,26 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
);
}
if self.one_shot_resume {
//Save lockfile into done so other chunks can start
let temp = self.temp.as_str();
let progress_file = Path::new(temp).join("done.json");
for chunk in &mut chunk_queue {
get_done().locked.insert(
chunk.name(),
true,
);
}
let mut progress_file = File::create(&progress_file).unwrap();
progress_file
.write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
.unwrap();
info!("Updated done chunk locks");
}
if let Some(vspipe_cache) = vspipe_cache {
vspipe_cache.join().unwrap();
}
@ -1234,9 +1275,6 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
None
};
if self.workers == 0 {
self.workers = determine_workers(self.encoder) as usize;
}
self.workers = cmp::min(self.workers, chunk_queue.len());
if atty::is(atty::Stream::Stderr) {
@ -1318,6 +1356,11 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
let _audio_output_exists =
audio_thread.map_or(false, |audio_thread| audio_thread.join().unwrap());
if self.one_shot_resume {
debug!("encoding finished, exiting due to one-shot");
return Ok(())
}
debug!("encoding finished, concatenating with {}", self.concat);
match self.concat {