Skip to content

simace.analysis

Statistical analysis, validation, and metric extraction for simulation outputs.

validate

simace.analysis.validate

ACE simulation validation.

Validates simulation outputs for structural integrity, statistical properties, and heritability estimates.

validate_structural

validate_structural(df, params)

Validate structural integrity of the pedigree.

Checks contiguous IDs, valid parent references, sex-parent consistency, and balanced sex ratio.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns id, sex, mother, father.

TYPE: DataFrame

params

Scenario parameters; requires keys N and G_ped.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts (keys: passed, details, …).

Source code in simace/analysis/validate.py
def validate_structural(df: pd.DataFrame, params: dict[str, Any]) -> dict[str, Any]:
    """Validate structural integrity of the pedigree.

    Checks contiguous IDs, valid parent references, sex-parent consistency,
    and balanced sex ratio.

    Args:
        df: Pedigree DataFrame with columns id, sex, mother, father.
        params: Scenario parameters; requires keys ``N`` and ``G_ped``.

    Returns:
        Dict of check-name to result dicts (keys: passed, details, …).
    """
    results = {}
    N = params["N"]
    ngen = params["G_ped"]
    expected_total = N * ngen

    # ID integrity
    ids = df["id"].values
    expected_ids = np.arange(expected_total)
    ids_contiguous = np.array_equal(np.sort(ids), expected_ids)
    results["id_integrity"] = _result(
        ids_contiguous and len(df) == expected_total,
        f"Expected {expected_total} contiguous IDs, found {len(df)} individuals",
        expected_count=expected_total,
        observed_count=len(df),
    )

    # Parent references: valid IDs (0..expected_total-1) or -1 for founders
    mother_vals = df["mother"].values
    father_vals = df["father"].values
    mothers_valid = (((mother_vals >= 0) & (mother_vals < expected_total)) | (mother_vals == -1)).all()
    fathers_valid = (((father_vals >= 0) & (father_vals < expected_total)) | (father_vals == -1)).all()
    no_self_parent = ((df["mother"] != df["id"]) & (df["father"] != df["id"])).all()
    results["parent_references"] = _result(
        bool(mothers_valid and fathers_valid and no_self_parent),
        f"Mothers valid: {mothers_valid}, Fathers valid: {fathers_valid}, No self-parenting: {no_self_parent}",
    )

    # Sex-parent consistency (only for non-founders)
    non_founders = df[df["mother"] != -1]
    if len(non_founders) > 0:
        id_to_sex = df.set_index("id")["sex"]
        mother_sex = id_to_sex.reindex(non_founders["mother"]).values
        father_sex = id_to_sex.reindex(non_founders["father"]).values
        mothers_female = (mother_sex == 0).all()
        fathers_male = (father_sex == 1).all()
        results["sex_parent_consistency"] = _result(
            bool(mothers_female and fathers_male),
            f"Mothers female: {mothers_female}, Fathers male: {fathers_male}",
        )
    else:
        results["sex_parent_consistency"] = _result(True, "No non-founders to check")

    # Sex distribution
    sex_ratio = df["sex"].mean()
    sex_balanced = 0.45 <= sex_ratio <= 0.55
    results["sex_distribution"] = _result(
        sex_balanced,
        f"Male ratio: {sex_ratio:.3f} (expected ~0.5)",
        observed_ratio=float(sex_ratio),
    )

    return results

validate_twins

validate_twins(df, params, df_indexed)

Validate MZ twin properties for two-trait simulation.

Checks bidirectional twin pointers, shared parents, identical A values and sex for MZ pairs, and that the observed twin rate matches the expected rate 2 * p_mztwin * eligible_fraction.

PARAMETER DESCRIPTION
df

Pedigree DataFrame.

TYPE: DataFrame

params

Scenario parameters; requires key p_mztwin.

TYPE: dict[str, Any]

df_indexed

Pedigree DataFrame indexed by id for fast lookups.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_twins(df: pd.DataFrame, params: dict[str, Any], df_indexed: pd.DataFrame) -> dict[str, Any]:
    """Validate MZ twin properties for two-trait simulation.

    Checks bidirectional twin pointers, shared parents, identical A values
    and sex for MZ pairs, and that the observed twin rate matches the
    expected rate ``2 * p_mztwin * eligible_fraction``.

    Args:
        df: Pedigree DataFrame.
        params: Scenario parameters; requires key ``p_mztwin``.
        df_indexed: Pedigree DataFrame indexed by ``id`` for fast lookups.

    Returns:
        Dict of check-name to result dicts.
    """
    results = {}
    p_mztwin = params["p_mztwin"]

    twins_df = df[df["twin"] != -1]
    n_twins = len(twins_df)

    if n_twins == 0:
        results["twin_bidirectional"] = _result(True, "No twins found")
        results["twin_same_parents"] = _result(True, "No twins found")
        for t in [1, 2]:
            results[f"twin_same_A{t}"] = _result(True, "No twins found")
        results["twin_same_sex"] = _result(True, "No twins found")
        results["twin_rate"] = _result(
            p_mztwin < 0.01,
            f"No twins found, expected rate: {p_mztwin}",
            expected_rate=p_mztwin,
            observed_rate=0.0,
        )
        return results

    # Get unique twin pairs
    twin_ids = twins_df["id"].values
    twin_partners = twins_df["twin"].values
    mask = twin_ids < twin_partners
    t1_arr = twin_ids[mask]
    t2_arr = twin_partners[mask]
    n_pairs = len(t1_arr)

    # Bidirectional check
    twin_col = df_indexed["twin"]
    reverse_check = twin_col.reindex(t2_arr).values
    bidirectional = np.all(reverse_check == t1_arr)
    results["twin_bidirectional"] = _result(
        bool(bidirectional),
        f"All {n_twins} twin references are bidirectional: {bidirectional}",
    )

    # Same parents
    t1_mother = df_indexed.loc[t1_arr, "mother"].values
    t2_mother = df_indexed.loc[t2_arr, "mother"].values
    t1_father = df_indexed.loc[t1_arr, "father"].values
    t2_father = df_indexed.loc[t2_arr, "father"].values
    same_parents = np.all((t1_mother == t2_mother) & (t1_father == t2_father))
    results["twin_same_parents"] = _result(
        bool(same_parents),
        f"All {n_pairs} twin pairs share parents: {same_parents}",
    )

    # Same A values and same sex - loop over traits for A
    for t in [1, 2]:
        col = f"A{t}"
        v1 = df_indexed.loc[t1_arr, col].values
        v2 = df_indexed.loc[t2_arr, col].values
        same = np.allclose(v1, v2)
        results[f"twin_same_{col}"] = _result(
            bool(same),
            f"All MZ twin pairs have identical {col} values: {same}",
        )

    # Same sex
    t1_sex = df_indexed.loc[t1_arr, "sex"].values
    t2_sex = df_indexed.loc[t2_arr, "sex"].values
    same_sex = np.all(t1_sex == t2_sex)
    results["twin_same_sex"] = _result(
        bool(same_sex),
        f"All MZ twin pairs have same sex: {same_sex}",
    )

    # Twin rate (count only non-founder twin pairs; founders have twins but no parents in pedigree)
    non_founders = df[df["mother"] != -1]
    if len(non_founders) > 0:
        n_nf = len(non_founders)
        nf_twins = non_founders[non_founders["twin"] != -1]
        nf_twin_ids = nf_twins["id"].values
        nf_twin_partners = nf_twins["twin"].values
        nf_pairs = int(np.sum(nf_twin_ids < nf_twin_partners))
        observed_rate = nf_pairs * 2 / n_nf
        # Under the mating-pair model, twins are assigned per mating with >=2
        # offspring. Use a generous range check since the expected rate depends
        # on the offspring allocation distribution.
        rate_tol = max(0.01, 3 * p_mztwin)
        rate_ok = observed_rate < rate_tol
        results["twin_rate"] = _result(
            rate_ok,
            f"Twin rate in non-founders: {observed_rate:.4f} (p_mztwin={p_mztwin:.4f}, tol: {rate_tol:.4f})",
            expected_rate=float(p_mztwin),
            observed_rate=float(observed_rate),
            twin_pairs=nf_pairs,
        )
    else:
        results["twin_rate"] = _result(True, "No non-founders to check twin rate")

    return results

validate_half_sibs

validate_half_sibs(df, params, sibling_pairs)

Validate half-sibling structure under the mating-pair model.

Reports observed counts and proportions of full-sib, maternal half-sib, and paternal half-sib pairs as informational checks. With a zero-truncated Poisson mating model, both maternal and paternal half-sibs arise naturally when individuals have multiple partners.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns id, mother, father, twin.

TYPE: DataFrame

params

Scenario parameters; requires key mating_lambda.

TYPE: dict[str, Any]

sibling_pairs

Dict with keys FS, MHS, PHS mapping to (idx1, idx2) row-index arrays.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_half_sibs(
    df: pd.DataFrame, params: dict[str, Any], sibling_pairs: dict[str, tuple[np.ndarray, np.ndarray]]
) -> dict[str, Any]:
    """Validate half-sibling structure under the mating-pair model.

    Reports observed counts and proportions of full-sib, maternal half-sib,
    and paternal half-sib pairs as informational checks. With a
    zero-truncated Poisson mating model, both maternal and paternal
    half-sibs arise naturally when individuals have multiple partners.

    Args:
        df: Pedigree DataFrame with columns id, mother, father, twin.
        params: Scenario parameters; requires key ``mating_lambda``.
        sibling_pairs: Dict with keys ``FS``, ``MHS``, ``PHS`` mapping to
            ``(idx1, idx2)`` row-index arrays.

    Returns:
        Dict of check-name to result dicts.
    """
    results = {}

    sib_info = _sib_counts_from_pairs(sibling_pairs)

    # Report sibling structure (informational — no closed-form expected value)
    total_maternal_pairs = sib_info["n_full_sib_pairs"] + sib_info["n_maternal_half_sib_pairs"]
    if total_maternal_pairs > 0:
        observed_half_sib_prop = sib_info["n_maternal_half_sib_pairs"] / total_maternal_pairs
        # Range check: at lambda=0.5, most people have 1 partner, so half-sibs
        # should be present but not dominant. Wide tolerance for any lambda.
        results["half_sib_pair_proportion"] = _result(
            True,
            f"Maternal half-sib pair proportion: {observed_half_sib_prop:.4f} "
            f"(full={sib_info['n_full_sib_pairs']}, mat_hs={sib_info['n_maternal_half_sib_pairs']}, "
            f"pat_hs={sib_info['n_paternal_half_sib_pairs']})",
            observed=float(observed_half_sib_prop),
            n_full_sib_pairs=int(sib_info["n_full_sib_pairs"]),
            n_maternal_half_sib_pairs=int(sib_info["n_maternal_half_sib_pairs"]),
            n_paternal_half_sib_pairs=int(sib_info["n_paternal_half_sib_pairs"]),
        )
    else:
        results["half_sib_pair_proportion"] = _result(True, "No maternal sibling pairs to check")

    # Offspring with maternal half-sib (informational)
    n_offspring_with_sibs = sib_info["n_offspring_with_sibs"]
    n_offspring_with_hs = sib_info["n_offspring_with_maternal_half_sib"]
    if n_offspring_with_sibs > 0:
        observed_frac = n_offspring_with_hs / n_offspring_with_sibs
        results["offspring_with_half_sib"] = _result(
            True,
            f"Offspring with maternal half-sib: {observed_frac:.4f} ({n_offspring_with_hs}/{n_offspring_with_sibs})",
            observed=float(observed_frac),
            n_offspring_with_half_sib=int(n_offspring_with_hs),
            n_offspring_with_sibs=int(n_offspring_with_sibs),
        )
    else:
        results["offspring_with_half_sib"] = _result(True, "No non-twin offspring with siblings to check")

    return results

validate_consanguineous_matings

validate_consanguineous_matings(df, params)

Detect consanguineous matings and reconcile grandparent-link discrepancy.

When pair_partners() randomly pairs individuals, half-siblings (or full siblings) may be matched. Their offspring have fewer than 4 distinct grandparents, which reduces the grandparent-grandchild pair count relative to the naive expectation of 4 × n_eligible.

This check: 1. Identifies all mating pairs where partners share one or both parents. 2. Computes the expected and observed grandparent-grandchild pair counts. 3. Verifies that the discrepancy is fully explained by consanguineous matings.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns id, mother, father.

TYPE: DataFrame

params

Scenario parameters (accepted for API consistency).

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_consanguineous_matings(df: pd.DataFrame, params: dict[str, Any]) -> dict[str, Any]:
    """Detect consanguineous matings and reconcile grandparent-link discrepancy.

    When ``pair_partners()`` randomly pairs individuals, half-siblings (or
    full siblings) may be matched.  Their offspring have fewer than 4
    distinct grandparents, which reduces the grandparent-grandchild pair
    count relative to the naive expectation of 4 × n_eligible.

    This check:
    1. Identifies all mating pairs where partners share one or both parents.
    2. Computes the expected and observed grandparent-grandchild pair counts.
    3. Verifies that the discrepancy is fully explained by consanguineous
       matings.

    Args:
        df: Pedigree DataFrame with columns id, mother, father.
        params: Scenario parameters (accepted for API consistency).

    Returns:
        Dict of check-name to result dicts.
    """
    results: dict[str, Any] = {}

    ids = df["id"].values
    mothers = df["mother"].values
    fathers = df["father"].values

    # Build parent lookup arrays indexed by id (assumes contiguous 0-based ids)
    n = len(ids)
    mother_of = np.full(n, -1, dtype=np.int64)
    father_of = np.full(n, -1, dtype=np.int64)
    mother_of[ids] = mothers
    father_of[ids] = fathers

    # Identify individuals in gen >= 2 (parents are non-founders, so grandparents exist)
    has_parents = mothers != -1
    mothers_have_parents = np.where(has_parents, mother_of[mothers] != -1, False)
    eligible = has_parents & mothers_have_parents  # gen >= 2

    eligible_ids = ids[eligible]
    eligible_mothers = mothers[eligible]
    eligible_fathers = fathers[eligible]

    if len(eligible_ids) == 0:
        results["consanguineous_count"] = _result(True, "No individuals with grandparents in pedigree")
        return results

    # Look up all 4 grandparents for eligible individuals
    mgm = mother_of[eligible_mothers]  # maternal grandmother
    mgf = father_of[eligible_mothers]  # maternal grandfather
    fgm = mother_of[eligible_fathers]  # paternal grandmother
    fgf = father_of[eligible_fathers]  # paternal grandfather

    # Count distinct grandparents per individual (vectorized via sorted rows)
    gp_stack = np.column_stack([mgm, mgf, fgm, fgf])  # (n_eligible, 4)
    gp_sorted = np.sort(gp_stack, axis=1)
    n_distinct = 1 + (gp_sorted[:, 1:] != gp_sorted[:, :-1]).sum(axis=1)
    observed_gp_links = int(n_distinct.sum())
    expected_gp_links = len(eligible_ids) * 4
    total_missing = expected_gp_links - observed_gp_links

    # Identify consanguineous matings (vectorized)
    # Encode (mother, father) as single int64 key for fast np.unique on 1D array
    # int64 cast required: max_id² overflows int32
    max_id = int(ids.max()) + 1
    pair_keys = eligible_mothers.astype(np.int64) * max_id + eligible_fathers.astype(np.int64)
    unique_keys, _inverse, pair_counts = np.unique(pair_keys, return_inverse=True, return_counts=True)
    mp_m = unique_keys // max_id  # mothers in each mating pair
    mp_f = unique_keys % max_id  # fathers in each mating pair
    # Check which parent IDs are shared between mates
    share_mother = mother_of[mp_m] == mother_of[mp_f]
    share_father = father_of[mp_m] == father_of[mp_f]
    shared_count = share_mother.astype(np.int64) + share_father.astype(np.int64)
    is_consanguineous = shared_count > 0

    n_half_sib_matings = int((shared_count == 1).sum())
    n_full_sib_matings = int((shared_count == 2).sum())
    explained_missing = int((shared_count[is_consanguineous] * pair_counts[is_consanguineous]).sum())

    # Informational: report counts
    results["consanguineous_count"] = _result(
        True,
        f"Consanguineous matings: {n_half_sib_matings} half-sib, "
        f"{n_full_sib_matings} full-sib "
        f"(total missing GP links: {total_missing})",
        n_half_sib_matings=n_half_sib_matings,
        n_full_sib_matings=n_full_sib_matings,
        total_missing_gp_links=total_missing,
    )

    # Hard check: reconciliation
    reconciled = explained_missing == total_missing
    results["grandparent_reconciliation"] = _result(
        reconciled,
        f"Grandparent links: expected={expected_gp_links}, observed={observed_gp_links}, "
        f"explained_missing={explained_missing}, actual_missing={total_missing}",
        expected_gp_links=expected_gp_links,
        observed_gp_links=observed_gp_links,
        explained_missing=explained_missing,
    )

    return results

validate_statistical

validate_statistical(df, params, df_indexed)

Validate statistical properties of variance components for two traits.

Checks founder variances for A, C, E against configured values, total variance close to 1.0, cross-trait correlations (rA, rC, rE), C sharing within households, and E independence between siblings.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with variance-component columns A1, C1, E1, A2, C2, E2.

TYPE: DataFrame

params

Scenario parameters; requires keys A1, C1, E1, A2, C2, E2, rA, rC.

TYPE: dict[str, Any]

df_indexed

Pedigree DataFrame indexed by id.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_statistical(df: pd.DataFrame, params: dict[str, Any], df_indexed: pd.DataFrame) -> dict[str, Any]:
    """Validate statistical properties of variance components for two traits.

    Checks founder variances for A, C, E against configured values, total
    variance close to 1.0, cross-trait correlations (rA, rC, rE), C sharing
    within households, and E independence between siblings.

    Args:
        df: Pedigree DataFrame with variance-component columns A1, C1, E1,
            A2, C2, E2.
        params: Scenario parameters; requires keys ``A1``, ``C1``, ``E1``,
            ``A2``, ``C2``, ``E2``, ``rA``, ``rC``.
        df_indexed: Pedigree DataFrame indexed by ``id``.

    Returns:
        Dict of check-name to result dicts.
    """
    results = {}

    rA_param = params.get("rA", 0)
    rC_param = params.get("rC", 0)

    founders = df[df["mother"] == -1]

    # For per-generation dict params, resolve to founder sim generation value
    G_sim = params.get("G_sim", 8)
    G_ped = params.get("G_ped", 6)
    founder_sim_gen = G_sim - G_ped

    def _resolve_founder_val(val):
        """Resolve a scalar or per-gen dict to the founder generation value."""
        if isinstance(val, dict):
            from simace.simulation.simulate import resolve_per_gen_param

            return resolve_per_gen_param(val, G_sim)[founder_sim_gen]
        return val

    # Variance checks for both traits
    for t in [1, 2]:
        for comp in ["A", "C", "E"]:
            col = f"{comp}{t}"
            results[f"variance_{col}"] = _check_variance(founders, col, _resolve_founder_val(params[col]))

    # Total variances
    for t in [1, 2]:
        total = sum(results[f"variance_{c}{t}"]["observed"] for c in ["A", "C", "E"])
        results[f"total_variance_trait{t}"] = _result(
            abs(total - 1.0) < 0.15,
            f"Total variance trait {t}: {total:.4f} (expected: 1.0)",
            expected=1.0,
            observed=float(total),
        )

    # Cross-trait correlations
    for comp, expected, label in [("A", rA_param, "A"), ("C", rC_param, "C")]:
        obs = safe_corrcoef(founders[f"{comp}1"].values, founders[f"{comp}2"].values)
        ok = abs(obs - expected) < 0.15 if not np.isnan(obs) else expected == 0
        results[f"cross_trait_r{label}"] = _result(
            ok,
            f"Cross-trait {label} correlation: {obs:.4f} (expected: {expected})",
            expected=expected,
            observed=float(obs),
        )

    rE_param = params.get("rE", 0.0)
    rE_obs = safe_corrcoef(founders["E1"].values, founders["E2"].values)
    rE_ok = abs(rE_obs - rE_param) < 0.15 if not np.isnan(rE_obs) else rE_param == 0
    results["cross_trait_rE"] = _result(
        rE_ok,
        f"Cross-trait E correlation: {rE_obs:.4f} (expected: {rE_param})",
        expected=rE_param,
        observed=float(rE_obs),
    )

    # C inheritance: siblings should share C
    non_founders = df[df["mother"] != -1]
    if len(non_founders) > 0:
        for t in [1, 2]:
            col = f"C{t}"
            c_by_mother = non_founders.groupby("mother")[col].nunique()
            c_shared = (c_by_mother == 1).mean()
            results[f"c{t}_inheritance"] = _result(
                c_shared > 0.99,
                f"Proportion of families with shared {col}: {c_shared:.4f}",
                proportion_shared=float(c_shared),
            )
    else:
        for t in [1, 2]:
            results[f"c{t}_inheritance"] = _result(True, "No non-founders to check C inheritance")

    # E independence between siblings
    if len(non_founders) > 0:
        fam_sizes = non_founders.groupby("mother").size()
        multi_child_mothers = fam_sizes[fam_sizes >= 2].index

        if len(multi_child_mothers) > 10:
            # Vectorized: get first two E1 values per mother via groupby
            multi_child = non_founders[non_founders["mother"].isin(multi_child_mothers[:500])]
            grouped = multi_child.groupby("mother")["E1"]
            first = grouped.nth(0).values
            second = grouped.nth(1).values
            # nth returns NaN for groups with < 2 members; both arrays aligned by group
            valid = ~(np.isnan(first) | np.isnan(second))
            e1_pairs_arr = np.column_stack([first[valid], second[valid]])
            e1_pairs_arr = e1_pairs_arr[:1000]

            if len(e1_pairs_arr) > 10:
                e1, e2 = e1_pairs_arr[:, 0], e1_pairs_arr[:, 1]
                e_corr = safe_corrcoef(e1, e2)
                results["e1_independence"] = _result(
                    abs(e_corr) < 0.1,
                    f"E1 correlation between siblings: {e_corr:.4f} (expected: ~0)",
                    observed_correlation=float(e_corr),
                )
            else:
                results["e1_independence"] = _result(True, "Not enough sibling pairs to check E independence")
        else:
            results["e1_independence"] = _result(True, "Not enough sibling groups to check E independence")
    else:
        results["e1_independence"] = _result(True, "No non-founders to check E independence")

    return results

