# Source code for analyseur.cbgtc.curate

# ~/analyseur/cbgtc/curate.py
#
# Documentation by Lungsi 17 Oct 2025
#

import re

import numpy as np

from analyseur.cbgtc.parameters import SignalAnalysisParams

# Module-level parameter object. get_binary_spiketrains reads its
# `sampling_period` and `window` attributes as defaults when the
# corresponding arguments are not supplied.
siganal = SignalAnalysisParams()

# ==========================================
# get_desired_spiketimes_subset
# ==========================================

def __extract_neuron_no(neuron_id):
    """
    Returns the integer neuron number embedded in a neuron label.

    The first occurrence of the letter `n` followed by one or more digits
    is used, e.g. `"n12"` gives `12`.

    :param neuron_id: neuron label string such as `"n12"`
    :return: neuron number as `int`
    :raises ValueError: if `neuron_id` contains no `n<digits>` pattern
    """
    match = re.search(r'n(\d+)', neuron_id)
    if match is None:
        # Without this guard a malformed label fails with an opaque
        # "'NoneType' object has no attribute 'group'" error.
        raise ValueError(
            "cannot extract a neuron number from {!r}".format(neuron_id)
        )
    return int(match.group(1))

def __filter_spiketimes(desired_spiketimes_subset, window):
    """
    Keeps only the spike times that lie inside the closed interval `window`.

    :param desired_spiketimes_subset: iterable of spike trains, each an iterable of spike times
    :param window: pair `(start_time, end_time)`; both bounds are inclusive
    :return: nested list holding, per spike train, the retained spike times
    """
    trimmed = []
    for train in desired_spiketimes_subset:
        # A spike is retained when start_time <= t <= end_time.
        kept = [t for t in train if window[0] <= t <= window[1]]
        trimmed.append(kept)
    return trimmed

def get_desired_spiketimes_subset(spiketimes_set, window=None, neurons=None):
    """
    Returns nested list of spike times (row-i for neuron ni, column-j for j-th spike time)
    and its associated yticks (list of neuron labels corresponding to the spike trains).

    :param spiketimes_set: Dictionary returned using
        :meth:`~analyseur.cbgtc.loader.LoadSpikeTimes.get_spiketimes_superset`
        or using
        :meth:`~analyseur.cbgtc.loader.LoadSpikeTimes.get_spiketimes_subset`
    :param window: Tuple in the form `(start_time, end_time)` (both bounds inclusive);
        `None` [default] keeps every spike time, i.e. no filtering is done
    :param neurons: `"all"` [default] or `scalar` or `range(a, b)` or list of neuron ids like `[2, 3, 6, 7]`

        - `"all"` means subset = superset
        - `N` (a non-negative integer scalar) means subset of first N neurons in the superset
        - `range(a, b)` or `[2, 3, 6, 7]` means subset of selected neurons

    :return: 2-tuple; nested_list and label_list
    :raises KeyError: if a requested neuron id is not a key of `spiketimes_set`
    :raises ValueError: if `neurons` is a negative scalar

    =========
    Use Cases
    =========

    ------------------
    1. Pre-requisites
    ------------------

    1.1. Import Modules
    ```````````````````
    ::

        from analyseur.cbgtc.loader import LoadSpikeTimes
        from analyseur.cbgtc.curate import get_desired_spiketimes_subset

    1.2. Load file and get spike times
    ```````````````````````````````````
    ::

        loadST = LoadSpikeTimes("spikes_GPi.csv")
        spiketimes_superset = loadST.get_spiketimes_superset()

    ---------
    2. Cases
    ---------

    2.1. Convert spike times set to nested list of spike times
    ``````````````````````````````````````````````````````````
    ::

        [spiketimes_superlist, _] = get_desired_spiketimes_subset(spiketimes_superset)

    2.2. Get nested list of spike times for desired neurons; specific range
    ```````````````````````````````````````````````````````````````````````
    ::

        neurons = range(30, 62)  # neuron id from "n30" to "n61"
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=neurons)
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        neurons = range(30, 62)  # neuron id from "n30" to "n61"
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_superset, neurons=neurons)

    2.3. Get nested list of spike times for desired neurons; specific list
    ``````````````````````````````````````````````````````````````````````
    ::

        neurons = [1, 2, 3, 6, 9, 10, 11, 21, 31]  # neuron ids "n1", "n2", ..., "n21", "n31"
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=neurons)
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        neurons = [1, 2, 3, 6, 9, 10, 11, 21, 31]  # neuron ids "n1", "n2", ..., "n21", "n31"
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_superset, neurons=neurons)

    2.4. Get nested list of spike times for desired neurons; first N neurons
    ````````````````````````````````````````````````````````````````````````
    ::

        N = 50  # first 50 neurons regardless of the neuron id
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=N)
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        N = 50  # first 50 neurons regardless of the neuron id
        [spiketimes_nestedlist, neuron_labels] = get_desired_spiketimes_subset(spiketimes_superset, neurons=N)

    Comments
    ````````
    - In 2.2 to 2.4 the alternative method passes the mother set (superset) of spike times.
    - The method choice will depend on the use scenario.

    .. raw:: html

        <hr style="border: 2px solid red; margin: 20px 0;">

    """
    # ============== DEFAULT Parameters ==============
    if neurons is None:
        neurons = "all"

    # Resolve `neurons` into an ordered list of (label, spike times) pairs.
    # The isinstance guard keeps the "all" test unambiguous when `neurons`
    # is a numpy array (where `==` would compare element-wise).
    if isinstance(neurons, str) and neurons == "all":
        selected = list(spiketimes_set.items())
    elif isinstance(neurons, (int, np.integer)):
        # Scalar N: first N neurons in the dictionary's iteration order,
        # regardless of their ids.
        if neurons < 0:
            raise ValueError("neurons must be non-negative when given as a scalar")
        selected = list(spiketimes_set.items())[:neurons]
    else:
        # neurons = range(a, b) or neurons = [1, 4, 5, 9]
        selected = [
            ("n" + str(i), spiketimes_set["n" + str(i)])
            for i in neurons
        ]

    yticks = [neuron_id for neuron_id, _ in selected]
    desired_spiketimes_subset = [list(data) for _, data in selected]

    if window is not None:
        desired_spiketimes_subset = __filter_spiketimes(desired_spiketimes_subset, window)

    return desired_spiketimes_subset, yticks
