"""
circust/fitting/ori.py
=======================
Modelo ritmico ORI — regresion isotonica circular unimodal (no parametrica).
Referencia
----------
Larriba, Rueda & Fernandez (2016). «Identification of rhythmic genes
in complex biological datasets and application to the study of human
circadian system gene expression.» *Statistical Applications in
Genetics and Molecular Biology*, 15(3), 205-223.
Modelo matematico
-----------------
Dado un perfil de expresion genica ordenado por tiempo circular, el
modelo ORI busca el mejor ajuste de forma subida-bajada (un pico, un
valle) mediante el Algoritmo Pool-Adjacent-Violators (PAVA).
Para cada par candidato (L=valle, U=pico) de extremos locales se ajusta:
- Un PAVA creciente de L a U (circularmente).
- Un PAVA decreciente de U a L (circularmente).
Se selecciona el par (L_opt, U_opt) que minimiza el MSE del ajuste
unimodal resultante. La puntuacion de ritmicidad es:
R2_ORI = 1 - MSE_unimodal / MSE_plano
donde MSE_plano = Var(datos) es el error del modelo nulo (media constante).
Algoritmo PAVA (Pool-Adjacent-Violators)
-----------------------------------------
Complejidad: O(n) tiempo, O(n) espacio.
Dado un vector de observaciones *y* con pesos *w*, produce el vector
y_hat que minimiza sum w_i (y_i - y_hat_i)^2 sujeto a restricciones
de monotonia (creciente o decreciente).
Aceleracion
-----------
Cuando Numba esta disponible, las funciones criticas (PAVA, busqueda
de pares L-U) se compilan JIT a codigo maquina, obteniendo ~20-50x de
aceleracion dado que el bucle (L, U) x PAVA es O(n^3) por gen.
"""
from __future__ import annotations
import numpy as np
from circust.fitting.rhythm_model import FitResult, RhythmModel
# ── Aceleracion opcional con Numba ────────────────────────────────────────
try:
from numba import njit
_HAS_NUMBA = True
except ImportError:
_HAS_NUMBA = False
def njit(*args, **kwargs): # type: ignore
def deco(fn): return fn
if len(args) == 1 and callable(args[0]):
return args[0]
return deco
# ═══════════════════════════════════════════════════════════════════════════
# PAVA — Pool-Adjacent-Violators Algorithm
# ═══════════════════════════════════════════════════════════════════════════
@njit(cache=True, fastmath=True)
def _pava_inc_numba(y: np.ndarray, w: np.ndarray) -> np.ndarray:
"""
PAVA creciente compilado JIT.
Minimiza sum w_i (y_i - y_hat_i)^2 s.a. y_hat_1 <= ... <= y_hat_n.
"""
n = y.shape[0]
if n <= 1:
out = np.empty(n, dtype=np.float64)
for i in range(n):
out[i] = y[i]
return out
block_sum = np.empty(n, dtype=np.float64)
block_w = np.empty(n, dtype=np.float64)
block_end = np.empty(n, dtype=np.int64)
nb = 0
for i in range(n):
block_sum[nb] = y[i] * w[i]
block_w[nb] = w[i]
block_end[nb] = i
nb += 1
while nb >= 2:
avg_prev = block_sum[nb - 2] / block_w[nb - 2]
avg_curr = block_sum[nb - 1] / block_w[nb - 1]
if avg_prev <= avg_curr:
break
block_sum[nb - 2] += block_sum[nb - 1]
block_w[nb - 2] += block_w[nb - 1]
block_end[nb - 2] = block_end[nb - 1]
nb -= 1
out = np.empty(n, dtype=np.float64)
start = 0
for b in range(nb):
end = block_end[b] + 1
avg = block_sum[b] / block_w[b]
for k in range(start, end):
out[k] = avg
start = end
return out
@njit(cache=True, fastmath=True)
def _pava_inc_range(buf: np.ndarray, out: np.ndarray, m: int) -> None:
"""PAVA creciente sobre buf[:m], escribe resultado en out[:m]. Sin alloc."""
if m <= 0:
return
block_sum = np.empty(m, dtype=np.float64)
block_w = np.empty(m, dtype=np.float64)
block_end = np.empty(m, dtype=np.int64)
nb = 0
for i in range(m):
block_sum[nb] = buf[i]
block_w[nb] = 1.0
block_end[nb] = i
nb += 1
while nb >= 2:
ap = block_sum[nb - 2] / block_w[nb - 2]
ac = block_sum[nb - 1] / block_w[nb - 1]
if ap <= ac:
break
block_sum[nb - 2] += block_sum[nb - 1]
block_w[nb - 2] += block_w[nb - 1]
block_end[nb - 2] = block_end[nb - 1]
nb -= 1
start = 0
for b in range(nb):
end = block_end[b] + 1
avg = block_sum[b] / block_w[b]
for k in range(start, end):
out[k] = avg
start = end
[docs]
def pava_increasing(y: np.ndarray, w: np.ndarray | None = None) -> np.ndarray:
"""
Regresion isotonica creciente mediante PAVA.
Complejidad: O(n) tiempo, O(n) espacio.
Parameters
----------
y : array (n,)
Valores observados.
w : array (n,) o None
Pesos positivos. Si es None, pesos uniformes.
Returns
-------
array (n,) — valores ajustados monotonicamente no decrecientes.
"""
y = np.ascontiguousarray(np.asarray(y, dtype=np.float64))
n = len(y)
if n <= 1:
return y.copy()
w_arr = (np.ones(n, dtype=np.float64) if w is None
else np.ascontiguousarray(np.asarray(w, dtype=np.float64)))
if _HAS_NUMBA:
return _pava_inc_numba(y, w_arr)
# Fallback Python puro
block_sum = np.empty(n, dtype=np.float64)
block_w = np.empty(n, dtype=np.float64)
block_end = np.empty(n, dtype=np.intp)
nb = 0
for i in range(n):
block_sum[nb] = y[i] * w_arr[i]
block_w[nb] = w_arr[i]
block_end[nb] = i
nb += 1
while nb >= 2:
avg_prev = block_sum[nb - 2] / block_w[nb - 2]
avg_curr = block_sum[nb - 1] / block_w[nb - 1]
if avg_prev <= avg_curr:
break
block_sum[nb - 2] += block_sum[nb - 1]
block_w[nb - 2] += block_w[nb - 1]
block_end[nb - 2] = block_end[nb - 1]
nb -= 1
result = np.empty(n, dtype=np.float64)
start = 0
for b in range(nb):
end = block_end[b] + 1
result[start:end] = block_sum[b] / block_w[b]
start = end
return result
[docs]
def pava_decreasing(y: np.ndarray, w: np.ndarray | None = None) -> np.ndarray:
"""
Regresion isotonica decreciente.
Equivalente a negar *y*, ejecutar ``pava_increasing``, y negar de vuelta.
"""
return -pava_increasing(-np.asarray(y, dtype=np.float64), w)
# ═══════════════════════════════════════════════════════════════════════════
# Deteccion de extremos locales circulares
# ═══════════════════════════════════════════════════════════════════════════
[docs]
def find_local_extrema(v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Encuentra minimos locales (valles) y maximos locales (picos) usando
vecinos circulares.
Un punto i es un maximo local si v[i-1] <= v[i] Y v[i] >= v[i+1],
con indices envolventes circulares. Minimos definidos simetricamente.
Parameters
----------
v : array (n,)
Returns
-------
candL : array int — indices de minimos locales (valles).
candU : array int — indices de maximos locales (picos).
"""
v = np.asarray(v, dtype=np.float64)
n = len(v)
if n < 3:
return np.array([], dtype=int), np.array([], dtype=int)
prev = np.roll(v, 1)
next_ = np.roll(v, -1)
candU = np.where((prev <= v) & (v >= next_))[0] # local maxima
candL = np.where((prev >= v) & (v <= next_))[0] # local minima
return candL, candU
# ═══════════════════════════════════════════════════════════════════════════
# Ajuste isotonico circular unimodal — nucleo Numba
# ═══════════════════════════════════════════════════════════════════════════
@njit(cache=True, fastmath=True)
def _circular_unimodal_fit_numba(
v: np.ndarray, candL: np.ndarray, candU: np.ndarray,
):
"""
Version JIT del bucle (L, U) x PAVA. Devuelve:
(fitted, mse, L_opt, U_opt, ok)
``ok`` es True si se encontro un ajuste valido.
"""
n = v.shape[0]
nL = candL.shape[0]
nU = candU.shape[0]
best_mse = 1e300
best_L = 0
best_U = 0
found = False
best_fit = np.empty(n, dtype=np.float64)
# Buffers reutilizables
ordered_U = np.empty(nU, dtype=np.int64)
inc_buf = np.empty(n + 2, dtype=np.float64)
inc_out = np.empty(n + 2, dtype=np.float64)
full_inc = np.empty(n + 2, dtype=np.float64)
dec_buf = np.empty(n + 2, dtype=np.float64)
dec_out = np.empty(n + 2, dtype=np.float64)
valid_Us = np.empty(nU, dtype=np.int64)
valid_inc_len = np.empty(nU, dtype=np.int64)
valid_inc_mat = np.empty((nU, n + 2), dtype=np.float64)
fitted = np.empty(n, dtype=np.float64)
for li in range(nL):
indL = candL[li]
# Ordenar candU: primero >= indL, luego < indL
k = 0
for j in range(nU):
if candU[j] >= indL:
ordered_U[k] = candU[j]; k += 1
for j in range(nU):
if candU[j] < indL:
ordered_U[k] = candU[j]; k += 1
n_valid = 0
for uj in range(nU):
indU = ordered_U[uj]
# ── _increasing_segment ──
if indL == indU:
mean_v = 0.0
for t in range(n):
mean_v += v[t]
mean_v /= n
mse = 0.0
for t in range(n):
d = v[t] - mean_v
mse += d * d
mse /= n
if mse < best_mse:
best_mse = mse
best_L = indL
best_U = indU
for t in range(n):
best_fit[t] = mean_v
found = True
continue
if indL < indU:
k_interior = indU - indL - 1
if k_interior > 0:
for t in range(k_interior):
inc_buf[t] = v[indL + 1 + t]
else:
k_interior = (n - indL - 1) + indU
if k_interior > 0:
p = 0
for t in range(indL + 1, n):
inc_buf[p] = v[t]; p += 1
for t in range(0, indU):
inc_buf[p] = v[t]; p += 1
inc_len = k_interior + 2
if k_interior > 0:
_pava_inc_range(inc_buf, inc_out, k_interior)
full_inc[0] = v[indL]
for t in range(k_interior):
full_inc[1 + t] = inc_out[t]
full_inc[k_interior + 1] = v[indU]
else:
full_inc[0] = v[indL]
full_inc[1] = v[indU]
# Validez
if inc_len > 2:
if full_inc[1] <= v[indL]:
break
is_valid_U = full_inc[inc_len - 2] < v[indU]
else:
is_valid_U = True
if is_valid_U:
valid_Us[n_valid] = indU
valid_inc_len[n_valid] = inc_len
for t in range(inc_len):
valid_inc_mat[n_valid, t] = full_inc[t]
n_valid += 1
# Pasada hacia atras
for rev in range(n_valid - 1, -1, -1):
indU = valid_Us[rev]
inc_len = valid_inc_len[rev]
if indL == indU:
continue
# ── _decreasing_segment ──
if indU < indL:
k_int = indL - indU - 1
if k_int > 0:
for t in range(k_int):
dec_buf[t] = v[indU + 1 + t]
else:
k_int = (n - indU - 1) + indL
if k_int > 0:
p = 0
for t in range(indU + 1, n):
dec_buf[p] = v[t]; p += 1
for t in range(0, indL):
dec_buf[p] = v[t]; p += 1
if k_int == 0:
is_valid_dec = True
else:
for t in range(k_int):
dec_buf[t] = -dec_buf[t]
_pava_inc_range(dec_buf, dec_out, k_int)
for t in range(k_int):
dec_out[t] = -dec_out[t]
if dec_out[k_int - 1] < v[indL]:
break
is_valid_dec = dec_out[0] <= v[indU]
if not is_valid_dec:
continue
# ── _assemble ──
if indL <= indU:
for t in range(inc_len):
fitted[indL + t] = valid_inc_mat[rev, t]
else:
p = 0
for t in range(indL, n):
fitted[t] = valid_inc_mat[rev, p]; p += 1
for t in range(0, indU + 1):
fitted[t] = valid_inc_mat[rev, p]; p += 1
if k_int > 0:
if indU < indL:
for t in range(k_int):
fitted[indU + 1 + t] = dec_out[t]
else:
p = 0
for t in range(indU + 1, n):
fitted[t] = dec_out[p]; p += 1
for t in range(0, indL):
fitted[t] = dec_out[p]; p += 1
# MSE
mse = 0.0
for t in range(n):
d = v[t] - fitted[t]
mse += d * d
mse /= n
if mse < best_mse:
best_mse = mse
best_L = indL
best_U = indU
for t in range(n):
best_fit[t] = fitted[t]
found = True
if best_mse == 0.0:
break
return best_fit, best_mse, best_L, best_U, found
# ═══════════════════════════════════════════════════════════════════════════
# Ajuste isotonico circular unimodal — API Python
# ═══════════════════════════════════════════════════════════════════════════
def _increasing_segment(
v: np.ndarray, v2: np.ndarray,
indL: int, indU: int, n: int,
) -> tuple[np.ndarray, bool] | None:
"""
PAVA creciente de indL a indU (circular).
Devuelve (full_inc, is_valid_U) o None si no hay mas U's validos.
"""
if indL == indU:
return np.full(n, v.mean()), True
if indL < indU:
k_interior = indU - indL - 1
interior_vals = v[indL + 1 : indU] if k_interior > 0 else np.array([], dtype=np.float64)
else:
k_interior = (n - indL - 1) + indU
interior_vals = v2[indL + 1 : indL + 1 + k_interior] if k_interior > 0 else np.array([], dtype=np.float64)
if len(interior_vals) > 0:
inc_fit = pava_increasing(interior_vals)
full_inc = np.empty(len(interior_vals) + 2, dtype=np.float64)
full_inc[0] = v[indL]
full_inc[1:-1] = inc_fit
full_inc[-1] = v[indU]
else:
full_inc = np.array([v[indL], v[indU]], dtype=np.float64)
if len(full_inc) > 2:
if full_inc[1] <= v[indL]:
return None
is_valid = full_inc[-2] < v[indU]
else:
is_valid = True
return full_inc, is_valid
def _decreasing_segment(
v: np.ndarray, v2: np.ndarray,
indL: int, indU: int, n: int,
) -> tuple[np.ndarray, bool] | None:
"""
PAVA decreciente de indU a indL (circular, interior).
Devuelve (dec_fit, is_valid) o None si no hay mas U's validos.
"""
if indU < indL:
k_interior = indL - indU - 1
interior_vals = v[indU + 1 : indL] if k_interior > 0 else np.array([], dtype=np.float64)
else:
k_interior = (n - indU - 1) + indL
interior_vals = v2[indU + 1 : indU + 1 + k_interior] if k_interior > 0 else np.array([], dtype=np.float64)
if len(interior_vals) == 0:
return np.array([], dtype=np.float64), True
dec_fit = pava_decreasing(interior_vals)
if dec_fit[-1] < v[indL]:
return None
is_valid = dec_fit[0] <= v[indU]
return dec_fit, is_valid
def _assemble(
v: np.ndarray,
indL: int, indU: int,
full_inc: np.ndarray,
dec_fit: np.ndarray,
n: int,
) -> np.ndarray:
"""Coloca arcos creciente y decreciente en un array de longitud n."""
fitted = np.empty(n, dtype=np.float64)
if indL <= indU:
inc_pos = np.arange(indL, indU + 1)
else:
inc_pos = np.concatenate([np.arange(indL, n), np.arange(0, indU + 1)])
fitted[inc_pos] = full_inc
if len(dec_fit) > 0:
if indU < indL:
dec_pos = np.arange(indU + 1, indL)
else:
dec_pos = np.concatenate([np.arange(indU + 1, n), np.arange(0, indL)])
fitted[dec_pos] = dec_fit
return fitted
[docs]
def circular_unimodal_fit(
v: np.ndarray,
) -> tuple[np.ndarray, float, int, int] | None:
"""
Ajusta una regresion isotonica circular unimodal (subida-bajada).
Busca sobre todos los pares candidatos (valle, pico) — minimos y
maximos locales de *v* — para encontrar el par (L, U) que produce
el menor MSE al ajustar un PAVA creciente de L->U y un PAVA
decreciente de U->L (circularmente).
Parameters
----------
v : array (n,)
Valores de expresion genica ordenados por tiempo circular.
Returns
-------
fitted : array (n,) — valores ajustados.
mse : float — error cuadratico medio.
L_opt : int — indice del valle optimo (base 0).
U_opt : int — indice del pico optimo (base 0).
None si no se encuentra un ajuste valido.
"""
v = np.asarray(v, dtype=np.float64)
n = len(v)
if n == 0:
return None
candL, candU = find_local_extrema(v)
if len(candL) == 0 or len(candU) == 0:
fitted = np.full(n, v.mean())
mse = float(np.mean((v - fitted) ** 2))
return fitted, mse, 0, 0
# ── Ruta rapida Numba ────────────────────────────────────────────────
if _HAS_NUMBA:
fitted, mse, L_opt, U_opt, ok = _circular_unimodal_fit_numba(
v,
np.ascontiguousarray(candL.astype(np.int64)),
np.ascontiguousarray(candU.astype(np.int64)),
)
if ok:
return fitted, float(mse), int(L_opt), int(U_opt)
return None
# ── Fallback Python puro ─────────────────────────────────────────────
v2 = np.concatenate([v, v])
best_mse = np.inf
best_fitted = None
best_L = 0
best_U = 0
for indL in candL:
mask = candU >= indL
ordered_U = np.concatenate([candU[mask], candU[~mask]])
valid_Us: list[int] = []
valid_incs: list[np.ndarray] = []
for indU in ordered_U:
inc_result = _increasing_segment(v, v2, indL, indU, n)
if inc_result is None:
break
full_inc, is_valid_U = inc_result
if is_valid_U:
valid_Us.append(indU)
valid_incs.append(full_inc)
for idx_rev, indU in enumerate(reversed(valid_Us)):
j = len(valid_Us) - 1 - idx_rev
full_inc = valid_incs[j]
if indL == indU:
fitted = full_inc
mse = float(np.mean((v - fitted) ** 2))
if mse < best_mse:
best_mse, best_fitted = mse, fitted
best_L, best_U = indL, indU
continue
dec_result = _decreasing_segment(v, v2, indL, indU, n)
if dec_result is None:
break
dec_fit, is_valid = dec_result
if not is_valid:
continue
fitted = _assemble(v, indL, indU, full_inc, dec_fit, n)
mse = float(np.mean((v - fitted) ** 2))
if mse < best_mse:
best_mse, best_fitted = mse, fitted
best_L, best_U = indL, indU
if best_mse == 0.0:
break
if best_fitted is None:
return None
return best_fitted, best_mse, best_L, best_U
# ═══════════════════════════════════════════════════════════════════════════
# ORIModel — implementacion de RhythmModel
# ═══════════════════════════════════════════════════════════════════════════
[docs]
class ORIModel(RhythmModel):
"""
Modelo de ritmicidad no parametrica basado en regresion isotonica
circular unimodal (ORI / ORIOS).
A diferencia de Cosinor y FMM, este modelo no asume una forma
de onda parametrica. Busca la mejor forma subida-bajada (unimodal)
compatible con restricciones de monotonia impuestas por PAVA.
Los "parametros" del modelo son los indices del valle (L_opt)
y pico (U_opt) optimos; el R2 compara contra el modelo plano.
Example
-------
>>> import numpy as np
>>> from circust.fitting.ori import ORIModel
>>>
>>> t = np.linspace(0, 2*np.pi, 50, endpoint=False)
>>> data = 0.3 + 0.6 * np.cos(t - 1.2) + np.random.normal(0, 0.05, 50)
>>> result = ORIModel().fit(data, t)
>>> print(result.r2)
>>> print(result.summary())
"""
[docs]
@staticmethod
def peak_time(U_opt: int, time_points: np.ndarray) -> float:
"""
Tiempo de pico del modelo ORI.
El pico se localiza en la posicion U_opt del vector ordenado
circularmente. El tiempo de pico es ``time_points[U_opt]``.
"""
if len(time_points) == 0:
return 0.0
return float(time_points[U_opt % len(time_points)])
[docs]
def fit(
self,
data: np.ndarray,
time_points: np.ndarray,
) -> FitResult:
"""
Ajusta el modelo ORI a ``data``.
Parameters
----------
data : np.ndarray, forma (n_muestras,)
Valores de expresion ordenados por el tiempo circular
preliminar.
time_points : np.ndarray, forma (n_muestras,)
Eje temporal circular en [0, 2*pi), producido por CPCA.
Returns
-------
FitResult
Claves de ``params``: ``L_opt``, ``U_opt``.
- L_opt : indice del valle optimo (base 0)
- U_opt : indice del pico optimo (base 0)
``peak_time`` = ``time_points[U_opt]``.
``r2`` = R2_ORI = 1 - MSE_unimodal / MSE_plano.
"""
data = np.asarray(data, dtype=np.float64)
time_points = np.asarray(time_points, dtype=np.float64)
n = len(data)
# ── Ajuste unimodal circular (PAVA) ───────────────────────────
result = circular_unimodal_fit(data)
if result is not None:
fitted, _, L_opt, U_opt = result
else:
fitted = np.full(n, data.mean())
L_opt = 0
U_opt = 0
# ── Residuos y estadisticos (metodos heredados de RhythmModel) ─
residuals = data - fitted
residuals_std = self._standardise_residuals(residuals)
r2 = self._r2(data, fitted)
sse = float(np.sum(residuals ** 2))
pk = ORIModel.peak_time(U_opt, time_points)
return FitResult(
fitted = fitted,
params = {"L_opt": int(L_opt), "U_opt": int(U_opt)},
peak_time = pk,
r2 = r2,
residuals = residuals,
residuals_std = residuals_std,
sse = sse,
model_name = "ori",
)