1use std::{
2 os::fd::{FromRawFd, OwnedFd, RawFd},
3 sync::{
4 mpsc::{self},
5 Arc,
6 },
7};
8
9use crossbeam::channel::Sender;
10use pipewire::{
11 self as pw,
12 context::Context,
13 core::{Core, Listener},
14 main_loop::MainLoop,
15 spa::{
16 buffer::{Data, DataType},
17 utils::Direction,
18 },
19 stream::{Stream, StreamFlags, StreamListener, StreamState},
20 sys::pw_stream_get_nsec,
21};
22use pw::{properties::properties, spa};
23
24use spa::pod::Pod;
25
26use crate::{
27 types::{
28 error::{Result, WaycapError},
29 video_frame::RawVideoFrame,
30 }, CaptureControls, ReadyState, Resolution
31};
32
33use super::Terminate;
34
35
36
37pub struct VideoCapture {
38 termination_recv: Option<pw::channel::Receiver<Terminate>>,
39 pipewire_state: PipewireState,
40}
41
42struct PipewireState {
44 pw_loop: MainLoop,
45 _pw_context: Context,
46 _core: Core,
47 _core_listener: Listener,
48 _stream: Stream,
49 _stream_listener: StreamListener<UserData>,
50}
51
52#[derive(Clone, Copy, Default)]
53struct UserData {
54 video_format: spa::param::video::VideoInfoRaw,
55}
56
57impl VideoCapture {
58 #[allow(clippy::too_many_arguments)]
59 pub fn new(
60 pipewire_fd: RawFd,
61 stream_node: u32,
62 ready_state: Arc<ReadyState>,
63 controls: Arc<CaptureControls>,
64 resolution_sender: mpsc::Sender<Resolution>,
65 frame_tx: Sender<RawVideoFrame>,
66 termination_recv: pw::channel::Receiver<Terminate>,
67 pw_obj: spa::pod::Object,
68 ) -> Result<Self> {
69 let pw_loop = MainLoop::new(None)?;
70 let context = Context::new(&pw_loop)?;
71 let mut core = context.connect_fd(unsafe { OwnedFd::from_raw_fd(pipewire_fd) }, None)?;
72 let core_listener = Self::setup_core_listener(&mut core)?;
73 let mut stream = Self::create_stream(&core)?;
74 let stream_listener = Self::setup_stream_listener(
75 &mut stream,
76 UserData::default(),
77 ready_state,
78 &controls,
79 resolution_sender.clone(),
80 frame_tx.clone(),
81 )?;
82 Self::connect_stream(&mut stream, stream_node, pw_obj)?;
83
84 Ok(Self {
85 termination_recv: Some(termination_recv),
86 pipewire_state: PipewireState {
87 pw_loop,
88 _pw_context: context,
89 _core: core,
90 _core_listener: core_listener,
91 _stream: stream,
92 _stream_listener: stream_listener,
93 },
94 })
95 }
96
97 fn create_stream(core: &Core) -> Result<Stream> {
98 match Stream::new(
99 core,
100 "waycap-video",
101 properties! {
102 *pw::keys::MEDIA_TYPE => "Video",
103 *pw::keys::MEDIA_CATEGORY => "Capture",
104 *pw::keys::MEDIA_ROLE => "Screen",
105 },
106 ) {
107 Ok(stream) => Ok(stream),
108 Err(e) => Err(WaycapError::from(e)),
109 }
110 }
111
112 fn setup_core_listener(core: &mut Core) -> Result<Listener> {
113 Ok(core
114 .add_listener_local()
115 .info(|i| log::debug!("VIDEO CORE:\n{i:#?}"))
116 .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
117 .done(|d, _| log::debug!("DONE: {d}"))
118 .register())
119 }
120
121 #[allow(clippy::too_many_arguments)]
122 fn setup_stream_listener(
123 stream: &mut Stream,
124 data: UserData,
125 ready_state: Arc<ReadyState>,
126 controls: &Arc<CaptureControls>,
127 resolution_sender: mpsc::Sender<Resolution>,
128 frame_tx: Sender<RawVideoFrame>,
129 ) -> Result<StreamListener<UserData>> {
130 let ready_state_clone = Arc::clone(&ready_state);
131 let controls_clone = Arc::clone(controls);
132
133 let stream_listener = stream
134 .add_local_listener_with_user_data(data)
135 .state_changed(move |_, _, old, new| {
136 log::info!("Video Stream State Changed: {old:?} -> {new:?}");
137 ready_state.video.store(
138 new == StreamState::Streaming,
139 std::sync::atomic::Ordering::Release,
140 );
141 })
142 .param_changed(move |_, user_data, id, param| {
143 let Some(param) = param else {
144 return;
145 };
146
147 if id != pw::spa::param::ParamType::Format.as_raw() {
148 return;
149 }
150
151 let (media_type, media_subtype) =
152 match pw::spa::param::format_utils::parse_format(param) {
153 Ok(v) => v,
154 Err(_) => return,
155 };
156
157 if media_type != pw::spa::param::format::MediaType::Video
158 || media_subtype != pw::spa::param::format::MediaSubtype::Raw
159 {
160 return;
161 }
162
163 user_data
164 .video_format
165 .parse(param)
166 .expect("Failed to parse param");
167
168 log::debug!(
169 " format: {} ({:?})",
170 user_data.video_format.format().as_raw(),
171 user_data.video_format.format()
172 );
173
174 let (width, height) = (
175
176 user_data.video_format.size().width,
177 user_data.video_format.size().height,
178 );
179 match resolution_sender.send(Resolution { width, height }) {
180 Ok(_) => {}
181 Err(e) => {
182 log::error!("Tried to send resolution update {width}x{height} but ran into an error on the channel: {e}");
183 }
184 };
185
186 log::debug!(
187 " size: {}x{}",
188 user_data.video_format.size().width,
189 user_data.video_format.size().height
190 );
191 log::debug!(
192 " framerate: {}/{}",
193 user_data.video_format.framerate().num,
194 user_data.video_format.framerate().denom
195 );
196 })
197 .process(move |stream, udata| {
198 match stream.dequeue_buffer() {
199 None => log::debug!("out of buffers"),
200 Some(mut buffer) => {
201 if !ready_state_clone.audio_ready() || controls_clone.skip_processing() {
203 return;
204 }
205
206 let datas = buffer.datas_mut();
207 if datas.is_empty() {
208 return;
209 }
210
211 let data = &mut datas[0];
212
213 let fd = Self::get_dmabuf_fd(data);
214
215 match frame_tx.try_send(RawVideoFrame {
216 data: data.data().unwrap_or_default().to_vec(),
217 timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr())} as i64,
218 dmabuf_fd: fd,
219 stride: data.chunk().stride(),
220 offset: data.chunk().offset(),
221 size: data.chunk().size(),
222 modifier: udata.video_format.modifier(),
223 format: udata.video_format.format(),
224 dimensions: udata.video_format.size()
225 }) {
226 Ok(_) => {}
227 Err(crossbeam::channel::TrySendError::Full(frame)) => {
228 log::error!(
229 "Could not send video frame at: {}. Channel full.",
230 frame.timestamp
231 );
232 }
233 Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
234 log::error!(
237 "Could not send video frame at: {}. Connection closed.",
238 frame.timestamp
239 );
240 }
241 }
242 }
243 }
244 })
245 .register()?;
246
247 Ok(stream_listener)
248 }
249
250 fn connect_stream(
251 stream: &mut Stream,
252 stream_node: u32,
253 pw_obj: spa::pod::Object,
254 ) -> Result<()> {
255 let video_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
256 std::io::Cursor::new(Vec::new()),
257 &pw::spa::pod::Value::Object(pw_obj),
258 )
259 .unwrap()
260 .0
261 .into_inner();
262
263 let mut video_params = [Pod::from_bytes(&video_spa_values).unwrap()];
264 stream.connect(
265 Direction::Input,
266 Some(stream_node),
267 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
268 &mut video_params,
269 )?;
270
271 Ok(())
272 }
273
274 pub fn run(&mut self) -> Result<()> {
277 let terminate_loop = self.pipewire_state.pw_loop.clone();
278 let terminate_recv = self.termination_recv.take().unwrap();
279 let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
280 log::debug!("Terminating video capture loop");
281 terminate_loop.quit();
282 });
283
284 self.pipewire_state.pw_loop.run();
285
286 Ok(())
287 }
288
289 fn get_dmabuf_fd(data: &Data) -> Option<RawFd> {
290 let raw_data = data.as_raw();
291
292 if data.type_() == DataType::DmaBuf {
293 let fd = raw_data.fd;
294
295 if fd > 0 {
296 return Some(fd as i32);
297 }
298 }
299
300 None
301 }
302}