1#![warn(clippy::all)]
56use std::{
57 sync::{
58 atomic::{AtomicBool, AtomicU64, Ordering},
59 mpsc::{self},
60 Arc,
61 },
62 time::{Duration, Instant},
63};
64
65use capture::{audio::AudioCapture, video::VideoCapture, Terminate};
66use crossbeam::{
67 channel::{bounded, Receiver, Sender},
68 select,
69};
70use encoders::{audio::AudioEncoder, opus_encoder::OpusEncoder};
71use portal_screencast_waycap::{CursorMode, ScreenCast, SourceType};
72use std::sync::Mutex;
73use types::{
74 audio_frame::{EncodedAudioFrame, RawAudioFrame},
75 config::{AudioEncoder as AudioEncoderType, QualityPreset, VideoEncoder as VideoEncoderType},
76 error::{Result, WaycapError},
77 video_frame::{EncodedVideoFrame, RawVideoFrame},
78};
79
80mod capture;
81mod encoders;
82pub mod pipeline;
83pub mod types;
84mod utils;
85mod waycap_egl;
86
87pub use crate::encoders::dma_buf_encoder::DmaBufEncoder;
88pub use crate::encoders::dynamic_encoder::DynamicEncoder;
89pub use crate::encoders::nvenc_encoder::NvencEncoder;
90pub use crate::encoders::rgba_image_encoder::RgbaImageEncoder;
91pub use crate::encoders::vaapi_encoder::VaapiEncoder;
92pub use encoders::video::VideoEncoder;
93pub use utils::TIME_UNIT_NS;
94
95use crate::encoders::video::{PipewireSPA, StartVideoEncoder};
96
/// Dimensions of the video stream as negotiated with PipeWire.
///
/// Values of this type are sent from the PipeWire video thread back to the
/// thread that is setting up the capture.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Resolution {
    /// Negotiated frame width (presumably in pixels — confirm against the capture code).
    width: u32,
    /// Negotiated frame height (presumably in pixels — confirm against the capture code).
    height: u32,
}
102
/// Handle for a capture session.
///
/// Owns the shared [`CaptureControls`], the join handles of every worker
/// thread spawned for the session, the optional video and audio encoders, and
/// the channels used to send `Terminate` to the PipeWire threads.
pub struct Capture<V: VideoEncoder + Send> {
    /// Pause/stop flags and target frame rate, shared with the worker threads.
    controls: Arc<CaptureControls>,
    /// Join handles of the spawned worker threads; drained and joined on close/drop.
    worker_handles: Vec<std::thread::JoinHandle<Result<()>>>,

    /// Video encoder, shared behind a mutex. `None` before one is installed
    /// and after `close` has taken it.
    video_encoder: Option<Arc<Mutex<V>>>,
    /// Sender used to deliver `Terminate` to the PipeWire video thread.
    pw_video_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,

    /// Audio encoder, shared behind a mutex. `None` unless audio capture was started.
    audio_encoder: Option<Arc<Mutex<dyn AudioEncoder + Send>>>,
    /// Sender used to deliver `Terminate` to the PipeWire audio thread.
    pw_audio_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,
}
145
/// Flags and settings shared between a capture handle and its worker threads.
///
/// All fields are atomics, so the struct can be shared through an `Arc`
/// without additional locking.
#[derive(Debug)]
pub struct CaptureControls {
    /// Set to `true` by `stop`; nothing in this file clears it again.
    stop_flag: AtomicBool,
    /// `true` while capture is paused. Initialised to `true` by `from_fps`.
    pause_flag: AtomicBool,
    /// Target frames per second, used to derive the frame interval.
    target_fps: AtomicU64,
}
153
154impl CaptureControls {
155 fn from_fps(target_fps: u64) -> Self {
156 Self {
157 stop_flag: AtomicBool::new(false),
158 pause_flag: AtomicBool::new(true),
159 target_fps: AtomicU64::new(target_fps),
160 }
161 }
162 pub fn skip_processing(&self) -> bool {
164 self.is_paused() || self.is_stopped()
165 }
166 pub fn is_paused(&self) -> bool {
168 self.pause_flag.load(Ordering::Acquire)
169 }
170 pub fn is_stopped(&self) -> bool {
172 self.stop_flag.load(Ordering::Acquire)
173 }
174 pub fn stop(&self) {
178 self.stop_flag.store(true, Ordering::Release);
179 }
180
181 pub fn pause(&self) {
183 self.pause_flag.store(true, Ordering::Release);
184 }
185
186 pub fn resume(&self) {
188 self.pause_flag.store(false, Ordering::Release);
189 }
190
191 pub fn frame_interval_ns(&self) -> u64 {
193 TIME_UNIT_NS / self.target_fps.load(Ordering::Acquire)
194 }
195}
196
/// Readiness flags for the audio and video sides of a capture.
///
/// Both flags default to `false`.
#[derive(Default, Debug)]
pub struct ReadyState {
    /// `true` once the audio side has been marked ready.
    audio: AtomicBool,
    /// `true` once the video side has been marked ready.
    video: AtomicBool,
}

