1#![warn(clippy::all)]
56use std::{
57 sync::{
58 atomic::{AtomicBool, AtomicU64, Ordering},
59 mpsc::{self},
60 Arc,
61 },
62 time::{Duration, Instant},
63};
64
65use capture::{audio::AudioCapture, video::VideoCapture, Terminate};
66use crossbeam::{
67 channel::{bounded, Receiver, Sender},
68 select,
69};
70use encoders::{audio::AudioEncoder, opus_encoder::OpusEncoder};
71use portal_screencast_waycap::{CursorMode, ScreenCast, SourceType};
72use std::sync::Mutex;
73use types::{
74 audio_frame::{EncodedAudioFrame, RawAudioFrame},
75 config::{AudioEncoder as AudioEncoderType, QualityPreset, VideoEncoder as VideoEncoderType},
76 error::{Result, WaycapError},
77 video_frame::{EncodedVideoFrame, RawVideoFrame},
78};
79
80#[cfg(not(any(feature = "vaapi", feature = "nvidia")))]
81compile_error!("At least one encoder must be enabled: 'vaapi' or 'nvidia'.");
82
83#[cfg(all(feature = "vulkan", feature = "egl"))]
84compile_error!("Features 'vulkan' and 'egl' are mutually exclusive. Enable only one.");
85
86#[cfg(all(feature = "nvidia", not(any(feature = "vulkan", feature = "egl"))))]
87compile_error!("The 'nvidia' feature requires either 'vulkan' or 'egl' to also be enabled.");
88
89mod capture;
90mod encoders;
91pub mod pipeline;
92pub mod types;
93mod utils;
94#[cfg(all(feature = "nvidia", feature = "vulkan"))]
95mod waycap_vulkan;
96#[cfg(all(feature = "nvidia", feature = "egl"))]
97mod waycap_egl;
98
99pub use crate::encoders::dma_buf_encoder::DmaBufEncoder;
100pub use crate::encoders::dynamic_encoder::DynamicEncoder;
101#[cfg(feature = "nvidia")]
102pub use crate::encoders::nvenc_encoder::NvencEncoder;
103pub use crate::encoders::rgba_image_encoder::RgbaImageEncoder;
104#[cfg(feature = "vaapi")]
105pub use crate::encoders::vaapi_encoder::VaapiEncoder;
106pub use encoders::video::VideoEncoder;
107pub use utils::TIME_UNIT_NS;
108
109use crate::encoders::video::{PipewireSPA, StartVideoEncoder};
110
/// Video stream dimensions negotiated with PipeWire (see
/// `start_pipewire_video`, which receives this from the capture thread).
pub struct Resolution {
    width: u32,
    height: u32,
}
116
/// A running screen-capture session.
///
/// Owns the worker threads, the encoders, and the termination channels for
/// the underlying PipeWire streams; `close()` (also invoked from `Drop`)
/// tears all of it down.
pub struct Capture<V: VideoEncoder + Send> {
    // Shared stop/pause/fps flags handed out to every worker thread.
    controls: Arc<CaptureControls>,
    // PipeWire + encoding worker threads; joined in close().
    worker_handles: Vec<std::thread::JoinHandle<Result<()>>>,

    // Video encoder; `None` after close() has released it.
    video_encoder: Option<Arc<Mutex<V>>>,
    // Tells the PipeWire video loop to shut down.
    pw_video_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,

    // Audio encoder; `None` when audio is disabled or after close().
    audio_encoder: Option<Arc<Mutex<dyn AudioEncoder + Send>>>,
    // Tells the PipeWire audio loop to shut down.
    pw_audio_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,

