1"""Implementations for spectral transforms and filtering.
2
3.. currentmodule:: uwacan.spectral
4
5Core processing and analysis
6----------------------------
7.. autosummary::
8 :toctree: generated
9
10 spectrum
11 fft
12 ifft
13 Filterbank
14 FilterbankRoller
15 linear_to_banded
16 nth_decade_band_edges
17
18"""
19
20
21from . import _core
22import xarray as xr
23import numpy as np
24import scipy.signal
25import numba
26
27
[docs]
28def fft(time_data, nfft=None):
29 """Compute the fft of a time signal.
30
31 This computes the single-sided fft of the input data, intended to go from real time data to frequency data.
32 The output array has ``nfft//2+1`` frequency bins.
33 No scaling is applied to the output, i.e., it has the same normalization as `numpy.fft.rfft`.
34
35 Parameters
36 ----------
37 time_data : _core.TimeData or xr.DataArray or numpy.ndarray
38 The input time-domain data to compute the spectrum for. The data can be one of the following:
39
40 - `~uwacan.TimeData`: Wrapped time data from ``uwacan``.
41 - `xarray.DataArray`: An xarray DataArray with a 'time' dimension.
42 - `numpy.ndarray`: A NumPy array containing time-series data. The fft will be over the last axis.
43 nfft : int, optional
44 The number of bins to use for the fft. Defaults to the length of the time signal.
45
46 Returns
47 -------
48 _core.FrequencyData or xr.DataArray or numpy.ndarray
49 The computed single-sided fft of the input data. The return type matches the input type:
50
51 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object.
52 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension.
53 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the fft data.
54 """
55 if isinstance(time_data, _core.TimeData):
56 return _core.FrequencyData(fft(time_data.data, nfft=nfft))
57 if isinstance(time_data, xr.DataArray):
58 nfft = nfft or time_data.time.size
59 freq_data = xr.apply_ufunc(
60 fft,
61 time_data,
62 input_core_dims=[["time"]],
63 output_core_dims=[["frequency"]],
64 kwargs={"nfft": nfft},
65 )
66 freq_data.coords["frequency"] = np.fft.rfftfreq(nfft, 1 / time_data.time.rate)
67 center_offset = np.timedelta64(round(time_data.time.size / 2 / time_data.time.rate * 1e9), "ns")
68 freq_data.coords["time"] = time_data.time[0] + center_offset
69 freq_data.coords["start_time"] = time_data.time[0]
70 return freq_data
71 return np.fft.rfft(time_data, n=nfft)
72
73
[docs]
74def ifft(freq_data, nfft=None):
75 """Compute the inverst fft of a single-sided spectrum.
76
77 This computes the single-sided inverse fft of the input data, intended to go from frequency data to real time data.
78 With an input array with ``nfft//2+1`` frequency bins, the output array has ``nfft`` time samples.
79 No scaling is applied to the output, i.e., it has the same normalization as `numpy.fft.irfft`.
80
81 Parameters
82 ----------
83 freq_data : _core.FrequencyData or xr.DataArray or numpy.ndarray
84 Single-sided fft of some time data. The data can be one of the following:
85
86 - `~uwacan.FrequencyData`: Wrapped frequency data from ``uwacan``.
87 - `xarray.DataArray`: An xarray DataArray with a 'frequency' dimension.
88 - `numpy.ndarray`: A NumPy array containing a single-sided fft spectrum. The ifft will be over the last axis.
89
90 nfft : int, optional
91 The number of bins to use for the ifft (= number of samples in the output).
92 See notes for info on how this is computed from the input.
93
94 Returns
95 -------
96 _core.FrequencyData or xr.DataArray or numpy.ndarray
97 The computed single-sided fft of the input data. The return type matches the input type:
98
99 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object.
100 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension.
101 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the fft data.
102
103 Notes
104 -----
105 For a real-valued time signal with ``N`` samples, there are ``N//2 + 1`` independent values in the spectrum.
106 If ``N`` is even, the last bin is the Nyquist bin, which always has a real value. The first bin (index 0)
107 corresponds to 0 Hz (DC), and is also always real-valued.
108 If ``N`` is odd the last bin is not the Nyquist bin, but one half bin below, and is not necessarily real-valued.
109 Thus, given a spectrum with ``K`` frequency bins, we assume that the original signal has an even number of samples
110 ``N = 2*(K-1)`` if all values at the highest bin are real. If there are any non-zero imaginary parts at the highest
111 frequency bin, we assume an odd number of samples ``N = 2*(K-1) + 1``.
112 """
113 if isinstance(freq_data, _core.FrequencyData):
114 return _core.TimeData(ifft(freq_data.data, nfft=nfft))
115 if isinstance(freq_data, xr.DataArray):
116 time_data = xr.apply_ufunc(
117 ifft,
118 freq_data.drop_vars("time"),
119 input_core_dims=[['frequency']],
120 output_core_dims=[['time']],
121 kwargs={"nfft": nfft},
122 )
123 samplerate = round(freq_data.frequency[1].item() * time_data.time.size)
124 offsets = np.arange(time_data.time.size) * 1e9 / samplerate
125 time = freq_data["start_time"].values + offsets.astype("timedelta64[ns]")
126 time = xr.DataArray(time, dims="time", attrs={"rate": samplerate})
127 # time = np.arange(time_data.time.size) * np.timedelta64(int(1e9 / samplerate), "ns")
128 time_data.coords["time"] = time
129 return time_data
130
131 if nfft is None:
132 is_odd = np.any(freq_data[..., -1].imag)
133 nfft = (freq_data.shape[0] - 1) * 2 + (1 if is_odd else 0)
134
135 time_data = np.fft.irfft(freq_data, n=nfft)
136 return time_data
137
138
[docs]
139def spectrum(time_data, window=None, scaling="density", nfft=None, detrend=True, samplerate=None, axis=None):
140 """Compute the power spectrum of time-domain data.
141
142 The `spectrum` function calculates the power spectrum of input time-series data. It supports
143 various input types, including `~uwacan.TimeData`, `xarray.DataArray`, and NumPy arrays. The
144 function applies windowing, detrending, and scaling as specified by the parameters to
145 produce the frequency-domain representation of the data.
146
147 Parameters
148 ----------
149 time_data : _core.TimeData or xr.DataArray or numpy.ndarray
150 The input time-domain data to compute the spectrum for. The data can be one of the following:
151
152 - `~uwacan.TimeData`: Wrapped time data from ``uwacan``.
153 - `xarray.DataArray`: An xarray DataArray with a 'time' dimension.
154 - `numpy.ndarray`: A NumPy array containing time-series data.
155
156 window : str or array_like, optional
157 The window function to apply to the data before computing the FFT. This can be:
158
159 - A string specifying the type of window to use (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``).
160 - An array-like sequence of window coefficients.
161 - If ``None``, no window is applied. Default is ``None``.
162
163 scaling : {'density', 'spectrum', 'dc-nyquist'} or numeric, optional
164 Specifies the scaling of the power spectrum. Options include:
165
166 - ``'density'``: Computes the power spectral density.
167 - ``'spectrum'``: Computes the power spectrum.
168 - ``'dc-nyquist'``: Halves the output at DC and Nyquist frequencies. Use with a pre-scaled window that takes care to scale the remainder of the single-sided spectrum.
169 - any numeric value: The output of the fft will be scaled by this value.
170
171 Default is ``"density"``.
172
173 nfft : int, optional
174 The number of points to use in the FFT computation. If ``None``, it defaults to the length
175 of the input data along the specified axis.
176
177 detrend : bool, default=True
178 If ``True``, removes the mean from the data before computing the FFT to reduce spectral leakage.
179 If ``False``, no detrending is performed.
180
181 samplerate : float, optional
182 The sampling rate of the input data in Hz. Required if ``time_data`` is an numpy array.
183 If not provided, it defaults to 1. This parameter is used to compute the frequency axis and proper density scaling.
184
185 axis : int, optional
186 The axis along which to compute the FFT. If ``None``, the last axis is used.
187 Only used for numpy inputs.
188 This parameter allows flexibility in handling multi-dimensional data.
189
190 Returns
191 -------
192 _core.FrequencyData or xr.DataArray or numpy.ndarray
193 The computed power spectrum of the input data. The return type matches the input type:
194
195 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object.
196 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension.
197 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the power spectrum.
198
199 """
200 if isinstance(time_data, _core.TimeData):
201 return _core.FrequencyData(spectrum(time_data.data, window=window, scaling=scaling, nfft=nfft, detrend=detrend))
202 if isinstance(time_data, xr.DataArray):
203 freq_data = xr.apply_ufunc(
204 spectrum,
205 time_data,
206 input_core_dims=[["time"]],
207 output_core_dims=[["frequency"]],
208 kwargs=dict(
209 window=window,
210 scaling=scaling,
211 nfft=nfft,
212 detrend=detrend,
213 samplerate=samplerate or time_data.time.attrs.get("rate", None),
214 ),
215 )
216 freq_data.coords["frequency"] = np.fft.rfftfreq(nfft or time_data.time.size, 1 / time_data.time.rate)
217 freq_data.coords["time"] = time_data.time[0] + np.timedelta64(
218 round(time_data.time.size / 2 / time_data.time.rate * 1e9), "ns"
219 )
220 return freq_data
221
222 if axis is not None:
223 time_data = np.moveaxis(time_data, axis, -1)
224
225 if detrend:
226 time_data = time_data - time_data.mean(axis=-1, keepdims=True)
227
228 if window is not None:
229 if not isinstance(window, np.ndarray):
230 window = scipy.signal.windows.get_window(window, time_data.shape[-1], False)
231 time_data = time_data * window
232
233 nfft = nfft or time_data.shape[-1]
234 freq_data = np.fft.rfft(time_data, axis=-1, n=nfft)
235 freq_data = np.abs(freq_data) ** 2
236
237 if scaling == "density":
238 samplerate = samplerate or 1
239 if window is not None:
240 scaling = 2 / (np.sum(window**2) * samplerate)
241 else:
242 scaling = 2 / (time_data.shape[-1] * samplerate)
243 elif scaling == "spectrum":
244 if window is not None:
245 scaling = 2 / np.sum(window) ** 2
246 else:
247 scaling = 2 / time_data.shape[-1]
248 elif scaling == "dc-nyquist":
249 # Remove doubling of DC
250 freq_data[..., 0] /= 2
251 if nfft % 2 == 0:
252 # Even size, remove doubling of Nyquist
253 freq_data[..., -1] /= 2
254 scaling = False
255
256 if scaling:
257 freq_data *= scaling
258
259 # Remove doubling of DC
260 freq_data[..., 0] /= 2
261 if nfft % 2 == 0:
262 # Even size, remove doubling of Nyquist
263 freq_data[..., -1] /= 2
264
265 if axis is not None:
266 freq_data = np.moveaxis(freq_data, -1, axis)
267
268 return freq_data
269
270
271@numba.njit()
272def _linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution):
273 banded_spectrum = np.full(lower_edges.shape + linear_spectrum.shape[1:], np.nan)
274 for band_idx, (lower_edge, upper_edge) in enumerate(zip(lower_edges, upper_edges)):
275 lower_idx = int(np.floor(lower_edge / spectral_resolution + 0.5)) # (l_idx - 0.5) * Δf = l
276 upper_idx = int(np.ceil(upper_edge / spectral_resolution - 0.5)) # (u_idx + 0.5) * Δf = u
277 lower_idx = max(lower_idx, 0)
278 upper_idx = min(upper_idx, linear_spectrum.shape[0] - 1)
279
280 if lower_idx == upper_idx:
281 # This can only happen if both frequencies l and u are within the same fft bin.
282 # Since we don't allow the fft bins to be larger than the output bins, we thus have the exact same band.
283 banded_spectrum[band_idx] = linear_spectrum[lower_idx]
284 else:
285 # weight edge bins by "(whole bin - what is not in the new band) / whole bin"
286 # lower fft bin edge l_e = (l_idx - 0.5) * Δf
287 # w_l = (Δf - (l - l_e)) / Δf = l_idx + 0.5 - l / Δf
288 first_weight = lower_idx + 0.5 - lower_edge / spectral_resolution
289 # upper fft bin edge u_e = (u_idx + 0.5) * Δf
290 # w_u = (Δf - (u_e - u)) / Δf = 0.5 - u_idx + u / Δf
291 last_weight = upper_edge / spectral_resolution - upper_idx + 0.5
292 # Sum the components fully within the output bin `[l_idx + 1:u_idx]`, and weighted components partially in the band.
293 this_band = (
294 linear_spectrum[lower_idx + 1 : upper_idx].sum(axis=0)
295 + linear_spectrum[lower_idx] * first_weight
296 + linear_spectrum[upper_idx] * last_weight
297 )
298 banded_spectrum[band_idx] = this_band * (
299 spectral_resolution / (upper_edge - lower_edge)
300 ) # Rescale the power density.
301 return banded_spectrum
302
303
[docs]
304def linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution, axis=0):
305 """Aggregate a linear power spectrum into specified frequency bands.
306
307 The `linear_to_banded` function converts a linear power spectrum into a banded spectrum by
308 summing power within frequency bands defined by ``lower_edges`` and ``upper_edges``. It handles
309 multi-dimensional spectra by allowing specification of the axis corresponding to frequency
310 bins.
311
312 Parameters
313 ----------
314 linear_spectrum : `numpy.ndarray`
315 The input linear power spectrum. The axis specified by ``axis`` should correspond to
316 frequency bins.
317 lower_edges : array_like
318 The lower frequency edges for each band. Must be in ascending order.
319 upper_edges : array_like
320 The upper frequency edges for each band. Must be in ascending order and greater
321 than or equal to ``lower_edges``.
322 spectral_resolution : float
323 The frequency resolution (Δf) of the linear spectrum.
324 axis : int, optional, default=0
325 The axis of ``linear_spectrum`` that corresponds to frequency bins. If the frequency
326 bins are not along the first axis, specify the appropriate axis index.
327
328 Returns
329 -------
330 banded_spectrum : numpy.ndarray
331 The aggregated banded power spectrum.
332
333 """
334 # TODO: add features here to allow non-numpy inputs. Simply unwrap and rewrap as needed.
335 if axis:
336 linear_spectrum = np.moveaxis(linear_spectrum, axis, 0)
337 banded = _linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution)
338 if axis:
339 banded = np.moveaxis(banded, 0, axis)
340 return banded
341
342
[docs]
343def nth_decade_band_edges(bands_per_decade, min_frequency, max_frequency, hybrid_resolution=False, limit_to="center"):
344 """Find band centers and edges for Nth decade (hybrid) bands.
345
346 Parameters
347 ----------
348 bands_per_decade : int
349 The number of bands per decade
350 min_frequency : float
351 A lower frequency limit to use for the band centers, in Hz.
352 max_frequency : float
353 An upper frequency limit to use for the band centers, in Hz.
354 hybrid_resolution : float, optional
355 An optional linear hybrid resolution to use at the lower frequencies, in Hz.
356 limit_to : {"center", "strict", "inclusive"}, default "center"
357 Controls how the minimum and maximum frequency limits are applied.
358
359 - "center": keeps bands with center frequencies inside the range.
360 - "strict": keeps only bands where the entire band is within the range.
361 - "inclusive": keeps bands to cover the entire range, allowing the highest and lowers
362 bands to extend beyond the specified limits.
363
364 Returns
365 -------
366 centers : numpy.ndarray
367 The center frequencies of the bands
368 edges : (numpy.ndarray, numpy.ndarray)
369 Tuple of lower and upper band edges.
370 """
371 log_band_scaling = 10 ** (0.5 / bands_per_decade)
372
373 if hybrid_resolution:
374 # The frequency at which the logspaced bands cover at least one linspaced band
375 minimum_bandwidth_frequency = hybrid_resolution / (log_band_scaling - 1 / log_band_scaling)
376 first_log_idx = int(
377 np.ceil(bands_per_decade * np.log10(minimum_bandwidth_frequency / 1e3))
378 )
379 last_linear_idx = int(np.floor(minimum_bandwidth_frequency / hybrid_resolution))
380
381 # Since the logspaced bands have pre-determined centers, we can't just start them after the linspaced bands.
382 # Often, the bands will overlap at the minimum bandwidth frequency, so we look for the first band
383 # that does not overlap, i.e., the upper edge of the last linspaced band is below the lower edge of the first
384 # logspaced band
385 while (last_linear_idx + 0.5) * hybrid_resolution > 1e3 * 10 ** (
386 (first_log_idx - 0.5) / bands_per_decade
387 ):
388 # Condition is "upper edge of last linear band is higher than lower edge of first logarithmic band"
389 last_linear_idx += 1
390 first_log_idx += 1
391
392 first_linear_idx = int(np.floor(min_frequency / hybrid_resolution))
393 else:
394 first_linear_idx = last_linear_idx = 0
395 first_log_idx = int(np.floor(bands_per_decade * np.log10(min_frequency / 1e3)))
396
397 last_log_idx = int(np.ceil(bands_per_decade * np.log10(max_frequency / 1e3)))
398
399 lin_centers = np.arange(first_linear_idx, last_linear_idx + 1) * hybrid_resolution
400 lin_lowers = lin_centers - 0.5 * hybrid_resolution
401 lin_uppers = lin_centers + 0.5 * hybrid_resolution
402
403 if first_linear_idx == 0:
404 # Including DC-bin - convers from 0 to df / 2.
405 lin_lowers[0] = 0
406
407 log_centers = 1e3 * 10 ** (np.arange(first_log_idx, last_log_idx + 1) / bands_per_decade)
408 log_lowers = log_centers / log_band_scaling
409 log_uppers = log_centers * log_band_scaling
410
411 if lin_centers.size and log_centers.size:
412 log_lowers[0] = lin_uppers[-1]
413
414 centers = np.concatenate([lin_centers, log_centers])
415 lowers = np.concatenate([lin_lowers, log_lowers])
416 uppers = np.concatenate([lin_uppers, log_uppers])
417
418 if "center" in limit_to:
419 if centers[0] < min_frequency:
420 mask = centers >= min_frequency
421 lowers = lowers[mask]
422 centers = centers[mask]
423 uppers = uppers[mask]
424 if centers[-1] > max_frequency:
425 mask = centers <= max_frequency
426 lowers = lowers[mask]
427 centers = centers[mask]
428 uppers = uppers[mask]
429 elif "strict" in limit_to:
430 if lowers[0] < min_frequency:
431 mask = lowers >= min_frequency
432 lowers = lowers[mask]
433 centers = centers[mask]
434 uppers = uppers[mask]
435 if uppers[-1] > max_frequency:
436 mask = uppers <= max_frequency
437 lowers = lowers[mask]
438 centers = centers[mask]
439 uppers = uppers[mask]
440 elif "inclusive" in limit_to:
441 if uppers[0] < min_frequency:
442 mask = uppers >= min_frequency
443 lowers = lowers[mask]
444 centers = centers[mask]
445 uppers = uppers[mask]
446 if lowers[-1] > max_frequency:
447 mask = lowers <= max_frequency
448 lowers = lowers[mask]
449 centers = centers[mask]
450 uppers = uppers[mask]
451 else:
452 raise ValueError(f"Unknown limit option `{limit_to}`")
453
454 return centers, (lowers, uppers)
455
456
[docs]
457class FilterbankRoller(_core.Roller):
458 """Rolling computation of power spectra and spectrograms, both linear and banded.
459
460 Parameters
461 ----------
462 time_data : TimeData
463 The time-data wrapper to process.
464 duration : float, optional
465 The duration of the fft windows, in seconds.
466 step : float, optional
467 The step size between consecutive fft windows, in seconds.
468 overlap : float, optional
469 The amount of overlap between fft windows, as a fraction of the duration.
470 min_frequency : float
471 The lowest frequency to include in the processing.
472 max_frequency : float
473 The highest frequency to include in the processing.
474 bands_per_decade : float, optional
475 The number of frequency bands per decade for logarithmic scaling.
476 hybrid_resolution : float
477 A frequency resolution to aim for. Only used if ``frame_duration`` is not given
478 fft_window : str, default="hann"
479 The window function to apply to each rolling window before computing the FFT.
480 Can be a string specifying a window type (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``)
481 or an array-like sequence of window coefficients..
482
483 Notes
484 -----
485 The processing is done in stft frames determined by ``duration``, ``step``
486 ``overlap``, and ``hybrid_resolution``. At least one of ``duration``, ``step``,
487 or ``resolution`` has to be given, see `~_core.time_frame_settings` for further details.
488 At least one of ``min_frequency`` and ``hybrid_resolution`` has to be given.
489 Note that the ``duration`` and ``step`` can be auto-chosen from the overlap
490 and required frequency resolution, either from ``hybrid_resolution`` or ``min_frequency``.
491
492 Raises
493 ------
494 ValueError
495 If the processing settings are not compatible, e.g.,
496 - frequency bands with bandwidth smaller than the frame duration allows
497 """
498
499 def __init__(
500 self,
501 time_data,
502 duration=None,
503 step=None,
504 overlap=None,
505 min_frequency=None,
506 max_frequency=None,
507 bands_per_decade=None,
508 hybrid_resolution=None,
509 fft_window="hann",
510 ):
511 self.settings = _core.time_frame_settings(
512 duration=duration,
513 step=step,
514 overlap=overlap,
515 resolution=None if isinstance(hybrid_resolution, bool) else hybrid_resolution,
516 signal_length=time_data.time_window.duration,
517 samplerate=time_data.samplerate,
518 )
519 self.min_frequency = min_frequency or 0
520 self.max_frequency = max_frequency or time_data.samplerate / 2
521
522 self.bands_per_decade = bands_per_decade
523 self.hybrid_resolution = hybrid_resolution
524 self.fft_window = fft_window
525
526 self.time_data = time_data
527 self.roller = self.time_data.rolling(
528 duration=self.settings["duration"], step=self.settings["step"], overlap=self.settings["overlap"]
529 )
530
531 self.processing_axis = self.roller.dims.index("time")
532 self.check_frequency_resolution()
533 self.make_frequency_vectors()
534
535 @property
536 def dims(self): # noqa: D102
537 dims = list(self.roller.dims)
538 dims[self.processing_axis] = "frequency"
539 return tuple(dims)
540
541 @property
542 def shape(self): # noqa: D102
543 shape = list(self.roller.shape)
544 shape[self.processing_axis] = len(self.frequency)
545 return tuple(shape)
546
547 @property
548 def coords(self): # noqa: D102
549 coords = dict(self.roller.coords)
550 del coords["time"]
551 coords["frequency"] = xr.DataArray(self.frequency, dims="frequency", coords={"frequency": self.frequency})
552 return coords
553
[docs]
554 def check_frequency_resolution(self):
555 """Validate the frequency resolution against the temporal window settings."""
556 # TODO: Check what this used to do when you used parameters from a spectrogram object.
557 if not self.bands_per_decade:
558 self.bands_per_decade = False
559 self.hybrid_resolution = False
560 else:
561 if not self.hybrid_resolution:
562 self.hybrid_resolution = False
563 # Not using hybrid, we need long enough frames to compute the lowest band.
564 lowest_bandwidth = self.min_frequency * (
565 10 ** (0.5 / self.bands_per_decade) - 10 ** (-0.5 / self.bands_per_decade)
566 )
567 if lowest_bandwidth * self.settings["duration"] < 1:
568 raise ValueError(
569 f'{self.bands_per_decade}th-decade filter band at {self.min_frequency:.2f} Hz with bandwidth of {lowest_bandwidth:.2f} Hz '
570 f'cannot be calculated from temporal windows of {self.settings["duration"]:.2f} s.'
571 )
572 else:
573 # Get the hybrid resolution settings.
574 if self.hybrid_resolution is True:
575 self.hybrid_resolution = 1 / self.settings["duration"]
576 if self.hybrid_resolution * self.settings["duration"] < 1:
577 raise ValueError(
578 f'Hybrid filterbank with resolution of {self.hybrid_resolution:.2f} Hz '
579 f'cannot be calculated from temporal windows of {self.settings["duration"]:.2f} s.'
580 )
581
[docs]
582 def make_frequency_vectors(self):
583 """Construct frequency vectors and band definitions based on spectrogram settings."""
584 nfft = self.settings["samples_per_frame"]
585 self.linear_frequency = np.arange(nfft // 2 + 1) * self.time_data.samplerate / nfft
586 self.bandwidth = self.linear_frequency[1]
587
588 if self.max_frequency < self.linear_frequency[-1]:
589 upper_index = int(np.floor(self.max_frequency / self.time_data.samplerate * nfft))
590 else:
591 upper_index = None
592
593 if self.min_frequency > 0:
594 lower_index = int(np.ceil(self.min_frequency / self.time_data.samplerate * nfft))
595 else:
596 lower_index = None
597
598 if upper_index or lower_index:
599 self.linear_slice = (slice(None),) * self.processing_axis + (slice(lower_index, upper_index),)
600 else:
601 self.linear_slice = False
602
603 if self.linear_slice:
604 self.frequency = self.linear_frequency[self.linear_slice]
605
606 if self.bands_per_decade:
607 centers, (lowers, uppers) = nth_decade_band_edges(
608 bands_per_decade=self.bands_per_decade,
609 min_frequency=self.min_frequency,
610 max_frequency=self.max_frequency,
611 hybrid_resolution=self.hybrid_resolution,
612 )
613
614 self.band_lower_edges = lowers
615 self.band_centers = centers
616 self.band_upper_edges = uppers
617 self.frequency = centers
618 self.bandwidth = uppers - lowers
619
[docs]
620 def numpy_frames(self): # noqa: D102
621 window = scipy.signal.windows.get_window(self.fft_window, self.settings["samples_per_frame"], False)
622 window /= ((window**2).sum() * self.time_data.samplerate / 2) ** 0.5
623
624 for idx, time_frame in enumerate(self.roller.numpy_frames()):
625 freq_frame = spectrum(time_frame, window=window, scaling="dc-nyquist", axis=self.processing_axis)
626 if self.bands_per_decade:
627 freq_frame = linear_to_banded(
628 freq_frame,
629 lower_edges=self.band_lower_edges,
630 upper_edges=self.band_upper_edges,
631 spectral_resolution=self.settings["resolution"],
632 axis=self.processing_axis,
633 )
634 elif self.linear_slice:
635 freq_frame = freq_frame[self.linear_slice]
636 yield freq_frame
637
638 def __iter__(self):
639 # For linear FFT (no banding), compute edges from frequency resolution
640 if hasattr(self, 'band_lower_edges'):
641 frequency_band_lower = self.band_lower_edges
642 frequency_band_upper = self.band_upper_edges
643 else:
644 # Linear FFT case: assume centered bins
645 frequency_band_lower = self.frequency - self.bandwidth / 2
646 frequency_band_upper = self.frequency + self.bandwidth / 2
647
648 start_time = _core.time_to_np(self.time_data.time_window.start)
649 start_time += np.timedelta64(
650 int(self.settings["samples_per_frame"] / 2 / self.time_data.samplerate * 1e9), "ns"
651 )
652 for frame_idx, frame in enumerate(self.numpy_frames()):
653 time_since_start = frame_idx * self.settings["sample_step"] / self.time_data.samplerate
654 time_since_start = np.timedelta64(int(time_since_start * 1e9), "ns")
655
656 frame = _core.FrequencyData(
657 frame,
658 frequency=self.frequency,
659 frequency_band_lower=frequency_band_lower,
660 frequency_band_upper=frequency_band_upper,
661 coords=self.coords,
662 dims=self.dims,
663 )
664 frame.data["time"] = start_time + time_since_start
665 yield frame
666
[docs]
667 def batches(self, batch_size):
668 """Generate batches of spectrogram frames.
669
670 Parameters
671 ----------
672 batch_size : int
673 Number of frames to include in each batch.
674
675 Yields
676 ------
677 TimeFrequencyData
678 Batches of spectrogram frames with time and frequency information.
679 Each batch contains up to `batch_size` frames, with the final batch
680 potentially containing fewer frames if the total number of frames
681 is not evenly divisible by the batch size.
682 """
683 batch_output = np.zeros((batch_size,) + self.shape)
684 # For linear FFT (no banding), compute edges from frequency resolution
685 if hasattr(self, 'band_lower_edges'):
686 frequency_band_lower = self.band_lower_edges
687 frequency_band_upper = self.band_upper_edges
688 else:
689 # Linear FFT case: assume centered bins
690 frequency_band_lower = self.frequency - self.bandwidth / 2
691 frequency_band_upper = self.frequency + self.bandwidth / 2
692 for frame_idx, frame in enumerate(self.numpy_frames()):
693 if frame_idx % batch_size == 0:
694 batch_start_time = self.time_data.time_window.start.add(seconds=frame_idx * self.settings["step"])
695 batch_output[frame_idx % batch_size] = frame
696 if (frame_idx + 1) % batch_size == 0 or frame_idx == self.num_frames - 1:
697 batch_data = _core.TimeFrequencyData(
698 batch_output[:frame_idx % batch_size + 1].copy(), # Only use the frames we've filled
699 frequency=self.frequency,
700 frequency_band_lower=frequency_band_lower,
701 frequency_band_upper=frequency_band_upper,
702 samplerate=self.time_data.samplerate / self.settings["sample_step"],
703 start_time=batch_start_time.add(seconds=self.settings["duration"] / 2),
704 coords=self.coords,
705 dims=("time",) + self.dims,
706 attrs=dict(
707 frame_duration=self.settings["duration"],
708 frame_overlap=self.settings["overlap"],
709 frame_step=self.settings["step"],
710 bands_per_decade=self.bands_per_decade,
711 hybrid_resolution=self.hybrid_resolution,
712 ),
713 )
714 yield batch_data
715
716
[docs]
717class Filterbank:
718 """Calculates spectrograms, both linear and banded.
719
720 Parameters
721 ----------
722 bands_per_decade : float, optional
723 The number of frequency bands per decade for logarithmic scaling.
724 frame_step : float
725 The time step between stft frames, in seconds.
726 frame_duration : float
727 The duration of each stft frame, in seconds.
728 frame_overlap : float, default=0.5
729 The overlap factor between stft frames. A negative value leaves
730 gaps between frames.
731 min_frequency : float
732 The lowest frequency to include in the processing.
733 max_frequency : float
734 The highest frequency to include in the processing.
735 hybrid_resolution : float
736 A frequency resolution to aim for. Only used if ``frame_duration`` is not given
737 fft_window : str, default="hann"
738 The window function to apply to each rolling window before computing the FFT.
739 Can be a string specifying a window type (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``)
740 or an array-like sequence of window coefficients..
741
742 Notes
743 -----
744 The processing is done in stft frames determined by ``frame_duration``, ``frame_step``
745 ``frame_overlap``, and ``hybrid_resolution``. At least one of ``duration``, ``step``,
746 or ``resolution`` has to be given, see `~_core.time_frame_settings` for further details.
747 At least one of ``min_frequency`` and ``hybrid_resolution`` has to be given.
748 Note that the ``frame_duration`` and ``frame_step`` can be auto-chosen from the overlap
749 and required frequency resolution, either from ``hybrid_resolution`` or ``min_frequency``.
750
751 Raises
752 ------
753 ValueError
754 If the processing settings are not compatible, e.g.,
755 - frequency bands with bandwidth smaller than the frame duration allows
756 """
757
758 def __init__(
759 self,
760 *,
761 bands_per_decade=None,
762 frame_step=None,
763 frame_duration=None,
764 frame_overlap=0.5,
765 min_frequency=None,
766 max_frequency=None,
767 hybrid_resolution=None,
768 fft_window="hann",
769 ):
770 self.frame_duration = frame_duration
771 self.frame_overlap = frame_overlap
772 self.frame_step = frame_step
773 self.fft_window = fft_window
774 self.bands_per_decade = bands_per_decade
775 self.min_frequency = min_frequency
776 self.max_frequency = max_frequency
777 self.hybrid_resolution = hybrid_resolution
778
[docs]
779 def rolling(self, time_data):
780 roller = FilterbankRoller(
781 time_data=time_data,
782 duration=self.frame_duration,
783 step=self.frame_step,
784 overlap=self.frame_overlap,
785 min_frequency=self.min_frequency,
786 max_frequency=self.max_frequency,
787 bands_per_decade=self.bands_per_decade,
788 hybrid_resolution=self.hybrid_resolution,
789 fft_window=self.fft_window,
790 )
791 return roller
792
[docs]
793 def __call__(self, time_data):
794 """Process time data to spectrograms.
795
796 Parameters
797 ----------
798 time_data : `~uwacan.TimeData` or `~uwacan.recordings.AudioFileRecording`
799 The data to process.
800
801 Returns
802 -------
803 filtered_data : `~uwacan.TimeFrequencyData`
804 """
805 roller = self.rolling(time_data)
806 return next(roller.batches(roller.num_frames))