Source code for circust.preprocessing

# preprocessing.py  ── seccion de carga y preprocesamiento
import pathlib
from typing import Optional
from dataclasses import dataclass, field
from scipy.stats import median_abs_deviation

import pandas as pd
import numpy as np


# ── formatos soportados ───────────────────────────────────────────────────
_LOADERS = {
    ".csv":  "csv",
    ".tsv":  "tsv",
    ".txt":  "tsv",       # se asume separado por tabuladores cuando .txt
    ".xlsx": "excel",
    ".xls":  "excel",
    ".parquet": "parquet",
}


[docs] def load_expression_matrix( path: str, gene_column: Optional[str] = None, chunk_size: Optional[int] = None, ) -> pd.DataFrame: """ Carga una matriz de expresion genica desde disco en un DataFrame de pandas. Forma esperada tras la carga ---------------------------- - Filas = genes (indice de fila = simbolos genicos, ej. "ARNTL", "PER1") - Columnas = muestras (cada columna = un individuo/muestra) - Valores = conteos de expresion brutos (float); ceros y NaN permitidos Formatos soportados ------------------- .csv Valores separados por comas .tsv / .txt Valores separados por tabuladores .xlsx / .xls Libro Excel (se lee la primera hoja) .parquet Apache Parquet — recomendado para conjuntos grandes Parameters ---------- path : str Ruta al archivo. El formato se detecta por la extension. gene_column : str, opcional Nombre de la columna que contiene los simbolos genicos. Especificar explicitamente si el archivo tiene un layout no estandar, ej. gene_column="gene_id". chunk_size : int, opcional Solo para archivos .parquet. Numero de row-groups a leer por vez. Util cuando el archivo es mas grande que la RAM disponible. Si None (defecto), se carga el archivo completo de una vez. Ignorado para CSV/TSV/Excel. Returns ------- pd.DataFrame Matriz con simbolos genicos como indice de fila (index.name = "gene_symbol") e IDs de muestra como nombres de columna. Todos los valores son float64. Celdas que no se pudieron parsear como numeros se establecen a NaN. Raises ------ FileNotFoundError Si no existe archivo en ``path``. ValueError Si la extension del archivo no esta en la lista soportada. ImportError Si se lee un .parquet y pyarrow no esta instalado. Example ------- Cargar un CSV estandar: >>> df = load_expression_matrix("data/raw/gtex_brain.csv") >>> df.shape (56200, 479) >>> df.index[:3].tolist() ['ARNTL', 'PER1', 'CRY1'] Cargar un parquet grande con lectura por bloques: >>> df = load_expression_matrix( ... "data/raw/gtex_brain.parquet", ... chunk_size=10_000, ... ) """ file_path = pathlib.Path(path) # ── verificacion de existencia ──────────────────────────────────────── if not file_path.exists(): raise FileNotFoundError( f"Archivo de matriz de expresion no encontrado: '{path}'\n" f"Verifica la ruta y asegurate de que el archivo esta en la carpeta data/." ) # ── deteccion de formato ────────────────────────────────────────────── extension = file_path.suffix.lower() fmt = _LOADERS.get(extension) if fmt is None: supported = ", ".join(_LOADERS.keys()) raise ValueError( f"Formato de archivo no soportado: '{extension}'\n" f"Formatos soportados: {supported}" ) # ── que columna se convierte en el indice de fila? ──────────────────── # index_col=0 significa "usar la primera columna como indice de fila" # Layout estandar: primera columna = nombres de genes, # columnas restantes = muestras. # Si el usuario conoce su columna de genes por nombre, usar esa. if fmt in ("csv", "tsv", "excel"): index_col = gene_column if gene_column is not None else 0 elif fmt == "parquet": if gene_column is None: raise ValueError( "Para archivos Parquet debes especificar el nombre de la columna " "que contiene los identificadores de genes mediante el parametro 'gene_column'." ) index_col = gene_column else: # no deberia ocurrir porque ya validamos fmt index_col = None # ── carga ───────────────────────────────────────────────────────────── if fmt == "csv": matrix = pd.read_csv(file_path, index_col=index_col) elif fmt == "tsv": matrix = pd.read_csv(file_path, sep="\t", index_col=index_col) elif fmt == "excel": matrix = pd.read_excel(file_path, index_col=index_col) elif fmt == "parquet": matrix = _load_parquet(file_path, index_col=index_col, chunk_size=chunk_size) # ── forzar valores numericos ────────────────────────────────────────── # Algunos Excel o CSV mal formateados contienen cadenas sueltas. # errors="coerce" las convierte en NaN en lugar de fallar. # Los archivos Parquet ya estan tipados asi que esto es un no-op para ellos. matrix = matrix.apply(pd.to_numeric, errors="coerce") # ── nombrar el indice ───────────────────────────────────────────────── matrix.index.name = "gene_symbol" return matrix
def _load_parquet( file_path: pathlib.Path, index_col, chunk_size: Optional[int], ) -> pd.DataFrame: try: import pyarrow.parquet as pq except ImportError: raise ImportError( "Leer archivos parquet requiere pyarrow.\n" "Instalalo con: pip install pyarrow" ) parquet_file = pq.ParquetFile(file_path) if chunk_size is None: # Usar pandas directamente — restaura correctamente el indice guardado matrix = pd.read_parquet(file_path) else: chunks = [] for batch in parquet_file.iter_batches(batch_size=chunk_size): chunks.append(batch.to_pandas()) matrix = pd.concat(chunks, ignore_index=False) # Solo establecer indice manualmente si no se restauro automaticamente # (es decir, el archivo se escribio SIN un indice con nombre) if matrix.index.name != "gene_symbol": if isinstance(index_col, str) and index_col in matrix.columns: matrix = matrix.set_index(index_col) elif isinstance(index_col, int) and index_col < len(matrix.columns): matrix = matrix.set_index(matrix.columns[index_col]) return matrix
[docs] @dataclass class PreprocessingResult: """ Todos los resultados producidos por :class:`Preprocessor`. Attributes ---------- expr_norm : pd.DataFrame Matriz de expresion normalizada, genes x muestras, valores en [-1, 1]. Es la matriz que consume cada paso (CPCA, FMM). expr_raw : pd.DataFrame Matriz bruta filtrada despues de eliminar genes malos pero antes de normalizar. Util para depuracion y validacion. dropped_sparse : List[str] Simbolos genicos eliminados porque > zero_threshold de muestras eran cero o NaN. dropped_duplicates : List[str] Simbolos genicos eliminados durante la resolucion de duplicados (filas con menor MAD). n_genes_in : int Numero de genes que entraron al preprocesador. n_genes_out : int Numero de genes que sobrevivieron a todos los pasos de filtrado. n_samples : int Numero de muestras. No cambia nunca durante el preprocesamiento. """ expr_norm: pd.DataFrame expr_raw: pd.DataFrame dropped_sparse: list[str] = field(default_factory=list) dropped_duplicates: list[str] = field(default_factory=list) n_genes_in: int = 0 n_genes_out: int = 0 n_samples: int = 0
[docs] def summary(self) -> str: """Devuelve un resumen legible de lo ocurrido durante el preprocesamiento.""" lines = [ "=== Resumen del Preprocesamiento ===", f" Genes entrada : {self.n_genes_in}", f" Genes salida : {self.n_genes_out}", f" Muestras : {self.n_samples}", f" Eliminados (sparse) : {len(self.dropped_sparse)}", f" Eliminados (duplicados): {len(self.dropped_duplicates)}", ] if self.dropped_sparse: shown = self.dropped_sparse[:5] tail = f" ... (+{len(self.dropped_sparse)-5} mas)" if len(self.dropped_sparse) > 5 else "" lines.append(f" genes sparse : {shown}{tail}") if self.dropped_duplicates: unique_dupes = sorted(set(self.dropped_duplicates))[:5] tail = f" ... (+{len(set(self.dropped_duplicates))-5} mas)" if len(set(self.dropped_duplicates)) > 5 else "" lines.append(f" genes duplicados : {unique_dupes}{tail}") return "\n".join(lines)
[docs] def normalise_matrix( mat: pd.DataFrame, norm_min: float = -1.0, norm_max: float = 1.0, ) -> pd.DataFrame: """ Normaliza min-max cada fila de ``mat`` al intervalo [norm_min, norm_max]. Filas constantes (min == max) se mapean a todos ceros. Parameters ---------- mat : pd.DataFrame Matriz genes × muestras. Valores pueden ser float o int. norm_min, norm_max : float Extremos del rango de salida. Por defecto [-1, 1]. Returns ------- pd.DataFrame con la misma forma e indices que ``mat``. """ values = mat.values.astype(float) row_min = values.min(axis=1, keepdims=True) row_max = values.max(axis=1, keepdims=True) span = row_max - row_min with np.errstate(invalid="ignore", divide="ignore"): normed = np.where( span == 0, 0.0, (norm_max - norm_min) * ((values - row_min) / span) + norm_min, ) return pd.DataFrame(normed, index=mat.index, columns=mat.columns)
[docs] class Preprocessor: """ Limpia y normaliza una matriz de expresion genica bruta. 1. Eliminar genes sin nombre (NA rownames en R) 2. Eliminar genes sparse (> ``sparse_threshold`` de muestras son cero O NaN) 3. Resolver nombres de genes duplicados — conservar la fila con mayor MAD 4. Normalizar cada gen a [-1, 1] Parameters ---------- sparse_threshold : float Fraccion de muestras que pueden ser cero o NaN antes de que un gen se elimine. La comparacion es estricta (>), asi que un gen exactamente en el umbral se conserva. zero_threshold : float o None Override para el umbral solo de ceros. Si None (defecto), usa ``sparse_threshold`` para ceros. nan_threshold : float o None Override para el umbral solo de NaN. Si None (defecto), usa ``sparse_threshold`` para NaN. verbose : bool Si True, imprime mensajes de progreso tras cada paso. Example ------- >>> import pandas as pd >>> import numpy as np >>> >>> rng = np.random.default_rng(0) >>> mat = pd.DataFrame( ... rng.poisson(10, size=(200, 60)).astype(float), ... index=[f"GENE_{i}" for i in range(200)], ... columns=[f"sample_{j}" for j in range(60)], ... ) >>> >>> result = Preprocessor().run(mat) >>> result.expr_norm.values.min() >= -1.0 True >>> result.expr_norm.values.max() <= 1.0 True """ def __init__( self, sparse_threshold: float = 0.3, zero_threshold: Optional[float] = None, nan_threshold: Optional[float] = None, norm_min: float = -1.0, norm_max: float = 1.0, verbose: bool = True, ) -> None: if not 0.0 < sparse_threshold < 1.0: raise ValueError( f"sparse_threshold debe estar entre 0 y 1, se recibio {sparse_threshold}" ) self.sparse_threshold = sparse_threshold self.zero_threshold = zero_threshold if zero_threshold is not None else sparse_threshold self.nan_threshold = nan_threshold if nan_threshold is not None else sparse_threshold self.norm_min = norm_min self.norm_max = norm_max self.verbose = verbose
[docs] def run(self, matrix: pd.DataFrame) -> PreprocessingResult: """ Ejecuta los cuatro pasos de preprocesamiento en orden. Parameters ---------- matrix : pd.DataFrame Matriz de expresion bruta cargada por ``load_expression_matrix()``. Filas = genes, columnas = muestras. Puede contener ceros y NaN. Returns ------- PreprocessingResult """ if matrix.empty: raise ValueError("La matriz de entrada esta vacia.") n_genes_in, n_samples = matrix.shape self._log(f"Matriz de entrada: {n_genes_in} genes x {n_samples} muestras") # paso 1 — eliminar genes sin nombre mat = self._drop_unnamed(matrix) # paso 2 — eliminar genes sparse mat, dropped_sparse = self._drop_sparse(mat) # paso 3 — resolver nombres de genes duplicados mat, dropped_duplicates = self._resolve_duplicates(mat) # paso 4 — normalizar cada gen a [-1, 1] expr_norm = normalise_matrix(mat, self.norm_min, self.norm_max) result = PreprocessingResult( expr_norm = expr_norm, expr_raw = mat, dropped_sparse = dropped_sparse, dropped_duplicates = dropped_duplicates, n_genes_in = n_genes_in, n_genes_out = len(expr_norm), n_samples = n_samples, ) self._log(result.summary()) return result
def _drop_unnamed(self, mat: pd.DataFrame) -> pd.DataFrame: """ Elimina filas cuyo simbolo genico falta o esta efectivamente vacio. - NaN real en el indice (R: NA rowname) - Cadena vacia ``""`` - Cadena de solo espacios ``" "`` - La cadena literal ``"nan"`` (ocurre cuando NaN se convierte a str) """ index_str = mat.index.astype(str) is_invalid = ( mat.index.isna() # NaN real | (index_str.str.strip() == "") # vacio / solo espacios | (index_str.str.lower() == "nan") # NaN convertido a cadena ) n_bad = int(is_invalid.sum()) if n_bad > 0: self._log(f" Paso 1 — eliminados {n_bad} gen(es) sin nombre") else: self._log(" Paso 1 — no se encontraron genes sin nombre") return mat.loc[~is_invalid] def _drop_sparse(self, mat: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]: """ Elimina genes donde mas de zero_threshold de muestras son cero O NaN. Nota: la condicion es estrictamente > asi que un gen exactamente en el 30% se conserva. Preservado aqui: (frac > threshold), no (frac >= threshold). """ # fraccion de ceros y NaN por gen (por fila) zero_frac = (mat == 0).mean(axis=1) nan_frac = mat.isna().mean(axis=1) is_sparse = (zero_frac > self.zero_threshold) | (nan_frac > self.nan_threshold) dropped = mat.index[is_sparse].tolist() # replica el print exacto de R (linea 3925) self._log( f" Paso 2 — eliminados {len(dropped)} gen(es) con mas del " f"{int(self.zero_threshold * 100)}% de ceros o NaN" ) return mat.loc[~is_sparse].copy(), dropped def _resolve_duplicates(self, mat: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]: """ Para cada nombre de gen que aparece mas de una vez, conserva la fila con mayor MAD y descarta el resto. """ # encontrar nombres de genes que aparecen mas de una vez duplicated_mask = mat.index.duplicated(keep=False) duplicated_names = mat.index[duplicated_mask].unique().tolist() if not duplicated_names: self._log(" Paso 3 — no se encontraron nombres de genes duplicados") return mat, [] self._log( f" Paso 3 — resolviendo {len(duplicated_names)} " f"nombre(s) de gen duplicado(s)" ) # recopilar posiciones enteras de filas a eliminar rows_to_drop: list[int] = [] dropped_names: list[str] = [] for gene in duplicated_names: # np.where da posiciones enteras — necesario porque dos filas # comparten la misma etiqueta, asi que .loc devolveria ambas positions = np.where(mat.index == gene)[0] # calcular MAD para cada fila duplicada usando scale=1 para replicar R, con default seria "1.482" mads = np.array([ float(median_abs_deviation(mat.iloc[pos].values, scale=1)) for pos in positions ]) # conservar la fila con mayor MAD, eliminar el resto best_position = positions[int(np.argmax(mads))] loser_positions = [p for p in positions if p != best_position] rows_to_drop.extend(loser_positions) dropped_names.extend([gene] * len(loser_positions)) # construir mascara booleana de conservacion y aplicar keep_mask = np.ones(len(mat), dtype=bool) keep_mask[rows_to_drop] = False return mat.iloc[keep_mask].copy(), dropped_names # ----------------------------------------------------------------------- # Utilidades # ----------------------------------------------------------------------- def _log(self, message: str) -> None: """Imprime solo cuando verbose=True.""" if self.verbose: print(message)