validate_heritability

validate_heritability(df, params, df_indexed, sibling_pairs)

Validate heritability estimates for two-trait simulation.

Computes MZ twin and DZ sibling liability correlations, Falconer heritability estimates h² = 2(r_MZ - r_DZ), and midparent-offspring regressions, comparing each to expected values derived from the configured A parameters.

PARAMETER DESCRIPTION
df

Pedigree DataFrame.

TYPE: DataFrame

params

Scenario parameters; requires keys A1, A2, seed.

TYPE: dict[str, Any]

df_indexed

Pedigree DataFrame indexed by id.

TYPE: DataFrame

sibling_pairs

Dict with keys FS, MHS, PHS mapping to (idx1, idx2) row-index arrays.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts, including MZ/DZ correlations,

dict[str, Any]

Falconer estimates, and parent-offspring regression slopes.

Source code in simace/analysis/validate.py
def validate_heritability(
    df: pd.DataFrame,
    params: dict[str, Any],
    df_indexed: pd.DataFrame,
    sibling_pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Validate heritability estimates for two-trait simulation.

    Computes MZ twin and DZ sibling liability correlations, Falconer
    heritability estimates ``h² = 2(r_MZ - r_DZ)``, and midparent-offspring
    regressions, comparing each to expected values derived from the
    configured A parameters.

    Args:
        df: Pedigree DataFrame.
        params: Scenario parameters; requires keys ``A1``, ``A2``, ``seed``.
        df_indexed: Pedigree DataFrame indexed by ``id``.
        sibling_pairs: Dict with keys ``FS``, ``MHS``, ``PHS`` mapping to
            ``(idx1, idx2)`` row-index arrays.

    Returns:
        Dict of check-name to result dicts, including MZ/DZ correlations,
        Falconer estimates, and parent-offspring regression slopes.
    """
    results: dict[str, Any] = {}
    A_params = {1: params["A1"], 2: params["A2"]}

    comp_vals = {}
    for comp in ["A", "C", "E"]:
        for t in [1, 2]:
            comp_vals[f"{comp}{t}"] = df_indexed[f"{comp}{t}"].values
    id_to_idx = pd.Series(np.arange(len(df_indexed)), index=df_indexed.index)

    mz_pheno_corr, n_mz_pairs = _validate_mz_correlations(
        df,
        A_params,
        comp_vals,
        id_to_idx,
        results,
    )
    dz_pheno_corr, n_dz_pairs = _validate_dz_correlations(
        params,
        A_params,
        comp_vals,
        sibling_pairs["FS"],
        results,
    )
    _validate_falconer(
        A_params,
        mz_pheno_corr,
        dz_pheno_corr,
        n_mz_pairs,
        n_dz_pairs,
        results,
    )
    _validate_parent_offspring(df, comp_vals, id_to_idx, df_indexed, results)

    return results

compute_per_generation_stats

compute_per_generation_stats(df, params)

Compute per-generation statistics for two traits.

For each generation, computes liability mean/variance/sd and per-component (A, C, E) mean/variance for both traits.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns id, A1, C1, E1, A2, C2, E2.

TYPE: DataFrame

params

Scenario parameters; requires keys N and G_ped.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by "generation_{g}" where each value is a dict of

dict[str, Any]

summary statistics (n, liability mean/variance/sd, component mean/var).

Source code in simace/analysis/validate.py
def compute_per_generation_stats(df: pd.DataFrame, params: dict[str, Any]) -> dict[str, Any]:
    """Compute per-generation statistics for two traits.

    For each generation, computes liability mean/variance/sd and per-component
    (A, C, E) mean/variance for both traits.

    Args:
        df: Pedigree DataFrame with columns id, A1, C1, E1, A2, C2, E2.
        params: Scenario parameters; requires keys ``N`` and ``G_ped``.

    Returns:
        Dict keyed by ``"generation_{g}"`` where each value is a dict of
        summary statistics (n, liability mean/variance/sd, component mean/var).
    """
    N = params["N"]
    ngen = params["G_ped"]

    # Assign generation labels once via integer division
    gen_labels = df["id"].values // N

    results = {}
    for gen in range(1, ngen + 1):
        gen_mask = gen_labels == (gen - 1)
        gen_df = df[gen_mask]

        gen_stats: dict[str, int | float] = {"n": int(gen_mask.sum())}
        for t in [1, 2]:
            a_vals = gen_df[f"A{t}"].values
            c_vals = gen_df[f"C{t}"].values
            e_vals = gen_df[f"E{t}"].values
            liability = a_vals + c_vals + e_vals
            gen_stats[f"liability{t}_mean"] = float(liability.mean())
            gen_stats[f"liability{t}_variance"] = float(liability.var())
            gen_stats[f"liability{t}_sd"] = float(liability.std())
            for comp, vals in [("A", a_vals), ("C", c_vals), ("E", e_vals)]:
                col = f"{comp}{t}"
                gen_stats[f"{col}_mean"] = float(vals.mean())
                gen_stats[f"{col}_var"] = float(vals.var())

        results[f"generation_{gen}"] = gen_stats

    return results

validate_population

validate_population(df, params)

Validate population-level properties.

Checks that each generation has exactly N individuals, the number of generations equals G_ped, and the mean offspring per mother is approximately N / n_females (always ~2.0 for balanced sex ratios).

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns id and mother.

TYPE: DataFrame

params

Scenario parameters; requires keys N, G_ped.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_population(df: pd.DataFrame, params: dict[str, Any]) -> dict[str, Any]:
    """Validate population-level properties.

    Checks that each generation has exactly ``N`` individuals, the number of
    generations equals ``G_ped``, and the mean offspring per mother is
    approximately ``N / n_females`` (always ~2.0 for balanced sex ratios).

    Args:
        df: Pedigree DataFrame with columns id and mother.
        params: Scenario parameters; requires keys ``N``, ``G_ped``.

    Returns:
        Dict of check-name to result dicts.
    """
    results = {}
    N = params["N"]
    ngen = params["G_ped"]

    gen_assignments = df["id"].values // N
    gen_sizes = np.bincount(gen_assignments, minlength=ngen)[:ngen].tolist()

    all_correct = all(s == N for s in gen_sizes)
    results["generation_sizes"] = _result(
        all_correct,
        f"Generation sizes: {gen_sizes} (expected: {N} each)",
        expected=N,
        observed=gen_sizes,
    )

    results["generation_count"] = _result(
        len(gen_sizes) == ngen,
        f"Number of generations: {len(gen_sizes)} (expected: {ngen})",
        expected=ngen,
        observed=len(gen_sizes),
    )

    non_founders = df[df["mother"] != -1]
    if len(non_founders) > 0:
        family_sizes = non_founders.groupby("mother").size()
        mean_fam = family_sizes.mean()
        # Mean offspring per mother is ~N / n_mothers ~= 2.0 for balanced sex
        expected_mean = 2.0
        fam_ok = abs(mean_fam - expected_mean) < expected_mean * 0.5
        results["family_size"] = _result(
            fam_ok,
            f"Mean offspring per mother: {mean_fam:.2f} (expected: ~{expected_mean:.1f})",
            expected=expected_mean,
            observed=float(mean_fam),
        )
    else:
        results["family_size"] = _result(True, "No non-founders to check family size")

    return results

compute_family_size_distribution

compute_family_size_distribution(df, params)

Compute offspring count distributions per parent sex.

PARAMETER DESCRIPTION
df

Pedigree DataFrame with columns mother and father.

TYPE: DataFrame

params

Scenario parameters (unused but accepted for API consistency).

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict with keys "mother" and "father", each mapping to a dict

dict[str, Any]

of summary statistics (mean, median, std, n_parents). Empty dict if

dict[str, Any]

no non-founders exist.

Source code in simace/analysis/validate.py
def compute_family_size_distribution(df: pd.DataFrame, params: dict[str, Any]) -> dict[str, Any]:
    """Compute offspring count distributions per parent sex.

    Args:
        df: Pedigree DataFrame with columns mother and father.
        params: Scenario parameters (unused but accepted for API consistency).

    Returns:
        Dict with keys ``"mother"`` and ``"father"``, each mapping to a dict
        of summary statistics (mean, median, std, n_parents). Empty dict if
        no non-founders exist.
    """
    non_founders = df[df["mother"] != -1]
    if len(non_founders) == 0:
        return {}

    mother_counts = non_founders.groupby("mother").size()
    father_counts = non_founders.groupby("father").size()

    result = {}
    for label, counts in [("mother", mother_counts), ("father", father_counts)]:
        result[label] = {
            "mean": float(counts.mean()),
            "median": float(counts.median()),
            "std": float(counts.std()),
            "n_parents": len(counts),
        }

    return result

validate_assortative_mating

validate_assortative_mating(df, params, df_indexed)

Validate mate correlation on liability when assortative mating is configured.

Extracts unique mating pairs from non-founders, computes Pearson correlation of mother and father liability for each trait, and checks against the configured assort1 / assort2 parameters.

PARAMETER DESCRIPTION
df

Pedigree DataFrame.

TYPE: DataFrame

params

Scenario parameters; uses keys assort1, assort2.

TYPE: dict[str, Any]

df_indexed

Pedigree DataFrame indexed by id.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict of check-name to result dicts.

Source code in simace/analysis/validate.py
def validate_assortative_mating(df: pd.DataFrame, params: dict[str, Any], df_indexed: pd.DataFrame) -> dict[str, Any]:
    """Validate mate correlation on liability when assortative mating is configured.

    Extracts unique mating pairs from non-founders, computes Pearson
    correlation of mother and father liability for each trait, and checks
    against the configured ``assort1`` / ``assort2`` parameters.

    Args:
        df: Pedigree DataFrame.
        params: Scenario parameters; uses keys ``assort1``, ``assort2``.
        df_indexed: Pedigree DataFrame indexed by ``id``.

    Returns:
        Dict of check-name to result dicts.
    """
    results: dict[str, Any] = {}
    assort1 = params.get("assort1", 0.0)
    assort2 = params.get("assort2", 0.0)

    non_founders = df[df["mother"] != -1]
    if len(non_founders) == 0:
        results["mate_corr_liability1"] = _result(True, "No non-founders to check")
        results["mate_corr_liability2"] = _result(True, "No non-founders to check")
        return results

    # Extract unique mating pairs
    pairs = non_founders[["mother", "father"]].drop_duplicates()
    mother_ids = pairs["mother"].values
    father_ids = pairs["father"].values
    n_pairs = len(pairs)

    for t, expected in [(1, assort1), (2, assort2)]:
        m_liab = df_indexed.loc[mother_ids, f"liability{t}"].values
        f_liab = df_indexed.loc[father_ids, f"liability{t}"].values
        obs = safe_corrcoef(m_liab, f_liab)

        if np.isnan(obs):
            results[f"mate_corr_liability{t}"] = _result(
                True,
                f"Cannot compute mate correlation for trait {t} (zero variance)",
                expected=float(expected),
                observed=float(obs),
            )
            continue

        se = _corr_se(expected, n_pairs)
        tol = max(0.1, 3 * se)
        ok = abs(obs - expected) < tol
        results[f"mate_corr_liability{t}"] = _result(
            ok,
            f"Mate correlation liability{t}: {obs:.4f} (expected: {expected}, tol: {tol:.4f})",
            expected=float(expected),
            observed=float(obs),
            n_pairs=n_pairs,
        )

    # Cross-trait validation (only when both traits assort)
    if assort1 != 0 and assort2 != 0:
        am = params.get("assort_matrix")
        if am is not None:
            c_expected = float(np.asarray(am)[0, 1])
        else:
            rho_w = params.get("rA", 0) * np.sqrt(params.get("A1", 0) * params.get("A2", 0)) + params.get(
                "rC", 0
            ) * np.sqrt(params.get("C1", 0) * params.get("C2", 0))
            c_expected = rho_w * np.sqrt(abs(assort1 * assort2)) * np.sign(assort1 * assort2)

        for label, fi, mi in [("cross_12", 1, 2), ("cross_21", 2, 1)]:
            m_liab = df_indexed.loc[mother_ids, f"liability{fi}"].values
            f_liab = df_indexed.loc[father_ids, f"liability{mi}"].values
            obs = safe_corrcoef(m_liab, f_liab)

            if np.isnan(obs):
                results[f"mate_corr_{label}"] = _result(
                    True,
                    f"Cannot compute mate correlation {label} (zero variance)",
                    expected=float(c_expected),
                    observed=float(obs),
                )
                continue

            se = _corr_se(c_expected, n_pairs)
            tol = max(0.1, 3 * se)
            ok = abs(obs - c_expected) < tol
            results[f"mate_corr_{label}"] = _result(
                ok,
                f"Mate correlation {label}: {obs:.4f} (expected: {c_expected:.4f}, tol: {tol:.4f})",
                expected=float(c_expected),
                observed=float(obs),
                n_pairs=n_pairs,
            )

    return results

validate_effective_size

validate_effective_size(stats, params)

Validate Ne observed-vs-expected for the eight estimators.

Each estimator entry in stats["effective_size"] (as written by :func:simace.analysis.stats.compute_effective_size) supplies an expected field (None under non-standard configs) and a scalar ne. A check passes when either expected is None (vacuous), or abs(ne / expected − 1) < 0.20.

PARAMETER DESCRIPTION
stats

Loaded phenotype_stats.yaml dict. Returns an empty dict when effective_size is absent.

TYPE: dict[str, Any]

params

Per-rep params.yaml dict (unused, accepted for parity with other validators).

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed on estimator name with passed / expected /

dict[str, Any]

observed / details fields.

Source code in simace/analysis/validate.py
def validate_effective_size(stats: dict[str, Any], params: dict[str, Any]) -> dict[str, Any]:
    """Validate Ne observed-vs-expected for the eight estimators.

    Each estimator entry in ``stats["effective_size"]`` (as written by
    :func:`simace.analysis.stats.compute_effective_size`) supplies an
    ``expected`` field (``None`` under non-standard configs) and a
    scalar ``ne``.  A check passes when either ``expected`` is ``None``
    (vacuous), or ``abs(ne / expected − 1) < 0.20``.

    Args:
        stats: Loaded ``phenotype_stats.yaml`` dict.  Returns an empty
            dict when ``effective_size`` is absent.
        params: Per-rep params.yaml dict (unused, accepted for parity
            with other validators).

    Returns:
        Dict keyed on estimator name with ``passed`` / ``expected`` /
        ``observed`` / ``details`` fields.
    """
    del params  # accepted for API parity
    es = stats.get("effective_size")
    if es is None:
        return {}
    out: dict[str, Any] = {}
    for name, entry in es.items():
        if not isinstance(entry, dict):
            continue
        expected = entry.get("expected")
        observed = entry.get("ne")
        if expected is None:
            out[name] = _result(
                True,
                f"{name}: no theoretical expectation under this config",
                expected=None,
                observed=None if observed is None else float(observed),
            )
            continue
        if observed is None:
            out[name] = _result(
                False,
                f"{name}: expected {expected:.3g} but observed is None",
                expected=float(expected),
                observed=None,
            )
            continue
        rel_err = abs(observed / expected - 1.0)
        passed = rel_err < _NE_TOLERANCE
        out[name] = _result(
            passed,
            f"{name}: observed {observed:.3g} vs expected {expected:.3g} (rel err {rel_err:.3f}, tol {_NE_TOLERANCE})",
            expected=float(expected),
            observed=float(observed),
            relative_error=float(rel_err),
        )
    return out

run_validation

run_validation(pedigree_path, params_path)

Run all validation checks and return results.

Loads a pedigree and its parameters, then runs structural, twin, half-sibling, statistical, heritability, and population checks.

PARAMETER DESCRIPTION
pedigree_path

Path to the pedigree parquet file.

TYPE: str

params_path

Path to the scenario parameters YAML file.

TYPE: str

RETURNS DESCRIPTION
dict[str, Any]

Nested dict with keys "structural", "twins", "half_sibs",

dict[str, Any]

"statistical", "heritability", "population",

dict[str, Any]

"per_generation", "summary", "family_size_distribution",

dict[str, Any]

and "parameters". The "summary" sub-dict contains

dict[str, Any]

passed (bool), checks_passed, checks_failed, and

dict[str, Any]

checks_total counts.

Source code in simace/analysis/validate.py
def run_validation(pedigree_path: str, params_path: str) -> dict[str, Any]:
    """Run all validation checks and return results.

    Loads a pedigree and its parameters, then runs structural, twin,
    half-sibling, statistical, heritability, and population checks.

    Args:
        pedigree_path: Path to the pedigree parquet file.
        params_path: Path to the scenario parameters YAML file.

    Returns:
        Nested dict with keys ``"structural"``, ``"twins"``, ``"half_sibs"``,
        ``"statistical"``, ``"heritability"``, ``"population"``,
        ``"per_generation"``, ``"summary"``, ``"family_size_distribution"``,
        and ``"parameters"``. The ``"summary"`` sub-dict contains
        ``passed`` (bool), ``checks_passed``, ``checks_failed``, and
        ``checks_total`` counts.
    """
    logger.info("Validating pedigree: %s", pedigree_path)
    df = pd.read_parquet(pedigree_path)
    params = load_yaml(params_path)

    df_indexed = df.set_index("id")

    all_pairs = PedigreeGraph(df).extract_pairs(max_degree=1)
    sibling_pairs = {k: all_pairs[k] for k in ("FS", "MHS", "PHS")}

    results = {
        "structural": validate_structural(df, params),
        "twins": validate_twins(df, params, df_indexed),
        "half_sibs": validate_half_sibs(df, params, sibling_pairs),
        "statistical": validate_statistical(df, params, df_indexed),
        "heritability": validate_heritability(df, params, df_indexed, sibling_pairs),
        "population": validate_population(df, params),
        "per_generation": compute_per_generation_stats(df, params),
        "assortative_mating": validate_assortative_mating(df, params, df_indexed),
        "consanguineous_matings": validate_consanguineous_matings(df, params),
    }

    checks_passed = 0
    checks_failed = 0

    for category, checks in results.items():
        if category == "per_generation":
            continue
        for check_name, check_result in checks.items():
            if "passed" in check_result:
                if check_result["passed"]:
                    checks_passed += 1
                else:
                    checks_failed += 1
                    logger.warning(
                        "FAILED %s.%s: %s",
                        category,
                        check_name,
                        check_result.get("details", ""),
                    )

    results["summary"] = {
        "passed": checks_failed == 0,
        "checks_passed": checks_passed,
        "checks_failed": checks_failed,
        "checks_total": checks_passed + checks_failed,
    }

    results["family_size_distribution"] = compute_family_size_distribution(df, params)
    results["parameters"] = params

    logger.info(
        "Validation complete: %d/%d checks passed",
        checks_passed,
        checks_passed + checks_failed,
    )

    return results

cli

cli()

Command-line interface for running validation.

Source code in simace/analysis/validate.py
def cli() -> None:
    """Command-line interface for running validation."""
    from simace.core.cli_base import add_logging_args, init_logging

    parser = argparse.ArgumentParser(description="Validate ACE simulation output")
    add_logging_args(parser)
    parser.add_argument("--pedigree", required=True, help="Pedigree parquet path")
    parser.add_argument("--params", required=True, help="Params YAML path")
    parser.add_argument("--output", required=True, help="Output validation YAML path")
    args = parser.parse_args()

    init_logging(args)

    results = run_validation(args.pedigree, args.params)
    dump_yaml(results, args.output)

stats

simace.analysis.stats

Compute per-rep phenotype statistics for downstream plotting.

Reads a single phenotype.parquet and produces: - phenotype_stats.yaml: scalar and array statistics - phenotype_samples.parquet: downsampled rows for scatter/histogram plots

Public API is re-exported from focused sub-modules:

  • :mod:.tetrachoric — tetrachoric primitives
  • :mod:.correlations — pairwise relationship correlations, parent-offspring regressions, observed h² estimators, mate correlation
  • :mod:.incidence — prevalence, mortality, cumulative incidence, regression, joint affection
  • :mod:.censoring — censoring windows, confusion, cascade, person-years
  • :mod:.pedigree — family size, parent presence
  • :mod:.sampling — per-rep downsampling for plots
  • :mod:.runnermain and cli entry point

compute_censoring_cascade

compute_censoring_cascade(df, censor_age, gen_censoring)

Per-trait, per-generation decomposition of true cases by censoring fate.

Source code in simace/analysis/stats/censoring.py
def compute_censoring_cascade(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
) -> dict[str, Any]:
    """Per-trait, per-generation decomposition of true cases by censoring fate."""
    if "generation" not in df.columns:
        return {}

    windows: dict[int, tuple[float, float]] = {}
    for g, (lo, hi) in gen_censoring.items():
        if hi > lo:
            windows[int(g)] = (lo, hi)

    if not windows:
        return {}

    has_death = "death_age" in df.columns
    result: dict[str, Any] = {}
    for trait in [1, 2]:
        t_col = f"t{trait}"
        a_col = f"affected{trait}"
        if t_col not in df.columns or a_col not in df.columns:
            continue
        trait_result: dict[str, Any] = {}
        for g in sorted(windows.keys()):
            lo, hi = windows[g]
            gen_mask = df["generation"] == g
            df_g = df.loc[gen_mask]
            t = df_g[t_col].values
            true_affected = t < censor_age
            n_true = int(true_affected.sum())
            n_gen = len(df_g)

            if n_true == 0:
                trait_result[f"gen{g}"] = {
                    "observed": 0,
                    "death_censored": 0,
                    "right_censored": 0,
                    "left_truncated": 0,
                    "true_affected": 0,
                    "n_gen": n_gen,
                    "sensitivity": float("nan"),
                    "window": [lo, hi],
                }
                continue

            left_trunc = true_affected & (t < lo)
            right_cens = true_affected & (t > hi)
            in_window = true_affected & (t >= lo) & (t <= hi)

            if has_death:
                death_age = df_g["death_age"].values
                death_cens = in_window & (death_age < t)
                observed = in_window & (death_age >= t)
            else:
                death_cens = np.zeros_like(in_window)
                observed = in_window

            n_obs = int(observed.sum())
            trait_result[f"gen{g}"] = {
                "observed": n_obs,
                "death_censored": int(death_cens.sum()),
                "right_censored": int(right_cens.sum()),
                "left_truncated": int(left_trunc.sum()),
                "true_affected": n_true,
                "n_gen": n_gen,
                "sensitivity": n_obs / n_true if n_true > 0 else float("nan"),
                "window": [lo, hi],
            }
        result[f"trait{trait}"] = trait_result
    return result

compute_censoring_confusion

compute_censoring_confusion(df, censor_age, gen_censoring)

Compute per-trait 2x2 confusion matrix: true affected vs. observed affected.

Only includes individuals from phenotyped generations (observation window > 0).

Source code in simace/analysis/stats/censoring.py
def compute_censoring_confusion(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
) -> dict[str, Any]:
    """Compute per-trait 2x2 confusion matrix: true affected vs. observed affected.

    Only includes individuals from phenotyped generations (observation window > 0).
    """
    if "generation" in df.columns:
        active_gens = {int(g) for g, (lo, hi) in gen_censoring.items() if hi > lo}
        if active_gens:
            df = df[df["generation"].isin(active_gens)]

    result = {}
    for trait in [1, 2]:
        t_col = f"t{trait}"
        a_col = f"affected{trait}"
        if t_col not in df.columns or a_col not in df.columns:
            continue
        true_aff = df[t_col].values < censor_age
        obs_aff = df[a_col].values.astype(bool)
        n = len(df)
        result[f"trait{trait}"] = {
            "tp": int(np.sum(true_aff & obs_aff)),
            "fn": int(np.sum(true_aff & ~obs_aff)),
            "fp": int(np.sum(~true_aff & obs_aff)),
            "tn": int(np.sum(~true_aff & ~obs_aff)),
            "n": n,
        }
    return result

compute_censoring_windows

compute_censoring_windows(df, censor_age, gen_censoring, n_points=300)

Compute per-generation censoring breakdown and incidence curves.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with generation, event time, and censoring columns.

TYPE: DataFrame

censor_age

Maximum observation age.

TYPE: float

gen_censoring

Dict mapping generation to [left, right] observation window.

TYPE: dict[int, list[float]]

n_points

Number of age grid points for incidence curves.

TYPE: int DEFAULT: 300

RETURNS DESCRIPTION
dict[str, Any] | None

Dict with per-generation censoring stats, or None if no generation column.

Source code in simace/analysis/stats/censoring.py
def compute_censoring_windows(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
    n_points: int = 300,
) -> dict[str, Any] | None:
    """Compute per-generation censoring breakdown and incidence curves.

    Args:
        df: Phenotype DataFrame with generation, event time, and censoring columns.
        censor_age: Maximum observation age.
        gen_censoring: Dict mapping generation to ``[left, right]`` observation window.
        n_points: Number of age grid points for incidence curves.

    Returns:
        Dict with per-generation censoring stats, or None if no generation column.
    """
    if "generation" not in df.columns:
        return None
    ages = np.linspace(0, censor_age, n_points)
    result = {}
    for gen in sorted(gen_censoring.keys()):
        win_lo, win_hi = gen_censoring[gen]
        gen_df = df[df["generation"] == gen]
        n_gen = len(gen_df)
        if n_gen == 0:
            result[f"gen{gen}"] = {"n": 0}
            continue
        gen_result: dict[str, Any] = {"n": int(n_gen)}
        for trait_num in [1, 2]:
            t_raw = gen_df[f"t{trait_num}"].values
            t_obs = gen_df[f"t_observed{trait_num}"].values
            affected = gen_df[f"affected{trait_num}"].values.astype(bool)
            death_c = gen_df[f"death_censored{trait_num}"].values.astype(bool)
            obs_inc = np.searchsorted(np.sort(t_obs[affected]), ages, side="right") / n_gen
            true_inc = np.searchsorted(np.sort(t_raw), ages, side="right") / n_gen
            gen_result[f"trait{trait_num}"] = {
                "true_incidence": true_inc.tolist(),
                "observed_incidence": obs_inc.tolist(),
                "pct_affected": float(affected.mean()),
                "left_censored": float((t_raw < win_lo).sum() / n_gen),
                "right_censored": float((t_raw > win_hi).sum() / n_gen),
                "death_censored": float((death_c & ~(t_raw < win_lo) & ~(t_raw > win_hi)).mean()),
            }
        result[f"gen{gen}"] = gen_result
    return {"generations": result, "censoring_ages": ages.tolist(), "censor_age": int(censor_age)}

compute_person_years

compute_person_years(df, censor_age, gen_censoring=None)

Compute person-years of follow-up, total and per-trait at-risk.

For each individual in generation g with observation window [lo, hi]: - Total follow-up ends at min(death_age, hi). - Trait-specific at-risk time ends at min(t_observed, death_age, hi).

Returns dict with total_person_years and per-trait person_years_at_risk.

Source code in simace/analysis/stats/censoring.py
def compute_person_years(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]] | None = None,
) -> dict[str, Any]:
    """Compute person-years of follow-up, total and per-trait at-risk.

    For each individual in generation *g* with observation window [lo, hi]:
      - Total follow-up ends at min(death_age, hi).
      - Trait-specific at-risk time ends at min(t_observed, death_age, hi).

    Returns dict with total_person_years and per-trait person_years_at_risk.
    """
    if "generation" not in df.columns:
        return {}

    # Build window lookup: generation → (lo, hi)
    windows: dict[int, tuple[float, float]] = {}
    if gen_censoring is not None:
        for g, (lo, hi) in gen_censoring.items():
            windows[int(g)] = (float(lo), float(hi))

    has_death = "death_age" in df.columns
    total_py = 0.0
    total_deaths = 0
    trait_py: dict[str, float] = {}

    for trait in [1, 2]:
        trait_py[f"trait{trait}"] = 0.0

    for g, df_g in df.groupby("generation"):
        lo, hi = windows.get(int(g), (0.0, censor_age))
        if hi <= lo:
            continue
        n = len(df_g)

        # End of observation for each person (not trait-specific)
        if has_death:
            death_ages = df_g["death_age"].values
            end = np.minimum(death_ages, hi)
            # Deaths observed during follow-up window
            total_deaths += int(((death_ages >= lo) & (death_ages < hi)).sum())
        else:
            end = np.full(n, hi)
        gen_total = np.clip(end - lo, 0, None).sum()
        total_py += float(gen_total)

        # Trait-specific at-risk person-years
        for trait in [1, 2]:
            t_col = f"t_observed{trait}"
            if t_col not in df_g.columns:
                continue
            t_obs = df_g[t_col].values
            if has_death:
                trait_end = np.minimum(np.minimum(t_obs, df_g["death_age"].values), hi)
            else:
                trait_end = np.minimum(t_obs, hi)
            trait_py[f"trait{trait}"] += float(np.clip(trait_end - lo, 0, None).sum())

    return {
        "total": round(total_py, 1),
        "deaths": total_deaths,
        **{k: round(v, 1) for k, v in trait_py.items()},
    }

compute_affected_correlations

compute_affected_correlations(df, seed=42, *, pairs)

Compute Pearson correlations on binary affected status per pair type and trait.

This is the phi coefficient — Pearson r on {0, 1} data — and is the input to observed-scale Falconer-style h² estimators (e.g. 2·(r_MZ − r_FS)).

PARAMETER DESCRIPTION
df

Phenotype DataFrame with affected{1,2} columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to phi r or

dict[str, Any]

None (if fewer than 10 pairs, or either side is constant).

Source code in simace/analysis/stats/correlations.py
def compute_affected_correlations(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute Pearson correlations on binary affected status per pair type and trait.

    This is the phi coefficient — Pearson r on {0, 1} data — and is the input
    to observed-scale Falconer-style h² estimators (e.g. ``2·(r_MZ − r_FS)``).

    Args:
        df: Phenotype DataFrame with ``affected{1,2}`` columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to phi r or
        None (if fewer than 10 pairs, or either side is constant).
    """
    result = {}
    for trait_num in [1, 2]:
        affected = df[f"affected{trait_num}"].values.astype(np.float64)
        trait_result: dict[str, float | None] = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            if len(idx1) < 10:
                trait_result[ptype] = None
                continue
            r = safe_corrcoef(affected[idx1], affected[idx2])
            trait_result[ptype] = None if np.isnan(r) else r
        result[f"trait{trait_num}"] = trait_result
    return result

compute_cross_trait_tetrachoric

compute_cross_trait_tetrachoric(df, seed=42, *, pairs)

Compute cross-trait tetrachoric correlations (trait 1 vs trait 2).

Includes same-person, same-person-by-generation, and cross-person (across relationship pair types) correlations.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with binary affection columns for both traits.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict with keys same_person, same_person_by_generation,

dict[str, Any]

and cross_person.

Source code in simace/analysis/stats/correlations.py
def compute_cross_trait_tetrachoric(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute cross-trait tetrachoric correlations (trait 1 vs trait 2).

    Includes same-person, same-person-by-generation, and cross-person
    (across relationship pair types) correlations.

    Args:
        df: Phenotype DataFrame with binary affection columns for both traits.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict with keys ``same_person``, ``same_person_by_generation``,
        and ``cross_person``.
    """
    a1 = df["affected1"].values.astype(bool)
    a2 = df["affected2"].values.astype(bool)
    r_sp, se_sp = tetrachoric_corr_se(a1, a2)
    result: dict[str, Any] = {
        "same_person": {
            "r": float(r_sp) if not np.isnan(r_sp) else None,
            "se": float(se_sp) if not np.isnan(se_sp) else None,
            "n": len(df),
        }
    }
    by_gen: dict[str, Any] = {}
    if "generation" in df.columns:
        gen_arr = df["generation"].values
        max_gen = int(gen_arr.max())
        plot_gens = list(range(max(1, max_gen - 2), max_gen + 1))
        for gen in plot_gens:
            mask = gen_arr == gen
            n_g = int(mask.sum())
            if n_g < 50:
                by_gen[f"gen{gen}"] = {"r": None, "se": None, "n": n_g}
                continue
            r_g, se_g = tetrachoric_corr_se(a1[mask], a2[mask])
            by_gen[f"gen{gen}"] = {
                "r": float(r_g) if not np.isnan(r_g) else None,
                "se": float(se_g) if not np.isnan(se_g) else None,
                "n": n_g,
            }
    result["same_person_by_generation"] = by_gen
    cross: dict[str, Any] = {}
    for ptype in PAIR_TYPES:
        idx1, idx2 = pairs[ptype]
        n_p = len(idx1)
        if n_p < 10:
            cross[ptype] = {"r": None, "se": None, "n_pairs": int(n_p)}
            continue
        r_cp, se_cp = tetrachoric_corr_se(a1[idx1], a2[idx2])
        cross[ptype] = {
            "r": float(r_cp) if not np.isnan(r_cp) else None,
            "se": float(se_cp) if not np.isnan(se_cp) else None,
            "n_pairs": int(n_p),
        }
    result["cross_person"] = cross
    return result

compute_liability_correlations

compute_liability_correlations(df, seed=42, *, pairs)

Compute Pearson liability correlations per pair type and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to correlation.

Source code in simace/analysis/stats/correlations.py
def compute_liability_correlations(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute Pearson liability correlations per pair type and trait.

    Args:
        df: Phenotype DataFrame with liability columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to correlation.
    """
    result = {}
    for trait_num in [1, 2]:
        liability = df[f"liability{trait_num}"].values
        trait_result: dict[str, float | None] = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            trait_result[ptype] = float(_pearsonr_core(liability[idx1], liability[idx2])) if len(idx1) >= 10 else None
        result[f"trait{trait_num}"] = trait_result
    return result

compute_mate_correlation

compute_mate_correlation(df)

Compute 2x2 Pearson correlation matrix between mated pairs' liabilities.

Each unique (mother, father) pair is counted once (not weighted by offspring). Only non-founders are considered.

Source code in simace/analysis/stats/correlations.py
def compute_mate_correlation(df: pd.DataFrame) -> dict:
    """Compute 2x2 Pearson correlation matrix between mated pairs' liabilities.

    Each unique (mother, father) pair is counted once (not weighted by offspring).
    Only non-founders are considered.
    """
    nf = df[df["mother"] != -1][["mother", "father"]].drop_duplicates()
    if len(nf) < 2:
        return {"matrix": [[float("nan")] * 2] * 2, "n_pairs": 0}

    lookup = df.set_index("id")[["liability1", "liability2"]]
    f_liab = lookup.loc[nf["mother"].values].values  # (N, 2)
    m_liab = lookup.loc[nf["father"].values].values  # (N, 2)

    matrix = [[float(safe_corrcoef(f_liab[:, i], m_liab[:, j])) for j in range(2)] for i in range(2)]
    return {"matrix": matrix, "n_pairs": len(nf)}

compute_observed_h2_estimators

compute_observed_h2_estimators(stats)

Derive five naive observed-scale h² estimators from precomputed correlations.

Reads from stats["affected_correlations"] (phi r per pair type) and stats["parent_offspring_affected_corr"] (PO regression slope on binary). Each estimator is a closed-form combination that, under a liability-threshold model, is an unbiased estimator of h²_liab · z(K)²/(K(1−K)) — i.e. the observed-scale h² — where K is the affected-status prevalence.

PARAMETER DESCRIPTION
stats

The in-progress stats dict with affected_correlations and parent_offspring_affected_corr already populated.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed trait1/trait2, each mapping estimator name to a

dict[str, Any]

float or None: {falconer, sibs, po, hs, cousins}.

Source code in simace/analysis/stats/correlations.py
def compute_observed_h2_estimators(stats: dict[str, Any]) -> dict[str, Any]:
    """Derive five naive observed-scale h² estimators from precomputed correlations.

    Reads from ``stats["affected_correlations"]`` (phi r per pair type) and
    ``stats["parent_offspring_affected_corr"]`` (PO regression slope on binary).
    Each estimator is a closed-form combination that, under a liability-threshold
    model, is an unbiased estimator of ``h²_liab · z(K)²/(K(1−K))`` — i.e. the
    observed-scale h² — where K is the affected-status prevalence.

    Args:
        stats: The in-progress stats dict with ``affected_correlations`` and
            ``parent_offspring_affected_corr`` already populated.

    Returns:
        Dict keyed ``trait1``/``trait2``, each mapping estimator name to a
        float or None: ``{falconer, sibs, po, hs, cousins}``.
    """
    aff = stats.get("affected_correlations", {}) or {}
    po_all = stats.get("parent_offspring_affected_corr", {}) or {}

    def _two_diff(r_a: Any, r_b: Any) -> float | None:
        if r_a is None or r_b is None:
            return None
        return 2.0 * (float(r_a) - float(r_b))

    def _scale(r: Any, factor: float) -> float | None:
        if r is None:
            return None
        return factor * float(r)

    def _mean_hs(r_mhs: Any, r_phs: Any) -> float | None:
        vals = [float(v) for v in (r_mhs, r_phs) if v is not None]
        if not vals:
            return None
        return 4.0 * (sum(vals) / len(vals))

    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        key = f"trait{trait_num}"
        rs = aff.get(key, {}) or {}
        po_entry = po_all.get(key, {}) or {}
        po_slope = po_entry.get("slope")
        result[key] = {
            "falconer": _two_diff(rs.get("MZ"), rs.get("FS")),
            "sibs": _scale(rs.get("FS"), 2.0),
            "po": float(po_slope) if po_slope is not None else None,
            "hs": _mean_hs(rs.get("MHS"), rs.get("PHS")),
            "cousins": _scale(rs.get("1C"), 8.0),
        }
    return result

compute_parent_offspring_affected_corr

compute_parent_offspring_affected_corr(df)

Compute pooled midparent-offspring regression on binary affected status.

Regresses offspring.affected (0/1) on midparent affected status (mother.affected + father.affected) / 2 (values in {0, 0.5, 1}), pooled across every non-founder individual whose parents are both in the DataFrame. The regression slope is the observed-scale PO heritability estimator; under LTM it can be back-transformed to liability via Dempster-Lerner.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with id, mother, father, and affected{1,2} columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed trait1/trait2, each with

dict[str, Any]

{slope, r, r2, intercept, stderr, pvalue, n_pairs}. Values are

dict[str, Any]

None when fewer than 10 valid trios or midparent has zero variance.

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_affected_corr(df: pd.DataFrame) -> dict[str, Any]:
    """Compute pooled midparent-offspring regression on binary affected status.

    Regresses ``offspring.affected`` (0/1) on midparent affected status
    ``(mother.affected + father.affected) / 2`` (values in {0, 0.5, 1}),
    pooled across every non-founder individual whose parents are both in the
    DataFrame.  The regression slope is the observed-scale PO heritability
    estimator; under LTM it can be back-transformed to liability via
    Dempster-Lerner.

    Args:
        df: Phenotype DataFrame with ``id``, ``mother``, ``father``, and
            ``affected{1,2}`` columns.

    Returns:
        Dict keyed ``trait1``/``trait2``, each with
        ``{slope, r, r2, intercept, stderr, pvalue, n_pairs}``.  Values are
        None when fewer than 10 valid trios or midparent has zero variance.
    """
    if "id" not in df.columns or "mother" not in df.columns or "father" not in df.columns:
        return {}

    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)

    non_founder_idx = np.where(df["mother"].values >= 0)[0]

    null = {
        "r": None,
        "r2": None,
        "slope": None,
        "intercept": None,
        "stderr": None,
        "pvalue": None,
        "n_pairs": 0,
    }

    # Precompute valid trios once (parents present, looked up via id_to_row).
    mother_ids_arr = df["mother"].values[non_founder_idx]
    father_ids_arr = df["father"].values[non_founder_idx]
    has_m = (mother_ids_arr >= 0) & (mother_ids_arr < len(id_to_row))
    has_f = (father_ids_arr >= 0) & (father_ids_arr < len(id_to_row))
    m_rows = np.full(len(non_founder_idx), -1, dtype=np.int32)
    f_rows = np.full(len(non_founder_idx), -1, dtype=np.int32)
    m_rows[has_m] = id_to_row[mother_ids_arr[has_m]]
    f_rows[has_f] = id_to_row[father_ids_arr[has_f]]
    valid = (m_rows >= 0) & (f_rows >= 0)
    n_pairs = int(valid.sum())

    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        aff_col = f"affected{trait_num}"
        if aff_col not in df.columns:
            result[f"trait{trait_num}"] = {**null}
            continue
        affected = df[aff_col].values.astype(np.float64)
        if n_pairs < 10:
            result[f"trait{trait_num}"] = {**null, "n_pairs": n_pairs}
            continue
        midparent = (affected[m_rows[valid]] + affected[f_rows[valid]]) / 2.0
        # Zero-variance midparent (all parents concordant) gives an undefined
        # regression; surface as None rather than 0/0.
        if float(np.var(midparent)) < 1e-12:
            result[f"trait{trait_num}"] = {**null, "n_pairs": n_pairs}
            continue
        entry = _po_regression(non_founder_idx, affected, id_to_row, df)
        slope = entry.get("slope")
        if slope is not None and not np.isfinite(slope):
            entry = {**null, "n_pairs": entry.get("n_pairs", 0)}
        result[f"trait{trait_num}"] = entry
    return result

compute_parent_offspring_corr

compute_parent_offspring_corr(df)

Compute midparent-offspring liability regression per generation and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability, generation, and parent columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each containing per-generation

dict[str, Any]

regression stats (slope, r, r2, intercept, stderr, pvalue, n_pairs).

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_corr(df: pd.DataFrame) -> dict[str, Any]:
    """Compute midparent-offspring liability regression per generation and trait.

    Args:
        df: Phenotype DataFrame with liability, generation, and parent columns.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each containing per-generation
        regression stats (slope, r, r2, intercept, stderr, pvalue, n_pairs).
    """
    if "generation" not in df.columns:
        return {}
    max_gen = int(df["generation"].max())
    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)
    gen_arr = df["generation"].values
    result = {}
    for trait_num in [1, 2]:
        liability = df[f"liability{trait_num}"].values
        trait_result = {}
        for gen in range(1, max_gen + 1):
            gen_idx = np.where(gen_arr == gen)[0]
            trait_result[f"gen{gen}"] = _po_regression(gen_idx, liability, id_to_row, df)
        result[f"trait{trait_num}"] = trait_result
    return result

