1#![warn(clippy::all)]
56use std::{
57 sync::{
58 atomic::{AtomicBool, AtomicU64, Ordering},
59 mpsc::{self},
60 Arc,
61 },
62 time::{Duration, Instant},
63};
64
65use capture::{audio::AudioCapture, video::VideoCapture, Terminate};
66use crossbeam::{
67 channel::{bounded, Receiver, Sender},
68 select,
69};
70use encoders::{audio::AudioEncoder, opus_encoder::OpusEncoder};
71use portal_screencast_waycap::{CursorMode, ScreenCast, SourceType};
72use std::sync::Mutex;
73use types::{
74 audio_frame::{EncodedAudioFrame, RawAudioFrame},
75 config::{AudioEncoder as AudioEncoderType, QualityPreset, VideoEncoder as VideoEncoderType},
76 error::{Result, WaycapError},
77 video_frame::{EncodedVideoFrame, RawVideoFrame},
78};
79
80mod capture;
81mod encoders;
82pub mod pipeline;
83pub mod types;
84mod utils;
85mod waycap_egl;
86
87pub use crate::encoders::dma_buf_encoder::DmaBufEncoder;
88pub use crate::encoders::dynamic_encoder::DynamicEncoder;
89pub use crate::encoders::nvenc_encoder::NvencEncoder;
90pub use crate::encoders::rgba_image_encoder::RgbaImageEncoder;
91pub use crate::encoders::vaapi_encoder::VaapiEncoder;
92pub use encoders::video::VideoEncoder;
93pub use utils::TIME_UNIT_NS;
94
95use crate::encoders::video::{PipewireSPA, StartVideoEncoder};
96
97pub struct Resolution {
99 width: u32,
100 height: u32,
101}
102
103pub struct Capture<V: VideoEncoder + Send> {
136 controls: Arc<CaptureControls>,
137 worker_handles: Vec<std::thread::JoinHandle<Result<()>>>,
138
139 video_encoder: Option<Arc<Mutex<V>>>,
140 pw_video_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,
141
142 audio_encoder: Option<Arc<Mutex<dyn AudioEncoder + Send>>>,
143 pw_audio_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,
144}
145
146#[derive(Debug)]
148pub struct CaptureControls {
149 stop_flag: AtomicBool,
150 pause_flag: AtomicBool,
151 target_fps: AtomicU64,
152}
153
154impl CaptureControls {
155 fn from_fps(target_fps: u64) -> Self {
156 Self {
157 stop_flag: AtomicBool::new(false),
158 pause_flag: AtomicBool::new(true),
159 target_fps: AtomicU64::new(target_fps),
160 }
161 }
162 pub fn skip_processing(&self) -> bool {
164 self.is_paused() || self.is_stopped()
165 }
166 pub fn is_paused(&self) -> bool {
168 self.pause_flag.load(Ordering::Acquire)
169 }
170 pub fn is_stopped(&self) -> bool {
172 self.stop_flag.load(Ordering::Acquire)
173 }
174 pub fn stop(&self) {
178 self.stop_flag.store(true, Ordering::Release);
179 }
180
181 pub fn pause(&self) {
183 self.pause_flag.store(true, Ordering::Release);
184 }
185
186 pub fn resume(&self) {
188 self.pause_flag.store(false, Ordering::Release);
189 }
190
191 pub fn frame_interval_ns(&self) -> u64 {
193 TIME_UNIT_NS / self.target_fps.load(Ordering::Acquire)
194 }
195}
196
197#[derive(Default, Debug)]
199pub struct ReadyState {
200 audio: AtomicBool,
201 video: AtomicBool,
202}
203
204impl ReadyState {
205 pub fn video_ready(&self) -> bool {
206 self.video.load(Ordering::Acquire)
207 }
208 pub fn audio_ready(&self) -> bool {
209 self.audio.load(Ordering::Acquire)
210 }
211 fn wait_for_both(&self) {
212 while !self.audio.load(Ordering::Acquire) || !self.video.load(Ordering::Acquire) {
213 std::thread::sleep(Duration::from_millis(100));
214 }
215 }
216}
217
218impl<V: VideoEncoder + PipewireSPA + StartVideoEncoder> Capture<V> {
219 pub fn new_with_encoder(video_encoder: V, include_cursor: bool, target_fps: u64) -> Result<Self>
220 where
221 V: 'static,
222 {
223 let mut _self = Self {
224 controls: Arc::new(CaptureControls::from_fps(target_fps)),
225 worker_handles: Vec::new(),
226 video_encoder: Some(Arc::new(Mutex::new(video_encoder))),
227 audio_encoder: None,
228 pw_video_terminate_tx: None,
229 pw_audio_terminate_tx: None,
230 };
231
232 let (frame_rx, ready_state, _) = _self.start_pipewire_video(include_cursor)?;
233
234 std::thread::sleep(Duration::from_millis(100));
235 ready_state.audio.store(true, Ordering::Release);
236 _self.start().unwrap();
237
238 ready_state.wait_for_both();
239
240 V::start_processing(&mut _self, frame_rx)?;
241
242 log::info!("Capture started successfully.");
243 Ok(_self)
244 }
245
246 fn start_pipewire_video(
247 &mut self,
248 include_cursor: bool,
249 ) -> Result<(Receiver<RawVideoFrame>, Arc<ReadyState>, Resolution)> {
250 let (frame_tx, frame_rx): (Sender<RawVideoFrame>, Receiver<RawVideoFrame>) = bounded(10);
251
252 let ready_state = Arc::new(ReadyState::default());
253 let ready_state_pw = Arc::clone(&ready_state);
254
255 let (pw_sender, pw_recv) = pipewire::channel::channel();
256 self.pw_video_terminate_tx = Some(pw_sender);
257
258 let (reso_sender, reso_recv) = mpsc::channel::<Resolution>();
259
260 let mut screen_cast = ScreenCast::new()?;
261 screen_cast.set_source_types(SourceType::all());
262 screen_cast.set_cursor_mode(if include_cursor {
263 CursorMode::EMBEDDED
264 } else {
265 CursorMode::HIDDEN
266 });
267 let active_cast = screen_cast.start(None)?;
268 let fd = active_cast.pipewire_fd();
269 let stream = active_cast.streams().next().unwrap();
270 let stream_node = stream.pipewire_node();
271 let controls = Arc::clone(&self.controls);
272
273 self.worker_handles
274 .push(std::thread::spawn(move || -> Result<()> {
275 let mut video_cap = match VideoCapture::new(
276 fd,
277 stream_node,
278 ready_state_pw,
279 controls,
280 reso_sender,
281 frame_tx,
282 pw_recv,
283 V::get_spa_definition()?,
284 ) {
285 Ok(pw_capture) => pw_capture,
286 Err(e) => {
287 log::error!("Error initializing pipewire struct: {e:}");
288 return Err(e);
289 }
290 };
291
292 video_cap.run()?;
293
294 let _ = active_cast.close(); Ok(())
296 }));
297
298 let timeout = Duration::from_secs(5);
300 let start = Instant::now();
301 let resolution = loop {
302 if let Ok(reso) = reso_recv.try_recv() {
303 break reso;
304 }
305
306 if start.elapsed() > timeout {
307 log::error!("Timeout waiting for PipeWire negotiated resolution.");
308 return Err(WaycapError::Init(
309 "Timed out waiting for pipewire to negotiate video resolution".into(),
310 ));
311 }
312
313 std::thread::sleep(Duration::from_millis(100));
314 };
315
316 Ok((frame_rx, ready_state, resolution))
317 }
318
319 fn start_pipewire_audio(
320 &mut self,
321 audio_encoder_type: AudioEncoderType,
322 ready_state: Arc<ReadyState>,
323 ) -> Result<Receiver<RawAudioFrame>> {
324 let (pw_audio_sender, pw_audio_recv) = pipewire::channel::channel();
325 self.pw_audio_terminate_tx = Some(pw_audio_sender);
326 let (audio_tx, audio_rx): (Sender<RawAudioFrame>, Receiver<RawAudioFrame>) = bounded(10);
327 let controls = Arc::clone(&self.controls);
328 let pw_audio_worker = std::thread::spawn(move || -> Result<()> {
329 log::debug!("Starting audio stream");
330 let audio_cap = AudioCapture::new(ready_state);
331 audio_cap.run(audio_tx, pw_audio_recv, controls)?;
332 Ok(())
333 });
334
335 self.worker_handles.push(pw_audio_worker);
336
337 let enc: Arc<Mutex<dyn AudioEncoder + Send>> = match audio_encoder_type {
338 AudioEncoderType::Opus => Arc::new(Mutex::new(OpusEncoder::new()?)),
339 };
340
341 self.audio_encoder = Some(enc);
342
343 Ok(audio_rx)
344 }
345}
346impl<V: VideoEncoder> Capture<V> {
347 pub fn start(&mut self) -> Result<()> {
349 self.controls.resume();
350 Ok(())
351 }
352
353 pub fn controls(&mut self) -> Arc<CaptureControls> {
355 Arc::clone(&self.controls)
356 }
357
358 pub fn finish(&mut self) -> Result<()> {
361 self.controls.pause();
362 if let Some(ref mut enc) = self.video_encoder {
363 enc.lock().unwrap().drain()?;
364 }
365 if let Some(ref mut enc) = self.audio_encoder {
366 enc.lock().unwrap().drain()?;
367 }
368 Ok(())
369 }
370
371 pub fn reset(&mut self) -> Result<()> {
373 if let Some(ref mut enc) = self.video_encoder {
374 enc.lock().unwrap().reset()?;
375 }
376 if let Some(ref mut enc) = self.audio_encoder {
377 enc.lock().unwrap().reset()?;
378 }
379
380 Ok(())
381 }
382
383 pub fn close(&mut self) -> Result<()> {
387 self.finish()?;
388 self.controls.stop();
389 if let Some(pw_vid) = &self.pw_video_terminate_tx {
390 let _ = pw_vid.send(Terminate {});
391 }
392 if let Some(pw_aud) = &self.pw_audio_terminate_tx {
393 let _ = pw_aud.send(Terminate {});
394 }
395
396 for handle in self.worker_handles.drain(..) {
397 let _ = handle.join();
398 }
399
400 drop(self.video_encoder.take());
401 drop(self.audio_encoder.take());
402
403 Ok(())
404 }
405
406 pub fn get_output(&mut self) -> Receiver<V::Output> {
407 self.video_encoder
408 .as_mut()
409 .unwrap()
410 .lock()
411 .unwrap()
412 .output()
413 .unwrap()
414 }
415}
416
417impl Capture<DynamicEncoder> {
418 pub fn new(
419 video_encoder_type: Option<VideoEncoderType>,
420 audio_encoder_type: AudioEncoderType,
421 quality: QualityPreset,
422 include_cursor: bool,
423 include_audio: bool,
424 target_fps: u64,
425 ) -> Result<Self> {
426 let mut _self = Self {
427 controls: Arc::new(CaptureControls::from_fps(target_fps)),
428 worker_handles: Vec::new(),
429 video_encoder: None,
430 audio_encoder: None,
431 pw_video_terminate_tx: None,
432 pw_audio_terminate_tx: None,
433 };
434
435 let (frame_rx, ready_state, resolution) = _self.start_pipewire_video(include_cursor)?;
436
437 _self.video_encoder = Some(Arc::new(Mutex::new(DynamicEncoder::new(
438 video_encoder_type,
439 resolution.width,
440 resolution.height,
441 quality,
442 )?)));
443
444 if include_audio {
445 let audio_rx =
446 _self.start_pipewire_audio(audio_encoder_type, Arc::clone(&ready_state))?;
447 ready_state.wait_for_both();
449 let audio_loop = audio_encoding_loop(
450 Arc::clone(_self.audio_encoder.as_ref().unwrap()),
451 audio_rx,
452 Arc::clone(&_self.controls),
453 );
454
455 _self.worker_handles.push(audio_loop);
456 } else {
457 println!("No audio");
458 ready_state.audio.store(true, Ordering::Release);
459 ready_state.wait_for_both();
460 }
461
462 DynamicEncoder::start_processing(&mut _self, frame_rx)?;
463
464 log::info!("Capture started successfully.");
465 Ok(_self)
466 }
467
468 pub fn get_video_receiver(&mut self) -> Receiver<EncodedVideoFrame> {
473 self.video_encoder
474 .as_mut()
475 .expect("Cannot access a video encoder which was never started.")
476 .lock()
477 .unwrap()
478 .output()
479 .unwrap()
480 }
481
482 pub fn get_audio_receiver(&mut self) -> Result<Receiver<EncodedAudioFrame>> {
487 if let Some(ref mut audio_enc) = self.audio_encoder {
488 return Ok(audio_enc.lock().unwrap().get_encoded_recv().unwrap());
489 } else {
490 Err(WaycapError::Validation(
491 "Audio encoder does not exist".to_string(),
492 ))
493 }
494 }
495
496 pub fn with_video_encoder<F, R>(&self, f: F) -> R
518 where
519 F: FnOnce(&Option<ffmpeg_next::encoder::Video>) -> R,
520 {
521 let guard = self
522 .video_encoder
523 .as_ref()
524 .expect("Cannot access a video encoder which was never started.")
525 .lock()
526 .unwrap();
527 f(guard.get_encoder())
528 }
529
530 pub fn with_audio_encoder<F, R>(&self, f: F) -> R
552 where
553 F: FnOnce(&Option<ffmpeg_next::encoder::Audio>) -> R,
554 {
555 assert!(self.audio_encoder.is_some());
556
557 let guard = self.audio_encoder.as_ref().unwrap().lock().unwrap();
558 f(guard.get_encoder())
559 }
560}
561
562impl<V: VideoEncoder> Drop for Capture<V> {
563 fn drop(&mut self) {
564 let _ = self.close();
565
566 for handle in self.worker_handles.drain(..) {
567 let _ = handle.join();
568 }
569 }
570}
571
572#[allow(clippy::too_many_arguments)]
573fn audio_encoding_loop(
574 audio_encoder: Arc<Mutex<dyn AudioEncoder + Send>>,
575 audio_recv: Receiver<RawAudioFrame>,
576 controls: Arc<CaptureControls>,
577) -> std::thread::JoinHandle<Result<()>> {
578 std::thread::spawn(move || -> Result<()> {
579 while !controls.is_stopped() {
582 if controls.is_paused() {
583 std::thread::sleep(Duration::from_millis(100));
584 continue;
585 }
586
587 select! {
588 recv(audio_recv) -> raw_samples => {
589 match raw_samples {
590 Ok(raw_samples) => {
591 audio_encoder.as_ref().lock().unwrap().process(raw_samples)?;
594 }
595 Err(_) => {
596 log::info!("Audio channel disconnected");
597 break;
598 }
599 }
600 }
601 default(Duration::from_millis(100)) => {
602 }
604 }
605 }
606 Ok(())
607 })
608}