waycap_rs/capture/audio.rs

1use std::{process::Command, sync::Arc};
2
3use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
4use crossbeam::channel::Sender;
5use pipewire::{
6    self as pw,
7    context::ContextRc,
8    main_loop::MainLoopRc,
9    properties::properties,
10    spa::{
11        self,
12        param::format::{MediaSubtype, MediaType},
13        pod::Pod,
14        utils::Direction,
15    },
16    stream::{StreamFlags, StreamRc, StreamState},
17    sys::pw_stream_get_nsec,
18};
19
20use super::Terminate;
21
22#[derive(Clone, Copy, Default)]
23struct UserData {
24    audio_format: spa::param::audio::AudioInfoRaw,
25}
26
27pub struct AudioCapture {
28    ready_state: Arc<ReadyState>,
29}
30
31impl AudioCapture {
32    pub fn new(ready_state: Arc<ReadyState>) -> Self {
33        Self { ready_state }
34    }
35
36    pub fn run(
37        &self,
38        audio_sender: Sender<RawAudioFrame>,
39        termination_recv: pw::channel::Receiver<Terminate>,
40        controls: Arc<CaptureControls>,
41    ) -> Result<(), pw::Error> {
42        let pw_loop = MainLoopRc::new(None)?;
43        let terminate_loop = pw_loop.clone();
44
45        let _recv = termination_recv.attach(pw_loop.loop_(), move |_| {
46            log::debug!("Terminating audio capture loop");
47            terminate_loop.quit();
48        });
49
50        let pw_context = ContextRc::new(&pw_loop, None)?;
51        let audio_core = pw_context.connect_rc(None)?;
52
53        let _audio_core_listener = audio_core
54            .add_listener_local()
55            .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
56            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
57            .done(|d, _| log::debug!("DONE: {d}"))
58            .register();
59
60        let data = UserData::default();
61
62        // Audio Stream
63        let audio_stream = StreamRc::new(
64            audio_core,
65            "waycap-audio",
66            properties! {
67                *pw::keys::MEDIA_TYPE => "Audio",
68                *pw::keys::MEDIA_CATEGORY => "Capture",
69                *pw::keys::MEDIA_ROLE => "Music",
70                *pw::keys::NODE_LATENCY => "1024/48000",
71            },
72        )?;
73
74        let ready_state_a = Arc::clone(&self.ready_state);
75        let ready_state_b = Arc::clone(&self.ready_state);
76        let _audio_stream_shared_data_listener = audio_stream
77            .add_local_listener_with_user_data(data)
78            .state_changed(move |_, _, old, new| {
79                log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
80                ready_state_a.audio.store(
81                    new == StreamState::Streaming,
82                    std::sync::atomic::Ordering::Release,
83                );
84            })
85            .param_changed(|_, udata, id, param| {
86                let Some(param) = param else {
87                    return;
88                };
89                if id != pw::spa::param::ParamType::Format.as_raw() {
90                    return;
91                }
92
93                let (media_type, media_subtype) =
94                    match pw::spa::param::format_utils::parse_format(param) {
95                        Ok(v) => v,
96                        Err(_) => return,
97                    };
98
99                // only accept raw audio
100                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
101                    return;
102                }
103
104                udata
105                    .audio_format
106                    .parse(param)
107                    .expect("Failed to parse audio params");
108
109                log::debug!(
110                    "Capturing Rate:{} channels:{}, format: {}",
111                    udata.audio_format.rate(),
112                    udata.audio_format.channels(),
113                    udata.audio_format.format().as_raw()
114                );
115            })
116            .process(move |stream, _| match stream.dequeue_buffer() {
117                None => log::debug!("Out of audio buffers"),
118                Some(mut buffer) => {
119                    // Wait until video is streaming before we try to process
120                    if !ready_state_b.video_ready() || controls.skip_processing() {
121                        return;
122                    }
123
124                    let datas = buffer.datas_mut();
125                    if datas.is_empty() {
126                        return;
127                    }
128
129                    let data = &mut datas[0];
130                    let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;
131
132                    if let Some(samples) = data.data() {
133                        let samples_f32: &[f32] = bytemuck::cast_slice(samples);
134                        let audio_samples = &samples_f32[..n_samples as usize];
135                        match audio_sender.try_send(RawAudioFrame {
136                            samples: audio_samples.to_vec(),
137                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
138                        }) {
139                            Ok(_) => {}
140                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
141                                log::error!(
142                                    "channel is full when trying to send frame at: {}.",
143                                    frame.timestamp
144                                );
145                            }
146                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
147                                log::error!(
148                                    "channel is disconnected when trying to send frame at: {}.",
149                                    frame.timestamp
150                                );
151                            }
152                        }
153                    }
154                }
155            })
156            .register()?;
157
158        let audio_spa_obj = pw::spa::pod::object! {
159            pw::spa::utils::SpaTypes::ObjectParamFormat,
160            pw::spa::param::ParamType::EnumFormat,
161            pw::spa::pod::property!(
162                pw::spa::param::format::FormatProperties::MediaType,
163                Id,
164                pw::spa::param::format::MediaType::Audio
165            ),
166            pw::spa::pod::property!(
167                pw::spa::param::format::FormatProperties::MediaSubtype,
168                Id,
169                pw::spa::param::format::MediaSubtype::Raw
170            ),
171            pw::spa::pod::property!(
172                pw::spa::param::format::FormatProperties::AudioFormat,
173                Id,
174                pw::spa::param::audio::AudioFormat::F32LE
175            )
176        };
177
178        let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
179            std::io::Cursor::new(Vec::new()),
180            &pw::spa::pod::Value::Object(audio_spa_obj),
181        )
182        .unwrap()
183        .0
184        .into_inner();
185
186        let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];
187
188        let sink_id_to_use = get_default_sink_node_id();
189
190        log::debug!("Default sink id: {sink_id_to_use:?}");
191        audio_stream.connect(
192            Direction::Input,
193            sink_id_to_use,
194            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
195            &mut audio_params,
196        )?;
197
198        log::debug!("Audio Stream: {audio_stream:?}");
199
200        pw_loop.run();
201        Ok(())
202    }
203}
204
/// Looks up the node id (`object.id`) of the current default sink via `pactl`.
///
/// It first reads the default sink's name from `pactl info`. It then searches
/// `pactl list sinks` for the `object.id` property of the sink with exactly
/// that name. `pactl` runs with `LC_ALL=C` because it localizes its field
/// labels ("Default Sink", "Name").
///
/// Returns `None` in any of these cases:
/// * `pactl` cannot be spawned or exits unsuccessfully.
/// * No default sink is reported.
/// * The default sink has no numeric `object.id`.
///
/// `AudioCapture::run` passes the result straight to `connect` as the stream target.
fn get_default_sink_node_id() -> Option<u32> {
    let info = run_pactl(&["info"])?;
    let default_sink = info
        .lines()
        .find_map(|line| line.trim().strip_prefix("Default Sink:"))
        .map(str::trim)
        // An empty name would otherwise match nothing meaningful (the old awk
        // version matched the *first* sink in that case).
        .filter(|name| !name.is_empty())?;

    let sinks = run_pactl(&["list", "sinks"])?;
    find_sink_object_id(&sinks, default_sink)
}

