1#![warn(clippy::all)]
53use std::{
54 sync::{
55 atomic::{AtomicBool, Ordering},
56 mpsc, Arc, Mutex,
57 },
58 time::{Duration, Instant},
59};
60
61use capture::{audio::AudioCapture, video::VideoCapture, Terminate};
62use encoders::{
63 audio::AudioEncoder, nvenc_encoder::NvencEncoder, opus_encoder::OpusEncoder,
64 vaapi_encoder::VaapiEncoder, video::VideoEncoder,
65};
66use khronos_egl::Image;
67use portal_screencast_waycap::{CursorMode, ScreenCast, SourceType};
68use ringbuf::{
69 traits::{Consumer, Split},
70 HeapCons, HeapRb,
71};
72use types::{
73 audio_frame::{EncodedAudioFrame, RawAudioFrame},
74 config::{AudioEncoder as AudioEncoderType, QualityPreset, VideoEncoder as VideoEncoderType},
75 error::{Result, WaycapError},
76 video_frame::{EncodedVideoFrame, RawVideoFrame},
77};
78use utils::{calculate_dimensions, extract_dmabuf_planes};
79use waycap_egl::EglContext;
80
81mod capture;
82mod encoders;
83pub mod pipeline;
84pub mod types;
85mod utils;
86mod waycap_egl;
87
/// An active screen (and optionally audio) capture session.
///
/// Owns the encoders, the worker threads that drain the capture ring
/// buffers, and the channels used to tell the PipeWire loops to shut down.
pub struct Capture {
    /// Video encoder implementation (NVENC or VAAPI), shared with the
    /// video worker thread.
    video_encoder: Arc<Mutex<dyn VideoEncoder + Send>>,
    /// Audio encoder; only populated when the session captures audio.
    audio_encoder: Option<Arc<Mutex<dyn AudioEncoder + Send>>>,
    /// When set, worker threads exit their processing loops.
    stop_flag: Arc<AtomicBool>,
    /// When set, capture/encoding is suspended without tearing anything down.
    pause_flag: Arc<AtomicBool>,
    /// EGL context used to import dmabuf frames for GPU encoding.
    egl_ctx: Arc<EglContext>,

    /// Handles of the capture/encoder worker threads; joined in `close`.
    worker_handles: Vec<std::thread::JoinHandle<()>>,