compute_parent_offspring_corr_by_sex

compute_parent_offspring_corr_by_sex(df)

Compute midparent-offspring regression partitioned by offspring sex.

Returns dict keyed by "female"/"male", each containing per-trait per-generation {slope, r, r2, intercept, stderr, pvalue, n_pairs}.

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_corr_by_sex(df: pd.DataFrame) -> dict[str, Any]:
    """Compute midparent-offspring regression partitioned by offspring sex.

    Returns dict keyed by "female"/"male", each containing per-trait
    per-generation {slope, r, r2, intercept, stderr, pvalue, n_pairs}.
    """
    if "generation" not in df.columns:
        return {}
    max_gen = int(df["generation"].max())
    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)
    gen_arr = df["generation"].values
    sex_arr = df["sex"].values
    result: dict[str, Any] = {}
    for sex_val, sex_label in SEX_LEVELS:
        sex_result: dict[str, Any] = {}
        for trait_num in [1, 2]:
            liability = df[f"liability{trait_num}"].values
            trait_result: dict[str, Any] = {}
            for gen in range(1, max_gen + 1):
                gen_idx = np.where((gen_arr == gen) & (sex_arr == sex_val))[0]
                trait_result[f"gen{gen}"] = _po_regression(gen_idx, liability, id_to_row, df)
            sex_result[f"trait{trait_num}"] = trait_result
        result[sex_label] = sex_result
    return result

