Source code for circust.synchronizer

"""
circust/synchronizer.py
=======================
Etapa 2: Sincronizacion del orden circular.

Objetivo
--------
El ordenamiento circular derivado de CPCA es invariante a rotacion y tiene
una direccion arbitraria (sentido horario o antihorario). Esta etapa corrige
ambos problemas estableciendo un marco de referencia biologico.

Modalidades
-----------
``mode="manual"`` (implementada)
    Sincronizacion manual basada en genes ancla conocidos:

    * Un gen de referencia (``anchor_gene``, por defecto ARNTL) se coloca en
      π en la escala circular — su pico define el punto de arranque.
    * Un gen de direccion (``direction_gene``, por defecto DBP) determina la
      orientacion (CW vs CCW): si su pico cae en la mitad incorrecta se invierte.
    * Un gen de consistencia (``consistency_gene``, por defecto CRY1) participa
      en una verificacion de consistencia biologica refinada (Paso 2.2).

    Paso 2.1 — Rotacion y determinacion de direccion:
        1. Rotar escala circular: peak(anchor) → π.
        2. Si (direction_peak − anchor_peak + π) mod 2π ≥ π → invertir.
        3. Reparametrizar ajustes FMM de genes core en el nuevo marco.
        4. Clasificar genes core como *dia* (pico ∈ [0, π)) o *noche* (pico ∈ [π, 2π)).

    Paso 2.2 — Verificacion de consistencia biologica:
        Si direction_gene esta demasiado cerca de 0 o π, o menos de la mitad
        de los genes sin anchor/direction tienen pico en [0, π), y el orden
        direction < consistency < anchor NO se cumple, entonces invertir.

``mode="hybrid"`` (reservado para implementacion futura)
    Sincronizacion hibrida in vivo / post mortem inspirada en el molecular
    timetable (Ueda et al., 2004; Higashi et al., 2016). Formula la alineacion
    como un problema de minimizacion en el circulo: busca la rotacion y
    orientacion que maximizan la concordancia entre los tiempos de pico
    estimados en el tejido objetivo (post mortem) y los tiempos de referencia
    procedentes de tejido in vivo con etiqueta temporal.
    Ver :meth:`CircularSynchronizer._hybrid_sync` para la API prevista.

Posicion en el pipeline
-----------------------
    OutlierRefiner  →  CircularSynchronizer  →  (Etapa 3)
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from math import pi

from circust.cpca import CPCAResult
from circust.fitting.fmm import FMMModel
from circust.fitting.rhythm_model import FitResult


# ---------------------------------------------------------------------------
# Dataclass de resultado
# ---------------------------------------------------------------------------

[docs] @dataclass class SynchronizationResult: """ Salida combinada de la Etapa 2. Attributes ---------- sample_order : np.ndarray de int, forma (n_muestras,) Indices finales de muestra en la matriz de expresion limpia. Equivalente en R: ``basicOrdRefG2[[1]]`` (= ``indSin``). circular_scale : np.ndarray de float, forma (n_muestras,) Eje temporal circular en [0, 2π) tras correccion de direccion. Equivalente en R: ``basicOrdRefG2[[2]]`` (= ``escSincroRefG``). expr_ordered : pd.DataFrame, forma (n_genes, n_muestras) Matriz de expresion normalizada completa en el orden circular final. Equivalente en R: ``basicOrdRefG2[[3]]`` (= ``matNewNew``). peak_times : np.ndarray de float, forma (n_genes_core,) Tiempo de pico FMM para cada gen central en el marco de coordenadas final. Equivalente en R: ``basicOrdRefG2[[4]]`` (= ``peaksPreNew``). r2_fmm : np.ndarray de float, forma (n_genes_core,) R² del ajuste FMM para cada gen central. Equivalente en R: ``basicOrdRefG2[[5]]``. fmm_params : np.ndarray de float, forma (n_genes_core, 5) Parametros FMM [M, A, α, β, ω] por gen central en el nuevo marco. Equivalente en R: ``basicOrdRefG2[[6]]`` (= ``pars``). direction_flipped : bool True si la direccion fue invertida (por el Paso 2.1 o 2.2). Equivalente en R: ``basicOrdRefG2[[7]]`` (= ``cambiaOri``). within_group_indices : np.ndarray de int, forma (n_muestras,) Indices posicionales base 0 dentro del grupo tras posible inversion. Equivalente en R: ``basicOrdRefG2[[8]]`` (= ``indNewNew``). core_genes : list[str] Simbolos de genes centrales en el orden utilizado. day_genes : list[str] Genes centrales (exc. anchor y direction) con pico en [0, π). night_genes : list[str] Genes centrales (exc. anchor y direction) con pico en [π, 2π). pre_sample_order : np.ndarray de int Orden de muestras del Paso 2.1 (antes de la verificacion del Paso 2.2). Equivalente en R: ``preOrdRefG2[[1]]``. pre_circular_scale : np.ndarray de float Escala circular del Paso 2.1. Equivalente en R: ``preOrdRefG2[[2]]``. pre_expr_ordered : pd.DataFrame Matriz de expresion ordenada por el Paso 2.1. Equivalente en R: ``preOrdRefG2[[3]]``. pre_peak_times : np.ndarray de float Tiempos de pico FMM del Paso 2.1. Equivalente en R: ``preOrdRefG2[[4]]``. pre_fmm_params : np.ndarray de float, forma (n_genes_core, 5) Parametros FMM del Paso 2.1. Equivalente en R: ``preOrdRefG2[[6]]`` (= ``parCore``). pre_direction_reversed : bool True si el Paso 2.1 invirtio la direccion (direction_gene en la mitad incorrecta). Equivalente en R: ``preOrdRefG2[[13]]`` (= ``reverse``). """ sample_order: np.ndarray circular_scale: np.ndarray expr_ordered: pd.DataFrame peak_times: np.ndarray r2_fmm: np.ndarray fmm_params: np.ndarray direction_flipped: bool within_group_indices: np.ndarray core_genes: list day_genes: list = field(default_factory=list) night_genes: list = field(default_factory=list) pre_sample_order: np.ndarray = field(default_factory=lambda: np.array([], dtype=int)) pre_circular_scale: np.ndarray = field(default_factory=lambda: np.array([])) pre_expr_ordered: pd.DataFrame = field(default_factory=pd.DataFrame) pre_peak_times: np.ndarray = field(default_factory=lambda: np.array([])) pre_fmm_params: np.ndarray = field(default_factory=lambda: np.zeros((0, 5))) pre_direction_reversed: bool = False
[docs] def summary(self) -> str: lines = [ "=== Resumen de Sincronizacion Circular ===", f" Direccion invertida : {self.direction_flipped}", f" Paso 2.1 invertido : {self.pre_direction_reversed}", f" Genes dia (0..π) : {self.day_genes}", f" Genes noche (π..2π) : {self.night_genes}", f" Picos genes core : {np.round(self.peak_times, 3).tolist()}", ] return "\n".join(lines)
# --------------------------------------------------------------------------- # CircularSynchronizer # ---------------------------------------------------------------------------
[docs] class CircularSynchronizer: """ Etapa 2 de CIRCUST: establece el marco de referencia biologico del ordenamiento circular. Parameters ---------- mode : str Modalidad de sincronizacion. Opciones: ``"manual"`` (defecto) Sincronizacion basada en genes ancla con fases biologicamente conocidas. Requiere ``anchor_gene`` y ``direction_gene`` presentes en los genes core. ``"hybrid"`` (futuro) Sincronizacion hibrida in vivo/post mortem. Requiere ``reference_peaks`` en la llamada a :meth:`run`. No implementada: lanza ``NotImplementedError``. anchor_gene : str Gen cuyo pico se coloca en π para definir el punto de arranque del ordenamiento. En el reloj mamifero, ARNTL tiene su pico cerca de la medianoche subjetiva (Zeitgeber 0). Por defecto: ``"ARNTL"``. direction_gene : str Gen que determina la orientacion CW/CCW del ordenamiento. Se espera que su pico caiga en [0, π) (fase activa) tras la rotacion. Por defecto: ``"DBP"``. consistency_gene : str Gen auxiliar usado en la verificacion de consistencia biologica del Paso 2.2. Se comprueba el orden direction < consistency < anchor. Por defecto: ``"CRY1"``. verbose : bool Imprimir mensajes de progreso. Example ------- >>> syncer = CircularSynchronizer() >>> result = syncer.run(refined, core_genes) >>> print(result.summary()) """ def __init__( self, mode: str = "manual", anchor_gene: str = "ARNTL", direction_gene: str = "DBP", consistency_gene: str = "CRY1", verbose: bool = True, ) -> None: if mode not in ("manual", "hybrid"): raise ValueError( f"mode debe ser 'manual' o 'hybrid', se recibio {mode!r}" ) self.mode = mode self.anchor_gene = anchor_gene self.direction_gene = direction_gene self.consistency_gene = consistency_gene self.verbose = verbose # ------------------------------------------------------------------ # API publica # ------------------------------------------------------------------
[docs] def run( self, refined: CPCAResult, core_genes: list[str], reference_peaks: dict[str, float] | None = None, ) -> SynchronizationResult: """ Ejecuta la sincronizacion circular. Parameters ---------- refined : CPCAResult Salida de ``CPCA.run()``. core_genes : list de str Lista ordenada de simbolos de genes reloj centrales. reference_peaks : dict {gen: float} en radianes, opcional Solo necesario para ``mode="hybrid"``. Tiempos de pico de referencia in vivo por gen (en [0, 2π)). Ignorado en modo manual. Returns ------- SynchronizationResult """ self._log("=== Etapa 2: Sincronizacion Circular ===") self._log(f" Modo: {self.mode}") if self.mode == "manual": return self._manual_sync(refined, core_genes) else: return self._hybrid_sync(refined, core_genes, reference_peaks)
# ------------------------------------------------------------------ # Modalidad manual (implementada) # ------------------------------------------------------------------ def _manual_sync( self, refined: CPCAResult, core_genes: list[str], ) -> SynchronizationResult: """Sincronizacion manual en dos pasos usando genes ancla.""" # ── Paso 2.1: rotacion + determinacion de direccion ─────────────── self._log(" Paso 2.1 — rotando al marco de referencia ...") (o_pre, esc_pre, mat_pre, peaks_pre, r2_fmm, par_pre, names_day, names_night, reversed_21) = self._pre_order( refined, core_genes ) # ── Paso 2.2: verificacion de consistencia biologica ────────────── self._log(" Paso 2.2 — verificando consistencia biologica ...") (o_fin, esc_fin, mat_fin, peaks_fin, pars_fin, flipped_22, ind_new) = self._basic_order( o_pre, esc_pre, mat_pre, peaks_pre, par_pre, core_genes ) direction_flipped = reversed_21 ^ flipped_22 result = SynchronizationResult( sample_order = o_fin, circular_scale = esc_fin, expr_ordered = mat_fin, peak_times = peaks_fin, r2_fmm = r2_fmm, fmm_params = pars_fin, direction_flipped = direction_flipped, within_group_indices = ind_new, core_genes = core_genes, day_genes = names_day, night_genes = names_night, pre_sample_order = o_pre, pre_circular_scale = esc_pre, pre_expr_ordered = mat_pre, pre_peak_times = peaks_pre, pre_fmm_params = par_pre, pre_direction_reversed = reversed_21, ) self._log(result.summary()) return result # ------------------------------------------------------------------ # Modalidad hibrida (punto de extension — no implementada) # ------------------------------------------------------------------ def _hybrid_sync( self, refined: CPCAResult, core_genes: list[str], reference_peaks: dict[str, float] | None, ) -> SynchronizationResult: """ Sincronizacion hibrida in vivo / post mortem. Aproximacion prevista --------------------- Formulada como un problema de minimizacion en el circulo: busca la rotacion θ ∈ [0, 2π) y la orientacion s ∈ {+1, −1} que minimizan la distancia circular total entre los tiempos de pico estimados en el tejido post mortem y los tiempos de referencia in vivo: argmin_{θ, s} Σ_g d_circ(s · peak_g + θ, ref_g) donde d_circ es la distancia angular minima en [0, π]. Inspirado en: Ueda et al. (2004) PNAS, Higashi et al. (2016). Parameters ---------- reference_peaks : dict {gen: float} Tiempos de pico de referencia in vivo en radianes [0, 2π). Debe contener al menos los genes core presentes en ``core_genes``. Para implementar ---------------- Sobreescribir este metodo en una subclase, o implementarlo aqui cuando los datos de referencia esten disponibles. """ raise NotImplementedError( "Sincronizacion hibrida in vivo/post mortem no implementada.\n" "Usa mode='manual' o subclasifica CircularSynchronizer y " "sobreescribe _hybrid_sync()." ) # ------------------------------------------------------------------ # Pasos internos de la modalidad manual # ------------------------------------------------------------------ def _pre_order( self, refined: CPCAResult, core_genes: list[str], ) -> tuple: """ Paso 2.1 — Equivalente en R: ``basicPreOder_cores`` (lineas 4192-4263). Returns ------- o_new : np.ndarray — indices de ordenamiento de muestras esc_new : np.ndarray — escala circular en [0, 2π) mat_new : pd.DataFrame — matriz completa en nuevo orden peaks : np.ndarray — tiempos de pico FMM por gen central r2_fmm : np.ndarray — R² por gen central par_core : np.ndarray forma (n_core, 5) — [M, A, α, β, ω] names_day : list[str] — genes con pico en [0, π) names_night : list[str] — genes con pico en [π, 2π) reversed : bool — True si la direccion se invirtio """ circular_scale = refined.circular_scale expr_ordered = refined.expr_norm_final fmm_fits = refined.fmm_fits_final peak_times_fin = refined.fmm_peak_times_final n_core = len(core_genes) # ── R² de ajustes FMM con ordenamiento final ────────────────────── r2_fmm = np.array([ fmm_fits[g].r2 if g in fmm_fits else float("nan") for g in core_genes ]) # ── Tiempos de pico anchor y direction en el marco CPCA original ── anchor_peak = peak_times_fin[self.anchor_gene] direction_peak = peak_times_fin[self.direction_gene] # ── Rotar: anchor → π ───────────────────────────────────────────── rotated = (circular_scale - anchor_peak + pi) % (2.0 * pi) esc_T = np.argsort(rotated) esc2 = rotated[esc_T] # ── Determinar orientacion ───────────────────────────────────────── direction_rotated = (direction_peak - anchor_peak + pi) % (2.0 * pi) forward = direction_rotated < pi # ── Reparametrizar FMM de cada gen central en el nuevo marco ────── peaks = np.zeros(n_core) par_core = np.zeros((n_core, 5)) for i, gene in enumerate(core_genes): if gene not in fmm_fits: continue fr = fmm_fits[gene] alpha = fr.params["alpha"] beta = fr.params["beta"] omega = fr.params["omega"] M = fr.params["M"] A = fr.params["A"] new_alpha = (alpha - anchor_peak + pi) % (2.0 * pi) if forward: peaks[i] = FMMModel.peak_time(new_alpha, beta, omega) par_core[i] = [M, A, new_alpha, beta, omega] else: na2 = (2.0 * pi - new_alpha) % (2.0 * pi) nb2 = (2.0 * pi - beta) % (2.0 * pi) peaks[i] = FMMModel.peak_time(na2, nb2, omega) par_core[i] = [M, A, na2, nb2, omega] # ── Reordenar columnas de la matriz ─────────────────────────────── if forward: o_new = esc_T esc_new = esc2 mat_new = expr_ordered.iloc[:, esc_T].copy() else: o_new = esc_T[::-1].copy() esc_new = (2.0 * pi - esc2[::-1]) % (2.0 * pi) mat_new = expr_ordered.iloc[:, esc_T[::-1]].copy() mat_new.columns = range(mat_new.shape[1]) # ── Clasificar como dia / noche ──────────────────────────────────── names_day = [] names_night = [] for i, gene in enumerate(core_genes): if gene in (self.anchor_gene, self.direction_gene): continue if 0 <= peaks[i] < pi: names_day.append(gene) else: names_night.append(gene) self._log( f" {self.anchor_gene} peak: {anchor_peak:.3f} rad | " f"{self.direction_gene} rotado: {direction_rotated:.3f} rad | " f"Direccion: {'directa' if forward else 'INVERTIDA'}" ) return (o_new, esc_new, mat_new, peaks, r2_fmm, par_core, names_day, names_night, not forward) def _basic_order( self, o: np.ndarray, esc: np.ndarray, mat: pd.DataFrame, peaks: np.ndarray, par: np.ndarray, core_genes: list[str], ) -> tuple: """ Paso 2.2 — Equivalente en R: ``basicOder_cores``. Returns ------- o_new, esc_new, mat_new, peaks_new, pars_new, flipped, ind_new """ anchor_i = core_genes.index(self.anchor_gene) if self.anchor_gene in core_genes else None direction_i = core_genes.index(self.direction_gene) if self.direction_gene in core_genes else None consistency_i = core_genes.index(self.consistency_gene) if self.consistency_gene in core_genes else None if direction_i is None: self._log( f" AVISO: {self.direction_gene} no encontrado en core_genes; " "omitiendo verificacion de direccion." ) n = len(o) return o, esc, mat, peaks, par, False, np.arange(n) direction_peak = peaks[direction_i] n_core = len(core_genes) # Contar genes en (0, π) excluyendo anchor y direction peak_d = sum( 1 for i, g in enumerate(core_genes) if g not in (self.anchor_gene, self.direction_gene) and 0 < peaks[i] < pi ) mitad = int(np.floor((n_core - 2) / 2)) p6am = 1.0 - np.cos(direction_peak) p6pm = 1.0 - np.cos(direction_peak - pi) aviso = (p6am <= 0.1) or (p6pm <= 0.1) or (peak_d < mitad) if aviso and consistency_i is not None and anchor_i is not None: excl = (peaks[direction_i] < peaks[consistency_i] < peaks[anchor_i]) flip = not excl elif aviso: flip = True else: flip = False n = len(o) if flip: self._log( f" Invirtiendo direccion " f"(aviso=True, {self.direction_gene}_peak={direction_peak:.3f}, " f"peak_d={peak_d}/{mitad})" ) peaks_new = (2.0 * pi - peaks) % (2.0 * pi) o_new = o[::-1].copy() esc_new = (2.0 * pi - esc[::-1]) % (2.0 * pi) mat_new = mat.iloc[:, ::-1].copy() mat_new.columns = range(mat_new.shape[1]) pars_new = par.copy() pars_new[:, 2] = (2.0 * pi - par[:, 2]) % (2.0 * pi) # α pars_new[:, 3] = (2.0 * pi - par[:, 3]) % (2.0 * pi) # β ind_new = np.arange(n - 1, -1, -1) else: self._log(" La direccion es consistente — no se necesita inversion.") peaks_new = peaks.copy() o_new = o.copy() esc_new = esc.copy() mat_new = mat.copy() pars_new = par.copy() ind_new = np.arange(n) return o_new, esc_new, mat_new, peaks_new, pars_new, flip, ind_new # ------------------------------------------------------------------ # Utilidad # ------------------------------------------------------------------ def _log(self, message: str) -> None: if self.verbose: print(message)