def __get_valid_indices(indiv_spiketimes, window, sampling_rate, num_samples):
    """
    Converts the spike times of one neuron into sample (bin) indices that
    fall inside the analysis window.

    This function is essential because spiketimes are only recorded when spikes
    occur as opposed to recording for every time points.

    :param indiv_spiketimes: array-like of spike times of a single neuron
    :param window: pair `(start_time, end_time)`; only `window[0]` is used, as the time origin
    :param sampling_rate: number of bins per unit of time
    :param num_samples: number of bins in the window; valid indices are `0 .. num_samples - 1`
    :return: integer numpy array of the in-window bin indices (one per retained spike)
    """
    offsets = (np.asarray(indiv_spiketimes, dtype=float) - window[0]) * sampling_rate
    # Floor rather than truncate: truncation rounds toward zero, which would
    # place spikes occurring up to one bin *before* the window into bin 0.
    indices = np.floor(offsets).astype(int)
    valid_indices = indices[(indices >= 0) & (indices < num_samples)]
    return valid_indices


# ==========================================
# get_binary_spiketrains
# ==========================================
def get_binary_spiketrains(spiketimes_set, window=None, sampling_rate=None, neurons=None):
    r"""
    .. math::

        s_i(t_k) = \begin{cases}
            1 & \text{if neuron } i \text{ fires in bin } t_k \\
            0 & \text{otherwise}
        \end{cases}

    where :math:`t_k = t_{\text{start}} + k \Delta t` and
    :math:`\Delta t = 1/\text{sampling\_rate}`.

    Returns binary spike trains (row-i for neuron ni, column-k for k-th time bin),
    its associated yticks (list of neuron labels corresponding to the spike trains)
    and the time axis of the bins.

    :param spiketimes_set: Dictionary returned using
        :meth:`~analyseur.cbgtc.loader.LoadSpikeTimes.get_spiketimes_superset`
        or using
        :meth:`~analyseur.cbgtc.loader.LoadSpikeTimes.get_spiketimes_subset`
    :param neurons: `"all"` [default] or `scalar` or `range(a, b)` or list of neuron ids like `[2, 3, 6, 7]`

        - `"all"` means subset = superset
        - `N` (a non-negative integer scalar) means subset of first N neurons in the superset
        - `range(a, b)` or `[2, 3, 6, 7]` means subset of selected neurons

    :param window: Tuple in the form `(start_time, end_time)`;
        defaults to `siganal.window` (documented as `(0, 10)`)
    :param sampling_rate: defaults to `1 / siganal.sampling_period`
        (documented as `1000/dt = 10000` Hz); sampling_rate ∊ (0, 10000)
    :return: 3-tuple; spike trains (2-D numpy array of 0.0/1.0 with one row per neuron),
        label_list and times_axis (start time of every bin)
    :raises KeyError: if a requested neuron id is not a key of `spiketimes_set`
    :raises ValueError: if `neurons` is a negative scalar

    =========
    Use Cases
    =========

    ------------------
    1. Pre-requisites
    ------------------

    1.1. Import Modules
    ```````````````````
    ::

        from analyseur.cbgtc.loader import LoadSpikeTimes
        from analyseur.cbgtc.curate import get_binary_spiketrains

    1.2. Load file and get spike times
    ```````````````````````````````````
    ::

        loadST = LoadSpikeTimes("spikes_GPi.csv")
        spiketimes_superset = loadST.get_spiketimes_superset()

    ---------
    2. Cases
    ---------

    2.1. Convert spike times set to nested list of binary spike trains
    ``````````````````````````````````````````````````````````````````
    ::

        [spiketrains_superlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_superset)

    2.2. Get nested list of binary spike trains for desired neurons; specific range
    ```````````````````````````````````````````````````````````````````````````````
    ::

        neurons = range(30, 62)  # neuron id from "n30" to "n61"
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=neurons)
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        neurons = range(30, 62)  # neuron id from "n30" to "n61"
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_superset, neurons=neurons)

    2.3. Get nested list of binary spike trains for desired neurons; specific list
    ``````````````````````````````````````````````````````````````````````````````
    ::

        neurons = [1, 2, 3, 6, 9, 10, 11, 21, 31]  # neuron ids "n1", "n2", ..., "n21", "n31"
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=neurons)
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        neurons = [1, 2, 3, 6, 9, 10, 11, 21, 31]  # neuron ids "n1", "n2", ..., "n21", "n31"
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_superset, neurons=neurons)

    2.4. Get nested list of binary spike trains for desired neurons; first N neurons
    ````````````````````````````````````````````````````````````````````````````````
    ::

        N = 50  # first 50 neurons regardless of the neuron id
        spiketimes_subset = LoadSpikeTimes.get_spiketimes_subset(spiketimes_superset, neurons=N)
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_subset, neurons="all")

    Alternatively,
    ::

        N = 50  # first 50 neurons regardless of the neuron id
        [spiketrains_nestedlist, neuron_labels, time_axis] = get_binary_spiketrains(spiketimes_superset, neurons=N)

    .. raw:: html

        <hr style="border: 2px solid red; margin: 20px 0;">

    """
    # ============== DEFAULT Parameters ==============
    if sampling_rate is None:
        sampling_rate = 1 / siganal.sampling_period

    if window is None:
        window = siganal.window

    if neurons is None:
        neurons = "all"

    total_duration = window[1] - window[0]
    num_samples = int(total_duration * sampling_rate)
    # Start time of every bin. Spacing is exactly 1/sampling_rate so that
    # column k of the spike trains corresponds to time_axis[k]; an
    # endpoint-inclusive linspace would stretch the spacing to T/(n-1).
    time_axis = window[0] + np.arange(num_samples) / sampling_rate

    # Resolve `neurons` into an ordered list of (label, spike times) pairs.
    # The isinstance guard keeps the "all" test unambiguous when `neurons`
    # is a numpy array (where `==` would compare element-wise).
    if isinstance(neurons, str) and neurons == "all":
        selected = list(spiketimes_set.items())
    elif isinstance(neurons, (int, np.integer)):
        # Scalar N: first N neurons in the dictionary's iteration order.
        if neurons < 0:
            raise ValueError("neurons must be non-negative when given as a scalar")
        selected = list(spiketimes_set.items())[:neurons]
    else:
        # neurons = range(a, b) or neurons = [1, 4, 5, 9]
        selected = [
            ("n" + str(i), spiketimes_set["n" + str(i)])
            for i in neurons
        ]

    spiketrains = np.zeros((len(selected), num_samples))
    yticks = []
    # `row` is the position in the output, not the neuron id, because the
    # ids can be e.g. [13, 14, 15, 16, ...]; enumerate advances it per neuron.
    for row, (neuron_id, indiv_spiketimes) in enumerate(selected):
        index = __get_valid_indices(indiv_spiketimes, window, sampling_rate, num_samples)
        spiketrains[row, index] = 1.0
        yticks.append(neuron_id)

    return spiketrains, yticks, time_axis