Source code for pycartool.ris.source_estimate

# -*- coding: utf-8 -*-
# Authors: Victor Férat <victor.ferat@live.fr>
#
# License: BSD (3-clause)
import struct

import numpy as np

from ..spi.source_space import SourceSpace, write_spi
from ..utils._docs import fill_doc
from ..utils._logs import logger, verbose


def _check_method(method):
    if method not in ["svd", "mean", "median"]:
        raise ValueError("Method must be either svd, mean, or median")


def _check_sources_tc(sources_tc):
    if not isinstance(sources_tc, np.ndarray):
        raise TypeError(f"sources_tc must be an instance of numpy.ndarray")
    if sources_tc.ndim != 3:
        raise ValueError(
            f"sources_tc must be of shape"
            f"(n_solutionpoints, n_dim, n_timeframes)"
        )
    if sources_tc.shape[1] not in [1, 3]:
        raise ValueError(
            f"sources_tc.shape[1] must be either 1 ( scalar) "
            f"or 3 (vectorial)"
        )
    return sources_tc


[docs]@fill_doc @verbose def read_ris(filename, source_space=None, subject=None, verbose=None): """Read Cartool Results of Inverse Solution computation (``.ris``) file. Parameters ---------- filename : str or file-like The ``.ris`` file to read. source_space : `pycartool.spi.SourceSpace` The source space corresponding to the source estimate. subject : str or file-like The subject used to create the source estimate. %(verbose)s Returns ------- source_estimate : `pycartool.ris.SourceEstimate` The SourceEstimate extracted from ris file. """ with open(filename, "rb") as f: logger.info(f"Reading {filename}") logger.info(f"Reading Header...") ris_type = [ struct.unpack("c", f.read(1))[0].decode("utf-8") for i in range(4) ] ris_type = "".join(ris_type) if ris_type not in ["RI01"]: raise ValueError( f"{ris_type} : Invalid RI type, please check that" f" input file is a Result of Inverse Solution " f" computation" ) logger.info(f"IS type: {ris_type}") n_solutionpoints = struct.unpack("I", f.read(4))[0] logger.info(f"n_solutionpoints: {n_solutionpoints}") n_timeframes = struct.unpack("I", f.read(4))[0] logger.info(f"n_timeframes: {n_timeframes}") s_freq = struct.unpack("f", f.read(4))[0] logger.info(f"Samplimg frequency: {s_freq}") isinversescalar = struct.unpack("c", f.read(1))[0] logger.info(isinversescalar) if isinversescalar == b"\x01": n_dim = 1 logger.info(f"Result of Inverse Solution computation is Scalar") elif isinversescalar == b"\x00": logger.info(f"Result of Inverse Solution computation is Vectorial") n_dim = 3 else: raise ValueError( f"isinversescalar must be either 1 for scalar, " f"either 0 for vectorial, but " f"{ord(isinversescalar)} found." ) buf = f.read(n_dim * n_solutionpoints * n_timeframes * 4) data = np.frombuffer(buf, dtype=np.float32) data = data.reshape(n_timeframes, n_solutionpoints, n_dim) data = np.swapaxes(data, 1, 2) source_estimate = SourceEstimate( data.T, s_freq, source_space=source_space, subject=subject, filename=filename, ) return source_estimate
[docs]def write_ris(source_estimate, filename): """Write SourceEstimate ``.ris`` instance to file. Parameters ---------- source_estimate : `~pycartool.ris.SourceEstimate` The SourceEstimate to save as a ``.ris`` file. filename : str or file-like Filename of the exported inverse solution computation. """ data = source_estimate.sources_tc.T sfreq = source_estimate.sfreq if not isinstance(data, np.ndarray): raise TypeError("Input data must be a ndarray") if data.ndim != 3: raise ValueError("Input data must be a 3D array") if data.shape[1] == 1: isinversescalar = b"\x01" elif data.shape[1] == 3: isinversescalar = b"\x00" else: raise ValueError( "Input data must have shape (_,1,_) (scalar)" " or (_,3,_) (vectorial)" ) ris_type = "RI01" n_timeframes = data.shape[0] n_solutionpoints = data.shape[2] f = open(filename, "wb") f.write(ris_type.encode("utf-8")) f.write(struct.pack("I", n_solutionpoints)) f.write(struct.pack("I", n_timeframes)) f.write(struct.pack("f", sfreq)) f.write(struct.pack("c", isinversescalar)) if isinversescalar == b"\x00": data = np.swapaxes(data, 1, 2) else: data = np.swapaxes(data, 0, 1) data = data.astype(np.float32) data.tofile(f) f.close()
[docs]class SourceEstimate(object): """Container for source estimate data. Parameters ---------- sources_tc : `~numpy.array` The sources time courses. Can be either scalar (``ndim=1``) or vectorial (``ndim=3``). shape (``n_solution_points``, ``n_dim``, ``n_timeframes``). sfreq : float The sampling frequency. source_space : `pycartool.spi.SourceSpace` The SourceSpace corresponding to the source estimate. subject : str The subject used to create the source estimate. filename : str Filename from which the source estimate was imported. Attributes ---------- is_scalar : bool True if estimate is scalar, False is estimate is vectorial. n_sources : int Number of sources. sources_tc : `~numpy.array` The sources time courses. Can be either scalar (``ndim=1``) or vectorial (``ndim=3``). shape (``n_solution_points``, ``n_dim``, ``n_timeframes``). source_space : `pycartool.spi.SourceSpace` The SourceSpace corresponding to the source estimate. subject : str The subject used to create the source estimate. filename : str Filename from which the source estimate was imported. """ def __init__( self, sources_tc, sfreq, source_space=None, subject=None, filename=None ): _check_sources_tc(sources_tc) # Check that source estimate correspond to source space if source_space is not None: if not isinstance(source_space, SourceSpace): raise TypeError( f"sourcespace must be an instance" f" of SourceSpace." ) if not len(source_space.names) == sources_tc.shape[0]: raise ValueError( f"Expect {len(source_space.names)} time " f" courses from Source space, but found only " f"{sources_tc.shape[0]} in sources_tc" ) if sources_tc.shape[1] == 1: self.is_scalar = True elif sources_tc.shape[1] == 3: self.is_scalar = False self.n_sources = sources_tc.shape[0] self.sources_tc = sources_tc self.source_space = source_space self.sfreq = sfreq self.subject = subject self.filename = filename def __repr__(self): # noqa: D401 """String representation.""" if self.is_scalar is True: s = f"Scalar, " else: s = f"Vectorial, " s += f"{self.n_sources} sources, sfreq : {self.sfreq}" if self.subject is not None: s += f", subject : {self.subject}" if self.filename is not None: s += f", filename : {self.filename}" return f"<SourceEstimate or {s}>"
[docs] def save(self, filename, export_spi=False): """Write SourceEstimate to Cartool ris file. Parameters ---------- filename : str or file-like The ris file to write. export_spi : bool If True, also export the corresponding source space as ris file. """ write_ris(self, filename) if export_spi is True: if self.source_space is not None: write_spi(filename[:-3] + ".spi", self.source_space) else: raise ValueError( "Cannot save source space to file, source " "space is not defined" )
[docs] def per_roi(self, region_of_interest): """Get source in specific rois. Parameters ---------- region_of_interest : `pycartool.rois.RegionsOfInterest` The region of interest used to split the source estimate. Returns ------- rois_source_estimate : list of `pycartool.ris.SourceEstimate` A list of Source estimate instance restricted to a Region of interest. """ # Check is SourceSpace are the equals. if self.source_space is None: raise ValueError("Source spaces must be defined") if region_of_interest.source_space is None: raise ValueError("Source spaces must be defined") if self.source_space != region_of_interest.source_space: raise ValueError( "Source estimate and Regions of interest must " "share the same source space" ) rois_source_estimate = [] rois_names = region_of_interest.names for r, _ in enumerate(rois_names): indices = region_of_interest.groups_of_indexes[r] roi_sources_tc = self.sources_tc[indices, :, :] roi_sources_pos = self.source_space.coordinates[indices] roi_sources_names = [self.source_space.names[i] for i in indices] source_space = SourceSpace(roi_sources_names, roi_sources_pos) source_estimate = SourceEstimate( roi_sources_tc, self.sfreq, source_space=source_space, subject=self.subject, filename=None, ) rois_source_estimate.append(source_estimate) return rois_source_estimate
[docs] def compute_tc(self, method="svd"): """Compute time course. Parameters ---------- method : str The method use to compute the time course. Can be either 'mean', 'median' or 'svd'. Default to 'svd'. Returns ------- tc : `~numpy.array` The global source estimate time course. Can be either Vectorial or Scalar depending of the source estimate and the method. shape(``n_dim``, ``n_times``). """ _check_method(method) if method == "median": tc = np.median(self.sources_tc, axis=0) elif method == "mean": tc = np.mean(self.sources_tc, axis=0) elif method == "svd": n_times = self.sources_tc.shape[-1] sources_tc_flat = self.sources_tc.reshape(-1, n_times) U, s, V = np.linalg.svd(sources_tc_flat, full_matrices=False) scale = np.linalg.norm(s) / np.sqrt(len(sources_tc_flat)) tc = np.array([scale * V[0]]) pos = self.source_space.coordinates pos_flat = pos.reshape(-1) v = np.multiply(U[:, 0], pos_flat).reshape(-1, 3).mean(axis=0) v = v / np.linalg.norm(v) return tc
[docs] def compute_rois_tc(self, region_of_interest, method="svd"): """Compute time course in Region of interest. Parameters ---------- region_of_interest : pycartool.rois.RegionsOfInterest The region of interest used to split the source estimate. method : str The method use to compute the time course. Can be either 'mean', 'median' or 'svd'. Default to 'svd'. Returns ------- Roi_source_estimate : `pycartool.ris.SourceEstimate` A source estimate instance where each source correspond to a region of interest. """ rois_names = region_of_interest.names rois_estimates = self.per_roi(region_of_interest) rois_tc = np.array( [roi.compute_tc(method=method) for roi in rois_estimates] ) rois_coordinates = np.array( [ estimate.source_space.get_center_of_mass() for estimate in rois_estimates ] ) Roi_source_space = SourceSpace(rois_names, rois_coordinates) Roi_source_estimate = SourceEstimate( rois_tc, self.sfreq, source_space=Roi_source_space ) return Roi_source_estimate