Rename queue to broker and move to different module

This commit is contained in:
Zen 2021-08-17 07:32:35 +03:00
parent 5f9fa28f5d
commit 9d8290d33e
2 changed files with 142 additions and 134 deletions

139
av1an-core/src/broker.rs Normal file
View file

@ -0,0 +1,139 @@
use crate::{
finish_multi_progress_bar, finish_progress_bar, frame_probe, get_done, Chunk, Instant, Project,
TargetQuality, VecDeque, Verbosity,
};
use itertools::Itertools;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::mpsc::Sender;
pub struct Broker<'a> {
pub chunk_queue: Vec<Chunk>,
pub project: &'a Project,
pub target_quality: Option<TargetQuality<'a>>,
}
impl<'a> Broker<'a> {
pub fn encoding_loop(self, tx: Sender<()>) {
if !self.chunk_queue.is_empty() {
let (sender, receiver) = crossbeam_channel::bounded(self.chunk_queue.len());
let workers = self.project.workers;
for chunk in &self.chunk_queue {
sender.send(chunk.clone()).unwrap();
}
drop(sender);
crossbeam_utils::thread::scope(|s| {
let consumers: Vec<_> = (0..workers)
.map(|i| (receiver.clone(), &self, i))
.map(|(rx, queue, consumer_idx)| {
let tx = tx.clone();
s.spawn(move |_| {
while let Ok(mut chunk) = rx.recv() {
if queue.encode_chunk(&mut chunk, consumer_idx).is_err() {
tx.send(()).unwrap();
return Err(());
}
}
Ok(())
})
})
.collect();
for consumer in consumers {
let _ = consumer.join().unwrap();
}
})
.unwrap();
if self.project.verbosity == Verbosity::Normal {
finish_progress_bar();
} else if self.project.verbosity == Verbosity::Verbose {
finish_multi_progress_bar();
}
}
}
fn encode_chunk(&self, chunk: &mut Chunk, worker_id: usize) -> Result<(), VecDeque<String>> {
let st_time = Instant::now();
info!("Enc: {}, {} fr", chunk.index, chunk.frames);
// Target Quality mode
if self.project.target_quality.is_some() {
if let Some(ref method) = self.project.target_quality_method {
if method == "per_shot" {
if let Some(ref tq) = self.target_quality {
tq.per_shot_target_quality_routine(chunk);
}
}
}
}
// Run all passes for this chunk
const MAX_TRIES: usize = 3;
for current_pass in 1..=self.project.passes {
for r#try in 1..=MAX_TRIES {
let res = self.project.create_pipes(chunk, current_pass, worker_id);
if let Err((exit_status, output)) = res {
warn!(
"Encoder failed (on chunk {}) with {}:\n{}",
chunk.index,
exit_status,
textwrap::indent(&output.iter().join("\n"), /* 8 spaces */ " ")
);
if r#try == MAX_TRIES {
error!(
"Encoder crashed (on chunk {}) {} times, terminating thread",
chunk.index, MAX_TRIES
);
return Err(output);
}
} else {
break;
}
}
}
let encoded_frames = Self::frame_check_output(chunk, chunk.frames);
if encoded_frames == chunk.frames {
let progress_file = Path::new(&self.project.temp).join("done.json");
get_done().done.insert(chunk.name(), encoded_frames);
let mut progress_file = File::create(&progress_file).unwrap();
progress_file
.write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
.unwrap();
let enc_time = st_time.elapsed();
info!(
"Done: {} Fr: {}/{}",
chunk.index, encoded_frames, chunk.frames
);
info!(
"Fps: {:.2} Time: {:?}",
encoded_frames as f64 / enc_time.as_secs_f64(),
enc_time
);
}
Ok(())
}
fn frame_check_output(chunk: &Chunk, expected_frames: usize) -> usize {
let actual_frames = frame_probe(&chunk.output_path());
if actual_frames != expected_frames {
info!(
"Chunk #{}: {}/{} fr",
chunk.index, actual_frames, expected_frames
);
}
actual_frames
}
}

View file

