// waycap_rs/capture/audio.rs

1use std::{process::Command, sync::Arc};
2
3use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
4use crossbeam::channel::Sender;
5use pipewire::{
6    self as pw,
7    context::ContextRc,
8    core::CoreRc,
9    main_loop::MainLoopRc,
10    properties::properties,
11    spa::{
12        self,
13        param::format::{MediaSubtype, MediaType},
14        pod::Pod,
15        utils::Direction,
16    },
17    stream::{StreamFlags, StreamListener, StreamRc, StreamState},
18    sys::pw_stream_get_nsec,
19};
20
21use super::Terminate;
22
23#[derive(Clone, Copy, Default)]
24struct UserData {
25    audio_format: spa::param::audio::AudioInfoRaw,
26}
27
28struct PipewireState {
29    pw_loop: MainLoopRc,
30    _pw_context: ContextRc,
31    _core: CoreRc,
32    _core_listener: pw::core::Listener,
33    _stream: StreamRc,
34    _stream_listener: StreamListener<UserData>,
35}
36
37pub struct AudioCapture {
38    termination_recv: Option<pw::channel::Receiver<Terminate>>,
39    pipewire_state: PipewireState,
40}
41
42impl AudioCapture {
43    pub fn new(
44        ready_state: Arc<ReadyState>,
45        audio_sender: Sender<RawAudioFrame>,
46        termination_recv: pw::channel::Receiver<Terminate>,
47        controls: Arc<CaptureControls>,
48    ) -> Result<Self, pw::Error> {
49        let pw_loop = MainLoopRc::new(None)?;
50        let pw_context = ContextRc::new(&pw_loop, None)?;
51        let core = pw_context.connect_rc(None)?;
52
53        let mut core_mut = core.clone();
54        let core_listener = Self::setup_core_listener(&mut core_mut);
55
56        let mut stream = Self::create_stream(core.clone())?;
57        let stream_listener =
58            Self::setup_stream_listener(&mut stream, ready_state, controls, audio_sender)?;
59        Self::connect_stream(&mut stream)?;
60
61        Ok(Self {
62            termination_recv: Some(termination_recv),
63            pipewire_state: PipewireState {
64                pw_loop,
65                _pw_context: pw_context,
66                _core: core,
67                _core_listener: core_listener,
68                _stream: stream,
69                _stream_listener: stream_listener,
70            },
71        })
72    }
73
74    fn create_stream(core: CoreRc) -> Result<StreamRc, pw::Error> {
75        StreamRc::new(
76            core,
77            "waycap-audio",
78            properties! {
79                *pw::keys::MEDIA_TYPE => "Audio",
80                *pw::keys::MEDIA_CATEGORY => "Capture",
81                *pw::keys::MEDIA_ROLE => "Music",
82                *pw::keys::NODE_LATENCY => "1024/48000",
83            },
84        )
85    }
86
87    fn setup_core_listener(core: &mut CoreRc) -> pw::core::Listener {
88        core.add_listener_local()
89            .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
90            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
91            .done(|d, _| log::debug!("DONE: {d}"))
92            .register()
93    }
94
95    fn setup_stream_listener(
96        stream: &mut StreamRc,
97        ready_state: Arc<ReadyState>,
98        controls: Arc<CaptureControls>,
99        audio_sender: Sender<RawAudioFrame>,
100    ) -> Result<StreamListener<UserData>, pw::Error> {
101        let ready_state_clone = Arc::clone(&ready_state);
102
103        let stream_listener = stream
104            .add_local_listener_with_user_data(UserData::default())
105            .state_changed(move |_, _, old, new| {
106                log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
107                ready_state.audio.store(
108                    new == StreamState::Streaming,
109                    std::sync::atomic::Ordering::Release,
110                );
111            })
112            .param_changed(|_, udata, id, param| {
113                let Some(param) = param else {
114                    return;
115                };
116                if id != pw::spa::param::ParamType::Format.as_raw() {
117                    return;
118                }
119
120                let (media_type, media_subtype) =
121                    match pw::spa::param::format_utils::parse_format(param) {
122                        Ok(v) => v,
123                        Err(_) => return,
124                    };
125
126                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
127                    return;
128                }
129
130                udata
131                    .audio_format
132                    .parse(param)
133                    .expect("Failed to parse audio params");
134
135                log::debug!(
136                    "Capturing Rate:{} channels:{}, format: {}",
137                    udata.audio_format.rate(),
138                    udata.audio_format.channels(),
139                    udata.audio_format.format().as_raw()
140                );
141            })
142            .process(move |stream, _| match stream.dequeue_buffer() {
143                None => log::debug!("Out of audio buffers"),
144                Some(mut buffer) => {
145                    if !ready_state_clone.video_ready() || controls.skip_processing() {
146                        return;
147                    }
148
149                    let datas = buffer.datas_mut();
150                    if datas.is_empty() {
151                        return;
152                    }
153
154                    let data = &mut datas[0];
155                    let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;
156
157                    if let Some(samples) = data.data() {
158                        let samples_f32: &[f32] = bytemuck::cast_slice(samples);
159                        let audio_samples = &samples_f32[..n_samples as usize];
160                        match audio_sender.try_send(RawAudioFrame {
161                            samples: audio_samples.to_vec(),
162                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
163                        }) {
164                            Ok(_) => {}
165                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
166                                log::error!(
167                                    "channel is full when trying to send frame at: {}.",
168                                    frame.timestamp
169                                );
170                            }
171                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
172                                log::error!(
173                                    "channel is disconnected when trying to send frame at: {}.",
174                                    frame.timestamp
175                                );
176                            }
177                        }
178                    }
179                }
180            })
181            .register()?;
182
183        Ok(stream_listener)
184    }
185
186    fn connect_stream(stream: &mut StreamRc) -> Result<(), pw::Error> {
187        let audio_spa_obj = pw::spa::pod::object! {
188            pw::spa::utils::SpaTypes::ObjectParamFormat,
189            pw::spa::param::ParamType::EnumFormat,
190            pw::spa::pod::property!(
191                pw::spa::param::format::FormatProperties::MediaType,
192                Id,
193                pw::spa::param::format::MediaType::Audio
194            ),
195            pw::spa::pod::property!(
196                pw::spa::param::format::FormatProperties::MediaSubtype,
197                Id,
198                pw::spa::param::format::MediaSubtype::Raw
199            ),
200            pw::spa::pod::property!(
201                pw::spa::param::format::FormatProperties::AudioFormat,
202                Id,
203                pw::spa::param::audio::AudioFormat::F32LE
204            )
205        };
206
207        let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
208            std::io::Cursor::new(Vec::new()),
209            &pw::spa::pod::Value::Object(audio_spa_obj),
210        )
211        .unwrap()
212        .0
213        .into_inner();
214
215        let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];
216
217        let sink_id_to_use = get_default_sink_node_id();
218        log::debug!("Default sink id: {sink_id_to_use:?}");
219
220        stream.connect(
221            Direction::Input,
222            sink_id_to_use,
223            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
224            &mut audio_params,
225        )
226    }
227
228    pub fn run(&mut self) {
229        let terminate_loop = self.pipewire_state.pw_loop.clone();
230        let terminate_recv = self.termination_recv.take().unwrap();
231        let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
232            log::debug!("Terminating audio capture loop");
233            terminate_loop.quit();
234        });
235
236        log::debug!("Audio Stream: {:?}", self.pipewire_state._stream);
237        self.pipewire_state.pw_loop.run();
238    }
239}
240
/// Returns the `object.id` of the default PulseAudio sink, as reported by
/// `pactl`, or `None` when it cannot be determined.
///
/// This shells out to `sh` and `pactl`; there is probably a less roundabout
/// way to obtain the id directly from PipeWire.
///
/// `None` is returned when the shell cannot be spawned, or when its output
/// is not a (possibly quoted) unsigned integer.
fn get_default_sink_node_id() -> Option<u32> {
    let output = Command::new("sh")
        .arg("-c")
        .arg(r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#)
        .output()
        // Failing to spawn the shell is not fatal: the caller already copes
        // with `None`.
        .ok()?;

    parse_sink_node_id(&String::from_utf8_lossy(&output.stdout))
}

/// Parses the text printed by the `pactl`/`awk` pipeline (e.g. `"52"\n`) into
/// a node id by dropping double quotes and surrounding whitespace.
fn parse_sink_node_id(raw: &str) -> Option<u32> {
    raw.replace('"', "").trim().parse::<u32>().ok()
}