1use std::{process::Command, sync::Arc};
2
3use crate::{types::audio_frame::RawAudioFrame, CaptureControls, ReadyState};
4use crossbeam::channel::Sender;
5use pipewire::{
6 self as pw,
7 context::ContextRc,
8 core::CoreRc,
9 main_loop::MainLoopRc,
10 properties::properties,
11 spa::{
12 self,
13 param::format::{MediaSubtype, MediaType},
14 pod::Pod,
15 utils::Direction,
16 },
17 stream::{StreamFlags, StreamListener, StreamRc, StreamState},
18 sys::pw_stream_get_nsec,
19};
20
21use super::Terminate;
22
23#[derive(Clone, Copy, Default)]
24struct UserData {
25 audio_format: spa::param::audio::AudioInfoRaw,
26}
27
28struct PipewireState {
29 pw_loop: MainLoopRc,
30 _pw_context: ContextRc,
31 _core: CoreRc,
32 _core_listener: pw::core::Listener,
33 _stream: StreamRc,
34 _stream_listener: StreamListener<UserData>,
35}
36
37pub struct AudioCapture {
38 termination_recv: Option<pw::channel::Receiver<Terminate>>,
39 pipewire_state: PipewireState,
40}
41
42impl AudioCapture {
43 pub fn new(
44 ready_state: Arc<ReadyState>,
45 audio_sender: Sender<RawAudioFrame>,
46 termination_recv: pw::channel::Receiver<Terminate>,
47 controls: Arc<CaptureControls>,
48 ) -> Result<Self, pw::Error> {
49 let pw_loop = MainLoopRc::new(None)?;
50 let pw_context = ContextRc::new(&pw_loop, None)?;
51 let core = pw_context.connect_rc(None)?;
52
53 let mut core_mut = core.clone();
54 let core_listener = Self::setup_core_listener(&mut core_mut);
55
56 let mut stream = Self::create_stream(core.clone())?;
57 let stream_listener =
58 Self::setup_stream_listener(&mut stream, ready_state, controls, audio_sender)?;
59 Self::connect_stream(&mut stream)?;
60
61 Ok(Self {
62 termination_recv: Some(termination_recv),
63 pipewire_state: PipewireState {
64 pw_loop,
65 _pw_context: pw_context,
66 _core: core,
67 _core_listener: core_listener,
68 _stream: stream,
69 _stream_listener: stream_listener,
70 },
71 })
72 }
73
74 fn create_stream(core: CoreRc) -> Result<StreamRc, pw::Error> {
75 StreamRc::new(
76 core,
77 "waycap-audio",
78 properties! {
79 *pw::keys::MEDIA_TYPE => "Audio",
80 *pw::keys::MEDIA_CATEGORY => "Capture",
81 *pw::keys::MEDIA_ROLE => "Music",
82 *pw::keys::NODE_LATENCY => "1024/48000",
83 },
84 )
85 }
86
87 fn setup_core_listener(core: &mut CoreRc) -> pw::core::Listener {
88 core.add_listener_local()
89 .info(|i| log::debug!("AUDIO CORE:\n{i:#?}"))
90 .error(|e, f, g, h| log::error!("{e},{f},{g},{h}"))
91 .done(|d, _| log::debug!("DONE: {d}"))
92 .register()
93 }
94
95 fn setup_stream_listener(
96 stream: &mut StreamRc,
97 ready_state: Arc<ReadyState>,
98 controls: Arc<CaptureControls>,
99 audio_sender: Sender<RawAudioFrame>,
100 ) -> Result<StreamListener<UserData>, pw::Error> {
101 let ready_state_clone = Arc::clone(&ready_state);
102
103 let stream_listener = stream
104 .add_local_listener_with_user_data(UserData::default())
105 .state_changed(move |_, _, old, new| {
106 log::info!("Audio Stream State Changed: {old:?} -> {new:?}");
107 ready_state.audio.store(
108 new == StreamState::Streaming,
109 std::sync::atomic::Ordering::Release,
110 );
111 })
112 .param_changed(|_, udata, id, param| {
113 let Some(param) = param else {
114 return;
115 };
116 if id != pw::spa::param::ParamType::Format.as_raw() {
117 return;
118 }
119
120 let (media_type, media_subtype) =
121 match pw::spa::param::format_utils::parse_format(param) {
122 Ok(v) => v,
123 Err(_) => return,
124 };
125
126 if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
127 return;
128 }
129
130 udata
131 .audio_format
132 .parse(param)
133 .expect("Failed to parse audio params");
134
135 log::debug!(
136 "Capturing Rate:{} channels:{}, format: {}",
137 udata.audio_format.rate(),
138 udata.audio_format.channels(),
139 udata.audio_format.format().as_raw()
140 );
141 })
142 .process(move |stream, _| match stream.dequeue_buffer() {
143 None => log::debug!("Out of audio buffers"),
144 Some(mut buffer) => {
145 if !ready_state_clone.video_ready() || controls.skip_processing() {
146 return;
147 }
148
149 let datas = buffer.datas_mut();
150 if datas.is_empty() {
151 return;
152 }
153
154 let data = &mut datas[0];
155 let n_samples = data.chunk().size() / (std::mem::size_of::<f32>()) as u32;
156
157 if let Some(samples) = data.data() {
158 let samples_f32: &[f32] = bytemuck::cast_slice(samples);
159 let audio_samples = &samples_f32[..n_samples as usize];
160 match audio_sender.try_send(RawAudioFrame {
161 samples: audio_samples.to_vec(),
162 timestamp: unsafe { pw_stream_get_nsec(stream.as_raw_ptr()) } as i64,
163 }) {
164 Ok(_) => {}
165 Err(crossbeam::channel::TrySendError::Full(frame)) => {
166 log::error!(
167 "channel is full when trying to send frame at: {}.",
168 frame.timestamp
169 );
170 }
171 Err(crossbeam::channel::TrySendError::Disconnected(frame)) => {
172 log::error!(
173 "channel is disconnected when trying to send frame at: {}.",
174 frame.timestamp
175 );
176 }
177 }
178 }
179 }
180 })
181 .register()?;
182
183 Ok(stream_listener)
184 }
185
186 fn connect_stream(stream: &mut StreamRc) -> Result<(), pw::Error> {
187 let audio_spa_obj = pw::spa::pod::object! {
188 pw::spa::utils::SpaTypes::ObjectParamFormat,
189 pw::spa::param::ParamType::EnumFormat,
190 pw::spa::pod::property!(
191 pw::spa::param::format::FormatProperties::MediaType,
192 Id,
193 pw::spa::param::format::MediaType::Audio
194 ),
195 pw::spa::pod::property!(
196 pw::spa::param::format::FormatProperties::MediaSubtype,
197 Id,
198 pw::spa::param::format::MediaSubtype::Raw
199 ),
200 pw::spa::pod::property!(
201 pw::spa::param::format::FormatProperties::AudioFormat,
202 Id,
203 pw::spa::param::audio::AudioFormat::F32LE
204 )
205 };
206
207 let audio_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
208 std::io::Cursor::new(Vec::new()),
209 &pw::spa::pod::Value::Object(audio_spa_obj),
210 )
211 .unwrap()
212 .0
213 .into_inner();
214
215 let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()];
216
217 let sink_id_to_use = get_default_sink_node_id();
218 log::debug!("Default sink id: {sink_id_to_use:?}");
219
220 stream.connect(
221 Direction::Input,
222 sink_id_to_use,
223 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
224 &mut audio_params,
225 )
226 }
227
228 pub fn run(&mut self) {
229 let terminate_loop = self.pipewire_state.pw_loop.clone();
230 let terminate_recv = self.termination_recv.take().unwrap();
231 let _recv = terminate_recv.attach(self.pipewire_state.pw_loop.loop_(), move |_| {
232 log::debug!("Terminating audio capture loop");
233 terminate_loop.quit();
234 });
235
236 log::debug!("Audio Stream: {:?}", self.pipewire_state._stream);
237 self.pipewire_state.pw_loop.run();
238 }
239}
240
/// Asks `pactl` (through `sh -c`) for the `object.id` of the default sink.
///
/// The shell pipeline reads the default sink name from `pactl info`, finds the
/// matching entry in `pactl list sinks`, and prints the last field of that
/// entry's `object.id` line.
///
/// Returns `None` when the shell cannot be spawned or when its output, with
/// surrounding whitespace and double quotes removed, does not parse as a `u32`.
fn get_default_sink_node_id() -> Option<u32> {
    const DEFAULT_SINK_QUERY: &str = r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#;

    // A missing shell is not fatal: the caller treats `None` as "no explicit
    // target", so report absence instead of panicking.
    let output = Command::new("sh")
        .arg("-c")
        .arg(DEFAULT_SINK_QUERY)
        .output()
        .ok()?;

    let stdout = String::from_utf8_lossy(&output.stdout);
    // The value is printed wrapped in double quotes (e.g. `"55"`) plus a newline.
    stdout
        .trim_matches(|c: char| c == '"' || c.is_whitespace())
        .parse::<u32>()
        .ok()
}