impl ReadyState {
    /// Returns the current value of the video flag.
    pub fn video_ready(&self) -> bool {
        self.video.load(Ordering::Acquire)
    }

    /// Returns the current value of the audio flag.
    pub fn audio_ready(&self) -> bool {
        self.audio.load(Ordering::Acquire)
    }

    /// Blocks the calling thread until both flags are set, re-checking every
    /// 100 ms. There is no timeout.
    fn wait_for_both(&self) {
        let poll_interval = Duration::from_millis(100);
        loop {
            // Audio is checked first and short-circuits the video check.
            if self.audio_ready() && self.video_ready() {
                return;
            }
            std::thread::sleep(poll_interval);
        }
    }
}
217
218impl<V: VideoEncoder + PipewireSPA + StartVideoEncoder> Capture<V> {
219 pub fn new_with_encoder(video_encoder: V, include_cursor: bool, target_fps: u64) -> Result<Self>
220 where
221 V: 'static,
222 {
223 let mut _self = Self {
224 controls: Arc::new(CaptureControls::from_fps(target_fps)),
225 worker_handles: Vec::new(),
226 video_encoder: Some(Arc::new(Mutex::new(video_encoder))),
227 audio_encoder: None,
228 pw_video_terminate_tx: None,
229 pw_audio_terminate_tx: None,
230 };
231
232 let (frame_rx, ready_state, _) = _self.start_pipewire_video(include_cursor)?;
233
234 std::thread::sleep(Duration::from_millis(100));
235 ready_state.audio.store(true, Ordering::Release);
236 _self.start().unwrap();
237
238 ready_state.wait_for_both();
239
240 V::start_processing(&mut _self, frame_rx)?;
241
242 log::info!("Capture started successfully.");
243 Ok(_self)
244 }
245 fn start_pipewire_video(
246 &mut self,
247 include_cursor: bool,
248 ) -> Result<(Receiver<RawVideoFrame>, Arc<ReadyState>, Resolution)> {
249 let (frame_tx, frame_rx): (Sender<RawVideoFrame>, Receiver<RawVideoFrame>) = bounded(10);
250
251 let ready_state = Arc::new(ReadyState::default());
252 let ready_state_pw = Arc::clone(&ready_state);
253
254 let (pw_sender, pw_recv) = pipewire::channel::channel();
255 self.pw_video_terminate_tx = Some(pw_sender);
256
257 let (reso_sender, reso_recv) = mpsc::channel::<Resolution>();
258
259 let mut screen_cast = ScreenCast::new()?;
260 screen_cast.set_source_types(SourceType::all());
261 screen_cast.set_cursor_mode(if include_cursor {
262 CursorMode::EMBEDDED
263 } else {
264 CursorMode::HIDDEN
265 });
266 let active_cast = screen_cast.start(None)?;
267 let fd = active_cast.pipewire_fd();
268 let stream = active_cast.streams().next().unwrap();
269 let stream_node = stream.pipewire_node();
270 let controls = Arc::clone(&self.controls);
271 self.worker_handles
272 .push(std::thread::spawn(move || -> Result<()> {
273 let mut video_cap = match VideoCapture::new(
274 fd,
275 stream_node,
276 ready_state_pw,
277 controls,
278 reso_sender,
279 frame_tx,
280 pw_recv,
281 V::get_spa_definition()?,
282 ) {
283 Ok(pw_capture) => pw_capture,
284 Err(e) => {
285 log::error!("Error initializing pipewire struct: {e:}");
286 return Err(e);
287 }
288 };
289
290 video_cap.run()?;
291
292 let _ = active_cast.close(); Ok(())
294 }));
295
296 let timeout = Duration::from_secs(5);
298 let start = Instant::now();
299 let resolution = loop {
300 if let Ok(reso) = reso_recv.recv() {
301 break reso;
302 }
303
304 if start.elapsed() > timeout {
305 log::error!("Timeout waiting for PipeWire negotiated resolution.");
306 return Err(WaycapError::Init(
307 "Timed out waiting for pipewire to negotiate video resolution".into(),
308 ));
309 }
310
311 std::thread::sleep(Duration::from_millis(100));
312 };
313
314 Ok((frame_rx, ready_state, resolution))
315 }
316
317 fn start_pipewire_audio(
318 &mut self,
319 audio_encoder_type: AudioEncoderType,
320 ready_state: Arc<ReadyState>,
321 ) -> Result<Receiver<RawAudioFrame>> {
322 let (pw_audio_sender, pw_audio_recv) = pipewire::channel::channel();
323 self.pw_audio_terminate_tx = Some(pw_audio_sender);
324 let (audio_tx, audio_rx): (Sender<RawAudioFrame>, Receiver<RawAudioFrame>) = bounded(10);
325 let controls = Arc::clone(&self.controls);
326 let pw_audio_worker = std::thread::spawn(move || -> Result<()> {
327 log::debug!("Starting audio stream");
328 let audio_cap = AudioCapture::new(ready_state);
329 audio_cap.run(audio_tx, pw_audio_recv, controls)?;
330 Ok(())
331 });
332
333 self.worker_handles.push(pw_audio_worker);
334
335 let enc: Arc<Mutex<dyn AudioEncoder + Send>> = match audio_encoder_type {
336 AudioEncoderType::Opus => Arc::new(Mutex::new(OpusEncoder::new()?)),
337 };
338
339 self.audio_encoder = Some(enc);
340
341 Ok(audio_rx)
342 }
343}
344impl<V: VideoEncoder> Capture<V> {
345 pub fn start(&mut self) -> Result<()> {
347 self.controls.resume();
348 Ok(())
349 }
350
351 pub fn controls(&mut self) -> Arc<CaptureControls> {
353 Arc::clone(&self.controls)
354 }
355
356 pub fn finish(&mut self) -> Result<()> {
359 self.controls.pause();
360 if let Some(ref mut enc) = self.video_encoder {
361 enc.lock().unwrap().drain()?;
362 }
363 if let Some(ref mut enc) = self.audio_encoder {
364 enc.lock().unwrap().drain()?;
365 }
366 Ok(())
367 }
368
369 pub fn reset(&mut self) -> Result<()> {
371 if let Some(ref mut enc) = self.video_encoder {
372 enc.lock().unwrap().reset()?;
373 }
374 if let Some(ref mut enc) = self.audio_encoder {
375 enc.lock().unwrap().reset()?;
376 }
377
378 Ok(())
379 }
380
381 pub fn close(&mut self) -> Result<()> {
385 self.finish()?;
386 self.controls.stop();
387 if let Some(pw_vid) = &self.pw_video_terminate_tx {
388 let _ = pw_vid.send(Terminate {});
389 }
390 if let Some(pw_aud) = &self.pw_audio_terminate_tx {
391 let _ = pw_aud.send(Terminate {});
392 }
393
394 for handle in self.worker_handles.drain(..) {
395 let _ = handle.join();
396 }
397
398 drop(self.video_encoder.take());
399 drop(self.audio_encoder.take());
400
401 Ok(())
402 }
403
404 pub fn get_output(&mut self) -> Receiver<V::Output> {
405 self.video_encoder
406 .as_mut()
407 .unwrap()
408 .lock()
409 .unwrap()
410 .output()
411 .unwrap()
412 }
413}
414
415impl Capture<DynamicEncoder> {
416 pub fn new(
417 video_encoder_type: Option<VideoEncoderType>,
418 audio_encoder_type: AudioEncoderType,
419 quality: QualityPreset,
420 include_cursor: bool,
421 include_audio: bool,
422 target_fps: u64,
423 ) -> Result<Self> {
424 let mut _self = Self {
425 controls: Arc::new(CaptureControls::from_fps(target_fps)),
426 worker_handles: Vec::new(),
427 video_encoder: None,
428 audio_encoder: None,
429 pw_video_terminate_tx: None,
430 pw_audio_terminate_tx: None,
431 };
432
433 let (frame_rx, ready_state, resolution) = _self.start_pipewire_video(include_cursor)?;
434
435 _self.video_encoder = Some(Arc::new(Mutex::new(DynamicEncoder::new(
436 video_encoder_type,
437 resolution.width,
438 resolution.height,
439 quality,
440 )?)));
441
442 if include_audio {
443 println!("including audio");
444 let audio_rx =
445 _self.start_pipewire_audio(audio_encoder_type, Arc::clone(&ready_state))?;
446 ready_state.wait_for_both();
448 let audio_loop = audio_encoding_loop(
449 Arc::clone(_self.audio_encoder.as_ref().unwrap()),
450 audio_rx,
451 Arc::clone(&_self.controls),
452 );
453
454 _self.worker_handles.push(audio_loop);
455 } else {
456 println!("No audio");
457 ready_state.audio.store(true, Ordering::Release);
458 ready_state.wait_for_both();
459 }
460
461 DynamicEncoder::start_processing(&mut _self, frame_rx)?;
462
463 log::info!("Capture started successfully.");
464 Ok(_self)
465 }
466
467 pub fn get_video_receiver(&mut self) -> Receiver<EncodedVideoFrame> {
472 self.video_encoder
473 .as_mut()
474 .expect("Cannot access a video encoder which was never started.")
475 .lock()
476 .unwrap()
477 .output()
478 .unwrap()
479 }
480
481 pub fn get_audio_receiver(&mut self) -> Result<Receiver<EncodedAudioFrame>> {
486 if let Some(ref mut audio_enc) = self.audio_encoder {
487 return Ok(audio_enc.lock().unwrap().get_encoded_recv().unwrap());
488 } else {
489 Err(WaycapError::Validation(
490 "Audio encoder does not exist".to_string(),
491 ))
492 }
493 }
494
495 pub fn with_video_encoder<F, R>(&self, f: F) -> R
517 where
518 F: FnOnce(&Option<ffmpeg_next::encoder::Video>) -> R,
519 {
520 let guard = self
521 .video_encoder
522 .as_ref()
523 .expect("Cannot access a video encoder which was never started.")
524 .lock()
525 .unwrap();
526 f(guard.get_encoder())
527 }
528
529 pub fn with_audio_encoder<F, R>(&self, f: F) -> R
551 where
552 F: FnOnce(&Option<ffmpeg_next::encoder::Audio>) -> R,
553 {
554 assert!(self.audio_encoder.is_some());
555
556 let guard = self.audio_encoder.as_ref().unwrap().lock().unwrap();
557 f(guard.get_encoder())
558 }
559}
560
561impl<V: VideoEncoder> Drop for Capture<V> {
562 fn drop(&mut self) {
563 let _ = self.close();
564
565 for handle in self.worker_handles.drain(..) {
566 let _ = handle.join();
567 }
568 }
569}
570
571#[allow(clippy::too_many_arguments)]
572fn audio_encoding_loop(
573 audio_encoder: Arc<Mutex<dyn AudioEncoder + Send>>,
574 audio_recv: Receiver<RawAudioFrame>,
575 controls: Arc<CaptureControls>,
576) -> std::thread::JoinHandle<Result<()>> {
577 std::thread::spawn(move || -> Result<()> {
578 while !controls.is_stopped() {
581 if controls.is_paused() {
582 std::thread::sleep(Duration::from_millis(100));
583 continue;
584 }
585
586 select! {
587 recv(audio_recv) -> raw_samples => {
588 match raw_samples {
589 Ok(raw_samples) => {
590 audio_encoder.as_ref().lock().unwrap().process(raw_samples)?;
593 }
594 Err(_) => {
595 log::info!("Audio channel disconnected");
596 break;
597 }
598 }
599 }
600 default(Duration::from_millis(100)) => {
601 }
603 }
604 }
605 Ok(())
606 })
607}