    /// Restore token returned by the screencast portal, if one was granted.
    /// Pass it back into a later session to restore the previous selection.
    pub restore_token: Option<String>,
}
164
/// Atomic flags shared between the public API and the capture/encoding
/// worker threads to coordinate stop, pause and frame pacing.
#[derive(Debug)]
pub struct CaptureControls {
    // Set once by stop(); never cleared.
    stop_flag: AtomicBool,
    // Toggled by pause()/resume(); starts out `true` (see from_fps).
    pause_flag: AtomicBool,
    // Target capture rate in frames per second.
    target_fps: AtomicU64,
}
172
173impl CaptureControls {
174 fn from_fps(target_fps: u64) -> Self {
175 Self {
176 stop_flag: AtomicBool::new(false),
177 pause_flag: AtomicBool::new(true),
178 target_fps: AtomicU64::new(target_fps),
179 }
180 }
181 pub fn skip_processing(&self) -> bool {
183 self.is_paused() || self.is_stopped()
184 }
185 pub fn is_paused(&self) -> bool {
187 self.pause_flag.load(Ordering::Acquire)
188 }
189 pub fn is_stopped(&self) -> bool {
191 self.stop_flag.load(Ordering::Acquire)
192 }
193 pub fn stop(&self) {
197 self.stop_flag.store(true, Ordering::Release);
198 }
199
200 pub fn pause(&self) {
202 self.pause_flag.store(true, Ordering::Release);
203 }
204
205 pub fn resume(&self) {
207 self.pause_flag.store(false, Ordering::Release);
208 }
209
210 pub fn frame_interval_ns(&self) -> u64 {
212 TIME_UNIT_NS / self.target_fps.load(Ordering::Acquire)
213 }
214}
215
/// Readiness flags for the audio and video pipelines; startup blocks in
/// `wait_for_both` until both are set.
#[derive(Default, Debug)]
pub struct ReadyState {
    // Set by the audio pipeline, or immediately when audio is disabled.
    audio: AtomicBool,
    // Set by the video pipeline once the stream is up.
    video: AtomicBool,
}
222
223impl ReadyState {
224 pub fn video_ready(&self) -> bool {
225 self.video.load(Ordering::Acquire)
226 }
227 pub fn audio_ready(&self) -> bool {
228 self.audio.load(Ordering::Acquire)
229 }
230 fn wait_for_both(&self) {
231 while !self.audio.load(Ordering::Acquire) || !self.video.load(Ordering::Acquire) {
232 std::thread::sleep(Duration::from_millis(100));
233 }
234 }
235}
236
237impl<V: VideoEncoder + PipewireSPA + StartVideoEncoder> Capture<V> {
238 pub fn new_with_encoder(video_encoder: V, include_cursor: bool, target_fps: u64) -> Result<Self>
239 where
240 V: 'static,
241 {
242 let mut _self = Self {
243 controls: Arc::new(CaptureControls::from_fps(target_fps)),
244 worker_handles: Vec::new(),
245 video_encoder: Some(Arc::new(Mutex::new(video_encoder))),
246 audio_encoder: None,
247 pw_video_terminate_tx: None,
248 pw_audio_terminate_tx: None,
249 restore_token: None,
250 };
251
252 let (frame_rx, ready_state, _, restore_token) =
253 _self.start_pipewire_video(include_cursor, None)?;
254 _self.restore_token = restore_token;
255
256 std::thread::sleep(Duration::from_millis(100));
257 ready_state.audio.store(true, Ordering::Release);
258 _self.start().unwrap();
259
260 ready_state.wait_for_both();
261
262 V::start_processing(&mut _self, frame_rx)?;
263
264 log::info!("Capture started successfully.");
265 Ok(_self)
266 }
267
268 #[allow(clippy::type_complexity)]
269 fn start_pipewire_video(
270 &mut self,
271 include_cursor: bool,
272 restore_token: Option<String>,
273 ) -> Result<(Receiver<RawVideoFrame>, Arc<ReadyState>, Resolution, Option<String>)> {
274 let (frame_tx, frame_rx): (Sender<RawVideoFrame>, Receiver<RawVideoFrame>) = bounded(10);
275
276 let ready_state = Arc::new(ReadyState::default());
277 let ready_state_pw = Arc::clone(&ready_state);
278
279 let (pw_sender, pw_recv) = pipewire::channel::channel();
280 self.pw_video_terminate_tx = Some(pw_sender);
281
282 let (reso_sender, reso_recv) = mpsc::channel::<Resolution>();
283
284 let mut screen_cast = ScreenCast::new()?;
285 screen_cast.set_source_types(SourceType::all());
286 screen_cast.set_cursor_mode(if include_cursor {
287 CursorMode::EMBEDDED
288 } else {
289 CursorMode::HIDDEN
290 });
291 if let Some(token) = restore_token {
292 screen_cast.set_restore_token(token);
293 }
294 let active_cast = screen_cast.start(None)?;
295 let new_restore_token = active_cast.restore_token().map(|s| s.to_owned());
296 let fd = active_cast.pipewire_fd();
297 let stream = active_cast.streams().next().unwrap();
298 let stream_node = stream.pipewire_node();
299 let controls = Arc::clone(&self.controls);
300
301 self.worker_handles
302 .push(std::thread::spawn(move || -> Result<()> {
303 let mut video_cap = match VideoCapture::new(
304 fd,
305 stream_node,
306 ready_state_pw,
307 controls,
308 reso_sender,
309 frame_tx,
310 pw_recv,
311 V::get_spa_definition()?,
312 ) {
313 Ok(pw_capture) => pw_capture,
314 Err(e) => {
315 log::error!("Error initializing pipewire struct: {e:}");
316 return Err(e);
317 }
318 };
319
320 video_cap.run()?;
321
322 let _ = active_cast.close(); Ok(())
324 }));
325
326 let timeout = Duration::from_secs(5);
328 let start = Instant::now();
329 let resolution = loop {
330 if let Ok(reso) = reso_recv.try_recv() {
331 break reso;
332 }
333
334 if start.elapsed() > timeout {
335 log::error!("Timeout waiting for PipeWire negotiated resolution.");
336 return Err(WaycapError::Init(
337 "Timed out waiting for pipewire to negotiate video resolution".into(),
338 ));
339 }
340
341 std::thread::sleep(Duration::from_millis(100));
342 };
343
344 Ok((frame_rx, ready_state, resolution, new_restore_token))
345 }
346
347 fn start_pipewire_audio(
348 &mut self,
349 audio_encoder_type: AudioEncoderType,
350 ready_state: Arc<ReadyState>,
351 ) -> Result<Receiver<RawAudioFrame>> {
352 let (pw_audio_sender, pw_audio_recv) = pipewire::channel::channel();
353 self.pw_audio_terminate_tx = Some(pw_audio_sender);
354 let (audio_tx, audio_rx): (Sender<RawAudioFrame>, Receiver<RawAudioFrame>) = bounded(10);
355 let controls = Arc::clone(&self.controls);
356 let pw_audio_worker = std::thread::spawn(move || -> Result<()> {
357 log::debug!("Starting audio stream");
358 let mut audio_cap = AudioCapture::new(ready_state, audio_tx, pw_audio_recv, controls)?;
359 audio_cap.run();
360 Ok(())
361 });
362
363 self.worker_handles.push(pw_audio_worker);
364
365 let enc: Arc<Mutex<dyn AudioEncoder + Send>> = match audio_encoder_type {
366 AudioEncoderType::Opus => Arc::new(Mutex::new(OpusEncoder::new()?)),
367 };
368
369 self.audio_encoder = Some(enc);
370
371 Ok(audio_rx)
372 }
373}
374impl<V: VideoEncoder> Capture<V> {
375 pub fn start(&mut self) -> Result<()> {
377 self.controls.resume();
378 Ok(())
379 }
380
381 pub fn controls(&mut self) -> Arc<CaptureControls> {
383 Arc::clone(&self.controls)
384 }
385
386 pub fn finish(&mut self) -> Result<()> {
389 self.controls.pause();
390 if let Some(ref mut enc) = self.video_encoder {
391 enc.lock().unwrap().drain()?;
392 }
393 if let Some(ref mut enc) = self.audio_encoder {
394 enc.lock().unwrap().drain()?;
395 }
396 Ok(())
397 }
398
399 pub fn reset(&mut self) -> Result<()> {
401 if let Some(ref mut enc) = self.video_encoder {
402 enc.lock().unwrap().reset()?;
403 }
404 if let Some(ref mut enc) = self.audio_encoder {
405 enc.lock().unwrap().reset()?;
406 }
407
408 Ok(())
409 }
410
411 pub fn close(&mut self) -> Result<()> {
415 self.finish()?;
416 self.controls.stop();
417 if let Some(pw_vid) = &self.pw_video_terminate_tx {
418 let _ = pw_vid.send(Terminate {});
419 }
420 if let Some(pw_aud) = &self.pw_audio_terminate_tx {
421 let _ = pw_aud.send(Terminate {});
422 }
423
424 for handle in self.worker_handles.drain(..) {
425 let _ = handle.join();
426 }
427
428 drop(self.video_encoder.take());
429 drop(self.audio_encoder.take());
430
431 Ok(())
432 }
433
434 pub fn get_output(&mut self) -> Receiver<V::Output> {
435 self.video_encoder
436 .as_mut()
437 .unwrap()
438 .lock()
439 .unwrap()
440 .output()
441 .unwrap()
442 }
443}
444
445impl Capture<DynamicEncoder> {
446 pub fn new(
447 video_encoder_type: Option<VideoEncoderType>,
448 audio_encoder_type: AudioEncoderType,
449 quality: QualityPreset,
450 include_cursor: bool,
451 include_audio: bool,
452 target_fps: u64,
453 restore_token: Option<String>,
454 ) -> Result<Self> {
455 let mut _self = Self {
456 controls: Arc::new(CaptureControls::from_fps(target_fps)),
457 worker_handles: Vec::new(),
458 video_encoder: None,
459 audio_encoder: None,
460 pw_video_terminate_tx: None,
461 pw_audio_terminate_tx: None,
462 restore_token: None,
463 };
464
465 let (frame_rx, ready_state, resolution, new_restore_token) =
466 _self.start_pipewire_video(include_cursor, restore_token)?;
467 _self.restore_token = new_restore_token;
468
469 _self.video_encoder = Some(Arc::new(Mutex::new(DynamicEncoder::new(
470 video_encoder_type,
471 resolution.width,
472 resolution.height,
473 quality,
474 )?)));
475
476 if include_audio {
477 let audio_rx =
478 _self.start_pipewire_audio(audio_encoder_type, Arc::clone(&ready_state))?;
479 ready_state.wait_for_both();
481 let audio_loop = audio_encoding_loop(
482 Arc::clone(_self.audio_encoder.as_ref().unwrap()),
483 audio_rx,
484 Arc::clone(&_self.controls),
485 );
486
487 _self.worker_handles.push(audio_loop);
488 } else {
489 println!("No audio");
490 ready_state.audio.store(true, Ordering::Release);
491 ready_state.wait_for_both();
492 }
493
494 DynamicEncoder::start_processing(&mut _self, frame_rx)?;
495
496 log::info!("Capture started successfully.");
497 Ok(_self)
498 }
499
500 pub fn get_video_receiver(&mut self) -> Receiver<EncodedVideoFrame> {
505 self.video_encoder
506 .as_mut()
507 .expect("Cannot access a video encoder which was never started.")
508 .lock()
509 .unwrap()
510 .output()
511 .unwrap()
512 }
513
514 pub fn get_audio_receiver(&mut self) -> Result<Receiver<EncodedAudioFrame>> {
519 if let Some(ref mut audio_enc) = self.audio_encoder {
520 return Ok(audio_enc.lock().unwrap().get_encoded_recv().unwrap());
521 } else {
522 Err(WaycapError::Validation(
523 "Audio encoder does not exist".to_string(),
524 ))
525 }
526 }
527
528 pub fn with_video_encoder<F, R>(&self, f: F) -> R
550 where
551 F: FnOnce(&Option<ffmpeg_next::encoder::Video>) -> R,
552 {
553 let guard = self
554 .video_encoder
555 .as_ref()
556 .expect("Cannot access a video encoder which was never started.")
557 .lock()
558 .unwrap();
559 f(guard.get_encoder())
560 }
561
562 pub fn with_audio_encoder<F, R>(&self, f: F) -> R
584 where
585 F: FnOnce(&Option<ffmpeg_next::encoder::Audio>) -> R,
586 {
587 assert!(self.audio_encoder.is_some());
588
589 let guard = self.audio_encoder.as_ref().unwrap().lock().unwrap();
590 f(guard.get_encoder())
591 }
592}
593
594impl<V: VideoEncoder> Drop for Capture<V> {
595 fn drop(&mut self) {
596 let _ = self.close();
597
598 for handle in self.worker_handles.drain(..) {
599 let _ = handle.join();
600 }
601 }
602}
603
604#[allow(clippy::too_many_arguments)]
605fn audio_encoding_loop(
606 audio_encoder: Arc<Mutex<dyn AudioEncoder + Send>>,
607 audio_recv: Receiver<RawAudioFrame>,
608 controls: Arc<CaptureControls>,
609) -> std::thread::JoinHandle<Result<()>> {
610 std::thread::spawn(move || -> Result<()> {
611 while !controls.is_stopped() {
614 if controls.is_paused() {
615 std::thread::sleep(Duration::from_millis(100));
616 continue;
617 }
618
619 select! {
620 recv(audio_recv) -> raw_samples => {
621 match raw_samples {
622 Ok(raw_samples) => {
623 audio_encoder.as_ref().lock().unwrap().process(raw_samples)?;
626 }
627 Err(_) => {
628 log::info!("Audio channel disconnected");
629 break;
630 }
631 }
632 }
633 default(Duration::from_millis(100)) => {
634 }
636 }
637 }
638 Ok(())
639 })
640}