compute_tetrachoric

compute_tetrachoric(df, seed=42, *, pairs)

Compute tetrachoric correlations per pair type and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with binary affection columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to

dict[str, Any]

{r, se, n_pairs}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations per pair type and trait.

    Args:
        df: Phenotype DataFrame with binary affection columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to
        ``{r, se, n_pairs}``.
    """
    result = {}
    for trait_num in [1, 2]:
        affected = df[f"affected{trait_num}"].values.astype(bool)
        trait_result = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            trait_result[ptype] = _tetrachoric_for_pairs(idx1, idx2, affected)
        result[f"trait{trait_num}"] = trait_result
    return result

compute_tetrachoric_by_generation

compute_tetrachoric_by_generation(df, seed=42, *, pairs)

Compute tetrachoric correlations stratified by generation.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with generation and affection columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by gen{N}, each containing per-trait per-pair-type

dict[str, Any]

{r, se, n_pairs, liability_r}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric_by_generation(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations stratified by generation.

    Args:
        df: Phenotype DataFrame with generation and affection columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``gen{N}``, each containing per-trait per-pair-type
        ``{r, se, n_pairs, liability_r}``.
    """
    if "generation" not in df.columns:
        return {}
    gen_arr = df["generation"].values
    max_gen = int(gen_arr.max())
    plot_gens = list(range(max(1, max_gen - 2), max_gen + 1))
    affected_by_trait = {n: df[f"affected{n}"].values.astype(bool) for n in (1, 2)}
    liability_by_trait = {n: df[f"liability{n}"].values for n in (1, 2)}
    result = {}
    for gen in plot_gens:
        gen_result = {}
        for trait_num in (1, 2):
            affected = affected_by_trait[trait_num]
            liability = liability_by_trait[trait_num]
            trait_result = {}
            for ptype in PAIR_TYPES:
                idx1, idx2 = pairs[ptype]
                mask = gen_arr[idx1] == gen
                trait_result[ptype] = _tetrachoric_for_pairs(idx1[mask], idx2[mask], affected, liability)
            gen_result[f"trait{trait_num}"] = trait_result
        result[f"gen{gen}"] = gen_result
    return result

compute_tetrachoric_by_sex

compute_tetrachoric_by_sex(df, seed=42, *, pairs)

Compute tetrachoric correlations for same-sex pairs only (FF and MM).

