Source code for analyseur.rbcbg.stats.psd

# ~/analyseur/rbcbg/stats/psd.py
#
# Documentation by Lungsi 18 Nov 2025
#
# This contains function for loading the files
#
import numbers

import numpy as np
from scipy import signal

from analyseur.rbcbg.parameters import SignalAnalysisParams

[docs] class PowerSpectrum(object): """ Computes the power spectra (`Welch’s method <https://doi.org/10.1109/TAU.1967.1161901>`_ using `scipy.signal.welch <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html>`_) of the firing rates for all the neurons. +----------------------------------+-------------------------------------------------------------------------------------------------+ | Methods | Argument | +==================================+=================================================================================================+ | :py:meth:`.compute_spectrogram` | - `mu_rate_array`: see :meth:`~analyseur.rbcbg.loader.LoadRates.get_rates` | | | - `resolution` [OPTIONAL]: `~ 9.76 Hz = sampling_rate/1024` [default] | +----------------------------------+-------------------------------------------------------------------------------------------------+ | :py:meth:`.compute_for_rate` | - `mu_rate_array`: see :meth:`~analyseur.rbcbg.loader.LoadRates.get_rates` | | | - `binsz`: integer or float | | | - `method`: "welch" or "fft" or "fft-mag" | | | - `resolution` [OPTIONAL]: `~ 9.76 Hz = sampling_rate/1024` [default] | +----------------------------------+-------------------------------------------------------------------------------------------------+ - :py:meth:`.compute_spectrogram` computes a time-varying power spectrum, that is, power for each time window and hence the power spectrum evolving over time. - :py:meth:`.compute_for_rate` computes a single (global) power spectral density, that is, power averaged over the whole signal (no time dimension) and hence one summary spectrum for the entire signal. Therefore, :py:meth:`.compute_spectrogram` retains the time structure while :py:meth:`.compute_for_rate` collapses time by averaging. ========= Use Cases ========= ----------------- 1. Pre-requisites ----------------- 1.1. Import Modules ```````````````````` :: from analyseur.rbcbg.loader import LoadRates from analyseur.rbcbg.stats.psd import PowerSpectrum 1.2. Load file and get the firing rates ``````````````````````````````````````` :: loadFR = LoadRates("GPiSNr_model_9_percent_0.csv") t_sec, rates_Hz = loadFR.get_rates() --------- 2. Cases --------- 2.1. Time-varying power spectrum for a desired resolution ````````````````````````````````````````````````````````` :: freq_arr, time_arr, power_arr = PowerSpectrum.compute_spectrogram(rates_Hz, resolution=10) done for resolution 10 Hz. 2.2. Global power spectral density `````````````````````````````````` :: pow_spec = PowerSpectrum.compute_for_rate(mu_rate_array, binsz) doe for default resolution and Welch method (default). .. raw:: html <hr style="border: 2px solid red; margin: 20px 0;"> """ __siganal = SignalAnalysisParams() @staticmethod def __prepare_signal(x): x = np.asarray(x) if x.ndim == 2: if x.shape[0] >= x.shape[1]: x = x.mean(axis=1) else: x = x.mean(axis=0) return x.reshape(-1)
[docs] @classmethod def compute_for_rate(cls, mu_rate_array, binsz, resolution=None, method=None): """ Returns the power spectral density (or power spectrum) of firing rate from all. .. math:: \\hat{P}_{xx}(f) = \\frac{1}{K} \\sum_{m=1}^K S(m,f) is the average of the spectrogram :math:`S(m,f)` (see :py:meth:`.compute_spectrogram`) over time windows (Welch PSD). Thus, :math:`P(f)` is averaged over :math:`m` while :math:`S(m,f)` depends on time. :param mu_rate_array: array returned using :meth:`~analyseur.rbcbg.loader.LoadRates.get_rates` :param binsz: integer or float :param method: `"welch"` or `"fft"` or `"fft-mag"` :param resolution: `~ 9.76 Hz = sampling_rate/1024` [default] :return: a tuple in the following order - array of sample frequencies - power spectral density (or power spectrum) From :py:meth:`.compute_spectrogram` substituting for :math:`S(m,f)` the Welch PSD is expanded as .. math:: \\hat{P}_{xx}(f) = \\frac{1}{K} \\sum_{m=1}^K \\frac{1}{f_s||w||^2} \\cdot \\left|\\sum_{n=0}^{N-1}x[n+m]\\cdot w[n] \\cdot e^{-j2\\pi fn/f_s} \\right|^2 where :math:`N` is the segment length (`nperseg`), :math:`f_s` is the sampling frequency (`sample_rate`) and :math:`||w||^2 = \\sum_n w[n]^2` is the normalized window. Removing the windowing and segmentation we get the FFT PSD .. math:: P_{xx}(f) = \\frac{1}{f_s N} \\cdot \\left|\\sum_{n=0}^{N-1}x[n] \\cdot e^{-j2\\pi fn/f_s} \\right|^2 This is the power spectrum from the single whole global window. **NOTE:** This average (or single/global) power spectrum of the entire signal tells us the *overall frequency content*. .. raw:: html <hr style="border: 2px solid red; margin: 20px 0;"> """ mu_rate_array = cls.__prepare_signal(mu_rate_array) n = len(mu_rate_array) sampling_fs = 1.0 / binsz T = 1 / sampling_fs # ============== DEFAULT Parameters ============== # T = cls.__siganal.sampling_period # seconds # sampling_fs = 1 / T # Hz if resolution is None: points_per_segment = 1024 else: points_per_segment = sampling_fs / resolution if method is None or method=="welch": # Compute power spectrum using Welch's method freqs, power = signal.welch(mu_rate_array, sampling_fs, nperseg=points_per_segment) elif method=="fft": # Compute FFT fft_result = np.fft.fft(mu_rate_array) # Compute Frequencies (positive only) freqs = np.fft.fftfreq(n, T)[:n//2] # Compute Power Spectral Density power = (1.0 / (sampling_fs * n)) * np.abs(fft_result[:n//2]) ** 2 elif method=="fft-mag": # Compute FFT fft_result = np.fft.fft(mu_rate_array) # Compute Frequencies (positive only) freqs = np.fft.fftfreq(n, T)[:n // 2] # [Alternative] Compute Magnitude Spectrum power = (2.0 / n) * np.abs(fft_result[:n // 2]) return freqs, power
[docs] @staticmethod def compute_stft(rate_array, sample_rate, nperseg=256, noverlap=128): """ Computes short-time Fourier transform (STFT) :param rate_array: array returned using :meth:`~analyseur.rbcbg.loader.LoadRates.get_rates` :param sample_rate: float :param nperseg: int, Length of each segment :param noverlap: int, Overlaps between segments :return: a tuple in the following order - f: frequency vector - t: time vector for spectrogram - Sxx: spectrogram power (dB) **Mathematically**, the array of firing rates :math:`x[n]` is segmented into overlapping windows and according to the window position :math:`m` (time index) and angular frequency :math:`\\omega` it is Fourier transformed as .. math:: X(m,\\omega) = \\sum_{n=-\\infty}^\\infty x[n]\\cdot w[n-m] \\cdot e^{-j\\omega n} where the window function :math:`w[n]` is given by the `Hann window <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.windows.hann.html>`_ .. math:: w[n] = 0.5\\left(1 - cos\\left(\\frac{2\\pi n}{N-1}\\right)\\right) where :math:`N` is the segment length (`nperseg`). Then the spectrogram is the squared magnitude of the STFT .. math:: S(m,\\omega) &= |X(m,\\omega)|^2 \n S(m, f) &= \\frac{1}{f_s||w||^2} \\cdot \\left|\\sum_{n=0}^{N-1}x[n+m]\\cdot w[n] \\cdot e^{-j2\\pi fn/f_s} \\right|^2 where :math:`f_s` is the sampling frequency (`sample_rate`) and :math:`||w||^2 = \\sum_n w[n]^2` is the normalized window. Finally, for :math:`k=0,1,\\ldots,N/2` the frequency bins are .. math:: f_k = \\frac{k}{N} \\cdot f_s the time bins are .. math:: t_m = \\frac{m(N-\\text{noverlap})}{f_s} and the power is converted to decibels .. math:: S_\\text{dB}(m,f) = 10 log_{10}(S(m,f) + \\epsilon) where :math:`\\epsilon = 10^{-10}` is a small constant to avoid :math:`log(0)` Note that the no overlap parameter `noverlap` when subtracted from the segment length :math:`N` yields the *hop size* (step size) :math:`= N - \\text{noverlap}`. It acts like a sliding window such that .. code-block:: text noverlap = 0 [--------][--------][--------] but with the presence of overlap .. code-block:: text noverlap = 128 [--------] [--------] [--------] resulting in more redundancy but better continuity and hence smoother time resolution. .. raw:: html <hr style="border: 2px solid red; margin: 20px 0;"> """ f, t_spec, Sxx = signal.spectrogram( rate_array, fs=sample_rate, nperseg=nperseg, noverlap=noverlap, window="hann", scaling="density",) # Convert to dB and avoid log(0) Sxx_db = 10 * np.log10(Sxx + 1e-10) return f, t_spec, Sxx_db
[docs] @classmethod def compute_spectrogram(cls, rates_Hz, resolution=None): """ Compute spectrogram using STFT (see :meth:`.compute_stft`) :param rates_Hz: array returned using :meth:`~analyseur.rbcbg.loader.LoadRates.get_rates` :param resolution: `~ 9.76 Hz = sampling_rate/1024` [default] :return: a tuple in the following order - f: frequency vector - t: time vector for spectrogram - Sxx: spectrogram power (dB) **NOTE:** This time-varying power spectrum tells us *how frequencies evolve*. .. raw:: html <hr style="border: 2px solid red; margin: 20px 0;"> """ # ============== DEFAULT Parameters ============== sampling_rate = 1 / cls.__siganal.sampling_period if resolution is None: points_per_segment = 1024 else: points_per_segment = int(round(sampling_rate / resolution)) points_per_segment = max(2, points_per_segment) # Clamp to signal length points_per_segment = min(points_per_segment, len(rates_Hz)) noverlap_points_between_segments = points_per_segment // 2 noverlap_points_between_segments = min(noverlap_points_between_segments, points_per_segment - 1) # print("len:", len(rates_Hz), "nperseg:", points_per_segment, # "noverlap:", noverlap_points_between_segments) rates_Hz = cls.__prepare_signal(rates_Hz) f, t_spec, Sxx_db = cls.compute_stft(rates_Hz, sampling_rate, nperseg=points_per_segment, noverlap=noverlap_points_between_segments) return f, t_spec, Sxx_db # freq, time, power