@ -12,6 +12,7 @@
#[macro_use]
extern crate log;
use crate::broker::Broker;
use crate::target_quality::TargetQuality;
use chunk::Chunk;
use dashmap::DashMap;
@ -24,7 +25,6 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::Sender;
use std::sync::{atomic, mpsc};
use sysinfo::SystemExt;
@ -56,13 +56,12 @@ use std::iter;
use std::string::ToString;
use std::time::Instant;
use itertools::Itertools;
use crate::encoder::Encoder;
use regex::Regex;
use flexi_logger::{Duplicate, FileSpec, Logger};
pub mod broker;
pub mod chunk;
pub mod concat;
pub mod encoder;
@ -359,136 +358,6 @@ pub fn is_vapoursynth(s: &str) -> bool {
[".vpy", ".py"].iter().any(|ext| s.ends_with(ext))
}
struct Queue<'a> {
chunk_queue: Vec<Chunk>,
project: &'a Project,
target_quality: Option<TargetQuality<'a>>,
}
impl<'a> Queue<'a> {
fn encoding_loop(self, tx: Sender<()>) {
if !self.chunk_queue.is_empty() {
let (sender, receiver) = crossbeam_channel::bounded(self.chunk_queue.len());
let workers = self.project.workers;
for chunk in &self.chunk_queue {
sender.send(chunk.clone()).unwrap();
}
drop(sender);
crossbeam_utils::thread::scope(|s| {
let consumers: Vec<_> = (0..workers)
.map(|i| (receiver.clone(), &self, i))
.map(|(rx, queue, consumer_idx)| {
let tx = tx.clone();
s.spawn(move |_| {
while let Ok(mut chunk) = rx.recv() {
if queue.encode_chunk(&mut chunk, consumer_idx).is_err() {
tx.send(()).unwrap();
return Err(());
}
}
Ok(())
})
})
.collect();
for consumer in consumers {
let _ = consumer.join().unwrap();
}
})
.unwrap();
if self.project.verbosity == Verbosity::Normal {
finish_progress_bar();
} else if self.project.verbosity == Verbosity::Verbose {
finish_multi_progress_bar();
}
}
}
fn encode_chunk(&self, chunk: &mut Chunk, worker_id: usize) -> Result<(), VecDeque<String>> {
let st_time = Instant::now();
info!("Enc: {}, {} fr", chunk.index, chunk.frames);
// Target Quality mode
if self.project.target_quality.is_some() {
if let Some(ref method) = self.project.target_quality_method {
if method == "per_shot" {
if let Some(ref tq) = self.target_quality {
tq.per_shot_target_quality_routine(chunk);
}
}
}
}
// Run all passes for this chunk
const MAX_TRIES: usize = 3;
for current_pass in 1..=self.project.passes {
for r#try in 1..=MAX_TRIES {
let res = self.project.create_pipes(chunk, current_pass, worker_id);
if let Err((exit_status, output)) = res {
warn!(
"Encoder failed (on chunk {}) with {}:\n{}",
chunk.index,
exit_status,
textwrap::indent(&output.iter().join("\n"), /* 8 spaces */ " ")
);
if r#try == MAX_TRIES {
error!(
"Encoder crashed (on chunk {}) {} times, terminating thread",
chunk.index, MAX_TRIES
);
return Err(output);
}
} else {
break;
}
}
}
let encoded_frames = Self::frame_check_output(chunk, chunk.frames);
if encoded_frames == chunk.frames {
let progress_file = Path::new(&self.project.temp).join("done.json");
get_done().done.insert(chunk.name(), encoded_frames);
let mut progress_file = File::create(&progress_file).unwrap();
progress_file
.write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
.unwrap();
let enc_time = st_time.elapsed();
info!(
"Done: {} Fr: {}/{}",
chunk.index, encoded_frames, chunk.frames
);
info!(
"Fps: {:.2} Time: {:?}",
encoded_frames as f64 / enc_time.as_secs_f64(),
enc_time
);
}
Ok(())
}
fn frame_check_output(chunk: &Chunk, expected_frames: usize) -> usize {
let actual_frames = frame_probe(&chunk.output_path());
if actual_frames != expected_frames {
info!(
"Chunk #{}: {}/{} fr",
chunk.index, actual_frames, expected_frames
);
}
actual_frames
}
}
pub async fn process_pipe(pipe: tokio::process::Child, chunk_index: usize) -> Result<(), String> {
let status = pipe.wait_with_output().await.unwrap();
@ -1388,7 +1257,7 @@ impl Project {
let model = self.vmaf_path.as_ref();
let keep = self.keep;
let queue = Queue {
let queue = Broker {
chunk_queue,
project: self,
target_quality: if self.target_quality.is_some() {