Skip to content

simace.simulation

Multi-generational pedigree simulation with A/C/E variance components.

simulate

simace.simulation.simulate

ACE pedigree simulation.

Simulates multi-generational pedigrees with: - A: Additive genetic component - C: Common/shared environment component - E: Unique environment component

Supports single-trait and two-trait (bivariate) modes with configurable cross-trait correlations for genetic (rA) and common environment (rC) components.

resolve_per_gen_param

resolve_per_gen_param(value, G, name='param')

Resolve a variance-component parameter to a per-generation list.

PARAMETER DESCRIPTION
value

scalar (constant across all generations) or dict mapping generation index → value. Missing generation keys are forward-filled from the most recent earlier key.

TYPE: float | dict[int, float]

G

total number of generations to resolve for (indices 0..G-1).

TYPE: int

name

parameter name, used in error messages.

TYPE: str DEFAULT: 'param'

RETURNS DESCRIPTION
list[float]

List of length G with the resolved value for each generation.

RAISES DESCRIPTION
ValueError

if any resolved value is negative or if a dict has no key <= 0 (so generation 0 would be undefined).

Source code in simace/simulation/simulate.py
def resolve_per_gen_param(value: float | dict[int, float], G: int, name: str = "param") -> list[float]:
    """Resolve a variance-component parameter to a per-generation list.

    Args:
        value: scalar (constant across all generations) or dict mapping
               generation index → value.  Missing generation keys are
               forward-filled from the most recent earlier key.
        G: total number of generations to resolve for (indices 0..G-1).
        name: parameter name, used in error messages.

    Returns:
        List of length *G* with the resolved value for each generation.

    Raises:
        ValueError: if any resolved value is negative or if a dict has no
            key <= 0 (so generation 0 would be undefined).
    """
    if isinstance(value, (int, float)):
        v = float(value)
        if v < 0:
            raise ValueError(f"{name} must be >= 0, got {v}")
        return [v] * G

    if not isinstance(value, dict):
        raise TypeError(f"{name} must be a scalar or dict, got {type(value).__name__}")

    if not value:
        raise ValueError(f"{name} dict must not be empty")

    # Sort keys and validate
    sorted_keys = sorted(value)
    if sorted_keys[0] > 0:
        raise ValueError(
            f"{name} dict must have a key <= 0 so generation 0 is defined; smallest key is {sorted_keys[0]}"
        )
    for k in sorted_keys:
        v = float(value[k])
        if v < 0:
            raise ValueError(f"{name}[{k}] must be >= 0, got {v}")

    # Forward-fill
    result = [0.0] * G
    key_idx = 0
    current_val = float(value[sorted_keys[0]])
    for gen in range(G):
        # Advance to the latest key <= gen
        while key_idx + 1 < len(sorted_keys) and sorted_keys[key_idx + 1] <= gen:
            key_idx += 1
            current_val = float(value[sorted_keys[key_idx]])
        result[gen] = current_val
    return result

generate_correlated_components

generate_correlated_components(rng, n, sd1, sd2, correlation)

Generate two correlated normal variables via multivariate normal.

PARAMETER DESCRIPTION
rng

numpy random generator

TYPE: Generator

n

number of samples

TYPE: int

sd1

standard deviation for component 1

TYPE: float

sd2

standard deviation for component 2

TYPE: float

correlation

correlation between components

TYPE: float

RETURNS DESCRIPTION
(comp1, comp2)

tuple of arrays, each shape (n,)

RAISES DESCRIPTION
ValueError

if sd1 or sd2 is negative, or correlation is outside [-1, 1]

Source code in simace/simulation/simulate.py
def generate_correlated_components(
    rng: np.random.Generator, n: int, sd1: float, sd2: float, correlation: float
) -> tuple[np.ndarray, np.ndarray]:
    """Generate two correlated normal variables via multivariate normal.

    Args:
        rng: numpy random generator
        n: number of samples
        sd1: standard deviation for component 1
        sd2: standard deviation for component 2
        correlation: correlation between components

    Returns:
        (comp1, comp2): tuple of arrays, each shape (n,)

    Raises:
        ValueError: if sd1 or sd2 is negative, or correlation is outside [-1, 1]
    """
    if sd1 < 0 or sd2 < 0:
        raise ValueError(f"Standard deviations must be non-negative, got sd1={sd1}, sd2={sd2}")
    if not (-1 <= correlation <= 1):
        raise ValueError(f"Correlation must be in [-1, 1], got {correlation}")

    cov = [
        [sd1**2, correlation * sd1 * sd2],
        [correlation * sd1 * sd2, sd2**2],
    ]
    samples = rng.multivariate_normal(mean=[0, 0], cov=cov, size=n)
    return samples[:, 0], samples[:, 1]

generate_mendelian_noise

generate_mendelian_noise(rng, n, sd_A1, sd_A2, rA)

Generate correlated Mendelian sampling noise for two traits.

Under the infinitesimal model, the Mendelian sampling variance is 0.5 * Var(A) for each trait, so sd_noise = sd_A / sqrt(2) (Bulmer, 1971, Am. Nat., 105, 201-211).

PARAMETER DESCRIPTION
rng

numpy random generator

TYPE: Generator

n

number of offspring

TYPE: int

sd_A1

standard deviation of A1 (sqrt of A1 variance)

TYPE: float

sd_A2

standard deviation of A2 (sqrt of A2 variance)

TYPE: float

rA

genetic correlation between traits

TYPE: float

RETURNS DESCRIPTION
(noise1, noise2)

tuple of arrays, each shape (n,)

Source code in simace/simulation/simulate.py
def generate_mendelian_noise(
    rng: np.random.Generator, n: int, sd_A1: float, sd_A2: float, rA: float
) -> tuple[np.ndarray, np.ndarray]:
    """Generate correlated Mendelian sampling noise for two traits.

    Under the infinitesimal model, the Mendelian sampling variance is
    0.5 * Var(A) for each trait, so sd_noise = sd_A / sqrt(2)
    (Bulmer, 1971, Am. Nat., 105, 201-211).

    Args:
        rng: numpy random generator
        n: number of offspring
        sd_A1: standard deviation of A1 (sqrt of A1 variance)
        sd_A2: standard deviation of A2 (sqrt of A2 variance)
        rA: genetic correlation between traits

    Returns:
        (noise1, noise2): tuple of arrays, each shape (n,)
    """
    sd_noise1 = sd_A1 / np.sqrt(2)
    sd_noise2 = sd_A2 / np.sqrt(2)
    return generate_correlated_components(rng, n, sd_noise1, sd_noise2, rA)

