// waycap_rs/capture/audio.rs

use std::{process::Command, sync::Arc};

use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
use crossbeam::channel::Sender;
use pipewire::{
    self as pw,
    context::Context,
    main_loop::MainLoop,
    properties::properties,
    spa::{
        self,
        param::format::{MediaSubtype, MediaType},
        pod::Pod,
        utils::Direction,
    },
    stream::{StreamFlags, StreamState},
    sys::pw_stream_get_nsec,
};

use super::Terminate;

/// Mutable state handed to the PipeWire stream callbacks.
#[derive(Clone, Copy, Default)]
struct UserData {
    /// Negotiated raw-audio format (rate / channels / sample format).
    /// Filled in by the stream's `param_changed` callback once PipeWire
    /// fixes the format; starts out as the `Default` (unset) value.
    audio_format: spa::param::audio::AudioInfoRaw,
}
26
/// PipeWire-based system audio capture.
///
/// Holds only the shared readiness flags; all PipeWire objects (loop, core,
/// stream) are created inside [`AudioCapture::run`], which blocks on the
/// capture main loop until a `Terminate` message arrives.
pub struct AudioCapture {
    /// Readiness flags shared with the video side. The process callback in
    /// `run` drops audio frames until video reports ready, and publishes the
    /// audio stream's own streaming state back into these flags.
    ready_state: Arc<ReadyState>,
}
30
impl AudioCapture {
    /// Creates a capture handle around the shared readiness flags.
    /// No PipeWire resources are allocated until [`AudioCapture::run`].
    pub fn new(ready_state: Arc<ReadyState>) -> Self {
        Self { ready_state }
    }