/// Runs `pactl` with `args` in the C locale; returns its stdout on success.
fn run_pactl(args: &[&str]) -> Option<String> {
    let output = Command::new("pactl")
        .args(args)
        .env("LC_ALL", "C")
        .output()
        .ok()?;
    output
        .status
        .success()
        .then(|| String::from_utf8_lossy(&output.stdout).into_owned())
}

/// Extracts the `object.id` property of the sink named exactly `sink_name`
/// from `pactl list sinks` output.
///
/// Only lines belonging to that sink's section are searched. If that sink
/// lacks the property, the result is `None` rather than the id of whichever
/// sink is listed next.
fn find_sink_object_id(sinks: &str, sink_name: &str) -> Option<u32> {
    let mut in_target_sink = false;
    for line in sinks.lines().map(str::trim) {
        if line.starts_with("Sink #") {
            // A new sink section begins.
            in_target_sink = false;
        } else if let Some(name) = line.strip_prefix("Name:") {
            in_target_sink = name.trim() == sink_name;
        } else if in_target_sink {
            // Property lines look like: object.id = "62"
            let value = line
                .strip_prefix("object.id")
                .and_then(|rest| rest.trim_start().strip_prefix('='));
            if let Some(value) = value {
                return value.trim().trim_matches('"').parse().ok();
            }
        }
    }
    None
}