Skip to content

simace.core

Core infrastructure: utilities, pedigree graph, hazard models, and CLI helpers.

schema

simace.core.schema

Schema contracts for the phenotype → censor → sample handoff.

Each pipeline stage produces a DataFrame whose shape the next stage relies on:

PEDIGREE — output of simulate / dropout PHENOTYPE — output of run_phenotype (PEDIGREE + raw event times) CENSORED — output of run_censor / run_sample (PHENOTYPE + censoring cols)

Dtypes are checked at the coarse numpy.dtype.kind level (i integer, f float, b bool). This tolerates the int32/int8/float32 narrowing applied by the parquet writer at save time without losing the contract.

assert_schema

assert_schema(df, schema, *, where)

Verify df carries every column in schema with a compatible dtype kind.

PARAMETER DESCRIPTION
df

DataFrame to check.

TYPE: DataFrame

schema

Mapping of required column name → allowed numpy.dtype.kind characters (e.g. "f" for float, "iu" for any integer).

TYPE: Mapping[str, str]

where

Stage label included in the error message (e.g. "censor input") so failures pinpoint the offending boundary.

TYPE: str

RAISES DESCRIPTION
ValueError

If columns are missing or have an unexpected dtype kind. Extra columns are allowed — stages are free to pass through additional fields.

Source code in simace/core/schema.py
def assert_schema(df: pd.DataFrame, schema: Mapping[str, str], *, where: str) -> None:
    """Verify ``df`` carries every column in ``schema`` with a compatible dtype kind.

    Args:
        df: DataFrame to check.
        schema: Mapping of required column name → allowed ``numpy.dtype.kind``
            characters (e.g. ``"f"`` for float, ``"iu"`` for any integer).
        where: Stage label included in the error message (e.g.
            ``"censor input"``) so failures pinpoint the offending boundary.

    Raises:
        ValueError: If columns are missing or have an unexpected dtype kind.
            Extra columns are allowed — stages are free to pass through
            additional fields.
    """
    missing = [c for c in schema if c not in df.columns]
    if missing:
        raise ValueError(f"{where}: missing required columns {missing}")

    bad: list[str] = []
    for col, kinds in schema.items():
        actual = df[col].dtype.kind
        if actual not in kinds:
            bad.append(f"{col}={df[col].dtype.name} (expected kind in {kinds!r})")
    if bad:
        raise ValueError(f"{where}: dtype mismatch — {'; '.join(bad)}")

parquet

simace.core.parquet

Parquet writer with pedigree-aware dtype narrowing.

save_parquet

save_parquet(df, path, **kwargs)

Save DataFrame as parquet with optimized dtypes and zstd compression.

Calls _optimize_dtypes before writing to minimize file size.

PARAMETER DESCRIPTION
df

DataFrame to save.

TYPE: DataFrame

path

Output file path.

TYPE: Any

**kwargs

Extra keyword arguments passed to DataFrame.to_parquet.

TYPE: Any DEFAULT: {}

Source code in simace/core/parquet.py
def save_parquet(df: pd.DataFrame, path: Any, **kwargs: Any) -> None:
    """Save DataFrame as parquet with optimized dtypes and zstd compression.

    Calls ``_optimize_dtypes`` before writing to minimize file size.

    Args:
        df: DataFrame to save.
        path: Output file path.
        **kwargs: Extra keyword arguments passed to ``DataFrame.to_parquet``.
    """
    _optimize_dtypes(df)
    df.to_parquet(path, index=False, compression="zstd", **kwargs)

yaml_io

simace.core.yaml_io

YAML serialization helpers: numpy → Python conversion, fast loader, and file I/O wrappers.

to_native

to_native(obj)

Recursively convert numpy types to native Python types for YAML serialization.

PARAMETER DESCRIPTION
obj

Value or nested structure (dict, list, ndarray, numpy scalar).

TYPE: Any

RETURNS DESCRIPTION
Any

Equivalent structure with all numpy types replaced by Python builtins.