    /// Runs the audio capture loop. **Blocks** the calling thread until a
    /// `Terminate` message is received on `termination_recv`.
    ///
    /// Sets up a PipeWire main loop, connects to the PipeWire core, creates
    /// a capture stream negotiated to raw F32LE audio, and forwards each
    /// dequeued buffer as a [`RawAudioFrame`] over `audio_sender`
    /// (non-blocking; frames are dropped with an error log when the channel
    /// is full).
    ///
    /// # Errors
    /// Returns `pw::Error` if the main loop, context, core connection,
    /// stream creation, listener registration, or stream connect fails.
    pub fn run(
        &self,
        audio_sender: Sender<RawAudioFrame>,
        termination_recv: pw::channel::Receiver<Terminate>,
        controls: Arc<CaptureControls>,
    ) -> Result<(), pw::Error> {
        let pw_loop = MainLoop::new(None)?;
        let terminate_loop = pw_loop.clone();

        // Quit the main loop (and hence `run`) when a Terminate message
        // arrives from the controlling thread. The returned guard must stay
        // alive for the attachment to remain registered.
        let _recv = termination_recv.attach(pw_loop.loop_(), move |_| {
            log::debug!("Terminating audio capture loop");
            terminate_loop.quit();
        });

        let pw_context = Context::new(&pw_loop)?;
        let audio_core = pw_context.connect(None)?;

        // Debug/diagnostic listeners on the core connection; kept in a
        // binding so the registration is not dropped immediately.
        let _audio_core_listener = audio_core
            .add_listener_local()
            .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
            .done(|d, _| log::debug!("DONE: {d}"))
            .register();

        let data = UserData::default();

        // Capture stream advertising itself as an audio-capture node.
        // NOTE(review): NODE_LATENCY "1024/48000" requests ~21 ms buffers —
        // presumably matched to the encoder's frame size; confirm.
        let audio_stream = pw::stream::Stream::new(
            &audio_core,
            "waycap-audio",
            properties! {
                *pw::keys::MEDIA_TYPE => "Audio",
                *pw::keys::MEDIA_CATEGORY => "Capture",
                *pw::keys::MEDIA_ROLE => "Music",
                *pw::keys::NODE_LATENCY => "1024/48000",
            },
        )?;

        // Two clones: one moved into `state_changed`, one into `process`.
        let ready_state_a = Arc::clone(&self.ready_state);
        let ready_state_b = Arc::clone(&self.ready_state);
        let _audio_stream_shared_data_listener = audio_stream
            .add_local_listener_with_user_data(data)
            // Publish whether the stream is actively streaming so other
            // components (e.g. video) can observe audio readiness.
            .state_changed(move |_, _, old, new| {
                log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
                ready_state_a.audio.store(
                    new == StreamState::Streaming,
                    std::sync::atomic::Ordering::Release,
                );
            })
            // Capture the negotiated format (rate/channels/sample format)
            // into the stream's user data once PipeWire fixes it.
            .param_changed(|_, udata, id, param| {
                let Some(param) = param else {
                    return;
                };
                // Only Format params are of interest here.
                if id != pw::spa::param::ParamType::Format.as_raw() {
                    return;
                }

                let (media_type, media_subtype) =
                    match pw::spa::param::format_utils::parse_format(param) {
                        Ok(v) => v,
                        Err(_) => return,
                    };

                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
                    return;
                }

                // NOTE(review): this `expect` panics inside a PipeWire
                // callback if the format pod fails to parse — consider
                // logging and returning instead.
                udata
                    .audio_format
                    .parse(param)
                    .expect("Failed to parse audio params");

                log::debug!(
                    "Capturing Rate:{} channels:{}, format: {}",
                    udata.audio_format.rate(),
                    udata.audio_format.channels(),
                    udata.audio_format.format().as_raw()
                );
            })
            // Realtime process callback: pull one buffer per invocation and
            // forward its samples to the encoder without blocking.
            .process(move |stream, _| match stream.dequeue_buffer() {
                None => log::debug!("Out of audio buffers"),
                Some(mut buffer) => {
                    // Drop audio until video is producing frames, and while
                    // capture is paused/skipped by the controls.
                    if !ready_state_b.video_ready() || controls.skip_processing() {
                        return;
                    }

                    let datas = buffer.datas_mut();
                    if datas.is_empty() {
                        return;
                    }

                    let data = &mut datas[0];
                    // Bytes in this chunk divided by 4 = number of f32
                    // samples (format was negotiated to F32LE below).
                    let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;

                    if let Some(samples) = data.data() {
                        // NOTE(review): cast_slice panics if the mapped
                        // buffer is not 4-byte aligned or sized — presumably
                        // guaranteed by PipeWire for F32 audio; confirm.
                        // Also assumes chunk offset is 0 (offset is not
                        // applied before slicing).
                        let samples_f32: &[f32] = bytemuck::cast_slice(samples);
                        let audio_samples = &samples_f32[..n_samples as usize];
                        match audio_sender.try_send(RawAudioFrame {
                            samples: audio_samples.to_vec(),
                            // SAFETY: the stream's raw pointer is valid for
                            // the duration of this callback.
                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
                        }) {
                            Ok(_) => {}
                            // try_send never blocks the realtime thread:
                            // on backpressure the frame is dropped and logged.
                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
                                log::error!(
                                    "channel is full when trying to send frame at: {}.",
                                    frame.timestamp
                                );
                            }
                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
                                log::error!(
                                    "channel is disconnected when trying to send frame at: {}.",
                                    frame.timestamp
                                );
                            }
                        }
                    }
                }
            })
            .register()?;

        // EnumFormat pod: the only format this stream accepts is raw F32LE
        // audio (rate/channels are left to negotiation).
        let audio_spa_obj = pw::spa::pod::object! {
            pw::spa::utils::SpaTypes::ObjectParamFormat,
            pw::spa::param::ParamType::EnumFormat,
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::MediaType,
                Id,
                pw::spa::param::format::MediaType::Audio
            ),
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::MediaSubtype,
                Id,
                pw::spa::param::format::MediaSubtype::Raw
            ),
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::AudioFormat,
                Id,
                pw::spa::param::audio::AudioFormat::F32LE
            )
        };

        // Serialize the pod object into bytes. Writing into an in-memory
        // cursor; the unwrap is expected to be infallible for this static,
        // well-formed object.
        let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
            std::io::Cursor::new(Vec::new()),
            &pw::spa::pod::Value::Object(audio_spa_obj),
        )
        .unwrap()
        .0
        .into_inner();

        let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];

        // Target the default sink's node id when it can be resolved;
        // `None` lets AUTOCONNECT pick a target on its own.
        let sink_id_to_use = get_default_sink_node_id();

        log::debug!("Default sink id: {sink_id_to_use:?}");
        audio_stream.connect(
            Direction::Input,
            sink_id_to_use,
            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
            &mut audio_params,
        )?;

        log::debug!("Audio Stream: {audio_stream:?}");

        // Blocks here until the Terminate handler calls `quit()`.
        pw_loop.run();
        Ok(())
    }
}
207
/// Resolves the PipeWire object id of the current default audio sink by
/// shelling out to `pactl`.
///
/// Returns `None` when the shell cannot be spawned, the pipeline exits
/// unsuccessfully, or its output does not parse as a numeric object id.
/// The caller treats `None` as "let PipeWire autoconnect choose a target",
/// so every failure mode here degrades gracefully instead of panicking.
fn get_default_sink_node_id() -> Option<u32> {
    // Pipeline: look up the default sink's name via `pactl info`, then scan
    // `pactl list sinks` for that sink's `object.id` property.
    let output = Command::new("sh")
        .arg("-c")
        .arg(r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#)
        .output()
        .ok()?; // was `.expect(...)`: a missing `sh` must not crash the capture thread

    // A failed pipeline produces no usable id.
    if !output.status.success() {
        return None;
    }

    // awk prints the property value quoted (e.g. `"42"`); strip the quotes
    // before parsing.
    String::from_utf8_lossy(&output.stdout)
        .replace('"', "")
        .trim()
        .parse()
        .ok()
}