draw_mating_counts

draw_mating_counts(rng, n, mating_lambda)

Draw zero-truncated Poisson mating counts for n individuals.

PARAMETER DESCRIPTION
rng

numpy random generator

TYPE: Generator

n

number of individuals

TYPE: int

mating_lambda

Poisson lambda for the ZTP distribution

TYPE: float

RETURNS DESCRIPTION
ndarray

Array of shape (n,) with all values >= 1.

Source code in simace/simulation/simulate.py
def draw_mating_counts(rng: np.random.Generator, n: int, mating_lambda: float) -> np.ndarray:
    """Draw zero-truncated Poisson mating counts for *n* individuals.

    Args:
        rng: numpy random generator
        n: number of individuals
        mating_lambda: Poisson lambda for the ZTP distribution

    Returns:
        Array of shape ``(n,)`` with all values >= 1.
    """
    counts = rng.poisson(lam=mating_lambda, size=n)
    zeros = counts == 0
    while zeros.any():
        counts[zeros] = rng.poisson(lam=mating_lambda, size=zeros.sum())
        zeros = counts == 0
    return counts

balance_mating_slots

balance_mating_slots(rng, male_counts, female_counts)

Balance total mating slots between males and females.

Takes T = min(sum(male), sum(female)) and randomly trims the larger side so both sum to T.

RETURNS DESCRIPTION
tuple[ndarray, ndarray]

(balanced_male_counts, balanced_female_counts)

