use std::fmt::{Debug, Display}; use std::fs::File; use std::io::Write; use std::path::Path; use std::process::ExitStatus; use std::sync::atomic::{self, AtomicU64}; use std::sync::mpsc::Sender; use std::sync::Arc; use std::thread::available_parallelism; use cfg_if::cfg_if; use memchr::memmem; use smallvec::SmallVec; use thiserror::Error; use crate::progress_bar::{dec_bar, dec_mp_bar, update_progress_bar_estimates}; use crate::settings::EncodeArgs; use crate::util::printable_base10_digits; use crate::{ finish_multi_progress_bar, finish_progress_bar, get_done, Chunk, DoneChunk, Encoder, Instant, TargetQuality, Verbosity, }; pub struct Broker<'a> { pub max_tries: usize, pub chunk_queue: Vec, pub total_chunks: usize, pub project: &'a EncodeArgs, pub target_quality: Option>, } #[derive(Clone)] pub enum StringOrBytes { String(String), Bytes(Vec), } impl Debug for StringOrBytes { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::String(s) => { if f.alternate() { f.write_str(&textwrap::indent(s, /* 8 spaces */ " "))?; } else { f.write_str(s)?; } } Self::Bytes(b) => write!(f, "raw bytes: {:?}", b)?, } Ok(()) } } impl From> for StringOrBytes { fn from(bytes: Vec) -> Self { if simdutf8::basic::from_utf8(&bytes).is_ok() { // SAFETY: this branch guarantees that the input is valid UTF8 Self::String(unsafe { String::from_utf8_unchecked(bytes) }) } else { Self::Bytes(bytes) } } } impl From for StringOrBytes { fn from(s: String) -> Self { Self::String(s) } } impl StringOrBytes { pub fn as_bytes(&self) -> &[u8] { match self { Self::String(s) => s.as_bytes(), Self::Bytes(b) => b, } } } #[derive(Error, Debug)] pub struct EncoderCrash { pub exit_status: ExitStatus, pub stdout: StringOrBytes, pub stderr: StringOrBytes, pub source_pipe_stderr: StringOrBytes, pub ffmpeg_pipe_stderr: Option, } impl Display for EncoderCrash { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "encoder crashed: {}\nstdout:\n{:#?}\nstderr:\n{:#?}\nsource pipe stderr:\n{:#?}", self.exit_status, self.stdout, self.stderr, self.source_pipe_stderr, )?; if let Some(ffmpeg_pipe_stderr) = &self.ffmpeg_pipe_stderr { write!(f, "\nffmpeg pipe stderr:\n{:#?}", ffmpeg_pipe_stderr)?; } Ok(()) } } impl<'a> Broker<'a> { /// Main encoding loop. set_thread_affinity may be ignored if the value is invalid. pub fn encoding_loop( self, tx: Sender<()>, mut set_thread_affinity: Option, audio_size_bytes: Arc, ) { assert!(self.total_chunks != 0); if !self.chunk_queue.is_empty() { let (sender, receiver) = crossbeam_channel::bounded(self.chunk_queue.len()); for chunk in &self.chunk_queue { sender.send(chunk.clone()).unwrap(); } drop(sender); cfg_if! { if #[cfg(any(target_os = "linux", target_os = "windows"))] { if let Some(threads) = set_thread_affinity { let available_threads = available_parallelism().expect("Unrecoverable: Failed to get thread count").get(); let requested_threads = threads.saturating_mul(self.project.workers); if requested_threads > available_threads { warn!( "ignoring set_thread_affinity: requested more threads than available ({}/{})", requested_threads, available_threads ); set_thread_affinity = None; } else if requested_threads == 0 { warn!("ignoring set_thread_affinity: requested 0 threads"); set_thread_affinity = None; } } } } let frame_rate = self.project.input.frame_rate().unwrap(); crossbeam_utils::thread::scope(|s| { let consumers: Vec<_> = (0..self.project.workers) .map(|idx| (receiver.clone(), &self, idx)) .map(|(rx, queue, worker_id)| { let tx = tx.clone(); let audio_size_ref = Arc::clone(&audio_size_bytes); s.spawn(move |_| { cfg_if! { if #[cfg(any(target_os = "linux", target_os = "windows"))] { if let Some(threads) = set_thread_affinity { let mut cpu_set = SmallVec::<[usize; 16]>::new(); cpu_set.extend((threads * worker_id..).take(threads)); if let Err(e) = affinity::set_thread_affinity(&cpu_set) { warn!( "failed to set thread affinity for worker {}: {}", worker_id, e ); } } } } while let Ok(mut chunk) = rx.recv() { if let Err(e) = queue.encode_chunk( &mut chunk, worker_id, frame_rate, Arc::clone(&audio_size_ref), ) { error!("[chunk {}] {}", chunk.index, e); tx.send(()).unwrap(); return Err(()); } } Ok(()) }) }) .collect(); for consumer in consumers { let _ = consumer.join().unwrap(); } }) .unwrap(); if self.project.verbosity == Verbosity::Normal { finish_progress_bar(); } else if self.project.verbosity == Verbosity::Verbose { finish_multi_progress_bar(); } } } fn encode_chunk( &self, chunk: &mut Chunk, worker_id: usize, frame_rate: f64, audio_size_bytes: Arc, ) -> Result<(), EncoderCrash> { let st_time = Instant::now(); // space padding at the beginning to align with "finished chunk" debug!(" started chunk {:05}: {} frames", chunk.index, chunk.frames); if let Some(ref tq) = self.target_quality { tq.per_shot_target_quality_routine(chunk)?; } // we display the index, so we need to subtract 1 to get the max index let padding = printable_base10_digits(self.total_chunks - 1) as usize; // Run all passes for this chunk let encoder = chunk .overrides .as_ref() .map_or(self.project.encoder, |ovr| ovr.encoder); let passes = chunk .overrides .as_ref() .map_or(self.project.passes, |ovr| ovr.passes); let mut tpl_crash_workaround = false; for current_pass in 1..=passes { for r#try in 1..=self.max_tries { let res = self.project.create_pipes( chunk, encoder, passes, current_pass, worker_id, padding, tpl_crash_workaround, ); if let Err((e, frames)) = res { if self.project.verbosity == Verbosity::Normal { dec_bar(frames); } else if self.project.verbosity == Verbosity::Verbose { dec_mp_bar(frames); } if r#try == self.max_tries { error!( "[chunk {}] encoder failed {} times, shutting down worker", chunk.index, self.max_tries ); return Err(e); } // avoids double-print of the error message as both a WARN and ERROR, // since `Broker::encoding_loop` will print the error message as well warn!("Encoder failed (on chunk {}):\n{}", chunk.index, e); if encoder == Encoder::aom && !tpl_crash_workaround && memmem::rfind(e.stderr.as_bytes(), b"av1_tpl_stats_ready").is_some() { // aomenc has had a history of crashes related to TPL on certain chunks, // particularly in videos with less motion, such as animated content. // This workaround retries a chunk with TPL disabled if such a crash is detected. // Although there is some amount of psychovisual quality loss with TPL disabled, // this is preferable to being unable to complete the encode. warn!("TPL-based crash, retrying chunk without TPL"); tpl_crash_workaround = true; } } else { break; } } } let enc_time = st_time.elapsed(); let fps = chunk.frames as f64 / enc_time.as_secs_f64(); let progress_file = Path::new(&self.project.temp).join("done.json"); get_done().done.insert( chunk.name(), DoneChunk { frames: chunk.frames, size_bytes: Path::new(&chunk.output()) .metadata() .expect("Unable to get size of finished chunk") .len(), }, ); let mut progress_file = File::create(&progress_file).unwrap(); progress_file .write_all(serde_json::to_string(get_done()).unwrap().as_bytes()) .unwrap(); update_progress_bar_estimates( frame_rate, self.project.frames, self.project.verbosity, audio_size_bytes.load(atomic::Ordering::SeqCst), ); debug!( "finished chunk {:05}: {} frames, {:.2} fps, took {:.2?}", chunk.index, chunk.frames, fps, enc_time ); Ok(()) } }