// waycap_rs/capture/audio.rs
use std::{process::Command, sync::Arc};
2
3use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
4use crossbeam::channel::Sender;
5use pipewire::{
6 self as pw,
7 context::ContextRc,
8 main_loop::MainLoopRc,
9 properties::properties,
10 spa::{
11 self,
12 param::format::{MediaSubtype, MediaType},
13 pod::Pod,
14 utils::Direction,
15 },
16 stream::{StreamFlags, StreamRc, StreamState},
17 sys::pw_stream_get_nsec,
18};
19
20use super::Terminate;
21
/// Per-stream user data handed to the PipeWire stream listener.
#[derive(Clone, Copy, Default)]
struct UserData {
    // Negotiated raw-audio format (rate/channels/sample format); filled in
    // by the stream's `param_changed` callback once a format is settled.
    audio_format: spa::param::audio::AudioInfoRaw,
}
26
/// Captures audio from the system's default sink via a PipeWire input stream.
pub struct AudioCapture {
    // Shared readiness flags: `run` publishes the audio stream state here and
    // gates sample processing on the video side reporting ready.
    ready_state: Arc<ReadyState>,
}
30
impl AudioCapture {
    /// Create a capture handle that reports stream readiness through `ready_state`.
    pub fn new(ready_state: Arc<ReadyState>) -> Self {
        Self { ready_state }
    }

