# Continuous Articulatory Feature Learning — Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Learn continuous articulatory feature vectors for 40 GenAm phonemes + 5 diphthongs via Bayesian inference, replacing PHOIBLE vectors.
Architecture: Hayes (2009) 26-feature discrete matrix initializes Beta priors. ECCC perceptual confusion data provides likelihood signal. PyMC NUTS samples 1,042 parameters (1,040 features + 2 onset/offset weights). Composite vectors (α·v_onset + β·v_offset) unify monophthongs and diphthongs.
Tech Stack: Python 3.10+, PyMC, ArviZ, numpy, pandas, python-Levenshtein, pytest, hatchling.
Spec: docs/superpowers/specs/2026-03-13-continuous-feature-learning-design.md
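The composite construction can be previewed with plain numpy before any inference runs: a monophthong contributes the same vector as both onset and offset, while a wide diphthong's distinct components partially cancel, shrinking the composite. The vectors below are illustrative stand-ins, not learned values:

```python
import numpy as np

# Illustrative 4-dim feature vectors (placeholders, not learned values)
v_a = np.array([0.9, 0.1, 0.0, 0.95])   # low-vowel-like onset
v_i = np.array([0.1, 0.9, 0.95, 0.05])  # high-front-like offset
alpha, beta = 1.0, 1.0                   # onset/offset salience weights

mono = alpha * v_a + beta * v_a  # monophthong: onset == offset
diph = alpha * v_a + beta * v_i  # diphthong: distinct onset/offset

# The wide diphthong's composite has smaller magnitude than the
# monophthong's, because its components partially cancel.
assert np.linalg.norm(mono) > np.linalg.norm(diph)

# Parameter count from the architecture: 40 segments x 26 features + 2 weights
print(40 * 26 + 2)  # 1042
```

This cancellation is what lets a single distance function compare monophthongs and diphthongs on equal footing.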
## File Structure
packages/features/
├── src/phonolex_features/
│ ├── __init__.py
│ ├── prior.py # Load Hayes CSV → Beta(α, β) parameter arrays
│ ├── evidence/
│ │ ├── __init__.py
│ │ ├── eccc.py # Parse ECCC CSV, BrE→GenAm mapping, edit-distance
│ │ │ # alignment, phoneme-pair confusion aggregation
│ │ └── alternations.py # (Phase 2 — stub only in this plan)
│ ├── model.py # PyMC model: priors, composite vectors, likelihood
│ ├── composite.py # Composite vector computation: α·v_onset + β·v_offset
│ │ # Diphthong inventory mapping. Output generation.
│ ├── validate.py # Four validation stages: coherence, PHOIBLE regression,
│ │ # held-out prediction, clinical face validity
│ └── config.py # Load TOML config, expose typed hyperparameters
├── data/
│ ├── build_features_ipa.py # Hayes matrix generator (relocated from repo root)
│ └── phonolex_features_ipa.csv # Generated 40×26 matrix (relocated from repo root)
├── configs/
│ └── default.toml # All hyperparameters, seeds, NUTS settings
├── outputs/ # Gitignored — generated artifacts
├── tests/
│ ├── __init__.py
│ ├── test_prior.py
│ ├── test_eccc.py
│ ├── test_composite.py
│ ├── test_config.py
│ ├── test_model.py
│ └── test_validate.py
├── pyproject.toml
└── .gitignore # outputs/
## Chunk 1: Package Scaffolding and Prior
### Task 1: Package setup
Files:
- Create: packages/features/pyproject.toml
- Create: packages/features/src/phonolex_features/__init__.py
- Create: packages/features/.gitignore
- Modify: pyproject.toml (workspace root — add packages/features to members)
- Move: phonolex_features_ipa.csv → packages/features/data/phonolex_features_ipa.csv
- Move: build_features_ipa.py → packages/features/data/build_features_ipa.py
- [ ] Step 1: Create pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "phonolex-features"
version = "0.1.0"
description = "Bayesian articulatory feature learning for PhonoLex"
license = "CC-BY-SA-3.0"
requires-python = ">=3.10"
dependencies = [
"numpy>=1.24",
"pandas>=2.0",
"pymc>=5.10",
"arviz>=0.17",
"python-Levenshtein>=0.23",
"matplotlib>=3.7",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"ruff>=0.4",
]
[tool.hatch.build.targets.wheel]
packages = ["src/phonolex_features"]
[tool.ruff]
target-version = "py310"
line-length = 100
[tool.pytest.ini_options]
testpaths = ["tests"]
- [ ] Step 2: Create __init__.py
"""PhonoLex continuous articulatory feature learning."""
- [ ] Step 3: Create .gitignore
outputs/
- [ ] Step 4: Add to workspace root pyproject.toml
Add "packages/features" to [tool.uv.workspace] members.
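Assuming the root pyproject.toml already declares a [tool.uv.workspace] table, the change is one entry in its members array (the other entries shown are placeholders for whatever already exists):

```toml
[tool.uv.workspace]
members = [
    # ...existing members...
    "packages/features",
]
```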
- [ ] Step 5: Move Hayes matrix files
mkdir -p packages/features/data
mv phonolex_features_ipa.csv packages/features/data/
mv build_features_ipa.py packages/features/data/
- [ ] Step 6: Update build_features_ipa.py output path
Change the output_path variable on line 315 from /home/claude/phonolex_features_ipa.csv to a relative path:
output_path = Path(__file__).parent / 'phonolex_features_ipa.csv'
Add from pathlib import Path at the top (after import csv).
- [ ] Step 7: Create empty directories
mkdir -p packages/features/src/phonolex_features/evidence
mkdir -p packages/features/configs
mkdir -p packages/features/outputs
mkdir -p packages/features/tests
touch packages/features/src/phonolex_features/evidence/__init__.py
touch packages/features/tests/__init__.py
- [ ] Step 8: Install package in editable mode
uv pip install -e packages/features
Expected: installs successfully.
- [ ] Step 9: Commit
git add packages/features/ pyproject.toml
git commit -m "feat(features): scaffold phonolex-features package with Hayes matrix"
### Task 2: Config module
Files:
- Create: packages/features/configs/default.toml
- Create: packages/features/src/phonolex_features/config.py
- Create: packages/features/tests/test_config.py
- [ ] Step 1: Write the config TOML
[prior]
concentration = 20 # alpha + beta for +/- features
[prior.na]
alpha = 1.0 # Beta(1,1) = uniform for structurally inapplicable
beta = 1.0
[salience]
alpha_scale = 1.0 # HalfNormal scale for onset weight
beta_scale = 1.0 # HalfNormal scale for offset weight
[likelihood]
intercept = 2.0 # logistic link 'a': baseline logit when distance=0
slope = 5.0 # logistic link 'b': how fast logit drops with distance
sigma = 1.0 # observation noise on logit-confusion
distance = "euclidean"
[nuts]
draws = 2000
tune = 1000
chains = 4
target_accept = 0.95
seed = 42
[data]
hayes_csv = "data/phonolex_features_ipa.csv"
eccc_csv = "../../data/norms/eccc/confusionCorpus_v1.2.csv"
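To see what the [likelihood] defaults imply, note that the link maps a composite distance d to an expected confusion logit of a − b·d. A standalone sketch of the implied curve (plain Python, no PyMC):

```python
import math

def implied_confusion(d: float, intercept: float = 2.0, slope: float = 5.0) -> float:
    """Expected confusion probability at composite distance d: sigmoid(a - b*d)."""
    logit = intercept - slope * d
    return 1.0 / (1.0 + math.exp(-logit))

# Identical composites (d=0) sit near 88% confusion; by d=1 the
# expected confusion has dropped below 5%.
for d in (0.0, 0.25, 0.5, 1.0):
    print(f"d={d:.2f}  p_confusion={implied_confusion(d):.3f}")
```

The intercept thus sets baseline confusability and the slope controls how sharply featural distance suppresses it; both are fixed hyperparameters in this plan, not sampled.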
- [ ] Step 2: Write the failing test
# packages/features/tests/test_config.py
from pathlib import Path
from phonolex_features.config import load_config
def test_load_default_config():
cfg = load_config()
assert cfg.prior.concentration == 20
assert cfg.nuts.seed == 42
assert cfg.likelihood.slope == 5.0
assert cfg.salience.alpha_scale == 1.0
def test_load_config_from_path():
path = Path(__file__).parent.parent / "configs" / "default.toml"
cfg = load_config(path)
assert cfg.prior.concentration == 20
def test_config_data_paths_resolve():
cfg = load_config()
# Paths should be strings, not validated for existence at load time
assert isinstance(cfg.data.hayes_csv, str)
assert isinstance(cfg.data.eccc_csv, str)
- [ ] Step 3: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_config.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'phonolex_features.config'
- [ ] Step 4: Write the config module
# packages/features/src/phonolex_features/config.py
"""Hyperparameter configuration loader."""
from __future__ import annotations
import tomllib
from dataclasses import dataclass
from pathlib import Path
_DEFAULT_CONFIG = Path(__file__).parent.parent.parent / "configs" / "default.toml"
@dataclass(frozen=True)
class PriorNAConfig:
alpha: float
beta: float
@dataclass(frozen=True)
class PriorConfig:
concentration: int
na: PriorNAConfig
@dataclass(frozen=True)
class SalienceConfig:
alpha_scale: float
beta_scale: float
@dataclass(frozen=True)
class LikelihoodConfig:
intercept: float
slope: float
sigma: float
distance: str
@dataclass(frozen=True)
class NUTSConfig:
draws: int
tune: int
chains: int
target_accept: float
seed: int
@dataclass(frozen=True)
class DataConfig:
hayes_csv: str
eccc_csv: str
@dataclass(frozen=True)
class Config:
prior: PriorConfig
salience: SalienceConfig
likelihood: LikelihoodConfig
nuts: NUTSConfig
data: DataConfig
def load_config(path: Path | None = None) -> Config:
"""Load configuration from a TOML file."""
path = path or _DEFAULT_CONFIG
with open(path, "rb") as f:
raw = tomllib.load(f)
return Config(
prior=PriorConfig(
concentration=raw["prior"]["concentration"],
na=PriorNAConfig(**raw["prior"]["na"]),
),
salience=SalienceConfig(**raw["salience"]),
likelihood=LikelihoodConfig(**raw["likelihood"]),
nuts=NUTSConfig(**raw["nuts"]),
data=DataConfig(**raw["data"]),
)
- [ ] Step 5: Run test to verify it passes
Run: cd packages/features && python -m pytest tests/test_config.py -v
Expected: 3 passed.
- [ ] Step 6: Commit
git add packages/features/configs/ packages/features/src/phonolex_features/config.py packages/features/tests/test_config.py
git commit -m "feat(features): add config module with TOML loader"
### Task 3: Prior module
Files:
- Create: packages/features/src/phonolex_features/prior.py
- Create: packages/features/tests/test_prior.py
- [ ] Step 1: Write the failing test
# packages/features/tests/test_prior.py
import numpy as np
from phonolex_features.prior import load_hayes_prior, SEGMENTS, FEATURES
def test_segments_count():
assert len(SEGMENTS) == 40
def test_features_count():
assert len(FEATURES) == 26
def test_load_hayes_prior_shape():
alphas, betas = load_hayes_prior()
assert alphas.shape == (40, 26)
assert betas.shape == (40, 26)
def test_plus_feature_maps_to_high_alpha():
"""'+' → Beta(19, 1): alpha=19, beta=1."""
alphas, betas = load_hayes_prior()
# /p/ is segment 0, 'consonantal' is feature 1 → should be '+'
p_idx = SEGMENTS.index("p")
cons_idx = FEATURES.index("consonantal")
assert alphas[p_idx, cons_idx] == 19.0
assert betas[p_idx, cons_idx] == 1.0
def test_minus_feature_maps_to_high_beta():
"""'-' → Beta(1, 19): alpha=1, beta=19."""
alphas, betas = load_hayes_prior()
# /p/ is segment 0, 'syllabic' is feature 0 → should be '-'
p_idx = SEGMENTS.index("p")
syl_idx = FEATURES.index("syllabic")
assert alphas[p_idx, syl_idx] == 1.0
assert betas[p_idx, syl_idx] == 19.0
def test_zero_feature_maps_to_uniform():
"""'0' → Beta(1, 1): uniform."""
alphas, betas = load_hayes_prior()
# /p/ is labial, so 'anterior' should be '0' (non-coronal)
p_idx = SEGMENTS.index("p")
ant_idx = FEATURES.index("anterior")
assert alphas[p_idx, ant_idx] == 1.0
assert betas[p_idx, ant_idx] == 1.0
def test_custom_concentration():
alphas, betas = load_hayes_prior(concentration=10)
p_idx = SEGMENTS.index("p")
cons_idx = FEATURES.index("consonantal")
# '+' with concentration 10 → Beta(9, 1)
assert alphas[p_idx, cons_idx] == 9.0
assert betas[p_idx, cons_idx] == 1.0
def test_all_segments_present():
"""Verify key segments from consonants and vowels are in the list."""
for seg in ["p", "b", "t", "d", "k", "ɡ", "tʃ", "dʒ",
"f", "v", "θ", "ð", "s", "z", "ʃ", "ʒ", "h",
"m", "n", "ŋ", "l", "ɹ", "w", "j",
"i", "ɪ", "e", "ɛ", "æ", "a", "ɑ", "ɒ",
"ɔ", "o", "ʊ", "u", "ʌ", "ə", "ɝ", "ɚ"]:
assert seg in SEGMENTS, f"Missing segment: {seg}"
- [ ] Step 2: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_prior.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'phonolex_features.prior'
- [ ] Step 3: Write the prior module
# packages/features/src/phonolex_features/prior.py
"""Hayes (2009) feature matrix → Beta distribution parameters."""
from __future__ import annotations
import csv
from pathlib import Path
import numpy as np
_CSV_PATH = Path(__file__).parent.parent.parent / "data" / "phonolex_features_ipa.csv"
FEATURES: list[str] = [
"syllabic", "consonantal", "sonorant", "continuant", "delayed_release",
"approximant", "tap", "trill", "nasal",
"voice", "spread_gl", "constr_gl",
"labial", "round", "labiodental",
"coronal", "anterior", "distributed", "strident", "lateral",
"dorsal", "high", "low", "front", "back", "tense",
]
def _load_segments() -> list[str]:
"""Read segment names from the CSV at import time."""
segs: list[str] = []
with open(_CSV_PATH, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
segs.append(row["ipa"])
return segs
SEGMENTS: list[str] = _load_segments()
def _load_rows() -> dict[str, dict[str, str]]:
"""Load feature values from the CSV."""
rows: dict[str, dict[str, str]] = {}
with open(_CSV_PATH, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
rows[row["ipa"]] = {feat: row[feat] for feat in FEATURES}
return rows
def load_hayes_prior(
concentration: int = 20,
na_alpha: float = 1.0,
na_beta: float = 1.0,
csv_path: Path | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Load Hayes matrix and convert to Beta parameters.
Args:
concentration: alpha + beta for +/- features (default 20).
na_alpha: Alpha for structurally inapplicable features.
na_beta: Beta for structurally inapplicable features.
csv_path: Override path to Hayes CSV.
Returns:
(alphas, betas): arrays of shape (40, 26) with Beta distribution parameters.
"""
if csv_path is not None:
# Re-read from alternate path
segs: list[str] = []
rows_dict: dict[str, dict[str, str]] = {}
with open(csv_path, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
segs.append(row["ipa"])
rows_dict[row["ipa"]] = {feat: row[feat] for feat in FEATURES}
segments, rows = segs, rows_dict
else:
segments = SEGMENTS
rows = _load_rows()
n_seg = len(segments)
n_feat = len(FEATURES)
alphas = np.zeros((n_seg, n_feat), dtype=np.float64)
betas = np.zeros((n_seg, n_feat), dtype=np.float64)
for i, seg in enumerate(segments):
for j, feat in enumerate(FEATURES):
val = rows[seg][feat]
if val == "+":
alphas[i, j] = concentration - 1
betas[i, j] = 1.0
elif val == "-":
alphas[i, j] = 1.0
betas[i, j] = concentration - 1
else: # "0"
alphas[i, j] = na_alpha
betas[i, j] = na_beta
return alphas, betas
- [ ] Step 4: Run test to verify it passes
Run: cd packages/features && python -m pytest tests/test_prior.py -v
Expected: 8 passed.
- [ ] Step 5: Commit
git add packages/features/src/phonolex_features/prior.py packages/features/tests/test_prior.py
git commit -m "feat(features): add prior module — Hayes matrix to Beta params"
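A quick arithmetic check of the conversion rule, independent of the CSV: with concentration 20, a '+' cell becomes Beta(19, 1), whose mean 19/20 = 0.95 starts the feature near its discrete value while leaving room for the likelihood to move it.

```python
def beta_mean(alpha: float, beta: float) -> float:
    """Mean of a Beta(alpha, beta) distribution: alpha / (alpha + beta)."""
    return alpha / (alpha + beta)

# '+' -> Beta(19, 1), '-' -> Beta(1, 19), '0' -> Beta(1, 1)
print(beta_mean(19.0, 1.0))  # 0.95
print(beta_mean(1.0, 19.0))  # 0.05
print(beta_mean(1.0, 1.0))   # 0.5
```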
### Task 4: Composite vector module
Files:
- Create: packages/features/src/phonolex_features/composite.py
- Create: packages/features/tests/test_composite.py
- [ ] Step 1: Write the failing test
# packages/features/tests/test_composite.py
import numpy as np
from phonolex_features.composite import (
DIPHTHONGS,
compute_composite,
compute_all_composites,
)
from phonolex_features.prior import SEGMENTS
def test_diphthong_count():
assert len(DIPHTHONGS) == 5
def test_diphthong_components_are_valid_segments():
for diph, (onset, offset) in DIPHTHONGS.items():
assert onset in SEGMENTS, f"Diphthong {diph} onset {onset} not in segments"
assert offset in SEGMENTS, f"Diphthong {diph} offset {offset} not in segments"
def test_monophthong_composite():
"""Monophthong: c = (alpha + beta) * v."""
v = np.array([0.9, 0.1, 0.5])
alpha, beta = 1.2, 0.8
c = compute_composite(v, v, alpha, beta)
expected = (alpha + beta) * v
np.testing.assert_allclose(c, expected)
def test_diphthong_composite():
"""Diphthong: c = alpha * v_onset + beta * v_offset."""
v_onset = np.array([0.9, 0.1, 0.5])
v_offset = np.array([0.1, 0.9, 0.5])
alpha, beta = 1.0, 1.0
c = compute_composite(v_onset, v_offset, alpha, beta)
expected = alpha * v_onset + beta * v_offset
np.testing.assert_allclose(c, expected)
def test_diphthong_magnitude_less_than_monophthong():
"""Wide diphthong has smaller magnitude than monophthong."""
v_a = np.array([0.9, 0.1, 0.0, 0.95])
v_i = np.array([0.1, 0.9, 0.95, 0.05])
alpha, beta = 1.0, 1.0
mono = compute_composite(v_a, v_a, alpha, beta)
diph = compute_composite(v_a, v_i, alpha, beta)
assert np.linalg.norm(mono) > np.linalg.norm(diph)
def test_compute_all_composites_shape():
"""Should return 45 composites (40 mono + 5 diph), each 26d."""
# Use fake phi for speed — 40 segments × 26 features
phi = np.random.default_rng(0).uniform(0, 1, (40, 26))
alpha, beta = 1.0, 1.0
labels, composites = compute_all_composites(phi, alpha, beta)
assert len(labels) == 45
assert composites.shape == (45, 26)
def test_compute_all_composites_monophthong_entry():
"""First 40 entries should be monophthong composites."""
phi = np.ones((40, 26)) * 0.5
alpha, beta = 1.0, 1.0
labels, composites = compute_all_composites(phi, alpha, beta)
# Monophthong: (alpha + beta) * v = 2.0 * 0.5 = 1.0 everywhere
np.testing.assert_allclose(composites[0], np.ones(26))
- [ ] Step 2: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_composite.py -v
Expected: FAIL — ModuleNotFoundError
- [ ] Step 3: Write the composite module
# packages/features/src/phonolex_features/composite.py
"""Composite vector computation for monophthongs and diphthongs."""
from __future__ import annotations
import numpy as np
from phonolex_features.prior import SEGMENTS
# Diphthong → (onset monophthong, offset monophthong)
DIPHTHONGS: dict[str, tuple[str, str]] = {
"eɪ": ("e", "ɪ"),
"oʊ": ("o", "ʊ"),
"aɪ": ("a", "ɪ"),
"aʊ": ("a", "ʊ"),
"ɔɪ": ("ɔ", "ɪ"),
}
def compute_composite(
v_onset: np.ndarray,
v_offset: np.ndarray,
alpha: float,
beta: float,
) -> np.ndarray:
"""Compute composite vector: α·v_onset + β·v_offset."""
return alpha * v_onset + beta * v_offset
def compute_all_composites(
phi: np.ndarray,
alpha: float,
beta: float,
) -> tuple[list[str], np.ndarray]:
"""Compute composite vectors for all 40 base segments + 5 diphthongs.
Args:
phi: Learned feature matrix, shape (40, 26). Row order matches SEGMENTS.
alpha: Onset salience weight.
beta: Offset salience weight.
Returns:
(labels, composites): labels is a list of 45 segment names,
composites is shape (45, 26).
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
labels: list[str] = []
rows: list[np.ndarray] = []
# Base segments (consonants and monophthongs): onset = offset
for i, seg in enumerate(SEGMENTS):
labels.append(seg)
rows.append(compute_composite(phi[i], phi[i], alpha, beta))
# Diphthongs
for diph, (onset, offset) in DIPHTHONGS.items():
onset_idx = seg_to_idx[onset]
offset_idx = seg_to_idx[offset]
labels.append(diph)
rows.append(compute_composite(phi[onset_idx], phi[offset_idx], alpha, beta))
return labels, np.array(rows)
- [ ] Step 4: Run test to verify it passes
Run: cd packages/features && python -m pytest tests/test_composite.py -v
Expected: 7 passed.
- [ ] Step 5: Commit
git add packages/features/src/phonolex_features/composite.py packages/features/tests/test_composite.py
git commit -m "feat(features): add composite vector module for mono/diphthongs"
## Chunk 2: ECCC Evidence Extraction
### Task 5: BrE→GenAm phoneme mapping and ECCC parsing
Files:
- Create: packages/features/src/phonolex_features/evidence/eccc.py
- Create: packages/features/tests/test_eccc.py
- [ ] Step 1: Write the failing tests — phoneme mapping
# packages/features/tests/test_eccc.py
import numpy as np
from phonolex_features.evidence.eccc import (
map_bre_to_genam,
parse_ipa_sequence,
align_phoneme_sequences,
extract_confusion_pairs,
load_eccc,
)
from phonolex_features.prior import SEGMENTS
def test_map_bre_to_genam_lot_vowel():
"""BrE /ɒ/ → GenAm /ɑ/."""
assert map_bre_to_genam("ɒ") == "ɑ"
def test_map_bre_to_genam_nurse_vowel():
"""BrE /ɜ/ → GenAm /ɝ/."""
assert map_bre_to_genam("ɜ") == "ɝ"
def test_map_bre_to_genam_passthrough():
"""Most phonemes pass through unchanged."""
assert map_bre_to_genam("p") == "p"
assert map_bre_to_genam("s") == "s"
assert map_bre_to_genam("i") == "i"
def test_map_bre_to_genam_excluded():
"""Centering diphthong components return None."""
assert map_bre_to_genam("ɪə") is None
def test_parse_ipa_sequence():
"""Parse space-delimited IPA with stress markers stripped."""
result = parse_ipa_sequence("! k æ t")
assert result == ["k", "æ", "t"]
def test_parse_ipa_sequence_syllable_boundary():
"""Syllable boundaries (.) are stripped."""
result = parse_ipa_sequence("! k æ . t ɪ ŋ")
assert result == ["k", "æ", "t", "ɪ", "ŋ"]
def test_parse_ipa_sequence_diphthong_to_onset():
"""Diphthong tokens map to onset monophthong."""
result = parse_ipa_sequence("! b aɪ . ə")
assert result == ["b", "a", "ə"] # aɪ → a
- [ ] Step 2: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_eccc.py::test_map_bre_to_genam_lot_vowel -v
Expected: FAIL — ModuleNotFoundError
- [ ] Step 3: Write the ECCC module — phoneme mapping and parsing
# packages/features/src/phonolex_features/evidence/eccc.py
"""ECCC confusion corpus → phoneme-level confusion probabilities."""
from __future__ import annotations
import csv
from collections import defaultdict
from pathlib import Path
import numpy as np
from phonolex_features.prior import SEGMENTS
# BrE → GenAm phoneme mapping. None = unmappable (skip entire word pair).
_BRE_TO_GENAM: dict[str, str | None] = {
"ɒ": "ɑ", # LOT vowel
"ɜ": "ɝ", # NURSE vowel (non-rhotic → rhotic)
"ɪə": None, # centering diphthong — exclude
"ɛə": None, # centering diphthong — exclude
"ʊə": None, # centering diphthong — exclude
}
# ECCC represents diphthongs as single tokens. Map to onset monophthong
# for alignment purposes — the onset is the perceptually dominant component.
_DIPHTHONG_TO_ONSET: dict[str, str] = {
"eɪ": "e",
"oʊ": "o",
"aɪ": "a",
"aʊ": "a",
"ɔɪ": "ɔ",
}
def map_bre_to_genam(phoneme: str) -> str | None:
"""Map a BrE phoneme to its GenAm equivalent.
Returns None if the phoneme has no GenAm equivalent and should be excluded.
Returns the phoneme unchanged if no mapping is needed.
"""
return _BRE_TO_GENAM.get(phoneme, phoneme)
def parse_ipa_sequence(ipa_str: str) -> list[str]:
"""Parse a space-delimited IPA string into a list of phonemes.
Strips stress markers (!) and syllable boundaries (.).
Maps diphthong tokens to their onset monophthong.
"""
tokens = ipa_str.strip().split()
result: list[str] = []
for t in tokens:
if t in ("!", "."):
continue
# Map diphthong tokens to onset monophthong
mapped = _DIPHTHONG_TO_ONSET.get(t, t)
result.append(mapped)
return result
def align_phoneme_sequences(
target: list[str], confusion: list[str]
) -> list[tuple[str | None, str | None]]:
"""Align two phoneme sequences using minimum edit distance.
Returns a list of (target_phoneme, confusion_phoneme) pairs.
None indicates an insertion or deletion.
"""
n, m = len(target), len(confusion)
# DP table
dp = [[0] * (m + 1) for _ in range(n + 1)]
for i in range(n + 1):
dp[i][0] = i
for j in range(m + 1):
dp[0][j] = j
for i in range(1, n + 1):
for j in range(1, m + 1):
if target[i - 1] == confusion[j - 1]:
dp[i][j] = dp[i - 1][j - 1]
else:
dp[i][j] = 1 + min(
dp[i - 1][j], # deletion
dp[i][j - 1], # insertion
dp[i - 1][j - 1], # substitution
)
# Backtrace
alignment: list[tuple[str | None, str | None]] = []
i, j = n, m
while i > 0 or j > 0:
if i > 0 and j > 0 and target[i - 1] == confusion[j - 1]:
alignment.append((target[i - 1], confusion[j - 1]))
i -= 1
j -= 1
elif i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + 1:
alignment.append((target[i - 1], confusion[j - 1]))
i -= 1
j -= 1
elif i > 0 and dp[i][j] == dp[i - 1][j] + 1:
alignment.append((target[i - 1], None))
i -= 1
else:
alignment.append((None, confusion[j - 1]))
j -= 1
alignment.reverse()
return alignment
def extract_confusion_pairs(
alignment: list[tuple[str | None, str | None]],
weight: float = 1.0,
) -> list[tuple[str, str, float]]:
"""Extract phoneme-pair confusion observations from an alignment.
Substitutions contribute positive confusion evidence (down-weighted by
1/n_substitutions for multi-site confusions). Insertions/deletions are
excluded. Identical pairs contribute negative (non-confusion) evidence.
Returns list of (phoneme1, phoneme2, observation_weight).
"""
subs = [(t, c) for t, c in alignment if t is not None and c is not None and t != c]
matches = [(t, c) for t, c in alignment if t is not None and c is not None and t == c]
n_subs = len(subs)
sub_weight = weight / n_subs if n_subs else 0.0
pairs: list[tuple[str, str, float]] = []
for t, c in subs:
pairs.append((t, c, sub_weight))
for t, c in matches:
pairs.append((t, c, weight))
return pairs
def load_eccc(
csv_path: str | Path,
) -> dict[tuple[str, str], float]:
"""Load ECCC and compute phoneme-pair confusion probabilities.
Returns:
Dictionary mapping (phoneme1, phoneme2) → confusion probability.
Keys are ordered canonically (sorted IPA).
"""
confusion_counts: dict[tuple[str, str], float] = defaultdict(float)
total_counts: dict[tuple[str, str], float] = defaultdict(float)
genam_set = set(SEGMENTS)
with open(csv_path, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
target_ipa = row.get("Target-IPA", "").strip()
confusion_ipa = row.get("Confusion-IPA", "").strip()
if not target_ipa or not confusion_ipa:
continue
target_seq = parse_ipa_sequence(target_ipa)
confusion_seq = parse_ipa_sequence(confusion_ipa)
# Map BrE → GenAm. Skip entire word pair if any phoneme is unmappable.
target_mapped: list[str] = []
skip = False
for p in target_seq:
m = map_bre_to_genam(p)
if m is None:
skip = True
break
target_mapped.append(m)
if skip:
continue
confusion_mapped: list[str] = []
for p in confusion_seq:
m = map_bre_to_genam(p)
if m is None:
skip = True
break
confusion_mapped.append(m)
if skip:
continue
# Skip if any phoneme not in GenAm inventory
if not all(p in genam_set for p in target_mapped):
continue
if not all(p in genam_set for p in confusion_mapped):
continue
alignment = align_phoneme_sequences(target_mapped, confusion_mapped)
# Get consistency count as weight
try:
consistency = int(row.get("Consistency", "1"))
except ValueError:
consistency = 1
pairs = extract_confusion_pairs(alignment, weight=float(consistency))
for p1, p2, w in pairs:
if p1 == p2:
# Matches are non-confusion evidence: store them under the (p, p)
# key so they grow the per-phoneme denominator.
total_counts[(p1, p1)] += w
else:
key = tuple(sorted([p1, p2]))
confusion_counts[key] += w
# A confusion is also an occurrence of both phonemes.
total_counts[(p1, p1)] += w
total_counts[(p2, p2)] += w
# Compute confusion probabilities. Normalizing by a per-pair total would pin
# every observed pair at exactly 1.0 (that total only grows when the pair is
# confused), so normalize by the mean per-phoneme occurrence count instead.
result: dict[tuple[str, str], float] = {}
for (p1, p2), conf in confusion_counts.items():
denom = (total_counts[(p1, p1)] + total_counts[(p2, p2)]) / 2.0
if denom > 0:
result[(p1, p2)] = conf / denom
return result
- [ ] Step 4: Run phoneme mapping tests
Run: cd packages/features && python -m pytest tests/test_eccc.py -v -k "map_bre or parse_ipa"
Expected: 7 passed.
- [ ] Step 5: Write alignment and extraction tests
Add to tests/test_eccc.py:
def test_align_identical_sequences():
aligned = align_phoneme_sequences(["k", "æ", "t"], ["k", "æ", "t"])
assert aligned == [("k", "k"), ("æ", "æ"), ("t", "t")]
def test_align_single_substitution():
aligned = align_phoneme_sequences(["k", "æ", "t"], ["b", "æ", "t"])
subs = [(t, c) for t, c in aligned if t != c and t is not None and c is not None]
assert len(subs) == 1
assert subs[0] == ("k", "b")
def test_align_insertion():
aligned = align_phoneme_sequences(["k", "æ", "t"], ["k", "æ", "t", "s"])
assert (None, "s") in aligned
def test_extract_single_sub_weight():
alignment = [("k", "b"), ("æ", "æ"), ("t", "t")]
pairs = extract_confusion_pairs(alignment, weight=10.0)
subs = [(p1, p2, w) for p1, p2, w in pairs if p1 != p2]
assert len(subs) == 1
assert subs[0] == ("k", "b", 10.0) # 1 sub → weight/1
def test_extract_multi_sub_downweight():
alignment = [("k", "b"), ("æ", "ɛ"), ("t", "t")]
pairs = extract_confusion_pairs(alignment, weight=10.0)
subs = [(p1, p2, w) for p1, p2, w in pairs if p1 != p2]
assert len(subs) == 2
assert all(w == 5.0 for _, _, w in subs) # 2 subs → weight/2
def test_load_eccc_returns_dict():
"""Integration test — loads actual ECCC data."""
from pathlib import Path
from phonolex_features.config import load_config
cfg = load_config()
pkg_root = Path(__file__).parent.parent
eccc_path = (pkg_root / cfg.data.eccc_csv).resolve()
if not eccc_path.exists():
import pytest
pytest.skip("ECCC data not available")
result = load_eccc(eccc_path)
assert isinstance(result, dict)
assert len(result) > 0
# All values should be probabilities in [0, 1]
for pair, prob in result.items():
assert 0.0 <= prob <= 1.0, f"{pair}: {prob}"
assert len(pair) == 2
- [ ] Step 6: Run all ECCC tests
Run: cd packages/features && python -m pytest tests/test_eccc.py -v
Expected: 13 passed.
- [ ] Step 7: Commit
git add packages/features/src/phonolex_features/evidence/eccc.py packages/features/tests/test_eccc.py
git commit -m "feat(features): add ECCC evidence extraction with BrE→GenAm mapping"
## Chunk 3: PyMC Model and Inference
### Task 6: PyMC model specification
Files:
- Create: packages/features/src/phonolex_features/model.py
- Create: packages/features/tests/test_model.py
- [ ] Step 1: Write the failing test
# packages/features/tests/test_model.py
import numpy as np
import pymc as pm
from phonolex_features.model import build_model
from phonolex_features.prior import load_hayes_prior, SEGMENTS, FEATURES
from phonolex_features.config import load_config
def test_build_model_returns_pymc_model():
cfg = load_config()
alphas, betas = load_hayes_prior(concentration=cfg.prior.concentration)
# Minimal confusion data for testing
confusion_data = {("p", "b"): 0.3, ("t", "d"): 0.25}
model = build_model(alphas, betas, confusion_data, cfg)
assert isinstance(model, pm.Model)
def test_model_has_expected_variables():
cfg = load_config()
alphas, betas = load_hayes_prior(concentration=cfg.prior.concentration)
confusion_data = {("p", "b"): 0.3}
model = build_model(alphas, betas, confusion_data, cfg)
var_names = {v.name for v in model.free_RVs}
assert "phi_logit" in var_names # logit-space reparameterization
assert "onset_weight" in var_names
assert "offset_weight" in var_names
def test_model_phi_shape():
cfg = load_config()
alphas, betas = load_hayes_prior(concentration=cfg.prior.concentration)
confusion_data = {("p", "b"): 0.3}
model = build_model(alphas, betas, confusion_data, cfg)
# phi should be (40, 26)
with model:
phi_var = model["phi"]
assert phi_var.eval().shape == (40, 26)
- [ ] Step 2: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_model.py::test_build_model_returns_pymc_model -v
Expected: FAIL — ModuleNotFoundError
- [ ] Step 3: Write the model module
# packages/features/src/phonolex_features/model.py
"""PyMC model: Beta priors, composite vectors, confusion likelihood."""
from __future__ import annotations
import numpy as np
import pymc as pm
import pytensor.tensor as pt
from phonolex_features.config import Config
from phonolex_features.prior import SEGMENTS, FEATURES
def build_model(
alphas: np.ndarray,
betas: np.ndarray,
confusion_data: dict[tuple[str, str], float],
cfg: Config,
) -> pm.Model:
"""Build the PyMC model for feature learning.
Args:
alphas: Beta alpha params, shape (40, 26).
betas: Beta beta params, shape (40, 26).
confusion_data: {(phoneme1, phoneme2): confusion_probability}.
cfg: Hyperparameter configuration.
Returns:
A PyMC model ready for sampling.
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
n_seg, n_feat = alphas.shape
# Prepare confusion data as arrays
pair_indices_1: list[int] = []
pair_indices_2: list[int] = []
logit_confusions: list[float] = []
for (s1, s2), p_conf in confusion_data.items():
if s1 not in seg_to_idx or s2 not in seg_to_idx:
continue
# Clamp to avoid logit(0) or logit(1)
p_clamped = np.clip(p_conf, 1e-6, 1 - 1e-6)
pair_indices_1.append(seg_to_idx[s1])
pair_indices_2.append(seg_to_idx[s2])
logit_confusions.append(np.log(p_clamped / (1 - p_clamped)))
idx1 = np.array(pair_indices_1)
idx2 = np.array(pair_indices_2)
observed_logits = np.array(logit_confusions)
with pm.Model() as model:
# --- Priors for feature matrix (logit-space reparameterization) ---
# Convert Beta(a, b) prior to logit-Normal:
# If X ~ Beta(a, b), then logit(X) has approximately
# Normal(digamma(a) - digamma(b), sqrt(trigamma(a) + trigamma(b)))
from scipy.special import digamma, polygamma
logit_mu = digamma(alphas) - digamma(betas)
logit_sd = np.sqrt(polygamma(1, alphas) + polygamma(1, betas))
phi_logit = pm.Normal("phi_logit", mu=logit_mu, sigma=logit_sd, shape=(n_seg, n_feat))
phi = pm.Deterministic("phi", pm.math.sigmoid(phi_logit))
# --- Onset/offset salience weights ---
onset_weight = pm.HalfNormal("onset_weight", sigma=cfg.salience.alpha_scale)
offset_weight = pm.HalfNormal("offset_weight", sigma=cfg.salience.beta_scale)
# --- Composite vectors for observed pairs ---
# For Phase 1, all confusion pairs are between monophthongs,
# so composite = (onset_weight + offset_weight) * phi[i].
# The scaling cancels in relative distance, but we keep it
# for consistency with the diphthong framework.
v1 = (onset_weight + offset_weight) * phi[idx1] # (n_pairs, 26)
v2 = (onset_weight + offset_weight) * phi[idx2] # (n_pairs, 26)
# --- Euclidean distance ---
diff = v1 - v2
dist = pt.sqrt(pt.sum(diff**2, axis=1) + 1e-8) # numerical stability
# --- Likelihood: logit(p_conf) ~ Normal(a - b*d, sigma) ---
mu_logit = cfg.likelihood.intercept - cfg.likelihood.slope * dist
pm.Normal(
"confusion_obs",
mu=mu_logit,
sigma=cfg.likelihood.sigma,
observed=observed_logits,
)
return model
def sample_model(
model: pm.Model,
cfg: Config,
) -> "arviz.InferenceData":
"""Run NUTS sampling on the model.
Returns ArviZ InferenceData with posterior samples.
"""
import arviz as az
with model:
trace = pm.sample(
draws=cfg.nuts.draws,
tune=cfg.nuts.tune,
chains=cfg.nuts.chains,
target_accept=cfg.nuts.target_accept,
random_seed=cfg.nuts.seed,
return_inferencedata=True,
)
return trace
def extract_posterior(
trace: "arviz.InferenceData",
) -> dict:
"""Extract posterior means and SDs from trace.
Returns dict with keys:
phi_mean: (40, 26) posterior mean feature values
phi_sd: (40, 26) posterior standard deviations
onset_weight_mean: float
onset_weight_sd: float
offset_weight_mean: float
offset_weight_sd: float
onset_weight_hdi: (2,) 94% HDI
offset_weight_hdi: (2,) 94% HDI
"""
import arviz as az
phi_samples = trace.posterior["phi"].values # (chains, draws, 40, 26)
phi_flat = phi_samples.reshape(-1, *phi_samples.shape[2:])
onset_samples = trace.posterior["onset_weight"].values.flatten()
offset_samples = trace.posterior["offset_weight"].values.flatten()
return {
"phi_mean": phi_flat.mean(axis=0),
"phi_sd": phi_flat.std(axis=0),
"onset_weight_mean": float(onset_samples.mean()),
"onset_weight_sd": float(onset_samples.std()),
"offset_weight_mean": float(offset_samples.mean()),
"offset_weight_sd": float(offset_samples.std()),
"onset_weight_hdi": az.hdi(onset_samples, hdi_prob=0.94),
"offset_weight_hdi": az.hdi(offset_samples, hdi_prob=0.94),
}
- [ ] Step 4: Run tests
Run: cd packages/features && python -m pytest tests/test_model.py -v
Expected: 3 passed.
- [ ] Step 5: Commit
git add packages/features/src/phonolex_features/model.py packages/features/tests/test_model.py
git commit -m "feat(features): add PyMC model with logit-space reparameterization"
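The moment-matching formulas used in build_model can be sanity-checked numerically: for X ~ Beta(a, b), the mean and variance of logit(X) given by digamma/trigamma are exact, so empirical moments of sampled logits should match them closely. A minimal sketch (the Beta(9, 3) values are arbitrary):

```python
import numpy as np
from scipy.special import digamma, polygamma, logit

rng = np.random.default_rng(0)
a, b = 9.0, 3.0  # arbitrary example prior; any a, b > 0 works

# Same formulas as in build_model
mu = digamma(a) - digamma(b)
sd = np.sqrt(polygamma(1, a) + polygamma(1, b))

# Empirical moments of logit(Beta) samples should match the analytic ones
samples = logit(rng.beta(a, b, size=200_000))
assert abs(samples.mean() - mu) < 0.01
assert abs(samples.std() - sd) < 0.01
```

Only the Normal shape of the reparameterized prior is approximate; the check above confirms the location and scale are not.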
Chunk 4: Validation and Output Generation¶
Task 7: Validation module¶
Files:
- Create: packages/features/src/phonolex_features/validate.py
- Create: packages/features/tests/test_validate.py
- [ ] Step 1: Write the failing test
# packages/features/tests/test_validate.py
import numpy as np
from phonolex_features.validate import (
check_voicing_pairs,
check_natural_classes,
check_vowel_geometry,
check_na_feature_drift,
check_clinical_face_validity,
pairwise_distances,
)
from phonolex_features.prior import SEGMENTS, FEATURES, load_hayes_prior
def test_pairwise_distances_shape():
n = 10
vecs = np.random.default_rng(0).uniform(0, 1, (n, 5))
dists = pairwise_distances(vecs)
assert dists.shape == (n, n)
assert dists[0, 0] == 0.0 # self-distance is 0
def test_pairwise_distances_symmetric():
vecs = np.random.default_rng(0).uniform(0, 1, (5, 3))
dists = pairwise_distances(vecs)
np.testing.assert_allclose(dists, dists.T)
def test_check_voicing_pairs_perfect():
"""If voicing pairs are nearest neighbors, all should pass."""
# Construct vectors where voicing pairs are closest.
# Use 2D: voiceless /p/ at (1, 0), voiced /b/ at (1, 0.01), very close;
# other segments far away
n_seg = len(SEGMENTS)
composites = np.random.default_rng(42).uniform(5, 10, (n_seg, 2))
# Force p and b to be very close
p_idx = SEGMENTS.index("p")
b_idx = SEGMENTS.index("b")
composites[p_idx] = [1.0, 0.0]
composites[b_idx] = [1.0, 0.01]
results = check_voicing_pairs(composites)
# At minimum, p-b should pass
p_b_result = [r for r in results if r["pair"] == ("p", "b")][0]
assert p_b_result["nearest_neighbor_match"]
def test_check_vowel_geometry_returns_dict():
n_seg = len(SEGMENTS)
composites = np.random.default_rng(0).uniform(0, 1, (n_seg, 5))
results = check_vowel_geometry(composites)
assert isinstance(results, dict)
assert "high_intra_lt_high_low" in results
def test_check_na_feature_drift_returns_list():
alphas, betas = load_hayes_prior()
phi_mean = np.full((40, 26), 0.5) # all at prior mean — no drift
results = check_na_feature_drift(phi_mean, alphas, betas)
assert isinstance(results, list)
assert len(results) == 0 # no drift from center
def test_check_na_feature_drift_detects_movement():
alphas, betas = load_hayes_prior()
phi_mean = np.full((40, 26), 0.5)
# Force one N/A feature to drift
# Find first N/A entry (alpha=1, beta=1)
for i in range(40):
for j in range(26):
if alphas[i, j] == 1.0 and betas[i, j] == 1.0:
phi_mean[i, j] = 0.9 # big drift
results = check_na_feature_drift(phi_mean, alphas, betas)
assert len(results) >= 1
return
# If no N/A features found (shouldn't happen), fail the test
assert False, "No N/A features found in Hayes matrix"
def test_check_clinical_face_validity_returns_list():
n_seg = len(SEGMENTS)
composites = np.random.default_rng(0).uniform(0, 1, (n_seg, 5))
results = check_clinical_face_validity(composites, top_k=5)
assert isinstance(results, list)
assert len(results) > 0
assert "process" in results[0]
assert "in_top_k" in results[0]
- [ ] Step 2: Run test to verify it fails
Run: cd packages/features && python -m pytest tests/test_validate.py -v
Expected: FAIL — ModuleNotFoundError
- [ ] Step 3: Write the validation module
# packages/features/src/phonolex_features/validate.py
"""Validation: coherence, PHOIBLE regression, held-out, clinical face validity."""
from __future__ import annotations
import numpy as np
from phonolex_features.prior import FEATURES, SEGMENTS
VOICING_PAIRS = [
("p", "b"), ("t", "d"), ("k", "ɡ"),
("f", "v"), ("s", "z"), ("ʃ", "ʒ"),
("tʃ", "dʒ"), ("θ", "ð"),
]
NATURAL_CLASSES = {
"stops": ["p", "b", "t", "d", "k", "ɡ"],
"fricatives": ["f", "v", "θ", "ð", "s", "z", "ʃ", "ʒ", "h"],
"nasals": ["m", "n", "ŋ"],
"vowels": ["i", "ɪ", "e", "ɛ", "æ", "a", "ɑ", "ɒ", "ɔ", "o", "ʊ", "u", "ʌ", "ə", "ɝ", "ɚ"],
}
CLINICAL_PROCESSES = [
{"process": "stopping", "target": "s", "error": "t"},
{"process": "stopping", "target": "f", "error": "p"},
{"process": "fronting", "target": "k", "error": "t"},
{"process": "fronting", "target": "ɡ", "error": "d"},
{"process": "gliding", "target": "l", "error": "w"},
{"process": "gliding", "target": "ɹ", "error": "w"},
]
def pairwise_distances(composites: np.ndarray) -> np.ndarray:
"""Compute pairwise Euclidean distance matrix.
Args:
composites: shape (N, D).
Returns:
shape (N, N) distance matrix.
"""
diff = composites[:, np.newaxis, :] - composites[np.newaxis, :, :]
return np.sqrt(np.sum(diff**2, axis=2))
def check_voicing_pairs(
composites: np.ndarray,
) -> list[dict]:
"""Check whether voicing pairs are nearest neighbors.
Args:
composites: shape (N, D) where N >= 40 (first 40 = monophthongs).
Returns:
List of dicts with pair, distance, nearest_neighbor, nearest_neighbor_match.
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
dists = pairwise_distances(composites[:40]) # monophthongs only
results = []
for s1, s2 in VOICING_PAIRS:
i, j = seg_to_idx[s1], seg_to_idx[s2]
pair_dist = dists[i, j]
# Find nearest neighbor of s1 (excluding self)
row = dists[i].copy()
row[i] = np.inf
nn_idx = np.argmin(row)
nn = SEGMENTS[nn_idx]
results.append({
"pair": (s1, s2),
"distance": float(pair_dist),
"nearest_neighbor_of_first": nn,
"nearest_neighbor_match": nn == s2,
})
return results
def check_natural_classes(
composites: np.ndarray,
) -> dict[str, float]:
"""Check whether natural classes have lower intra-class than inter-class distance.
Returns:
Dict mapping class name → ratio (intra_mean / inter_mean).
Values < 1.0 indicate the class clusters tightly.
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
dists = pairwise_distances(composites[:40])
all_indices = set(range(40))
results = {}
for class_name, members in NATURAL_CLASSES.items():
member_indices = [seg_to_idx[s] for s in members if s in seg_to_idx]
non_member_indices = list(all_indices - set(member_indices))
if len(member_indices) < 2:
continue
# Intra-class distances
intra = []
for i_idx, mi in enumerate(member_indices):
for mj in member_indices[i_idx + 1:]:
intra.append(dists[mi, mj])
# Inter-class distances (members to non-members)
inter = []
for mi in member_indices:
for ni in non_member_indices:
inter.append(dists[mi, ni])
intra_mean = np.mean(intra) if intra else 0.0
inter_mean = np.mean(inter) if inter else 1.0
results[class_name] = float(intra_mean / inter_mean) if inter_mean > 0 else 0.0
return results
def check_vowel_geometry(
composites: np.ndarray,
) -> dict[str, bool]:
"""Check that vowels distribute by height x backness x rounding.
Verifies:
- High vowels closer to each other than to low vowels
- Front vowels closer to each other than to back vowels
- Rounded vowels closer to each other than to unrounded vowels
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
dists = pairwise_distances(composites[:40])
def mean_dist(group1: list[str], group2: list[str]) -> float:
d = []
for s1 in group1:
for s2 in group2:
if s1 != s2 and s1 in seg_to_idx and s2 in seg_to_idx:
d.append(dists[seg_to_idx[s1], seg_to_idx[s2]])
return float(np.mean(d)) if d else float("inf")
high = ["i", "ɪ", "u", "ʊ"]
low = ["æ", "a", "ɑ", "ɒ"]
front = ["i", "ɪ", "e", "ɛ", "æ"]
back = ["u", "ʊ", "o", "ɔ", "ɑ", "ɒ"]
rounded = ["u", "ʊ", "o", "ɔ", "ɒ"]
unrounded = ["i", "ɪ", "e", "ɛ", "æ", "a", "ɑ", "ʌ", "ə"]
return {
"high_intra_lt_high_low": mean_dist(high, high) < mean_dist(high, low),
"front_intra_lt_front_back": mean_dist(front, front) < mean_dist(front, back),
"rounded_intra_lt_rounded_unrounded": mean_dist(rounded, rounded) < mean_dist(rounded, unrounded),
}
def check_na_feature_drift(
phi_mean: np.ndarray,
alphas: np.ndarray,
betas: np.ndarray,
drift_threshold: float = 0.15,
) -> list[dict]:
"""Check structural N/A features (initialized flat) for unexpected drift.
Reports features where the posterior mean moved more than drift_threshold
away from 0.5 (the flat prior mean).
"""
results = []
for i, seg in enumerate(SEGMENTS):
for j, feat in enumerate(FEATURES):
# N/A features have alpha=1, beta=1 (flat prior)
if alphas[i, j] == 1.0 and betas[i, j] == 1.0:
val = phi_mean[i, j]
drift = abs(val - 0.5)
if drift > drift_threshold:
results.append({
"segment": seg,
"feature": feat,
"posterior_mean": float(val),
"drift_from_center": float(drift),
})
return results
def check_clinical_face_validity(
composites: np.ndarray,
top_k: int = 5,
) -> list[dict]:
"""Check whether clinical phonological process error phonemes are near targets.
Args:
composites: shape (N, D), first 40 rows are monophthongs.
top_k: How many nearest neighbors to check.
Returns:
List of result dicts.
"""
seg_to_idx = {s: i for i, s in enumerate(SEGMENTS)}
dists = pairwise_distances(composites[:40])
results = []
for proc in CLINICAL_PROCESSES:
target_idx = seg_to_idx[proc["target"]]
error_idx = seg_to_idx[proc["error"]]
row = dists[target_idx].copy()
row[target_idx] = np.inf
nearest_k = np.argsort(row)[:top_k]
nearest_labels = [SEGMENTS[i] for i in nearest_k]
results.append({
"process": proc["process"],
"target": proc["target"],
"error": proc["error"],
"in_top_k": proc["error"] in nearest_labels,
"rank": int(np.where(np.argsort(row) == error_idx)[0][0]) + 1,
"top_k_neighbors": nearest_labels,
})
return results
- [ ] Step 4: Run tests
Run: cd packages/features && python -m pytest tests/test_validate.py -v
Expected: 7 passed.
- [ ] Step 5: Commit
git add packages/features/src/phonolex_features/validate.py packages/features/tests/test_validate.py
git commit -m "feat(features): add validation module with coherence and clinical face validity checks"
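As a cross-check on the broadcasting trick in pairwise_distances, the same matrix can be computed with SciPy's reference implementation (cdist is not needed by the module itself; this is only a verification sketch):

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(1)
vecs = rng.uniform(0.0, 1.0, size=(6, 4))

# Broadcasting form used by pairwise_distances: (N,1,D) - (1,N,D) -> (N,N,D)
diff = vecs[:, np.newaxis, :] - vecs[np.newaxis, :, :]
dists = np.sqrt(np.sum(diff**2, axis=2))

# Must agree with SciPy's pairwise Euclidean distances
assert np.allclose(dists, cdist(vecs, vecs))
```

The broadcasting form allocates an (N, N, D) intermediate, which is fine at N = 45 but worth replacing with cdist if the inventory ever grows large.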
Task 8: Output generation script¶
Files:
- Create: packages/features/src/phonolex_features/run.py
- [ ] Step 1: Write the run script
# packages/features/src/phonolex_features/run.py
"""End-to-end: load data → build model → sample → validate → save outputs."""
from __future__ import annotations
import json
from pathlib import Path
import arviz as az
import numpy as np
import pandas as pd
from phonolex_features.composite import compute_all_composites
from phonolex_features.config import Config, load_config
from phonolex_features.evidence.eccc import load_eccc
from phonolex_features.model import build_model, extract_posterior, sample_model
from phonolex_features.prior import FEATURES, SEGMENTS, load_hayes_prior
from phonolex_features.validate import (
check_clinical_face_validity,
check_natural_classes,
check_na_feature_drift,
check_vowel_geometry,
check_voicing_pairs,
)
OUTPUT_DIR = Path(__file__).parent.parent.parent / "outputs"
def resolve_data_path(relative: str, base_dir: Path) -> Path:
"""Resolve a data path relative to a base directory."""
return (base_dir / relative).resolve()
def run(config_path: Path | None = None) -> None:
"""Execute the full pipeline."""
cfg = load_config(config_path)
# Package root: packages/features/
pkg_root = Path(__file__).parent.parent.parent
print("=== Loading Hayes prior ===")
hayes_csv = resolve_data_path(cfg.data.hayes_csv, pkg_root)
alphas, betas = load_hayes_prior(
concentration=cfg.prior.concentration,
na_alpha=cfg.prior.na.alpha,
na_beta=cfg.prior.na.beta,
csv_path=hayes_csv,
)
print(f" Prior shape: {alphas.shape}")
print("=== Loading ECCC evidence ===")
eccc_path = resolve_data_path(cfg.data.eccc_csv, pkg_root)
confusion_data = load_eccc(eccc_path)
print(f" Confusion pairs: {len(confusion_data)}")
print("=== Building model ===")
model = build_model(alphas, betas, confusion_data, cfg)
print("=== Sampling ===")
trace = sample_model(model, cfg)
print("=== Extracting posterior ===")
posterior = extract_posterior(trace)
phi_mean = posterior["phi_mean"]
phi_sd = posterior["phi_sd"]
print("=== Computing composites ===")
alpha_w = posterior["onset_weight_mean"]
beta_w = posterior["offset_weight_mean"]
labels, composites = compute_all_composites(phi_mean, alpha_w, beta_w)
print("=== Validation ===")
voicing = check_voicing_pairs(composites)
classes = check_natural_classes(composites)
vowel_geom = check_vowel_geometry(composites)
na_drift = check_na_feature_drift(phi_mean, alphas, betas)
clinical = check_clinical_face_validity(composites)
voicing_pass = sum(1 for v in voicing if v["nearest_neighbor_match"])
print(f" Voicing pairs: {voicing_pass}/{len(voicing)} nearest-neighbor matches")
for name, ratio in classes.items():
print(f" {name}: intra/inter ratio = {ratio:.3f} ({'clustered' if ratio < 1.0 else 'NOT clustered'})")
for check, passed in vowel_geom.items():
print(f" Vowel geometry {check}: {'PASS' if passed else 'FAIL'}")
print(f" N/A features with drift > 0.15: {len(na_drift)}")
clinical_pass = sum(1 for c in clinical if c["in_top_k"])
print(f" Clinical processes: {clinical_pass}/{len(clinical)} in top-5")
print("=== Saving outputs ===")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# vectors.csv
pd.DataFrame(phi_mean, index=SEGMENTS, columns=FEATURES).to_csv(
OUTPUT_DIR / "vectors.csv", index_label="ipa"
)
# uncertainty.csv
pd.DataFrame(phi_sd, index=SEGMENTS, columns=FEATURES).to_csv(
OUTPUT_DIR / "uncertainty.csv", index_label="ipa"
)
# composites.csv
pd.DataFrame(composites, index=labels, columns=FEATURES).to_csv(
OUTPUT_DIR / "composites.csv", index_label="segment"
)
# alpha_beta.json
with open(OUTPUT_DIR / "alpha_beta.json", "w") as f:
json.dump({
"onset_weight": {
"mean": posterior["onset_weight_mean"],
"sd": posterior["onset_weight_sd"],
"hdi_94": posterior["onset_weight_hdi"].tolist(),
},
"offset_weight": {
"mean": posterior["offset_weight_mean"],
"sd": posterior["offset_weight_sd"],
"hdi_94": posterior["offset_weight_hdi"].tolist(),
},
}, f, indent=2)
# validation.json
with open(OUTPUT_DIR / "validation.json", "w") as f:
json.dump({
"voicing_pairs": voicing,
"natural_classes": classes,
"vowel_geometry": vowel_geom,
"na_feature_drift": na_drift,
"clinical_processes": clinical,
}, f, indent=2, default=str)
# inference_data.nc
trace.to_netcdf(OUTPUT_DIR / "inference_data.nc")
# diagnostics
diag_dir = OUTPUT_DIR / "diagnostics"
diag_dir.mkdir(exist_ok=True)
# Save summary stats
summary = az.summary(trace, var_names=["onset_weight", "offset_weight"])
summary.to_csv(diag_dir / "weight_summary.csv")
# Trace plots (non-interactive backend for headless runs)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
axes = az.plot_trace(trace, var_names=["onset_weight", "offset_weight"])
axes[0, 0].get_figure().savefig(diag_dir / "trace_weights.png", dpi=150, bbox_inches="tight")
axes = az.plot_forest(trace, var_names=["onset_weight", "offset_weight"])
axes[0].get_figure().savefig(diag_dir / "forest_weights.png", dpi=150, bbox_inches="tight")
plt.close("all")
print(f"\n=== Done. Outputs saved to {OUTPUT_DIR} ===")
if __name__ == "__main__":
run()
- [ ] Step 2: Verify module imports work
Run: cd packages/features && python -c "from phonolex_features.run import run; print('OK')"
Expected: OK
- [ ] Step 3: Commit
git add packages/features/src/phonolex_features/run.py
git commit -m "feat(features): add end-to-end run script with output generation"
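Downstream consumers can load the generated composites.csv and query nearest neighbors directly. A sketch using an in-memory stand-in for the file (the real artifact has 26 feature columns and 45 rows; the two-feature, three-segment frame here is illustrative only):

```python
import io
import numpy as np
import pandas as pd

# Stand-in for outputs/composites.csv produced by run()
csv_text = """segment,f1,f2
p,0.1,0.9
b,0.12,0.88
s,0.8,0.2
"""
df = pd.read_csv(io.StringIO(csv_text), index_col="segment")

# Pairwise distances, excluding self-distance, then nearest neighbor per row
vecs = df.to_numpy()
diff = vecs[:, None, :] - vecs[None, :, :]
dists = np.sqrt((diff**2).sum(axis=2))
np.fill_diagonal(dists, np.inf)
nearest = {seg: df.index[i] for seg, i in zip(df.index, dists.argmin(axis=1))}

print(nearest["p"])  # "b" is nearest to "p" in this toy data
```

For the real file, replace the StringIO stand-in with `pd.read_csv("outputs/composites.csv", index_col="segment")`.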
Task 9: Phase 2 stub¶
Files:
- Create: packages/features/src/phonolex_features/evidence/alternations.py
- [ ] Step 1: Write the stub
# packages/features/src/phonolex_features/evidence/alternations.py
"""MorphoLex + CMU alternation pair extraction.
Phase 2 — not yet implemented. This module will:
1. Load MorphoLex morphological segmentation data
2. Load CMU pronunciations
3. Identify words sharing root morphemes
4. Align pronunciations at morpheme boundaries
5. Extract phoneme alternation pairs with frequency weights
"""
def load_alternation_pairs(
morpholex_path: str,
cmu_path: str,
) -> dict[tuple[str, str], float]:
"""Extract alternation pairs from MorphoLex + CMU.
Not yet implemented. Returns empty dict.
"""
raise NotImplementedError("Phase 2: alternation extraction not yet implemented")
- [ ] Step 2: Commit
git add packages/features/src/phonolex_features/evidence/alternations.py
git commit -m "feat(features): add Phase 2 alternation extraction stub"
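For orientation, step 5 of the Phase 2 plan (extracting alternation pairs from aligned pronunciations) might look like the following. The ARPAbet-style pronunciations and the 1:1-substitution filter are illustrative assumptions, not the final design; real data would come from MorphoLex + CMU:

```python
from difflib import SequenceMatcher

# Illustrative pronunciations for a root-sharing pair ("divine" ~ "divinity")
divine = ["D", "IH", "V", "AY", "N"]
divinity = ["D", "IH", "V", "IH", "N", "IH", "T", "IY"]

# Keep only clean 1:1 substitutions as candidate alternation pairs
pairs = []
for tag, i1, i2, j1, j2 in SequenceMatcher(a=divine, b=divinity).get_opcodes():
    if tag == "replace" and (i2 - i1) == (j2 - j1) == 1:
        pairs.append((divine[i1], divinity[j1]))

print(pairs)  # [('AY', 'IH')], the trisyllabic-laxing alternation
```

The eventual implementation would aggregate such pairs across the lexicon with frequency weights, feeding the same confusion-style likelihood as the ECCC data.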
Task 10: Final integration test¶
Files:
- Modify: packages/features/tests/test_model.py
- [ ] Step 1: Add a smoke test that builds and samples a tiny model
Add to tests/test_model.py:
def test_smoke_sample_tiny():
"""Smoke test: build model with minimal data and run 10 draws."""
from phonolex_features.model import build_model, extract_posterior, sample_model
from phonolex_features.config import load_config
from phonolex_features.prior import load_hayes_prior
cfg = load_config()
alphas, betas = load_hayes_prior(concentration=cfg.prior.concentration)
confusion_data = {("p", "b"): 0.3, ("t", "d"): 0.25, ("s", "z"): 0.2}
model = build_model(alphas, betas, confusion_data, cfg)
# Override config for speed: 10 draws, 10 tune, 1 chain
from dataclasses import replace
fast_nuts = replace(cfg.nuts, draws=10, tune=10, chains=1)
fast_cfg = replace(cfg, nuts=fast_nuts)
trace = sample_model(model, fast_cfg)
posterior = extract_posterior(trace)
assert posterior["phi_mean"].shape == (40, 26)
assert posterior["phi_sd"].shape == (40, 26)
assert posterior["onset_weight_mean"] > 0.0
assert posterior["offset_weight_mean"] > 0.0
- [ ] Step 2: Run the smoke test
Run: cd packages/features && python -m pytest tests/test_model.py::test_smoke_sample_tiny -v --timeout=120 (the --timeout flag requires the pytest-timeout plugin)
Expected: PASS (may take 30-60 seconds).
- [ ] Step 3: Run the full test suite
Run: cd packages/features && python -m pytest tests/ -v
Expected: All tests pass.
- [ ] Step 4: Commit
git add packages/features/tests/test_model.py
git commit -m "test(features): add smoke test for model sampling"