Source code for circust.cpca

r"""
circust/cpca.py
===============
Etapas 1.1 + 1.2: Ordenamiento circular (CPCA) y deteccion de outliers.

Que es CPCA
------------
PCA sobre los 12(o mas) genes core del reloj. Los loadings de PC1 y PC2 de cada
muestra se tratan como coordenadas (x, y) en el plano, el angulo(phi) se obtiene:

    phi = atan2(PC2, PC1)   en [0, 2pi)

Y su norma(LEi):

    norm12 = sqrt(PC1² + PC2²)

da un *ordenamiento circular* que refleja el ritmo circadiano subyacente.
Muestras cercanas al origen tienen norma pequena y son potenciales outliers.

Deteccion de outliers (Seccion 3.3 del paper suplementario de CIRCUST)
-----------------------------------------------------------------
Dos criterios en logica OR una muestra se elimina si viola cualquiera:

    1. Radial CPCA:  LEi < tight_radius (0.10)
       Muestras con distancia al origen en espacio PC1-PC2 muy pequena
       disrumpen la estructura circular y reflejan desalineaciones individuales.

    2. Residuos FMM: \|ri\| > fmm_threshold (3.0)  para cualquier gen core
       Detecta errores de medicion u otras anomalias comunes entre genes.

Si se detectan outliers, se eliminan, se renormaliza la matriz core y se
re-ejecuta CPCA para obtener el ordenamiento final limpio.

Posicion en el pipeline
-----------------------
    Preprocessor  ->  CPCA  ->  CircularSynchronizer  ->  ...
"""

from __future__ import annotations

import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional

from circust.core_genes import SEED_GENES_DEFAULT
from circust.fitting.fmm import FMMModel
from circust.fitting.cosinor import CosinorModel
from circust.fitting.rhythm_model import FitResult
from circust.preprocessing import normalise_matrix


# ===========================================================================
# Dataclass de resultado
# ===========================================================================

