waycap_rs/capture/
video.rs

1use std::{
2    os::fd::{FromRawFd, OwnedFd, RawFd},
3    sync::{mpsc, Arc},
4};
5
6use crossbeam::channel::Sender;
7use pipewire::stream::StreamRc;
8use pipewire::{
9    self as pw,
10    context::ContextRc,
11    core::CoreRc,
12    main_loop::MainLoopRc,
13    spa::{
14        buffer::{meta::MetaHeader, Data, DataType},
15        param::ParamType,
16        pod::Property,
17        sys::{SPA_META_Header, SPA_PARAM_META_size, SPA_PARAM_META_type},
18        utils::{Direction, SpaTypes},
19    },
20    stream::{StreamFlags, StreamListener, StreamState},
21};
22use pw::{properties::properties, spa};
23
24use spa::pod::Pod;
25
26use crate::{
27    types::{
28        error::{Result, WaycapError},
29        video_frame::RawVideoFrame,
30    },
31    CaptureControls, ReadyState, Resolution,
32};
33
34use super::Terminate;
35
/// Captures video frames from a PipeWire screen-cast stream and forwards
/// them to the consumer over a crossbeam channel.
pub struct VideoCapture {
    // Receiver used to break out of the blocking PipeWire loop; taken
    // (Option::take) exactly once inside `run()`.
    termination_recv: Option<pw::channel::Receiver<Terminate>>,
    // Owns every PipeWire handle that must outlive setup.
    pipewire_state: PipewireState,
}
40
// Need to keep all of these alive even if never referenced
struct PipewireState {
    // The main loop we run in `run()` and quit on termination.
    pw_loop: MainLoopRc,
    // Never read after construction, but dropping any of these would
    // disconnect the core / unregister the listeners, killing the capture.
    _pw_context: ContextRc,
    _core: CoreRc,
    _core_listener: pw::core::Listener,
    _stream: StreamRc,
    _stream_listener: StreamListener<UserData>,
}
50
/// Per-stream state handed to the PipeWire listener callbacks.
#[derive(Clone, Copy, Default)]
struct UserData {
    // Negotiated raw-video format; populated by the `param_changed`
    // callback and read by `process` for every forwarded frame.
    video_format: spa::param::video::VideoInfoRaw,
}
55
56impl VideoCapture {
    /// Builds a fully-wired PipeWire video capture: connects to the
    /// compositor over `pipewire_fd`, creates a capture stream for
    /// `stream_node`, registers the listeners that parse format changes
    /// (reported through `resolution_sender`) and forward raw frames over
    /// `frame_tx`, and stores every PipeWire handle so nothing is dropped
    /// prematurely. Nothing runs until `run()` is called.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pipewire_fd: RawFd,
        stream_node: u32,
        ready_state: Arc<ReadyState>,
        controls: Arc<CaptureControls>,
        resolution_sender: mpsc::Sender<Resolution>,
        frame_tx: Sender<RawVideoFrame>,
        termination_recv: pw::channel::Receiver<Terminate>,
        pw_obj: spa::pod::Object,
    ) -> Result<Self> {
        let pw_loop = MainLoopRc::new(None)?;
        let context = ContextRc::new(&pw_loop, None)?;
        // SAFETY-NOTE(review): `from_raw_fd` takes ownership of `pipewire_fd`;
        // the caller must not close or reuse the fd afterwards — confirm at
        // the call site.
        let core = context.connect_fd_rc(unsafe { OwnedFd::from_raw_fd(pipewire_fd) }, None)?;

        // Cheap Rc clone: a separate mutable binding is needed only because
        // the listener setup takes `&mut CoreRc`.
        let mut core_mut = core.clone();
        let core_listener = Self::setup_core_listener(&mut core_mut)?;

        let mut stream = Self::create_stream(core.clone())?;
        // Listeners must be registered before the stream is connected so no
        // early state/param events are missed.
        let stream_listener = Self::setup_stream_listener(
            &mut stream,
            UserData::default(),
            ready_state,
            &controls,
            resolution_sender.clone(),
            frame_tx.clone(),
        )?;
        Self::connect_stream(&mut stream, stream_node, pw_obj)?;

        Ok(Self {
            termination_recv: Some(termination_recv),
            pipewire_state: PipewireState {
                pw_loop,
                _pw_context: context,
                _core: core,
                _core_listener: core_listener,
                _stream: stream,
                _stream_listener: stream_listener,
            },
        })
    }
