"""
circust/fitting/fmm.py
=======================
Frecuency Modulated Mobius (FMM) — ajuste ritmico de un solo componente.
Modelo matematico
-----------------
y(t) = M + A * cos(beta + 2*atan(omega*tan((t - alpha)/2)))
El termino interno Phi(t) = beta + 2*atan(omega*tan((t - alpha)/2)) es una
transformacion de Mobius del circulo. Cuando omega = 1 se reduce a un
simple desfase. Cuando omega -> 0 la forma de onda se aproxima a un coseno
(Cosinor). La ventaja clave sobre Cosinor es que omega controla la *asimetria
de la forma de onda*: el modelo puede representar picos asimetricos presentes
en datos circadianos reales.
Parametros
----------
M mesor — nivel medio
A amplitud — siempre > 0
alpha — localizacion de la region del pico en [0, 2*pi)
beta — parametro de forma en [0, 2*pi)
omega — asimetria en (0, omegaMax]
Algoritmo de estimacion
-----------------------
Paso 1 — Busqueda en rejilla
Para cada combinacion (alpha, omega) en una rejilla, se fijan esos dos
parametros y se estiman M, A, beta por OLS sobre el eje temporal
transformado por Mobius. Se selecciona el candidato con menor RSS que
tambien satisfaga las restricciones de estabilidad de amplitud.
Paso 2 — Refinamiento Nelder-Mead
Se pule la estimacion del Paso 1 minimizando el RSS sobre los cinco
parametros simultaneamente, sujeto a las mismas restricciones de
estabilidad (puntos infactibles devuelven infinito).
Los Pasos 1+2 se repiten ``num_reps`` veces, cada vez estrechando la
rejilla alrededor de la mejor estimacion anterior.
Devuelve
--------
FitResult
Claves de ``params``: ``M``, ``A``, ``alpha``, ``beta``, ``omega``
"""
import numpy as np
from scipy.optimize import minimize
from circust.fitting.rhythm_model import FitResult, RhythmModel
# ── Aceleracion opcional con Numba (CPU JIT) ───────────────────────────────
# Si Numba esta disponible, compilamos la evaluacion del modelo y la funcion
# objetivo que Nelder-Mead llama miles de veces por gen.
try:
from numba import njit
_HAS_NUMBA = True
except ImportError:
_HAS_NUMBA = False
def njit(*args, **kwargs): # type: ignore
def deco(fn): return fn
if len(args) == 1 and callable(args[0]):
return args[0]
return deco
# ── Aceleracion opcional con Numba CUDA (GPU) ──────────────────────────────
# Opcion A: el Paso 1 (busqueda en rejilla) se ejecuta en GPU si esta
# disponible. Para datasets grandes (muchas muestras o rejillas densas) el
# grid search es el cuello de botella principal — se presta bien a GPU porque
# cada punto (alpha, omega) es independiente.
#
# Opcion B (trabajo futuro): paralelizar el ajuste de N genes en GPU
# requeriria reimplementar Nelder-Mead en CUDA, lo que esta fuera del alcance
# actual del TFG. Ver comentario en ``_step1_grid_cuda`` para detalles.
try:
from numba import cuda as _numba_cuda
_HAS_CUDA = _numba_cuda.is_available()
except (ImportError, Exception):
_HAS_CUDA = False
@njit(cache=True, fastmath=True)
def _fmm_predict_jit(
t: np.ndarray,
M: float, A: float, alpha: float, beta: float, omega: float,
out: np.ndarray,
) -> None:
"""Escribe la forma de onda FMM en *out* (sin asignacion)."""
n = t.shape[0]
for i in range(n):
mob = 2.0 * np.arctan(omega * np.tan((t[i] - alpha) / 2.0))
out[i] = M + A * np.cos(beta + mob)
@njit(cache=True, fastmath=True)
def _fmm_objective_jit(
params: np.ndarray,
data: np.ndarray,
t: np.ndarray,
upper: float,
lower: float,
omega_max: float,
buf: np.ndarray,
) -> float:
"""
Objetivo Nelder-Mead compilado. Devuelve +inf cuando los parametros
violan las restricciones de estabilidad de amplitud o rangos validos.
"""
M = params[0]
A = params[1]
alpha = params[2]
beta = params[3]
omega = params[4]
if (M + A) > upper: return np.inf
if (M - A) < lower: return np.inf
if A <= 0.0: return np.inf
if omega <= 0.0: return np.inf
if omega > omega_max: return np.inf
n = t.shape[0]
rss = 0.0
for i in range(n):
mob = 2.0 * np.arctan(omega * np.tan((t[i] - alpha) / 2.0))
pred = M + A * np.cos(beta + mob)
d = pred - data[i]
rss += d * d
return rss / n
# ---------------------------------------------------------------------------
# Funciones internas auxiliares (replican funciones ocultas de R)
# ---------------------------------------------------------------------------
def _mobius(t: np.ndarray, alpha: float, omega: float) -> np.ndarray:
"""Transformacion de Mobius del eje temporal.
Phi(t) = 2*atan(omega*tan((t - alpha)/2))
"""
return 2.0 * np.arctan(omega * np.tan((t - alpha) / 2.0))
def _fmm_predict(
t: np.ndarray,
M: float, A: float, alpha: float, beta: float, omega: float,
) -> np.ndarray:
"""Evalua la forma de onda FMM en los puntos temporales ``t``."""
phi = beta + _mobius(t, alpha, omega)
return M + A * np.cos(phi)
def _step1(
alpha: float,
omega: float,
data: np.ndarray,
t: np.ndarray,
) -> np.ndarray:
"""
Paso 1 (escalar): fija alpha y omega, estima M, A, beta por OLS Cosinor.
Devuelve [M, A, alpha, beta, omega, RSS] — mismo layout que step1FMM de R.
Se usa solo como fallback; la version vectorizada es preferible.
"""
mob = _mobius(t, alpha, omega)
t_star = alpha + mob
xx = np.cos(t_star)
zz = np.sin(t_star)
X = np.column_stack([np.ones(len(data)), xx, zz])
coeffs, *_ = np.linalg.lstsq(X, data, rcond=None)
M_est = coeffs[0]
b = coeffs[1]
g = coeffs[2]
phi_est = np.arctan2(-g, b)
A_est = np.sqrt(b**2 + g**2)
beta_est = (phi_est + alpha) % (2 * np.pi)
fitted = M_est + A_est * np.cos(beta_est + mob)
rss = np.sum((data - fitted)**2) / len(t)
return np.array([M_est, A_est, alpha, beta_est, omega, rss])
def _step1_grid(
alpha_grid: np.ndarray,
omega_grid: np.ndarray,
data: np.ndarray,
t: np.ndarray,
) -> np.ndarray:
"""
Paso 1 vectorizado: evalua TODOS los puntos (alpha, omega) simultaneamente.
Reemplaza el bucle ``for`` de Python sobre la rejilla con una sola pasada
de operaciones NumPy, obteniendo una aceleracion de ~3x.
Estrategia
----------
Para cada par (alpha, omega) el problema OLS se reduce a un sistema de
ecuaciones normales 2x2 sobre los regresores centrados [cos(t*), sin(t*)].
El centrado hace que el sistema este bien condicionado y evita las matrices
3x3 casi singulares que aparecen cuando omega esta cerca de cero. El
intercepto M se recupera analiticamente de la ecuacion de medias.
Parameters
----------
alpha_grid : array (A,) de valores alpha
omega_grid : array (W,) de valores omega
data : array (N,) senal observada
t : array (N,) puntos temporales en [0, 2*pi)
Returns
-------
array (A*W, 6) — columnas: [M, A, alpha, beta, omega, RSS]
Mismo layout de columnas que ``_step1`` escalar.
"""
An, Wn, N = len(alpha_grid), len(omega_grid), len(t)
AW = An * Wn
# ── Broadcasting de toda la trigonometria: formas (A, W, N) ─────────
alpha_b = alpha_grid[:, None, None] # (A, 1, 1)
omega_b = omega_grid[None, :, None] # (1, W, 1)
t_b = t[None, None, :] # (1, 1, N)
mob = 2.0 * np.arctan(omega_b * np.tan((t_b - alpha_b) / 2.0)) # (A,W,N)
t_star = alpha_b + mob # (A,W,N)
xx = np.cos(t_star).reshape(AW, N) # (AW, N)
zz = np.sin(t_star).reshape(AW, N) # (AW, N)
mob_2d = mob.reshape(AW, N) # (AW, N)
# ── OLS 2x2 centrado — evita mal condicionamiento cerca de omega=0 ──
xx_c = xx - xx.mean(axis=1, keepdims=True) # (AW, N)
zz_c = zz - zz.mean(axis=1, keepdims=True) # (AW, N)
y_c = data - data.mean() # (N,) broadcast escalar
sxx = (xx_c ** 2).sum(1) # (AW,)
sxz = (xx_c * zz_c).sum(1) # (AW,)
szz = (zz_c ** 2).sum(1) # (AW,)
sxy = (xx_c * y_c).sum(1) # (AW,)
szy = (zz_c * y_c).sum(1) # (AW,)
det = sxx * szz - sxz ** 2
det = np.where(np.abs(det) < 1e-12, 1e-12, det) # proteger casos singulares
b = (szz * sxy - sxz * szy) / det # coeficiente coseno
g = (sxx * szy - sxz * sxy) / det # coeficiente seno
M_est = data.mean() - b * xx.mean(1) - g * zz.mean(1)
# ── Recuperacion de parametros ───────────────────────────────────────
A_est = np.sqrt(b ** 2 + g ** 2)
phi_est = np.arctan2(-g, b)
alpha_flat = np.repeat(alpha_grid, Wn) # (AW,)
omega_flat = np.tile(omega_grid, An) # (AW,)
beta_est = (phi_est + alpha_flat) % (2 * np.pi)
# ── RSS ──────────────────────────────────────────────────────────────
fitted = M_est[:, None] + A_est[:, None] * np.cos(beta_est[:, None] + mob_2d)
rss = np.sum((data[None, :] - fitted) ** 2, axis=1) / N
return np.column_stack([M_est, A_est, alpha_flat, beta_est, omega_flat, rss])
def _step1_grid_cuda(
alpha_grid: np.ndarray,
omega_grid: np.ndarray,
data: np.ndarray,
t: np.ndarray,
) -> np.ndarray:
"""
Paso 1 vectorizado con Numba CUDA — Opcion A de aceleracion GPU.
Cada hilo GPU procesa un punto (alpha, omega) de la rejilla de forma
independiente, calculando los coeficientes OLS y el RSS correspondiente.
La GPU es especialmente util cuando ``len(alpha_grid) * len(omega_grid)``
es grande (>= varios miles de combinaciones) o cuando ``len(t)`` es
grande (>= cientos de muestras).
Requisito: ``numba.cuda.is_available() == True``.
Opcion B — trabajo futuro
-------------------------
Paralelizar el ajuste de N genes simultaneamente en GPU requeriria
reimplementar el optimizador Nelder-Mead como kernel CUDA, ya que
``scipy.optimize.minimize`` no puede ejecutarse en GPU. Las referencias
de implementacion son:
- Luersen & Le Riche (2004), «Globalized Nelder-Mead method for
engineering optimization», Computers & Structures 82(23-26), 2253-2260.
- Implementaciones existentes de CUDA NM: Vehtari et al. / PyMC3 GPU.
Esta opcion queda documentada como trabajo futuro una vez el pipeline
principal este validado.
Returns
-------
array (A*W, 6) — mismo layout que ``_step1_grid`` (CPU).
"""
from numba import cuda as _cuda
import math
An, Wn, N = len(alpha_grid), len(omega_grid), len(t)
AW = An * Wn
alpha_flat = np.repeat(alpha_grid, Wn).astype(np.float64)
omega_flat = np.tile(omega_grid, An).astype(np.float64)
data_d = _cuda.to_device(data.astype(np.float64))
t_d = _cuda.to_device(t.astype(np.float64))
alpha_d = _cuda.to_device(alpha_flat)
omega_d = _cuda.to_device(omega_flat)
out_d = _cuda.device_array((AW, 6), dtype=np.float64)
@_cuda.jit
def _grid_kernel(alpha_arr, omega_arr, data_arr, t_arr, out, n_samples):
idx = _cuda.grid(1)
if idx >= alpha_arr.shape[0]:
return
alpha = alpha_arr[idx]
omega = omega_arr[idx]
N = n_samples
# Acumula sumas para OLS centrado
sum_xx = 0.0; sum_zz = 0.0; sum_xz = 0.0
sum_xy = 0.0; sum_zy = 0.0
sum_x = 0.0; sum_z = 0.0
data_mean = 0.0
for i in range(N):
data_mean += data_arr[i]
data_mean /= N
for i in range(N):
mob = 2.0 * math.atan(omega * math.tan((t_arr[i] - alpha) / 2.0))
t_star = alpha + mob
x_i = math.cos(t_star)
z_i = math.sin(t_star)
sum_x += x_i
sum_z += z_i
x_mean = sum_x / N
z_mean = sum_z / N
for i in range(N):
mob = 2.0 * math.atan(omega * math.tan((t_arr[i] - alpha) / 2.0))
t_star = alpha + mob
x_c = math.cos(t_star) - x_mean
z_c = math.sin(t_star) - z_mean
y_c = data_arr[i] - data_mean
sum_xx += x_c * x_c
sum_zz += z_c * z_c
sum_xz += x_c * z_c
sum_xy += x_c * y_c
sum_zy += z_c * y_c
det = sum_xx * sum_zz - sum_xz * sum_xz
if abs(det) < 1e-12:
det = 1e-12
b = (sum_zz * sum_xy - sum_xz * sum_zy) / det
g = (sum_xx * sum_zy - sum_xz * sum_xy) / det
# Calcular medias de x, z para recuperar M
sum_x2 = 0.0; sum_z2 = 0.0
for i in range(N):
mob = 2.0 * math.atan(omega * math.tan((t_arr[i] - alpha) / 2.0))
t_star = alpha + mob
sum_x2 += math.cos(t_star)
sum_z2 += math.sin(t_star)
M_est = data_mean - b * (sum_x2 / N) - g * (sum_z2 / N)
A_est = math.sqrt(b * b + g * g)
phi_est = math.atan2(-g, b)
beta_est = math.fmod(phi_est + alpha, 2.0 * math.pi)
if beta_est < 0.0:
beta_est += 2.0 * math.pi
# RSS
rss = 0.0
for i in range(N):
mob = 2.0 * math.atan(omega * math.tan((t_arr[i] - alpha) / 2.0))
fitted = M_est + A_est * math.cos(beta_est + mob)
d = fitted - data_arr[i]
rss += d * d
rss /= N
out[idx, 0] = M_est
out[idx, 1] = A_est
out[idx, 2] = alpha
out[idx, 3] = beta_est
out[idx, 4] = omega
out[idx, 5] = rss
threads_per_block = 128
blocks = (AW + threads_per_block - 1) // threads_per_block
_grid_kernel[blocks, threads_per_block](
alpha_d, omega_d, data_d, t_d, out_d, N
)
return out_d.copy_to_host()
def _best_step1(
data: np.ndarray,
grid_results: np.ndarray,
) -> np.ndarray | None:
"""
Selecciona el candidato de la rejilla con menor RSS que satisfaga
las restricciones de estabilidad de amplitud.
Estabilidad: M+A <= max(datos) + 10% rango Y M-A >= min(datos) - 10% rango.
"""
order = np.argsort(grid_results[:, 5]) # ordenar por RSS
data_min = data.min()
data_max = data.max()
slack = 0.1 * (data_max - data_min)
for idx in order:
row = grid_results[idx]
M, A = row[0], row[1]
if np.isnan(M) or np.isnan(A):
continue
if (M + A) <= (data_max + slack) and (M - A) >= (data_min - slack):
return row
return None
def _make_step2_objective(
data: np.ndarray,
t: np.ndarray,
omega_max: float,
) -> callable:
"""
Construye la funcion objetivo para Nelder-Mead con limites precalculados.
Evita recalcular min/max/margen en cada evaluacion (se llama miles de
veces por ajuste).
Si Numba esta disponible, la funcion objetivo es un wrapper delgado
sobre un nucleo JIT-compilado. Si no, cae en la implementacion pura
NumPy con semantica identica.
"""
data_min = float(data.min())
data_max = float(data.max())
slack = 0.1 * (data_max - data_min)
upper = data_max + slack
lower = data_min - slack
n = len(t)
if _HAS_NUMBA:
buf = np.empty(n, dtype=np.float64)
def objective(params: np.ndarray) -> float:
return float(_fmm_objective_jit(
np.ascontiguousarray(params, dtype=np.float64),
data, t, upper, lower, omega_max, buf,
))
return objective
def objective(params: np.ndarray) -> float:
M, A, alpha, beta, omega = params
if (M + A) > upper: return np.inf
if (M - A) < lower: return np.inf
if A <= 0: return np.inf
if omega <= 0: return np.inf
if omega > omega_max: return np.inf
fitted = _fmm_predict(t, M, A, alpha, beta, omega)
return float(np.sum((fitted - data)**2) / n)
return objective
# ---------------------------------------------------------------------------
# Clase FMMModel
# ---------------------------------------------------------------------------
[docs]
class FMMModel(RhythmModel):
"""
Ajustador ritmico FMM (Frecuency Modulated Mobius).
Ajusta y(t) = M + A*cos(beta + 2*atan(omega*tan((t - alpha)/2)))
usando un algoritmo de dos pasos: busqueda en rejilla + Nelder-Mead.
Parameters
----------
length_alpha_grid : int
Numero de valores de alpha en la rejilla. Valor R por defecto: 48.
length_omega_grid : int
Numero de valores de omega en la rejilla. Valor R por defecto: 24.
omega_max : float
Cota superior de omega. Valor R por defecto: 1.0.
num_reps : int
Numero de iteraciones de refinamiento de rejilla. Valor R por defecto: 3.
Example
-------
>>> import numpy as np
>>> from circust.fitting.fmm import FMMModel
>>>
>>> t = np.linspace(0, 2*np.pi, 50, endpoint=False)
>>> data = 0.3 + 0.6 * np.cos(t - 1.2) + np.random.normal(0, 0.05, 50)
>>> result = FMMModel().fit(data, t)
>>> print(result.summary())
"""
def __init__(
self,
length_alpha_grid: int = 48,
length_omega_grid: int = 24,
omega_max: float = 1.0,
num_reps: int = 3,
) -> None:
self.length_alpha_grid = length_alpha_grid
self.length_omega_grid = length_omega_grid
self.omega_max = omega_max
self.num_reps = num_reps
# ------------------------------------------------------------------
# API publica
# ------------------------------------------------------------------
[docs]
@staticmethod
def peak_time(alpha: float, beta: float, omega: float) -> float:
"""
Tiempo de pico del modelo FMM — formula compUU de R.
t_U = alpha + 2*atan2((1/omega)*sin(-beta/2), cos(-beta/2)) mod 2*pi
"""
return float(
(alpha + 2.0 * np.arctan2(
(1.0 / omega) * np.sin(-beta / 2.0),
np.cos(-beta / 2.0),
)) % (2.0 * np.pi)
)
[docs]
def fit(
self,
data: np.ndarray,
time_points: np.ndarray,
) -> FitResult:
"""
Ajusta el modelo FMM a ``data``.
Parameters
----------
data : np.ndarray, forma (n_muestras,)
Valores de expresion observados (normalizados a [-1, 1]).
time_points : np.ndarray, forma (n_muestras,)
Eje temporal circular en [0, 2*pi), tipicamente ``CPCAResult.circular_scale``.
Returns
-------
FitResult
Claves de ``params``: ``M``, ``A``, ``alpha``, ``beta``, ``omega``
"""
data = np.asarray(data, dtype=float)
time_points = np.asarray(time_points, dtype=float)
# Rejillas iniciales (mismos valores que R por defecto)
alpha_grid = np.linspace(0, 2 * np.pi, self.length_alpha_grid, endpoint=False)
omega_grid = np.exp(
np.linspace(
np.log(1e-4),
np.log(self.omega_max),
self.length_omega_grid,
)
)
best_par = None
objective = _make_step2_objective(data, time_points, self.omega_max)
for rep in range(self.num_reps):
# ── Paso 1: busqueda en rejilla ─────────────────────────────
# Si hay GPU disponible (Opcion A), la rejilla se evalua en CUDA
# con un hilo por combinacion (alpha, omega). Si no, se usa la
# version CPU vectorizada con NumPy (~3x vs bucle for puro).
if _HAS_CUDA:
grid_results = _step1_grid_cuda(
alpha_grid, omega_grid, data, time_points
)
else:
grid_results = _step1_grid(alpha_grid, omega_grid, data, time_points)
prev_best = best_par
best_par = _best_step1(data, grid_results)
if best_par is None:
# Sin candidato estable — revertir al anterior si existe
best_par = prev_best
break
# ── Paso 2: refinamiento Nelder-Mead ────────────────────────
result = minimize(
objective,
x0 = best_par[:5],
method = "Nelder-Mead",
options = {"xatol": 1e-6, "fatol": 1e-6, "maxiter": 5000},
)
par_final = result.x.copy()
# Envolver alpha y beta en [0, 2*pi)
par_final[2] = par_final[2] % (2 * np.pi)
par_final[3] = par_final[3] % (2 * np.pi)
# Estrechar rejillas alrededor del mejor actual para siguiente iteracion
if rep < self.num_reps - 1:
alpha_grid = self._refine_alpha_grid(
par_final[2], alpha_grid
)
omega_grid = self._refine_omega_grid(
par_final[4], omega_grid
)
if best_par is None:
# Fallback: modelo plano
par_final = np.array([data.mean(), 0.0, 0.0, 0.0, 0.5])
M, A, alpha, beta, omega = par_final
# ── Valores ajustados finales y estadisticos ───────────────────
fitted = _fmm_predict(time_points, M, A, alpha, beta, omega)
residuals = data - fitted
residuals_std = self._standardise_residuals(residuals, ddof=5)
r2 = self._r2(data, fitted)
sse = float(np.sum(residuals**2))
return FitResult(
fitted = fitted,
params = {
"M": float(M),
"A": float(A),
"alpha": float(alpha),
"beta": float(beta),
"omega": float(omega),
},
peak_time = FMMModel.peak_time(float(alpha), float(beta), float(omega)),
r2 = r2,
residuals = residuals,
residuals_std = residuals_std,
sse = sse,
model_name = "fmm",
)
# ------------------------------------------------------------------
# Auxiliares para refinamiento de rejilla
# ------------------------------------------------------------------
def _refine_alpha_grid(
self,
centre: float,
prev_grid: np.ndarray,
) -> np.ndarray:
"""
Estrecha la rejilla de alpha alrededor de ``centre``.
"""
amplitude = 1.5 * np.mean(np.diff(np.sort(prev_grid)))
new_grid = np.linspace(
centre - amplitude,
centre + amplitude,
len(prev_grid),
)
return new_grid % (2 * np.pi)
def _refine_omega_grid(
self,
centre: float,
prev_grid: np.ndarray,
) -> np.ndarray:
"""
Estrecha la rejilla de omega alrededor de ``centre``.
"""
amplitude = 1.5 * np.mean(np.diff(np.sort(prev_grid)))
new_grid = np.linspace(
max(centre - amplitude, 1e-6),
min(centre + amplitude, self.omega_max),
len(prev_grid),
)
return new_grid