// waycap_rs/capture/video.rs
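//! PipeWire video capture.
//!
//! Connects to a screencast stream node, tracks stream state and format
//! changes, and forwards dequeued buffers (DMA-BUF or memory-mapped) to the
//! rest of the pipeline as [`RawVideoFrame`]s.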

use std::{
    os::fd::{FromRawFd, OwnedFd, RawFd},
    sync::{
        mpsc::{self},
        Arc,
    },
};

use crossbeam::channel::Sender;
use pipewire::{
    self as pw,
    context::Context,
    core::{Core, Listener},
    main_loop::MainLoop,
    spa::{
        buffer::{Data, DataType},
        utils::Direction,
    },
    stream::{Stream, StreamFlags, StreamListener, StreamState},
    sys::pw_stream_get_nsec,
};
use pw::{properties::properties, spa};

use spa::pod::Pod;

use crate::{
    types::{
        error::{Result, WaycapError},
        video_frame::RawVideoFrame,
    },
    CaptureControls, ReadyState, Resolution,
};

use super::Terminate;

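/// PipeWire video capture stream.
///
/// Owns the PipeWire main loop, stream, and listeners, and forwards captured
/// frames to the rest of the pipeline until a `Terminate` message is received.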
pub struct VideoCapture {
    termination_recv: Option<pw::channel::Receiver<Terminate>>,
    pipewire_state: PipewireState,
}

// Need to keep all of these alive even if never referenced
struct PipewireState {
    pw_loop: MainLoop,
    _pw_context: Context,
    _core: Core,
    _core_listener: Listener,
    _stream: Stream,
    _stream_listener: StreamListener<UserData>,
}

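/// Per-stream user data; holds the raw video format negotiated in `param_changed`.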
#[derive(Clone, Copy, Default)]
struct UserData {
    video_format: spa::param::video::VideoInfoRaw,
}

impl VideoCapture {
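    /// Creates a new video capture stream on the given PipeWire node.
    ///
    /// Connects to the PipeWire instance behind `pipewire_fd`, registers core
    /// and stream listeners, and connects the stream to `stream_node` with the
    /// format parameters described by `pw_obj`. Call [`run`](Self::run)
    /// afterwards to start processing.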
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pipewire_fd: RawFd,
        stream_node: u32,
        ready_state: Arc<ReadyState>,
        controls: Arc<CaptureControls>,
        resolution_sender: mpsc::Sender<Resolution>,
        frame_tx: Sender<RawVideoFrame>,
        termination_recv: pw::channel::Receiver<Terminate>,
        pw_obj: spa::pod::Object,
    ) -> Result<Self> {
        let pw_loop = MainLoop::new(None)?;
        let context = Context::new(&pw_loop)?;
        let mut core = context.connect_fd(unsafe { OwnedFd::from_raw_fd(pipewire_fd) }, None)?;
        let core_listener = Self::setup_core_listener(&mut core)?;
        let mut stream = Self::create_stream(&core)?;
        let stream_listener = Self::setup_stream_listener(
            &mut stream,
            UserData::default(),
            ready_state,
            &controls,
            resolution_sender.clone(),
            frame_tx.clone(),
        )?;
        Self::connect_stream(&mut stream, stream_node, pw_obj)?;

        Ok(Self {
            termination_recv: Some(termination_recv),
            pipewire_state: PipewireState {
                pw_loop,
                _pw_context: context,
                _core: core,
                _core_listener: core_listener,
                _stream: stream,
                _stream_listener: stream_listener,
            },
        })
    }

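    /// Creates the PipeWire capture stream with video/screen media properties.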
    fn create_stream(core: &Core) -> Result<Stream> {
        match Stream::new(
            core,
            "waycap-video",
            properties! {
                *pw::keys::MEDIA_TYPE => "Video",
                *pw::keys::MEDIA_CATEGORY => "Capture",
                *pw::keys::MEDIA_ROLE => "Screen",
            },
        ) {
            Ok(stream) => Ok(stream),
            Err(e) => Err(WaycapError::from(e)),
        }
    }

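    /// Registers a listener on the core connection that logs info, error, and done events.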
    fn setup_core_listener(core: &mut Core) -> Result<Listener> {
        Ok(core
            .add_listener_local()
            .info(|i| log::debug!("VIDEO CORE:\n{i:#?}"))
            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
            .done(|d, _| log::debug!("DONE: {d}"))
            .register())
    }

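    /// Registers the stream listener that drives the capture:
    /// `state_changed` tracks readiness, `param_changed` records the negotiated
    /// video format and reports resolution changes, and `process` forwards
    /// dequeued buffers as [`RawVideoFrame`]s over `frame_tx`.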
    #[allow(clippy::too_many_arguments)]
    fn setup_stream_listener(
        stream: &mut Stream,
        data: UserData,
        ready_state: Arc<ReadyState>,
        controls: &Arc<CaptureControls>,
        resolution_sender: mpsc::Sender<Resolution>,
        frame_tx: Sender<RawVideoFrame>,
    ) -> Result<StreamListener<UserData>> {
        let ready_state_clone = Arc::clone(&ready_state);
        let controls_clone = Arc::clone(controls);

        let stream_listener = stream
            .add_local_listener_with_user_data(data)
            .state_changed(move |_, _, old, new| {
                log::info!("Video Stream State Changed: {old:?} -> {new:?}");
                ready_state.video.store(
                    new == StreamState::Streaming,
                    std::sync::atomic::Ordering::Release,
                );
            })
            .param_changed(move |_, user_data, id, param| {
                let Some(param) = param else {
                    return;
                };

                if id != pw::spa::param::ParamType::Format.as_raw() {
                    return;
                }

                let (media_type, media_subtype) =
                    match pw::spa::param::format_utils::parse_format(param) {
                        Ok(v) => v,
                        Err(_) => return,
                    };

                if media_type != pw::spa::param::format::MediaType::Video
                    || media_subtype != pw::spa::param::format::MediaSubtype::Raw
                {
                    return;
                }

                user_data
                    .video_format
                    .parse(param)
                    .expect("Failed to parse param");

                log::debug!(
                    "  format: {} ({:?})",
                    user_data.video_format.format().as_raw(),
                    user_data.video_format.format()
                );

                let (width, height) = (
                    user_data.video_format.size().width,
                    user_data.video_format.size().height,
                );
                match resolution_sender.send(Resolution { width, height }) {
                    Ok(_) => {}
                    Err(e) => {
                        log::error!("Tried to send resolution update {width}x{height} but ran into an error on the channel: {e}");
                    }
                };

                log::debug!(
                    "  size: {}x{}",
                    user_data.video_format.size().width,
                    user_data.video_format.size().height
                );
                log::debug!(
                    "  framerate: {}/{}",
                    user_data.video_format.framerate().num,
                    user_data.video_format.framerate().denom
                );
            })
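            // Forward each dequeued buffer to the frame channel. Frames are
            // dropped (with an error log) rather than blocking the PipeWire
            // real-time thread when the channel is full.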
            .process(move |stream, udata| {
                match stream.dequeue_buffer() {
                    None => log::debug!("out of buffers"),
                    Some(mut buffer) => {
                        // Wait until audio is streaming before we try to process
                        if !ready_state_clone.audio_ready() || controls_clone.skip_processing() {
                            return;
                        }

                        let datas = buffer.datas_mut();
                        if datas.is_empty() {
                            return;
                        }

                        let data = &mut datas[0];

                        let fd = Self::get_dmabuf_fd(data);

                        match frame_tx.try_send(RawVideoFrame {
                            data: data.data().unwrap_or_default().to_vec(),
                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
                            dmabuf_fd: fd,
                            stride: data.chunk().stride(),
                            offset: data.chunk().offset(),
                            size: data.chunk().size(),
                            modifier: udata.video_format.modifier(),
                            format: udata.video_format.format(),
                            dimensions: udata.video_format.size(),
                        }) {
                            Ok(_) => {}
                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
                                log::error!(
                                    "Could not send video frame at: {}. Channel full.",
                                    frame.timestamp
                                );
                            }
                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
                                // TODO: A disconnect means the receiver was dropped;
                                // terminate the session instead of just logging an error.
                                log::error!(
                                    "Could not send video frame at: {}. Connection closed.",
                                    frame.timestamp
                                );
                            }
                        }
                    }
                }
            })
            .register()?;

        Ok(stream_listener)
    }

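    /// Serializes the format parameters into a SPA POD and connects the stream
    /// to the given node as an autoconnecting, memory-mapped, real-time input.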
    fn connect_stream(
        stream: &mut Stream,
        stream_node: u32,
        pw_obj: spa::pod::Object,
    ) -> Result<()> {
        let video_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
            std::io::Cursor::new(Vec::new()),
            &pw::spa::pod::Value::Object(pw_obj),
        )
        .unwrap()
        .0
        .into_inner();

        let mut video_params = [Pod::from_bytes(&video_spa_values).unwrap()];
        stream.connect(
            Direction::Input,
            Some(stream_node),
            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
            &mut video_params,
        )?;

        Ok(())
    }

    /// Attaches the terminate receiver to the PipeWire loop and runs the loop.
    /// Blocks the current thread, so this must be called from a separate thread.
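    ///
    /// A minimal usage sketch (the construction of `capture` and the terminate
    /// sender is assumed to be set up by the caller via [`VideoCapture::new`]):
    ///
    /// ```ignore
    /// let handle = std::thread::spawn(move || {
    ///     capture.run().expect("video capture loop failed");
    /// });
    /// // ...later, send `Terminate` on the pw channel to make `run` return.
    /// handle.join().unwrap();
    /// ```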
    pub fn run(&mut self) -> Result<()> {
        let terminate_loop = self.pipewire_state.pw_loop.clone();
        let terminate_recv = self.termination_recv.take().unwrap();
        let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
            log::debug!("Terminating video capture loop");
            terminate_loop.quit();
        });

        self.pipewire_state.pw_loop.run();

        Ok(())
    }

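    /// Returns the DMA-BUF file descriptor backing this buffer, if the buffer
    /// is of type `DmaBuf`; otherwise the frame data has to be read from memory.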
    fn get_dmabuf_fd(data: &Data) -> Option<RawFd> {
        let raw_data = data.as_raw();

        if data.type_() == DataType::DmaBuf {
            let fd = raw_data.fd;

            if fd > 0 {
                return Some(fd as i32);
            }
        }

        None
    }
}