1use std::{
2 os::fd::{FromRawFd, OwnedFd, RawFd},
3 sync::{mpsc, Arc},
4};
5
6use crossbeam::channel::Sender;
7use pipewire::stream::StreamRc;
8use pipewire::{
9 self as pw,
10 context::ContextRc,
11 core::CoreRc,
12 main_loop::MainLoopRc,
13 spa::{
14 buffer::{meta::MetaHeader, Data, DataType},
15 param::ParamType,
16 pod::Property,
17 sys::{SPA_META_Header, SPA_PARAM_META_size, SPA_PARAM_META_type},
18 utils::{Direction, SpaTypes},
19 },
20 stream::{StreamFlags, StreamListener, StreamState},
21};
22use pw::{properties::properties, spa};
23
24use spa::pod::Pod;
25
26use crate::{
27 types::{
28 error::{Result, WaycapError},
29 video_frame::RawVideoFrame,
30 },
31 CaptureControls, ReadyState, Resolution,
32};
33
34use super::Terminate;
35
36pub struct VideoCapture {
37 termination_recv: Option<pw::channel::Receiver<Terminate>>,
38 pipewire_state: PipewireState,
39}
40
41struct PipewireState {
43 pw_loop: MainLoopRc,
44 _pw_context: ContextRc,
45 _core: CoreRc,
46 _core_listener: pw::core::Listener,
47 _stream: StreamRc,
48 _stream_listener: StreamListener<UserData>,
49}
50
51#[derive(Clone, Copy, Default)]
52struct UserData {
53 video_format: spa::param::video::VideoInfoRaw,
54}
55
56impl VideoCapture {
57 #[allow(clippy::too_many_arguments)]
58 pub fn new(
59 pipewire_fd: RawFd,
60 stream_node: u32,
61 ready_state: Arc<ReadyState>,
62 controls: Arc<CaptureControls>,
63 resolution_sender: mpsc::Sender<Resolution>,
64 frame_tx: Sender<RawVideoFrame>,
65 termination_recv: pw::channel::Receiver<Terminate>,
66 pw_obj: spa::pod::Object,
67 ) -> Result<Self> {
68 let pw_loop = MainLoopRc::new(None)?;
69 let context = ContextRc::new(&pw_loop, None)?;
70 let core = context.connect_fd_rc(unsafe { OwnedFd::from_raw_fd(pipewire_fd) }, None)?;
71
72 let mut core_mut = core.clone();
73 let core_listener = Self::setup_core_listener(&mut core_mut)?;
74
75 let mut stream = Self::create_stream(core.clone())?;
76 let stream_listener = Self::setup_stream_listener(
77 &mut stream,
78 UserData::default(),
79 ready_state,
80 &controls,
81 resolution_sender.clone(),
82 frame_tx.clone(),
83 )?;
84 Self::connect_stream(&mut stream, stream_node, pw_obj)?;
85
86 Ok(Self {
87 termination_recv: Some(termination_recv),
88 pipewire_state: PipewireState {
89 pw_loop,
90 _pw_context: context,
91 _core: core,
92 _core_listener: core_listener,
93 _stream: stream,
94 _stream_listener: stream_listener,
95 },
96 })
97 }
98
99 fn create_stream(core: CoreRc) -> Result<StreamRc> {
100 match StreamRc::new(
101 core,
102 "waycap-video",
103 properties! {
104 *pw::keys::MEDIA_TYPE => "Video",
105 *pw::keys::MEDIA_CATEGORY => "Capture",
106 *pw::keys::MEDIA_ROLE => "Screen",
107 },
108 ) {
109 Ok(stream) => Ok(stream),
110 Err(e) => Err(WaycapError::from(e)),
111 }
112 }
113
114 fn setup_core_listener(core: &mut CoreRc) -> Result<pw::core::Listener> {
115 Ok(core
116 .add_listener_local()
117 .info(|i| log::debug!("VIDEO CORE:\n{i:#?}"))
118 .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
119 .done(|d, _| log::debug!("DONE: {d}"))
120 .register())
121 }
122
123 #[allow(clippy::too_many_arguments)]
124 fn setup_stream_listener(
125 stream: &mut StreamRc,
126 data: UserData,
127 ready_state: Arc<ReadyState>,
128 controls: &Arc<CaptureControls>,
129 resolution_sender: mpsc::Sender<Resolution>,
130 frame_tx: Sender<RawVideoFrame>,
131 ) -> Result<StreamListener<UserData>> {
132 let ready_state_clone = Arc::clone(&ready_state);
133 let controls_clone = Arc::clone(controls);
134
135 let stream_listener = stream
136 .add_local_listener_with_user_data(data)
137 .state_changed(move |_, _, old, new| {
138 log::info!("Video Stream State Changed: {old:?} -> {new:?}");
139 ready_state.video.store(
140 new == StreamState::Streaming,
141 std::sync::atomic::Ordering::Release,
142 );
143 })
144 .param_changed(move |_, user_data, id, param| {
145 let Some(param) = param else {
146 return;
147 };
148
149 if id != pw::spa::param::ParamType::Format.as_raw() {
150 return;
151 }
152
153 let (media_type, media_subtype) =
154 match pw::spa::param::format_utils::parse_format(param) {
155 Ok(v) => v,
156 Err(_) => return,
157 };
158
159 if media_type != pw::spa::param::format::MediaType::Video
160 || media_subtype != pw::spa::param::format::MediaSubtype::Raw
161 {
162 return;
163 }
164
165 user_data
166 .video_format
167 .parse(param)
168 .expect("Failed to parse param");
169
170 log::debug!(
171 " format: {} ({:?})",
172 user_data.video_format.format().as_raw(),
173 user_data.video_format.format()
174 );
175
176 let (width, height) = (
177 user_data.video_format.size().width,
178 user_data.video_format.size().height,
179 );
180 match resolution_sender.send(Resolution { width, height }) {
181 Ok(_) => {}
182 Err(e) => {
183 log::error!("Tried to send resolution update {width}x{height} but ran into an error on the channel: {e}");
184 }
185 };
186
187 log::debug!(
188 " size: {}x{}",
189 user_data.video_format.size().width,
190 user_data.video_format.size().height
191 );
192 log::debug!(
193 " framerate: {}/{}",
194 user_data.video_format.framerate().num,
195 user_data.video_format.framerate().denom
196 );
197 })
198 .process(move |stream, udata| {
199 match stream.dequeue_buffer() {
200 None => log::debug!("out of buffers"),
201 Some(mut buffer) => {
202 if !ready_state_clone.audio_ready() || controls_clone.skip_processing() {
204 return;
205 }
206
207 let timestamp = if let Some(metas) = buffer.find_meta::<MetaHeader>() {
208 metas.pts()
209 } else {
210 0
211 };
212
213 let datas = buffer.datas_mut();
214 if datas.is_empty() {
215 return;
216 }
217
218 let data = &mut datas[0];
219
220 let fd = Self::get_dmabuf_fd(data);
221
222 match frame_tx.try_send(RawVideoFrame {
223 data: data.data().unwrap_or_default().to_vec(),
224 timestamp,
225 dmabuf_fd: fd,
226 stride: data.chunk().stride(),
227 offset: data.chunk().offset(),
228 size: data.chunk().size(),
229 modifier: udata.video_format.modifier(),
230 format: udata.video_format.format(),
231 dimensions: udata.video_format.size()
232 }) {
233 Ok(_) => {}
234 Err(crossbeam::channel::TrySendError::Full(frame)) => {
235 log::error!(
236 "Could not send video frame at: {}. Channel full.",
237 frame.timestamp
238 );
239 }
240 Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
241 log::error!(
242 "Could not send video frame at: {}. Connection closed.",
243 frame.timestamp
244 );
245 }
246 }
247 }
248 }
249 })
250 .register()?;
251
252 Ok(stream_listener)
253 }
254
255 fn connect_stream(
256 stream: &mut StreamRc,
257 stream_node: u32,
258 pw_obj: spa::pod::Object,
259 ) -> Result<()> {
260 let video_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
261 std::io::Cursor::new(Vec::new()),
262 &pw::spa::pod::Value::Object(pw_obj),
263 )
264 .unwrap()
265 .0
266 .into_inner();
267
268 let metas_obj = pw::spa::pod::object!(
269 SpaTypes::ObjectParamMeta,
270 ParamType::Meta,
271 Property::new(
272 SPA_PARAM_META_type,
273 pw::spa::pod::Value::Id(pw::spa::utils::Id(SPA_META_Header))
274 ),
275 Property::new(
276 SPA_PARAM_META_size,
277 pw::spa::pod::Value::Int(size_of::<pw::spa::sys::spa_meta_header>() as i32)
278 ),
279 );
280
281 let metas_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
282 std::io::Cursor::new(Vec::new()),
283 &pw::spa::pod::Value::Object(metas_obj),
284 )
285 .unwrap()
286 .0
287 .into_inner();
288
289 let mut video_params = [
290 Pod::from_bytes(&video_spa_values).unwrap(),
291 Pod::from_bytes(&metas_values).unwrap(),
292 ];
293
294 stream.connect(
295 Direction::Input,
296 Some(stream_node),
297 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
298 &mut video_params,
299 )?;
300
301 Ok(())
302 }
303
304 pub fn run(&mut self) -> Result<()> {
307 let terminate_loop = self.pipewire_state.pw_loop.clone();
308 let terminate_recv = self.termination_recv.take().unwrap();
309 let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
310 log::debug!("Terminating video capture loop");
311 terminate_loop.quit();
312 });
313
314 self.pipewire_state.pw_loop.run();
315
316 Ok(())
317 }
318
319 fn get_dmabuf_fd(data: &Data) -> Option<RawFd> {
320 let raw_data = data.as_raw();
321
322 if data.type_() == DataType::DmaBuf {
323 let fd = raw_data.fd;
324
325 if fd > 0 {
326 return Some(fd as i32);
327 }
328 }
329
330 None
331 }
332}