Source code in simace/simulation/simulate.py
def balance_mating_slots(
    rng: np.random.Generator, male_counts: np.ndarray, female_counts: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
    """Balance total mating slots between males and females.

    Takes ``T = min(sum(male), sum(female))`` and randomly trims the larger
    side so both sum to *T*.

    Returns:
        ``(balanced_male_counts, balanced_female_counts)``
    """
    m_total = int(male_counts.sum())
    f_total = int(female_counts.sum())
    T = min(m_total, f_total)

    def _trim(counts: np.ndarray, total: int, target: int) -> np.ndarray:
        if total == target:
            return counts.copy()
        # Expand to per-slot array, keep a random subset of size target
        idx = np.repeat(np.arange(len(counts)), counts)
        keep = rng.choice(len(idx), size=target, replace=False)
        keep.sort()
        kept_idx = idx[keep]
        return np.bincount(kept_idx, minlength=len(counts)).astype(counts.dtype)

    return _trim(male_counts, m_total, T), _trim(female_counts, f_total, T)

pair_partners

pair_partners(rng, male_idxs, male_counts, female_idxs, female_counts)

Create mating pairs via random bipartite matching.

Expands each sex into slot arrays using np.repeat, shuffles one side, and pairs positionally. Duplicate (mother, father) pairs are resolved by swapping conflicting entries.

RETURNS DESCRIPTION
ndarray

(M, 2) array of [mother_idx, father_idx].

Source code in simace/simulation/simulate.py
def pair_partners(
    rng: np.random.Generator,
    male_idxs: np.ndarray,
    male_counts: np.ndarray,
    female_idxs: np.ndarray,
    female_counts: np.ndarray,
) -> np.ndarray:
    """Create mating pairs via random bipartite matching.

    Expands each sex into slot arrays using ``np.repeat``, shuffles one side,
    and pairs positionally. Duplicate ``(mother, father)`` pairs are resolved
    by swapping conflicting entries.

    Returns:
        ``(M, 2)`` array of ``[mother_idx, father_idx]``.
    """
    male_slots = np.repeat(male_idxs, male_counts)
    female_slots = np.repeat(female_idxs, female_counts)
    rng.shuffle(male_slots)

    matings = np.column_stack([female_slots, male_slots])

    # Deduplicate: swap conflicting entries (rare at low lambda)
    for _attempt in range(5):
        is_dup = _find_duplicate_pairs(matings)
        if not is_dup.any():
            break
        dup_idxs = np.where(is_dup)[0]
        non_dup_idxs = np.where(~is_dup)[0]
        if len(non_dup_idxs) == 0:
            break
        for d in dup_idxs:
            swap_with = rng.choice(non_dup_idxs)
            matings[d, 1], matings[swap_with, 1] = matings[swap_with, 1], matings[d, 1]

    return matings

allocate_offspring

allocate_offspring(rng, n_matings, N)

Distribute N offspring across n_matings matings via multinomial.

RETURNS DESCRIPTION
ndarray

Array of shape (n_matings,) summing to exactly N.

Source code in simace/simulation/simulate.py
def allocate_offspring(rng: np.random.Generator, n_matings: int, N: int) -> np.ndarray:
    """Distribute *N* offspring across *n_matings* matings via multinomial.

    Returns:
        Array of shape ``(n_matings,)`` summing to exactly *N*.
    """
    probs = np.ones(n_matings) / n_matings
    return rng.multinomial(N, probs)

assign_twins

assign_twins(rng, offspring_counts, p_mztwin)

Decide which matings produce an MZ twin pair.

Only matings with >= 2 offspring are eligible. At most one twin pair per mating.

RETURNS DESCRIPTION
ndarray

Boolean mask of shape (M,) — True where a twin pair occurs.

Source code in simace/simulation/simulate.py
def assign_twins(rng: np.random.Generator, offspring_counts: np.ndarray, p_mztwin: float) -> np.ndarray:
    """Decide which matings produce an MZ twin pair.

    Only matings with >= 2 offspring are eligible. At most one twin pair
    per mating.

    Returns:
        Boolean mask of shape ``(M,)`` — True where a twin pair occurs.
    """
    mask = np.zeros(len(offspring_counts), dtype=bool)
    eligible = offspring_counts >= 2
    if eligible.any():
        rolls = rng.uniform(size=eligible.sum()) < p_mztwin
        mask[eligible] = rolls
    return mask

mating

mating(rng, parental_sex, mating_lambda, p_mztwin, pheno=None, assort1=0.0, assort2=0.0, rho_w=0.0, assort_matrix=None)

Generate parent-offspring pairings via a modular mating pipeline.

  1. Separate males/females and draw ZTP(mating_lambda) mating counts.
  2. Balance slots so both sexes have equal total.
  3. Pair partners randomly (or assortatively if assort1/assort2 != 0).
  4. Allocate N offspring across matings via multinomial.
  5. Assign MZ twins to eligible matings.
  6. Build output arrays.
PARAMETER DESCRIPTION
rng

numpy random generator

TYPE: Generator

parental_sex

array of sex values (0=female, 1=male) for parents

TYPE: ndarray

mating_lambda

Poisson lambda for zero-truncated mating count distribution

TYPE: float

p_mztwin

probability of a mating producing MZ twins (if >= 2 offspring)

TYPE: float

pheno

(n, 6) array of [A1, C1, E1, A2, C2, E2]; required when assort1 or assort2 is nonzero

TYPE: ndarray | None DEFAULT: None

assort1

target mate correlation on trait 1 liability

TYPE: float DEFAULT: 0.0

assort2

target mate correlation on trait 2 liability

TYPE: float DEFAULT: 0.0

rho_w

within-person cross-trait liability correlation, used to derive off-diagonal entries of the mate correlation matrix

TYPE: float DEFAULT: 0.0

assort_matrix

optional full 2x2 mate correlation matrix R_mf

TYPE: ndarray | None DEFAULT: None

RETURNS DESCRIPTION
parent_idxs

(N, 2) array of [mother_idx, father_idx] for each offspring

TYPE: ndarray

twins

(m, 2) array of [twin1_idx, twin2_idx] pairs for MZ twins

TYPE: ndarray

household_ids

(N,) array mapping each offspring to a household index

TYPE: ndarray

RAISES DESCRIPTION
ValueError

if assort is nonzero but pheno is None

Source code in simace/simulation/simulate.py
def mating(
    rng: np.random.Generator,
    parental_sex: np.ndarray,
    mating_lambda: float,
    p_mztwin: float,
    pheno: np.ndarray | None = None,
    assort1: float = 0.0,
    assort2: float = 0.0,
    rho_w: float = 0.0,
    assort_matrix: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Generate parent-offspring pairings via a modular mating pipeline.

    1. Separate males/females and draw ZTP(mating_lambda) mating counts.
    2. Balance slots so both sexes have equal total.
    3. Pair partners randomly (or assortatively if assort1/assort2 != 0).
    4. Allocate N offspring across matings via multinomial.
    5. Assign MZ twins to eligible matings.
    6. Build output arrays.

    Args:
        rng: numpy random generator
        parental_sex: array of sex values (0=female, 1=male) for parents
        mating_lambda: Poisson lambda for zero-truncated mating count distribution
        p_mztwin: probability of a mating producing MZ twins (if >= 2 offspring)
        pheno: (n, 6) array of [A1, C1, E1, A2, C2, E2]; required when
            assort1 or assort2 is nonzero
        assort1: target mate correlation on trait 1 liability
        assort2: target mate correlation on trait 2 liability
        rho_w: within-person cross-trait liability correlation, used to
            derive off-diagonal entries of the mate correlation matrix
        assort_matrix: optional full 2x2 mate correlation matrix R_mf

    Returns:
        parent_idxs: (N, 2) array of [mother_idx, father_idx] for each offspring
        twins: (m, 2) array of [twin1_idx, twin2_idx] pairs for MZ twins
        household_ids: (N,) array mapping each offspring to a household index

    Raises:
        ValueError: if assort is nonzero but pheno is None
    """
    if (assort1 != 0 or assort2 != 0) and pheno is None:
        raise ValueError("pheno must be provided when assort1 or assort2 is nonzero")
    N = len(parental_sex)
    male_idxs = np.where(parental_sex == 1)[0]
    female_idxs = np.where(parental_sex == 0)[0]

    # 1. Draw mating counts per individual
    male_counts = draw_mating_counts(rng, len(male_idxs), mating_lambda)
    female_counts = draw_mating_counts(rng, len(female_idxs), mating_lambda)

    # 2. Balance slots
    male_counts, female_counts = balance_mating_slots(rng, male_counts, female_counts)

    # 3. Pair partners -> (M, 2) of [mother_idx, father_idx]
    if assort1 != 0 or assort2 != 0:
        matings = _assortative_pair_partners(
            rng,
            male_idxs,
            male_counts,
            female_idxs,
            female_counts,
            pheno,
            assort1,
            assort2,
            rho_w=rho_w,
            assort_matrix=assort_matrix,
        )
    else:
        matings = pair_partners(rng, male_idxs, male_counts, female_idxs, female_counts)
    M = len(matings)

    # 4. Allocate offspring
    offspring_counts = allocate_offspring(rng, M, N)

    # 5. Assign twins
    twin_mask = assign_twins(rng, offspring_counts, p_mztwin)

    # 6. Build output arrays
    parent_idxs = np.repeat(matings, offspring_counts, axis=0)

    # Household: all offspring of the same mother share a household
    _, household_ids = np.unique(parent_idxs[:, 0], return_inverse=True)

    # Twin pairs: first two offspring of each twin-flagged mating
    starts = np.zeros(M + 1, dtype=int)
    np.cumsum(offspring_counts, out=starts[1:])
    twin_indices = np.where(twin_mask)[0]
    if len(twin_indices) > 0:
        t1 = starts[twin_indices]
        t2 = t1 + 1
        twins = np.column_stack([t1, t2])
    else:
        twins = np.array([], dtype=int).reshape(0, 2)

    return parent_idxs, twins, household_ids

reproduce

reproduce(rng, pheno, parents, twins, household_ids, sd_A1, sd_E1, sd_C1, sd_A2, sd_E2, sd_C2, rA, rC, rE=0.0)

Simulate offspring phenotypes from parents for two correlated traits.

Additive genetic values are inherited as midparent + Mendelian noise. Common environment (C) is drawn freshly per household — it is NOT inherited from parents but represents the offspring's own shared rearing environment (siblings share C; parents and children do not). Unique environment (E) is drawn independently per individual.

PARAMETER DESCRIPTION
rng

numpy random generator

TYPE: Generator

pheno

(n, 6) array of [A1, C1, E1, A2, C2, E2] for parents

TYPE: ndarray

parents

(n, 2) array of [mother_idx, father_idx]

TYPE: ndarray

twins

array of MZ twin index pairs

TYPE: ndarray

household_ids

(n,) array mapping each offspring to a household

TYPE: ndarray

sd_A1

standard deviation of A for trait 1

TYPE: float

sd_E1

standard deviation of E for trait 1

TYPE: float

sd_C1

standard deviation of C for trait 1

TYPE: float

sd_A2

standard deviation of A for trait 2

TYPE: float

sd_E2

standard deviation of E for trait 2

TYPE: float

sd_C2

standard deviation of C for trait 2

TYPE: float

rA

genetic correlation between traits

TYPE: float

rC

common environment correlation between traits

TYPE: float

rE

unique environment correlation between traits

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
offspring

(n, 6) array of [A1, C1, E1, A2, C2, E2]

TYPE: ndarray

sex_offspring

(n,) array of sex values (0=female, 1=male)

TYPE: ndarray

Source code in simace/simulation/simulate.py
def reproduce(
    rng: np.random.Generator,
    pheno: np.ndarray,
    parents: np.ndarray,
    twins: np.ndarray,
    household_ids: np.ndarray,
    sd_A1: float,
    sd_E1: float,
    sd_C1: float,
    sd_A2: float,
    sd_E2: float,
    sd_C2: float,
    rA: float,
    rC: float,
    rE: float = 0.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Simulate offspring phenotypes from parents for two correlated traits.

    Additive genetic values are inherited as midparent + Mendelian noise.
    Common environment (C) is drawn freshly per household — it is NOT
    inherited from parents but represents the offspring's own shared rearing
    environment (siblings share C; parents and children do not). Unique
    environment (E) is drawn independently per individual.

    Args:
        rng: numpy random generator
        pheno: (n, 6) array of [A1, C1, E1, A2, C2, E2] for parents
        parents: (n, 2) array of [mother_idx, father_idx]
        twins: array of MZ twin index pairs
        household_ids: (n,) array mapping each offspring to a household
        sd_A1: standard deviation of A for trait 1
        sd_E1: standard deviation of E for trait 1
        sd_C1: standard deviation of C for trait 1
        sd_A2: standard deviation of A for trait 2
        sd_E2: standard deviation of E for trait 2
        sd_C2: standard deviation of C for trait 2
        rA: genetic correlation between traits
        rC: common environment correlation between traits
        rE: unique environment correlation between traits

    Returns:
        offspring: (n, 6) array of [A1, C1, E1, A2, C2, E2]
        sex_offspring: (n,) array of sex values (0=female, 1=male)
    """
    n = len(parents)

    # Sex assignment
    sex_offspring = rng.binomial(size=n, n=1, p=0.5)

    # Additive genetic: midparent + correlated Mendelian noise
    mp1 = _midparent(pheno[:, 0], parents)  # A1 midparent
    mp2 = _midparent(pheno[:, 3], parents)  # A2 midparent

    noise1, noise2 = generate_mendelian_noise(rng, n, sd_A1, sd_A2, rA)
    a1_offspring = mp1 + noise1
    a2_offspring = mp2 + noise2

    # Common environment: freshly drawn per household each generation.
    # C is NOT inherited from parents -- it reflects the offspring's own
    # shared rearing environment. Siblings share C; parents and children do not.
    # This is the standard ACE model assumption (no autoregressive C transmission).
    # household_ids are already 0-based contiguous integers from mating()
    n_hh = int(household_ids.max()) + 1
    hh_c1, hh_c2 = generate_correlated_components(rng, n_hh, sd_C1, sd_C2, rC)
    c1_offspring = hh_c1[household_ids]
    c2_offspring = hh_c2[household_ids]

    # Unique environment: correlated via rE; independent when rE=0
    if rE != 0:
        e1_offspring, e2_offspring = generate_correlated_components(rng, n, sd_E1, sd_E2, rE)
    else:
        e1_offspring = rng.normal(size=n, loc=0, scale=sd_E1)
        e2_offspring = rng.normal(size=n, loc=0, scale=sd_E2)

    # MZ twins share A values and sex for both traits
    if len(twins) > 0:
        a1_offspring[twins[:, 1]] = a1_offspring[twins[:, 0]]
        a2_offspring[twins[:, 1]] = a2_offspring[twins[:, 0]]
        sex_offspring[twins[:, 1]] = sex_offspring[twins[:, 0]]

    offspring = np.stack(
        [
            a1_offspring,
            c1_offspring,
            e1_offspring,
            a2_offspring,
            c2_offspring,
            e2_offspring,
        ],
        axis=-1,
    )

    return offspring, sex_offspring

add_to_pedigree

add_to_pedigree(pheno, sex, parents, twins, household_ids, generation, pedigree=None)

Add a generation to the pedigree DataFrame (backward-compatible wrapper).

The internal simulation loop uses pre-allocated arrays for performance. This function is retained for external callers and tests.

Source code in simace/simulation/simulate.py
def add_to_pedigree(
    pheno: np.ndarray,
    sex: np.ndarray,
    parents: np.ndarray,
    twins: np.ndarray,
    household_ids: np.ndarray,
    generation: int,
    pedigree: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Add a generation to the pedigree DataFrame (backward-compatible wrapper).

    The internal simulation loop uses pre-allocated arrays for performance.
    This function is retained for external callers and tests.
    """
    n = len(pheno)
    is_founder = pedigree is None
    id_offset = 0 if is_founder else len(pedigree)
    parent_offset = id_offset - n if not is_founder else 0
    household_offset = 0 if is_founder else int(pedigree["household_id"].iloc[-1]) + 1

    arrays = _init_pedigree_arrays(n)
    _fill_pedigree_slice(
        arrays,
        0,
        pheno,
        sex,
        parents,
        twins,
        household_ids,
        generation=generation,
        is_founder=is_founder,
        id_offset=id_offset,
        parent_offset=parent_offset,
        household_offset=household_offset,
    )
    df = _arrays_to_dataframe(arrays, n)
    if pedigree is not None:
        return pd.concat([pedigree, df], ignore_index=True)
    return df

run_simulation

run_simulation(*, seed, N, G_ped, mating_lambda, p_mztwin, A1, C1, A2, C2, rA, rC, rE=0.0, E1, E2, G_sim=None, assort1=0.0, assort2=0.0, assort_matrix=None)

Run the full ACE simulation for two correlated traits.

Variance components A (additive genetic), C (shared environment), and E (unique environment) are specified as absolute variances. A is constant across generations; C and E may be specified per-generation via a dict mapping generation index to value (forward-filled for missing keys).

PARAMETER DESCRIPTION
seed

Random seed

TYPE: int

N

Population size per generation (positive integer)

TYPE: int

G_ped

Number of generations to record in pedigree (integer >= 1)

TYPE: int

mating_lambda

Poisson lambda for zero-truncated mating count distribution (> 0)

TYPE: float

p_mztwin

Probability of a mating producing MZ twins, in [0, 1)

TYPE: float

A1

Trait 1 additive genetic variance (>= 0).

TYPE: float

C1

Trait 1 shared-environment variance (>= 0).

TYPE: float

A2

Trait 2 additive genetic variance (>= 0).

TYPE: float

C2

Trait 2 shared-environment variance (>= 0).

TYPE: float

rA

Genetic correlation between traits, in [-1, 1]

TYPE: float

rC

Common environment correlation between traits, in [-1, 1]

TYPE: float

rE

Unique environment correlation between traits, in [-1, 1]. Default 0 (independent E across traits).

TYPE: float DEFAULT: 0.0

E1

Trait 1 unique-environment variance. Scalar (constant) or dict mapping generation index → value (forward-filled).

TYPE: float | dict[int, float]

E2

Trait 2 unique-environment variance. Same format as E1.

TYPE: float | dict[int, float]

G_sim

Total generations to simulate (default: G_ped). First G_sim - G_ped generations are burn-in and discarded from output.

TYPE: int | None DEFAULT: None

assort1

Target mate Pearson correlation on trait 1 liability, in [-1, 1].

TYPE: float DEFAULT: 0.0

assort2

Target mate Pearson correlation on trait 2 liability, in [-1, 1].

TYPE: float DEFAULT: 0.0

assort_matrix

Optional full 2x2 mate correlation matrix R_mf. Overrides assort1/assort2 diagonal with matrix diagonal.

TYPE: list[list[float]] | ndarray | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Pedigree DataFrame with columns id, sex, mother, father, twin,

DataFrame

generation, household_id, A1, C1, E1, liability1, A2, C2, E2,

DataFrame

liability2.

RAISES DESCRIPTION
ValueError

if any parameter is outside its valid range

Source code in simace/simulation/simulate.py
@stage(reads=None, writes=PEDIGREE)
def run_simulation(
    *,
    seed: int,
    N: int,
    G_ped: int,
    mating_lambda: float,
    p_mztwin: float,
    A1: float,
    C1: float,
    A2: float,
    C2: float,
    rA: float,
    rC: float,
    rE: float = 0.0,
    E1: float | dict[int, float],
    E2: float | dict[int, float],
    G_sim: int | None = None,
    assort1: float = 0.0,
    assort2: float = 0.0,
    assort_matrix: list[list[float]] | np.ndarray | None = None,
) -> pd.DataFrame:
    """Run the full ACE simulation for two correlated traits.

    Variance components A (additive genetic), C (shared environment), and
    E (unique environment) are specified as absolute variances.  A is constant
    across generations; C and E may be specified per-generation via a dict
    mapping generation index to value (forward-filled for missing keys).

    Args:
        seed: Random seed
        N: Population size per generation (positive integer)
        G_ped: Number of generations to record in pedigree (integer >= 1)
        mating_lambda: Poisson lambda for zero-truncated mating count distribution (> 0)
        p_mztwin: Probability of a mating producing MZ twins, in [0, 1)
        A1: Trait 1 additive genetic variance (>= 0).
        C1: Trait 1 shared-environment variance (>= 0).
        A2: Trait 2 additive genetic variance (>= 0).
        C2: Trait 2 shared-environment variance (>= 0).
        rA: Genetic correlation between traits, in [-1, 1]
        rC: Common environment correlation between traits, in [-1, 1]
        rE: Unique environment correlation between traits, in [-1, 1].
            Default 0 (independent E across traits).
        E1: Trait 1 unique-environment variance.  Scalar (constant) or dict
            mapping generation index → value (forward-filled).
        E2: Trait 2 unique-environment variance.  Same format as E1.
        G_sim: Total generations to simulate (default: G_ped). First G_sim - G_ped
               generations are burn-in and discarded from output.
        assort1: Target mate Pearson correlation on trait 1 liability, in [-1, 1].
        assort2: Target mate Pearson correlation on trait 2 liability, in [-1, 1].
        assort_matrix: Optional full 2x2 mate correlation matrix R_mf.
            Overrides assort1/assort2 diagonal with matrix diagonal.

    Returns:
        Pedigree DataFrame with columns id, sex, mother, father, twin,
        generation, household_id, A1, C1, E1, liability1, A2, C2, E2,
        liability2.

    Raises:
        ValueError: if any parameter is outside its valid range
    """
    if G_sim is None:
        G_sim = G_ped

    # --- Input validation ---
    for name, val in [("A1", A1), ("C1", C1), ("A2", A2), ("C2", C2)]:
        if not (isinstance(val, (int, float)) and val >= 0):
            raise ValueError(f"{name} must be a non-negative scalar, got {val}")

    if not (int(N) == N and N > 0):
        raise ValueError(f"N must be a positive integer, got {N}")
    if not (G_ped == int(G_ped) and G_ped >= 1):
        raise ValueError(f"G_ped must be an integer >= 1, got {G_ped}")
    if not (mating_lambda > 0):
        raise ValueError(f"mating_lambda must be > 0, got {mating_lambda}")
    if not (0 <= p_mztwin < 1):
        raise ValueError(f"p_mztwin must be in [0, 1), got {p_mztwin}")
    if not (-1 <= rA <= 1):
        raise ValueError(f"rA must be in [-1, 1], got {rA}")
    if not (-1 <= rC <= 1):
        raise ValueError(f"rC must be in [-1, 1], got {rC}")
    if not (-1 <= rE <= 1):
        raise ValueError(f"rE must be in [-1, 1], got {rE}")
    if not (-1 <= assort1 <= 1):
        raise ValueError(f"assort1 must be in [-1, 1], got {assort1}")
    if not (-1 <= assort2 <= 1):
        raise ValueError(f"assort2 must be in [-1, 1], got {assort2}")

    # Resolve assort_matrix
    R_mf = None
    if assort_matrix is not None:
        R_mf = np.asarray(assort_matrix, dtype=np.float64)
        if R_mf.shape != (2, 2):
            raise ValueError(f"assort_matrix must be 2x2, got shape {R_mf.shape}")
        if abs(R_mf[0, 1] - R_mf[1, 0]) > 1e-10:
            raise ValueError(f"assort_matrix must be symmetric: got [{R_mf[0, 1]}, {R_mf[1, 0]}]")
        assort1 = float(R_mf[0, 0])
        assort2 = float(R_mf[1, 1])
        if not (-1 <= assort1 <= 1):
            raise ValueError(f"assort_matrix[0,0] must be in [-1, 1], got {assort1}")
        if not (-1 <= assort2 <= 1):
            raise ValueError(f"assort_matrix[1,1] must be in [-1, 1], got {assort2}")

    if G_sim < G_ped:
        raise ValueError(f"G_sim ({G_sim}) must be >= G_ped ({G_ped})")

    logger.info("Starting simulation: N=%d, G_ped=%d, seed=%d", N, G_ped, seed)
    t0 = time.perf_counter()

    rng = np.random.default_rng(seed)

    # Resolve per-generation C and E variance components
    # C1/C2 may be scalar or per-gen dict; A is always scalar (constant)
    C1_per_gen = resolve_per_gen_param(C1, G_sim, name="C1")
    C2_per_gen = resolve_per_gen_param(C2, G_sim, name="C2")
    E1_per_gen = resolve_per_gen_param(E1, G_sim, name="E1")
    E2_per_gen = resolve_per_gen_param(E2, G_sim, name="E2")

    # Compute per-generation standard deviations
    sd_C1_per_gen = [np.sqrt(v) for v in C1_per_gen]
    sd_C2_per_gen = [np.sqrt(v) for v in C2_per_gen]
    sd_E1_per_gen = [np.sqrt(v) for v in E1_per_gen]
    sd_E2_per_gen = [np.sqrt(v) for v in E2_per_gen]

    # A is constant across generations
    sd_A1 = np.sqrt(A1)
    sd_A2 = np.sqrt(A2)

    # Within-person cross-trait liability correlation per C/E generation
    _rho_w_A = rA * np.sqrt(A1 * A2)
    rho_w_per_ce = [
        _rho_w_A + rC * np.sqrt(C1_per_gen[g] * C2_per_gen[g]) + rE * np.sqrt(E1_per_gen[g] * E2_per_gen[g])
        for g in range(G_sim)
    ]

    # Validate |rho_w| < 1 for all C/E generations
    if assort1 != 0 and assort2 != 0:
        for g, rw in enumerate(rho_w_per_ce):
            if abs(rw) >= 1.0 - 1e-10:
                raise ValueError(
                    f"Both-trait assortative mating requires |rho_w| < 1 "
                    f"(got rho_w={rw:.4f} at C/E generation {g}). "
                    f"Traits are perfectly correlated; "
                    f"use single-trait assortment instead."
                )

    # Track whether R_mf was provided explicitly (vs. auto-computed from rho_w)
    R_mf_user = R_mf

    # Validate PSD of full 4x4 Sigma for each generation's rho_w
    if R_mf_user is not None or (assort1 != 0 and assort2 != 0):
        for g, rw in enumerate(rho_w_per_ce):
            if R_mf_user is not None:
                R_mf_g = R_mf_user
            else:
                c = rw * np.sqrt(abs(assort1 * assort2)) * np.sign(assort1 * assort2)
                R_mf_g = np.array([[assort1, c], [c, assort2]])
            R_ff = np.array([[1.0, rw], [rw, 1.0]])
            Sigma_4 = np.block([[R_ff, R_mf_g.T], [R_mf_g, R_ff]])
            eigvals = np.linalg.eigvalsh(Sigma_4)
            if eigvals[0] < -1e-8:
                raise ValueError(
                    f"Full 4x4 mate correlation matrix Sigma_4 is not PSD "
                    f"(min eigenvalue = {eigvals[0]:.6f} at C/E generation {g}). "
                    f"Reduce the magnitude of assort_matrix off-diagonal entries."
                )

    # Initialize founders with correlated components (using gen-0 C/E variances)
    sex = rng.binomial(size=N, n=1, p=0.5)

    # A components: correlated via rA (constant across generations)
    a1, a2 = generate_correlated_components(rng, N, sd_A1, sd_A2, rA)

    # C components: correlated via rC (gen-0 variance)
    c1, c2 = generate_correlated_components(rng, N, sd_C1_per_gen[0], sd_C2_per_gen[0], rC)

    # E components: correlated via rE (gen-0 variance); independent when rE=0
    if rE != 0:
        e1, e2 = generate_correlated_components(rng, N, sd_E1_per_gen[0], sd_E2_per_gen[0], rE)
    else:
        e1 = rng.normal(size=N, loc=0, scale=sd_E1_per_gen[0])
        e2 = rng.normal(size=N, loc=0, scale=sd_E2_per_gen[0])

    pheno = np.stack([a1, c1, e1, a2, c2, e2], axis=-1)

    # Simulate generations
    burnin = G_sim - G_ped

    # Pre-allocate pedigree arrays (avoids pd.concat per generation)
    total_individuals = N * G_ped
    if total_individuals > np.iinfo(np.int32).max:
        raise ValueError(
            f"Total pedigree size {total_individuals:,} exceeds int32 max "
            f"({np.iinfo(np.int32).max:,}). Reduce N or G_ped."
        )
    ped_arrays = _init_pedigree_arrays(total_individuals)
    ped_offset = 0
    id_offset = 0
    household_offset = 0

    for i in range(G_sim):
        t_gen = time.perf_counter()

        # rho_w for the current parental population:
        # founders (i=0) have gen-0 C/E; offspring from iter j have per_gen[j] C/E
        parent_ce_gen = max(0, i - 1)
        rho_w_i = rho_w_per_ce[parent_ce_gen]

        # Auto-compute R_mf for this generation's rho_w if not user-provided
        if R_mf_user is None and assort1 != 0 and assort2 != 0:
            c = rho_w_i * np.sqrt(abs(assort1 * assort2)) * np.sign(assort1 * assort2)
            R_mf_i = np.array([[assort1, c], [c, assort2]])
        else:
            R_mf_i = R_mf_user

        parents, twins, household_ids = mating(
            rng,
            sex,
            mating_lambda,
            p_mztwin,
            pheno=pheno,
            assort1=assort1,
            assort2=assort2,
            rho_w=rho_w_i,
            assort_matrix=R_mf_i,
        )
        t_mate = time.perf_counter()
        pheno, sex = reproduce(
            rng,
            pheno,
            parents,
            twins,
            household_ids,
            sd_A1,
            sd_E1_per_gen[i],
            sd_C1_per_gen[i],
            sd_A2,
            sd_E2_per_gen[i],
            sd_C2_per_gen[i],
            rA,
            rC,
            rE,
        )
        t_repro = time.perf_counter()
        if i >= burnin:
            n = len(sex)
            is_founder = i == burnin
            parent_offset = id_offset - n if not is_founder else 0
            _fill_pedigree_slice(
                ped_arrays,
                ped_offset,
                pheno,
                sex,
                parents,
                twins,
                household_ids,
                generation=i - burnin,
                is_founder=is_founder,
                id_offset=id_offset,
                parent_offset=parent_offset,
                household_offset=household_offset,
            )
            household_offset += int(household_ids.max()) + 1
            id_offset += n
            ped_offset += n
        t_fill = time.perf_counter()

        # Per-generation data shape checkpoints
        fam_sizes = np.bincount(household_ids)
        logger.info(
            "Generation %d: %d twins, mean family size %.2f [mating=%.1fs, reproduce=%.1fs, fill=%.1fs, total=%.1fs]",
            i,
            len(twins) * 2,
            fam_sizes.mean(),
            t_mate - t_gen,
            t_repro - t_mate,
            t_fill - t_repro,
            t_fill - t_gen,
        )

    pedigree = _arrays_to_dataframe(ped_arrays, ped_offset)
    elapsed = time.perf_counter() - t0
    logger.info(
        "Simulation complete in %.1fs: pedigree has %d individuals",
        elapsed,
        len(pedigree),
    )

    return pedigree

cli

cli()

Command-line interface for running ACE simulations.

Source code in simace/simulation/simulate.py
def cli() -> None:
    """Command-line interface for running ACE simulations."""
    import json

    from simace.core.cli_base import add_logging_args, init_logging
    from simace.core.yaml_io import dump_yaml
    from simace.simulation.emit_params import emit_params

    parser = argparse.ArgumentParser(description="Run ACE pedigree simulation")
    add_logging_args(parser)
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument("--N", type=int, default=1000, help="Founder population size")
    parser.add_argument("--G-ped", type=int, default=3, help="Number of pedigree generations")
    parser.add_argument("--G-sim", type=int, default=None, help="Number of burn-in generations (default: G_ped)")
    parser.add_argument("--mating-lambda", type=float, default=0.5, help="ZTP mating count lambda")
    parser.add_argument("--p-mztwin", type=float, default=0.02, help="Probability of MZ twinning")
    parser.add_argument("--A1", type=float, default=0.5, help="Additive genetic variance for trait 1")
    parser.add_argument("--C1", type=float, default=0.2, help="Shared environment variance for trait 1")
    parser.add_argument("--E1", type=float, required=True, help="Unique environment variance for trait 1")
    parser.add_argument("--A2", type=float, default=0.5, help="Additive genetic variance for trait 2")
    parser.add_argument("--C2", type=float, default=0.2, help="Shared environment variance for trait 2")
    parser.add_argument("--E2", type=float, required=True, help="Unique environment variance for trait 2")
    parser.add_argument("--rA", type=float, default=0.5, help="Cross-trait genetic correlation")
    parser.add_argument("--rC", type=float, default=0.3, help="Cross-trait shared environment correlation")
    parser.add_argument("--rE", type=float, default=0.0, help="Cross-trait unique environment correlation")
    parser.add_argument("--assort1", type=float, default=0.0, help="Mate correlation on trait 1 liability")
    parser.add_argument("--assort2", type=float, default=0.0, help="Mate correlation on trait 2 liability")
    parser.add_argument(
        "--assort-matrix",
        type=str,
        default=None,
        help='Optional 2x2 mate correlation matrix as JSON, e.g. \'[[0.3, 0.05], [0.05, 0.15]]\'',
    )
    parser.add_argument("--output-pedigree", required=True, help="Output pedigree parquet path")
    parser.add_argument("--output-params", required=True, help="Output params YAML path")
    parser.add_argument("--rep", type=int, default=1, help="Replicate number")
    args = parser.parse_args()

    init_logging(args)

    assort_matrix = json.loads(args.assort_matrix) if args.assort_matrix else None

    pedigree = run_simulation(
        seed=args.seed,
        N=args.N,
        G_ped=args.G_ped,
        mating_lambda=args.mating_lambda,
        p_mztwin=args.p_mztwin,
        A1=args.A1,
        C1=args.C1,
        A2=args.A2,
        C2=args.C2,
        rA=args.rA,
        rC=args.rC,
        rE=args.rE,
        E1=args.E1,
        E2=args.E2,
        G_sim=args.G_sim,
        assort1=args.assort1,
        assort2=args.assort2,
        assort_matrix=assort_matrix,
    )

    save_parquet(pedigree, args.output_pedigree)

    params_dict = emit_params(
        seed=args.seed,
        rep=args.rep,
        A1=args.A1,
        C1=args.C1,
        E1=args.E1,
        A2=args.A2,
        C2=args.C2,
        E2=args.E2,
        rA=args.rA,
        rC=args.rC,
        rE=args.rE,
        N=args.N,
        G_ped=args.G_ped,
        G_sim=args.G_sim,
        mating_lambda=args.mating_lambda,
        p_mztwin=args.p_mztwin,
        assort1=args.assort1,
        assort2=args.assort2,
        assort_matrix=assort_matrix,
    )
    dump_yaml(params_dict, args.output_params, sort_keys=True)

mate_correlation

simace.simulation.mate_correlation

Theoretical expected mate liability correlation matrix.

Companion to the generative _assortative_pair_partners in simace.simulation.simulate: given the same assortative-mating parameters and ACE variance components passed to the simulator, this module returns the expected R_mf matrix that the simulator should produce. Used by validation and plotting code to compare observed vs. expected mate correlations. Change one, change the other.

expected_mate_corr_matrix

expected_mate_corr_matrix(assort1, assort2, rA, rC, A1, C1, A2, C2, assort_matrix=None, rE=0.0, E1=0.0, E2=0.0)

Compute the 2x2 expected mate liability correlation matrix.

Returns E[corr(F_i, M_j)] for i,j in {1,2} given assortative mating parameters and ACE variance components.

With the 4-variate copula algorithm, assort1 and assort2 are target Pearson mate correlations. The cross-mate cross-trait correlation follows from the mechanistic path: c = rho_w * sqrt(|r1r2|) * sign(r1r2), where rho_w is the within-person cross-trait liability correlation.

When assort_matrix is provided, it is returned directly (the user has specified the full R_mf).

PARAMETER DESCRIPTION
assort1

Target mate Pearson correlation for trait 1.

TYPE: float

assort2

Target mate Pearson correlation for trait 2.

TYPE: float

rA

Genetic correlation between traits.

TYPE: float

rC

Shared-environment correlation between traits.

TYPE: float

A1

Additive genetic variance for trait 1.

TYPE: float

C1

Shared-environment variance for trait 1.

TYPE: float

A2

Additive genetic variance for trait 2.

TYPE: float

C2

Shared-environment variance for trait 2.

TYPE: float

assort_matrix

If provided, returned directly as the full R_mf matrix.

TYPE: ndarray | list | None DEFAULT: None

rE

Unique-environment correlation between traits.

TYPE: float DEFAULT: 0.0

E1

Unique-environment variance for trait 1.

TYPE: float DEFAULT: 0.0

E2

Unique-environment variance for trait 2.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
ndarray

2x2 array of expected mate liability correlations E[corr(F_i, M_j)].

Source code in simace/simulation/mate_correlation.py
def expected_mate_corr_matrix(
    assort1: float,
    assort2: float,
    rA: float,
    rC: float,
    A1: float,
    C1: float,
    A2: float,
    C2: float,
    assort_matrix: np.ndarray | list | None = None,
    rE: float = 0.0,
    E1: float = 0.0,
    E2: float = 0.0,
) -> np.ndarray:
    """Compute the 2x2 expected mate liability correlation matrix.

    Returns E[corr(F_i, M_j)] for i,j in {1,2} given assortative mating
    parameters and ACE variance components.

    With the 4-variate copula algorithm, assort1 and assort2 are target
    Pearson mate correlations. The cross-mate cross-trait correlation follows
    from the mechanistic path: c = rho_w * sqrt(|r1*r2|) * sign(r1*r2),
    where rho_w is the within-person cross-trait liability correlation.

    When ``assort_matrix`` is provided, it is returned directly (the user
    has specified the full R_mf).

    Args:
        assort1: Target mate Pearson correlation for trait 1.
        assort2: Target mate Pearson correlation for trait 2.
        rA: Genetic correlation between traits.
        rC: Shared-environment correlation between traits.
        A1: Additive genetic variance for trait 1.
        C1: Shared-environment variance for trait 1.
        A2: Additive genetic variance for trait 2.
        C2: Shared-environment variance for trait 2.
        assort_matrix: If provided, returned directly as the full R_mf matrix.
        rE: Unique-environment correlation between traits.
        E1: Unique-environment variance for trait 1.
        E2: Unique-environment variance for trait 2.

    Returns:
        2x2 array of expected mate liability correlations ``E[corr(F_i, M_j)]``.
    """
    if assort_matrix is not None:
        return np.asarray(assort_matrix, dtype=np.float64)

    if assort1 == 0 and assort2 == 0:
        return np.zeros((2, 2))

    # Within-person cross-trait correlation
    rho_w = rA * np.sqrt(A1 * A2) + rC * np.sqrt(C1 * C2) + rE * np.sqrt(E1 * E2)

    if assort1 != 0 and assort2 != 0:
        # Both traits: diagonal = targets, off-diagonal from rho_w mediation
        c = rho_w * np.sqrt(abs(assort1 * assort2)) * np.sign(assort1 * assort2)
        return np.array([[assort1, c], [c, assort2]])
    if assort1 != 0:
        # Single-trait on trait 1: propagate via rho_w
        a = assort1
        return np.array(
            [
                [a, a * rho_w],
                [a * rho_w, a * rho_w**2],
            ]
        )
    # Single-trait on trait 2: propagate via rho_w
    a = assort2
    return np.array(
        [
            [a * rho_w**2, a * rho_w],
            [a * rho_w, a],
        ]
    )