[docs] @dataclass class CPCAResult: r""" Salida de :class:`CPCA` — incluye el ordenamiento circular final y la deteccion de outliers (Etapas 1.1 + 1.2 del pipeline). Campos del pipeline (usados por etapas posteriores) --------------------------------------------------- sample_order : np.ndarray de int, forma (n_muestras,) Indices que ordenan las muestras por fase circular final. circular_scale : np.ndarray de float, forma (n_muestras,) Valores de phi ordenados en [0, 2pi). Eje temporal para ajustes. core_genes_found : list[str] Genes centrales presentes en la matriz y usados para CPCA. variance_explained : np.ndarray de float, forma (3,) Fraccion de varianza explicada por PC1, PC2, PC3 (CPCA final). expr_norm_final : pd.DataFrame, forma (n_genes, n_muestras_limpias) Matriz de expresion completa con muestras outlier eliminadas, reordenada por fase circular y renormalizada. core_norm_final : pd.DataFrame, forma (n_genes_core, n_muestras_limpias) Submatriz core normalizada, sin outliers, ordenada por fase. fmm_fits_final : dict[str, FitResult] Ajustes FMM sobre genes core usando el ordenamiento final. fmm_peak_times_final : dict[str, float] Tiempos de pico FMM (via compUU) para cada gen core (final). samples_dropped : list[int] Indices originales de columna de las muestras eliminadas. fmm_outliers : list[int] Subconjunto de samples_dropped detectados exclusivamente por el criterio de residuos FMM (\|ri\| > fmm_threshold). Campos de diagnostico (para visualizacion) ------------------------------------------ pc1, pc2, pc3 : np.ndarray Loadings de PC1/PC2/PC3 del CPCA final (un valor por muestra). std_residuals_fmm : pd.DataFrame o None Residuos FMM estandarizados (genes+PCs x muestras, orden inicial). fmm_fits_initial : dict[str, FitResult] Ajustes FMM sobre genes core + PC1/PC2/PC3 en orden CPCA inicial. cosinor_fits_initial : dict[str, FitResult] Ajustes Cosinor sobre genes core + PC1/PC2/PC3 en orden inicial. """ # ── Pipeline ───────────────────────────────────────────────────────── sample_order: np.ndarray circular_scale: np.ndarray core_genes_found: list[str] variance_explained: np.ndarray expr_norm_final: pd.DataFrame core_norm_final: pd.DataFrame fmm_fits_final: dict fmm_peak_times_final: dict samples_dropped: list[int] = field(default_factory=list) fmm_outliers: list[int] = field(default_factory=list) # ── Diagnostico ─────────────────────────────────────────────────────── pc1: np.ndarray = field(default_factory=lambda: np.array([])) pc2: np.ndarray = field(default_factory=lambda: np.array([])) pc3: np.ndarray = field(default_factory=lambda: np.array([])) std_residuals_fmm: Optional[pd.DataFrame] = None fmm_fits_initial: dict = field(default_factory=dict) cosinor_fits_initial: dict = field(default_factory=dict) pc1_initial: np.ndarray = field(default_factory=lambda: np.array([])) pc2_initial: np.ndarray = field(default_factory=lambda: np.array([]))
[docs] def summary(self) -> str: n_cpca = len(self.samples_dropped) - len(self.fmm_outliers) lines = [ "=== Resumen CPCA + Deteccion de Outliers ===", f" Genes core usados : {len(self.core_genes_found)} {self.core_genes_found}", f" Varianza PC1 / PC2 / PC3 : " f"{self.variance_explained[0]:.1%} / " f"{self.variance_explained[1]:.1%} / " f"{self.variance_explained[2]:.1%}", f" Muestras eliminadas : {len(self.samples_dropped)}", f" Criterio CPCA radial : {n_cpca}", f" Criterio FMM residuos : {len(self.fmm_outliers)}", f" Muestras finales : {self.expr_norm_final.shape[1]}", ] return "\n".join(lines)
# =========================================================================== # Clase CPCA # ===========================================================================
[docs] class CPCA: r""" Etapas 1.1 y 1.2: ordenamiento circular de muestras y deteccion de outliers residuales, segun la Seccion 3.3 del suplementario de CIRCUST. Parametros — CPCA ----------------- core_genes : list[str], opcional Genes ancla circadianos. Por defecto los 12 del articulo CIRCUST. n_outlier_candidates : int Muestras a examinar como candidatas a outlier (menor norma PC1-PC2). Por defecto: 8. tight_radius : float Umbral radial primario. Muestras con LEi <= tight_radius se confirman como outliers CPCA. Por defecto: 0.10. loose_radius : float Umbral de respaldo cuando ninguna muestra califica con tight_radius. Por defecto: 0.15. Parametros — deteccion de outliers FMM --------------------------------------- fmm_threshold : float Umbral de residuo FMM estandarizado. Muestras con \|ri\| > fmm_threshold en cualquier gen core se declaran outliers. Por defecto: 3.0. max_outlier_fraction : float Limite maximo de muestras eliminables: ceil(fraccion x n_muestras). Por defecto: 0.05. Parametros — ajuste FMM ------------------------ fmm_length_alpha_grid : int Resolucion de rejilla para el parametro alfa de FMM. Por defecto: 48. fmm_length_omega_grid : int Resolucion de rejilla para el parametro omega de FMM. Por defecto: 24. fmm_num_reps : int Iteraciones de refinamiento FMM. Por defecto: 3. verbose : bool Imprimir mensajes de progreso. Example ------- >>> from circust.preprocessing import load_expression_matrix, Preprocessor >>> from circust.cpca import CPCA >>> >>> matrix = load_expression_matrix("data/raw/expression.csv") >>> prep = Preprocessor().run(matrix) >>> result = CPCA().run(prep.expr_norm) >>> print(result.summary()) """ def __init__( self, core_genes: list[str] = None, # CPCA n_outlier_candidates: int = 8, tight_radius: float = 0.10, loose_radius: float = 0.15, # Outlier detection fmm_threshold: float = 3.0, max_outlier_fraction: float = 0.05, # FMM fitting fmm_length_alpha_grid: int = 48, fmm_length_omega_grid: int = 24, fmm_num_reps: int = 3, verbose: bool = True, ) -> None: self.core_genes = core_genes if core_genes is not None else SEED_GENES_DEFAULT self.n_outlier_candidates = n_outlier_candidates self.tight_radius = tight_radius self.loose_radius = loose_radius self.fmm_threshold = fmm_threshold self.max_outlier_fraction = max_outlier_fraction self.fmm_length_alpha_grid = fmm_length_alpha_grid self.fmm_length_omega_grid = fmm_length_omega_grid self.fmm_num_reps = fmm_num_reps self.verbose = verbose # ----------------------------------------------------------------------- # API publica # -----------------------------------------------------------------------
[docs] def run(self, expr_norm: pd.DataFrame) -> CPCAResult: """ Ejecuta CPCA + deteccion de outliers sobre la matriz completa. Parameters ---------- expr_norm : pd.DataFrame, forma (n_genes, n_muestras) Matriz de expresion normalizada completa (valores en [-1, 1]). Es ``PreprocessingResult.expr_norm``. Returns ------- CPCAResult """ self._log("=== Etapa 1.1: CPCA ===") # ── Paso 1: extraer submatriz de genes core ─────────────────────── core_matrix, genes_found = self._extract_core_genes(expr_norm) # ── Paso 2: CPCA inicial ────────────────────────────────────────── (sample_order, circular_scale, pc1, pc2, pc3, var_exp, outlier_idx) = self._cpca_step(core_matrix) self._log( f" Varianza PC1/PC2/PC3: {var_exp[0]:.1%} / " f"{var_exp[1]:.1%} / {var_exp[2]:.1%}" ) self._log(f" Outliers radiales CPCA (LEi < {self.tight_radius}): {len(outlier_idx)}") # ── Paso 3: ajuste FMM + Cosinor en orden inicial ──────────────── self._log("=== Etapa 1.2: Deteccion de Outliers ===") self._log(" Ajustando FMM y Cosinor en genes core (orden inicial) ...") fmm_fits_ini, cos_fits_ini, std_res_df = self._fit_initial( core_matrix, genes_found, sample_order, circular_scale, pc1, pc2, pc3, ) # ── Paso 4: detectar outliers (CPCA OR FMM residuos) ───────────── cpca_outs, fmm_outs, all_dropped = self._detect_outliers( outlier_idx, sample_order, std_res_df, genes_found, ) self._log(f" Criterio CPCA radial : {len(cpca_outs)} muestra(s)") self._log(f" Criterio FMM residuos : {len(fmm_outs)} muestra(s)") self._log(f" Total eliminados : {len(all_dropped)}") # Guardar proyecciones iniciales (antes de posible re-ejecucion) # para visualizacion del scatter con outliers pc1_initial = pc1.copy() pc2_initial = pc2.copy() # ── Paso 5: limpiar + re-ejecutar CPCA si hay outliers ─────────── if all_dropped: self._log(" Re-ejecutando CPCA sobre datos limpios ...") core_clean = self._drop_samples(core_matrix, all_dropped) core_norm_clean = normalise_matrix(core_clean) (sample_order, circular_scale, pc1, pc2, pc3, var_exp, _) = self._cpca_step(core_norm_clean) else: self._log(" Sin outliers — manteniendo CPCA inicial.") core_norm_clean = core_matrix # ── Paso 6: ordenar y renormalizar la matriz completa ───────────── self._log(" Ordenando y renormalizando la matriz completa ...") expr_norm_final = self._order_full_matrix(expr_norm, all_dropped, sample_order) # ── Paso 7: ajuste FMM final sobre genes core ───────────────────── self._log(" Ajustando FMM en genes core (orden final) ...") fmm_fits_fin, fmm_peaks_fin = self._fit_final( expr_norm_final, genes_found, circular_scale, ) result = CPCAResult( sample_order = sample_order, circular_scale = circular_scale, core_genes_found = genes_found, variance_explained = var_exp, expr_norm_final = expr_norm_final, core_norm_final = core_norm_clean.iloc[:, sample_order], fmm_fits_final = fmm_fits_fin, fmm_peak_times_final = fmm_peaks_fin, samples_dropped = list(all_dropped), fmm_outliers = fmm_outs, # diagnostico pc1 = pc1, pc2 = pc2, pc3 = pc3, std_residuals_fmm = std_res_df, fmm_fits_initial = fmm_fits_ini, cosinor_fits_initial = cos_fits_ini, pc1_initial = pc1_initial, pc2_initial = pc2_initial, ) self._log(result.summary()) return result
# ----------------------------------------------------------------------- # Pasos privados # ----------------------------------------------------------------------- def _extract_core_genes( self, expr_norm: pd.DataFrame, ) -> tuple[pd.DataFrame, list[str]]: """Selecciona las filas de genes core de la matriz normalizada.""" genes_found, missing = [], [] for gene in self.core_genes: (genes_found if gene in expr_norm.index else missing).append(gene) if missing: self._log( f" AVISO: {len(missing)} gen(es) core no encontrado(s): {missing}" ) if len(genes_found) < 2: raise ValueError( f"CPCA requiere al menos 2 genes core. " f"Solo se encontraron: {genes_found}." ) self._log( f" Genes core extraidos: {len(genes_found)} / {len(self.core_genes)}" ) return expr_norm.loc[genes_found], genes_found def _cpca_step( self, core_matrix: pd.DataFrame, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Ejecuta un paso de CPCA sobre la matriz core normalizada. Returns ------- (sample_order, circular_scale, pc1, pc2, pc3, var_exp, outlier_idx) """ values = core_matrix.values.astype(float) n_genes, n_samples = values.shape # Centrar por filas, escalar por columnas (R: prcomp scale.=TRUE center=FALSE) centred = values - values.mean(axis=1, keepdims=True) col_rms = np.sqrt(np.sum(centred**2, axis=0) / (n_genes - 1)) col_rms[col_rms == 0] = 1.0 scaled = centred / col_rms n_comp = min(3, n_genes, n_samples) _, sigma_all, Vt = np.linalg.svd(scaled, full_matrices=False) pc1 = Vt[0] pc2 = Vt[1] pc3 = Vt[2] if n_comp >= 3 else np.zeros(n_samples) var_exp = np.zeros(3) var_exp[:n_comp] = (sigma_all[:n_comp]**2) / np.sum(sigma_all**2) norm12 = np.sqrt(pc1**2 + pc2**2) safe_norm = np.where(norm12 == 0, 1.0, norm12) phi = np.arctan2(pc2 / safe_norm, pc1 / safe_norm) % (2 * np.pi) sample_order = np.argsort(phi) circular_scale = phi[sample_order] # Outliers radiales: n_outlier_candidates menores normas n_cands = min(self.n_outlier_candidates, n_samples) cand_idx = np.argsort(norm12)[:n_cands] tight_mask = norm12[cand_idx] <= self.tight_radius if tight_mask.any(): outlier_idx = cand_idx[tight_mask] else: loose_mask = norm12[cand_idx] <= self.loose_radius outlier_idx = cand_idx[loose_mask] return sample_order, circular_scale, pc1, pc2, pc3, var_exp, outlier_idx def _fit_initial( self, core_matrix: pd.DataFrame, genes_found: list[str], sample_order: np.ndarray, circular_scale: np.ndarray, pc1: np.ndarray, pc2: np.ndarray, pc3: np.ndarray, ) -> tuple[dict, dict, pd.DataFrame]: """ Ajusta FMM y Cosinor en genes core + eigengenes (PC1/PC2/PC3) usando el ordenamiento CPCA inicial. Returns ------- fmm_fits : {nombre: FitResult} cos_fits : {nombre: FitResult} std_res_df : DataFrame (senales x muestras) de residuos estandarizados """ cm_vals = core_matrix.values signals: dict[str, np.ndarray] = {} for i, gene in enumerate(genes_found): signals[gene] = cm_vals[i, sample_order] signals["PC1"] = pc1[sample_order] signals["PC2"] = pc2[sample_order] signals["PC3"] = pc3[sample_order] fmm_model = FMMModel( length_alpha_grid = self.fmm_length_alpha_grid, length_omega_grid = self.fmm_length_omega_grid, num_reps = self.fmm_num_reps, ) cos_model = CosinorModel() fmm_fits: dict[str, FitResult] = {} cos_fits: dict[str, FitResult] = {} std_res_rows: list[np.ndarray] = [] signal_names: list[str] = [] n_total = len(signals) for idx, (name, data) in enumerate(signals.items(), 1): self._log(f" [{idx}/{n_total}] {name}", end="\r") fr = fmm_model.fit(data, circular_scale) cr = cos_model.fit(data, circular_scale) fmm_fits[name] = fr cos_fits[name] = cr std_res_rows.append(fr.residuals_std) signal_names.append(name) self._log(f" Ajuste completado: {n_total} senales. ") std_res_df = pd.DataFrame( np.vstack(std_res_rows), index = signal_names, columns = np.arange(len(circular_scale)), ) return fmm_fits, cos_fits, std_res_df def _detect_outliers( self, outlier_idx: np.ndarray, sample_order: np.ndarray, std_res_df: pd.DataFrame, genes_found: list[str], ) -> tuple[list[int], list[int], list[int]]: r""" Aplica criterios de outlier en logica OR (Sec. 3.3 suplementario): 1. CPCA radial (LEi < tight_radius): ya en outlier_idx. 2. FMM residuos (\|ri\| > fmm_threshold): para cualquier gen core. Returns ------- cpca_outs : eliminados por criterio radial fmm_outs : eliminados solo por criterio FMM all_dropped: union ordenada (con tope max_outlier_fraction) """ n_samples = len(sample_order) cap = int(np.ceil(self.max_outlier_fraction * n_samples)) # Criterio 1: CPCA radial cpca_set = set(outlier_idx.tolist()) # Criterio 2: residuos FMM sobre genes core (no eigengenes) core_res = std_res_df.loc[genes_found].values # (n_genes, n_samples) fmm_cands: list[int] = [] for i in range(len(genes_found)): for pos in np.where(np.abs(core_res[i]) > self.fmm_threshold)[0]: fmm_cands.append(int(sample_order[pos])) # Union OR con tope seen: set[int] = set() all_dropped: list[int] = [] for idx in list(outlier_idx) + fmm_cands: if idx not in seen: seen.add(idx) all_dropped.append(idx) if len(all_dropped) >= cap: break cpca_outs = [i for i in all_dropped if i in cpca_set] fmm_outs = [i for i in all_dropped if i not in cpca_set] return cpca_outs, fmm_outs, all_dropped def _drop_samples( self, matrix: pd.DataFrame, dropped: list[int], ) -> pd.DataFrame: """Elimina columnas por indice original de muestra.""" all_cols = np.arange(matrix.shape[1]) keep_mask = ~np.isin(all_cols, dropped) return matrix.iloc[:, keep_mask] def _order_full_matrix( self, expr_norm: pd.DataFrame, dropped: list[int], sample_order: np.ndarray, ) -> pd.DataFrame: """ Elimina outliers de la matriz completa, reordena por fase circular final y renormaliza gen a gen. """ mat_clean = self._drop_samples(expr_norm, dropped) mat_ordered = mat_clean.iloc[:, sample_order] return normalise_matrix(mat_ordered) def _fit_final( self, expr_norm_final: pd.DataFrame, genes_found: list[str], circular_scale: np.ndarray, ) -> tuple[dict, dict]: """ Ajusta FMM en cada gen core con el ordenamiento final. Returns ------- fmm_fits_final : {gen: FitResult} peak_times : {gen: float} """ fmm_model = FMMModel( length_alpha_grid = self.fmm_length_alpha_grid, length_omega_grid = self.fmm_length_omega_grid, num_reps = self.fmm_num_reps, ) fmm_fits: dict[str, FitResult] = {} peaks: dict[str, float] = {} n_total = len(genes_found) for idx, gene in enumerate(genes_found, 1): self._log(f" [{idx}/{n_total}] {gene}", end="\r") if gene not in expr_norm_final.index: self._log(f" AVISO: {gene} no en la matriz final, omitido.") continue data = expr_norm_final.loc[gene].values fr = fmm_model.fit(data, circular_scale) fmm_fits[gene] = fr peaks[gene] = fr.peak_time self._log(f" Ajuste final completado: {len(fmm_fits)} genes core. ") return fmm_fits, peaks # ----------------------------------------------------------------------- # Utilidades # ----------------------------------------------------------------------- def _log(self, message: str, end: str = "\n") -> None: if self.verbose: print(message, end=end, flush=True)