Merge branch 'unstable'

Josh Holmer 2021-12-15 15:03:21 -05:00
commit 519bd0504e
9 changed files with 218 additions and 35 deletions

View file

@@ -49,7 +49,7 @@ fn version() -> &'static str {
Some(commit_date),
) => {
format!(
"{} (rev {}) ({})
"{}-unstable (rev {}) ({})
* Compiler
rustc {} (LLVM {})
@@ -173,6 +173,10 @@ pub struct CliOpts {
#[structopt(long, possible_values=&["standard", "fast"], default_value = "standard")]
pub sc_method: ScenecutMethod,
/// Perform scene detection with this pixel format
#[structopt(long)]
pub sc_pix_format: Option<Pixel>,
/// Optional downscaling for scenecut detection.
/// Specify as the desired maximum height to scale to
/// (e.g. "720" to downscale to 720p--this will leave lower resolution content untouched).
@@ -386,6 +390,7 @@ pub fn parse_cli(args: CliOpts) -> anyhow::Result<EncodeArgs> {
} else {
None
},
sc_pix_format: args.sc_pix_format,
keep: args.keep,
max_tries: args.max_tries,
min_q: args.min_q,
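
For illustration, the new option would be passed on the command line roughly like this (the input and output names are placeholders, not taken from this commit):

av1an -i input.mkv --sc-pix-format yuv420p10le -o output.mkv

Because the field is an Option<Pixel>, omitting the flag keeps the previous behavior of running scene detection in the source's own pixel format.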

View file

@@ -1,9 +1,13 @@
use crate::progress_bar::update_progress_bar_estimates;
use crate::DoneChunk;
use crate::{
ffmpeg, finish_multi_progress_bar, finish_progress_bar, get_done,
progress_bar::{dec_bar, dec_mp_bar},
settings::EncodeArgs,
Chunk, Encoder, Instant, TargetQuality, Verbosity,
};
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use std::{
fmt::{Debug, Display},
fs::File,
@@ -96,7 +100,12 @@ impl Display for EncoderCrash {
impl<'a> Broker<'a> {
/// Main encoding loop. set_thread_affinity may be ignored if the value is invalid.
#[allow(clippy::needless_pass_by_value)]
- pub fn encoding_loop(self, tx: Sender<()>, mut set_thread_affinity: Option<usize>) {
+ pub fn encoding_loop(
+ self,
+ tx: Sender<()>,
+ mut set_thread_affinity: Option<usize>,
+ audio_size_bytes: Arc<AtomicU64>,
+ ) {
if !self.chunk_queue.is_empty() {
let (sender, receiver) = crossbeam_channel::bounded(self.chunk_queue.len());
@@ -125,11 +134,14 @@ impl<'a> Broker<'a> {
}
}
let frame_rate = self.project.input.frame_rate();
crossbeam_utils::thread::scope(|s| {
let consumers: Vec<_> = (0..self.project.workers)
.map(|idx| (receiver.clone(), &self, idx))
.map(|(rx, queue, worker_id)| {
let tx = tx.clone();
let audio_size_ref = Arc::clone(&audio_size_bytes);
s.spawn(move |_| {
cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "windows"))] {
@@ -147,7 +159,12 @@ impl<'a> Broker<'a> {
}
while let Ok(mut chunk) = rx.recv() {
- if let Err(e) = queue.encode_chunk(&mut chunk, worker_id) {
+ if let Err(e) = queue.encode_chunk(
+ &mut chunk,
+ worker_id,
+ frame_rate,
+ Arc::clone(&audio_size_ref),
+ ) {
error!("[chunk {}] {}", chunk.index, e);
tx.send(()).unwrap();
@@ -172,7 +189,13 @@ impl<'a> Broker<'a> {
}
}
- fn encode_chunk(&self, chunk: &mut Chunk, worker_id: usize) -> Result<(), EncoderCrash> {
+ fn encode_chunk(
+ &self,
+ chunk: &mut Chunk,
+ worker_id: usize,
+ frame_rate: f64,
+ audio_size_bytes: Arc<AtomicU64>,
+ ) -> Result<(), EncoderCrash> {
let st_time = Instant::now();
// space padding at the beginning to align with "finished chunk"
@@ -232,13 +255,29 @@ impl<'a> Broker<'a> {
let fps = encoded_frames as f64 / enc_time.as_secs_f64();
let progress_file = Path::new(&self.project.temp).join("done.json");
- get_done().done.insert(chunk.name(), encoded_frames);
+ get_done().done.insert(
+ chunk.name(),
+ DoneChunk {
+ frames: encoded_frames,
+ size_bytes: Path::new(&chunk.output())
+ .metadata()
+ .expect("Unable to get size of finished chunk")
+ .len(),
+ },
+ );
let mut progress_file = File::create(&progress_file).unwrap();
progress_file
.write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
.unwrap();
update_progress_bar_estimates(
frame_rate,
self.project.frames,
self.project.verbosity,
audio_size_bytes.load(atomic::Ordering::SeqCst),
);
debug!(
"finished chunk {:05}: {} frames, {:.2} fps, took {:.2?}",
chunk.index, chunk.frames, fps, enc_time
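
The audio size reaches the workers through a shared Arc<AtomicU64> rather than a channel. A minimal standalone sketch of that pattern, assuming the crossbeam-utils crate used above (names and the 4_000_000 value are illustrative):

use std::sync::{
  atomic::{AtomicU64, Ordering},
  Arc,
};

fn main() {
  let audio_size_bytes = Arc::new(AtomicU64::new(0));

  crossbeam_utils::thread::scope(|s| {
    // Audio thread: publish the encoded size once it is known.
    let writer = Arc::clone(&audio_size_bytes);
    s.spawn(move |_| writer.store(4_000_000, Ordering::SeqCst));

    // Worker thread: read the latest value when refreshing estimates.
    let reader = Arc::clone(&audio_size_bytes);
    s.spawn(move |_| {
      let _bytes = reader.load(Ordering::SeqCst);
    });
  })
  .unwrap();
}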

View file

@@ -3,6 +3,7 @@ use ffmpeg_next::format::{input, Pixel};
use ffmpeg_next::media::Type as MediaType;
use ffmpeg_next::Error::StreamNotFound;
use path_abs::{PathAbs, PathInfo};
use std::path::PathBuf;
use std::{
ffi::OsStr,
path::Path,
@@ -55,6 +56,16 @@ pub fn num_frames(source: &Path) -> Result<usize, ffmpeg_next::Error> {
)
}
pub fn frame_rate(source: &Path) -> Result<f64, ffmpeg_next::Error> {
let ictx = input(&source)?;
let input = ictx
.streams()
.best(MediaType::Video)
.ok_or(StreamNotFound)?;
let rate = input.avg_frame_rate();
Ok(rate.numerator() as f64 / rate.denominator() as f64)
}
pub fn get_pixel_format(source: &Path) -> Result<Pixel, ffmpeg_next::Error> {
let ictx = ffmpeg_next::format::input(&source)?;
@@ -101,14 +112,14 @@ pub fn has_audio(file: &Path) -> bool {
/// Encodes the audio using FFmpeg, blocking the current thread.
///
- /// This function returns `true` if the audio exists and the audio
- /// successfully encoded, or `false` otherwise.
+ /// This function returns `Some(output)` if the audio exists and the audio
+ /// successfully encoded, or `None` otherwise.
#[must_use]
pub fn encode_audio<S: AsRef<OsStr>>(
input: impl AsRef<Path>,
temp: impl AsRef<Path>,
audio_params: &[S],
- ) -> bool {
+ ) -> Option<PathBuf> {
let input = input.as_ref();
let temp = temp.as_ref();
@@ -137,7 +148,7 @@ pub fn encode_audio<S: AsRef<OsStr>>(
]);
encode_audio.args(audio_params);
- encode_audio.arg(audio_file);
+ encode_audio.arg(&audio_file);
let output = encode_audio.output().unwrap();
@@ -146,12 +157,12 @@ pub fn encode_audio<S: AsRef<OsStr>>(
"FFmpeg failed to encode audio!\n{:#?}\nParams: {:?}",
output, encode_audio
);
- return false;
+ return None;
}
- true
+ Some(audio_file)
} else {
- false
+ None
}
}
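
A minimal sketch of how a caller inside the crate consumes the new return value (the input path, temp directory, and audio params are placeholders):

// Hypothetical caller: the returned path lets the caller size the encoded audio.
let audio_params = ["-c:a", "libopus", "-b:a", "128k"];
if let Some(audio_file) = ffmpeg::encode_audio("input.mkv", ".tmp", &audio_params) {
  let audio_bytes = audio_file.metadata().unwrap().len();
  println!("audio encoded: {} bytes", audio_bytes);
} else {
  println!("no audio stream, or encoding failed");
}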

View file

@@ -112,6 +112,14 @@ impl Input {
Input::VapourSynth(path) => vapoursynth::num_frames(path.as_path()).expect(FAIL_MSG),
}
}
pub fn frame_rate(&self) -> f64 {
const FAIL_MSG: &str = "Failed to get frame rate for input video";
match &self {
Input::Video(path) => ffmpeg::frame_rate(path.as_path()).expect(FAIL_MSG),
Input::VapourSynth(path) => vapoursynth::frame_rate(path.as_path()).expect(FAIL_MSG),
}
}
}
impl<P: AsRef<Path> + Into<PathBuf>> From<P> for Input {
@@ -129,11 +137,17 @@ impl<P: AsRef<Path> + Into<PathBuf>> From<P> for Input {
}
}
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
struct DoneChunk {
frames: usize,
size_bytes: u64,
}
/// Concurrent data structure for keeping track of the finished chunks in an encode
#[derive(Debug, Deserialize, Serialize)]
struct DoneJson {
frames: AtomicUsize,
- done: DashMap<String, usize>,
+ done: DashMap<String, DoneChunk>,
audio_done: AtomicBool,
}
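
For illustration, a done.json serialized from these structures would now look roughly like this (chunk names and numbers are invented):

{
  "frames": 12000,
  "done": {
    "00000": { "frames": 240, "size_bytes": 1048576 },
    "00001": { "frames": 180, "size_bytes": 812345 }
  },
  "audio_done": true
}

Each entry previously stored only a frame count; the added size_bytes is what feeds the new bitrate and size estimates.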

View file

@@ -1,3 +1,6 @@
use crate::get_done;
use crate::Verbosity;
use indicatif::HumanBytes;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use once_cell::sync::OnceCell;
@@ -6,9 +9,9 @@ use crate::util::printable_base10_digits;
const INDICATIF_PROGRESS_TEMPLATE: &str = if cfg!(windows) {
// Do not use a spinner on Windows since the default console cannot display
// the characters used for the spinner
"{elapsed_precise:.bold} [{wide_bar:.blue/white.dim}] {percent:.bold} {pos} ({fps:.bold}, eta {eta})"
"{elapsed_precise:.bold} [{wide_bar:.blue/white.dim}] {percent:.bold} {pos} ({fps:.bold}, eta {eta}{msg})"
} else {
"{spinner:.green.bold} {elapsed_precise:.bold} [{wide_bar:.blue/white.dim}] {percent:.bold} {pos} ({fps:.bold}, eta {eta})"
"{spinner:.green.bold} {elapsed_precise:.bold} [{wide_bar:.blue/white.dim}] {percent:.bold} {pos} ({fps:.bold}, eta {eta}{msg})"
};
static PROGRESS_BAR: OnceCell<ProgressBar> = OnceCell::new();
@@ -36,7 +39,7 @@ fn pretty_progress_style() -> ProgressStyle {
/// Enables steady 100 ms tick
pub fn init_progress_bar(len: u64) {
let pb = PROGRESS_BAR.get_or_init(|| ProgressBar::new(len).with_style(pretty_progress_style()));
- pb.set_draw_target(ProgressDrawTarget::stderr());
+ pb.set_draw_target(ProgressDrawTarget::stderr_with_hz(60));
pb.enable_steady_tick(100);
pb.reset();
pb.reset_eta();
@@ -56,6 +59,12 @@ pub fn dec_bar(dec: u64) {
}
}
pub fn update_bar_info(kbps: f64, est_size: HumanBytes) {
if let Some(pb) = PROGRESS_BAR.get() {
pb.set_message(format!(", {:.1} Kbps, est. {}", kbps, est_size));
}
}
pub fn set_pos(pos: u64) {
if let Some(pb) = PROGRESS_BAR.get() {
pb.set_position(pos);
@@ -128,7 +137,7 @@ pub fn init_multi_progress_bar(len: u64, workers: usize) {
pb.reset();
pbs.push(mpb.add(pb));
- mpb.set_draw_target(ProgressDrawTarget::stderr());
+ mpb.set_draw_target(ProgressDrawTarget::stderr_with_hz(60));
(mpb, pbs)
});
@@ -153,6 +162,15 @@ pub fn dec_mp_bar(dec: u64) {
}
}
pub fn update_mp_bar_info(kbps: f64, est_size: HumanBytes) {
if let Some((_, pbs)) = MULTI_PROGRESS_BAR.get() {
pbs
.last()
.unwrap()
.set_message(format!(", {:.1} Kbps, est. {}", kbps, est_size));
}
}
pub fn finish_multi_progress_bar() {
if let Some((_, pbs)) = MULTI_PROGRESS_BAR.get() {
for pb in pbs.iter() {
@@ -160,3 +178,31 @@ pub fn finish_multi_progress_bar() {
}
}
}
pub fn update_progress_bar_estimates(
frame_rate: f64,
total_frames: usize,
verbosity: Verbosity,
audio_size: u64,
) {
let completed_frames: usize = get_done()
.done
.iter()
.map(|ref_multi| ref_multi.value().frames)
.sum();
let total_size: u64 = get_done()
.done
.iter()
.map(|ref_multi| ref_multi.value().size_bytes)
.sum::<u64>()
+ audio_size;
let seconds_completed = completed_frames as f64 / frame_rate;
let kbps = total_size as f64 * 8. / 1000. / seconds_completed;
let progress = completed_frames as f64 / total_frames as f64;
let est_size = total_size as f64 / progress;
if verbosity == Verbosity::Normal {
update_bar_info(kbps, HumanBytes(est_size as u64));
} else if verbosity == Verbosity::Verbose {
update_mp_bar_info(kbps, HumanBytes(est_size as u64));
}
}
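
As a worked example of the arithmetic above (numbers are illustrative): at 24 fps with 3000 of 12000 frames finished and 24,000,000 bytes written so far (chunk files plus audio), seconds_completed = 3000 / 24 = 125, kbps = 24,000,000 * 8 / 1000 / 125 = 1536, and progress = 0.25, so the estimated final size handed to HumanBytes is 24,000,000 / 0.25 = 96,000,000 bytes.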

View file

@@ -1,5 +1,8 @@
- use crate::{ffmpeg, progress_bar, Input, ScenecutMethod, Verbosity};
- use crate::{into_array, Encoder};
+ use ffmpeg_next::format::Pixel;
+ use smallvec::{smallvec, SmallVec};
+ use crate::Encoder;
+ use crate::{ffmpeg, into_smallvec, progress_bar, Input, ScenecutMethod, Verbosity};
use ansi_term::Style;
use av_scenechange::{detect_scene_changes, DetectionOptions, SceneDetectionSpeed};
@@ -11,6 +14,7 @@ pub fn av_scenechange_detect(
total_frames: usize,
min_scene_len: usize,
verbosity: Verbosity,
sc_pix_format: Option<Pixel>,
sc_method: ScenecutMethod,
sc_downscale_height: Option<usize>,
) -> anyhow::Result<Vec<usize>> {
@@ -30,6 +34,7 @@ pub fn av_scenechange_detect(
}))
},
min_scene_len,
sc_pix_format,
sc_method,
sc_downscale_height,
)?;
@@ -51,13 +56,25 @@ pub fn scene_detect(
encoder: Encoder,
callback: Option<Box<dyn Fn(usize, usize)>>,
min_scene_len: usize,
sc_pix_format: Option<Pixel>,
sc_method: ScenecutMethod,
sc_downscale_height: Option<usize>,
) -> anyhow::Result<Vec<usize>> {
let bit_depth;
- let filters: Option<[String; 2]> = sc_downscale_height
- .map(|downscale_height| into_array!["-vf", format!("scale=-2:'min({},ih)'", downscale_height)]);
+ let filters: SmallVec<[String; 4]> = match (sc_downscale_height, sc_pix_format) {
+ (Some(sdh), Some(spf)) => into_smallvec![
+ "-vf",
+ format!(
+ "format={},scale=-2:'min({},ih)'",
+ spf.descriptor().unwrap().name(),
+ sdh
+ )
+ ],
+ (Some(sdh), None) => into_smallvec!["-vf", format!("scale=-2:'min({},ih)'", sdh)],
+ (None, Some(spf)) => into_smallvec!["-pix_fmt", spf.descriptor().unwrap().name()],
+ (None, None) => smallvec![],
+ };
let decoder = &mut y4m::Decoder::new(match input {
Input::VapourSynth(path) => {
@@ -73,7 +90,7 @@ pub fn scene_detect(
.stdout
.unwrap();
- if let Some(filters) = &filters {
+ if !filters.is_empty() {
Command::new("ffmpeg")
.stdin(vspipe)
.args(["-i", "pipe:", "-f", "yuv4mpegpipe", "-strict", "-1"])
@@ -91,15 +108,11 @@ pub fn scene_detect(
Input::Video(path) => {
let input_pix_format = ffmpeg::get_pixel_format(path.as_ref())
.unwrap_or_else(|e| panic!("FFmpeg failed to get pixel format for input video: {:?}", e));
- bit_depth = encoder.get_format_bit_depth(input_pix_format)?;
+ bit_depth = encoder.get_format_bit_depth(sc_pix_format.unwrap_or(input_pix_format))?;
Command::new("ffmpeg")
.args(["-r", "1", "-i"])
.arg(path)
- .args(
- filters
- .as_ref()
- .map_or(&[] as &[String], |filters| &filters[..]),
- )
+ .args(filters.as_ref())
.args(["-f", "yuv4mpegpipe", "-strict", "-1", "-"])
.stdin(Stdio::null())
.stdout(Stdio::piped())
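
For illustration of the filters construction above: with sc_downscale_height = Some(720) and sc_pix_format = Some(yuv420p10le) the match yields the arguments -vf format=yuv420p10le,scale=-2:'min(720,ih)'; with only the height it yields -vf scale=-2:'min(720,ih)'; with only the pixel format, -pix_fmt yuv420p10le; with neither, the SmallVec stays empty and ffmpeg receives no extra arguments.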

View file

@@ -1,4 +1,5 @@
use crate::parse::valid_params;
use crate::progress_bar::update_progress_bar_estimates;
use crate::progress_bar::{reset_bar_at, reset_mp_bar_at};
use crate::vapoursynth::{is_ffms2_installed, is_lsmash_installed};
use crate::{
@@ -26,6 +27,7 @@ use ffmpeg_next::format::Pixel;
use itertools::Itertools;
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::sync::atomic::AtomicUsize;
use std::{
borrow::Cow,
cmp,
@@ -39,7 +41,7 @@ use std::{
iter,
path::{Path, PathBuf},
process::{exit, Command, Stdio},
- sync::atomic::{self, AtomicBool, AtomicUsize},
+ sync::atomic::{self, AtomicBool, AtomicU64},
sync::{mpsc, Arc},
};
@@ -68,6 +70,7 @@ pub struct EncodeArgs {
pub chunk_method: ChunkMethod,
pub scenes: Option<PathBuf>,
pub split_method: SplitMethod,
pub sc_pix_format: Option<Pixel>,
pub sc_method: ScenecutMethod,
pub sc_downscale_height: Option<usize>,
pub extra_splits_len: Option<usize>,
@@ -560,6 +563,7 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
self.frames,
self.min_scene_len,
self.verbosity,
self.sc_pix_format,
self.sc_method,
self.sc_downscale_height,
),
@@ -859,12 +863,11 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
let done = fs::read_to_string(done_path)?;
let done: DoneJson = serde_json::from_str(&done)?;
self.frames = done.frames.load(atomic::Ordering::Relaxed);
- init_done(done);
- get_done()
+ init_done(done)
.done
.iter()
- .map(|ref_multi| *ref_multi.value())
+ .map(|ref_multi| ref_multi.frames)
.sum()
} else {
self.frames = self.input.frames();
@@ -896,14 +899,16 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
crossbeam_utils::thread::scope(|s| -> anyhow::Result<()> {
// vapoursynth audio is currently unsupported
let audio_size_bytes = Arc::new(AtomicU64::new(0));
let audio_thread = if self.input.is_video()
&& (!self.resume || !get_done().audio_done.load(atomic::Ordering::SeqCst))
{
let input = self.input.as_video_path();
let temp = self.temp.as_str();
let audio_params = self.audio_params.as_slice();
let audio_size_ref = Arc::clone(&audio_size_bytes);
Some(s.spawn(move |_| {
- let audio_output_exists = ffmpeg::encode_audio(input, temp, audio_params);
+ let audio_output = ffmpeg::encode_audio(input, temp, audio_params);
get_done().audio_done.store(true, atomic::Ordering::SeqCst);
let progress_file = Path::new(temp).join("done.json");
@@ -912,7 +917,14 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
.write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
.unwrap();
- audio_output_exists
+ if let Some(ref audio_output) = audio_output {
+ audio_size_ref.store(
+ audio_output.metadata().unwrap().len(),
+ atomic::Ordering::SeqCst,
+ );
+ }
+ audio_output.is_some()
}))
} else {
None
@@ -946,6 +958,16 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
reset_mp_bar_at(initial_frames as u64);
}
if !get_done().done.is_empty() {
let frame_rate = self.input.frame_rate();
update_progress_bar_estimates(
frame_rate,
self.frames,
self.verbosity,
audio_size_bytes.load(atomic::Ordering::SeqCst),
);
}
let broker = Broker {
chunk_queue,
project: self,
@ -957,9 +979,10 @@ properly into a mkv file. Specify mkvmerge as the concatenation method by settin
max_tries: self.max_tries,
};
let audio_size_ref = Arc::clone(&audio_size_bytes);
let (tx, rx) = mpsc::channel();
let handle = s.spawn(|_| {
- broker.encoding_loop(tx, self.set_thread_affinity);
+ broker.encoding_loop(tx, self.set_thread_affinity, audio_size_ref);
});
// Queue::encoding_loop only sends a message if there was an error (meaning a chunk crashed)

View file

@@ -80,6 +80,17 @@ macro_rules! into_array {
};
}
#[macro_export]
macro_rules! into_smallvec {
($($x:expr),* $(,)?) => {
smallvec::smallvec![
$(
$x.into(),
)*
]
};
}
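
For illustration, into_smallvec!["-pix_fmt", some_string] expands to smallvec::smallvec!["-pix_fmt".into(), some_string.into()], so &str and String arguments are all converted with Into to the collection's element type (String at the scene_detect call sites above).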
/// Attempts to create the directory if it does not exist, logging and returning
/// an error if creating the directory failed.
#[macro_export]

View file

@@ -102,6 +102,15 @@ fn get_num_frames(env: &mut Environment) -> anyhow::Result<usize> {
Ok(num_frames)
}
fn get_frame_rate(env: &mut Environment) -> anyhow::Result<f64> {
let info = get_clip_info(env);
match info.framerate {
Property::Variable => bail!("Cannot output clips with varying framerate"),
Property::Constant(fps) => Ok(fps.numerator as f64 / fps.denominator as f64),
}
}
/// Get the bit depth from an environment that has already been
/// evaluated on a script.
fn get_bit_depth(env: &mut Environment) -> anyhow::Result<usize> {
@@ -196,3 +205,15 @@ pub fn bit_depth(source: &Path) -> anyhow::Result<usize> {
get_bit_depth(&mut environment)
}
pub fn frame_rate(source: &Path) -> anyhow::Result<f64> {
// Create a new VSScript environment.
let mut environment = Environment::new().unwrap();
// Evaluate the script.
environment
.eval_file(source, EvalFlags::SetWorkingDir)
.unwrap();
get_frame_rate(&mut environment)
}
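
As a worked example of the rational-to-float conversion used here and in the ffmpeg backend above: a clip reporting 30000/1001 yields 30000.0 / 1001.0 ≈ 29.97 fps, while a VapourSynth clip with a variable framerate hits the Property::Variable arm and bails instead of returning a misleading average.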