98
99    fn create_stream(core: CoreRc) -> Result<StreamRc> {
100        match StreamRc::new(
101            core,
102            "waycap-video",
103            properties! {
104                *pw::keys::MEDIA_TYPE => "Video",
105                *pw::keys::MEDIA_CATEGORY => "Capture",
106                *pw::keys::MEDIA_ROLE => "Screen",
107            },
108        ) {
109            Ok(stream) => Ok(stream),
110            Err(e) => Err(WaycapError::from(e)),
111        }
112    }
113
    /// Registers debug/error logging callbacks on the core connection.
    /// The returned listener must be kept alive for the callbacks to fire.
    fn setup_core_listener(core: &mut CoreRc) -> Result<pw::core::Listener> {
        Ok(core
            .add_listener_local()
            .info(|i| log::debug!("VIDEO CORE:\n{i:#?}"))
            // NOTE(review): params are presumably (id, seq, res, message) of
            // the core error event — verify against pipewire-rs docs.
            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
            .done(|d, _| log::debug!("DONE: {d}"))
            .register())
    }
122
123    #[allow(clippy::too_many_arguments)]
124    fn setup_stream_listener(
125        stream: &mut StreamRc,
126        data: UserData,
127        ready_state: Arc<ReadyState>,
128        controls: &Arc<CaptureControls>,
129        resolution_sender: mpsc::Sender<Resolution>,
130        frame_tx: Sender<RawVideoFrame>,
131    ) -> Result<StreamListener<UserData>> {
132        let ready_state_clone = Arc::clone(&ready_state);
133        let controls_clone = Arc::clone(controls);
134
135        let stream_listener = stream
136            .add_local_listener_with_user_data(data)
137            .state_changed(move |_, _, old, new| {
138                log::info!("Video Stream State Changed: {old:?} -> {new:?}");
139                ready_state.video.store(
140                    new == StreamState::Streaming,
141                    std::sync::atomic::Ordering::Release,
142                );
143            })
144            .param_changed(move |_, user_data, id, param| {
145                let Some(param) = param else {
146                    return;
147                };
148
149                if id != pw::spa::param::ParamType::Format.as_raw() {
150                    return;
151                }
152
153                let (media_type, media_subtype) =
154                    match pw::spa::param::format_utils::parse_format(param) {
155                        Ok(v) => v,
156                        Err(_) => return,
157                    };
158
159                if media_type != pw::spa::param::format::MediaType::Video
160                    || media_subtype != pw::spa::param::format::MediaSubtype::Raw
161                {
162                    return;
163                }
164
165                user_data
166                    .video_format
167                    .parse(param)
168                    .expect("Failed to parse param");
169
170                log::debug!(
171                    "  format: {} ({:?})",
172                    user_data.video_format.format().as_raw(),
173                    user_data.video_format.format()
174                );
175
176                let (width, height) = (
177                    user_data.video_format.size().width,
178                    user_data.video_format.size().height,
179                );
180                match resolution_sender.send(Resolution { width, height }) {
181                    Ok(_) => {}
182                    Err(e) => {
183                        log::error!("Tried to send resolution update {width}x{height} but ran into an error on the channel: {e}");
184                    }
185                };
186
187                log::debug!(
188                    "  size: {}x{}",
189                    user_data.video_format.size().width,
190                    user_data.video_format.size().height
191                );
192                log::debug!(
193                    "  framerate: {}/{}",
194                    user_data.video_format.framerate().num,
195                    user_data.video_format.framerate().denom
196                );
197            })
198            .process(move |stream, udata| {
199                match stream.dequeue_buffer() {
200                    None => log::debug!("out of buffers"),
201                    Some(mut buffer) => {
202                        // Wait until audio is streaming before we try to process
203                        if !ready_state_clone.audio_ready() || controls_clone.skip_processing() {
204                            return;
205                        }
206
207                        let timestamp = if let Some(metas) = buffer.find_meta::<MetaHeader>() {
208                            metas.pts()
209                        } else {
210                            0
211                        };
212
213                        let datas = buffer.datas_mut();
214                        if datas.is_empty() {
215                            return;
216                        }
217
218                        let data = &mut datas[0];
219
220                        let fd = Self::get_dmabuf_fd(data);
221
222                        match frame_tx.try_send(RawVideoFrame {
223                            data: data.data().unwrap_or_default().to_vec(),
224                            timestamp,
225                            dmabuf_fd: fd,
226                            stride: data.chunk().stride(),
227                            offset: data.chunk().offset(),
228                            size: data.chunk().size(),
229                            modifier: udata.video_format.modifier(),
230                            format: udata.video_format.format(),
231                            dimensions: udata.video_format.size()
232                        }) {
233                            Ok(_) => {}
234                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
235                                log::error!(
236                                    "Could not send video frame at: {}. Channel full.",
237                                    frame.timestamp
238                                );
239                            }
240                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
241                                log::error!(
242                                    "Could not send video frame at: {}. Connection closed.",
243                                    frame.timestamp
244                                );
245                            }
246                        }
247                    }
248                }
249            })
250            .register()?;
251
252        Ok(stream_listener)
253    }
254
255    fn connect_stream(
256        stream: &mut StreamRc,
257        stream_node: u32,
258        pw_obj: spa::pod::Object,
259    ) -> Result<()> {
260        let video_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
261            std::io::Cursor::new(Vec::new()),
262            &pw::spa::pod::Value::Object(pw_obj),
263        )
264        .unwrap()
265        .0
266        .into_inner();
267
268        let metas_obj = pw::spa::pod::object!(
269            SpaTypes::ObjectParamMeta,
270            ParamType::Meta,
271            Property::new(
272                SPA_PARAM_META_type,
273                pw::spa::pod::Value::Id(pw::spa::utils::Id(SPA_META_Header))
274            ),
275            Property::new(
276                SPA_PARAM_META_size,
277                pw::spa::pod::Value::Int(size_of::<pw::spa::sys::spa_meta_header>() as i32)
278            ),
279        );
280
281        let metas_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
282            std::io::Cursor::new(Vec::new()),
283            &pw::spa::pod::Value::Object(metas_obj),
284        )
285        .unwrap()
286        .0
287        .into_inner();
288
289        let mut video_params = [
290            Pod::from_bytes(&video_spa_values).unwrap(),
291            Pod::from_bytes(&metas_values).unwrap(),
292        ];
293
294        stream.connect(
295            Direction::Input,
296            Some(stream_node),
297            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
298            &mut video_params,
299        )?;
300
301        Ok(())
302    }
303
304    /// Finalizes the pipewire run loop with a terminate receiver and runs it
305    /// Blocks the current thread so this must be called in a separate thread
306    pub fn run(&mut self) -> Result<()> {
307        let terminate_loop = self.pipewire_state.pw_loop.clone();
308        let terminate_recv = self.termination_recv.take().unwrap();
309        let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
310            log::debug!("Terminating video capture loop");
311            terminate_loop.quit();
312        });
313
314        self.pipewire_state.pw_loop.run();
315
316        Ok(())
317    }
318
319    fn get_dmabuf_fd(data: &Data) -> Option<RawFd> {
320        let raw_data = data.as_raw();
321
322        if data.type_() == DataType::DmaBuf {
323            let fd = raw_data.fd;
324
325            if fd > 0 {
326                return Some(fd as i32);
327            }
328        }
329
330        None
331    }
332}