waycap_rs/capture/
audio.rs

1use std::{process::Command, sync::Arc};
2
3use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
4use crossbeam::channel::Sender;
5use pipewire::{
6    self as pw,
7    context::Context,
8    main_loop::MainLoop,
9    properties::properties,
10    spa::{
11        self,
12        param::format::{MediaSubtype, MediaType},
13        pod::Pod,
14        utils::Direction,
15    },
16    stream::{StreamFlags, StreamState},
17    sys::pw_stream_get_nsec,
18};
19
20use super::Terminate;
21
22#[derive(Clone, Copy, Default)]
23struct UserData {
24    audio_format: spa::param::audio::AudioInfoRaw,
25}
26
27pub struct AudioCapture {
28    ready_state: Arc<ReadyState>,
29}
30
31// TODO: Similar approach to video capture in how the struct should look
32impl AudioCapture {
33    pub fn new(ready_state: Arc<ReadyState>) -> Self {
34        Self { ready_state }
35    }
36
37    pub fn run(
38        &self,
39        audio_sender: Sender<RawAudioFrame>,
40        termination_recv: pw::channel::Receiver<Terminate>,
41        controls: Arc<CaptureControls>,
42    ) -> Result<(), pw::Error> {
43        let pw_loop = MainLoop::new(None)?;
44        let terminate_loop = pw_loop.clone();
45
46        let _recv = termination_recv.attach(pw_loop.loop_(), move |_| {
47            log::debug!("Terminating audio capture loop");
48            terminate_loop.quit();
49        });
50
51        let pw_context = Context::new(&pw_loop)?;
52        let audio_core = pw_context.connect(None)?;
53
54        let _audio_core_listener = audio_core
55            .add_listener_local()
56            .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
57            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
58            .done(|d, _| log::debug!("DONE: {d}"))
59            .register();
60
61        let data = UserData::default();
62
63        // Audio Stream
64        let audio_stream = pw::stream::Stream::new(
65            &audio_core,
66            "waycap-audio",
67            properties! {
68            *pw::keys::MEDIA_TYPE => "Audio",
69            *pw::keys::MEDIA_CATEGORY => "Capture",
70            *pw::keys::MEDIA_ROLE => "Music",
71            *pw::keys::NODE_LATENCY => "1024/48000",
72            },
73        )?;
74
75        let ready_state_a = Arc::clone(&self.ready_state);
76        let ready_state_b = Arc::clone(&self.ready_state);
77        let _audio_stream_shared_data_listener = audio_stream
78            .add_local_listener_with_user_data(data)
79            .state_changed(move |_, _, old, new| {
80                log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
81                ready_state_a.audio.store(
82                    new == StreamState::Streaming,
83                    std::sync::atomic::Ordering::Release,
84                );
85            })
86            .param_changed(|_, udata, id, param| {
87                let Some(param) = param else {
88                    return;
89                };
90                if id != pw::spa::param::ParamType::Format.as_raw() {
91                    return;
92                }
93
94                let (media_type, media_subtype) =
95                    match pw::spa::param::format_utils::parse_format(param) {
96                        Ok(v) => v,
97                        Err(_) => return,
98                    };
99
100                // only accept raw audio
101                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
102                    return;
103                }
104
105                udata
106                    .audio_format
107                    .parse(param)
108                    .expect("Failed to parse audio params");
109
110                log::debug!(
111                    "Capturing Rate:{} channels:{}, format: {}",
112                    udata.audio_format.rate(),
113                    udata.audio_format.channels(),
114                    udata.audio_format.format().as_raw()
115                );
116            })
117            .process(move |stream, _| match stream.dequeue_buffer() {
118                None => log::debug!("Out of audio buffers"),
119                Some(mut buffer) => {
120                    // Wait until video is streaming before we try to process
121                    if !ready_state_b.video_ready() || controls.skip_processing() {
122                        return;
123                    }
124
125                    let datas = buffer.datas_mut();
126                    if datas.is_empty() {
127                        return;
128                    }
129
130                    let data = &mut datas[0];
131                    let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;
132
133                    if let Some(samples) = data.data() {
134                        let samples_f32: &[f32] = bytemuck::cast_slice(samples);
135                        let audio_samples = &samples_f32[..n_samples as usize];
136                        match audio_sender.try_send(RawAudioFrame {
137                            samples: audio_samples.to_vec(),
138                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
139                        }) {
140                            Ok(_) => {}
141                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
142                                log::error!(
143                                    "channel is full when trying to send frame at: {}.",
144                                    frame.timestamp
145                                );
146                            }
147                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
148                                // TODO: If we disconnected, terminate the session instead of
149                                // throwing an error it means the receiver was dropped.
150                                log::error!(
151                                    "channel is disconnected when trying to send frame at: {}.",
152                                    frame.timestamp
153                                );
154                            }
155                        }
156                    }
157                }
158            })
159            .register()?;
160
161        let audio_spa_obj = pw::spa::pod::object! {
162            pw::spa::utils::SpaTypes::ObjectParamFormat,
163            pw::spa::param::ParamType::EnumFormat,
164            pw::spa::pod::property!(
165                pw::spa::param::format::FormatProperties::MediaType,
166                Id,
167                pw::spa::param::format::MediaType::Audio
168                ),
169            pw::spa::pod::property!(
170                pw::spa::param::format::FormatProperties::MediaSubtype,
171                Id,
172                pw::spa::param::format::MediaSubtype::Raw
173            ),
174            pw::spa::pod::property!(
175                pw::spa::param::format::FormatProperties::AudioFormat,
176                Id,
177                pw::spa::param::audio::AudioFormat::F32LE
178            )
179        };
180
181        let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
182            std::io::Cursor::new(Vec::new()),
183            &pw::spa::pod::Value::Object(audio_spa_obj),
184        )
185        .unwrap()
186        .0
187        .into_inner();
188
189        let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];
190
191        let sink_id_to_use = get_default_sink_node_id();
192
193        log::debug!("Default sink id: {sink_id_to_use:?}");
194        audio_stream.connect(
195            Direction::Input,
196            sink_id_to_use,
197            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
198            &mut audio_params,
199        )?;
200
201        log::debug!("Audio Stream: {audio_stream:?}");
202
203        pw_loop.run();
204        Ok(())
205    }
206}
207
/// Looks up the PipeWire object id of the default PulseAudio sink.
///
/// Shells out to `pactl`: the default sink name is taken from `pactl info`,
/// then the first `object.id` property following that sink's `Name:` line in
/// `pactl list sinks` is printed and parsed.
///
/// Returns `None` when the shell cannot be started or when the output is not
/// a valid `u32` (for example because `pactl` is missing or no sink matched).
// There's gotta be a less goofy way to do this
fn get_default_sink_node_id() -> Option<u32> {
    let output = Command::new("sh")
        .arg("-c")
        .arg(r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#)
        .output()
        // A missing or unspawnable shell means "no id available", not a crash.
        .ok()?;

    let stdout = String::from_utf8_lossy(&output.stdout);

    // `pactl` prints the property value wrapped in double quotes.
    let cleaned = stdout.replace('"', "");

    cleaned.trim().parse::<u32>().ok()
}