Source code in simace/core/yaml_io.py
def to_native(obj: Any) -> Any:
    """Recursively convert numpy types to native Python types for YAML serialization.

    Args:
        obj: Value or nested structure (dict, list, ndarray, numpy scalar).

    Returns:
        Equivalent structure with all numpy types replaced by Python builtins.
    """
    if isinstance(obj, dict):
        return {k: to_native(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_native(v) for v in obj]
    if isinstance(obj, np.ndarray):
        return to_native(obj.tolist())
    if isinstance(obj, (np.integer, np.int64, np.int32)):
        return int(obj)
    if isinstance(obj, (np.floating, np.float64, np.float32)):
        return float(obj)
    if isinstance(obj, (np.bool_, bool)):
        return bool(obj)
    return obj

yaml_loader

yaml_loader()

Return the fastest available YAML SafeLoader (CSafeLoader if present).

Source code in simace/core/yaml_io.py
def yaml_loader() -> type:
    """Return the fastest available YAML SafeLoader (``CSafeLoader`` if present)."""
    return _LOADER

load_yaml

load_yaml(path)

Load a YAML file using the fastest available SafeLoader.

Source code in simace/core/yaml_io.py
def load_yaml(path: str | Path) -> Any:
    """Load a YAML file using the fastest available SafeLoader."""
    with open(path, encoding="utf-8") as fh:
        return yaml.load(fh, Loader=_LOADER)

dump_yaml

dump_yaml(obj, path, *, sort_keys=False)

Dump obj to path as YAML, normalizing numpy types via to_native.

Source code in simace/core/yaml_io.py
def dump_yaml(obj: Any, path: str | Path, *, sort_keys: bool = False) -> None:
    """Dump ``obj`` to ``path`` as YAML, normalizing numpy types via ``to_native``."""
    with open(path, "w", encoding="utf-8") as fh:
        yaml.dump(to_native(obj), fh, default_flow_style=False, sort_keys=sort_keys)

numerics

simace.core.numerics

Numerical helpers: safe and numba-accelerated correlation/regression.

safe_corrcoef

safe_corrcoef(x, y)

Compute Pearson correlation, returning nan if either array has zero variance.

PARAMETER DESCRIPTION
x

First array of observations.

TYPE: ndarray

y

Second array of observations, same length as x.

TYPE: ndarray

RETURNS DESCRIPTION
float

Pearson correlation coefficient, or nan if variance is near-zero.

Source code in simace/core/numerics.py
def safe_corrcoef(x: np.ndarray, y: np.ndarray) -> float:
    """Compute Pearson correlation, returning nan if either array has zero variance.

    Args:
        x: First array of observations.
        y: Second array of observations, same length as *x*.

    Returns:
        Pearson correlation coefficient, or nan if variance is near-zero.
    """
    if np.std(x) < _ZERO_VAR_THRESHOLD or np.std(y) < _ZERO_VAR_THRESHOLD:
        return float("nan")
    return float(_pearsonr_core(x, y))

safe_linregress

safe_linregress(x, y)

Run linear regression, returning None if x has zero variance.

PARAMETER DESCRIPTION
x

Independent variable array.

TYPE: ndarray

y

Dependent variable array, same length as x.

TYPE: ndarray

RETURNS DESCRIPTION
Any

scipy.stats.LinregressResult or None if x has near-zero variance.

Source code in simace/core/numerics.py
def safe_linregress(x: np.ndarray, y: np.ndarray) -> Any:
    """Run linear regression, returning None if x has zero variance.

    Args:
        x: Independent variable array.
        y: Dependent variable array, same length as *x*.

    Returns:
        ``scipy.stats.LinregressResult`` or None if *x* has near-zero variance.
    """
    if np.std(x) < _ZERO_VAR_THRESHOLD:
        return None
    return stats.linregress(x, y)

fast_linregress

fast_linregress(x, y)

Fast linear regression via numba-accelerated core.

PARAMETER DESCRIPTION
x

Independent variable array.

TYPE: ndarray

y

Dependent variable array, same length as x.

TYPE: ndarray

RETURNS DESCRIPTION
tuple[float, float, float, float, float]

Tuple of (slope, intercept, r, stderr, pvalue).

Source code in simace/core/numerics.py
def fast_linregress(x: np.ndarray, y: np.ndarray) -> tuple[float, float, float, float, float]:
    """Fast linear regression via numba-accelerated core.

    Args:
        x: Independent variable array.
        y: Dependent variable array, same length as *x*.

    Returns:
        Tuple of (slope, intercept, r, stderr, pvalue).
    """
    slope, intercept, r, stderr, t_stat = _linregress_core(x, y)
    pvalue = float(2.0 * _t_sf(abs(t_stat), len(x) - 2))
    return float(slope), float(intercept), float(r), float(stderr), pvalue

fast_pearsonr

fast_pearsonr(x, y)

Compute Pearson r with two-sided p-value via numba-accelerated core.

PARAMETER DESCRIPTION
x

First array of observations.

TYPE: ndarray

y

Second array of observations, same length as x.

TYPE: ndarray

RETURNS DESCRIPTION
tuple[float, float]

Tuple of (correlation, p-value).

Source code in simace/core/numerics.py
def fast_pearsonr(x: np.ndarray, y: np.ndarray) -> tuple[float, float]:
    """Compute Pearson r with two-sided p-value via numba-accelerated core.

    Args:
        x: First array of observations.
        y: Second array of observations, same length as *x*.

    Returns:
        Tuple of (correlation, p-value).
    """
    r = float(_pearsonr_core(x, y))
    n = len(x)
    denom = 1.0 - r * r
    if denom < 1e-30 or n <= 2:
        return r, 0.0
    t_stat = r * np.sqrt((n - 2) / denom)
    pvalue = float(2.0 * _t_sf(abs(t_stat), n - 2))
    return r, pvalue

relationships

simace.core.relationships

Relationship-pair and sex vocabulary used across stats and plotting.

compute_hazard_terms

simace.core.compute_hazard_terms

Baseline hazard computation for parametric survival models.

compute_hazard_terms

compute_hazard_terms(model, t, params)

Compute log-baseline-hazard and cumulative baseline hazard.

Returns (const, H_base) where: const = log h0(t) — event term: delta * (const + betaL) H_base = H0(t) — survival term: H_base * exp(betaL)

Supported models and their required params

"weibull" : {"scale": s, "rho": rho} "exponential" : {"rate": lam} or {"scale": s} "gompertz" : {"rate": b, "gamma": g} "lognormal" : {"mu": mu, "sigma": sigma} "loglogistic" : {"scale": alpha, "shape": k} "gamma" : {"shape": k, "scale": theta} "first_passage": {"drift": mu, "shape": lam}

RAISES DESCRIPTION
ValueError

unknown model name or missing required parameter.

Source code in simace/core/compute_hazard_terms.py
def compute_hazard_terms(
    model: str,
    t: np.ndarray,
    params: dict[str, float],
) -> tuple[np.ndarray, np.ndarray]:
    """Compute log-baseline-hazard and cumulative baseline hazard.

    Returns (const, H_base) where:
        const  = log h0(t)   — event term:    delta * (const + beta*L)
        H_base = H0(t)       — survival term: H_base * exp(beta*L)

    Supported models and their required params:
        "weibull"     : {"scale": s, "rho": rho}
        "exponential" : {"rate": lam}  or  {"scale": s}
        "gompertz"    : {"rate": b, "gamma": g}
        "lognormal"   : {"mu": mu, "sigma": sigma}
        "loglogistic" : {"scale": alpha, "shape": k}
        "gamma"       : {"shape": k, "scale": theta}
        "first_passage": {"drift": mu, "shape": lam}

    Raises:
        ValueError: unknown model name or missing required parameter.
    """
    t = np.asarray(t, dtype=np.float64)

    if model == "weibull":
        s, rho = params["scale"], params["rho"]
        log_t = np.log(t)
        const = np.log(rho) - rho * np.log(s) + (rho - 1) * log_t
        H_base = np.exp(rho * (log_t - np.log(s)))

    elif model == "exponential":
        if "rate" in params:
            lam = params["rate"]
        elif "scale" in params:
            lam = 1.0 / params["scale"]
        else:
            raise ValueError("exponential: need 'rate' or 'scale'")
        const = np.full_like(t, np.log(lam))
        H_base = lam * t

    elif model == "gompertz":
        # h0(t)=b·exp(g·t),  H0(t)=(b/g)·(exp(g·t)−1)  [expm1 stable near 0]
        b, g = params["rate"], params["gamma"]
        const = np.log(b) + g * t
        H_base = b * t if abs(g) < 1e-12 else (b / g) * np.expm1(g * t)

    elif model == "lognormal":
        # H0(t)=−log S0(t)=−norm.logsf(z),  z=(log t−mu)/sigma
        mu, sigma = params["mu"], params["sigma"]
        log_t = np.log(t)
        z = (log_t - mu) / sigma
        log_S0 = norm.logsf(z)  # stable complementary log-CDF
        log_f0 = -0.5 * z**2 - 0.5 * np.log(2 * np.pi) - np.log(sigma) - log_t
        const = log_f0 - log_S0
        H_base = -log_S0

    elif model == "loglogistic":
        # H0(t)=log(1+(t/α)^k)=log1p(exp(u)), u=k·log(t/α)
        # LSE trick: for large u, log1p(exp(u))≈u
        alpha, k = params["scale"], params["shape"]
        u = k * (np.log(t) - np.log(alpha))
        H_base = np.where(u > 30.0, u, np.log1p(np.exp(u)))
        const = np.log(k) - np.log(alpha) + (k - 1) * (np.log(t) - np.log(alpha)) - H_base

    elif model == "gamma":
        # H0(t)=−gamma_dist.logsf(t; shape=k, scale=θ)  [stable log-survival]
        k, theta = params["shape"], params["scale"]
        log_f0 = (k - 1) * np.log(t) - t / theta - k * np.log(theta) - gammaln(k)
        log_S0 = gamma_dist.logsf(t, a=k, scale=theta)
        const = log_f0 - log_S0
        H_base = -log_S0

    elif model == "first_passage":
        # First-passage time of Wiener process Y(t)=y0+mu*t+W(t) hitting 0.
        # FPT ~ Inverse Gaussian.  drift<0: everyone hits; drift>0: cure fraction.
        drift_val, lam = params["drift"], params["shape"]
        if drift_val == 0.0:
            raise ValueError("first_passage drift must be non-zero")
        y0 = np.sqrt(lam)
        ig_mean = y0 / abs(drift_val)
        # scipy parameterization: invgauss(mu=ig_mean/lam, scale=lam)
        rv = invgauss(mu=ig_mean / lam, scale=lam)
        if drift_val < 0:
            # Everyone hits — standard IG distribution
            log_sf = rv.logsf(t)
            const = rv.logpdf(t) - log_sf
            H_base = -log_sf
        else:
            # Defective distribution — cure fraction exists
            p_hit = np.exp(-2.0 * y0 * drift_val)
            log_S_def = np.log1p(-p_hit * rv.cdf(t))
            const = np.log(p_hit) + rv.logpdf(t) - log_S_def
            H_base = -log_S_def

    else:
        raise ValueError(
            f"Unknown hazard model '{model}'. "
            "Supported: 'weibull','exponential','gompertz','lognormal',"
            "'loglogistic','gamma','first_passage'."
        )
    return const, H_base

cli_base

simace.core.cli_base

Shared CLI boilerplate for simace entry points.

add_logging_args

add_logging_args(parser)

Add standard -v/--verbose and -q/--quiet arguments.

PARAMETER DESCRIPTION
parser

Argument parser to add logging flags to.

TYPE: ArgumentParser

Source code in simace/core/cli_base.py
def add_logging_args(parser: argparse.ArgumentParser) -> None:
    """Add standard -v/--verbose and -q/--quiet arguments.

    Args:
        parser: Argument parser to add logging flags to.
    """
    parser.add_argument("-v", "--verbose", action="store_true", help="DEBUG output")
    parser.add_argument("-q", "--quiet", action="store_true", help="WARNING+ only")

init_logging

init_logging(args)

Derive log level from parsed args and call setup_logging().

PARAMETER DESCRIPTION
args

Parsed namespace containing verbose and quiet flags.

TYPE: Namespace

Source code in simace/core/cli_base.py
def init_logging(args: argparse.Namespace) -> None:
    """Derive log level from parsed args and call ``setup_logging()``.

    Args:
        args: Parsed namespace containing ``verbose`` and ``quiet`` flags.
    """
    from simace import setup_logging

    level = logging.DEBUG if args.verbose else logging.WARNING if args.quiet else logging.INFO
    setup_logging(level=level)

parquet_to_tsv

simace.core.parquet_to_tsv

Convert parquet files to TSV (optionally gzipped) for use in R.

convert

convert(parquet_path, output_path=None, float_precision=4, gzip=True)

Read a parquet file and write it as a TSV.

PARAMETER DESCRIPTION
parquet_path

Path to the input .parquet file.

TYPE: str

output_path

Path for the output file. If None, replaces the .parquet suffix with .tsv.gz (or .tsv if gzip is False).

TYPE: str | None DEFAULT: None

float_precision

Number of decimal places for float columns.

TYPE: int DEFAULT: 4

gzip

Whether to gzip-compress the output.

TYPE: bool DEFAULT: True

Source code in simace/core/parquet_to_tsv.py
def convert(parquet_path: str, output_path: str | None = None, float_precision: int = 4, gzip: bool = True) -> None:
    """Read a parquet file and write it as a TSV.

    Args:
        parquet_path: Path to the input ``.parquet`` file.
        output_path: Path for the output file. If *None*, replaces the
            ``.parquet`` suffix with ``.tsv.gz`` (or ``.tsv`` if *gzip* is False).
        float_precision: Number of decimal places for float columns.
        gzip: Whether to gzip-compress the output.
    """
    parquet_path = str(parquet_path)
    if output_path is None:
        suffix = ".tsv.gz" if gzip else ".tsv"
        output_path = parquet_path.removesuffix(".parquet") + suffix

    t0 = time.perf_counter()
    df = pd.read_parquet(parquet_path)
    logger.info("Read %s (%d rows, %d cols)", parquet_path, len(df), len(df.columns))

    compression = "gzip" if gzip else None
    df.to_csv(output_path, sep="\t", index=False, compression=compression, float_format=f"%.{float_precision}f")
    elapsed = time.perf_counter() - t0
    logger.info("Wrote %s (%.1fs)", output_path, elapsed)

cli

cli()

Command-line interface: parquet-to-tsv.

Source code in simace/core/parquet_to_tsv.py
def cli() -> None:
    """Command-line interface: parquet-to-tsv."""
    from simace.core.cli_base import add_logging_args, init_logging

    parser = argparse.ArgumentParser(description="Convert parquet to TSV (gzipped by default)")
    add_logging_args(parser)
    parser.add_argument("parquet", nargs="+", help="Input parquet file(s)")
    parser.add_argument("-o", "--output", default=None, help="Output path (only valid with a single input)")
    parser.add_argument("-p", "--precision", type=int, default=4, help="Decimal places for float columns (default: 4)")
    parser.add_argument("--no-gzip", action="store_true", help="Write uncompressed .tsv instead of .tsv.gz")
    args = parser.parse_args()
    init_logging(args)

    if args.output and len(args.parquet) > 1:
        parser.error("--output can only be used with a single input file")

    for path in args.parquet:
        convert(path, output_path=args.output, float_precision=args.precision, gzip=not args.no_gzip)