    /// Terminate signal sender for the PipeWire video loop.
    pw_video_terminate_tx: pipewire::channel::Sender<Terminate>,
    /// Terminate signal sender for the PipeWire audio loop, when one exists.
    pw_audio_terminate_tx: Option<pipewire::channel::Sender<Terminate>>,
}
127
128impl Capture {
129 fn new(
130 video_encoder_type: VideoEncoderType,
131 audio_encoder_type: AudioEncoderType,
132 quality: QualityPreset,
133 include_cursor: bool,
134 include_audio: bool,
135 target_fps: u64,
136 ) -> Result<Self> {
137 let current_time = Instant::now();
138 let pause = Arc::new(AtomicBool::new(true));
139 let stop = Arc::new(AtomicBool::new(false));
140
141 let mut join_handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
142
143 let audio_ready = Arc::new(AtomicBool::new(false));
144 let video_ready = Arc::new(AtomicBool::new(false));
145
146 let video_ring_buf = HeapRb::<RawVideoFrame>::new(120);
147 let (video_ring_sender, video_ring_receiver) = video_ring_buf.split();
148
149 let (pw_sender, pw_recv) = pipewire::channel::channel();
150 let (reso_sender, reso_recv) = mpsc::channel::<(u32, u32)>();
151 let video_ready_pw = Arc::clone(&video_ready);
152 let audio_ready_pw = Arc::clone(&audio_ready);
153 let pause_video = Arc::clone(&pause);
154
155 let mut screen_cast = ScreenCast::new()?;
156 screen_cast.set_source_types(SourceType::all());
157 screen_cast.set_cursor_mode(if include_cursor {
158 CursorMode::EMBEDDED
159 } else {
160 CursorMode::HIDDEN
161 });
162 let active_cast = screen_cast.start(None)?;
163
164 let fd = active_cast.pipewire_fd();
165 let stream = active_cast.streams().next().unwrap();
166 let stream_node = stream.pipewire_node();
167
168 let use_nvenc_modifiers = match video_encoder_type {
169 VideoEncoderType::H264Nvenc => true,
170 VideoEncoderType::H264Vaapi => false,
171 };
172
173 let pw_video_capure = std::thread::spawn(move || {
174 let video_cap = VideoCapture::new(video_ready_pw, audio_ready_pw, use_nvenc_modifiers);
175 video_cap
176 .run(
177 fd,
178 stream_node,
179 video_ring_sender,
180 pw_recv,
181 pause_video,
182 current_time,
183 reso_sender,
184 )
185 .unwrap();
186
187 let _ = active_cast.close(); });
189
190 let timeout = Duration::from_secs(5);
192 let start = Instant::now();
193 let (width, height) = loop {
194 if let Ok((recv_width, recv_height)) = reso_recv.recv() {
195 break (recv_width, recv_height);
196 }
197
198 if start.elapsed() > timeout {
199 log::error!("Timeout waiting for PipeWire negotiated resolution.");
200 std::process::exit(1);
201 }
202
203 std::thread::sleep(Duration::from_millis(10));
204 };
205
206 join_handles.push(pw_video_capure);
207
208 let egl_context = Arc::new(EglContext::new(width as i32, height as i32).unwrap());
209
210 let video_encoder: Arc<Mutex<dyn VideoEncoder + Send>> = match video_encoder_type {
211 VideoEncoderType::H264Nvenc => {
212 let mut encoder = NvencEncoder::new(width, height, quality)?;
213 encoder.init_gl(egl_context.get_texture_id())?;
214
215 Arc::new(Mutex::new(encoder))
216 }
217 VideoEncoderType::H264Vaapi => {
218 Arc::new(Mutex::new(VaapiEncoder::new(width, height, quality)?))
219 }
220 };
221
222 let mut audio_encoder: Option<Arc<Mutex<dyn AudioEncoder + Send>>> = None;
223 let (pw_audio_sender, pw_audio_recv) = pipewire::channel::channel();
224 if include_audio {
225 let audio_ring_buffer = HeapRb::<RawAudioFrame>::new(10);
226 let (audio_ring_sender, audio_ring_receiver) = audio_ring_buffer.split();
227 let pause_capture = Arc::clone(&pause);
228 let video_r = Arc::clone(&video_ready);
229 let audio_r = Arc::clone(&audio_ready);
230 let pw_audio_worker = std::thread::spawn(move || {
231 log::debug!("Starting audio stream");
232 let audio_cap = AudioCapture::new(video_r, audio_r);
233 audio_cap
234 .run(
235 audio_ring_sender,
236 current_time,
237 pw_audio_recv,
238 pause_capture,
239 )
240 .unwrap();
241 });
242 join_handles.push(pw_audio_worker);
243
244 let enc: Arc<Mutex<dyn AudioEncoder + Send>> = match audio_encoder_type {
245 AudioEncoderType::Opus => Arc::new(Mutex::new(OpusEncoder::new()?)),
246 };
247
248 let audio_worker = audio_processor(
249 Arc::clone(&enc),
250 audio_ring_receiver,
251 Arc::clone(&stop),
252 Arc::clone(&pause),
253 );
254 join_handles.push(audio_worker);
255
256 audio_encoder = Some(enc);
257 } else {
258 audio_ready.store(true, Ordering::Release);
259 }
260
261 let video_worker = video_processor(
262 Arc::clone(&video_encoder),
263 video_ring_receiver,
264 Arc::clone(&stop),
265 Arc::clone(&pause),
266 target_fps,
267 Arc::clone(&egl_context),
268 );
269
270 join_handles.push(video_worker);
271
272 while !audio_ready.load(Ordering::Acquire) || !video_ready.load(Ordering::Acquire) {
274 std::thread::sleep(Duration::from_millis(100));
275 }
276
277 log::info!("Capture started sucessfully.");
278
279 Ok(Self {
280 video_encoder,
281 audio_encoder,
282 stop_flag: stop,
283 pause_flag: pause,
284 worker_handles: join_handles,
285 pw_video_terminate_tx: pw_sender,
286 pw_audio_terminate_tx: Some(pw_audio_sender),
287 egl_ctx: egl_context,
288 })
289 }
290
291 pub fn start(&mut self) -> Result<()> {
293 self.pause_flag.store(false, Ordering::Release);
294 Ok(())
295 }
296
297 pub fn pause(&mut self) -> Result<()> {
299 self.pause_flag.store(true, Ordering::Release);
300 Ok(())
301 }
302
303 pub fn finish(&mut self) -> Result<()> {
306 self.pause_flag.store(true, Ordering::Release);
307 self.video_encoder.lock().unwrap().drain()?;
308 if let Some(ref mut enc) = self.audio_encoder {
309 enc.lock().unwrap().drain()?;
310 }
311
312 Ok(())
313 }
314
315 pub fn reset(&mut self) -> Result<()> {
317 self.video_encoder.lock().unwrap().reset()?;
318 if let Some(ref mut enc) = self.audio_encoder {
319 enc.lock().unwrap().reset()?;
320 }
321
322 Ok(())
323 }
324
325 pub fn close(&mut self) -> Result<()> {
329 self.finish()?;
330 self.stop_flag.store(true, Ordering::Release);
331 let _ = self.pw_video_terminate_tx.send(Terminate {});
332 if let Some(pw_aud) = &self.pw_audio_terminate_tx {
333 let _ = pw_aud.send(Terminate {});
334 }
335
336 for handle in self.worker_handles.drain(..) {
337 let _ = handle.join();
338 }
339
340 self.video_encoder.lock().unwrap().drop_encoder();
341 self.audio_encoder.take();
342
343 Ok(())
344 }
345
346 pub fn take_video_receiver(&mut self) -> HeapCons<EncodedVideoFrame> {
352 self.video_encoder
353 .lock()
354 .unwrap()
355 .take_encoded_recv()
356 .unwrap()
357 }
358
359 pub fn take_audio_receiver(&mut self) -> Result<HeapCons<EncodedAudioFrame>> {
365 if let Some(ref mut audio_enc) = self.audio_encoder {
366 return Ok(audio_enc.lock().unwrap().take_encoded_recv().unwrap());
367 } else {
368 Err(WaycapError::Validation(
369 "Audio encoder does not exist".to_string(),
370 ))
371 }
372 }
373
374 pub fn with_video_encoder<F, R>(&self, f: F) -> R
389 where
390 F: FnOnce(&Option<ffmpeg_next::encoder::Video>) -> R,
391 {
392 let guard = self.video_encoder.lock().unwrap();
393 f(guard.get_encoder())
394 }
395
396 pub fn with_audio_encoder<F, R>(&self, f: F) -> R
411 where
412 F: FnOnce(&Option<ffmpeg_next::encoder::Audio>) -> R,
413 {
414 assert!(self.audio_encoder.is_some());
415 let guard = self.audio_encoder.as_ref().unwrap().lock().unwrap();
416 f(guard.get_encoder())
417 }
418}
419
impl Drop for Capture {
    fn drop(&mut self) {
        // Best-effort shutdown: drain encoders, signal PipeWire loops,
        // join workers. Errors are ignored — there is no way to report
        // them from drop.
        let _ = self.close();

        // NOTE(review): release followed immediately by make_current looks
        // inverted — presumably this rebinds the EGL context to the current
        // thread so EglContext's own Drop can free its GL resources here.
        // Confirm against EglContext's drop implementation.
        let _ = self.egl_ctx.release_current();
        let _ = self.egl_ctx.make_current();
    }
}
429
430fn video_processor(
431 encoder: Arc<Mutex<dyn VideoEncoder + Send>>,
432 mut video_recv: HeapCons<RawVideoFrame>,
433 stop: Arc<AtomicBool>,
434 pause: Arc<AtomicBool>,
435 target_fps: u64,
436 egl_context: Arc<EglContext>,
437) -> std::thread::JoinHandle<()> {
438 egl_context.release_current().unwrap();
439 std::thread::spawn(move || {
440 let is_nvenc = encoder.lock().unwrap().as_any().is::<NvencEncoder>();
441
442 if is_nvenc {
443 encoder
445 .lock()
446 .unwrap()
447 .as_any()
448 .downcast_ref::<NvencEncoder>()
449 .unwrap()
450 .make_current()
451 .unwrap();
452 }
453
454 egl_context.make_current().unwrap();
455
456 let mut last_timestamp: u64 = 0;
457
458 loop {
459 if stop.load(Ordering::Acquire) {
460 break;
461 }
462
463 if pause.load(Ordering::Acquire) {
464 std::thread::sleep(Duration::from_nanos(100));
465 continue;
466 }
467
468 while let Some(raw_frame) = video_recv.try_pop() {
469 let current_time = raw_frame.timestamp as u64;
470
471 if current_time < last_timestamp + (1_000_000 / target_fps) {
472 continue;
473 }
474
475 if is_nvenc {
476 match process_dmabuf_frame(&egl_context, &raw_frame) {
477 Ok(img) => {
478 encoder.lock().unwrap().process(&raw_frame).unwrap();
479 egl_context.destroy_image(img).unwrap();
480 }
481 Err(e) => log::error!("Could not process dma buf frame: {:?}", e),
482 }
483 } else {
484 encoder.lock().unwrap().process(&raw_frame).unwrap();
485 }
486
487 last_timestamp = current_time;
488 }
489
490 std::thread::sleep(Duration::from_nanos(100));
491 }
492 })
493}
494
495fn audio_processor(
496 encoder: Arc<Mutex<dyn AudioEncoder + Send>>,
497 mut audio_recv: HeapCons<RawAudioFrame>,
498 stop: Arc<AtomicBool>,
499 pause: Arc<AtomicBool>,
500) -> std::thread::JoinHandle<()> {
501 std::thread::spawn(move || loop {
502 if stop.load(Ordering::Acquire) {
503 break;
504 }
505
506 if pause.load(Ordering::Acquire) {
507 std::thread::sleep(Duration::from_nanos(100));
508 continue;
509 }
510
511 while let Some(raw_samples) = audio_recv.try_pop() {
512 encoder.lock().unwrap().process(raw_samples).unwrap();
513 }
514
515 std::thread::sleep(Duration::from_nanos(100));
516 })
517}
518
519fn process_dmabuf_frame(egl_ctx: &EglContext, raw_frame: &RawVideoFrame) -> Result<Image> {
520 let dma_buf_planes = extract_dmabuf_planes(raw_frame)?;
521
522 let format = drm_fourcc::DrmFourcc::Argb8888 as u32;
523 let (width, height) = calculate_dimensions(raw_frame)?;
524 let modifier = raw_frame.modifier;
525
526 let egl_image = egl_ctx
527 .create_image_from_dmabuf(&dma_buf_planes, format, width, height, modifier)
528 .unwrap();
529
530 egl_ctx.update_texture_from_image(egl_image).unwrap();
531
532 Ok(egl_image)
533}