Returns dict keyed by "female"/"male", each containing per-trait per-pair-type {r, se, n_pairs, liability_r}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric_by_sex(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations for same-sex pairs only (FF and MM).

    Returns dict keyed by "female"/"male", each containing per-trait
    per-pair-type {r, se, n_pairs, liability_r}.
    """
    sex_arr = df["sex"].values
    affected_by_trait = {n: df[f"affected{n}"].values.astype(bool) for n in (1, 2)}
    liability_by_trait = {n: df[f"liability{n}"].values for n in (1, 2)}
    result: dict[str, Any] = {}
    for sex_val, sex_label in SEX_LEVELS:
        sex_result: dict[str, Any] = {}
        for trait_num in (1, 2):
            affected = affected_by_trait[trait_num]
            liability = liability_by_trait[trait_num]
            trait_result: dict[str, Any] = {}
            for ptype in PAIR_TYPES:
                idx1, idx2 = pairs[ptype]
                sex_mask = (sex_arr[idx1] == sex_val) & (sex_arr[idx2] == sex_val)
                trait_result[ptype] = _tetrachoric_for_pairs(idx1[sex_mask], idx2[sex_mask], affected, liability)
            sex_result[f"trait{trait_num}"] = trait_result
        result[sex_label] = sex_result
    return result

compute_effective_size

compute_effective_size(pedigree, config=None)

Run all eight Ne estimators on pedigree and serialize to dicts.

PARAMETER DESCRIPTION
pedigree

Either a pandas DataFrame with the standard pedigree columns or an already-built :class:PedigreeGraph. Passing a graph avoids rebuilding it when the runner has already constructed one for relationship extraction.

TYPE: DataFrame | PedigreeGraph

config

Per-rep params (e.g. loaded from params.yaml). Used solely to derive theoretical expectations.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

Dict keyed on estimator name; each value is the matching

dict[str, dict[str, Any]]

dataclass's to_dict() payload merged with an expected

dict[str, dict[str, Any]]

field (float or None).

Source code in simace/analysis/stats/effective_size.py
def compute_effective_size(
    pedigree: pd.DataFrame | PedigreeGraph,
    config: dict[str, Any] | None = None,
) -> dict[str, dict[str, Any]]:
    """Run all eight Ne estimators on ``pedigree`` and serialize to dicts.

    Args:
        pedigree: Either a pandas DataFrame with the standard pedigree
            columns or an already-built :class:`PedigreeGraph`.  Passing
            a graph avoids rebuilding it when the runner has already
            constructed one for relationship extraction.
        config: Per-rep params (e.g. loaded from ``params.yaml``).
            Used solely to derive theoretical expectations.

    Returns:
        Dict keyed on estimator name; each value is the matching
        dataclass's ``to_dict()`` payload merged with an ``expected``
        field (``float`` or ``None``).
    """
    pg = pedigree if isinstance(pedigree, PedigreeGraph) else PedigreeGraph(pedigree)
    raw = compute_all_ne(pg)
    expected = theoretical_expectations(config)
    out: dict[str, dict[str, Any]] = {}
    for name, result in raw.items():
        d = result.to_dict()
        d["expected"] = expected.get(name)
        out[name] = d
    return out

ne_v_expected_ztp

ne_v_expected_ztp(n, mating_lambda)

Closed-form Ne_V expectation under simACE's mating model.

Under random mating with balanced 50/50 sex, ZTP(λ) mating counts per individual, and multinomial allocation of N offspring across the resulting matings, the per-individual total-offspring count has

``E[k] = 2``,
``V(k) = 2 + 4 · Var[m] / E[m]²``,

where m ~ ZTP(λ) with

``E[m]   = λ / (1 − e^(−λ))``,
``Var[m] = E[m] · (1 + λ) − E[m]²``.

Plugging into Ne_V = 2N / V(k) yields

``Ne_V = N / (1 + 2 · Var[m] / E[m]²)``.

The formula is exact in the multinomial → Poisson per-mating offspring limit (large M); finite-sample correction is O(1 / number_of_matings).

Limits
  • λ → 0⁺ (degenerate at m=1, monogamous): Ne_V = N.
  • λ → ∞ (Poisson, no truncation): Ne_V = N.
  • Default λ = 0.5: Ne_V ≈ 0.7349 · N.
Source code in simace/analysis/stats/effective_size.py
def ne_v_expected_ztp(n: float, mating_lambda: float) -> float:
    """Closed-form ``Ne_V`` expectation under simACE's mating model.

    Under random mating with balanced 50/50 sex, ZTP(λ) mating counts
    per individual, and multinomial allocation of N offspring across the
    resulting matings, the per-individual total-offspring count has

        ``E[k] = 2``,
        ``V(k) = 2 + 4 · Var[m] / E[m]²``,

    where ``m ~ ZTP(λ)`` with

        ``E[m]   = λ / (1 − e^(−λ))``,
        ``Var[m] = E[m] · (1 + λ) − E[m]²``.

    Plugging into ``Ne_V = 2N / V(k)`` yields

        ``Ne_V = N / (1 + 2 · Var[m] / E[m]²)``.

    The formula is exact in the multinomial → Poisson per-mating
    offspring limit (large M); finite-sample correction is
    ``O(1 / number_of_matings)``.

    Limits:
        * ``λ → 0⁺`` (degenerate at m=1, monogamous): ``Ne_V = N``.
        * ``λ → ∞`` (Poisson, no truncation): ``Ne_V = N``.
        * Default ``λ = 0.5``: ``Ne_V ≈ 0.7349 · N``.
    """
    if mating_lambda <= 0:
        return float(n)
    p = 1.0 - math.exp(-mating_lambda)
    e_m = mating_lambda / p
    var_m = e_m * (1.0 + mating_lambda) - e_m * e_m
    return float(n) / (1.0 + 2.0 * var_m / (e_m * e_m))

regression_estimator_regime_ok

regression_estimator_regime_ok(n, g_ped, ne_v)

Whether the regression-based Ne estimators are reliable at this scale.

The slope estimate in Ne_I, Ne_C, and Ne_CT has variance ∝ 1/(N·G³); inverting the slope to get Ne incurs a Jensen bias that scales as Ne_V² / (N · G²). We declare the regime acceptable when the implied bias on Ne is below ~20 % of Ne_V, which corresponds to N · G² ≥ 120 · Ne_V.

Returns False for g_ped < 2 (no slope possible) regardless of N.

Source code in simace/analysis/stats/effective_size.py
def regression_estimator_regime_ok(n: float, g_ped: int, ne_v: float) -> bool:
    """Whether the regression-based Ne estimators are reliable at this scale.

    The slope estimate in Ne_I, Ne_C, and Ne_CT has variance
    ``∝ 1/(N·G³)``; inverting the slope to get Ne incurs a Jensen bias
    that scales as ``Ne_V² / (N · G²)``.  We declare the regime
    acceptable when the implied bias on Ne is below ~20 % of Ne_V,
    which corresponds to ``N · G² ≥ 120 · Ne_V``.

    Returns ``False`` for ``g_ped < 2`` (no slope possible) regardless
    of ``N``.
    """
    if g_ped < 2:
        return False
    return n * g_ped * g_ped >= _REGRESSION_REGIME_THRESHOLD * ne_v

theoretical_expectations

theoretical_expectations(config)

Closed-form Ne expectations under standard random-mating assumptions.

Returns a per-estimator dict. Every entry is None when config is missing, N is unknown, or the configuration includes a non-standard knob (currently: nonzero assort1 / assort2).

Under random mating with 50/50 sex and ZTP(mating_lambda) family allocation, the family-size variance correction reduces Ne_V-family estimators below N per :func:ne_v_expected_ztp. Three estimators (Ne_V, Ne_iΔF, Ne_H) inherit that expectation directly — their finite-sample bias is O(1/N) and negligible at realistic simACE scales.

Three regression-based estimators (Ne_I, Ne_C, Ne_CT) carry a Jensen bias on the inverted slope of order Ne_V² / (N · G²) that typically dominates at simACE's default G_ped = 6. We return their expectation only when :func:regression_estimator_regime_ok is satisfied, otherwise None (validator passes vacuously).

Ne_sr stays at N (deterministic balanced sex ratio). Ne_LTC under the Ne = 1/(2·Σc²) form is approximated as Ne_V_expected / 2 — consistent with the WF limit where Ne_V → N and Ne_LTC → N/2. In practice the observed Ne_LTC is typically None (asymptote not reached within G_ped generations under realized WF noise), so the validator passes vacuously.

Source code in simace/analysis/stats/effective_size.py
def theoretical_expectations(config: dict[str, Any] | None) -> dict[str, float | None]:
    """Closed-form Ne expectations under standard random-mating assumptions.

    Returns a per-estimator dict.  Every entry is ``None`` when ``config``
    is missing, ``N`` is unknown, or the configuration includes a
    non-standard knob (currently: nonzero ``assort1`` / ``assort2``).

    Under random mating with 50/50 sex and ZTP(``mating_lambda``) family
    allocation, the family-size variance correction reduces
    ``Ne_V``-family estimators below ``N`` per :func:`ne_v_expected_ztp`.
    Three estimators (Ne_V, Ne_iΔF, Ne_H) inherit that expectation
    directly — their finite-sample bias is ``O(1/N)`` and negligible at
    realistic simACE scales.

    Three regression-based estimators (Ne_I, Ne_C, Ne_CT) carry a Jensen
    bias on the inverted slope of order ``Ne_V² / (N · G²)`` that
    typically dominates at simACE's default ``G_ped = 6``.  We return
    their expectation only when
    :func:`regression_estimator_regime_ok` is satisfied, otherwise
    ``None`` (validator passes vacuously).

    Ne_sr stays at ``N`` (deterministic balanced sex ratio).  Ne_LTC
    under the ``Ne = 1/(2·Σc²)`` form is approximated as
    ``Ne_V_expected / 2`` — consistent with the WF limit where
    ``Ne_V → N`` and ``Ne_LTC → N/2``.  In practice the observed
    Ne_LTC is typically ``None`` (asymptote not reached within
    ``G_ped`` generations under realized WF noise), so the validator
    passes vacuously.
    """
    if config is None:
        return dict.fromkeys(_NE_KEYS)

    assort1 = float(config.get("assort1") or 0.0)
    assort2 = float(config.get("assort2") or 0.0)
    if assort1 != 0.0 or assort2 != 0.0:
        return dict.fromkeys(_NE_KEYS)

    N = config.get("N")
    if N is None:
        return dict.fromkeys(_NE_KEYS)

    mating_lambda = config.get("mating_lambda")
    if mating_lambda is None:
        return dict.fromkeys(_NE_KEYS)

    n = float(N)
    ne_v = ne_v_expected_ztp(n, float(mating_lambda))

    g_ped = config.get("G_ped")
    regression_ok = g_ped is not None and regression_estimator_regime_ok(n, int(g_ped), ne_v)
    regression_expected = ne_v if regression_ok else None

    return {
        "ne_inbreeding": regression_expected,
        "ne_coancestry": regression_expected,
        "ne_variance_family_size": ne_v,
        "ne_sex_ratio": n,
        "ne_individual_delta_f": ne_v,
        "ne_long_term_contributions": ne_v / 2.0,
        "ne_hill_overlapping": ne_v,
        "ne_caballero_toro": regression_expected,
    }

compute_cumulative_incidence

compute_cumulative_incidence(df, censor_age, n_points=200)

Compute observed and true cumulative incidence curves per trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with event time and affection columns.

TYPE: DataFrame

censor_age

Maximum observation age for the x-axis grid.

TYPE: float

n_points

Number of age grid points.

TYPE: int DEFAULT: 200

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each with ages,

dict[str, Any]

observed_values, true_values, and half_target_age.

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute observed and true cumulative incidence curves per trait.

    Args:
        df: Phenotype DataFrame with event time and affection columns.
        censor_age: Maximum observation age for the x-axis grid.
        n_points: Number of age grid points.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each with ``ages``,
        ``observed_values``, ``true_values``, and ``half_target_age``.
    """
    ages = np.linspace(0, censor_age, n_points)
    n = len(df)
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs = df[f"t_observed{trait_num}"].values
        t_raw = df[f"t{trait_num}"].values
        obs_inc = _cumulative_curve(t_obs, ages, n, mask=aff)
        true_inc = _cumulative_curve(t_raw, ages, n)
        half_idx = np.searchsorted(obs_inc, obs_inc[-1] / 2)
        result[f"trait{trait_num}"] = {
            "ages": ages.tolist(),
            "observed_values": obs_inc.tolist(),
            "true_values": true_inc.tolist(),
            "half_target_age": float(ages[min(half_idx, len(ages) - 1)]),
        }
    return result

compute_cumulative_incidence_by_sex

compute_cumulative_incidence_by_sex(df, censor_age, n_points=200)

Compute cumulative incidence curves split by sex (0=female, 1=male).

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence_by_sex(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute cumulative incidence curves split by sex (0=female, 1=male)."""
    if "sex" not in df.columns:
        return {}

    ages = np.linspace(0, censor_age, n_points)
    sex = df["sex"].values
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs_aff = df[f"t_observed{trait_num}"].values[aff]
        sex_aff = sex[aff]

        trait_result = {}
        for sex_val, sex_label in SEX_LEVELS:
            n_sex = int((sex == sex_val).sum())
            if n_sex == 0:
                continue
            in_stratum_aff = sex_aff == sex_val
            inc = _cumulative_curve(t_obs_aff, ages, n_sex, mask=in_stratum_aff)
            trait_result[sex_label] = {
                "ages": ages.tolist(),
                "values": inc.tolist(),
                "n": n_sex,
                "prevalence": float(in_stratum_aff.sum() / n_sex),
            }
        result[f"trait{trait_num}"] = trait_result
    return result

compute_cumulative_incidence_by_sex_generation

compute_cumulative_incidence_by_sex_generation(df, censor_age, n_points=200)

Compute cumulative incidence curves split by sex and generation.

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence_by_sex_generation(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute cumulative incidence curves split by sex and generation."""
    if "sex" not in df.columns or "generation" not in df.columns:
        return {}

    ages = np.linspace(0, censor_age, n_points)
    generations = sorted(df["generation"].unique())
    sex = df["sex"].values
    gen_arr = df["generation"].values
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs_aff = df[f"t_observed{trait_num}"].values[aff]
        sex_aff = sex[aff]
        gen_aff = gen_arr[aff]

        trait_result: dict[str, Any] = {}
        for gen in generations:
            gen_result: dict[str, Any] = {}
            for sex_val, sex_label in SEX_LEVELS:
                n_sex = int(((gen_arr == gen) & (sex == sex_val)).sum())
                if n_sex == 0:
                    continue
                in_stratum_aff = (gen_aff == gen) & (sex_aff == sex_val)
                inc = _cumulative_curve(t_obs_aff, ages, n_sex, mask=in_stratum_aff)
                gen_result[sex_label] = {
                    "ages": ages.tolist(),
                    "values": inc.tolist(),
                    "n": n_sex,
                    "prevalence": float(in_stratum_aff.sum() / n_sex),
                }
            trait_result[f"gen{int(gen)}"] = gen_result
        result[f"trait{trait_num}"] = trait_result
    return result

compute_joint_affection

compute_joint_affection(df)

Compute 2x2 contingency table for trait1 x trait2 affection status.

Source code in simace/analysis/stats/incidence.py
def compute_joint_affection(df: pd.DataFrame) -> dict[str, Any]:
    """Compute 2x2 contingency table for trait1 x trait2 affection status."""
    a1 = df["affected1"].values.astype(bool)
    a2 = df["affected2"].values.astype(bool)
    n = len(df)

    counts = {
        "both": int(np.sum(a1 & a2)),
        "trait1_only": int(np.sum(a1 & ~a2)),
        "trait2_only": int(np.sum(~a1 & a2)),
        "neither": int(np.sum(~a1 & ~a2)),
    }
    proportions = {k: v / n for k, v in counts.items()}

    # Sex-specific co-affection proportions
    by_sex: dict[str, float] = {}
    if "sex" in df.columns:
        for sex_val, sex_label in SEX_LEVELS:
            mask = df["sex"].values == sex_val
            n_sex = int(mask.sum())
            if n_sex > 0:
                by_sex[sex_label] = round(float(np.sum(a1[mask] & a2[mask])) / n_sex, 4)

    return {"counts": counts, "proportions": proportions, "n": n, "by_sex": by_sex}

compute_mortality

compute_mortality(df, censor_age)

Compute decade-binned mortality rates from death ages.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with death_age column.

TYPE: DataFrame

censor_age

Maximum observation age.

TYPE: float

RETURNS DESCRIPTION
dict[str, Any]

Dict with decade_labels and rates lists.

Source code in simace/analysis/stats/incidence.py
def compute_mortality(df: pd.DataFrame, censor_age: float) -> dict[str, Any]:
    """Compute decade-binned mortality rates from death ages.

    Args:
        df: Phenotype DataFrame with ``death_age`` column.
        censor_age: Maximum observation age.

    Returns:
        Dict with ``decade_labels`` and ``rates`` lists.
    """
    decade_edges = np.arange(0, censor_age + 10, 10)
    mortality_rates, decade_labels = [], []
    death_ages = df["death_age"].values
    for i in range(len(decade_edges) - 1):
        lo, hi = decade_edges[i], decade_edges[i + 1]
        if lo >= censor_age:
            break
        alive = (death_ages >= lo).sum()
        died = ((death_ages >= lo) & (death_ages < hi) & (death_ages < censor_age)).sum()
        mortality_rates.append(float(died / alive) if alive > 0 else 0.0)
        decade_labels.append(f"{int(lo)}-{int(hi - 1)}")
    return {"decade_labels": decade_labels, "rates": mortality_rates}

compute_prevalence

compute_prevalence(df)

Compute observed prevalence for each trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with affected1 and affected2 columns. If a generation column is present, per-generation prevalence is also reported under by_generation.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict with trait1 and trait2 marginal prevalence fractions, and

dict[str, Any]

(when generation is present) a by_generation subkey mapping

dict[str, Any]

int(generation) -> {"trait1": float, "trait2": float}.

Source code in simace/analysis/stats/incidence.py
def compute_prevalence(df: pd.DataFrame) -> dict[str, Any]:
    """Compute observed prevalence for each trait.

    Args:
        df: Phenotype DataFrame with ``affected1`` and ``affected2`` columns.
            If a ``generation`` column is present, per-generation prevalence
            is also reported under ``by_generation``.

    Returns:
        Dict with ``trait1`` and ``trait2`` marginal prevalence fractions, and
        (when ``generation`` is present) a ``by_generation`` subkey mapping
        ``int(generation) -> {"trait1": float, "trait2": float}``.
    """
    result: dict[str, Any] = {
        "trait1": float(df["affected1"].mean()),
        "trait2": float(df["affected2"].mean()),
    }
    if "generation" in df.columns:
        means = df.groupby("generation")[["affected1", "affected2"]].mean()
        result["by_generation"] = {
            int(gen): {"trait1": float(row["affected1"]), "trait2": float(row["affected2"])}
            for gen, row in means.iterrows()
        }
    return result

compute_regression

compute_regression(df)

Regress observed event time on liability for affected individuals.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability and observed-time columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each with regression stats

dict[str, Any]

(slope, intercept, r, r2, stderr, pvalue, n) or None.

Source code in simace/analysis/stats/incidence.py
def compute_regression(df: pd.DataFrame) -> dict[str, Any]:
    """Regress observed event time on liability for affected individuals.

    Args:
        df: Phenotype DataFrame with liability and observed-time columns.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each with regression stats
        (slope, intercept, r, r2, stderr, pvalue, n) or None.
    """
    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        aff_col = f"affected{trait_num}"
        t_col = f"t_observed{trait_num}"
        liab_col = f"liability{trait_num}"
        if liab_col not in df.columns:
            result[f"trait{trait_num}"] = None
            continue
        sub = df[df[aff_col]].dropna(subset=[liab_col, t_col])
        if len(sub) < 2:
            result[f"trait{trait_num}"] = None
            continue
        slope, intercept, r, stderr, pvalue = fast_linregress(sub[liab_col].values, sub[t_col].values)
        result[f"trait{trait_num}"] = {
            "slope": slope,
            "intercept": intercept,
            "r": r,
            "r2": r**2,
            "stderr": stderr,
            "pvalue": pvalue,
            "n": len(sub),
        }
    return result

compute_mean_family_size

compute_mean_family_size(df)

Compute mean realised family size (offspring per mating pair).

Uses non-founder individuals (mother != -1) grouped by (mother, father).

Source code in simace/analysis/stats/pedigree.py
def compute_mean_family_size(df: pd.DataFrame) -> dict[str, Any]:
    """Compute mean realised family size (offspring per mating pair).

    Uses non-founder individuals (mother != -1) grouped by (mother, father).
    """
    if "mother" not in df.columns or "father" not in df.columns:
        return {}

    children = df.loc[(df["mother"] != -1) & (df["father"] != -1)]
    if len(children) == 0:
        return {}

    family_sizes = children.groupby(["mother", "father"]).size()

    # Fraction with at least one phenotyped full sibling
    families_with_sibs = family_sizes[family_sizes >= 2].index
    has_sib = children.set_index(["mother", "father"]).index.isin(families_with_sibs)
    frac_with_full_sib = round(float(has_sib.sum()) / len(children), 4)

    # Family size distribution per mating (1, 2, 3, 4+)
    n_fam = len(family_sizes)
    dist: dict[str, float] = {}
    for k in [1, 2, 3]:
        dist[str(k)] = round(int((family_sizes == k).sum()) / n_fam, 4)
    dist["4+"] = round(int((family_sizes >= 4).sum()) / n_fam, 4)

    # Offspring per person (including 0 for childless individuals).
    # Count via bincount on row positions: faster than groupby + Series.update/add.
    # When df is a subsample, a child's parent may not be in df["id"]; id_to_row
    # marks those as -1 and they must be masked out before bincount (which rejects
    # negatives). This matches the prior groupby+update semantics, which only
    # counted offspring against parents present in df["id"].
    ids_arr = df["id"].to_numpy()
    n_total = len(df)
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(n_total, dtype=np.int32)
    m_rows = id_to_row[children["mother"].to_numpy()]
    f_rows = id_to_row[children["father"].to_numpy()]
    m_rows = m_rows[m_rows >= 0]
    f_rows = f_rows[f_rows >= 0]
    counts_arr = np.bincount(m_rows, minlength=n_total) + np.bincount(f_rows, minlength=n_total)
    offspring_counts = pd.Series(counts_arr, index=df["id"])
    person_dist = _offspring_count_dist(offspring_counts, n_total)

    # Offspring per person by sex
    person_dist_by_sex: dict[str, dict[str, float]] = {}
    if "sex" in df.columns:
        sex_by_id = df.set_index("id")["sex"]
        for sex_val, sex_label in SEX_LEVELS:
            sex_ids = sex_by_id[sex_by_id == sex_val].index
            sex_counts = offspring_counts.reindex(sex_ids, fill_value=0)
            if len(sex_counts) > 0:
                person_dist_by_sex[sex_label] = _offspring_count_dist(sex_counts, len(sex_counts))

    # Number of mates by sex
    # Females: unique fathers per mother
    mates_female = children.groupby("mother")["father"].nunique()
    # Males: unique mothers per father
    mates_male = children.groupby("father")["mother"].nunique()
    n_mothers = len(mates_female)
    n_fathers = len(mates_male)
    mates_by_sex: dict[str, Any] = {
        "female_mean": round(float(mates_female.mean()), 2) if n_mothers else 0,
        "male_mean": round(float(mates_male.mean()), 2) if n_fathers else 0,
        "female_1": round(int((mates_female == 1).sum()) / n_mothers, 4) if n_mothers else 0,
        "female_2+": round(int((mates_female >= 2).sum()) / n_mothers, 4) if n_mothers else 0,
        "male_1": round(int((mates_male == 1).sum()) / n_fathers, 4) if n_fathers else 0,
        "male_2+": round(int((mates_male >= 2).sum()) / n_fathers, 4) if n_fathers else 0,
    }

    return {
        "mean": round(float(family_sizes.mean()), 2),
        "median": round(float(family_sizes.median()), 1),
        "q1": round(float(family_sizes.quantile(0.25)), 1),
        "q3": round(float(family_sizes.quantile(0.75)), 1),
        "n_families": len(family_sizes),
        "frac_with_full_sib": frac_with_full_sib,
        "size_dist": dist,
        "person_offspring_dist": person_dist,
        "person_offspring_dist_by_sex": person_dist_by_sex,
        "mates_by_sex": mates_by_sex,
    }

compute_parent_status

compute_parent_status(df, df_ped=None)

Count individuals by number of parents phenotyped and in pedigree.

Returns dict with 'phenotyped' and optionally 'in_pedigree', each mapping 0/1/2 → count of individuals with that many parents present.

Source code in simace/analysis/stats/pedigree.py
def compute_parent_status(
    df: pd.DataFrame,
    df_ped: pd.DataFrame | None = None,
) -> dict[str, Any]:
    """Count individuals by number of parents phenotyped and in pedigree.

    Returns dict with 'phenotyped' and optionally 'in_pedigree', each mapping
    0/1/2 → count of individuals with that many parents present.
    """
    if "mother" not in df.columns or "father" not in df.columns:
        return {}

    pheno_ids = df["id"].to_numpy()
    mothers = df["mother"].values
    fathers = df["father"].values

    m_pheno = np.isin(mothers, pheno_ids) & (mothers != -1)
    f_pheno = np.isin(fathers, pheno_ids) & (fathers != -1)
    n_parents_pheno = m_pheno.astype(int) + f_pheno.astype(int)
    result: dict[str, Any] = {
        "phenotyped": {str(k): int((n_parents_pheno == k).sum()) for k in [0, 1, 2]},
    }

    if df_ped is not None:
        ped_ids = df_ped["id"].to_numpy()
        m_ped = np.isin(mothers, ped_ids) & (mothers != -1)
        f_ped = np.isin(fathers, ped_ids) & (fathers != -1)
        n_parents_ped = m_ped.astype(int) + f_ped.astype(int)
        result["in_pedigree"] = {str(k): int((n_parents_ped == k).sum()) for k in [0, 1, 2]}

    return result

cli

cli()

Command-line interface for phenotype statistics computation.

Source code in simace/analysis/stats/runner.py
def cli() -> None:
    """Command-line interface for phenotype statistics computation."""
    from simace.core.cli_base import add_logging_args, init_logging

    parser = argparse.ArgumentParser(description="Compute phenotype statistics")
    add_logging_args(parser)
    parser.add_argument("phenotype", help="Input phenotype parquet")
    parser.add_argument("censor_age", type=float)
    parser.add_argument("stats_output", help="Output stats YAML")
    parser.add_argument("samples_output", help="Output samples parquet")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--gen-censoring", type=str, default=None, help="Per-generation censoring windows as JSON dict")
    parser.add_argument("--pedigree", default=None, help="Full pedigree parquet for G_ped pair counts")
    parser.add_argument(
        "--max-degree",
        dest="max_degree",
        type=int,
        default=2,
        help="Maximum kinship degree for pair extraction (1-5, default 2)",
    )
    parser.add_argument(
        "--params",
        default=None,
        help="Per-rep params.yaml; enables theoretical Ne expectations",
    )

    args = parser.parse_args()
    init_logging(args)

    gen_censoring = None
    if args.gen_censoring:
        gen_censoring = {int(k): v for k, v in json.loads(args.gen_censoring).items()}

    params = None
    if args.params:
        from simace.core.yaml_io import load_yaml

        params = load_yaml(args.params)

    main(
        args.phenotype,
        args.censor_age,
        args.stats_output,
        args.samples_output,
        seed=args.seed,
        gen_censoring=gen_censoring,
        pedigree_path=args.pedigree,
        max_degree=args.max_degree,
        params=params,
    )

main

main(phenotype_path, censor_age, stats_output, samples_output, seed=42, gen_censoring=None, pedigree_path=None, max_degree=2, case_ascertainment_ratio=1.0, params=None)

Compute all stats for a single rep and write outputs.

Source code in simace/analysis/stats/runner.py
def main(
    phenotype_path: str,
    censor_age: float,
    stats_output: str,
    samples_output: str,
    seed: int = 42,
    gen_censoring: dict[int, list[float]] | None = None,
    pedigree_path: str | None = None,
    max_degree: int = 2,
    case_ascertainment_ratio: float = 1.0,
    params: dict[str, Any] | None = None,
) -> None:
    """Compute all stats for a single rep and write outputs."""
    df = pd.read_parquet(phenotype_path)
    logger.info("Computing stats for %s (%d rows)", phenotype_path, len(df))

    stats: dict[str, Any] = {
        "n_individuals": len(df),
        "n_generations": int(df["generation"].nunique()) if "generation" in df.columns else 1,
    }

    if case_ascertainment_ratio != 1.0:
        stats["case_ascertainment_ratio"] = case_ascertainment_ratio

    stats["prevalence"] = compute_prevalence(df)
    stats["mortality"] = compute_mortality(df, censor_age)
    stats["regression"] = compute_regression(df)
    stats["cumulative_incidence"] = compute_cumulative_incidence(df, censor_age)
    stats["joint_affection"] = compute_joint_affection(df)
    stats["cumulative_incidence_by_sex"] = compute_cumulative_incidence_by_sex(df, censor_age)
    stats["cumulative_incidence_by_sex_generation"] = compute_cumulative_incidence_by_sex_generation(df, censor_age)

    if gen_censoring is not None:
        stats["censoring"] = compute_censoring_windows(df, censor_age, gen_censoring)
        stats["censoring_confusion"] = compute_censoring_confusion(df, censor_age, gen_censoring)
        stats["censoring_cascade"] = compute_censoring_cascade(df, censor_age, gen_censoring)

    stats["person_years"] = compute_person_years(df, censor_age, gen_censoring)
    stats["family_size"] = compute_mean_family_size(df)

    # Read full pedigree once (used for both pair extraction and pair counts)
    df_ped = pd.read_parquet(pedigree_path) if pedigree_path is not None else None

    logger.info("Extracting relationship pairs...")
    t0 = time.perf_counter()
    if df_ped is not None:
        pg = PedigreeGraph.from_subsample(df_ped, df)
        pairs = pg.extract_pairs(max_degree=max_degree)
        full_counts = pg.count_pairs(max_degree=max_degree, scope="full")
    else:
        pg = PedigreeGraph(df)
        pairs = pg.extract_pairs(max_degree=max_degree)
        full_counts = None
    logger.info(
        "Relationship pairs extracted in %.1fs: %s",
        time.perf_counter() - t0,
        ", ".join(f"{k}: {len(v[0])}" for k, v in pairs.items()),
    )

    stats["pair_counts"] = {k: len(v[0]) for k, v in pairs.items()}

    stats["parent_status"] = compute_parent_status(df, df_ped)

    if df_ped is not None and full_counts is not None:
        stats["pair_counts_ped"] = full_counts
        stats["n_individuals_ped"] = len(df_ped)
        stats["n_generations_ped"] = int(df_ped["generation"].nunique()) if "generation" in df_ped.columns else 1
        logger.info(
            "Pedigree pair counts (from same graph): %s",
            ", ".join(f"{k}: {v}" for k, v in full_counts.items()),
        )

    if df_ped is not None:
        logger.info("Computing mate liability correlations...")
        stats["mate_correlation"] = compute_mate_correlation(df_ped)
    del df_ped

    logger.info("Computing effective population size estimators...")
    t0 = time.perf_counter()
    stats["effective_size"] = compute_effective_size(pg, config=params)
    logger.info("Ne estimators computed in %.1fs", time.perf_counter() - t0)

    # Fast sequential computations
    stats["liability_correlations"] = compute_liability_correlations(df, seed=seed, pairs=pairs)
    stats["affected_correlations"] = compute_affected_correlations(df, seed=seed, pairs=pairs)
    stats["parent_offspring_corr"] = compute_parent_offspring_corr(df)
    stats["parent_offspring_corr_by_sex"] = compute_parent_offspring_corr_by_sex(df)
    stats["parent_offspring_affected_corr"] = compute_parent_offspring_affected_corr(df)
    stats["observed_h2_estimators"] = compute_observed_h2_estimators(stats)

    # Expensive MLE computations — run in parallel (scipy.optimize releases the GIL)
    logger.info("Computing tetrachoric correlations in parallel...")
    t_mle = time.perf_counter()
    with ThreadPoolExecutor(max_workers=5) as pool:
        fut_tetra = pool.submit(compute_tetrachoric, df, seed=seed, pairs=pairs)
        fut_tetra_gen = pool.submit(compute_tetrachoric_by_generation, df, seed=seed, pairs=pairs)
        fut_cross = pool.submit(compute_cross_trait_tetrachoric, df, seed=seed, pairs=pairs)
        fut_tetra_sex = pool.submit(compute_tetrachoric_by_sex, df, seed=seed, pairs=pairs)

        stats["tetrachoric"] = fut_tetra.result()
        stats["tetrachoric_by_generation"] = fut_tetra_gen.result()
        stats["cross_trait_tetrachoric"] = fut_cross.result()
        stats["tetrachoric_by_sex"] = fut_tetra_sex.result()
    logger.info("All MLE correlations computed in %.1fs", time.perf_counter() - t_mle)

    stats_path = Path(stats_output)
    stats_path.parent.mkdir(parents=True, exist_ok=True)
    with open(stats_path, "w", encoding="utf-8") as fh:
        yaml.dump(to_native(stats), fh, default_flow_style=False, sort_keys=False)
    logger.info("Stats written to %s", stats_path)

    sample_df = create_sample(df, seed=seed)
    save_parquet(sample_df, Path(samples_output))
    logger.info("Sample (%d rows) written to %s", len(sample_df), samples_output)

create_sample

create_sample(df, seed=42, n_per_gen=50000)

Downsample for scatter/histogram plots, preserving parent rows.

Source code in simace/analysis/stats/sampling.py
def create_sample(
    df: pd.DataFrame,
    seed: int = 42,
    n_per_gen: int = 50_000,
) -> pd.DataFrame:
    """Downsample for scatter/histogram plots, preserving parent rows."""
    rng = np.random.default_rng(seed)
    generations = df["generation"].values
    unique_gens = sorted(np.unique(generations))
    if all(int((generations == g).sum()) <= n_per_gen for g in unique_gens):
        return df.copy()
    ids = df["id"].values
    max_id = int(ids.max()) + 1
    id_to_row = np.full(max_id, -1, dtype=np.int32)
    id_to_row[ids] = np.arange(len(df), dtype=np.int32)
    sampled_chunks = []
    for gen in unique_gens:
        gen_idx = np.where(generations == gen)[0]
        sampled_chunks.append(rng.choice(gen_idx, min(len(gen_idx), n_per_gen), replace=False))
    sampled_rows = np.concatenate(sampled_chunks)
    parent_rows = []
    for pid_arr in (df["mother"].values[sampled_rows], df["father"].values[sampled_rows]):
        valid = (pid_arr >= 0) & (pid_arr < max_id)
        rows = id_to_row[pid_arr[valid]]
        parent_rows.append(rows[rows >= 0])
    final_rows = np.unique(np.concatenate([sampled_rows, *parent_rows]))
    return df.iloc[final_rows].copy()

tetrachoric_corr

tetrachoric_corr(a, b)

Return the MLE tetrachoric correlation between two binary arrays.

PARAMETER DESCRIPTION
a

First binary array.

TYPE: ndarray

b

Second binary array, same length as a.

TYPE: ndarray

RETURNS DESCRIPTION
float

Tetrachoric correlation coefficient.

Source code in simace/analysis/stats/tetrachoric.py
def tetrachoric_corr(a: np.ndarray, b: np.ndarray) -> float:
    """Return the MLE tetrachoric correlation between two binary arrays.

    Args:
        a: First binary array.
        b: Second binary array, same length as *a*.

    Returns:
        Tetrachoric correlation coefficient.
    """
    r, _ = tetrachoric_corr_se(a, b)
    return r

tetrachoric_corr_se

tetrachoric_corr_se(a, b)

Estimate tetrachoric correlation and SE from two binary arrays via MLE.

Delegates the numerical work (Brent optimization + bivariate normal CDF) to the numba-jitted _tetrachoric_core for speed.

Source code in simace/analysis/stats/tetrachoric.py
def tetrachoric_corr_se(a: np.ndarray, b: np.ndarray) -> tuple[float, float]:
    """Estimate tetrachoric correlation and SE from two binary arrays via MLE.

    Delegates the numerical work (Brent optimization + bivariate normal CDF)
    to the numba-jitted ``_tetrachoric_core`` for speed.
    """
    a = np.asarray(a, dtype=bool)
    b = np.asarray(b, dtype=bool)
    n_pairs = len(a)
    if n_pairs < 50:
        logger.warning("tetrachoric_corr_se: n_pairs=%d < 50, SE may be unreliable", n_pairs)

    n11 = float(np.sum(a & b))
    n10 = float(np.sum(a & ~b))
    n01 = float(np.sum(~a & b))
    n00 = float(np.sum(~a & ~b))

    p_a, p_b = a.mean(), b.mean()
    if p_a in (0, 1) or p_b in (0, 1):
        return np.nan, np.nan

    t_a = float(_ndtri_approx(1.0 - p_a))
    t_b = float(_ndtri_approx(1.0 - p_b))
    phi_ta = float(_norm_cdf(t_a))
    phi_tb = float(_norm_cdf(t_b))

    return _tetrachoric_core(n11, n10, n01, n00, t_a, t_b, phi_ta, phi_tb)

stats.tetrachoric

simace.analysis.stats.tetrachoric

Tetrachoric correlation primitives.

Low-level helpers for tetrachoric MLE on binary arrays plus the _tetrachoric_for_pairs pair-subset helper used across pairwise-correlation computations.

tetrachoric_corr

tetrachoric_corr(a, b)

Return the MLE tetrachoric correlation between two binary arrays.

PARAMETER DESCRIPTION
a

First binary array.

TYPE: ndarray

b

Second binary array, same length as a.

TYPE: ndarray

RETURNS DESCRIPTION
float

Tetrachoric correlation coefficient.

Source code in simace/analysis/stats/tetrachoric.py
def tetrachoric_corr(a: np.ndarray, b: np.ndarray) -> float:
    """Return the MLE tetrachoric correlation between two binary arrays.

    Args:
        a: First binary array.
        b: Second binary array, same length as *a*.

    Returns:
        Tetrachoric correlation coefficient.
    """
    r, _ = tetrachoric_corr_se(a, b)
    return r

tetrachoric_corr_se

tetrachoric_corr_se(a, b)

Estimate tetrachoric correlation and SE from two binary arrays via MLE.

Delegates the numerical work (Brent optimization + bivariate normal CDF) to the numba-jitted _tetrachoric_core for speed.

Source code in simace/analysis/stats/tetrachoric.py
def tetrachoric_corr_se(a: np.ndarray, b: np.ndarray) -> tuple[float, float]:
    """Estimate tetrachoric correlation and SE from two binary arrays via MLE.

    Delegates the numerical work (Brent optimization + bivariate normal CDF)
    to the numba-jitted ``_tetrachoric_core`` for speed.
    """
    a = np.asarray(a, dtype=bool)
    b = np.asarray(b, dtype=bool)
    n_pairs = len(a)
    if n_pairs < 50:
        logger.warning("tetrachoric_corr_se: n_pairs=%d < 50, SE may be unreliable", n_pairs)

    n11 = float(np.sum(a & b))
    n10 = float(np.sum(a & ~b))
    n01 = float(np.sum(~a & b))
    n00 = float(np.sum(~a & ~b))

    p_a, p_b = a.mean(), b.mean()
    if p_a in (0, 1) or p_b in (0, 1):
        return np.nan, np.nan

    t_a = float(_ndtri_approx(1.0 - p_a))
    t_b = float(_ndtri_approx(1.0 - p_b))
    phi_ta = float(_norm_cdf(t_a))
    phi_tb = float(_norm_cdf(t_b))

    return _tetrachoric_core(n11, n10, n01, n00, t_a, t_b, phi_ta, phi_tb)

stats.correlations

simace.analysis.stats.correlations

Pairwise relationship correlations, parent-offspring regressions, and h² estimators.

Covers liability/affected pair correlations, tetrachoric correlations across pair types (overall, by generation, by sex, cross-trait), midparent-offspring regressions (overall, by sex, on affected status), the closed-form observed-scale h² estimators derived from those correlations, and the mate-pair correlation matrix.

compute_liability_correlations

compute_liability_correlations(df, seed=42, *, pairs)

Compute Pearson liability correlations per pair type and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to correlation.

Source code in simace/analysis/stats/correlations.py
def compute_liability_correlations(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute Pearson liability correlations per pair type and trait.

    Args:
        df: Phenotype DataFrame with liability columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to correlation.
    """
    result = {}
    for trait_num in [1, 2]:
        liability = df[f"liability{trait_num}"].values
        trait_result: dict[str, float | None] = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            trait_result[ptype] = float(_pearsonr_core(liability[idx1], liability[idx2])) if len(idx1) >= 10 else None
        result[f"trait{trait_num}"] = trait_result
    return result

compute_affected_correlations

compute_affected_correlations(df, seed=42, *, pairs)

Compute Pearson correlations on binary affected status per pair type and trait.

This is the phi coefficient — Pearson r on {0, 1} data — and is the input to observed-scale Falconer-style h² estimators (e.g. 2·(r_MZ − r_FS)).

PARAMETER DESCRIPTION
df

Phenotype DataFrame with affected{1,2} columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to phi r or

dict[str, Any]

None (if fewer than 10 pairs, or either side is constant).

Source code in simace/analysis/stats/correlations.py
def compute_affected_correlations(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute Pearson correlations on binary affected status per pair type and trait.

    This is the phi coefficient — Pearson r on {0, 1} data — and is the input
    to observed-scale Falconer-style h² estimators (e.g. ``2·(r_MZ − r_FS)``).

    Args:
        df: Phenotype DataFrame with ``affected{1,2}`` columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to phi r or
        None (if fewer than 10 pairs, or either side is constant).
    """
    result = {}
    for trait_num in [1, 2]:
        affected = df[f"affected{trait_num}"].values.astype(np.float64)
        trait_result: dict[str, float | None] = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            if len(idx1) < 10:
                trait_result[ptype] = None
                continue
            r = safe_corrcoef(affected[idx1], affected[idx2])
            trait_result[ptype] = None if np.isnan(r) else r
        result[f"trait{trait_num}"] = trait_result
    return result

compute_tetrachoric

compute_tetrachoric(df, seed=42, *, pairs)

Compute tetrachoric correlations per pair type and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with binary affection columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each mapping pair type to

dict[str, Any]

{r, se, n_pairs}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations per pair type and trait.

    Args:
        df: Phenotype DataFrame with binary affection columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each mapping pair type to
        ``{r, se, n_pairs}``.
    """
    result = {}
    for trait_num in [1, 2]:
        affected = df[f"affected{trait_num}"].values.astype(bool)
        trait_result = {}
        for ptype in PAIR_TYPES:
            idx1, idx2 = pairs[ptype]
            trait_result[ptype] = _tetrachoric_for_pairs(idx1, idx2, affected)
        result[f"trait{trait_num}"] = trait_result
    return result

compute_tetrachoric_by_generation

compute_tetrachoric_by_generation(df, seed=42, *, pairs)

Compute tetrachoric correlations stratified by generation.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with generation and affection columns.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by gen{N}, each containing per-trait per-pair-type

dict[str, Any]

{r, se, n_pairs, liability_r}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric_by_generation(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations stratified by generation.

    Args:
        df: Phenotype DataFrame with generation and affection columns.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict keyed by ``gen{N}``, each containing per-trait per-pair-type
        ``{r, se, n_pairs, liability_r}``.
    """
    if "generation" not in df.columns:
        return {}
    gen_arr = df["generation"].values
    max_gen = int(gen_arr.max())
    plot_gens = list(range(max(1, max_gen - 2), max_gen + 1))
    affected_by_trait = {n: df[f"affected{n}"].values.astype(bool) for n in (1, 2)}
    liability_by_trait = {n: df[f"liability{n}"].values for n in (1, 2)}
    result = {}
    for gen in plot_gens:
        gen_result = {}
        for trait_num in (1, 2):
            affected = affected_by_trait[trait_num]
            liability = liability_by_trait[trait_num]
            trait_result = {}
            for ptype in PAIR_TYPES:
                idx1, idx2 = pairs[ptype]
                mask = gen_arr[idx1] == gen
                trait_result[ptype] = _tetrachoric_for_pairs(idx1[mask], idx2[mask], affected, liability)
            gen_result[f"trait{trait_num}"] = trait_result
        result[f"gen{gen}"] = gen_result
    return result

compute_cross_trait_tetrachoric

compute_cross_trait_tetrachoric(df, seed=42, *, pairs)

Compute cross-trait tetrachoric correlations (trait 1 vs trait 2).

Includes same-person, same-person-by-generation, and cross-person (across relationship pair types) correlations.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with binary affection columns for both traits.

TYPE: DataFrame

seed

Random seed (unused, kept for API consistency).

TYPE: int DEFAULT: 42

pairs

Pre-extracted relationship pairs.

TYPE: dict[str, tuple[ndarray, ndarray]]

RETURNS DESCRIPTION
dict[str, Any]

Dict with keys same_person, same_person_by_generation,

dict[str, Any]

and cross_person.

Source code in simace/analysis/stats/correlations.py
def compute_cross_trait_tetrachoric(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute cross-trait tetrachoric correlations (trait 1 vs trait 2).

    Includes same-person, same-person-by-generation, and cross-person
    (across relationship pair types) correlations.

    Args:
        df: Phenotype DataFrame with binary affection columns for both traits.
        seed: Random seed (unused, kept for API consistency).
        pairs: Pre-extracted relationship pairs.

    Returns:
        Dict with keys ``same_person``, ``same_person_by_generation``,
        and ``cross_person``.
    """
    a1 = df["affected1"].values.astype(bool)
    a2 = df["affected2"].values.astype(bool)
    r_sp, se_sp = tetrachoric_corr_se(a1, a2)
    result: dict[str, Any] = {
        "same_person": {
            "r": float(r_sp) if not np.isnan(r_sp) else None,
            "se": float(se_sp) if not np.isnan(se_sp) else None,
            "n": len(df),
        }
    }
    by_gen: dict[str, Any] = {}
    if "generation" in df.columns:
        gen_arr = df["generation"].values
        max_gen = int(gen_arr.max())
        plot_gens = list(range(max(1, max_gen - 2), max_gen + 1))
        for gen in plot_gens:
            mask = gen_arr == gen
            n_g = int(mask.sum())
            if n_g < 50:
                by_gen[f"gen{gen}"] = {"r": None, "se": None, "n": n_g}
                continue
            r_g, se_g = tetrachoric_corr_se(a1[mask], a2[mask])
            by_gen[f"gen{gen}"] = {
                "r": float(r_g) if not np.isnan(r_g) else None,
                "se": float(se_g) if not np.isnan(se_g) else None,
                "n": n_g,
            }
    result["same_person_by_generation"] = by_gen
    cross: dict[str, Any] = {}
    for ptype in PAIR_TYPES:
        idx1, idx2 = pairs[ptype]
        n_p = len(idx1)
        if n_p < 10:
            cross[ptype] = {"r": None, "se": None, "n_pairs": int(n_p)}
            continue
        r_cp, se_cp = tetrachoric_corr_se(a1[idx1], a2[idx2])
        cross[ptype] = {
            "r": float(r_cp) if not np.isnan(r_cp) else None,
            "se": float(se_cp) if not np.isnan(se_cp) else None,
            "n_pairs": int(n_p),
        }
    result["cross_person"] = cross
    return result

compute_tetrachoric_by_sex

compute_tetrachoric_by_sex(df, seed=42, *, pairs)

Compute tetrachoric correlations for same-sex pairs only (FF and MM).

Returns dict keyed by "female"/"male", each containing per-trait per-pair-type {r, se, n_pairs, liability_r}.

Source code in simace/analysis/stats/correlations.py
def compute_tetrachoric_by_sex(
    df: pd.DataFrame,
    seed: int = 42,
    *,
    pairs: dict[str, tuple[np.ndarray, np.ndarray]],
) -> dict[str, Any]:
    """Compute tetrachoric correlations for same-sex pairs only (FF and MM).

    Returns dict keyed by "female"/"male", each containing per-trait
    per-pair-type {r, se, n_pairs, liability_r}.
    """
    sex_arr = df["sex"].values
    affected_by_trait = {n: df[f"affected{n}"].values.astype(bool) for n in (1, 2)}
    liability_by_trait = {n: df[f"liability{n}"].values for n in (1, 2)}
    result: dict[str, Any] = {}
    for sex_val, sex_label in SEX_LEVELS:
        sex_result: dict[str, Any] = {}
        for trait_num in (1, 2):
            affected = affected_by_trait[trait_num]
            liability = liability_by_trait[trait_num]
            trait_result: dict[str, Any] = {}
            for ptype in PAIR_TYPES:
                idx1, idx2 = pairs[ptype]
                sex_mask = (sex_arr[idx1] == sex_val) & (sex_arr[idx2] == sex_val)
                trait_result[ptype] = _tetrachoric_for_pairs(idx1[sex_mask], idx2[sex_mask], affected, liability)
            sex_result[f"trait{trait_num}"] = trait_result
        result[sex_label] = sex_result
    return result

compute_parent_offspring_corr

compute_parent_offspring_corr(df)

Compute midparent-offspring liability regression per generation and trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability, generation, and parent columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each containing per-generation

dict[str, Any]

regression stats (slope, r, r2, intercept, stderr, pvalue, n_pairs).

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_corr(df: pd.DataFrame) -> dict[str, Any]:
    """Compute midparent-offspring liability regression per generation and trait.

    Args:
        df: Phenotype DataFrame with liability, generation, and parent columns.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each containing per-generation
        regression stats (slope, r, r2, intercept, stderr, pvalue, n_pairs).
    """
    if "generation" not in df.columns:
        return {}
    max_gen = int(df["generation"].max())
    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)
    gen_arr = df["generation"].values
    result = {}
    for trait_num in [1, 2]:
        liability = df[f"liability{trait_num}"].values
        trait_result = {}
        for gen in range(1, max_gen + 1):
            gen_idx = np.where(gen_arr == gen)[0]
            trait_result[f"gen{gen}"] = _po_regression(gen_idx, liability, id_to_row, df)
        result[f"trait{trait_num}"] = trait_result
    return result

compute_parent_offspring_affected_corr

compute_parent_offspring_affected_corr(df)

Compute pooled midparent-offspring regression on binary affected status.

Regresses offspring.affected (0/1) on midparent affected status (mother.affected + father.affected) / 2 (values in {0, 0.5, 1}), pooled across every non-founder individual whose parents are both in the DataFrame. The regression slope is the observed-scale PO heritability estimator; under LTM it can be back-transformed to liability via Dempster-Lerner.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with id, mother, father, and affected{1,2} columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed trait1/trait2, each with

dict[str, Any]

{slope, r, r2, intercept, stderr, pvalue, n_pairs}. Values are

dict[str, Any]

None when fewer than 10 valid trios or midparent has zero variance.

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_affected_corr(df: pd.DataFrame) -> dict[str, Any]:
    """Compute pooled midparent-offspring regression on binary affected status.

    Regresses ``offspring.affected`` (0/1) on midparent affected status
    ``(mother.affected + father.affected) / 2`` (values in {0, 0.5, 1}),
    pooled across every non-founder individual whose parents are both in the
    DataFrame.  The regression slope is the observed-scale PO heritability
    estimator; under LTM it can be back-transformed to liability via
    Dempster-Lerner.

    Args:
        df: Phenotype DataFrame with ``id``, ``mother``, ``father``, and
            ``affected{1,2}`` columns.

    Returns:
        Dict keyed ``trait1``/``trait2``, each with
        ``{slope, r, r2, intercept, stderr, pvalue, n_pairs}``.  Values are
        None when fewer than 10 valid trios or midparent has zero variance.
    """
    if "id" not in df.columns or "mother" not in df.columns or "father" not in df.columns:
        return {}

    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)

    non_founder_idx = np.where(df["mother"].values >= 0)[0]

    null = {
        "r": None,
        "r2": None,
        "slope": None,
        "intercept": None,
        "stderr": None,
        "pvalue": None,
        "n_pairs": 0,
    }

    # Precompute valid trios once (parents present, looked up via id_to_row).
    mother_ids_arr = df["mother"].values[non_founder_idx]
    father_ids_arr = df["father"].values[non_founder_idx]
    has_m = (mother_ids_arr >= 0) & (mother_ids_arr < len(id_to_row))
    has_f = (father_ids_arr >= 0) & (father_ids_arr < len(id_to_row))
    m_rows = np.full(len(non_founder_idx), -1, dtype=np.int32)
    f_rows = np.full(len(non_founder_idx), -1, dtype=np.int32)
    m_rows[has_m] = id_to_row[mother_ids_arr[has_m]]
    f_rows[has_f] = id_to_row[father_ids_arr[has_f]]
    valid = (m_rows >= 0) & (f_rows >= 0)
    n_pairs = int(valid.sum())

    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        aff_col = f"affected{trait_num}"
        if aff_col not in df.columns:
            result[f"trait{trait_num}"] = {**null}
            continue
        affected = df[aff_col].values.astype(np.float64)
        if n_pairs < 10:
            result[f"trait{trait_num}"] = {**null, "n_pairs": n_pairs}
            continue
        midparent = (affected[m_rows[valid]] + affected[f_rows[valid]]) / 2.0
        # Zero-variance midparent (all parents concordant) gives an undefined
        # regression; surface as None rather than 0/0.
        if float(np.var(midparent)) < 1e-12:
            result[f"trait{trait_num}"] = {**null, "n_pairs": n_pairs}
            continue
        entry = _po_regression(non_founder_idx, affected, id_to_row, df)
        slope = entry.get("slope")
        if slope is not None and not np.isfinite(slope):
            entry = {**null, "n_pairs": entry.get("n_pairs", 0)}
        result[f"trait{trait_num}"] = entry
    return result

compute_parent_offspring_corr_by_sex

compute_parent_offspring_corr_by_sex(df)

Compute midparent-offspring regression partitioned by offspring sex.

Returns dict keyed by "female"/"male", each containing per-trait per-generation {slope, r, r2, intercept, stderr, pvalue, n_pairs}.

Source code in simace/analysis/stats/correlations.py
def compute_parent_offspring_corr_by_sex(df: pd.DataFrame) -> dict[str, Any]:
    """Compute midparent-offspring regression partitioned by offspring sex.

    Returns dict keyed by "female"/"male", each containing per-trait
    per-generation {slope, r, r2, intercept, stderr, pvalue, n_pairs}.
    """
    if "generation" not in df.columns:
        return {}
    max_gen = int(df["generation"].max())
    ids_arr = df["id"].values
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(len(df), dtype=np.int32)
    gen_arr = df["generation"].values
    sex_arr = df["sex"].values
    result: dict[str, Any] = {}
    for sex_val, sex_label in SEX_LEVELS:
        sex_result: dict[str, Any] = {}
        for trait_num in [1, 2]:
            liability = df[f"liability{trait_num}"].values
            trait_result: dict[str, Any] = {}
            for gen in range(1, max_gen + 1):
                gen_idx = np.where((gen_arr == gen) & (sex_arr == sex_val))[0]
                trait_result[f"gen{gen}"] = _po_regression(gen_idx, liability, id_to_row, df)
            sex_result[f"trait{trait_num}"] = trait_result
        result[sex_label] = sex_result
    return result

compute_observed_h2_estimators

compute_observed_h2_estimators(stats)

Derive five naive observed-scale h² estimators from precomputed correlations.

Reads from stats["affected_correlations"] (phi r per pair type) and stats["parent_offspring_affected_corr"] (PO regression slope on binary). Each estimator is a closed-form combination that, under a liability-threshold model, is an unbiased estimator of h²_liab · z(K)²/(K(1−K)) — i.e. the observed-scale h² — where K is the affected-status prevalence.

PARAMETER DESCRIPTION
stats

The in-progress stats dict with affected_correlations and parent_offspring_affected_corr already populated.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed trait1/trait2, each mapping estimator name to a

dict[str, Any]

float or None: {falconer, sibs, po, hs, cousins}.

Source code in simace/analysis/stats/correlations.py
def compute_observed_h2_estimators(stats: dict[str, Any]) -> dict[str, Any]:
    """Derive five naive observed-scale h² estimators from precomputed correlations.

    Reads from ``stats["affected_correlations"]`` (phi r per pair type) and
    ``stats["parent_offspring_affected_corr"]`` (PO regression slope on binary).
    Each estimator is a closed-form combination that, under a liability-threshold
    model, is an unbiased estimator of ``h²_liab · z(K)²/(K(1−K))`` — i.e. the
    observed-scale h² — where K is the affected-status prevalence.

    Args:
        stats: The in-progress stats dict with ``affected_correlations`` and
            ``parent_offspring_affected_corr`` already populated.

    Returns:
        Dict keyed ``trait1``/``trait2``, each mapping estimator name to a
        float or None: ``{falconer, sibs, po, hs, cousins}``.
    """
    aff = stats.get("affected_correlations", {}) or {}
    po_all = stats.get("parent_offspring_affected_corr", {}) or {}

    def _two_diff(r_a: Any, r_b: Any) -> float | None:
        if r_a is None or r_b is None:
            return None
        return 2.0 * (float(r_a) - float(r_b))

    def _scale(r: Any, factor: float) -> float | None:
        if r is None:
            return None
        return factor * float(r)

    def _mean_hs(r_mhs: Any, r_phs: Any) -> float | None:
        vals = [float(v) for v in (r_mhs, r_phs) if v is not None]
        if not vals:
            return None
        return 4.0 * (sum(vals) / len(vals))

    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        key = f"trait{trait_num}"
        rs = aff.get(key, {}) or {}
        po_entry = po_all.get(key, {}) or {}
        po_slope = po_entry.get("slope")
        result[key] = {
            "falconer": _two_diff(rs.get("MZ"), rs.get("FS")),
            "sibs": _scale(rs.get("FS"), 2.0),
            "po": float(po_slope) if po_slope is not None else None,
            "hs": _mean_hs(rs.get("MHS"), rs.get("PHS")),
            "cousins": _scale(rs.get("1C"), 8.0),
        }
    return result

compute_mate_correlation

compute_mate_correlation(df)

Compute 2x2 Pearson correlation matrix between mated pairs' liabilities.

Each unique (mother, father) pair is counted once (not weighted by offspring). Only non-founders are considered.

Source code in simace/analysis/stats/correlations.py
def compute_mate_correlation(df: pd.DataFrame) -> dict:
    """Compute 2x2 Pearson correlation matrix between mated pairs' liabilities.

    Each unique (mother, father) pair is counted once (not weighted by offspring).
    Only non-founders are considered.
    """
    nf = df[df["mother"] != -1][["mother", "father"]].drop_duplicates()
    if len(nf) < 2:
        return {"matrix": [[float("nan")] * 2] * 2, "n_pairs": 0}

    lookup = df.set_index("id")[["liability1", "liability2"]]
    f_liab = lookup.loc[nf["mother"].values].values  # (N, 2)
    m_liab = lookup.loc[nf["father"].values].values  # (N, 2)

    matrix = [[float(safe_corrcoef(f_liab[:, i], m_liab[:, j])) for j in range(2)] for i in range(2)]
    return {"matrix": matrix, "n_pairs": len(nf)}

stats.incidence

simace.analysis.stats.incidence

Prevalence, mortality, cumulative incidence, and joint-affection statistics.

compute_mortality

compute_mortality(df, censor_age)

Compute decade-binned mortality rates from death ages.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with death_age column.

TYPE: DataFrame

censor_age

Maximum observation age.

TYPE: float

RETURNS DESCRIPTION
dict[str, Any]

Dict with decade_labels and rates lists.

Source code in simace/analysis/stats/incidence.py
def compute_mortality(df: pd.DataFrame, censor_age: float) -> dict[str, Any]:
    """Compute decade-binned mortality rates from death ages.

    Args:
        df: Phenotype DataFrame with ``death_age`` column.
        censor_age: Maximum observation age.

    Returns:
        Dict with ``decade_labels`` and ``rates`` lists.
    """
    decade_edges = np.arange(0, censor_age + 10, 10)
    mortality_rates, decade_labels = [], []
    death_ages = df["death_age"].values
    for i in range(len(decade_edges) - 1):
        lo, hi = decade_edges[i], decade_edges[i + 1]
        if lo >= censor_age:
            break
        alive = (death_ages >= lo).sum()
        died = ((death_ages >= lo) & (death_ages < hi) & (death_ages < censor_age)).sum()
        mortality_rates.append(float(died / alive) if alive > 0 else 0.0)
        decade_labels.append(f"{int(lo)}-{int(hi - 1)}")
    return {"decade_labels": decade_labels, "rates": mortality_rates}

compute_cumulative_incidence

compute_cumulative_incidence(df, censor_age, n_points=200)

Compute observed and true cumulative incidence curves per trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with event time and affection columns.

TYPE: DataFrame

censor_age

Maximum observation age for the x-axis grid.

TYPE: float

n_points

Number of age grid points.

TYPE: int DEFAULT: 200

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each with ages,

dict[str, Any]

observed_values, true_values, and half_target_age.

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute observed and true cumulative incidence curves per trait.

    Args:
        df: Phenotype DataFrame with event time and affection columns.
        censor_age: Maximum observation age for the x-axis grid.
        n_points: Number of age grid points.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each with ``ages``,
        ``observed_values``, ``true_values``, and ``half_target_age``.
    """
    ages = np.linspace(0, censor_age, n_points)
    n = len(df)
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs = df[f"t_observed{trait_num}"].values
        t_raw = df[f"t{trait_num}"].values
        obs_inc = _cumulative_curve(t_obs, ages, n, mask=aff)
        true_inc = _cumulative_curve(t_raw, ages, n)
        half_idx = np.searchsorted(obs_inc, obs_inc[-1] / 2)
        result[f"trait{trait_num}"] = {
            "ages": ages.tolist(),
            "observed_values": obs_inc.tolist(),
            "true_values": true_inc.tolist(),
            "half_target_age": float(ages[min(half_idx, len(ages) - 1)]),
        }
    return result

compute_regression

compute_regression(df)

Regress observed event time on liability for affected individuals.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with liability and observed-time columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict keyed by trait1/trait2, each with regression stats

dict[str, Any]

(slope, intercept, r, r2, stderr, pvalue, n) or None.

Source code in simace/analysis/stats/incidence.py
def compute_regression(df: pd.DataFrame) -> dict[str, Any]:
    """Regress observed event time on liability for affected individuals.

    Args:
        df: Phenotype DataFrame with liability and observed-time columns.

    Returns:
        Dict keyed by ``trait1``/``trait2``, each with regression stats
        (slope, intercept, r, r2, stderr, pvalue, n) or None.
    """
    result: dict[str, Any] = {}
    for trait_num in [1, 2]:
        aff_col = f"affected{trait_num}"
        t_col = f"t_observed{trait_num}"
        liab_col = f"liability{trait_num}"
        if liab_col not in df.columns:
            result[f"trait{trait_num}"] = None
            continue
        sub = df[df[aff_col]].dropna(subset=[liab_col, t_col])
        if len(sub) < 2:
            result[f"trait{trait_num}"] = None
            continue
        slope, intercept, r, stderr, pvalue = fast_linregress(sub[liab_col].values, sub[t_col].values)
        result[f"trait{trait_num}"] = {
            "slope": slope,
            "intercept": intercept,
            "r": r,
            "r2": r**2,
            "stderr": stderr,
            "pvalue": pvalue,
            "n": len(sub),
        }
    return result

compute_prevalence

compute_prevalence(df)

Compute observed prevalence for each trait.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with affected1 and affected2 columns. If a generation column is present, per-generation prevalence is also reported under by_generation.

TYPE: DataFrame

RETURNS DESCRIPTION
dict[str, Any]

Dict with trait1 and trait2 marginal prevalence fractions, and

dict[str, Any]

(when generation is present) a by_generation subkey mapping

dict[str, Any]

int(generation) -> {"trait1": float, "trait2": float}.

Source code in simace/analysis/stats/incidence.py
def compute_prevalence(df: pd.DataFrame) -> dict[str, Any]:
    """Compute observed prevalence for each trait.

    Args:
        df: Phenotype DataFrame with ``affected1`` and ``affected2`` columns.
            If a ``generation`` column is present, per-generation prevalence
            is also reported under ``by_generation``.

    Returns:
        Dict with ``trait1`` and ``trait2`` marginal prevalence fractions, and
        (when ``generation`` is present) a ``by_generation`` subkey mapping
        ``int(generation) -> {"trait1": float, "trait2": float}``.
    """
    result: dict[str, Any] = {
        "trait1": float(df["affected1"].mean()),
        "trait2": float(df["affected2"].mean()),
    }
    if "generation" in df.columns:
        means = df.groupby("generation")[["affected1", "affected2"]].mean()
        result["by_generation"] = {
            int(gen): {"trait1": float(row["affected1"]), "trait2": float(row["affected2"])}
            for gen, row in means.iterrows()
        }
    return result

compute_joint_affection

compute_joint_affection(df)

Compute 2x2 contingency table for trait1 x trait2 affection status.

Source code in simace/analysis/stats/incidence.py
def compute_joint_affection(df: pd.DataFrame) -> dict[str, Any]:
    """Compute 2x2 contingency table for trait1 x trait2 affection status."""
    a1 = df["affected1"].values.astype(bool)
    a2 = df["affected2"].values.astype(bool)
    n = len(df)

    counts = {
        "both": int(np.sum(a1 & a2)),
        "trait1_only": int(np.sum(a1 & ~a2)),
        "trait2_only": int(np.sum(~a1 & a2)),
        "neither": int(np.sum(~a1 & ~a2)),
    }
    proportions = {k: v / n for k, v in counts.items()}

    # Sex-specific co-affection proportions
    by_sex: dict[str, float] = {}
    if "sex" in df.columns:
        for sex_val, sex_label in SEX_LEVELS:
            mask = df["sex"].values == sex_val
            n_sex = int(mask.sum())
            if n_sex > 0:
                by_sex[sex_label] = round(float(np.sum(a1[mask] & a2[mask])) / n_sex, 4)

    return {"counts": counts, "proportions": proportions, "n": n, "by_sex": by_sex}

compute_cumulative_incidence_by_sex

compute_cumulative_incidence_by_sex(df, censor_age, n_points=200)

Compute cumulative incidence curves split by sex (0=female, 1=male).

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence_by_sex(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute cumulative incidence curves split by sex (0=female, 1=male)."""
    if "sex" not in df.columns:
        return {}

    ages = np.linspace(0, censor_age, n_points)
    sex = df["sex"].values
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs_aff = df[f"t_observed{trait_num}"].values[aff]
        sex_aff = sex[aff]

        trait_result = {}
        for sex_val, sex_label in SEX_LEVELS:
            n_sex = int((sex == sex_val).sum())
            if n_sex == 0:
                continue
            in_stratum_aff = sex_aff == sex_val
            inc = _cumulative_curve(t_obs_aff, ages, n_sex, mask=in_stratum_aff)
            trait_result[sex_label] = {
                "ages": ages.tolist(),
                "values": inc.tolist(),
                "n": n_sex,
                "prevalence": float(in_stratum_aff.sum() / n_sex),
            }
        result[f"trait{trait_num}"] = trait_result
    return result

compute_cumulative_incidence_by_sex_generation

compute_cumulative_incidence_by_sex_generation(df, censor_age, n_points=200)

Compute cumulative incidence curves split by sex and generation.

Source code in simace/analysis/stats/incidence.py
def compute_cumulative_incidence_by_sex_generation(
    df: pd.DataFrame,
    censor_age: float,
    n_points: int = 200,
) -> dict[str, Any]:
    """Compute cumulative incidence curves split by sex and generation."""
    if "sex" not in df.columns or "generation" not in df.columns:
        return {}

    ages = np.linspace(0, censor_age, n_points)
    generations = sorted(df["generation"].unique())
    sex = df["sex"].values
    gen_arr = df["generation"].values
    result = {}
    for trait_num in [1, 2]:
        aff = df[f"affected{trait_num}"].values.astype(bool)
        t_obs_aff = df[f"t_observed{trait_num}"].values[aff]
        sex_aff = sex[aff]
        gen_aff = gen_arr[aff]

        trait_result: dict[str, Any] = {}
        for gen in generations:
            gen_result: dict[str, Any] = {}
            for sex_val, sex_label in SEX_LEVELS:
                n_sex = int(((gen_arr == gen) & (sex == sex_val)).sum())
                if n_sex == 0:
                    continue
                in_stratum_aff = (gen_aff == gen) & (sex_aff == sex_val)
                inc = _cumulative_curve(t_obs_aff, ages, n_sex, mask=in_stratum_aff)
                gen_result[sex_label] = {
                    "ages": ages.tolist(),
                    "values": inc.tolist(),
                    "n": n_sex,
                    "prevalence": float(in_stratum_aff.sum() / n_sex),
                }
            trait_result[f"gen{int(gen)}"] = gen_result
        result[f"trait{trait_num}"] = trait_result
    return result

stats.censoring

simace.analysis.stats.censoring

Censoring window, confusion-matrix, cascade, and person-years statistics.

compute_censoring_windows

compute_censoring_windows(df, censor_age, gen_censoring, n_points=300)

Compute per-generation censoring breakdown and incidence curves.

PARAMETER DESCRIPTION
df

Phenotype DataFrame with generation, event time, and censoring columns.

TYPE: DataFrame

censor_age

Maximum observation age.

TYPE: float

gen_censoring

Dict mapping generation to [left, right] observation window.

TYPE: dict[int, list[float]]

n_points

Number of age grid points for incidence curves.

TYPE: int DEFAULT: 300

RETURNS DESCRIPTION
dict[str, Any] | None

Dict with per-generation censoring stats, or None if no generation column.

Source code in simace/analysis/stats/censoring.py
def compute_censoring_windows(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
    n_points: int = 300,
) -> dict[str, Any] | None:
    """Compute per-generation censoring breakdown and incidence curves.

    Args:
        df: Phenotype DataFrame with generation, event time, and censoring columns.
        censor_age: Maximum observation age.
        gen_censoring: Dict mapping generation to ``[left, right]`` observation window.
        n_points: Number of age grid points for incidence curves.

    Returns:
        Dict with per-generation censoring stats, or None if no generation column.
    """
    if "generation" not in df.columns:
        return None
    ages = np.linspace(0, censor_age, n_points)
    result = {}
    for gen in sorted(gen_censoring.keys()):
        win_lo, win_hi = gen_censoring[gen]
        gen_df = df[df["generation"] == gen]
        n_gen = len(gen_df)
        if n_gen == 0:
            result[f"gen{gen}"] = {"n": 0}
            continue
        gen_result: dict[str, Any] = {"n": int(n_gen)}
        for trait_num in [1, 2]:
            t_raw = gen_df[f"t{trait_num}"].values
            t_obs = gen_df[f"t_observed{trait_num}"].values
            affected = gen_df[f"affected{trait_num}"].values.astype(bool)
            death_c = gen_df[f"death_censored{trait_num}"].values.astype(bool)
            obs_inc = np.searchsorted(np.sort(t_obs[affected]), ages, side="right") / n_gen
            true_inc = np.searchsorted(np.sort(t_raw), ages, side="right") / n_gen
            gen_result[f"trait{trait_num}"] = {
                "true_incidence": true_inc.tolist(),
                "observed_incidence": obs_inc.tolist(),
                "pct_affected": float(affected.mean()),
                "left_censored": float((t_raw < win_lo).sum() / n_gen),
                "right_censored": float((t_raw > win_hi).sum() / n_gen),
                "death_censored": float((death_c & ~(t_raw < win_lo) & ~(t_raw > win_hi)).mean()),
            }
        result[f"gen{gen}"] = gen_result
    return {"generations": result, "censoring_ages": ages.tolist(), "censor_age": int(censor_age)}

compute_censoring_confusion

compute_censoring_confusion(df, censor_age, gen_censoring)

Compute per-trait 2x2 confusion matrix: true affected vs. observed affected.

Only includes individuals from phenotyped generations (observation window > 0).

Source code in simace/analysis/stats/censoring.py
def compute_censoring_confusion(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
) -> dict[str, Any]:
    """Compute per-trait 2x2 confusion matrix: true affected vs. observed affected.

    Only includes individuals from phenotyped generations (observation window > 0).
    """
    if "generation" in df.columns:
        active_gens = {int(g) for g, (lo, hi) in gen_censoring.items() if hi > lo}
        if active_gens:
            df = df[df["generation"].isin(active_gens)]

    result = {}
    for trait in [1, 2]:
        t_col = f"t{trait}"
        a_col = f"affected{trait}"
        if t_col not in df.columns or a_col not in df.columns:
            continue
        true_aff = df[t_col].values < censor_age
        obs_aff = df[a_col].values.astype(bool)
        n = len(df)
        result[f"trait{trait}"] = {
            "tp": int(np.sum(true_aff & obs_aff)),
            "fn": int(np.sum(true_aff & ~obs_aff)),
            "fp": int(np.sum(~true_aff & obs_aff)),
            "tn": int(np.sum(~true_aff & ~obs_aff)),
            "n": n,
        }
    return result

compute_censoring_cascade

compute_censoring_cascade(df, censor_age, gen_censoring)

Per-trait, per-generation decomposition of true cases by censoring fate.

Source code in simace/analysis/stats/censoring.py
def compute_censoring_cascade(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]],
) -> dict[str, Any]:
    """Per-trait, per-generation decomposition of true cases by censoring fate."""
    if "generation" not in df.columns:
        return {}

    windows: dict[int, tuple[float, float]] = {}
    for g, (lo, hi) in gen_censoring.items():
        if hi > lo:
            windows[int(g)] = (lo, hi)

    if not windows:
        return {}

    has_death = "death_age" in df.columns
    result: dict[str, Any] = {}
    for trait in [1, 2]:
        t_col = f"t{trait}"
        a_col = f"affected{trait}"
        if t_col not in df.columns or a_col not in df.columns:
            continue
        trait_result: dict[str, Any] = {}
        for g in sorted(windows.keys()):
            lo, hi = windows[g]
            gen_mask = df["generation"] == g
            df_g = df.loc[gen_mask]
            t = df_g[t_col].values
            true_affected = t < censor_age
            n_true = int(true_affected.sum())
            n_gen = len(df_g)

            if n_true == 0:
                trait_result[f"gen{g}"] = {
                    "observed": 0,
                    "death_censored": 0,
                    "right_censored": 0,
                    "left_truncated": 0,
                    "true_affected": 0,
                    "n_gen": n_gen,
                    "sensitivity": float("nan"),
                    "window": [lo, hi],
                }
                continue

            left_trunc = true_affected & (t < lo)
            right_cens = true_affected & (t > hi)
            in_window = true_affected & (t >= lo) & (t <= hi)

            if has_death:
                death_age = df_g["death_age"].values
                death_cens = in_window & (death_age < t)
                observed = in_window & (death_age >= t)
            else:
                death_cens = np.zeros_like(in_window)
                observed = in_window

            n_obs = int(observed.sum())
            trait_result[f"gen{g}"] = {
                "observed": n_obs,
                "death_censored": int(death_cens.sum()),
                "right_censored": int(right_cens.sum()),
                "left_truncated": int(left_trunc.sum()),
                "true_affected": n_true,
                "n_gen": n_gen,
                "sensitivity": n_obs / n_true if n_true > 0 else float("nan"),
                "window": [lo, hi],
            }
        result[f"trait{trait}"] = trait_result
    return result

compute_person_years

compute_person_years(df, censor_age, gen_censoring=None)

Compute person-years of follow-up, total and per-trait at-risk.

For each individual in generation g with observation window [lo, hi]: - Total follow-up ends at min(death_age, hi). - Trait-specific at-risk time ends at min(t_observed, death_age, hi).

Returns dict with total_person_years and per-trait person_years_at_risk.

Source code in simace/analysis/stats/censoring.py
def compute_person_years(
    df: pd.DataFrame,
    censor_age: float,
    gen_censoring: dict[int, list[float]] | None = None,
) -> dict[str, Any]:
    """Compute person-years of follow-up, total and per-trait at-risk.

    For each individual in generation *g* with observation window [lo, hi]:
      - Total follow-up ends at min(death_age, hi).
      - Trait-specific at-risk time ends at min(t_observed, death_age, hi).

    Returns dict with total_person_years and per-trait person_years_at_risk.
    """
    if "generation" not in df.columns:
        return {}

    # Build window lookup: generation → (lo, hi)
    windows: dict[int, tuple[float, float]] = {}
    if gen_censoring is not None:
        for g, (lo, hi) in gen_censoring.items():
            windows[int(g)] = (float(lo), float(hi))

    has_death = "death_age" in df.columns
    total_py = 0.0
    total_deaths = 0
    trait_py: dict[str, float] = {}

    for trait in [1, 2]:
        trait_py[f"trait{trait}"] = 0.0

    for g, df_g in df.groupby("generation"):
        lo, hi = windows.get(int(g), (0.0, censor_age))
        if hi <= lo:
            continue
        n = len(df_g)

        # End of observation for each person (not trait-specific)
        if has_death:
            death_ages = df_g["death_age"].values
            end = np.minimum(death_ages, hi)
            # Deaths observed during follow-up window
            total_deaths += int(((death_ages >= lo) & (death_ages < hi)).sum())
        else:
            end = np.full(n, hi)
        gen_total = np.clip(end - lo, 0, None).sum()
        total_py += float(gen_total)

        # Trait-specific at-risk person-years
        for trait in [1, 2]:
            t_col = f"t_observed{trait}"
            if t_col not in df_g.columns:
                continue
            t_obs = df_g[t_col].values
            if has_death:
                trait_end = np.minimum(np.minimum(t_obs, df_g["death_age"].values), hi)
            else:
                trait_end = np.minimum(t_obs, hi)
            trait_py[f"trait{trait}"] += float(np.clip(trait_end - lo, 0, None).sum())

    return {
        "total": round(total_py, 1),
        "deaths": total_deaths,
        **{k: round(v, 1) for k, v in trait_py.items()},
    }

stats.pedigree

simace.analysis.stats.pedigree

Pedigree-structure summaries: family size and parent presence.

compute_mean_family_size

compute_mean_family_size(df)

Compute mean realised family size (offspring per mating pair).

Uses non-founder individuals (mother != -1) grouped by (mother, father).

Source code in simace/analysis/stats/pedigree.py
def compute_mean_family_size(df: pd.DataFrame) -> dict[str, Any]:
    """Compute mean realised family size (offspring per mating pair).

    Uses non-founder individuals (mother != -1) grouped by (mother, father).
    """
    if "mother" not in df.columns or "father" not in df.columns:
        return {}

    children = df.loc[(df["mother"] != -1) & (df["father"] != -1)]
    if len(children) == 0:
        return {}

    family_sizes = children.groupby(["mother", "father"]).size()

    # Fraction with at least one phenotyped full sibling
    families_with_sibs = family_sizes[family_sizes >= 2].index
    has_sib = children.set_index(["mother", "father"]).index.isin(families_with_sibs)
    frac_with_full_sib = round(float(has_sib.sum()) / len(children), 4)

    # Family size distribution per mating (1, 2, 3, 4+)
    n_fam = len(family_sizes)
    dist: dict[str, float] = {}
    for k in [1, 2, 3]:
        dist[str(k)] = round(int((family_sizes == k).sum()) / n_fam, 4)
    dist["4+"] = round(int((family_sizes >= 4).sum()) / n_fam, 4)

    # Offspring per person (including 0 for childless individuals).
    # Count via bincount on row positions: faster than groupby + Series.update/add.
    # When df is a subsample, a child's parent may not be in df["id"]; id_to_row
    # marks those as -1 and they must be masked out before bincount (which rejects
    # negatives). This matches the prior groupby+update semantics, which only
    # counted offspring against parents present in df["id"].
    ids_arr = df["id"].to_numpy()
    n_total = len(df)
    id_to_row = np.full(int(ids_arr.max()) + 1, -1, dtype=np.int32)
    id_to_row[ids_arr] = np.arange(n_total, dtype=np.int32)
    m_rows = id_to_row[children["mother"].to_numpy()]
    f_rows = id_to_row[children["father"].to_numpy()]
    m_rows = m_rows[m_rows >= 0]
    f_rows = f_rows[f_rows >= 0]
    counts_arr = np.bincount(m_rows, minlength=n_total) + np.bincount(f_rows, minlength=n_total)
    offspring_counts = pd.Series(counts_arr, index=df["id"])
    person_dist = _offspring_count_dist(offspring_counts, n_total)

    # Offspring per person by sex
    person_dist_by_sex: dict[str, dict[str, float]] = {}
    if "sex" in df.columns:
        sex_by_id = df.set_index("id")["sex"]
        for sex_val, sex_label in SEX_LEVELS:
            sex_ids = sex_by_id[sex_by_id == sex_val].index
            sex_counts = offspring_counts.reindex(sex_ids, fill_value=0)
            if len(sex_counts) > 0:
                person_dist_by_sex[sex_label] = _offspring_count_dist(sex_counts, len(sex_counts))

    # Number of mates by sex
    # Females: unique fathers per mother
    mates_female = children.groupby("mother")["father"].nunique()
    # Males: unique mothers per father
    mates_male = children.groupby("father")["mother"].nunique()
    n_mothers = len(mates_female)
    n_fathers = len(mates_male)
    mates_by_sex: dict[str, Any] = {
        "female_mean": round(float(mates_female.mean()), 2) if n_mothers else 0,
        "male_mean": round(float(mates_male.mean()), 2) if n_fathers else 0,
        "female_1": round(int((mates_female == 1).sum()) / n_mothers, 4) if n_mothers else 0,
        "female_2+": round(int((mates_female >= 2).sum()) / n_mothers, 4) if n_mothers else 0,
        "male_1": round(int((mates_male == 1).sum()) / n_fathers, 4) if n_fathers else 0,
        "male_2+": round(int((mates_male >= 2).sum()) / n_fathers, 4) if n_fathers else 0,
    }

    return {
        "mean": round(float(family_sizes.mean()), 2),
        "median": round(float(family_sizes.median()), 1),
        "q1": round(float(family_sizes.quantile(0.25)), 1),
        "q3": round(float(family_sizes.quantile(0.75)), 1),
        "n_families": len(family_sizes),
        "frac_with_full_sib": frac_with_full_sib,
        "size_dist": dist,
        "person_offspring_dist": person_dist,
        "person_offspring_dist_by_sex": person_dist_by_sex,
        "mates_by_sex": mates_by_sex,
    }

compute_parent_status

compute_parent_status(df, df_ped=None)

Count individuals by number of parents phenotyped and in pedigree.

Returns dict with 'phenotyped' and optionally 'in_pedigree', each mapping 0/1/2 → count of individuals with that many parents present.

Source code in simace/analysis/stats/pedigree.py
def compute_parent_status(
    df: pd.DataFrame,
    df_ped: pd.DataFrame | None = None,
) -> dict[str, Any]:
    """Count individuals by number of parents phenotyped and in pedigree.

    Returns dict with 'phenotyped' and optionally 'in_pedigree', each mapping
    0/1/2 → count of individuals with that many parents present.
    """
    if "mother" not in df.columns or "father" not in df.columns:
        return {}

    pheno_ids = df["id"].to_numpy()
    mothers = df["mother"].values
    fathers = df["father"].values

    m_pheno = np.isin(mothers, pheno_ids) & (mothers != -1)
    f_pheno = np.isin(fathers, pheno_ids) & (fathers != -1)
    n_parents_pheno = m_pheno.astype(int) + f_pheno.astype(int)
    result: dict[str, Any] = {
        "phenotyped": {str(k): int((n_parents_pheno == k).sum()) for k in [0, 1, 2]},
    }

    if df_ped is not None:
        ped_ids = df_ped["id"].to_numpy()
        m_ped = np.isin(mothers, ped_ids) & (mothers != -1)
        f_ped = np.isin(fathers, ped_ids) & (fathers != -1)
        n_parents_ped = m_ped.astype(int) + f_ped.astype(int)
        result["in_pedigree"] = {str(k): int((n_parents_ped == k).sum()) for k in [0, 1, 2]}

    return result

stats.sampling

simace.analysis.stats.sampling

Per-rep downsampling for scatter/histogram plot inputs.

create_sample

create_sample(df, seed=42, n_per_gen=50000)

Downsample for scatter/histogram plots, preserving parent rows.

Source code in simace/analysis/stats/sampling.py
def create_sample(
    df: pd.DataFrame,
    seed: int = 42,
    n_per_gen: int = 50_000,
) -> pd.DataFrame:
    """Downsample for scatter/histogram plots, preserving parent rows."""
    rng = np.random.default_rng(seed)
    generations = df["generation"].values
    unique_gens = sorted(np.unique(generations))
    if all(int((generations == g).sum()) <= n_per_gen for g in unique_gens):
        return df.copy()
    ids = df["id"].values
    max_id = int(ids.max()) + 1
    id_to_row = np.full(max_id, -1, dtype=np.int32)
    id_to_row[ids] = np.arange(len(df), dtype=np.int32)
    sampled_chunks = []
    for gen in unique_gens:
        gen_idx = np.where(generations == gen)[0]
        sampled_chunks.append(rng.choice(gen_idx, min(len(gen_idx), n_per_gen), replace=False))
    sampled_rows = np.concatenate(sampled_chunks)
    parent_rows = []
    for pid_arr in (df["mother"].values[sampled_rows], df["father"].values[sampled_rows]):
        valid = (pid_arr >= 0) & (pid_arr < max_id)
        rows = id_to_row[pid_arr[valid]]
        parent_rows.append(rows[rows >= 0])
    final_rows = np.unique(np.concatenate([sampled_rows, *parent_rows]))
    return df.iloc[final_rows].copy()

stats.runner

simace.analysis.stats.runner

Orchestration entry point for per-rep phenotype statistics.

Reads a single phenotype.parquet, runs every stats computation, and writes phenotype_stats.yaml plus phenotype_samples.parquet.

main

main(phenotype_path, censor_age, stats_output, samples_output, seed=42, gen_censoring=None, pedigree_path=None, max_degree=2, case_ascertainment_ratio=1.0, params=None)

Compute all stats for a single rep and write outputs.

Source code in simace/analysis/stats/runner.py
def main(
    phenotype_path: str,
    censor_age: float,
    stats_output: str,
    samples_output: str,
    seed: int = 42,
    gen_censoring: dict[int, list[float]] | None = None,
    pedigree_path: str | None = None,
    max_degree: int = 2,
    case_ascertainment_ratio: float = 1.0,
    params: dict[str, Any] | None = None,
) -> None:
    """Compute all stats for a single rep and write outputs."""
    df = pd.read_parquet(phenotype_path)
    logger.info("Computing stats for %s (%d rows)", phenotype_path, len(df))

    stats: dict[str, Any] = {
        "n_individuals": len(df),
        "n_generations": int(df["generation"].nunique()) if "generation" in df.columns else 1,
    }

    if case_ascertainment_ratio != 1.0:
        stats["case_ascertainment_ratio"] = case_ascertainment_ratio

    stats["prevalence"] = compute_prevalence(df)
    stats["mortality"] = compute_mortality(df, censor_age)
    stats["regression"] = compute_regression(df)
    stats["cumulative_incidence"] = compute_cumulative_incidence(df, censor_age)
    stats["joint_affection"] = compute_joint_affection(df)
    stats["cumulative_incidence_by_sex"] = compute_cumulative_incidence_by_sex(df, censor_age)
    stats["cumulative_incidence_by_sex_generation"] = compute_cumulative_incidence_by_sex_generation(df, censor_age)

    if gen_censoring is not None:
        stats["censoring"] = compute_censoring_windows(df, censor_age, gen_censoring)
        stats["censoring_confusion"] = compute_censoring_confusion(df, censor_age, gen_censoring)
        stats["censoring_cascade"] = compute_censoring_cascade(df, censor_age, gen_censoring)

    stats["person_years"] = compute_person_years(df, censor_age, gen_censoring)
    stats["family_size"] = compute_mean_family_size(df)

    # Read full pedigree once (used for both pair extraction and pair counts)
    df_ped = pd.read_parquet(pedigree_path) if pedigree_path is not None else None

    logger.info("Extracting relationship pairs...")
    t0 = time.perf_counter()
    if df_ped is not None:
        pg = PedigreeGraph.from_subsample(df_ped, df)
        pairs = pg.extract_pairs(max_degree=max_degree)
        full_counts = pg.count_pairs(max_degree=max_degree, scope="full")
    else:
        pg = PedigreeGraph(df)
        pairs = pg.extract_pairs(max_degree=max_degree)
        full_counts = None
    logger.info(
        "Relationship pairs extracted in %.1fs: %s",
        time.perf_counter() - t0,
        ", ".join(f"{k}: {len(v[0])}" for k, v in pairs.items()),
    )

    stats["pair_counts"] = {k: len(v[0]) for k, v in pairs.items()}

    stats["parent_status"] = compute_parent_status(df, df_ped)

    if df_ped is not None and full_counts is not None:
        stats["pair_counts_ped"] = full_counts
        stats["n_individuals_ped"] = len(df_ped)
        stats["n_generations_ped"] = int(df_ped["generation"].nunique()) if "generation" in df_ped.columns else 1
        logger.info(
            "Pedigree pair counts (from same graph): %s",
            ", ".join(f"{k}: {v}" for k, v in full_counts.items()),
        )

    if df_ped is not None:
        logger.info("Computing mate liability correlations...")
        stats["mate_correlation"] = compute_mate_correlation(df_ped)
    del df_ped

    logger.info("Computing effective population size estimators...")
    t0 = time.perf_counter()
    stats["effective_size"] = compute_effective_size(pg, config=params)
    logger.info("Ne estimators computed in %.1fs", time.perf_counter() - t0)

    # Fast sequential computations
    stats["liability_correlations"] = compute_liability_correlations(df, seed=seed, pairs=pairs)
    stats["affected_correlations"] = compute_affected_correlations(df, seed=seed, pairs=pairs)
    stats["parent_offspring_corr"] = compute_parent_offspring_corr(df)
    stats["parent_offspring_corr_by_sex"] = compute_parent_offspring_corr_by_sex(df)
    stats["parent_offspring_affected_corr"] = compute_parent_offspring_affected_corr(df)
    stats["observed_h2_estimators"] = compute_observed_h2_estimators(stats)

    # Expensive MLE computations — run in parallel (scipy.optimize releases the GIL)
    logger.info("Computing tetrachoric correlations in parallel...")
    t_mle = time.perf_counter()
    with ThreadPoolExecutor(max_workers=5) as pool:
        fut_tetra = pool.submit(compute_tetrachoric, df, seed=seed, pairs=pairs)
        fut_tetra_gen = pool.submit(compute_tetrachoric_by_generation, df, seed=seed, pairs=pairs)
        fut_cross = pool.submit(compute_cross_trait_tetrachoric, df, seed=seed, pairs=pairs)
        fut_tetra_sex = pool.submit(compute_tetrachoric_by_sex, df, seed=seed, pairs=pairs)

        stats["tetrachoric"] = fut_tetra.result()
        stats["tetrachoric_by_generation"] = fut_tetra_gen.result()
        stats["cross_trait_tetrachoric"] = fut_cross.result()
        stats["tetrachoric_by_sex"] = fut_tetra_sex.result()
    logger.info("All MLE correlations computed in %.1fs", time.perf_counter() - t_mle)

    stats_path = Path(stats_output)
    stats_path.parent.mkdir(parents=True, exist_ok=True)
    with open(stats_path, "w", encoding="utf-8") as fh:
        yaml.dump(to_native(stats), fh, default_flow_style=False, sort_keys=False)
    logger.info("Stats written to %s", stats_path)

    sample_df = create_sample(df, seed=seed)
    save_parquet(sample_df, Path(samples_output))
    logger.info("Sample (%d rows) written to %s", len(sample_df), samples_output)

cli

cli()

Command-line interface for phenotype statistics computation.

Source code in simace/analysis/stats/runner.py
def cli() -> None:
    """Command-line interface for phenotype statistics computation."""
    from simace.core.cli_base import add_logging_args, init_logging

    parser = argparse.ArgumentParser(description="Compute phenotype statistics")
    add_logging_args(parser)
    parser.add_argument("phenotype", help="Input phenotype parquet")
    parser.add_argument("censor_age", type=float)
    parser.add_argument("stats_output", help="Output stats YAML")
    parser.add_argument("samples_output", help="Output samples parquet")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--gen-censoring", type=str, default=None, help="Per-generation censoring windows as JSON dict")
    parser.add_argument("--pedigree", default=None, help="Full pedigree parquet for G_ped pair counts")
    parser.add_argument(
        "--max-degree",
        dest="max_degree",
        type=int,
        default=2,
        help="Maximum kinship degree for pair extraction (1-5, default 2)",
    )
    parser.add_argument(
        "--params",
        default=None,
        help="Per-rep params.yaml; enables theoretical Ne expectations",
    )

    args = parser.parse_args()
    init_logging(args)

    gen_censoring = None
    if args.gen_censoring:
        gen_censoring = {int(k): v for k, v in json.loads(args.gen_censoring).items()}

    params = None
    if args.params:
        from simace.core.yaml_io import load_yaml

        params = load_yaml(args.params)

    main(
        args.phenotype,
        args.censor_age,
        args.stats_output,
        args.samples_output,
        seed=args.seed,
        gen_censoring=gen_censoring,
        pedigree_path=args.pedigree,
        max_degree=args.max_degree,
        params=params,
    )

gather

simace.analysis.gather

Gather validation results from all scenarios into a single TSV file.

extract_metrics

extract_metrics(validation_path)

Extract key metrics from a validation YAML file.

Source code in simace/analysis/gather.py
def extract_metrics(validation_path: str) -> dict[str, Any]:
    """Extract key metrics from a validation YAML file."""
    data = load_yaml(validation_path)

    validation_path = str(validation_path).replace("\\", "/")

    match = _VALIDATION_PATH_RE.search(validation_path)
    if match:
        folder, scenario, rep_str = match.group(1), match.group(2), match.group(3)
        rep = int(rep_str)
        bench_path = Path(f"benchmarks/{folder}/{scenario}/rep{rep_str}/simulate.tsv")
    else:
        scenario = "unknown"
        rep = 1
        bench_path = Path("")

    simulate_seconds = None
    simulate_max_rss_mb = None
    if bench_path.exists():
        with open(bench_path, encoding="utf-8", newline="") as bf:
            reader = csv.DictReader(bf, delimiter="\t")
            for row_b in reader:
                simulate_seconds = float(row_b["s"])

                if platform.system() == "Windows":
                    # Windows does not support max_rss
                    simulate_max_rss_mb = float(1)
                else:
                    # Linux/macOS → normal
                    simulate_max_rss_mb = float(row_b["max_rss"])

                break

    params = data["parameters"]
    summary = data["summary"]

    return {
        "scenario": scenario,
        "rep": rep,
        "N": params.get("N"),
        "G_ped": params.get("G_ped"),
        "G_sim": params.get("G_sim"),
        # Trait 1 parameters
        "A1": params.get("A1"),
        "C1": params.get("C1"),
        "E1": params.get("E1"),
        # Trait 2 parameters
        "A2": params.get("A2"),
        "C2": params.get("C2"),
        "E2": params.get("E2"),
        # Cross-trait correlations
        "rA": params.get("rA"),
        "rC": params.get("rC"),
        # Population parameters
        "p_mztwin": params.get("p_mztwin"),
        "mating_lambda": params.get("mating_lambda"),
        "assort1": params.get("assort1"),
        "assort2": params.get("assort2"),
        "seed": params.get("seed"),
        "checks_failed": summary.get("checks_failed"),
        # Twin rate
        "observed_twin_rate": _get_nested(data, "twins", "twin_rate", "observed_rate"),
        # Trait 1 variances
        "variance_A1": _get_nested(data, "statistical", "variance_A1", "observed"),
        "variance_C1": _get_nested(data, "statistical", "variance_C1", "observed"),
        "variance_E1": _get_nested(data, "statistical", "variance_E1", "observed"),
        # Trait 2 variances
        "variance_A2": _get_nested(data, "statistical", "variance_A2", "observed"),
        "variance_C2": _get_nested(data, "statistical", "variance_C2", "observed"),
        "variance_E2": _get_nested(data, "statistical", "variance_E2", "observed"),
        # Cross-trait correlations (observed)
        "observed_rA": _get_nested(data, "statistical", "cross_trait_rA", "observed"),
        "observed_rC": _get_nested(data, "statistical", "cross_trait_rC", "observed"),
        "observed_rE": _get_nested(data, "statistical", "cross_trait_rE", "observed"),
        # MZ twin correlations (trait 1)
        "mz_twin_A1_corr": _get_nested(data, "heritability", "mz_twin_A1_correlation", "observed"),
        "mz_twin_liability1_corr": _get_nested(data, "heritability", "mz_twin_liability1_correlation", "observed"),
        # MZ twin correlations (trait 2)
        "mz_twin_A2_corr": _get_nested(data, "heritability", "mz_twin_A2_correlation", "observed"),
        "mz_twin_liability2_corr": _get_nested(data, "heritability", "mz_twin_liability2_correlation", "observed"),
        # DZ sibling correlations (trait 1)
        "dz_sibling_A1_corr": _get_nested(data, "heritability", "dz_sibling_A1_correlation", "observed"),
        "dz_sibling_liability1_corr": _get_nested(
            data, "heritability", "dz_sibling_liability1_correlation", "observed"
        ),
        # DZ sibling correlations (trait 2)
        "dz_sibling_A2_corr": _get_nested(data, "heritability", "dz_sibling_A2_correlation", "observed"),
        "dz_sibling_liability2_corr": _get_nested(
            data, "heritability", "dz_sibling_liability2_correlation", "observed"
        ),
        # Half-sib statistics
        "half_sib_prop_expected": _get_nested(data, "half_sibs", "half_sib_pair_proportion", "expected"),
        "half_sib_prop_observed": _get_nested(data, "half_sibs", "half_sib_pair_proportion", "observed"),
        "offspring_with_half_sib_expected": _get_nested(data, "half_sibs", "offspring_with_half_sib", "expected"),
        "offspring_with_half_sib_observed": _get_nested(data, "half_sibs", "offspring_with_half_sib", "observed"),
        "half_sib_A1_corr": _get_nested(data, "half_sibs", "half_sib_A1_correlation", "observed"),
        "half_sib_liability1_corr": _get_nested(data, "half_sibs", "half_sib_liability1_correlation", "observed"),
        "half_sib_shared_C1": _get_nested(data, "half_sibs", "half_sib_shared_C1", "observed"),
        # Mate correlation (assortative mating)
        "mate_corr_liability1": _get_nested(data, "assortative_mating", "mate_corr_liability1", "observed"),
        "mate_corr_liability2": _get_nested(data, "assortative_mating", "mate_corr_liability2", "observed"),
        # Benchmark timing and memory
        "simulate_seconds": simulate_seconds,
        "simulate_max_rss_mb": simulate_max_rss_mb,
        # Family size distribution
        "mother_mean_offspring": _get_nested(data, "family_size_distribution", "mother", "mean"),
        "father_mean_offspring": _get_nested(data, "family_size_distribution", "father", "mean"),
        # Falconer heritability estimates
        "falconer_h2_trait1": _get_nested(data, "heritability", "falconer_estimate_trait1", "observed"),
        "falconer_h2_trait2": _get_nested(data, "heritability", "falconer_estimate_trait2", "observed"),
        # Parent-offspring regressions (trait 1)
        "parent_offspring_A1_slope": _get_nested(data, "heritability", "parent_offspring_A1_regression", "slope"),
        "parent_offspring_A1_r2": _get_nested(data, "heritability", "parent_offspring_A1_regression", "r_squared"),
        "parent_offspring_liability1_slope": _get_nested(
            data, "heritability", "parent_offspring_liability1_regression", "slope"
        ),
        "parent_offspring_liability1_r2": _get_nested(
            data, "heritability", "parent_offspring_liability1_regression", "r_squared"
        ),
        # Parent-offspring regressions (trait 2)
        "parent_offspring_A2_slope": _get_nested(data, "heritability", "parent_offspring_A2_regression", "slope"),
        "parent_offspring_A2_r2": _get_nested(data, "heritability", "parent_offspring_A2_regression", "r_squared"),
        "parent_offspring_liability2_slope": _get_nested(
            data, "heritability", "parent_offspring_liability2_regression", "slope"
        ),
        "parent_offspring_liability2_r2": _get_nested(
            data, "heritability", "parent_offspring_liability2_regression", "r_squared"
        ),
        # Consanguineous matings
        "n_half_sib_matings": _get_nested(data, "consanguineous_matings", "consanguineous_count", "n_half_sib_matings"),
        "n_full_sib_matings": _get_nested(data, "consanguineous_matings", "consanguineous_count", "n_full_sib_matings"),
        "missing_gp_links": _get_nested(
            data, "consanguineous_matings", "consanguineous_count", "total_missing_gp_links"
        ),
        "gp_reconciled": _get_nested(data, "consanguineous_matings", "grandparent_reconciliation", "passed"),
    }

main

main(validation_files, output_path)

Gather all validation results into a TSV file.

Source code in simace/analysis/gather.py
def main(validation_files: list[str], output_path: str) -> None:
    """Gather all validation results into a TSV file."""
    rows = []
    for validation_path in validation_files:
        row = extract_metrics(validation_path)
        rows.append(row)

    # Sort by scenario name, then by rep
    rows.sort(key=lambda x: (x["scenario"], x["rep"]))

    logger.info("Gathered %d validation results -> %s", len(rows), output_path)

    # Write TSV
    if rows:
        columns = list(rows[0].keys())
        with open(output_path, "w", encoding="utf-8", newline="") as f:
            f.write("\t".join(columns) + "\n")
            for row in rows:
                values = []
                for col in columns:
                    val = row[col]
                    if val is None:
                        values.append("")
                    elif isinstance(val, float):
                        values.append(f"{val:.4g}")
                    else:
                        values.append(str(val))
                f.write("\t".join(values) + "\n")

cli

cli()

Command-line interface for gathering validation results.

Source code in simace/analysis/gather.py
def cli() -> None:
    """Command-line interface for gathering validation results."""
    from simace.core.cli_base import add_logging_args, init_logging

    parser = argparse.ArgumentParser(description="Gather validation results into TSV")
    add_logging_args(parser)
    parser.add_argument("validations", nargs="+", help="Validation YAML paths")
    parser.add_argument("--output", required=True, help="Output TSV path")
    args = parser.parse_args()

    init_logging(args)

    main(args.validations, args.output)