    /// Run the PipeWire audio-capture loop on the current thread.
    ///
    /// Blocks inside `pw_loop.run()` until a `Terminate` message arrives on
    /// `termination_recv`, which quits the loop. Captured f32 samples are
    /// forwarded to `audio_sender`; a full or disconnected channel drops the
    /// frame (with an error log) rather than blocking the realtime callback.
    ///
    /// # Errors
    /// Returns `pw::Error` when the main loop, context, core connection,
    /// stream creation, listener registration, or stream connect fails.
    pub fn run(
        &self,
        audio_sender: Sender<RawAudioFrame>,
        termination_recv: pw::channel::Receiver<Terminate>,
        controls: Arc<CaptureControls>,
    ) -> Result<(), pw::Error> {
        let pw_loop = MainLoopRc::new(None)?;
        let terminate_loop = pw_loop.clone();

        // Quit the main loop when the owner signals termination.
        let _recv = termination_recv.attach(pw_loop.loop_(), move |_| {
            log::debug!("Terminating audio capture loop");
            terminate_loop.quit();
        });

        let pw_context = ContextRc::new(&pw_loop, None)?;
        let audio_core = pw_context.connect_rc(None)?;

        // Diagnostics-only core listener; the binding keeps it registered
        // for the lifetime of `run`.
        let _audio_core_listener = audio_core
            .add_listener_local()
            .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
            .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
            .done(|d, _| log::debug!("DONE: {d}"))
            .register();

        let data = UserData::default();

        let audio_stream = StreamRc::new(
            audio_core,
            "waycap-audio",
            properties! {
                *pw::keys::MEDIA_TYPE => "Audio",
                *pw::keys::MEDIA_CATEGORY => "Capture",
                *pw::keys::MEDIA_ROLE => "Music",
                *pw::keys::NODE_LATENCY => "1024/48000",
            },
        )?;

        let ready_state_a = Arc::clone(&self.ready_state);
        let ready_state_b = Arc::clone(&self.ready_state);
        let _audio_stream_shared_data_listener = audio_stream
            .add_local_listener_with_user_data(data)
            // Publish whether the stream is actively streaming so other
            // components can gate on audio readiness.
            .state_changed(move |_, _, old, new| {
                log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
                ready_state_a.audio.store(
                    new == StreamState::Streaming,
                    std::sync::atomic::Ordering::Release,
                );
            })
            // Record the negotiated format into the stream's `UserData`.
            .param_changed(|_, udata, id, param| {
                let Some(param) = param else {
                    return;
                };
                if id != pw::spa::param::ParamType::Format.as_raw() {
                    return;
                }

                let (media_type, media_subtype) =
                    match pw::spa::param::format_utils::parse_format(param) {
                        Ok(v) => v,
                        Err(_) => return,
                    };

                // Only raw audio is of interest; ignore anything else.
                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
                    return;
                }

                // NOTE(review): panics if the pod fails to parse; presumably
                // unreachable after the raw-audio check above — confirm.
                udata
                    .audio_format
                    .parse(param)
                    .expect("Failed to parse audio params");

                log::debug!(
                    "Capturing Rate:{} channels:{}, format: {}",
                    udata.audio_format.rate(),
                    udata.audio_format.channels(),
                    udata.audio_format.format().as_raw()
                );
            })
            // Hot path: copy each dequeued buffer out as one RawAudioFrame.
            .process(move |stream, _| match stream.dequeue_buffer() {
                None => log::debug!("Out of audio buffers"),
                Some(mut buffer) => {
                    // Skip audio until video is ready, or while processing
                    // is paused via the shared controls.
                    if !ready_state_b.video_ready() || controls.skip_processing() {
                        return;
                    }

                    let datas = buffer.datas_mut();
                    if datas.is_empty() {
                        return;
                    }

                    let data = &mut datas[0];
                    // Valid bytes in the chunk expressed as an f32 sample
                    // count (format is negotiated as F32LE below).
                    let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;

                    if let Some(samples) = data.data() {
                        let samples_f32: &[f32] = bytemuck::cast_slice(samples);
                        let audio_samples = &samples_f32[..n_samples as usize];
                        match audio_sender.try_send(RawAudioFrame {
                            samples: audio_samples.to_vec(),
                            // Nanosecond timestamp from the stream (per the
                            // C API name `pw_stream_get_nsec`).
                            timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
                        }) {
                            Ok(_) => {}
                            // `try_send` never blocks this realtime thread;
                            // a full or closed channel drops the frame.
                            Err(crossbeam::channel::TrySendError::Full(frame)) => {
                                log::error!(
                                    "channel is full when trying to send frame at: {}.",
                                    frame.timestamp
                                );
                            }
                            Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
                                log::error!(
                                    "channel is disconnected when trying to send frame at: {}.",
                                    frame.timestamp
                                );
                            }
                        }
                    }
                }
            })
            .register()?;

        // Advertise the only format accepted for negotiation: raw F32LE audio.
        let audio_spa_obj = pw::spa::pod::object! {
            pw::spa::utils::SpaTypes::ObjectParamFormat,
            pw::spa::param::ParamType::EnumFormat,
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::MediaType,
                Id,
                pw::spa::param::format::MediaType::Audio
            ),
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::MediaSubtype,
                Id,
                pw::spa::param::format::MediaSubtype::Raw
            ),
            pw::spa::pod::property!(
                pw::spa::param::format::FormatProperties::AudioFormat,
                Id,
                pw::spa::param::audio::AudioFormat::F32LE
            )
        };

        // Serialize the object into a SPA pod byte buffer for `connect`.
        let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
            std::io::Cursor::new(Vec::new()),
            &pw::spa::pod::Value::Object(audio_spa_obj),
        )
        .unwrap()
        .0
        .into_inner();

        let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];

        // Target the default sink's node when it can be resolved; `None`
        // lets PipeWire pick a target via AUTOCONNECT.
        let sink_id_to_use = get_default_sink_node_id();

        log::debug!("Default sink id: {sink_id_to_use:?}");
        audio_stream.connect(
            Direction::Input,
            sink_id_to_use,
            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
            &mut audio_params,
        )?;

        log::debug!("Audio Stream: {audio_stream:?}");

        // Blocks until `terminate_loop.quit()` fires.
        pw_loop.run();
        Ok(())
    }
}
204
/// Resolve the PipeWire object id of the system's default audio sink.
///
/// Shells out to `pactl`: looks up the default sink name, then scans
/// `pactl list sinks` for that sink's `object.id` property. The value may be
/// quoted in pactl output, so quotes are stripped before parsing.
///
/// Returns `None` when the shell cannot be spawned, the pipeline exits
/// unsuccessfully, or the output is not a valid `u32` — the caller already
/// treats the id as optional (PipeWire autoconnects when `None`), so a
/// missing `pactl` must not panic.
fn get_default_sink_node_id() -> Option<u32> {
    let output = Command::new("sh")
        .arg("-c")
        .arg(r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#)
        .output()
        .ok()?; // spawn failure (e.g. no `sh`) → fall back to autoconnect

    if !output.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    // e.g. `object.id = "57"` → strip quotes, trim, parse.
    stdout.replace('"', "").trim().parse::<u32>().ok()
}