Source code for uwacan.spectral

  1"""Implementations for spectral transforms and filtering.
  2
  3.. currentmodule:: uwacan.spectral
  4
  5Core processing and analysis
  6----------------------------
  7.. autosummary::
  8    :toctree: generated
  9
 10    spectrum
 11    fft
 12    ifft
 13    Filterbank
 14    FilterbankRoller
 15    linear_to_banded
 16    nth_decade_band_edges
 17
 18"""
 19
 20
 21from . import _core
 22import xarray as xr
 23import numpy as np
 24import scipy.signal
 25import numba
 26
 27
[docs] 28def fft(time_data, nfft=None): 29 """Compute the fft of a time signal. 30 31 This computes the single-sided fft of the input data, intended to go from real time data to frequency data. 32 The output array has ``nfft//2+1`` frequency bins. 33 No scaling is applied to the output, i.e., it has the same normalization as `numpy.fft.rfft`. 34 35 Parameters 36 ---------- 37 time_data : _core.TimeData or xr.DataArray or numpy.ndarray 38 The input time-domain data to compute the spectrum for. The data can be one of the following: 39 40 - `~uwacan.TimeData`: Wrapped time data from ``uwacan``. 41 - `xarray.DataArray`: An xarray DataArray with a 'time' dimension. 42 - `numpy.ndarray`: A NumPy array containing time-series data. The fft will be over the last axis. 43 nfft : int, optional 44 The number of bins to use for the fft. Defaults to the length of the time signal. 45 46 Returns 47 ------- 48 _core.FrequencyData or xr.DataArray or numpy.ndarray 49 The computed single-sided fft of the input data. The return type matches the input type: 50 51 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object. 52 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension. 53 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the fft data. 54 """ 55 if isinstance(time_data, _core.TimeData): 56 return _core.FrequencyData(fft(time_data.data, nfft=nfft)) 57 if isinstance(time_data, xr.DataArray): 58 nfft = nfft or time_data.time.size 59 freq_data = xr.apply_ufunc( 60 fft, 61 time_data, 62 input_core_dims=[["time"]], 63 output_core_dims=[["frequency"]], 64 kwargs={"nfft": nfft}, 65 ) 66 freq_data.coords["frequency"] = np.fft.rfftfreq(nfft, 1 / time_data.time.rate) 67 center_offset = np.timedelta64(round(time_data.time.size / 2 / time_data.time.rate * 1e9), "ns") 68 freq_data.coords["time"] = time_data.time[0] + center_offset 69 freq_data.coords["start_time"] = time_data.time[0] 70 return freq_data 71 return np.fft.rfft(time_data, n=nfft)
72 73
[docs] 74def ifft(freq_data, nfft=None): 75 """Compute the inverst fft of a single-sided spectrum. 76 77 This computes the single-sided inverse fft of the input data, intended to go from frequency data to real time data. 78 With an input array with ``nfft//2+1`` frequency bins, the output array has ``nfft`` time samples. 79 No scaling is applied to the output, i.e., it has the same normalization as `numpy.fft.irfft`. 80 81 Parameters 82 ---------- 83 freq_data : _core.FrequencyData or xr.DataArray or numpy.ndarray 84 Single-sided fft of some time data. The data can be one of the following: 85 86 - `~uwacan.FrequencyData`: Wrapped frequency data from ``uwacan``. 87 - `xarray.DataArray`: An xarray DataArray with a 'frequency' dimension. 88 - `numpy.ndarray`: A NumPy array containing a single-sided fft spectrum. The ifft will be over the last axis. 89 90 nfft : int, optional 91 The number of bins to use for the ifft (= number of samples in the output). 92 See notes for info on how this is computed from the input. 93 94 Returns 95 ------- 96 _core.FrequencyData or xr.DataArray or numpy.ndarray 97 The computed single-sided fft of the input data. The return type matches the input type: 98 99 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object. 100 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension. 101 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the fft data. 102 103 Notes 104 ----- 105 For a real-valued time signal with ``N`` samples, there are ``N//2 + 1`` independent values in the spectrum. 106 If ``N`` is even, the last bin is the Nyquist bin, which always has a real value. The first bin (index 0) 107 corresponds to 0 Hz (DC), and is also always real-valued. 108 If ``N`` is odd the last bin is not the Nyquist bin, but one half bin below, and is not necessarily real-valued. 109 Thus, given a spectrum with ``K`` frequency bins, we assume that the original signal has an even number of samples 110 ``N = 2*(K-1)`` if all values at the highest bin are real. If there are any non-zero imaginary parts at the highest 111 frequency bin, we assume an odd number of samples ``N = 2*(K-1) + 1``. 112 """ 113 if isinstance(freq_data, _core.FrequencyData): 114 return _core.TimeData(ifft(freq_data.data, nfft=nfft)) 115 if isinstance(freq_data, xr.DataArray): 116 time_data = xr.apply_ufunc( 117 ifft, 118 freq_data.drop_vars("time"), 119 input_core_dims=[['frequency']], 120 output_core_dims=[['time']], 121 kwargs={"nfft": nfft}, 122 ) 123 samplerate = round(freq_data.frequency[1].item() * time_data.time.size) 124 offsets = np.arange(time_data.time.size) * 1e9 / samplerate 125 time = freq_data["start_time"].values + offsets.astype("timedelta64[ns]") 126 time = xr.DataArray(time, dims="time", attrs={"rate": samplerate}) 127 # time = np.arange(time_data.time.size) * np.timedelta64(int(1e9 / samplerate), "ns") 128 time_data.coords["time"] = time 129 return time_data 130 131 if nfft is None: 132 is_odd = np.any(freq_data[..., -1].imag) 133 nfft = (freq_data.shape[0] - 1) * 2 + (1 if is_odd else 0) 134 135 time_data = np.fft.irfft(freq_data, n=nfft) 136 return time_data
137 138
[docs] 139def spectrum(time_data, window=None, scaling="density", nfft=None, detrend=True, samplerate=None, axis=None): 140 """Compute the power spectrum of time-domain data. 141 142 The `spectrum` function calculates the power spectrum of input time-series data. It supports 143 various input types, including `~uwacan.TimeData`, `xarray.DataArray`, and NumPy arrays. The 144 function applies windowing, detrending, and scaling as specified by the parameters to 145 produce the frequency-domain representation of the data. 146 147 Parameters 148 ---------- 149 time_data : _core.TimeData or xr.DataArray or numpy.ndarray 150 The input time-domain data to compute the spectrum for. The data can be one of the following: 151 152 - `~uwacan.TimeData`: Wrapped time data from ``uwacan``. 153 - `xarray.DataArray`: An xarray DataArray with a 'time' dimension. 154 - `numpy.ndarray`: A NumPy array containing time-series data. 155 156 window : str or array_like, optional 157 The window function to apply to the data before computing the FFT. This can be: 158 159 - A string specifying the type of window to use (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``). 160 - An array-like sequence of window coefficients. 161 - If ``None``, no window is applied. Default is ``None``. 162 163 scaling : {'density', 'spectrum', 'dc-nyquist'} or numeric, optional 164 Specifies the scaling of the power spectrum. Options include: 165 166 - ``'density'``: Computes the power spectral density. 167 - ``'spectrum'``: Computes the power spectrum. 168 - ``'dc-nyquist'``: Halves the output at DC and Nyquist frequencies. Use with a pre-scaled window that takes care to scale the remainder of the single-sided spectrum. 169 - any numeric value: The output of the fft will be scaled by this value. 170 171 Default is ``"density"``. 172 173 nfft : int, optional 174 The number of points to use in the FFT computation. If ``None``, it defaults to the length 175 of the input data along the specified axis. 176 177 detrend : bool, default=True 178 If ``True``, removes the mean from the data before computing the FFT to reduce spectral leakage. 179 If ``False``, no detrending is performed. 180 181 samplerate : float, optional 182 The sampling rate of the input data in Hz. Required if ``time_data`` is an numpy array. 183 If not provided, it defaults to 1. This parameter is used to compute the frequency axis and proper density scaling. 184 185 axis : int, optional 186 The axis along which to compute the FFT. If ``None``, the last axis is used. 187 Only used for numpy inputs. 188 This parameter allows flexibility in handling multi-dimensional data. 189 190 Returns 191 ------- 192 _core.FrequencyData or xr.DataArray or numpy.ndarray 193 The computed power spectrum of the input data. The return type matches the input type: 194 195 - If ``time_data`` is a `~uwacan.TimeData`, returns a `~uwacan.FrequencyData` object. 196 - If ``time_data`` is an `xarray.DataArray`, returns an `xarray.DataArray` with a 'frequency' dimension. 197 - If ``time_data`` is a `numpy.ndarray`, returns a NumPy array containing the power spectrum. 198 199 """ 200 if isinstance(time_data, _core.TimeData): 201 return _core.FrequencyData(spectrum(time_data.data, window=window, scaling=scaling, nfft=nfft, detrend=detrend)) 202 if isinstance(time_data, xr.DataArray): 203 freq_data = xr.apply_ufunc( 204 spectrum, 205 time_data, 206 input_core_dims=[["time"]], 207 output_core_dims=[["frequency"]], 208 kwargs=dict( 209 window=window, 210 scaling=scaling, 211 nfft=nfft, 212 detrend=detrend, 213 samplerate=samplerate or time_data.time.attrs.get("rate", None), 214 ), 215 ) 216 freq_data.coords["frequency"] = np.fft.rfftfreq(nfft or time_data.time.size, 1 / time_data.time.rate) 217 freq_data.coords["time"] = time_data.time[0] + np.timedelta64( 218 round(time_data.time.size / 2 / time_data.time.rate * 1e9), "ns" 219 ) 220 return freq_data 221 222 if axis is not None: 223 time_data = np.moveaxis(time_data, axis, -1) 224 225 if detrend: 226 time_data = time_data - time_data.mean(axis=-1, keepdims=True) 227 228 if window is not None: 229 if not isinstance(window, np.ndarray): 230 window = scipy.signal.windows.get_window(window, time_data.shape[-1], False) 231 time_data = time_data * window 232 233 nfft = nfft or time_data.shape[-1] 234 freq_data = np.fft.rfft(time_data, axis=-1, n=nfft) 235 freq_data = np.abs(freq_data) ** 2 236 237 if scaling == "density": 238 samplerate = samplerate or 1 239 if window is not None: 240 scaling = 2 / (np.sum(window**2) * samplerate) 241 else: 242 scaling = 2 / (time_data.shape[-1] * samplerate) 243 elif scaling == "spectrum": 244 if window is not None: 245 scaling = 2 / np.sum(window) ** 2 246 else: 247 scaling = 2 / time_data.shape[-1] 248 elif scaling == "dc-nyquist": 249 # Remove doubling of DC 250 freq_data[..., 0] /= 2 251 if nfft % 2 == 0: 252 # Even size, remove doubling of Nyquist 253 freq_data[..., -1] /= 2 254 scaling = False 255 256 if scaling: 257 freq_data *= scaling 258 259 # Remove doubling of DC 260 freq_data[..., 0] /= 2 261 if nfft % 2 == 0: 262 # Even size, remove doubling of Nyquist 263 freq_data[..., -1] /= 2 264 265 if axis is not None: 266 freq_data = np.moveaxis(freq_data, -1, axis) 267 268 return freq_data
269 270 271@numba.njit() 272def _linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution): 273 banded_spectrum = np.full(lower_edges.shape + linear_spectrum.shape[1:], np.nan) 274 for band_idx, (lower_edge, upper_edge) in enumerate(zip(lower_edges, upper_edges)): 275 lower_idx = int(np.floor(lower_edge / spectral_resolution + 0.5)) # (l_idx - 0.5) * Δf = l 276 upper_idx = int(np.ceil(upper_edge / spectral_resolution - 0.5)) # (u_idx + 0.5) * Δf = u 277 lower_idx = max(lower_idx, 0) 278 upper_idx = min(upper_idx, linear_spectrum.shape[0] - 1) 279 280 if lower_idx == upper_idx: 281 # This can only happen if both frequencies l and u are within the same fft bin. 282 # Since we don't allow the fft bins to be larger than the output bins, we thus have the exact same band. 283 banded_spectrum[band_idx] = linear_spectrum[lower_idx] 284 else: 285 # weight edge bins by "(whole bin - what is not in the new band) / whole bin" 286 # lower fft bin edge l_e = (l_idx - 0.5) * Δf 287 # w_l = (Δf - (l - l_e)) / Δf = l_idx + 0.5 - l / Δf 288 first_weight = lower_idx + 0.5 - lower_edge / spectral_resolution 289 # upper fft bin edge u_e = (u_idx + 0.5) * Δf 290 # w_u = (Δf - (u_e - u)) / Δf = 0.5 - u_idx + u / Δf 291 last_weight = upper_edge / spectral_resolution - upper_idx + 0.5 292 # Sum the components fully within the output bin `[l_idx + 1:u_idx]`, and weighted components partially in the band. 293 this_band = ( 294 linear_spectrum[lower_idx + 1 : upper_idx].sum(axis=0) 295 + linear_spectrum[lower_idx] * first_weight 296 + linear_spectrum[upper_idx] * last_weight 297 ) 298 banded_spectrum[band_idx] = this_band * ( 299 spectral_resolution / (upper_edge - lower_edge) 300 ) # Rescale the power density. 301 return banded_spectrum 302 303
[docs] 304def linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution, axis=0): 305 """Aggregate a linear power spectrum into specified frequency bands. 306 307 The `linear_to_banded` function converts a linear power spectrum into a banded spectrum by 308 summing power within frequency bands defined by ``lower_edges`` and ``upper_edges``. It handles 309 multi-dimensional spectra by allowing specification of the axis corresponding to frequency 310 bins. 311 312 Parameters 313 ---------- 314 linear_spectrum : `numpy.ndarray` 315 The input linear power spectrum. The axis specified by ``axis`` should correspond to 316 frequency bins. 317 lower_edges : array_like 318 The lower frequency edges for each band. Must be in ascending order. 319 upper_edges : array_like 320 The upper frequency edges for each band. Must be in ascending order and greater 321 than or equal to ``lower_edges``. 322 spectral_resolution : float 323 The frequency resolution (Δf) of the linear spectrum. 324 axis : int, optional, default=0 325 The axis of ``linear_spectrum`` that corresponds to frequency bins. If the frequency 326 bins are not along the first axis, specify the appropriate axis index. 327 328 Returns 329 ------- 330 banded_spectrum : numpy.ndarray 331 The aggregated banded power spectrum. 332 333 """ 334 # TODO: add features here to allow non-numpy inputs. Simply unwrap and rewrap as needed. 335 if axis: 336 linear_spectrum = np.moveaxis(linear_spectrum, axis, 0) 337 banded = _linear_to_banded(linear_spectrum, lower_edges, upper_edges, spectral_resolution) 338 if axis: 339 banded = np.moveaxis(banded, 0, axis) 340 return banded
341 342
[docs] 343def nth_decade_band_edges(bands_per_decade, min_frequency, max_frequency, hybrid_resolution=False, limit_to="center"): 344 """Find band centers and edges for Nth decade (hybrid) bands. 345 346 Parameters 347 ---------- 348 bands_per_decade : int 349 The number of bands per decade 350 min_frequency : float 351 A lower frequency limit to use for the band centers, in Hz. 352 max_frequency : float 353 An upper frequency limit to use for the band centers, in Hz. 354 hybrid_resolution : float, optional 355 An optional linear hybrid resolution to use at the lower frequencies, in Hz. 356 limit_to : {"center", "strict", "inclusive"}, default "center" 357 Controls how the minimum and maximum frequency limits are applied. 358 359 - "center": keeps bands with center frequencies inside the range. 360 - "strict": keeps only bands where the entire band is within the range. 361 - "inclusive": keeps bands to cover the entire range, allowing the highest and lowers 362 bands to extend beyond the specified limits. 363 364 Returns 365 ------- 366 centers : numpy.ndarray 367 The center frequencies of the bands 368 edges : (numpy.ndarray, numpy.ndarray) 369 Tuple of lower and upper band edges. 370 """ 371 log_band_scaling = 10 ** (0.5 / bands_per_decade) 372 373 if hybrid_resolution: 374 # The frequency at which the logspaced bands cover at least one linspaced band 375 minimum_bandwidth_frequency = hybrid_resolution / (log_band_scaling - 1 / log_band_scaling) 376 first_log_idx = int( 377 np.ceil(bands_per_decade * np.log10(minimum_bandwidth_frequency / 1e3)) 378 ) 379 last_linear_idx = int(np.floor(minimum_bandwidth_frequency / hybrid_resolution)) 380 381 # Since the logspaced bands have pre-determined centers, we can't just start them after the linspaced bands. 382 # Often, the bands will overlap at the minimum bandwidth frequency, so we look for the first band 383 # that does not overlap, i.e., the upper edge of the last linspaced band is below the lower edge of the first 384 # logspaced band 385 while (last_linear_idx + 0.5) * hybrid_resolution > 1e3 * 10 ** ( 386 (first_log_idx - 0.5) / bands_per_decade 387 ): 388 # Condition is "upper edge of last linear band is higher than lower edge of first logarithmic band" 389 last_linear_idx += 1 390 first_log_idx += 1 391 392 first_linear_idx = int(np.floor(min_frequency / hybrid_resolution)) 393 else: 394 first_linear_idx = last_linear_idx = 0 395 first_log_idx = int(np.floor(bands_per_decade * np.log10(min_frequency / 1e3))) 396 397 last_log_idx = int(np.ceil(bands_per_decade * np.log10(max_frequency / 1e3))) 398 399 lin_centers = np.arange(first_linear_idx, last_linear_idx + 1) * hybrid_resolution 400 lin_lowers = lin_centers - 0.5 * hybrid_resolution 401 lin_uppers = lin_centers + 0.5 * hybrid_resolution 402 403 if first_linear_idx == 0: 404 # Including DC-bin - convers from 0 to df / 2. 405 lin_lowers[0] = 0 406 407 log_centers = 1e3 * 10 ** (np.arange(first_log_idx, last_log_idx + 1) / bands_per_decade) 408 log_lowers = log_centers / log_band_scaling 409 log_uppers = log_centers * log_band_scaling 410 411 if lin_centers.size and log_centers.size: 412 log_lowers[0] = lin_uppers[-1] 413 414 centers = np.concatenate([lin_centers, log_centers]) 415 lowers = np.concatenate([lin_lowers, log_lowers]) 416 uppers = np.concatenate([lin_uppers, log_uppers]) 417 418 if "center" in limit_to: 419 if centers[0] < min_frequency: 420 mask = centers >= min_frequency 421 lowers = lowers[mask] 422 centers = centers[mask] 423 uppers = uppers[mask] 424 if centers[-1] > max_frequency: 425 mask = centers <= max_frequency 426 lowers = lowers[mask] 427 centers = centers[mask] 428 uppers = uppers[mask] 429 elif "strict" in limit_to: 430 if lowers[0] < min_frequency: 431 mask = lowers >= min_frequency 432 lowers = lowers[mask] 433 centers = centers[mask] 434 uppers = uppers[mask] 435 if uppers[-1] > max_frequency: 436 mask = uppers <= max_frequency 437 lowers = lowers[mask] 438 centers = centers[mask] 439 uppers = uppers[mask] 440 elif "inclusive" in limit_to: 441 if uppers[0] < min_frequency: 442 mask = uppers >= min_frequency 443 lowers = lowers[mask] 444 centers = centers[mask] 445 uppers = uppers[mask] 446 if lowers[-1] > max_frequency: 447 mask = lowers <= max_frequency 448 lowers = lowers[mask] 449 centers = centers[mask] 450 uppers = uppers[mask] 451 else: 452 raise ValueError(f"Unknown limit option `{limit_to}`") 453 454 return centers, (lowers, uppers)
455 456
[docs] 457class FilterbankRoller(_core.Roller): 458 """Rolling computation of power spectra and spectrograms, both linear and banded. 459 460 Parameters 461 ---------- 462 time_data : TimeData 463 The time-data wrapper to process. 464 duration : float, optional 465 The duration of the fft windows, in seconds. 466 step : float, optional 467 The step size between consecutive fft windows, in seconds. 468 overlap : float, optional 469 The amount of overlap between fft windows, as a fraction of the duration. 470 min_frequency : float 471 The lowest frequency to include in the processing. 472 max_frequency : float 473 The highest frequency to include in the processing. 474 bands_per_decade : float, optional 475 The number of frequency bands per decade for logarithmic scaling. 476 hybrid_resolution : float 477 A frequency resolution to aim for. Only used if ``frame_duration`` is not given 478 fft_window : str, default="hann" 479 The window function to apply to each rolling window before computing the FFT. 480 Can be a string specifying a window type (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``) 481 or an array-like sequence of window coefficients.. 482 483 Notes 484 ----- 485 The processing is done in stft frames determined by ``duration``, ``step`` 486 ``overlap``, and ``hybrid_resolution``. At least one of ``duration``, ``step``, 487 or ``resolution`` has to be given, see `~_core.time_frame_settings` for further details. 488 At least one of ``min_frequency`` and ``hybrid_resolution`` has to be given. 489 Note that the ``duration`` and ``step`` can be auto-chosen from the overlap 490 and required frequency resolution, either from ``hybrid_resolution`` or ``min_frequency``. 491 492 Raises 493 ------ 494 ValueError 495 If the processing settings are not compatible, e.g., 496 - frequency bands with bandwidth smaller than the frame duration allows 497 """ 498 499 def __init__( 500 self, 501 time_data, 502 duration=None, 503 step=None, 504 overlap=None, 505 min_frequency=None, 506 max_frequency=None, 507 bands_per_decade=None, 508 hybrid_resolution=None, 509 fft_window="hann", 510 ): 511 self.settings = _core.time_frame_settings( 512 duration=duration, 513 step=step, 514 overlap=overlap, 515 resolution=None if isinstance(hybrid_resolution, bool) else hybrid_resolution, 516 signal_length=time_data.time_window.duration, 517 samplerate=time_data.samplerate, 518 ) 519 self.min_frequency = min_frequency or 0 520 self.max_frequency = max_frequency or time_data.samplerate / 2 521 522 self.bands_per_decade = bands_per_decade 523 self.hybrid_resolution = hybrid_resolution 524 self.fft_window = fft_window 525 526 self.time_data = time_data 527 self.roller = self.time_data.rolling( 528 duration=self.settings["duration"], step=self.settings["step"], overlap=self.settings["overlap"] 529 ) 530 531 self.processing_axis = self.roller.dims.index("time") 532 self.check_frequency_resolution() 533 self.make_frequency_vectors() 534 535 @property 536 def dims(self): # noqa: D102 537 dims = list(self.roller.dims) 538 dims[self.processing_axis] = "frequency" 539 return tuple(dims) 540 541 @property 542 def shape(self): # noqa: D102 543 shape = list(self.roller.shape) 544 shape[self.processing_axis] = len(self.frequency) 545 return tuple(shape) 546 547 @property 548 def coords(self): # noqa: D102 549 coords = dict(self.roller.coords) 550 del coords["time"] 551 coords["frequency"] = xr.DataArray(self.frequency, dims="frequency", coords={"frequency": self.frequency}) 552 return coords 553
[docs] 554 def check_frequency_resolution(self): 555 """Validate the frequency resolution against the temporal window settings.""" 556 # TODO: Check what this used to do when you used parameters from a spectrogram object. 557 if not self.bands_per_decade: 558 self.bands_per_decade = False 559 self.hybrid_resolution = False 560 else: 561 if not self.hybrid_resolution: 562 self.hybrid_resolution = False 563 # Not using hybrid, we need long enough frames to compute the lowest band. 564 lowest_bandwidth = self.min_frequency * ( 565 10 ** (0.5 / self.bands_per_decade) - 10 ** (-0.5 / self.bands_per_decade) 566 ) 567 if lowest_bandwidth * self.settings["duration"] < 1: 568 raise ValueError( 569 f'{self.bands_per_decade}th-decade filter band at {self.min_frequency:.2f} Hz with bandwidth of {lowest_bandwidth:.2f} Hz ' 570 f'cannot be calculated from temporal windows of {self.settings["duration"]:.2f} s.' 571 ) 572 else: 573 # Get the hybrid resolution settings. 574 if self.hybrid_resolution is True: 575 self.hybrid_resolution = 1 / self.settings["duration"] 576 if self.hybrid_resolution * self.settings["duration"] < 1: 577 raise ValueError( 578 f'Hybrid filterbank with resolution of {self.hybrid_resolution:.2f} Hz ' 579 f'cannot be calculated from temporal windows of {self.settings["duration"]:.2f} s.' 580 )
581
[docs] 582 def make_frequency_vectors(self): 583 """Construct frequency vectors and band definitions based on spectrogram settings.""" 584 nfft = self.settings["samples_per_frame"] 585 self.linear_frequency = np.arange(nfft // 2 + 1) * self.time_data.samplerate / nfft 586 self.bandwidth = self.linear_frequency[1] 587 588 if self.max_frequency < self.linear_frequency[-1]: 589 upper_index = int(np.floor(self.max_frequency / self.time_data.samplerate * nfft)) 590 else: 591 upper_index = None 592 593 if self.min_frequency > 0: 594 lower_index = int(np.ceil(self.min_frequency / self.time_data.samplerate * nfft)) 595 else: 596 lower_index = None 597 598 if upper_index or lower_index: 599 self.linear_slice = (slice(None),) * self.processing_axis + (slice(lower_index, upper_index),) 600 else: 601 self.linear_slice = False 602 603 if self.linear_slice: 604 self.frequency = self.linear_frequency[self.linear_slice] 605 606 if self.bands_per_decade: 607 centers, (lowers, uppers) = nth_decade_band_edges( 608 bands_per_decade=self.bands_per_decade, 609 min_frequency=self.min_frequency, 610 max_frequency=self.max_frequency, 611 hybrid_resolution=self.hybrid_resolution, 612 ) 613 614 self.band_lower_edges = lowers 615 self.band_centers = centers 616 self.band_upper_edges = uppers 617 self.frequency = centers 618 self.bandwidth = uppers - lowers
619
[docs] 620 def numpy_frames(self): # noqa: D102 621 window = scipy.signal.windows.get_window(self.fft_window, self.settings["samples_per_frame"], False) 622 window /= ((window**2).sum() * self.time_data.samplerate / 2) ** 0.5 623 624 for idx, time_frame in enumerate(self.roller.numpy_frames()): 625 freq_frame = spectrum(time_frame, window=window, scaling="dc-nyquist", axis=self.processing_axis) 626 if self.bands_per_decade: 627 freq_frame = linear_to_banded( 628 freq_frame, 629 lower_edges=self.band_lower_edges, 630 upper_edges=self.band_upper_edges, 631 spectral_resolution=self.settings["resolution"], 632 axis=self.processing_axis, 633 ) 634 elif self.linear_slice: 635 freq_frame = freq_frame[self.linear_slice] 636 yield freq_frame
637 638 def __iter__(self): 639 # For linear FFT (no banding), compute edges from frequency resolution 640 if hasattr(self, 'band_lower_edges'): 641 frequency_band_lower = self.band_lower_edges 642 frequency_band_upper = self.band_upper_edges 643 else: 644 # Linear FFT case: assume centered bins 645 frequency_band_lower = self.frequency - self.bandwidth / 2 646 frequency_band_upper = self.frequency + self.bandwidth / 2 647 648 start_time = _core.time_to_np(self.time_data.time_window.start) 649 start_time += np.timedelta64( 650 int(self.settings["samples_per_frame"] / 2 / self.time_data.samplerate * 1e9), "ns" 651 ) 652 for frame_idx, frame in enumerate(self.numpy_frames()): 653 time_since_start = frame_idx * self.settings["sample_step"] / self.time_data.samplerate 654 time_since_start = np.timedelta64(int(time_since_start * 1e9), "ns") 655 656 frame = _core.FrequencyData( 657 frame, 658 frequency=self.frequency, 659 frequency_band_lower=frequency_band_lower, 660 frequency_band_upper=frequency_band_upper, 661 coords=self.coords, 662 dims=self.dims, 663 ) 664 frame.data["time"] = start_time + time_since_start 665 yield frame 666
[docs] 667 def batches(self, batch_size): 668 """Generate batches of spectrogram frames. 669 670 Parameters 671 ---------- 672 batch_size : int 673 Number of frames to include in each batch. 674 675 Yields 676 ------ 677 TimeFrequencyData 678 Batches of spectrogram frames with time and frequency information. 679 Each batch contains up to `batch_size` frames, with the final batch 680 potentially containing fewer frames if the total number of frames 681 is not evenly divisible by the batch size. 682 """ 683 batch_output = np.zeros((batch_size,) + self.shape) 684 # For linear FFT (no banding), compute edges from frequency resolution 685 if hasattr(self, 'band_lower_edges'): 686 frequency_band_lower = self.band_lower_edges 687 frequency_band_upper = self.band_upper_edges 688 else: 689 # Linear FFT case: assume centered bins 690 frequency_band_lower = self.frequency - self.bandwidth / 2 691 frequency_band_upper = self.frequency + self.bandwidth / 2 692 for frame_idx, frame in enumerate(self.numpy_frames()): 693 if frame_idx % batch_size == 0: 694 batch_start_time = self.time_data.time_window.start.add(seconds=frame_idx * self.settings["step"]) 695 batch_output[frame_idx % batch_size] = frame 696 if (frame_idx + 1) % batch_size == 0 or frame_idx == self.num_frames - 1: 697 batch_data = _core.TimeFrequencyData( 698 batch_output[:frame_idx % batch_size + 1].copy(), # Only use the frames we've filled 699 frequency=self.frequency, 700 frequency_band_lower=frequency_band_lower, 701 frequency_band_upper=frequency_band_upper, 702 samplerate=self.time_data.samplerate / self.settings["sample_step"], 703 start_time=batch_start_time.add(seconds=self.settings["duration"] / 2), 704 coords=self.coords, 705 dims=("time",) + self.dims, 706 attrs=dict( 707 frame_duration=self.settings["duration"], 708 frame_overlap=self.settings["overlap"], 709 frame_step=self.settings["step"], 710 bands_per_decade=self.bands_per_decade, 711 hybrid_resolution=self.hybrid_resolution, 712 ), 713 ) 714 yield batch_data
715 716
[docs] 717class Filterbank: 718 """Calculates spectrograms, both linear and banded. 719 720 Parameters 721 ---------- 722 bands_per_decade : float, optional 723 The number of frequency bands per decade for logarithmic scaling. 724 frame_step : float 725 The time step between stft frames, in seconds. 726 frame_duration : float 727 The duration of each stft frame, in seconds. 728 frame_overlap : float, default=0.5 729 The overlap factor between stft frames. A negative value leaves 730 gaps between frames. 731 min_frequency : float 732 The lowest frequency to include in the processing. 733 max_frequency : float 734 The highest frequency to include in the processing. 735 hybrid_resolution : float 736 A frequency resolution to aim for. Only used if ``frame_duration`` is not given 737 fft_window : str, default="hann" 738 The window function to apply to each rolling window before computing the FFT. 739 Can be a string specifying a window type (e.g., ``"hann"``, ``"kaiser"``, ``"blackman"``) 740 or an array-like sequence of window coefficients.. 741 742 Notes 743 ----- 744 The processing is done in stft frames determined by ``frame_duration``, ``frame_step`` 745 ``frame_overlap``, and ``hybrid_resolution``. At least one of ``duration``, ``step``, 746 or ``resolution`` has to be given, see `~_core.time_frame_settings` for further details. 747 At least one of ``min_frequency`` and ``hybrid_resolution`` has to be given. 748 Note that the ``frame_duration`` and ``frame_step`` can be auto-chosen from the overlap 749 and required frequency resolution, either from ``hybrid_resolution`` or ``min_frequency``. 750 751 Raises 752 ------ 753 ValueError 754 If the processing settings are not compatible, e.g., 755 - frequency bands with bandwidth smaller than the frame duration allows 756 """ 757 758 def __init__( 759 self, 760 *, 761 bands_per_decade=None, 762 frame_step=None, 763 frame_duration=None, 764 frame_overlap=0.5, 765 min_frequency=None, 766 max_frequency=None, 767 hybrid_resolution=None, 768 fft_window="hann", 769 ): 770 self.frame_duration = frame_duration 771 self.frame_overlap = frame_overlap 772 self.frame_step = frame_step 773 self.fft_window = fft_window 774 self.bands_per_decade = bands_per_decade 775 self.min_frequency = min_frequency 776 self.max_frequency = max_frequency 777 self.hybrid_resolution = hybrid_resolution 778
[docs] 779 def rolling(self, time_data): 780 roller = FilterbankRoller( 781 time_data=time_data, 782 duration=self.frame_duration, 783 step=self.frame_step, 784 overlap=self.frame_overlap, 785 min_frequency=self.min_frequency, 786 max_frequency=self.max_frequency, 787 bands_per_decade=self.bands_per_decade, 788 hybrid_resolution=self.hybrid_resolution, 789 fft_window=self.fft_window, 790 ) 791 return roller
792
[docs] 793 def __call__(self, time_data): 794 """Process time data to spectrograms. 795 796 Parameters 797 ---------- 798 time_data : `~uwacan.TimeData` or `~uwacan.recordings.AudioFileRecording` 799 The data to process. 800 801 Returns 802 ------- 803 filtered_data : `~uwacan.TimeFrequencyData` 804 """ 805 roller = self.rolling(time_data) 806 return next(roller.batches(roller.num_frames))