PHON-94: Corpus DEP Reannotation + Selectional Preference Population — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Populate data/runtime/selectional.parquet with banded per-(verb, role, filler) PPMI by parsing FineWeb-Edu + CHILDES + PhonBank with a canonical spaCy methodology that also regenerates PHON-72/88 frequency+POS columns at zero extra compute cost.
Architecture: Single canonical spaCy config (en_core_web_trf + parser + lemmatizer) lives at phonolex_data.pipeline.canonical_spacy. Three corpus passes run that config and write per-shard parquets to ExternalData1 cold storage. A merge step Polars-stream-aggregates shards into the final selectional.parquet (banded, lemma-keyed) plus refreshed FineWeb-Edu freq+POS deltas for words.parquet. Subcat profiles and role_fillability are derived views over selectional.parquet, computed at consumer-load by WordStore.
Tech Stack: Python 3.12, spaCy en_core_web_trf, Polars, Parquet, HuggingFace datasets (streaming), RunPod H100 SXM, git-lfs.
Spec: docs/superpowers/specs/2026-05-06-phon-94-corpus-dep-reannotation-design.md
Revisions log
2026-05-06: per-sentence F-K banding replaces edu-score grade banding
After four research probes (research/2026-05-06-phon-94-{aoa-banding,readability,nb,chunked-fk}-probe/), the original per-doc banding scheme (fineweb_grade_K_8/9_12/13_16 from FineWeb-Edu's edu_score field) was retired. Per-doc averaging compressed the developmental signal beyond resolution; the calibrated regression and NB approaches both failed for structural reasons (function-word floor, smoothing-denominator artifact). Per-sentence F-K — the chunk-level methodology PHON-88 used — gives 14.47 grade-level points of p10–p90 spread, 23× wider than any per-doc approach.
Replacement banding (FineWeb-Edu side):
- 5 quantile-based bins on per-sentence F-K: fineweb_b1 (F-K < 7.6), b2 (7.6–10.7), b3 (10.7–13.4), b4 (13.4–16.8), b5 (≥ 16.8). Boundaries from chunked-fk-probe's empirical p20/p40/p60/p80.
- F-K computed as 0.39·(W/S) + 11.8·(syl/W) − 15.59, syllables from words.parquet[token].syllable_count with vowel-cluster heuristic for OOV. Clip at 30. Skip sentences with W < 5.
- fineweb_adult materialized aggregate stays — every sentence increments it.
Replacement banding (CHILDES + PhonBank, separate corpora):
- Banded by participant age tag (from source data), not by F-K. CHILDES bands match freq_childes_input_*; PhonBank bands match freq_pb_*.
- PhonBank smoke-gate retired: empirical inspection of /Volumes/ExternalData1/phonbank/dataset.jsonl (828K utterances, 22.9K vocab) confirms sufficient density for direct parsing.
Implementation deltas:
- build_selectional.py (Task 6, already implemented at commit 57cd4c9) iterates per-document. Must be updated to iterate per-sentence with F-K-based bin assignment. The existing band_resolver parameter (which takes doc_idx → list[bands]) is replaced by an inline F-K computation using a syllable_count lookup table from words.parquet.
- bands_fineweb.py (Task 11 prep) becomes an F-K-based bin function, not an edu-score-based resolver. Inputs: (W, S, syllables) → band_label.
- lemma_frequency_grade_K_8/9_12/13_16 PropertyDefs (Task 10, commit 977d222) become lemma_frequency_b1..b5 (5 columns instead of 3, matching the 5-bin F-K inventory). Schema regen updates words.parquet.
- Acceptance tests (Task 14) reference fineweb_b1..b5 instead of fineweb_grade_*.
- New PhonBank parser (Task 13) reads dataset.jsonl directly, filters by speaker_role for input/production split.
Where to find the canonical band inventory and methodology: the spec at docs/superpowers/specs/2026-05-06-phon-94-corpus-dep-reannotation-design.md has been updated. The remainder of this plan still references the old band names in places (file is too large to surgically edit each occurrence); when those references conflict with the spec, the spec wins.
File Structure
New files:
- packages/data/src/phonolex_data/pipeline/canonical_spacy.py — load_canonical_pipeline(); single source of spaCy config
- packages/data/src/phonolex_data/pipeline/extract_triples.py — DEP extraction logic (verb-role-filler triples + passive remap + PP-attachment filtering)
- packages/data/src/phonolex_data/loaders/selectional.py — load selectional.parquet (loader pattern parity with norms loaders)
- packages/data/tests/test_canonical_spacy.py — fixture-driven unit tests for the canonical pass
- packages/data/tests/test_extract_triples.py — fixture-driven extraction unit tests
- packages/data/tests/runtime/test_selectional_parquet.py — schema + WordStore-views tests
- research/2026-05-06-phon-94-canonical-spacy-probe/probe.py — Phase-0 sanity check
- research/2026-05-06-phon-94-canonical-spacy-probe/notebook.md — probe findings
- research/2026-05-06-phon-94-canonical-spacy-probe/README.md — how to run
- research/2026-05-06-phon-94-corpus-parse/build_selectional.py — sharded parse + extract
- research/2026-05-06-phon-94-corpus-parse/merge_shards.py — Polars stream-merge → final Parquet
- research/2026-05-06-phon-94-corpus-parse/launch_shards.sh — RunPod launcher
- research/2026-05-06-phon-94-corpus-parse/poll_progress.sh — shard progress poller
- research/2026-05-06-phon-94-corpus-parse/notebook.md — production run log
- research/2026-05-06-phon-94-corpus-parse/README.md
Modified files:
- packages/data/src/phonolex_data/runtime/schema.py:89-101 — selectional_schema() adds band column
- packages/data/src/phonolex_data/runtime/store.py — WordStore.subcat_profile() and .role_fillability() derived-view methods
- packages/web/workers/scripts/config.py — add lemma + lemma_freq PropertyDefs
- packages/data/src/phonolex_data/runtime/emit_parquet.py — populate lemma columns from canonical-pass output
Task 1: Branch setup + pre-flight check
Files:
- (none — git operations only)
- [ ] Step 1: Verify clean state on release/v5.2.0
git -C /Users/jneumann/Repos/PhonoLex status
git -C /Users/jneumann/Repos/PhonoLex log --oneline -3
Expected: clean tree, HEAD at 4264460 (the spec commit).
- [ ] Step 2: Verify Jira PHON-94 exists and is in expected state
Use the mcp__plugin_atlassian_atlassian__getJiraIssue MCP tool with cloudId="neumannsworkshop.atlassian.net" and issueIdOrKey="PHON-94". Expected: status In-Progress or To-Do, summary mentions corpus DEP reannotation. If the ticket doesn't exist, halt — per feedback_verify_jira_state.md, file the ticket first.
- [ ] Step 3: Create feature branch
git -C /Users/jneumann/Repos/PhonoLex checkout -b feature/phon-94-corpus-dep-reannotation
- [ ] Step 4: Verify ExternalData1 mountpoint accessibility
ls /Volumes/ExternalData1/ 2>&1 | head -3
mkdir -p /Volumes/ExternalData1/phonolex/raw_corpus_parses/{fineweb_edu,childes,phonbank}
ls /Volumes/ExternalData1/phonolex/raw_corpus_parses/
Expected: drive mounted, three subdirectories created. If drive not mounted, halt and ask user to mount.
- [ ] Step 5: Verify Python tooling + uv environment
cd /Users/jneumann/Repos/PhonoLex && uv pip list 2>&1 | grep -E "(spacy|polars|datasets)" | head -5
Expected: spacy, polars, datasets all present. If spacy missing, run uv pip install -e packages/data to ensure deps install.
Task 2: Extend selectional_schema() with band column
Files:
- Modify: packages/data/src/phonolex_data/runtime/schema.py:89-101
- Test: packages/data/tests/runtime/test_schema.py
- [ ] Step 1: Read existing schema test to understand the test pattern
cat /Users/jneumann/Repos/PhonoLex/packages/data/tests/runtime/test_schema.py
Note the test layout — schema tests typically assert column names + types match the function output.
- [ ] Step 2: Write the failing test for the band column
Add to packages/data/tests/runtime/test_schema.py:
def test_selectional_schema_has_band_column():
    """selectional_schema must include a band column for age/grade-banded statistics."""
    from phonolex_data.runtime.schema import selectional_schema
    schema = selectional_schema()
    assert "band" in schema, f"missing band column in {list(schema.keys())}"
    import polars as pl
    assert schema["band"] == pl.Utf8, f"band must be Utf8, got {schema['band']}"


def test_selectional_schema_column_order_preserves_band_after_filler():
    """For human readability, band sits between filler and the count columns."""
    from phonolex_data.runtime.schema import selectional_schema
    cols = list(selectional_schema().keys())
    assert cols.index("band") == cols.index("filler") + 1
- [ ] Step 3: Run the test to verify it fails
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/runtime/test_schema.py::test_selectional_schema_has_band_column -v
Expected: FAIL with an AssertionError reporting the missing band column.
- [ ] Step 4: Update the schema function
Edit packages/data/src/phonolex_data/runtime/schema.py:89-101:
def selectional_schema() -> Mapping[str, pl.DataType]:
    """Schema for selectional.parquet — per-(verb, role, filler, band) PPMI.

    Banded by corpus and age/grade slice (e.g. fineweb_adult, childes_age_2_5)
    for parity with the existing freq surface (PHON-72/86/87/88).
    """
    return {
        "verb": pl.Utf8,
        "role": pl.Utf8,
        "filler": pl.Utf8,
        "band": pl.Utf8,
        "count_v_r_f": pl.UInt32,
        "count_v_r_star": pl.UInt32,
        "ppmi": pl.Float32,
    }
- [ ] Step 5: Run all schema tests + run the full data tests to confirm nothing else breaks
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/runtime/test_schema.py -v
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/ -v 2>&1 | tail -30
Expected: new tests PASS; full data test suite still passes (the empty selectional.parquet doesn't yet have a band column, but no consumer reads it yet).
- [ ] Step 6: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/data/src/phonolex_data/runtime/schema.py packages/data/tests/runtime/test_schema.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: extend selectional_schema with band column

Adds Utf8 band column between filler and count_v_r_f to support
age/grade-banded selectional statistics. Population pending Task 7.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 3: Implement canonical_spacy pipeline module
Files:
- Create: packages/data/src/phonolex_data/pipeline/canonical_spacy.py
- Test: packages/data/tests/test_canonical_spacy.py
- [ ] Step 1: Write the failing test with fixture sentences
Create packages/data/tests/test_canonical_spacy.py:
"""Canonical spaCy pipeline tests.
Locks the canonical config: en_core_web_trf, full pipeline (parser +
lemmatizer + tagger), specific token filters. Future corpus-derived
stats reuse this module.
"""
import pytest
@pytest.fixture(scope="module")
def nlp():
"""Load the canonical pipeline once per test module."""
from phonolex_data.pipeline.canonical_spacy import load_canonical_pipeline
return load_canonical_pipeline()
def test_canonical_pipeline_has_parser_and_lemmatizer(nlp):
"""Parser + lemmatizer must be enabled (PHON-72 disabled them; we re-enable)."""
pipe_names = nlp.pipe_names
assert "parser" in pipe_names, f"parser missing from {pipe_names}"
assert "lemmatizer" in pipe_names, f"lemmatizer missing from {pipe_names}"
assert "tagger" in pipe_names, f"tagger missing from {pipe_names}"
# NER not needed
assert "ner" not in pipe_names, f"ner should be disabled but is in {pipe_names}"
def test_canonical_pipeline_uses_trf_model(nlp):
"""Production tagger is en_core_web_trf for transformer-quality DEP+POS."""
assert "trf" in nlp.meta["name"], f"expected trf model, got {nlp.meta['name']}"
def test_lemmatization_handles_inflection(nlp):
"""Common verb inflections collapse to the lemma."""
forms = ["running", "runs", "ran"]
lemmas = []
for form in forms:
doc = nlp(f"The cat {form} fast.")
# Find the verb token
for tok in doc:
if tok.pos_ == "VERB":
lemmas.append(tok.lemma_.lower())
break
assert lemmas == ["run", "run", "run"], f"got {lemmas}"
def test_pronoun_lemma_is_surface_not_sentinel(nlp):
"""Modern spaCy lemmatizes pronouns to the surface form, not -PRON-.
If this test fails (lemma == '-PRON-'), the canonical config and the
PRON-drop filter logic both need updating.
"""
doc = nlp("She runs.")
pron_tok = next(tok for tok in doc if tok.pos_ == "PRON")
assert pron_tok.lemma_.lower() != "-pron-", (
f"spaCy returned legacy -PRON- sentinel; "
f"canonical config must be updated to handle this"
)
def test_dep_labels_present(nlp):
"""Verify the role inventory's DEP labels are emitted by spaCy.
PHON-94 role inventory: nsubj, dobj, iobj, pobj, xcomp, ccomp.
"""
doc = nlp("The boy gave the girl a book that she liked.")
deps = {tok.dep_ for tok in doc}
expected = {"nsubj", "dobj", "iobj"}
missing = expected - deps
assert not missing, f"missing DEP labels {missing} in {deps}"
- [ ] Step 2: Run tests to verify they fail (module doesn't exist yet)
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_canonical_spacy.py -v 2>&1 | tail -20
Expected: FAIL with ModuleNotFoundError: No module named 'phonolex_data.pipeline.canonical_spacy'.
- [ ] Step 3: Verify en_core_web_trf is installed (may be a slow first install)
cd /Users/jneumann/Repos/PhonoLex && uv run python -c "import spacy; spacy.load('en_core_web_trf')" 2>&1 | tail -5
If it errors with model not found, install:
cd /Users/jneumann/Repos/PhonoLex && uv run python -m spacy download en_core_web_trf
This is ~500MB and may take a few minutes.
- [ ] Step 4: Implement the canonical pipeline module
Create packages/data/src/phonolex_data/pipeline/canonical_spacy.py:
"""Canonical PhonoLex spaCy pipeline.
Single source of truth for spaCy configuration across all corpus-derived
stats (PHON-72 freq+POS, PHON-94 selectional, future workstreams). Run-once
per corpus, reuse-everywhere — gives statistical consistency across all
derived columns without per-ticket coordination.
Locked config:
- Model: en_core_web_trf (RoBERTa-base backbone, LAS=0.939 on UD-EWT)
- Pipes: tok2vec, transformer, tagger, attribute_ruler, parser, lemmatizer
- NER disabled (not needed for any current downstream consumer)
- Token-level filters defined here as the canonical KEEP_POS set + alpha-only +
max length 30 char.
Reuse pattern:
from phonolex_data.pipeline.canonical_spacy import (
load_canonical_pipeline, KEEP_POS, MAX_WORD_LEN
)
nlp = load_canonical_pipeline()
for doc in nlp.pipe(texts, batch_size=256):
...
"""
from __future__ import annotations
import spacy
# Universal POS tags we keep as content-bearing.
# Excludes PUNCT, SPACE, SYM, X, NUM (numerics fail isalpha() anyway).
KEEP_POS: frozenset[str] = frozenset({
"NOUN", "VERB", "ADJ", "ADV", "ADP", "AUX", "CCONJ", "DET",
"INTJ", "PART", "PRON", "PROPN", "SCONJ",
})
MAX_WORD_LEN: int = 30
MAX_DOC_CHAR_LEN: int = 500_000 # truncate FineWeb-Edu giants; spaCy default is 1M
CANONICAL_MODEL_NAME: str = "en_core_web_trf"
def load_canonical_pipeline() -> spacy.language.Language:
"""Load the canonical spaCy pipeline.
Enables: tagger + attribute_ruler + parser + lemmatizer (the latter for
DEP-aware lemmatization). Disables: ner (not needed).
Tries GPU; falls back to CPU silently.
"""
prefer_gpu = getattr(spacy, "prefer_gpu", None)
if prefer_gpu is not None:
prefer_gpu()
nlp = spacy.load(CANONICAL_MODEL_NAME)
if "ner" in nlp.pipe_names:
nlp.disable_pipe("ner")
return nlp
def is_keepable_token(token) -> bool:
"""Token-level filter — alpha, length-bounded, in KEEP_POS."""
if not token.text.isalpha():
return False
if len(token.text) > MAX_WORD_LEN:
return False
if token.pos_ not in KEEP_POS:
return False
return True
- [ ] Step 5: Run tests to verify they pass
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_canonical_spacy.py -v
Expected: 5 tests PASS. First run will be slow (model load). If test_pronoun_lemma_is_surface_not_sentinel FAILS, halt — the canonical config needs adjustment (older spaCy returns -PRON-); ask user before continuing.
- [ ] Step 6: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/data/src/phonolex_data/pipeline/canonical_spacy.py packages/data/tests/test_canonical_spacy.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: canonical spaCy pipeline module

Single source of truth for spaCy configuration across all corpus-derived
stats. Locks en_core_web_trf with parser + lemmatizer enabled. Replaces
PHON-72's per-ticket POS-only config. Future corpus passes reuse this.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 4: Implement triple-extraction logic
Files:
- Create: packages/data/src/phonolex_data/pipeline/extract_triples.py
- Test: packages/data/tests/test_extract_triples.py
- [ ] Step 1: Write fixture-driven failing tests
Create packages/data/tests/test_extract_triples.py:
"""Tests for verb-role-filler triple extraction from spaCy docs.
Each test names a sentence and the triples we expect to extract.
Verifies: nsubj/dobj/iobj/pobj_X extraction, passive remap, V-rooted
PP filtering, PRON dropping, particle-verb conflation (accepted v1).
"""
import pytest
@pytest.fixture(scope="module")
def nlp():
from phonolex_data.pipeline.canonical_spacy import load_canonical_pipeline
return load_canonical_pipeline()
def extract(nlp, text):
"""Helper: parse text, return sorted list of (verb_lemma, role, filler_lemma) triples."""
from phonolex_data.pipeline.extract_triples import extract_triples
doc = nlp(text)
return sorted(extract_triples(doc))
def test_simple_svo_emits_nsubj_and_dobj(nlp):
triples = extract(nlp, "The boy ate the cake.")
assert ("eat", "nsubj", "boy") in triples
assert ("eat", "dobj", "cake") in triples
def test_ditransitive_emits_iobj(nlp):
triples = extract(nlp, "The teacher gave the student a book.")
# The DEP label for "student" may be `iobj` or `dative` depending on spaCy version;
# extract_triples normalizes both → "iobj".
iobj_triples = [t for t in triples if t[1] == "iobj"]
assert any(t[2] == "student" for t in iobj_triples), (
f"expected (give, iobj, student) in {triples}"
)
def test_pp_to_emits_pobj_to(nlp):
triples = extract(nlp, "She walked to the park.")
assert ("walk", "pobj_to", "park") in triples
def test_pp_with_emits_pobj_with(nlp):
triples = extract(nlp, "He ate the cake with a fork.")
# Note: "with a fork" attaches to "ate" (instrument), so V-rooted
assert ("eat", "pobj_with", "fork") in triples
def test_np_modifier_pp_is_filtered(nlp):
"""'a man with a hat' — `with` PP attaches to NOUN `man`, not to a verb.
This must NOT produce a (V, pobj_with, hat) triple — there is no such V.
"""
triples = extract(nlp, "The man with a hat sat down.")
pobj_with_triples = [t for t in triples if t[1] == "pobj_with"]
assert pobj_with_triples == [], f"NP-attached pobj_with leaked: {pobj_with_triples}"
def test_passive_voice_remaps_to_dobj(nlp):
"""'the apple was eaten' — apple is the patient, must be dobj for selectional."""
triples = extract(nlp, "The apple was eaten by the boy.")
# The patient (apple) must surface as dobj, not nsubj or nsubjpass
assert ("eat", "dobj", "apple") in triples, f"passive remap failed: {triples}"
# The agent (boy, in by-PP) is fine to also surface, but the patient is the load-bearing test
def test_pronoun_filler_dropped(nlp):
"""Pronouns don't carry semantic selectional signal — drop them."""
triples = extract(nlp, "He saw her.")
fillers = {t[2] for t in triples}
assert "he" not in fillers and "she" not in fillers and "her" not in fillers, (
f"pronoun fillers leaked: {triples}"
)
def test_xcomp_filler_is_verb(nlp):
"""Clausal complement — filler is the embedded predicate's lemma."""
triples = extract(nlp, "She wants to leave.")
assert ("want", "xcomp", "leave") in triples, f"xcomp missing: {triples}"
def test_filler_pos_filter_drops_adjectives_in_dobj(nlp):
"""Standard nominal-arg roles only accept NOUN/PROPN.
'I painted it red' — `red` is xcomp, not dobj of paint; but if a parser
edge labels a non-noun as dobj, our extraction must filter.
"""
# Use a sentence where this might trip:
triples = extract(nlp, "The girl saw red.")
# 'red' here may be NOUN ('the color red') or ADJ; we accept NOUN, drop ADJ
dobj_triples = [t for t in triples if t[1] == "dobj"]
for verb, role, filler in dobj_triples:
# We can't assert exactly without knowing what spaCy chose, but verify
# the filter doesn't crash and produces reasonable output
assert isinstance(filler, str)
- [ ] Step 2: Run tests to verify they fail
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_extract_triples.py -v 2>&1 | tail -20
Expected: FAIL with ModuleNotFoundError.
- [ ] Step 3: Implement the extraction module
Create packages/data/src/phonolex_data/pipeline/extract_triples.py:
"""Verb-role-filler triple extraction from spaCy docs.
Implements the PHON-94 role inventory:
nsubj, dobj, iobj, pobj_to, pobj_with, pobj_in, pobj_on, xcomp, ccomp
Filters:
- Passive remap: nsubjpass → dobj (the patient is what selectional cares about)
- V-rooted PP only: NP-modifier PPs filtered out by checking the prep's parent is a VERB
- PRON drop: he/she/it/they/etc. don't carry semantic selectional signal
- Filler POS:
- nominal-arg roles (nsubj/dobj/iobj/pobj_*) → NOUN, PROPN only
- clausal-complement roles (xcomp/ccomp) → VERB only
"""
from __future__ import annotations
from typing import Iterator, Tuple
# DEP labels that are mapped to our role inventory.
# Some spaCy versions emit `dative` instead of `iobj`; we normalize.
NSUBJ_LABELS = {"nsubj", "csubj"}
NSUBJPASS_LABELS = {"nsubjpass", "csubjpass", "nsubj:pass"} # last is UD scheme
DOBJ_LABELS = {"dobj", "obj"} # `obj` is UD scheme
IOBJ_LABELS = {"iobj", "dative"}
XCOMP_LABELS = {"xcomp"}
CCOMP_LABELS = {"ccomp"}
# Prepositions that map to specific pobj_X roles. PP must be V-rooted.
PREP_POBJ_MAP = {
"to": "pobj_to",
"with": "pobj_with",
"in": "pobj_in",
"on": "pobj_on",
}
NOMINAL_FILLER_POS = {"NOUN", "PROPN"}
VERBAL_FILLER_POS = {"VERB"}
PRON_POS = "PRON"
def extract_triples(doc) -> Iterator[Tuple[str, str, str]]:
"""Yield (verb_lemma, role, filler_lemma) triples from a spaCy Doc.
All lemmas are lowercased. PRON fillers are dropped. Passive nsubj is
remapped to dobj. Non-V-rooted PPs are skipped.
"""
for tok in doc:
if tok.pos_ != "VERB":
continue
verb_lemma = tok.lemma_.lower()
if not verb_lemma or not verb_lemma.isalpha():
continue
for child in tok.children:
yield from _emit_for_child(verb_lemma, tok, child)
def _emit_for_child(verb_lemma, verb_tok, child) -> Iterator[Tuple[str, str, str]]:
"""Yield triples generated by a single (verb, child) edge."""
dep = child.dep_
# Subject — including passive remap
if dep in NSUBJ_LABELS:
f = _filler_for_nominal(child)
if f:
yield (verb_lemma, "nsubj", f)
return
if dep in NSUBJPASS_LABELS:
# The grammatical subject of a passive verb is the semantic patient.
# Remap to dobj per standard selectional-preference practice.
f = _filler_for_nominal(child)
if f:
yield (verb_lemma, "dobj", f)
return
if dep in DOBJ_LABELS:
f = _filler_for_nominal(child)
if f:
yield (verb_lemma, "dobj", f)
return
if dep in IOBJ_LABELS:
f = _filler_for_nominal(child)
if f:
yield (verb_lemma, "iobj", f)
return
if dep in XCOMP_LABELS:
f = _filler_for_verbal(child)
if f:
yield (verb_lemma, "xcomp", f)
return
if dep in CCOMP_LABELS:
f = _filler_for_verbal(child)
if f:
yield (verb_lemma, "ccomp", f)
return
# Prepositional phrases: V → prep (ADP) → pobj
if dep == "prep" and child.pos_ == "ADP":
prep_lemma = child.lemma_.lower()
if prep_lemma not in PREP_POBJ_MAP:
return
role = PREP_POBJ_MAP[prep_lemma]
# Walk to the pobj child of this prep
for grandchild in child.children:
if grandchild.dep_ == "pobj":
f = _filler_for_nominal(grandchild)
if f:
yield (verb_lemma, role, f)
def _filler_for_nominal(tok) -> str | None:
"""Return lowercased lemma if tok is a NOUN/PROPN with usable lemma; else None.
Drops PRON (pronouns don't carry selectional signal). Drops non-alpha or empty lemmas.
"""
if tok.pos_ == PRON_POS:
return None
if tok.pos_ not in NOMINAL_FILLER_POS:
return None
lemma = tok.lemma_.lower()
if not lemma or not lemma.isalpha():
return None
return lemma
def _filler_for_verbal(tok) -> str | None:
"""Return lowercased lemma if tok is a VERB with usable lemma; else None."""
if tok.pos_ not in VERBAL_FILLER_POS:
return None
lemma = tok.lemma_.lower()
if not lemma or not lemma.isalpha():
return None
return lemma
- [ ] Step 4: Run tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_extract_triples.py -v
Expected: all PASS. If a test fails because spaCy's parser disagrees with the expected dependency edge for a fixture sentence, investigate before patching: spaCy's labels may differ across versions (e.g., UD's obj vs OntoNotes' dobj). The fix is usually to extend the label-set constant (DOBJ_LABELS, etc.), not to modify the test sentence.
- [ ] Step 5: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/data/src/phonolex_data/pipeline/extract_triples.py packages/data/tests/test_extract_triples.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: triple extraction with passive remap + V-rooted PP filter

Fixture-driven extraction of (verb, role, filler) triples from spaCy
Docs. Implements the 9-role inventory, passive nsubjpass → dobj remap,
PRON-filler dropping, and V-rooted PP filtering (NP-modifier PPs excluded).

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 5: Phase-0 probe
Files:
- Create: research/2026-05-06-phon-94-canonical-spacy-probe/probe.py
- Create: research/2026-05-06-phon-94-canonical-spacy-probe/notebook.md
- Create: research/2026-05-06-phon-94-canonical-spacy-probe/README.md
This task is research, not production code — outputs are JSON stats + a markdown lab notebook per feedback_research_workflow.md. There are no unit tests for the probe itself.
- [ ] Step 1: Create the probe directory
mkdir -p /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-canonical-spacy-probe
- [ ] Step 2: Create the README
Create research/2026-05-06-phon-94-canonical-spacy-probe/README.md:
# PHON-94 Phase-0 Probe
**Ticket:** [PHON-94](https://neumannsworkshop.atlassian.net/browse/PHON-94)
**Date:** 2026-05-06
## Why
Verify spaCy-output presumptions before committing to the 4-H100-hour FineWeb-Edu production parse. Locks:
- DEP label inventory (nsubj/dobj/iobj/pobj/xcomp/ccomp present?)
- Lemmatizer behavior (running/runs/ran → run)
- Pronoun lemma form (he/she/it vs -PRON- sentinel)
- Passive voice prevalence (warrants nsubjpass → dobj remap)
- PP attachment: V-rooted vs NP-rooted with-PPs
- Coordination prevalence (single-head extraction loss)
- Particle-verb prevalence
- Throughput (calibrates production wallclock)
## Run
```bash
cd research/2026-05-06-phon-94-canonical-spacy-probe
uv run python probe.py --n-docs 1000 --output stats.json
```
Local CPU run. ~10-15 min for 1,000 FineWeb-Edu docs with _trf + parser + lemmatizer.
## After running
Read notebook.md for findings, decisions, and any required canonical-config adjustments.
- [ ] Step 3: Create the probe script
Create research/2026-05-06-phon-94-canonical-spacy-probe/probe.py:
#!/usr/bin/env python3
"""Phase-0 probe: verify spaCy presumptions before production parse.

Streams a small FineWeb-Edu sample, runs the canonical pipeline, and
emits stats verifying: DEP-label distribution, lemmatizer behavior,
pronoun handling, passive prevalence, PP attachment, coordination,
particle verbs, throughput.
"""
from __future__ import annotations

import argparse
import json
import time
from collections import Counter
from pathlib import Path

from datasets import load_dataset

from phonolex_data.pipeline.canonical_spacy import (
    MAX_DOC_CHAR_LEN, load_canonical_pipeline
)
from phonolex_data.pipeline.extract_triples import extract_triples


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser()
    p.add_argument("--n-docs", type=int, default=1000,
                   help="Number of FineWeb-Edu docs to parse")
    p.add_argument("--output", default="stats.json",
                   help="Output JSON stats path")
    return p.parse_args()


def main() -> int:
    args = parse_args()
    print("[probe] loading canonical pipeline ...")
    nlp = load_canonical_pipeline()
    print(f"[probe] streaming {args.n_docs} FineWeb-Edu docs ...")
    ds = load_dataset("HuggingFaceFW/fineweb-edu", split="train", streaming=True)

    # Counters
    dep_label_counts = Counter()
    pos_counts = Counter()
    nsubjpass_count = 0
    nsubj_count = 0
    pobj_with_v_rooted = 0
    pobj_with_n_rooted = 0
    conj_under_subj_or_obj = 0
    particle_verb_count = 0
    pron_lemma_samples = []
    inflection_samples = {"running": [], "runs": [], "ran": []}
    top_verb_lemmas = Counter()
    sample_triples_per_top_verb = {}
    total_tokens = 0
    total_docs = 0

    texts = []
    for i, ex in enumerate(ds):
        if i >= args.n_docs:
            break
        text = ex.get("text") or ""
        if len(text) > MAX_DOC_CHAR_LEN:
            text = text[:MAX_DOC_CHAR_LEN]
        if text:
            texts.append(text)

    print(f"[probe] parsing {len(texts)} docs (this is the slow step) ...")
    # Start the clock here so throughput reflects parsing only, not dataset streaming.
    start_t = time.time()
    for doc in nlp.pipe(texts, batch_size=8):
        total_docs += 1
        for tok in doc:
            if tok.text.isalpha():
                total_tokens += 1
            pos_counts[tok.pos_] += 1
            dep_label_counts[tok.dep_] += 1
            if tok.pos_ == "PRON" and len(pron_lemma_samples) < 50:
                pron_lemma_samples.append({
                    "text": tok.text.lower(), "lemma": tok.lemma_.lower()
                })
            if tok.text.lower() in inflection_samples and len(inflection_samples[tok.text.lower()]) < 10:
                inflection_samples[tok.text.lower()].append(tok.lemma_.lower())
            if tok.dep_ == "nsubj":
                nsubj_count += 1
            if tok.dep_ in {"nsubjpass", "csubjpass", "nsubj:pass"}:
                nsubjpass_count += 1
            # PP attachment: with-PP V-rooted vs N-rooted
            if tok.dep_ == "prep" and tok.lemma_.lower() == "with" and tok.pos_ == "ADP":
                parent = tok.head
                if parent.pos_ == "VERB":
                    pobj_with_v_rooted += 1
                elif parent.pos_ in {"NOUN", "PROPN"}:
                    pobj_with_n_rooted += 1
            # Coordination under subj/obj
            if tok.dep_ == "conj":
                head_dep = tok.head.dep_
                if head_dep in {"nsubj", "dobj", "iobj"}:
                    conj_under_subj_or_obj += 1
            # Particle verbs
            if tok.dep_ == "prt":
                particle_verb_count += 1
            if tok.pos_ == "VERB":
                top_verb_lemmas[tok.lemma_.lower()] += 1

        # Top-30 verb sample triples. Extract once per doc, and only while
        # some current top verb still lacks samples.
        pending = [
            verb_lemma for verb_lemma, _ in top_verb_lemmas.most_common(30)
            if verb_lemma not in sample_triples_per_top_verb
        ]
        if pending:
            trips = list(extract_triples(doc))
            for verb_lemma in pending:
                matching = [t for t in trips if t[0] == verb_lemma]
                if matching:
                    sample_triples_per_top_verb[verb_lemma] = matching[:5]

    elapsed = time.time() - start_t
    tokens_per_sec = total_tokens / elapsed if elapsed > 0 else 0

    stats = {
        "n_docs": total_docs,
        "total_tokens": total_tokens,
        "elapsed_sec": elapsed,
        "tokens_per_sec_local_cpu": tokens_per_sec,
        "dep_label_top_30": dep_label_counts.most_common(30),
        "pos_top_20": pos_counts.most_common(20),
        "nsubj_count": nsubj_count,
        "nsubjpass_count": nsubjpass_count,
        "passive_pct_of_subj": (
            nsubjpass_count / (nsubj_count + nsubjpass_count) * 100
            if (nsubj_count + nsubjpass_count) > 0 else 0
        ),
        "pobj_with_v_rooted": pobj_with_v_rooted,
        "pobj_with_n_rooted": pobj_with_n_rooted,
        "conj_under_subj_or_obj": conj_under_subj_or_obj,
        "particle_verb_count": particle_verb_count,
        "pron_lemma_samples": pron_lemma_samples,
        "inflection_samples": inflection_samples,
        "top_30_verb_lemmas": top_verb_lemmas.most_common(30),
        "sample_triples_per_top_verb": {
            k: v for k, v in sample_triples_per_top_verb.items()
        },
        "h100_sxm_speedup_estimate": 30,
        "estimated_h100_wallclock_for_800M_tokens_hours": (
            (800_000_000 / (tokens_per_sec * 30)) / 3600 if tokens_per_sec > 0 else None
        ),
    }

    Path(args.output).write_text(json.dumps(stats, indent=2, default=str))
    print(f"[probe] wrote {args.output}")
    print(f"[probe] tokens/sec: {tokens_per_sec:.1f}")
    print(f"[probe] passive % of subj: {stats['passive_pct_of_subj']:.2f}%")
    print(f"[probe] DEP top 10: {stats['dep_label_top_30'][:10]}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
- [ ] Step 4: Run the probe (this is the actual research execution)
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-canonical-spacy-probe && uv run python probe.py --n-docs 1000 --output stats.json
Expected wallclock: ~10-15 min on local CPU.
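The production estimate that the probe writes into stats.json is plain arithmetic on the measured local rate. A minimal sketch of that calculation, assuming the same 800M-token corpus and the hard-coded 30× speedup from the probe; the function name and the 1,000 tokens/sec input are hypothetical and only there for illustration:

```python
from __future__ import annotations


def estimated_gpu_hours(
    tokens_per_sec_cpu: float,
    total_tokens: int = 800_000_000,
    speedup: float = 30.0,
) -> float | None:
    """Total tokens / (local CPU rate x assumed GPU speedup), expressed in hours."""
    if tokens_per_sec_cpu <= 0:
        return None
    return total_tokens / (tokens_per_sec_cpu * speedup) / 3600


# Hypothetical local rate; substitute tokens_per_sec_local_cpu from stats.json.
hours = estimated_gpu_hours(1_000.0)
print(f"{hours:.2f} h for the full corpus on one GPU")
```

The result is a single-GPU figure. Dividing by the number of shards gives only a rough per-shard wallclock, because the 30× factor is itself an assumption rather than a measurement.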
- [ ] Step 5: Inspect stats.json + write notebook.md
cat /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-canonical-spacy-probe/stats.json | head -100
Read the stats. Then create research/2026-05-06-phon-94-canonical-spacy-probe/notebook.md with sections covering each of the 8 presumption checks. Format:
# PHON-94 Phase-0 Probe — Findings
**Date:** 2026-05-06
**Sample:** 1,000 FineWeb-Edu docs (~N total tokens parsed)
**Wallclock:** XX min on local CPU
## 1. DEP label inventory
[Top 30 from stats.json. Confirm the 9 expected labels are present. Flag any high-frequency label not in our inventory.]
## 2. Top 30 verb lemmas + sample triples
[Spot-check inflection collapsing. Confirm running/runs/ran → run.]
## 3. Pronoun lemma form
[Verify lemma is surface form, not -PRON-.]
## 4. Passive voice prevalence
[Report nsubjpass / (nsubj + nsubjpass) %. >5% confirms remap is doing real work.]
## 5. PP attachment — V-rooted vs N-rooted with-PPs
[Confirm V-rooted is dominant (or at least non-negligible) and N-rooted filter is doing real work.]
## 6. Coordination prevalence
[Report conj-under-subj-or-obj count.]
## 7. Particle verbs
[Report count + magnitude.]
## 8. Throughput + production estimate
[Report tokens/sec local CPU. Estimate H100 SXM wallclock.]
## Decisions
- [ ] Canonical config required no changes / required these changes: ...
- [ ] Role inventory locked at: 9 roles as planned / extended with: ...
- [ ] Passive remap committed (or revisited).
- [ ] Production parse authorized.
## Surprises
[Anything unexpected. Brief.]
Fill in the actual data. Keep it concise.
- [ ] Step 6: If presumptions break, halt and ask user
Per feedback_pause_on_method_snag.md: if the probe breaks a presumption (e.g., spaCy emits obj rather than dobj for direct objects, or pronoun lemmas come back as -PRON-), stop after one fix attempt and surface the alternatives to the user. Don't chain workarounds.
- [ ] Step 7: Commit probe artifacts
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-canonical-spacy-probe/
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: Phase-0 probe — local sanity check on FineWeb-Edu sample
Verifies DEP-label inventory, lemmatizer behavior, pronoun handling,
passive prevalence, PP attachment, coordination, particle verbs, and
throughput before authorizing the H100×4 production parse.
See notebook.md for findings.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 6: Implement build_selectional.py shard worker
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/build_selectional.py
- Create: research/2026-05-06-phon-94-corpus-parse/README.md
- Test: packages/data/tests/test_build_selectional.py
- [ ] Step 1: Create the production-run directory + README
mkdir -p /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse
Create research/2026-05-06-phon-94-corpus-parse/README.md:
# PHON-94 Production Corpus Parse
**Ticket:** [PHON-94](https://neumannsworkshop.atlassian.net/browse/PHON-94)
**Date:** 2026-05-06
## Why
Run the canonical spaCy pipeline over FineWeb-Edu (1.06M docs / 800M tokens), CHILDES (4.7M utts), and (smoke-gated) PhonBank to produce:
- `data/runtime/selectional.parquet` — banded per-(verb, role, filler) PPMI
- Refreshed FineWeb-Edu freq+POS deltas for `words.parquet` (PHON-72/PHON-88 regen)
## Phases
1. **Smoke run** — `build_selectional.py --shard 0/1 --n-docs 100 --save-parquet smoke.parquet`. Verifies the full pipeline locally before committing to RunPod.
2. **FineWeb-Edu production** — 4× RunPod H100 SXM, sharded `i/4`. ~3-4h wallclock total. Output → ExternalData1.
3. **CHILDES** — 1× H100 (or local, depending on probe estimate). ~30-60 min.
4. **PhonBank** — smoke-gated; conditional run.
5. **Merge** — local `merge_shards.py` Polars stream-aggregate → final selectional.parquet.
## Output destinations
- Per-shard parquets: `/Volumes/ExternalData1/phonolex/raw_corpus_parses/{fineweb_edu,childes,phonbank}/shard_{i}_of_{N}.parquet`
- Final selectional: `data/runtime/selectional.parquet` (LFS)
- FineWeb freq+POS deltas: emitted as a TSV alongside, fed back through `data/norms/phonolex_frequency.tsv` for the next words.parquet regen
- [ ] Step 2: Write a small synthetic-stream test
Create packages/data/tests/test_build_selectional.py:
"""Tests for build_selectional shard worker.
Uses a synthetic in-memory text stream (no HuggingFace download) to verify
the shard's per-band counter accumulation and Parquet output schema.
"""
import polars as pl
import pytest
def test_build_selectional_emits_correct_schema(tmp_path):
"""A synthetic 5-doc stream produces a parquet whose schema matches expectations."""
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from build_selectional import process_text_stream
texts = [
"The boy ate the cake.",
"The girl read the book.",
"She wrote a letter.",
"He drank the water.",
"The cat slept on the mat.",
]
out_path = tmp_path / "shard.parquet"
# Always assigns to the "fineweb_adult" band for this test
band_resolver = lambda doc_idx: ["fineweb_adult"]
process_text_stream(
texts=texts,
out_path=out_path,
band_resolver=band_resolver,
)
df = pl.read_parquet(out_path)
expected_cols = {"verb", "role", "filler", "band", "count_v_r_f"}
assert expected_cols.issubset(set(df.columns)), f"missing cols: {expected_cols - set(df.columns)}"
assert df.height > 0, "no triples extracted from 5 sample sentences"
# All rows in this fixture should be the fineweb_adult band
assert (df["band"] == "fineweb_adult").all()
def test_build_selectional_dual_band_increments_both(tmp_path):
"""A doc whose band_resolver returns multiple bands increments all of them."""
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from build_selectional import process_text_stream
texts = ["The boy ate the cake."]
out_path = tmp_path / "shard.parquet"
# Each doc is in BOTH bands (fineweb_b1 replaces the retired fineweb_grade_K_8 label)
band_resolver = lambda doc_idx: ["fineweb_adult", "fineweb_b1"]
process_text_stream(
texts=texts,
out_path=out_path,
band_resolver=band_resolver,
)
df = pl.read_parquet(out_path)
bands = set(df["band"].to_list())
assert "fineweb_adult" in bands
assert "fineweb_b1" in bands
# The (eat, dobj, cake) triple should appear once per band
cake_rows = df.filter(
(pl.col("verb") == "eat") & (pl.col("role") == "dobj") & (pl.col("filler") == "cake")
)
assert cake_rows.height == 2, f"expected 2 band-rows for (eat,dobj,cake), got {cake_rows.height}"
def test_build_selectional_emits_freq_sibling_parquet(tmp_path):
"""The shard worker emits a sibling .freq.parquet alongside the selectional shard."""
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from build_selectional import process_text_stream
texts = [
"The boy ate the cake.",
"The girl read the book.",
]
out_path = tmp_path / "shard.parquet"
process_text_stream(
texts=texts,
out_path=out_path,
band_resolver=lambda doc_idx: ["fineweb_adult"],
)
freq_path = out_path.with_suffix(".freq.parquet")
assert freq_path.exists(), f"freq sibling parquet not written: {freq_path}"
freq_df = pl.read_parquet(freq_path)
assert {"kind", "band", "key", "pos", "count", "cd_count"}.issubset(set(freq_df.columns))
# Both surface and lemma kinds present
kinds = set(freq_df["kind"].to_list())
assert "surface" in kinds
assert "lemma" in kinds
# Sidecar metadata
meta_path = freq_path.with_suffix(".meta.json")
assert meta_path.exists()
import json
meta = json.loads(meta_path.read_text())
assert "docs_per_band" in meta
assert "tokens_per_band" in meta
assert meta["docs_per_band"]["fineweb_adult"] == 2
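The freq sibling carries two counts per key: count (every occurrence) and cd_count (at most one increment per document, which is how the per-doc seen-sets in the worker behave). A toy sketch of that distinction on pre-tokenized input; the token lists are made up, and the real worker also filters by POS and word length before counting:

```python
from collections import Counter

# Two hypothetical documents, already tokenized and lowercased.
docs = [
    ["the", "boy", "ate", "the", "cake"],
    ["the", "girl", "read", "the", "book"],
]

count = Counter()     # token frequency: every occurrence counts
cd_count = Counter()  # document count: each word counts once per document

for tokens in docs:
    count.update(tokens)
    cd_count.update(set(tokens))  # set() collapses repeats within one document

print(count["the"], cd_count["the"])    # 4 2
print(count["cake"], cd_count["cake"])  # 1 1
```

Because cd_count is bounded by the number of documents in a band, docs_per_band from the sidecar JSON is the natural denominator when normalizing it.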
- [ ] Step 3: Run the failing tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_build_selectional.py -v 2>&1 | tail -10
Expected: FAIL with ModuleNotFoundError: No module named 'build_selectional'.
- [ ] Step 4: Implement build_selectional.py
Create research/2026-05-06-phon-94-corpus-parse/build_selectional.py:
#!/usr/bin/env python3
"""Sharded corpus parse worker for PHON-94 selectional stats.
Mirrors PHON-72's `build_frequency_corpus.py` shape: streams a HuggingFace
text dataset (or local file), parses with the canonical spaCy pipeline,
extracts triples + frequency counts, accumulates per-band counters, and
writes a per-shard Parquet of *raw counts* (no PMI computation here —
that happens in the merge step).
Usage (smoke):
python build_selectional.py --dataset HuggingFaceFW/fineweb-edu \\
--shard 0/1 --n-docs 100 --save-parquet /tmp/smoke.parquet \\
--bands-config bands_fineweb.json
Usage (production shard on RunPod):
python build_selectional.py --dataset HuggingFaceFW/fineweb-edu \\
--shard 0/4 --save-parquet /workspace/shard_0_of_4.parquet \\
--bands-config bands_fineweb.json --batch-size 256
"""
from __future__ import annotations
import argparse
import json
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Callable, Iterator
import polars as pl
from datasets import load_dataset
from tqdm import tqdm
from phonolex_data.pipeline.canonical_spacy import (
KEEP_POS, MAX_DOC_CHAR_LEN, load_canonical_pipeline
)
from phonolex_data.pipeline.extract_triples import extract_triples
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser()
p.add_argument("--dataset", default="HuggingFaceFW/fineweb-edu")
p.add_argument("--config", default="default")
p.add_argument("--split", default="train")
p.add_argument("--shard", default="0/1", help="i/N")
p.add_argument("--n-docs", type=int, default=None,
help="If set, stop after N docs (for smoke runs)")
p.add_argument("--save-parquet", required=True, help="Output shard parquet path")
p.add_argument("--batch-size", type=int, default=128)
p.add_argument("--bands-config", default=None,
help="JSON file mapping doc properties → band labels")
p.add_argument("--default-band", default="fineweb_adult",
help="Band used when bands-config is absent")
p.add_argument("--checkpoint-dir", default="./checkpoints")
p.add_argument("--checkpoint-every", type=int, default=5000)
return p.parse_args()
def build_band_resolver_from_config(
config_path: Path | None, default_band: str
) -> Callable[[int], list[str]]:
"""Returns a function (doc_idx) → list of bands this doc belongs to.
Without a config file, every doc is assigned the default band only.
With a config file, the function looks up doc-classification metadata.
For PHON-88 grade-banding integration, the config provides per-doc
grade scores; this function maps them to band labels.
"""
if config_path is None or not config_path.exists():
return lambda doc_idx: [default_band]
cfg = json.loads(config_path.read_text())
# Cfg shape: {"doc_to_bands": {doc_idx_str: [band1, band2]}, "default": "..."}
doc_to_bands = cfg.get("doc_to_bands", {})
default = cfg.get("default", default_band)
def resolver(doc_idx: int) -> list[str]:
return doc_to_bands.get(str(doc_idx), [default])
return resolver
def stream_documents(
dataset: str, config: str, split: str,
shard_idx: int, shard_total: int, n_docs: int | None
) -> Iterator[tuple[int, str]]:
ds = load_dataset(dataset, name=config, split=split, streaming=True)
yielded = 0
for i, ex in enumerate(ds):
if shard_total > 1 and (i % shard_total) != shard_idx:
continue
text = ex.get("text") or ex.get("content") or ""
if not text or not isinstance(text, str):
continue
if len(text) > MAX_DOC_CHAR_LEN:
text = text[:MAX_DOC_CHAR_LEN]
yield i, text
yielded += 1
if n_docs is not None and yielded >= n_docs:
break
def process_text_stream(
texts: list[str],
out_path: Path,
band_resolver: Callable[[int], list[str]],
batch_size: int = 8,
) -> None:
"""In-memory test entry point — used by unit tests with synthetic streams.
Emits TWO parquets:
- {out_path}: selectional triples (verb, role, filler, band, counts)
- {out_path}.with_suffix('.freq.parquet'): per-(word, lemma, pos, band) counts
for downstream FineWeb-Edu freq+POS regen (PHON-72/PHON-88 columns).
process_corpus repeats this same accumulation logic over the streamed corpus, so keep the two loops in sync.
"""
from phonolex_data.pipeline.canonical_spacy import KEEP_POS, MAX_WORD_LEN
nlp = load_canonical_pipeline()
counters_per_band = defaultdict(Counter) # band → Counter[(v,r,f)]
cvrstar_per_band = defaultdict(Counter) # band → Counter[(v,r)]
# Freq accumulators
word_pos_per_band = defaultdict(Counter) # band → Counter[(word_lower, pos)]
lemma_pos_per_band = defaultdict(Counter) # band → Counter[(lemma_lower, pos)]
cd_per_band_word = defaultdict(Counter) # band → Counter[word_lower]; doc-level
cd_per_band_lemma = defaultdict(Counter) # band → Counter[lemma_lower]; doc-level
docs_per_band = defaultdict(int)
tokens_per_band = defaultdict(int)
for doc_idx, doc in enumerate(nlp.pipe(texts, batch_size=batch_size)):
bands = band_resolver(doc_idx)
# Triple extraction
for v, r, f in extract_triples(doc):
for band in bands:
counters_per_band[band][(v, r, f)] += 1
cvrstar_per_band[band][(v, r)] += 1
# Frequency + POS accumulation per band, with per-doc CD tracking
seen_words_per_band = defaultdict(set)
seen_lemmas_per_band = defaultdict(set)
for tok in doc:
if not tok.text.isalpha() or len(tok.text) > MAX_WORD_LEN:
continue
if tok.pos_ not in KEEP_POS:
continue
w = tok.text.lower()
lem = tok.lemma_.lower()
pos = tok.pos_
for band in bands:
word_pos_per_band[band][(w, pos)] += 1
lemma_pos_per_band[band][(lem, pos)] += 1
seen_words_per_band[band].add(w)
seen_lemmas_per_band[band].add(lem)
tokens_per_band[band] += 1
for band in bands:
for w in seen_words_per_band[band]:
cd_per_band_word[band][w] += 1
for lem in seen_lemmas_per_band[band]:
cd_per_band_lemma[band][lem] += 1
docs_per_band[band] += 1
_write_shard_parquet(counters_per_band, cvrstar_per_band, out_path)
_write_freq_shard_parquet(
word_pos_per_band, lemma_pos_per_band,
cd_per_band_word, cd_per_band_lemma,
docs_per_band, tokens_per_band,
out_path.with_suffix(".freq.parquet"),
)
def process_corpus(args: argparse.Namespace) -> None:
"""Production entry: stream from HuggingFace, parse, accumulate, write.
Emits both the selectional triple parquet and the .freq.parquet sibling
in the same pass — frequency aggregation is essentially free given the
corpus is already being parsed.
"""
from phonolex_data.pipeline.canonical_spacy import KEEP_POS, MAX_WORD_LEN
nlp = load_canonical_pipeline()
s_idx, s_tot = (int(x) for x in args.shard.split("/"))
print(f"[shard] {s_idx}/{s_tot}")
band_resolver = build_band_resolver_from_config(
Path(args.bands_config) if args.bands_config else None,
default_band=args.default_band,
)
counters_per_band: defaultdict[str, Counter] = defaultdict(Counter)
cvrstar_per_band: defaultdict[str, Counter] = defaultdict(Counter)
word_pos_per_band: defaultdict[str, Counter] = defaultdict(Counter)
lemma_pos_per_band: defaultdict[str, Counter] = defaultdict(Counter)
cd_per_band_word: defaultdict[str, Counter] = defaultdict(Counter)
cd_per_band_lemma: defaultdict[str, Counter] = defaultdict(Counter)
docs_per_band: defaultdict[str, int] = defaultdict(int)
tokens_per_band: defaultdict[str, int] = defaultdict(int)
n_docs = 0
t0 = time.time()
stream = stream_documents(
args.dataset, args.config, args.split,
s_idx, s_tot, args.n_docs
)
def _id_text_stream():
for doc_idx, text in stream:
yield (text, doc_idx)
pbar = tqdm(unit="doc", smoothing=0.05)
for doc, doc_idx in nlp.pipe(_id_text_stream(), batch_size=args.batch_size, as_tuples=True):
bands = band_resolver(doc_idx)
# Triples
for v, r, f in extract_triples(doc):
for band in bands:
counters_per_band[band][(v, r, f)] += 1
cvrstar_per_band[band][(v, r)] += 1
# Frequency + POS — same parse, separate accumulators
seen_words_per_band: defaultdict[str, set] = defaultdict(set)
seen_lemmas_per_band: defaultdict[str, set] = defaultdict(set)
for tok in doc:
if not tok.text.isalpha() or len(tok.text) > MAX_WORD_LEN:
continue
if tok.pos_ not in KEEP_POS:
continue
w = tok.text.lower()
lem = tok.lemma_.lower()
pos = tok.pos_
for band in bands:
word_pos_per_band[band][(w, pos)] += 1
lemma_pos_per_band[band][(lem, pos)] += 1
seen_words_per_band[band].add(w)
seen_lemmas_per_band[band].add(lem)
tokens_per_band[band] += 1
for band in bands:
for w in seen_words_per_band[band]:
cd_per_band_word[band][w] += 1
for lem in seen_lemmas_per_band[band]:
cd_per_band_lemma[band][lem] += 1
docs_per_band[band] += 1
n_docs += 1
pbar.update(1)
pbar.close()
elapsed = time.time() - t0
print(f"[shard] {n_docs} docs in {elapsed:.1f}s")
out_path = Path(args.save_parquet)
_write_shard_parquet(counters_per_band, cvrstar_per_band, out_path)
_write_freq_shard_parquet(
word_pos_per_band, lemma_pos_per_band,
cd_per_band_word, cd_per_band_lemma,
docs_per_band, tokens_per_band,
out_path.with_suffix(".freq.parquet"),
)
def _write_shard_parquet(
counters_per_band: dict, cvrstar_per_band: dict, out_path: Path
) -> None:
"""Materialize per-band selectional counters as a Parquet file (raw counts; no PMI yet)."""
rows = []
for band, c in counters_per_band.items():
cvrstar_band = cvrstar_per_band[band]
for (v, r, f), count in c.items():
rows.append({
"verb": v,
"role": r,
"filler": f,
"band": band,
"count_v_r_f": count,
"count_v_r_star": cvrstar_band[(v, r)],
})
df = pl.DataFrame(rows, schema={
"verb": pl.Utf8,
"role": pl.Utf8,
"filler": pl.Utf8,
"band": pl.Utf8,
"count_v_r_f": pl.UInt32,
"count_v_r_star": pl.UInt32,
})
out_path.parent.mkdir(parents=True, exist_ok=True)
df.write_parquet(out_path)
print(f"[write] {out_path} ({df.height:,} rows)")
def _write_freq_shard_parquet(
word_pos_per_band: dict, lemma_pos_per_band: dict,
cd_per_band_word: dict, cd_per_band_lemma: dict,
docs_per_band: dict, tokens_per_band: dict,
out_path: Path,
) -> None:
"""Per-shard frequency+POS counts for FineWeb-Edu freq+POS regen.
Two row kinds:
- kind="surface": (band, word, pos) → count + cd_count
- kind="lemma": (band, lemma, pos) → count + cd_count
Plus per-band metadata in a separate table.
"""
rows = []
for band, c in word_pos_per_band.items():
cd = cd_per_band_word[band]
for (w, pos), count in c.items():
rows.append({
"kind": "surface", "band": band, "key": w, "pos": pos,
"count": count, "cd_count": cd[w],
})
for band, c in lemma_pos_per_band.items():
cd = cd_per_band_lemma[band]
for (lem, pos), count in c.items():
rows.append({
"kind": "lemma", "band": band, "key": lem, "pos": pos,
"count": count, "cd_count": cd[lem],
})
df = pl.DataFrame(rows, schema={
"kind": pl.Utf8, "band": pl.Utf8, "key": pl.Utf8, "pos": pl.Utf8,
"count": pl.UInt32, "cd_count": pl.UInt32,
})
out_path.parent.mkdir(parents=True, exist_ok=True)
df.write_parquet(out_path)
# Sidecar JSON: per-band totals (needed for per-million normalization in merge step).
# json is already imported at module level, so no local import is needed here.
meta_path = out_path.with_suffix(".meta.json")
meta_path.write_text(json.dumps({
"docs_per_band": dict(docs_per_band),
"tokens_per_band": dict(tokens_per_band),
}, indent=2))
print(f"[write] {out_path} ({df.height:,} rows) + {meta_path}")
def main() -> int:
process_corpus(parse_args())
return 0
if __name__ == "__main__":
raise SystemExit(main())
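The `--shard i/N` flag maps to a round-robin rule in stream_documents: a worker keeps a document only when its stream index satisfies `i % N == shard_idx`. A small sketch of which indices each shard keeps, using a hypothetical helper name and a 10-document stream:

```python
def shard_indices(n_docs: int, shard_idx: int, shard_total: int) -> list[int]:
    """Document indices one shard keeps under the i % N rule."""
    return [i for i in range(n_docs) if i % shard_total == shard_idx]


shards = [shard_indices(10, s, 4) for s in range(4)]
for s, kept in enumerate(shards):
    print(f"shard {s}/4 keeps {kept}")

# The shards are disjoint and together cover every document exactly once.
covered = sorted(i for kept in shards for i in kept)
```

Note that under this rule every worker still enumerates the full stream and skips the indices it does not own, as the loop in stream_documents does. The raw counts can be summed across shards in the merge step because no document is assigned to two shards.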
- [ ] Step 5: Run tests + a CLI smoke
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_build_selectional.py -v
Expected: 3 tests PASS.
- [ ] Step 6: CLI smoke run on a tiny FineWeb sample (optional but recommended)
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && uv run python build_selectional.py --shard 0/1 --n-docs 20 --save-parquet /tmp/smoke_shard.parquet
Expected: completes in ~1-2 min, writes a parquet with hundreds-to-thousands of triples. Spot-check:
uv run python -c "import polars as pl; df = pl.read_parquet('/tmp/smoke_shard.parquet'); print(df.shape); print(df.head(10))"
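The smoke run also writes the frequency sibling and its metadata sidecar next to the shard. Their names follow from two chained `Path.with_suffix` calls, which replace only the last suffix each time. A short sketch using the smoke path from the command above:

```python
from pathlib import Path

shard = Path("/tmp/smoke_shard.parquet")
freq = shard.with_suffix(".freq.parquet")  # replaces ".parquet"
meta = freq.with_suffix(".meta.json")      # replaces the trailing ".parquet" again

print(freq.name)  # smoke_shard.freq.parquet
print(meta.name)  # smoke_shard.freq.meta.json
```

So the sidecar to inspect after the smoke run is `/tmp/smoke_shard.freq.meta.json`, not `/tmp/smoke_shard.meta.json`.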
- [ ] Step 7: Commit
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-corpus-parse/build_selectional.py research/2026-05-06-phon-94-corpus-parse/README.md packages/data/tests/test_build_selectional.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: build_selectional.py — sharded corpus parse worker
Streams HuggingFace text dataset, parses with canonical pipeline, extracts
verb-role-filler triples + per-band counters, writes raw-count Parquet
shard. Production runs 4× sharded on RunPod H100 SXM.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 7: Implement merge_shards.py
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/merge_shards.py
- Test: packages/data/tests/test_merge_shards.py
- [ ] Step 1: Write the failing test
Create packages/data/tests/test_merge_shards.py:
"""Tests for the shard-merge step.
Synthetic shard parquets → final selectional.parquet with PMI computed.
"""
import polars as pl
import pytest
def _write_synthetic_shard(path, rows: list[dict]):
schema = {
"verb": pl.Utf8, "role": pl.Utf8, "filler": pl.Utf8, "band": pl.Utf8,
"count_v_r_f": pl.UInt32, "count_v_r_star": pl.UInt32,
}
pl.DataFrame(rows, schema=schema).write_parquet(path)
def test_merge_sums_counts_across_shards(tmp_path):
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from merge_shards import merge_shards_to_final
s1 = tmp_path / "s1.parquet"
s2 = tmp_path / "s2.parquet"
out = tmp_path / "final.parquet"
_write_synthetic_shard(s1, [
{"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 10, "count_v_r_star": 100},
{"verb": "eat", "role": "dobj", "filler": "bread", "band": "fineweb_adult", "count_v_r_f": 5, "count_v_r_star": 100},
])
_write_synthetic_shard(s2, [
{"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 7, "count_v_r_star": 80},
{"verb": "eat", "role": "dobj", "filler": "soup", "band": "fineweb_adult", "count_v_r_f": 6, "count_v_r_star": 80},
])
merge_shards_to_final([s1, s2], out, min_count=1)
df = pl.read_parquet(out)
cake_row = df.filter(
(pl.col("verb") == "eat") & (pl.col("role") == "dobj")
& (pl.col("filler") == "cake") & (pl.col("band") == "fineweb_adult")
)
assert cake_row.height == 1
assert cake_row["count_v_r_f"][0] == 17 # 10 + 7
# count_v_r_star is per-band, should be the merged sum across shards
assert cake_row["count_v_r_star"][0] == 180 # 100 + 80
def test_merge_applies_min_count_filter(tmp_path):
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from merge_shards import merge_shards_to_final
s = tmp_path / "s.parquet"
out = tmp_path / "final.parquet"
_write_synthetic_shard(s, [
{"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 10, "count_v_r_star": 100},
{"verb": "eat", "role": "dobj", "filler": "rare", "band": "fineweb_adult", "count_v_r_f": 2, "count_v_r_star": 100},
])
merge_shards_to_final([s], out, min_count=5)
df = pl.read_parquet(out)
fillers = set(df["filler"].to_list())
assert "cake" in fillers
assert "rare" not in fillers, "min_count filter should drop count_v_r_f < 5"
def test_merge_computes_ppmi(tmp_path):
import sys
research_dir = "/Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse"
if research_dir not in sys.path:  # guard on the path that is actually inserted
sys.path.insert(0, research_dir)
from merge_shards import merge_shards_to_final
s = tmp_path / "s.parquet"
out = tmp_path / "final.parquet"
# Construct a scenario where PMI > 0 for "cake" (over-represented as eat/dobj)
_write_synthetic_shard(s, [
# eat takes cake 50/100 of its dobj uses → P(cake|eat,dobj) = 0.5
{"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 50, "count_v_r_star": 100},
# cake's only other dobj appearance in this fixture is with buy (5 events)
{"verb": "buy", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 5, "count_v_r_star": 200},
# bread is widely admitted across many verbs (as a marginal-noun)
{"verb": "eat", "role": "dobj", "filler": "bread", "band": "fineweb_adult", "count_v_r_f": 10, "count_v_r_star": 100},
{"verb": "buy", "role": "dobj", "filler": "bread", "band": "fineweb_adult", "count_v_r_f": 50, "count_v_r_star": 200},
])
merge_shards_to_final([s], out, min_count=1)
df = pl.read_parquet(out)
cake_eat = df.filter(
(pl.col("verb") == "eat") & (pl.col("role") == "dobj") & (pl.col("filler") == "cake")
)
assert cake_eat.height == 1
# cake is over-represented for eat → PPMI > 0
assert cake_eat["ppmi"][0] > 0
bread_eat = df.filter(
(pl.col("verb") == "eat") & (pl.col("role") == "dobj") & (pl.col("filler") == "bread")
)
# bread is under-represented for eat (10/100 vs. 60/115 across all verbs) → PMI < 0, clamped to PPMI = 0
assert bread_eat["ppmi"][0] == 0.0
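The 180 expected by test_merge_sums_counts_across_shards depends on a detail of the shard format: count_v_r_star is repeated on every filler row of a (verb, role, band) group, so a plain sum over rows would double-count it. A dictionary-based sketch of the intended recovery (one value per shard, then a sum across shards), using the same numbers as the test fixture; the tuple layout is an assumption made for the illustration:

```python
from collections import defaultdict

# (shard_id, verb, role, band, count_v_r_star): one tuple per shard row.
rows = [
    ("s1", "eat", "dobj", "fineweb_adult", 100),  # cake row
    ("s1", "eat", "dobj", "fineweb_adult", 100),  # bread row, same shard total
    ("s2", "eat", "dobj", "fineweb_adult", 80),   # cake row
    ("s2", "eat", "dobj", "fineweb_adult", 80),   # soup row, same shard total
]

# Step 1: collapse the repeated value to one number per (verb, role, band, shard).
per_shard: dict[tuple, int] = {}
for shard, verb, role, band, star in rows:
    key = (verb, role, band, shard)
    per_shard[key] = max(per_shard.get(key, 0), star)

# Step 2: sum those per-shard totals across shards.
merged: dict[tuple, int] = defaultdict(int)
for (verb, role, band, _shard), star in per_shard.items():
    merged[(verb, role, band)] += star

naive = sum(star for *_, star in rows)  # what a row-wise sum would give
print(merged[("eat", "dobj", "fineweb_adult")], naive)  # 180 360
```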
- [ ] Step 2: Run failing test
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_merge_shards.py -v 2>&1 | tail -10
Expected: FAIL with ModuleNotFoundError: No module named 'merge_shards'.
- [ ] Step 3: Implement merge_shards.py
Create research/2026-05-06-phon-94-corpus-parse/merge_shards.py:
#!/usr/bin/env python3
"""Merge per-shard Parquets into final selectional.parquet with PMI.
Polars stream-aggregates raw count parquets across shards, computes
per-band marginals, applies Laplace smoothing, computes PPMI, applies
min_count floor. Writes the schema-conforming output to data/runtime/.
PMI formula (per band b):
P̂(f|v,r,b) = (c(v,r,f,b) + α) / (c(v,r,*,b) + α·|F_r,b|)
P̂(f|r,b) = (c(*,r,f,b) + α) / (c(*,r,*,b) + α·|F_r,b|)
PPMI(v,r,f,b) = max(0, log2( P̂(f|v,r,b) / P̂(f|r,b) ))
α=0.01 Laplace smoothing.
Usage:
python merge_shards.py \\
/Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/shard_*.parquet \\
--output data/runtime/selectional.parquet
"""
from __future__ import annotations
import argparse
from pathlib import Path
import polars as pl
SMOOTHING_ALPHA = 0.01
DEFAULT_MIN_COUNT = 5
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser()
p.add_argument("shards", nargs="+", help="Per-shard parquet paths")
p.add_argument("--output", required=True)
p.add_argument("--min-count", type=int, default=DEFAULT_MIN_COUNT)
p.add_argument("--alpha", type=float, default=SMOOTHING_ALPHA)
return p.parse_args()
def merge_shards_to_final(
shard_paths: list[Path | str], output_path: Path | str,
min_count: int = DEFAULT_MIN_COUNT, alpha: float = SMOOTHING_ALPHA,
) -> None:
"""Stream-merge shards, sum counts, compute PPMI, write final selectional.parquet."""
paths = [str(p) for p in shard_paths]
print(f"[merge] reading {len(paths)} shard(s) ...")
# 1. Aggregate filler counts across shards via a lazy Polars group_by-sum
df = (
pl.scan_parquet(paths)
.group_by(["verb", "role", "filler", "band"])
.agg(pl.col("count_v_r_f").sum())
.collect()
)
# Within one shard, every (v,r,f,band) row carries the SAME count_v_r_star
# (that shard's total for (v,r,band)). The cross-shard total is therefore the
# SUM over shards of one value per shard: tag rows with the shard path, take
# one value per (verb, role, band, shard), then sum across shards.
cvrstar = (
pl.concat([
pl.scan_parquet(p)
.select(["verb", "role", "band", "count_v_r_star"])
.with_columns(pl.lit(p).alias("__shard"))
for p in paths
])
.group_by(["verb", "role", "band", "__shard"])
.agg(pl.col("count_v_r_star").max().alias("vr_shard_max"))
.group_by(["verb", "role", "band"])
.agg(pl.col("vr_shard_max").sum().alias("count_v_r_star"))
.collect()
)
df = df.join(cvrstar, on=["verb", "role", "band"], how="left")
print(f"[merge] {df.height:,} unique (v,r,f,b) tuples after cross-shard sum")
# 2. Apply min_count floor
df = df.filter(pl.col("count_v_r_f") >= min_count)
print(f"[filter] {df.height:,} rows after min_count={min_count}")
# 3. Per-(role, band) marginals: c(*, r, f, b) and c(*, r, *, b) and |F_{r,b}|
role_filler = (
df.group_by(["role", "band", "filler"])
.agg(pl.col("count_v_r_f").sum().alias("c_star_r_f"))
)
role_total = (
df.group_by(["role", "band"])
.agg([
pl.col("count_v_r_f").sum().alias("c_star_r_star"),
pl.col("filler").n_unique().alias("F_r"),
])
)
df = df.join(role_filler, on=["role", "band", "filler"], how="left")
df = df.join(role_total, on=["role", "band"], how="left")
# 4. Compute PPMI
df = df.with_columns([
# P̂(f | v, r, b)
((pl.col("count_v_r_f").cast(pl.Float64) + alpha)
/ (pl.col("count_v_r_star").cast(pl.Float64) + alpha * pl.col("F_r").cast(pl.Float64))
).alias("p_f_given_vrb"),
# P̂(f | r, b)
((pl.col("c_star_r_f").cast(pl.Float64) + alpha)
/ (pl.col("c_star_r_star").cast(pl.Float64) + alpha * pl.col("F_r").cast(pl.Float64))
).alias("p_f_given_rb"),
])
df = df.with_columns([
# PMI = log2( P̂(f|v,r,b) / P̂(f|r,b) ); PPMI = max(0, PMI)
(pl.col("p_f_given_vrb") / pl.col("p_f_given_rb")).log(base=2).alias("_pmi"),
])
df = df.with_columns([
pl.when(pl.col("_pmi") > 0).then(pl.col("_pmi")).otherwise(0.0).cast(pl.Float32).alias("ppmi")
])
# 5. Project to final schema
out = df.select([
pl.col("verb"),
pl.col("role"),
pl.col("filler"),
pl.col("band"),
pl.col("count_v_r_f").cast(pl.UInt32),
pl.col("count_v_r_star").cast(pl.UInt32),
pl.col("ppmi"),
])
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
out.write_parquet(output_path)
print(f"[write] {output_path} ({out.height:,} rows)")
def main() -> int:
args = parse_args()
merge_shards_to_final(args.shards, args.output, args.min_count, args.alpha)
return 0
if __name__ == "__main__":
raise SystemExit(main())
- [ ] Step 4: Run tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_merge_shards.py -v
Expected: 3 tests PASS. If a test fails on the PPMI math, walk through the formula by hand on the synthetic fixture before patching — the smoothing α=0.01 is small but non-zero, so exact-equality tests must use small tolerances.
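For the by-hand walk-through, the formula from the merge_shards.py docstring can be evaluated in plain Python on the fixture from test_merge_computes_ppmi. This is a sketch under the assumption that the marginals are computed from the rows present in the fixture (as the merge step does) while count_v_r_star is taken from the fixture as given; all variable names are illustrative:

```python
import math

ALPHA = 0.01

# (verb, filler) -> count_v_r_f; every row is role=dobj in a single band.
counts = {
    ("eat", "cake"): 50, ("buy", "cake"): 5,
    ("eat", "bread"): 10, ("buy", "bread"): 50,
}
v_r_star = {"eat": 100, "buy": 200}  # count_v_r_star as given in the fixture

fillers = {f for _, f in counts}
c_star_f = {f: sum(c for (_, g), c in counts.items() if g == f) for f in fillers}
c_star_star = sum(counts.values())  # c(*, r, *, b) = 115
n_fillers = len(fillers)            # |F_{r,b}| = 2


def ppmi(verb: str, filler: str) -> float:
    """max(0, log2(P(f | v, r, b) / P(f | r, b))) with additive smoothing ALPHA."""
    p_f_given_v = (counts[(verb, filler)] + ALPHA) / (v_r_star[verb] + ALPHA * n_fillers)
    p_f = (c_star_f[filler] + ALPHA) / (c_star_star + ALPHA * n_fillers)
    return max(0.0, math.log2(p_f_given_v / p_f))


print(ppmi("eat", "cake"))   # small positive value, roughly 0.06
print(ppmi("eat", "bread"))  # 0.0: PMI is negative, so it is clamped
```

On this fixture the positive PPMI for (eat, dobj, cake) is small, so a threshold assertion such as `> 0` is safer than comparing against a hand-rounded constant.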
- [ ] Step 5: Commit
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-corpus-parse/merge_shards.py packages/data/tests/test_merge_shards.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: merge_shards.py — Polars stream-merge to final selectional.parquet
Aggregates per-shard count parquets, computes per-band marginals, applies
Laplace smoothing (α=0.01), computes PPMI per (verb, role, filler, band).
Applies min_count=5 floor at write time. Output conforms to extended
selectional_schema.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 8: Add WordStore.subcat_profile() and .role_fillability() derived views
Files:
- Modify: packages/data/src/phonolex_data/runtime/store.py
- Test: packages/data/tests/runtime/test_selectional_parquet.py
- [ ] Step 1: Read existing WordStore class to understand the pattern
cat /Users/jneumann/Repos/PhonoLex/packages/data/src/phonolex_data/runtime/store.py | head -150
Note how from_parquet, subset, etc. are wired. The new methods follow the same pattern.
- [ ] Step 2: Write the failing tests
Create packages/data/tests/runtime/test_selectional_parquet.py:
"""Tests for selectional.parquet runtime layer.
Schema round-trip + WordStore derived views (subcat_profile, role_fillability).
"""
import polars as pl
import pytest


def _write_synthetic_selectional(path):
    """Synthetic selectional data: give is ditrans (nsubj+dobj+iobj),
    sleep is intrans (nsubj only). All band=fineweb_adult."""
    schema = {
        "verb": pl.Utf8, "role": pl.Utf8, "filler": pl.Utf8, "band": pl.Utf8,
        "count_v_r_f": pl.UInt32, "count_v_r_star": pl.UInt32,
        "ppmi": pl.Float32,
    }
    rows = [
        # give: nsubj + dobj + iobj (ditrans)
        {"verb": "give", "role": "nsubj", "filler": "boy", "band": "fineweb_adult",
         "count_v_r_f": 100, "count_v_r_star": 1000, "ppmi": 1.0},
        {"verb": "give", "role": "dobj", "filler": "book", "band": "fineweb_adult",
         "count_v_r_f": 200, "count_v_r_star": 800, "ppmi": 2.0},
        {"verb": "give", "role": "iobj", "filler": "girl", "band": "fineweb_adult",
         "count_v_r_f": 50, "count_v_r_star": 60, "ppmi": 1.5},
        # sleep: nsubj only (intrans)
        {"verb": "sleep", "role": "nsubj", "filler": "cat", "band": "fineweb_adult",
         "count_v_r_f": 80, "count_v_r_star": 500, "ppmi": 0.8},
        # cake: appears as dobj of multiple verbs
        {"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult",
         "count_v_r_f": 60, "count_v_r_star": 600, "ppmi": 1.2},
        {"verb": "bake", "role": "dobj", "filler": "cake", "band": "fineweb_adult",
         "count_v_r_f": 30, "count_v_r_star": 200, "ppmi": 1.8},
        # cake also rarely as nsubj
        {"verb": "fall", "role": "nsubj", "filler": "cake", "band": "fineweb_adult",
         "count_v_r_f": 5, "count_v_r_star": 200, "ppmi": 0.1},
    ]
    pl.DataFrame(rows, schema=schema).write_parquet(path)


def test_schema_roundtrip(tmp_path):
    """Write tiny DF → read back → schema matches selectional_schema()."""
    from phonolex_data.runtime.schema import selectional_schema
    p = tmp_path / "s.parquet"
    _write_synthetic_selectional(p)
    df = pl.read_parquet(p)
    expected = selectional_schema()
    for col, dtype in expected.items():
        assert col in df.columns, f"missing col {col}"
        assert df.schema[col] == dtype, f"col {col}: expected {dtype}, got {df.schema[col]}"


def test_wordstore_subcat_profile_classifies_ditrans(tmp_path):
    from phonolex_data.runtime.store import WordStore
    p = tmp_path / "s.parquet"
    _write_synthetic_selectional(p)
    store = WordStore.from_selectional_parquet(p)
    profile = store.subcat_profile(verb="give", band="fineweb_adult")
    assert profile.transitivity == "ditrans", (
        f"give has nsubj+dobj+iobj counts; expected ditrans, got {profile.transitivity}"
    )
    assert "nsubj" in profile.admitted_roles
    assert "dobj" in profile.admitted_roles
    assert "iobj" in profile.admitted_roles


def test_wordstore_subcat_profile_classifies_intrans(tmp_path):
    from phonolex_data.runtime.store import WordStore
    p = tmp_path / "s.parquet"
    _write_synthetic_selectional(p)
    store = WordStore.from_selectional_parquet(p)
    profile = store.subcat_profile(verb="sleep", band="fineweb_adult")
    assert profile.transitivity == "intrans"


def test_wordstore_role_fillability_dominant_role(tmp_path):
    from phonolex_data.runtime.store import WordStore
    p = tmp_path / "s.parquet"
    _write_synthetic_selectional(p)
    store = WordStore.from_selectional_parquet(p)
    fill = store.role_fillability(filler="cake", band="fineweb_adult")
    # cake appears as dobj 90 times (60+30) and nsubj 5 times; dobj should dominate
    assert fill["dobj"] > fill.get("nsubj", 0.0)
    # Marginals sum to 1
    total = sum(fill.values())
    assert abs(total - 1.0) < 1e-6, f"role_fillability marginals should sum to 1, got {total}"
- [ ] Step 3: Run failing tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/runtime/test_selectional_parquet.py -v 2>&1 | tail -20
Expected: FAIL on each test referencing WordStore.from_selectional_parquet / .subcat_profile / .role_fillability.
- [ ] Step 4: Add the dataclass + WordStore methods
Edit packages/data/src/phonolex_data/runtime/store.py — add at the top of the file (after existing imports):
from dataclasses import dataclass


@dataclass(frozen=True)
class SubcatProfile:
    """Per-(verb, band) subcategorization profile, derived from selectional.parquet.

    Computed at consumer-load by groupby+aggregate over per-(verb, role, band) counts.
    """
    verb: str
    band: str
    role_counts: dict[str, int]
    admitted_roles: tuple[str, ...]
    transitivity: str  # one of: trans, ditrans, intrans, copular, unknown
Then add the methods to the WordStore class. Place after the existing from_parquet constructor (near where the load logic ends — typically file end):
    # ------- Selectional preference views (PHON-94) -------

    @classmethod
    def from_selectional_parquet(cls, path):
        """Standalone constructor for tests / direct selectional loading.

        Production code uses from_parquet for words/edges; selectional is
        attached separately via attach_selectional below.
        """
        store = cls.__new__(cls)
        store._selectional_df = pl.read_parquet(path)
        store._subcat_cache = {}
        store._fillability_cache = {}
        return store

    def attach_selectional(self, path):
        """Load selectional.parquet and attach its DF + caches to this store."""
        self._selectional_df = pl.read_parquet(path)
        self._subcat_cache = {}
        self._fillability_cache = {}

    def subcat_profile(self, verb: str, band: str) -> SubcatProfile:
        """Derived view: groupby role, classify transitivity from dominant pattern."""
        cache_key = (verb, band)
        if cache_key in self._subcat_cache:
            return self._subcat_cache[cache_key]
        df = self._selectional_df.filter(
            (pl.col("verb") == verb) & (pl.col("band") == band)
        )
        # Per-role total counts for this (verb, band)
        role_counts_df = (
            df.group_by("role")
            .agg(pl.col("count_v_r_star").max().alias("c_v_r_star"))
        )
        role_counts = {row["role"]: row["c_v_r_star"] for row in role_counts_df.iter_rows(named=True)}
        admitted = tuple(sorted(r for r, c in role_counts.items() if c >= 50))
        # Transitivity classification
        has_dobj = "dobj" in role_counts
        has_iobj = "iobj" in role_counts
        has_nsubj = "nsubj" in role_counts
        # Copular detection would need POS info; for now, a verb with primarily
        # nsubj + xcomp/ccomp flagging is the closest proxy. v1: skip copular detection.
        if has_dobj and has_iobj:
            trans = "ditrans"
        elif has_dobj:
            trans = "trans"
        elif has_nsubj:
            trans = "intrans"
        else:
            trans = "unknown"
        profile = SubcatProfile(
            verb=verb, band=band,
            role_counts=role_counts, admitted_roles=admitted, transitivity=trans,
        )
        self._subcat_cache[cache_key] = profile
        return profile

    def role_fillability(self, filler: str, band: str) -> dict[str, float]:
        """Derived view: per-(filler, band) marginal P(role | filler).

        Computed as count(filler in role) / sum_{r'} count(filler in role r').
        """
        cache_key = (filler, band)
        if cache_key in self._fillability_cache:
            return self._fillability_cache[cache_key]
        df = self._selectional_df.filter(
            (pl.col("filler") == filler) & (pl.col("band") == band)
        )
        per_role = (
            df.group_by("role")
            .agg(pl.col("count_v_r_f").sum().alias("c_filler_in_role"))
        )
        rows = list(per_role.iter_rows(named=True))
        total = sum(r["c_filler_in_role"] for r in rows)
        if total == 0:
            self._fillability_cache[cache_key] = {}
            return {}
        result = {r["role"]: r["c_filler_in_role"] / total for r in rows}
        self._fillability_cache[cache_key] = result
        return result
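The transitivity branch in subcat_profile can be exercised without Polars. The sketch below assumes role counts have already been aggregated into a plain dict; classify_transitivity and admitted_roles are hypothetical helper names used only for illustration, and the 50-count threshold mirrors the method above.

```python
def admitted_roles(role_counts: dict, min_count: int = 50) -> tuple:
    """Roles whose per-(verb, band) count clears the admission threshold."""
    return tuple(sorted(r for r, c in role_counts.items() if c >= min_count))


def classify_transitivity(role_counts: dict) -> str:
    """Same decision order as subcat_profile: presence of a role, not its count."""
    if "dobj" in role_counts and "iobj" in role_counts:
        return "ditrans"
    if "dobj" in role_counts:
        return "trans"
    if "nsubj" in role_counts:
        return "intrans"
    return "unknown"


give = {"nsubj": 1000, "dobj": 800, "iobj": 60}
sleep = {"nsubj": 500}
rare_dobj = {"nsubj": 500, "dobj": 3}  # e.g. a handful of noisy dobj parses

assert classify_transitivity(give) == "ditrans"
assert classify_transitivity(sleep) == "intrans"
# Presence alone decides the class, so three stray dobj hits yield "trans"
# even though dobj is not an admitted role at the 50-count threshold.
assert classify_transitivity(rare_dobj) == "trans"
assert admitted_roles(rare_dobj) == ("nsubj",)
```

The last two assertions show a property worth knowing when reading profiles: transitivity is keyed on role presence while admitted_roles is keyed on the threshold. Classifying from admitted roles instead would be one way to make the two agree, if that turns out to matter on real data.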
- [ ] Step 5: Run tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/runtime/test_selectional_parquet.py -v
Expected: 4 tests PASS.
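For hand-checking the role_fillability expectation, a pure-Python sketch of the same marginal follows. The helper name role_fillability_from_rows is hypothetical; the rows are the three cake rows from the synthetic fixture.

```python
from collections import defaultdict


def role_fillability_from_rows(rows, filler, band):
    """P(role | filler) within one band, from rows shaped like selectional.parquet."""
    per_role = defaultdict(int)
    for row in rows:
        if row["filler"] == filler and row["band"] == band:
            per_role[row["role"]] += row["count_v_r_f"]
    total = sum(per_role.values())
    if total == 0:
        return {}
    return {role: count / total for role, count in per_role.items()}


rows = [
    {"verb": "eat", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 60},
    {"verb": "bake", "role": "dobj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 30},
    {"verb": "fall", "role": "nsubj", "filler": "cake", "band": "fineweb_adult", "count_v_r_f": 5},
]

fill = role_fillability_from_rows(rows, "cake", "fineweb_adult")
# dobj: (60 + 30) / 95, nsubj: 5 / 95
assert abs(fill["dobj"] - 90 / 95) < 1e-12
assert abs(fill["nsubj"] - 5 / 95) < 1e-12
assert abs(sum(fill.values()) - 1.0) < 1e-9
```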
- [ ] Step 6: Run the full data test suite to verify no regression
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/ -v 2>&1 | tail -20
Expected: all tests PASS.
- [ ] Step 7: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/data/src/phonolex_data/runtime/store.py packages/data/tests/runtime/test_selectional_parquet.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: WordStore.subcat_profile + role_fillability derived views

Per-band groupby+aggregate over selectional.parquet, computed lazily,
cached per (verb|filler, band). Single source of truth = selectional.parquet;
no materialized columns on words.parquet, no sibling Parquets.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 9: End-to-end synthetic-corpus integration test
Files:
- Create: packages/data/tests/test_e2e_selectional.py
This task wires Tasks 3-8 together to verify the full pipeline produces correct output on a 10-sentence synthetic corpus before any real production run.
- [ ] Step 1: Write the integration test
Create packages/data/tests/test_e2e_selectional.py:
"""End-to-end integration test: synthetic 10-sentence corpus → selectional.parquet.
Wires together canonical_spacy + extract_triples + build_selectional + merge_shards.
Verifies output is queryable via WordStore views.
"""
import polars as pl
import pytest


def test_e2e_pipeline_on_synthetic_corpus(tmp_path):
    import sys
    repo_root = "/Users/jneumann/Repos/PhonoLex"
    research_dir = f"{repo_root}/research/2026-05-06-phon-94-corpus-parse"
    # Guard on the directory that is actually inserted (the original checked repo_root,
    # so the research dir was re-inserted on every call).
    if research_dir not in sys.path:
        sys.path.insert(0, research_dir)
    from build_selectional import process_text_stream
    from merge_shards import merge_shards_to_final
    from phonolex_data.runtime.store import WordStore

    # 10-sentence corpus, heavy on (eat, dobj, *) for stat stability
    corpus = [
        "The boy ate the cake.",
        "The girl ate the bread.",
        "The man ate the soup.",
        "The woman ate the rice.",
        "The child ate the cake.",
        "She ate the cake yesterday.",
        "He ate the bread quickly.",
        "The boy slept on the bed.",
        "The girl read the book.",
        "The cake was good.",
    ]
    shard_path = tmp_path / "shard.parquet"
    final_path = tmp_path / "selectional.parquet"

    # Run shard
    process_text_stream(
        texts=corpus,
        out_path=shard_path,
        band_resolver=lambda doc_idx: ["fineweb_adult"],
    )

    # Run merge (use min_count=1 so we don't filter out our tiny dataset)
    merge_shards_to_final([shard_path], final_path, min_count=1)

    # Verify output schema
    df = pl.read_parquet(final_path)
    assert "verb" in df.columns
    assert "role" in df.columns
    assert "filler" in df.columns
    assert "band" in df.columns
    assert "count_v_r_f" in df.columns
    assert "count_v_r_star" in df.columns
    assert "ppmi" in df.columns

    # Sanity: (eat, dobj, cake) should exist with positive count
    eat_cake = df.filter(
        (pl.col("verb") == "eat") & (pl.col("role") == "dobj")
        & (pl.col("filler") == "cake")
    )
    assert eat_cake.height >= 1, f"missing (eat, dobj, cake) in {df}"
    assert eat_cake["count_v_r_f"][0] >= 2  # cake appears as eat-dobj at least twice

    # Verify WordStore can load the result
    store = WordStore.from_selectional_parquet(final_path)
    profile = store.subcat_profile(verb="eat", band="fineweb_adult")
    assert profile.transitivity in {"trans", "ditrans"}, (
        f"eat should be at least transitive in this corpus, got {profile.transitivity}"
    )
- [ ] Step 2: Run the test
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_e2e_selectional.py -v
Expected: PASS. This is the proof that all the modular pieces compose correctly.
If this test fails, it points to an integration bug between two of the previously-tested modules — investigate where the boundary is wrong before adjusting test expectations.
- [ ] Step 3: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/data/tests/test_e2e_selectional.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: end-to-end integration test on synthetic corpus

Composes canonical_spacy + extract_triples + build_selectional + merge_shards
+ WordStore on a 10-sentence fixture. Verifies the full pipeline before
running on real corpora.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 10: Add lemma + lemma-frequency PropertyDefs
Files:
- Modify: packages/web/workers/scripts/config.py
- Test: packages/data/tests/test_datasets.py (or new test file for property definitions)
- [ ] Step 1: Read existing PropertyDef structure to understand the pattern
grep -A 12 "id=\"frequency\"" /Users/jneumann/Repos/PhonoLex/packages/web/workers/scripts/config.py | head -20
Note the source citation, scale, interpretation, and surfaced fields.
- [ ] Step 2: Write a test asserting the new properties exist
Add to packages/data/tests/test_datasets.py (or wherever PropertyDef tests live — search for existing tests on PROPERTY_MAP):
def test_property_map_includes_lemma():
    """PHON-94 adds lemma + lemma_frequency PropertyDefs alongside existing surface-keyed columns."""
    import sys
    repo_root = "/Users/jneumann/Repos/PhonoLex"
    if f"{repo_root}/packages/web/workers/scripts" not in sys.path:
        sys.path.insert(0, f"{repo_root}/packages/web/workers/scripts")
    from config import PROPERTY_MAP
    assert "lemma" in PROPERTY_MAP, "lemma column not registered in PROPERTY_MAP"
    assert "lemma_frequency" in PROPERTY_MAP, "lemma_frequency not registered"
    assert "lemma_log_frequency" in PROPERTY_MAP
    # Banded lemma frequencies (PHON-88-style parity)
    for grade_band in ["lemma_frequency_grade_K_8", "lemma_frequency_grade_9_12", "lemma_frequency_grade_13_16"]:
        assert grade_band in PROPERTY_MAP, f"missing {grade_band}"
- [ ] Step 3: Run the failing test
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_datasets.py::test_property_map_includes_lemma -v 2>&1 | tail -10
Expected: FAIL because PROPERTY_MAP doesn't have these keys.
- [ ] Step 4: Add the PropertyDef records
Edit packages/web/workers/scripts/config.py. Find the lexical-frequency property category (search for frequency or LEXICAL_FREQUENCY) and add new properties to it:
# PHON-94 lemma additions
PropertyDef(
id="lemma",
label="Lemma",
short_label="Lemma",
source="Canonical spaCy lemmatizer (en_core_web_trf)",
description="Lowercased lemma form (per spaCy en_core_web_trf lemmatizer)",
scale="string",
interpretation="Identity for selectional preference cross-reference",
display_format="s",
filterable=False,
slider_step=0,
surfaced=False,
),
PropertyDef(
id="lemma_frequency",
label="Lemma Frequency (FineWeb-Edu)",
short_label="LemmaFreq",
source="FineWeb-Edu (PHON-94 canonical parse)",
description="Per-million-word frequency of the word's lemma across FineWeb-Edu adult corpus.",
scale="per million",
interpretation="Higher = more common (across all surface forms of this lemma)",
display_format=".2f",
filterable=True,
slider_step=1.0,
use_log_scale=True,
surfaced=True,
),
PropertyDef(
id="lemma_log_frequency",
label="Log10 Lemma Frequency",
short_label="Log10LemmaFreq",
source="FineWeb-Edu (PHON-94 canonical parse)",
description="log10(lemma_frequency_count + 1)",
scale="log10",
interpretation="Higher = more common",
display_format=".4f",
filterable=True,
surfaced=False, # surfaced via the linear scale variant
),
PropertyDef(
id="lemma_frequency_grade_K_8",
label="Lemma Frequency (Grade K-8)",
short_label="LemmaFreqK8",
source="FineWeb-Edu K-8 grade band (PHON-94 canonical parse)",
description="Per-million-word frequency of the lemma in FineWeb-Edu's K-8 grade band.",
scale="per million",
interpretation="Higher = more common in K-8 educational text",
display_format=".2f",
filterable=True,
use_log_scale=True,
surfaced=False, # PHON-88 banded fields are unsurfaced; aggregate is surfaced
),
PropertyDef(
id="lemma_frequency_grade_9_12",
label="Lemma Frequency (Grade 9-12)",
short_label="LemmaFreq912",
source="FineWeb-Edu 9-12 grade band (PHON-94 canonical parse)",
description="Per-million-word frequency of the lemma in FineWeb-Edu's 9-12 grade band.",
scale="per million",
interpretation="Higher = more common in 9-12 grade educational text",
display_format=".2f",
filterable=True,
use_log_scale=True,
surfaced=False,
),
PropertyDef(
id="lemma_frequency_grade_13_16",
label="Lemma Frequency (Grade 13-16)",
short_label="LemmaFreq1316",
source="FineWeb-Edu 13-16 grade band (PHON-94 canonical parse)",
description="Per-million-word frequency of the lemma in FineWeb-Edu's 13-16 grade band.",
scale="per million",
interpretation="Higher = more common in college-level educational text",
display_format=".2f",
filterable=True,
use_log_scale=True,
surfaced=False,
),
If the existing PROPERTY_MAP construction is automatic from the categories, the new properties will register automatically. If it's a hand-maintained dict, also add the entries there.
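As an illustration of the "automatic" case, the sketch below derives an id-keyed map from category containers and fails fast on duplicate ids. Everything here is hypothetical: the reduced PropertyDef, the PropertyCategory container, and build_property_map are stand-ins, and the real config.py may be organized differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PropertyDef:
    """Hypothetical stand-in, reduced to two fields for the example."""
    id: str
    label: str


@dataclass(frozen=True)
class PropertyCategory:
    """Hypothetical container grouping PropertyDefs under a category name."""
    name: str
    properties: tuple


CATEGORIES = (
    PropertyCategory("lexical_frequency", (
        PropertyDef("frequency", "Frequency"),
        PropertyDef("lemma", "Lemma"),
        PropertyDef("lemma_frequency", "Lemma Frequency (FineWeb-Edu)"),
    )),
)


def build_property_map(categories):
    """Flatten categories into {id: PropertyDef}, rejecting duplicate ids."""
    prop_map = {}
    for category in categories:
        for prop in category.properties:
            if prop.id in prop_map:
                raise ValueError(f"duplicate PropertyDef id: {prop.id}")
            prop_map[prop.id] = prop
    return prop_map


PROPERTY_MAP = build_property_map(CATEGORIES)
assert "lemma" in PROPERTY_MAP
assert PROPERTY_MAP["lemma_frequency"].label == "Lemma Frequency (FineWeb-Edu)"
```

With a derived map like this, adding a PropertyDef to a category is the only change needed; with a hand-maintained dict the same id has to be added in two places.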
- [ ] Step 5: Run tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_datasets.py::test_property_map_includes_lemma -v
Expected: PASS.
- [ ] Step 6: Run the schema regen test (verify words.parquet schema picks up the new cols)
cd /Users/jneumann/Repos/PhonoLex && uv run python -c "from phonolex_data.runtime.schema import words_schema; s = words_schema(); print({k: v for k, v in s.items() if 'lemma' in k})"
Expected output: shows lemma, lemma_frequency, lemma_log_frequency, and the three grade-banded columns with their Polars dtypes.
- [ ] Step 7: Commit
git -C /Users/jneumann/Repos/PhonoLex add packages/web/workers/scripts/config.py packages/data/tests/test_datasets.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: PropertyDef records for lemma + lemma-banded frequency

Adds lemma (str) plus lemma_frequency, lemma_log_frequency, and three
grade-banded lemma freq cols. Surface-keyed columns retained for
CMU-dict-aligned lookups. words.parquet schema picks these up via
PropertyDef-driven codegen.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 11: Production parse — FineWeb-Edu (RunPod H100×4)
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/launch_shards.sh
- Create: research/2026-05-06-phon-94-corpus-parse/poll_progress.sh
- Modify: research/2026-05-06-phon-94-corpus-parse/notebook.md (production-run log)
This task is operational — runs RunPod GPUs and tracks progress. No unit tests; the acceptance test is "shard parquets land on ExternalData1 with the expected schema and triple count."
- [ ] Step 1: Bands config for FineWeb-Edu
The bands config maps each FineWeb-Edu doc's edu_score → grade-band labels (matching PHON-88's classification). For the production run, the band resolver inspects each streamed doc's metadata.
Create research/2026-05-06-phon-94-corpus-parse/bands_fineweb.py:
"""Band resolver for FineWeb-Edu docs.
Mirrors PHON-88's edu_score → grade-band classification. Each doc emits
(fineweb_adult, fineweb_grade_X) — fineweb_adult is the materialized aggregate.
"""
# Per PHON-88; replace with actual bands when verifying against PHON-88 source.


def resolve_bands_for_doc(ex: dict) -> list[str]:
    """Given a FineWeb-Edu doc dict, return list of band labels."""
    bands = ["fineweb_adult"]  # always increments the materialized aggregate
    # Explicit None checks: `a or b` would wrongly discard a legitimate score of 0.0.
    score = ex.get("score")
    if score is None:
        score = ex.get("edu_score")
    if score is None:
        return bands
    # PHON-88 bins (verify against PHON-88's classifier; adjust if different):
    if score < 2.5:
        bands.append("fineweb_grade_K_8")
    elif score < 4.0:
        bands.append("fineweb_grade_9_12")
    else:
        bands.append("fineweb_grade_13_16")
    return bands
Note: the actual PHON-88 binning thresholds need to be verified against the PHON-88 implementation. Inspect the PHON-88 loader before authorizing the production run.
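The revisions log at the top of this plan retires edu_score banding in favor of five per-sentence F-K bins with boundaries 7.6, 10.7, 13.4, and 16.8. A minimal sketch of that mapping follows. The function name resolve_bands_for_fk is hypothetical, the full labels fineweb_b2 through fineweb_b5 are inferred from the log's shorthand, and keeping fineweb_adult as the aggregate label is an assumption; how the per-sentence F-K grade is computed is out of scope here.

```python
from bisect import bisect_right

# Boundaries from the revisions log (empirical p20/p40/p60/p80 of per-sentence F-K).
FK_BOUNDARIES = (7.6, 10.7, 13.4, 16.8)
FK_BANDS = ("fineweb_b1", "fineweb_b2", "fineweb_b3", "fineweb_b4", "fineweb_b5")


def resolve_bands_for_fk(fk_grade: float) -> list:
    """Map one sentence's F-K grade to [aggregate, quantile band].

    bisect_right puts a score equal to a boundary into the upper bin,
    matching "b1: F-K < 7.6" and "b5: F-K >= 16.8".
    """
    band = FK_BANDS[bisect_right(FK_BOUNDARIES, fk_grade)]
    return ["fineweb_adult", band]  # aggregate label is an assumption


assert resolve_bands_for_fk(7.59) == ["fineweb_adult", "fineweb_b1"]
assert resolve_bands_for_fk(7.6) == ["fineweb_adult", "fineweb_b2"]
assert resolve_bands_for_fk(12.0) == ["fineweb_adult", "fineweb_b3"]
assert resolve_bands_for_fk(16.8) == ["fineweb_adult", "fineweb_b5"]
```

Because the banding unit is the sentence rather than the document, a resolver of this shape would be called per sentence inside the parse loop, not once per streamed doc.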
- [ ] Step 2: Update build_selectional.py to call this resolver during streaming
Modify the stream_documents function in research/2026-05-06-phon-94-corpus-parse/build_selectional.py to yield (idx, text, bands) triples where bands comes from the resolver. Then plumb bands through process_corpus. Re-run the unit tests after this change.
The exact change:
def stream_documents(...):
    from bands_fineweb import resolve_bands_for_doc  # local import for production runs
    ds = load_dataset(...)
    for i, ex in enumerate(ds):
        if shard_total > 1 and (i % shard_total) != shard_idx:
            continue
        bands = resolve_bands_for_doc(ex)  # NEW
        text = ex.get("text") or ""
        if not text:
            continue
        if len(text) > MAX_DOC_CHAR_LEN:
            text = text[:MAX_DOC_CHAR_LEN]
        yield i, text, bands  # bands is the third element
And in process_corpus, plumb bands through the loop:
for doc, (doc_idx, doc_bands) in nlp.pipe(
    ((text, (idx, bands)) for idx, text, bands in stream),
    batch_size=args.batch_size, as_tuples=True
):
    for v, r, f in extract_triples(doc):
        for band in doc_bands:
            counters_per_band[band][(v, r, f)] += 1
            cvrstar_per_band[band][(v, r)] += 1
Run tests:
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/test_build_selectional.py packages/data/tests/test_e2e_selectional.py -v
Expected: PASS.
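Two mechanics in the snippets above are easy to check in isolation: the modulo shard filter and the multi-band counter update. The sketch below uses only the standard library; shard_indices is a hypothetical helper that reproduces the `i % shard_total` test, and the triples are made-up examples.

```python
from collections import Counter, defaultdict


def shard_indices(n_docs: int, shard_idx: int, shard_total: int) -> list:
    """Document indices a shard keeps under the round-robin modulo filter."""
    return [
        i for i in range(n_docs)
        if shard_total <= 1 or i % shard_total == shard_idx
    ]


# Four shards over ten docs: disjoint, and together they cover every doc once.
shards = [shard_indices(10, s, 4) for s in range(4)]
assert shards[0] == [0, 4, 8]
assert sorted(i for shard in shards for i in shard) == list(range(10))

# Each triple increments every band the doc belongs to, so the aggregate band
# always accumulates alongside the specific band.
counters_per_band = defaultdict(Counter)
cvrstar_per_band = defaultdict(Counter)
doc_bands = ["fineweb_adult", "fineweb_grade_9_12"]
for v, r, f in [("eat", "dobj", "cake"), ("eat", "dobj", "bread")]:
    for band in doc_bands:
        counters_per_band[band][(v, r, f)] += 1
        cvrstar_per_band[band][(v, r)] += 1

assert counters_per_band["fineweb_adult"][("eat", "dobj", "cake")] == 1
assert cvrstar_per_band["fineweb_grade_9_12"][("eat", "dobj")] == 2
```

Note that with this filter each shard still enumerates the whole stream and skips the documents it does not own, so streaming cost per pod does not shrink with the shard count; only parse cost does.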
- [ ] Step 3: Create launch_shards.sh
Create research/2026-05-06-phon-94-corpus-parse/launch_shards.sh:
#!/usr/bin/env bash
# PHON-94 RunPod H100×4 launcher — FineWeb-Edu corpus parse.
#
# Each pod runs build_selectional.py with --shard i/4 over FineWeb-Edu's
# streaming dataset, processes ~250K docs (its quartile), and writes a
# raw-count Parquet to /workspace/shard_${i}_of_4.parquet.
#
# After all 4 shards complete, run from local:
# runpodctl receive <pod_id>:/workspace/shard_*.parquet \
# /Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/
set -euo pipefail
GPU_TYPE="NVIDIA H100 80GB HBM3"
IMAGE="runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04"
N_SHARDS=4
for i in $(seq 0 $((N_SHARDS - 1))); do
NAME="phon94-fineweb-shard-${i}-of-${N_SHARDS}"
echo "[launch] creating pod ${NAME} ..."
runpodctl pod create \
--name "${NAME}" \
--gpu-id "${GPU_TYPE}" \
--image "${IMAGE}" \
--container-disk-in-gb 50 \
--volume-in-gb 100 \
--ports "22/tcp" \
--env "SHARD_IDX=${i}" \
--env "SHARD_TOTAL=${N_SHARDS}"
done
echo "[launch] all shards launched. ssh in and run:"
echo " cd PhonoLex/research/2026-05-06-phon-94-corpus-parse"
echo " uv run python build_selectional.py --shard \$SHARD_IDX/\$SHARD_TOTAL --save-parquet /workspace/shard_\${SHARD_IDX}_of_\${SHARD_TOTAL}.parquet --batch-size 256 --bands-config bands_fineweb.py"
echo ""
echo "after all 4 finish, runpodctl receive each pod's shard parquet to:"
echo " /Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/"
chmod +x /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse/launch_shards.sh
- [ ] Step 4: Create poll_progress.sh
Create research/2026-05-06-phon-94-corpus-parse/poll_progress.sh:
#!/usr/bin/env bash
# Poll all 4 PHON-94 pods for status.
set -euo pipefail
runpodctl pod list 2>&1 | grep -E "phon94-fineweb-shard" || echo "[poll] no PHON-94 pods running"
chmod +x /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse/poll_progress.sh
- [ ] Step 5: Local smoke run before launching RunPod
Run a 100-doc smoke locally with the production code path (band resolver active):
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && uv run python build_selectional.py --shard 0/1 --n-docs 100 --save-parquet /tmp/fineweb_smoke.parquet --bands-config bands_fineweb.py
Inspect:
uv run python -c "
import polars as pl
df = pl.read_parquet('/tmp/fineweb_smoke.parquet')
print('total rows:', df.height)
print('bands seen:', df['band'].unique().to_list())
print('top verbs:', df.group_by('verb').agg(pl.col('count_v_r_f').sum().alias('total')).sort('total', descending=True).head(10))
"
Expected: 2 to 4 bands seen (fineweb_adult plus one or more of the three grade bands), thousands of triples, sensible top verbs.
- [ ] Step 6: Confirm with user before launching RunPod
This is a checkpoint per feedback_estimate_revisions.md: the production parse costs ~$40 in GPU time. Pause here and ask the user: "Smoke run looks good. Authorize RunPod H100×4 launch (~$40, 3-4h wallclock)?"
- [ ] Step 7: Launch + run shards (if authorized)
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && bash launch_shards.sh
For each pod, ssh in:
runpodctl pod list # get pod IPs + ports
ssh root@<pod-ip> -p <port>
git clone https://github.com/neumanns-workshop/PhonoLex.git
cd PhonoLex
uv pip install -e packages/data
uv run python -m spacy download en_core_web_trf
cd research/2026-05-06-phon-94-corpus-parse
uv run python build_selectional.py \
--shard $SHARD_IDX/$SHARD_TOTAL \
--save-parquet /workspace/shard_${SHARD_IDX}_of_${SHARD_TOTAL}.parquet \
--batch-size 256 \
--bands-config bands_fineweb.py
- [ ] Step 8: Pull shard parquets to local ExternalData1
After each shard finishes:
runpodctl receive <pod_id>:/workspace/shard_X_of_4.parquet /Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/
- [ ] Step 9: Stop RunPod pods
for pod in $(runpodctl pod list | grep phon94-fineweb | awk '{print $1}'); do runpodctl pod stop $pod && runpodctl pod remove $pod; done
- [ ] Step 10: Update notebook + commit operational artifacts
Update research/2026-05-06-phon-94-corpus-parse/notebook.md with the production-run log: actual wallclock, total token count, total triple count per band, any anomalies. Then:
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-corpus-parse/launch_shards.sh research/2026-05-06-phon-94-corpus-parse/poll_progress.sh research/2026-05-06-phon-94-corpus-parse/bands_fineweb.py research/2026-05-06-phon-94-corpus-parse/build_selectional.py research/2026-05-06-phon-94-corpus-parse/notebook.md
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: FineWeb-Edu production parse + band resolver

Bands config + RunPod launcher for 4× H100 SXM. Production-run log in
notebook.md. Per-shard parquets stored on ExternalData1 cold storage.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 12: CHILDES corpus parse
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/build_selectional_childes.py
- Create: research/2026-05-06-phon-94-corpus-parse/bands_childes.py
CHILDES is significantly smaller than FineWeb-Edu (~30M tokens vs 800M); single-pod or local execution suffices.
- [ ] Step 1: Locate CHILDES source data
ls /Users/jneumann/Repos/PhonoLex/data/_external/childes_mor/ 2>&1 | head -10
If not present locally, find PHON-87's loader and inspect how it accessed CHILDES:
cat /Users/jneumann/Repos/PhonoLex/packages/data/src/phonolex_data/loaders/phonolex_childes_ageband.py | head -50
Locate the CHILDES corpus path. Note: CHILDES is also used by PHON-86/87 with MOR-tier extraction; PHON-94 reuses the same source data but parses with spaCy instead of MOR.
- [ ] Step 2: Create the CHILDES band resolver
Create research/2026-05-06-phon-94-corpus-parse/bands_childes.py:
"""Band resolver for CHILDES utterances.
Each utterance has a participant age in months. Maps to age-banded label
matching PHON-87's banding scheme: 0-2y, 2-5y, 5-8y, 8-12y. Also emits
childes_general aggregate.
"""
def resolve_bands_for_age_months(age_months: int) -> list[str]:
    bands = ["childes_general"]
    if age_months < 24:
        bands.append("childes_age_0_2")
    elif age_months < 60:
        bands.append("childes_age_2_5")
    elif age_months < 96:
        bands.append("childes_age_5_8")
    elif age_months < 144:
        bands.append("childes_age_8_12")
    # Older participants: childes_general only
    return bands
- [ ] Step 3: Create CHILDES corpus driver
Create research/2026-05-06-phon-94-corpus-parse/build_selectional_childes.py:
#!/usr/bin/env python3
"""CHILDES corpus parse for selectional preferences.

Iterates CHILDES MOR-tier utterances (loaded via the existing PHON-87
loader path), parses each utterance with the canonical spaCy pipeline,
extracts triples, accumulates per-band counters, writes shard parquet.
"""
from __future__ import annotations

import argparse
from collections import Counter, defaultdict
from itertools import islice
from pathlib import Path

import polars as pl

from phonolex_data.pipeline.canonical_spacy import load_canonical_pipeline
from phonolex_data.pipeline.extract_triples import extract_triples


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser()
    p.add_argument("--save-parquet", required=True)
    p.add_argument("--batch-size", type=int, default=128)
    p.add_argument("--n-utts", type=int, default=None,
                   help="If set, stop after N utterances (smoke run)")
    return p.parse_args()


def iter_childes_utterances():
    """Yield (utterance_text, participant_age_months) tuples from PHON-87 substrate.

    The exact loader call depends on existing CHILDES integration; see
    packages/data/src/phonolex_data/loaders/phonolex_childes_ageband.py for
    the patterns used by PHON-87.
    """
    # Placeholder: wire to actual PHON-87 loader at implementation time.
    # The PHON-87 loader yields per-utterance MOR + speaker metadata; here
    # we want the surface utterance text + child age in months.
    raise NotImplementedError(
        "wire to PHON-87's CHILDES loader; see "
        "packages/data/src/phonolex_data/loaders/phonolex_childes_ageband.py"
    )


def main() -> int:
    args = parse_args()
    nlp = load_canonical_pipeline()
    counters_per_band = defaultdict(Counter)
    cvrstar_per_band = defaultdict(Counter)
    from bands_childes import resolve_bands_for_age_months

    # islice(..., None) yields everything; --n-utts caps the smoke run.
    utterances = islice(iter_childes_utterances(), args.n_utts)
    # Batch through nlp.pipe so --batch-size actually takes effect (the original
    # called nlp(text) per utterance); as_tuples carries each age with its doc.
    for doc, age_months in nlp.pipe(utterances, batch_size=args.batch_size, as_tuples=True):
        bands = resolve_bands_for_age_months(age_months)
        for v, r, f in extract_triples(doc):
            for band in bands:
                counters_per_band[band][(v, r, f)] += 1
                cvrstar_per_band[band][(v, r)] += 1

    rows = []
    for band, c in counters_per_band.items():
        cvrstar_band = cvrstar_per_band[band]
        for (v, r, f), count in c.items():
            rows.append({
                "verb": v, "role": r, "filler": f, "band": band,
                "count_v_r_f": count, "count_v_r_star": cvrstar_band[(v, r)],
            })
    df = pl.DataFrame(rows, schema={
        "verb": pl.Utf8, "role": pl.Utf8, "filler": pl.Utf8, "band": pl.Utf8,
        "count_v_r_f": pl.UInt32, "count_v_r_star": pl.UInt32,
    })
    Path(args.save_parquet).parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(args.save_parquet)
    print(f"[write] {args.save_parquet} ({df.height:,} rows)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
- [ ] Step 4: Wire iter_childes_utterances() to the actual PHON-87 loader
Read packages/data/src/phonolex_data/loaders/phonolex_childes_ageband.py to find the per-utterance + age extraction. Adapt to yield (text, age_months). Avoid duplicating PHON-87's MOR-tier filtering — reuse it directly via the loader's API.
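CHAT transcripts record participant age as years;months.days (for example 2;06.15). If the PHON-87 loader exposes that raw string rather than an integer month count (an assumption; check the loader first), a small converter along these lines would produce the age_months value the band resolver expects. The name chat_age_to_months is hypothetical, and days are deliberately ignored so that ages floor to whole months.

```python
import re
from typing import Optional

# years ; optional months . optional days  (e.g. "2;06.15", "1;11.", "3;")
_CHAT_AGE = re.compile(r"^(\d+);(\d{1,2})?(?:\.(\d{1,2})?)?$")


def chat_age_to_months(age: str) -> Optional[int]:
    """Convert a CHAT age string to whole months; None if it does not parse."""
    match = _CHAT_AGE.match(age.strip())
    if match is None:
        return None
    years = int(match.group(1))
    months = int(match.group(2) or 0)
    return years * 12 + months


assert chat_age_to_months("2;06.15") == 30
assert chat_age_to_months("1;11.") == 23
assert chat_age_to_months("3;") == 36
assert chat_age_to_months("unknown") is None
```

Utterances whose age does not parse are probably best skipped or counted toward childes_general only, since resolve_bands_for_age_months compares its argument numerically and would raise on None.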
- [ ] Step 5: Smoke run
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && uv run python build_selectional_childes.py --save-parquet /tmp/childes_smoke.parquet --n-utts 1000
Verify the output has rows in the expected bands.
- [ ] Step 6: Full CHILDES run
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && uv run python build_selectional_childes.py --save-parquet /Volumes/ExternalData1/phonolex/raw_corpus_parses/childes/shard_0_of_1.parquet
Expected wallclock: 30-60 min on a single H100, longer on local CPU. If the local CPU run extrapolates to many hours, run on a single RunPod H100 instead.
- [ ] Step 7: Commit
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-corpus-parse/build_selectional_childes.py research/2026-05-06-phon-94-corpus-parse/bands_childes.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: CHILDES corpus parse with age-band resolver

Reuses PHON-87 MOR-tier loader for utterance + participant age, parses
each utterance with canonical spaCy, accumulates per-ageband counters.
Output stored on ExternalData1 cold storage.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 13: PhonBank smoke gate + conditional parse
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/phonbank_smoke_gate.py
- Modify: research/2026-05-06-phon-94-corpus-parse/notebook.md
- [ ] Step 1: Create the smoke gate
Create research/2026-05-06-phon-94-corpus-parse/phonbank_smoke_gate.py:
#!/usr/bin/env python3
"""PhonBank smoke gate: decide whether per-band triple density supports inclusion.

PhonBank utterances are mostly 1-3 word child speech and diary-study segments;
selectional triple density per token is much sparser than CHILDES. The gate:
  1. Parse 1,000 PhonBank utterances with the canonical pipeline.
  2. Count triples per ageband.
  3. Per ageband, measure the share of top-200 verbs (by frequency in this
     sample) that have at least one triple seen ≥ min_count=5 times.
  4. If that share is ≥ 50% in every band → commit to full PhonBank parse.
     Otherwise → drop PhonBank from band inventory; document in notebook.
"""
from __future__ import annotations

from collections import Counter
from pathlib import Path

from phonolex_data.pipeline.canonical_spacy import load_canonical_pipeline
from phonolex_data.pipeline.extract_triples import extract_triples


def iter_phonbank_utterances(limit: int = 1000):
    """Wire to PHON-86 phonbank loader. Yield (utterance_text, age_months)."""
    raise NotImplementedError(
        "wire to PHON-86's phonbank loader; see "
        "packages/data/src/phonolex_data/loaders/phonolex_phonbank_ageband.py"
    )


def main() -> int:
    nlp = load_canonical_pipeline()
    by_band: dict[str, Counter] = {}
    verb_freq: Counter = Counter()
    for text, age_months in iter_phonbank_utterances(limit=1000):
        # Determine band as in bands_childes; PhonBank uses similar age-band slicing
        if age_months < 24:
            band = "phonbank_age_0_2"
        elif age_months < 60:
            band = "phonbank_age_2_5"
        else:
            continue  # skip older participants for the gate
        doc = nlp(text)
        c = by_band.setdefault(band, Counter())
        for v, r, f in extract_triples(doc):
            c[(v, r, f)] += 1
            verb_freq[v] += 1

    # Smoke gate: a verb counts as populated in a band when at least one of its
    # (verb, role, filler) triples was seen ≥ 5 times in that band.
    top_200_verbs = {v for v, _ in verb_freq.most_common(200)}
    print(f"[gate] top-200 verbs: {len(top_200_verbs)} unique")
    band_passes = {}
    for band, c in by_band.items():
        # Per verb: number of triples that clear min_count in this band
        per_verb_fillers = Counter()
        for (v, r, f), count in c.items():
            if count >= 5:
                per_verb_fillers[v] += 1
        # How many top-200 verbs have at least one (any role) ≥ 5 triple?
        passing_verbs = sum(1 for v in top_200_verbs if per_verb_fillers[v] >= 1)
        pct = passing_verbs / len(top_200_verbs) * 100 if top_200_verbs else 0
        band_passes[band] = pct
        print(f"[gate] {band}: {passing_verbs}/{len(top_200_verbs)} top-200 verbs have ≥1 triple ≥ 5 ({pct:.1f}%)")

    # Decision: gate passes if ≥ 50% of top-200 verbs are populated in each band.
    # An empty band_passes (no usable utterances) fails instead of passing vacuously.
    overall_pass = bool(band_passes) and all(p >= 50 for p in band_passes.values())
    print(f"\n[decision] PhonBank smoke gate {'PASSED' if overall_pass else 'FAILED'}")
    print(f" → {'commit to full PhonBank parse' if overall_pass else 'drop PhonBank from band inventory'}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
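The gate's decision rule can be exercised without spaCy or PhonBank data. The sketch below is a hypothetical pure-function restatement of the rule in main(), with one assumption added: an empty band set is treated as a failure. The name gate_decision and the toy counts are illustrative and not part of the script.

```python
from collections import Counter


def gate_decision(by_band, verb_freq, top_n=200, min_count=5, threshold=50.0):
    """Return (overall_pass, pct_per_band) under the smoke-gate rule.

    by_band:   {band: Counter({(verb, role, filler): count})}
    verb_freq: Counter({verb: count}) over the whole sample
    """
    top_verbs = {v for v, _ in verb_freq.most_common(top_n)}
    pct_per_band = {}
    for band, triples in by_band.items():
        # Verbs with at least one triple seen >= min_count times in this band.
        populated = {v for (v, _r, _f), n in triples.items() if n >= min_count}
        hits = len(top_verbs & populated)
        pct_per_band[band] = 100.0 * hits / len(top_verbs) if top_verbs else 0.0
    # Assumption: no bands at all is a failure, not a vacuous pass.
    overall = bool(pct_per_band) and all(p >= threshold for p in pct_per_band.values())
    return overall, pct_per_band


# Toy data (invented): two verbs, one band where only "want" clears min_count.
toy_bands = {
    "phonbank_age_2_5": Counter({
        ("want", "dobj", "cookie"): 6,
        ("see", "dobj", "dog"): 2,
    }),
}
toy_verbs = Counter({"want": 6, "see": 2})
passed, pcts = gate_decision(toy_bands, toy_verbs)
print(passed, pcts)
```

Keeping the rule in a function like this would let a unit test pin the threshold behaviour before the PHON-86 loader is wired in.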
- [ ] Step 2: Wire to PHON-86 loader
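Read packages/data/src/phonolex_data/loaders/phonolex_phonbank_ageband.py to find how it yields utterances and participant ages, then replace the NotImplementedError placeholder in iter_phonbank_utterances with a generator that yields (utterance_text, age_months).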
- [ ] Step 3: Run the gate
cd /Users/jneumann/Repos/PhonoLex/research/2026-05-06-phon-94-corpus-parse && uv run python phonbank_smoke_gate.py 2>&1 | tee /tmp/phonbank_gate_log.txt
- [ ] Step 4: Document decision in notebook + conditional full parse
If GATE PASSED: write build_selectional_phonbank.py (mirror of CHILDES driver) and run the full PhonBank corpus to a shard parquet on ExternalData1.
If GATE FAILED: document the failure rate per band in research/2026-05-06-phon-94-corpus-parse/notebook.md. Drop phonbank_* bands from the final selectional.parquet inventory. Note this in the spec's "out-of-scope" section.
- [ ] Step 5: Commit
git -C /Users/jneumann/Repos/PhonoLex add research/2026-05-06-phon-94-corpus-parse/phonbank_smoke_gate.py research/2026-05-06-phon-94-corpus-parse/notebook.md
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: PhonBank smoke gate + (conditional) full parse
[Note in notebook.md whether gate passed or failed.]
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 14: Final merge + selectional.parquet acceptance tests¶
Files:
- (no new files; runs merge_shards over all corpus parses; updates data/runtime/selectional.parquet)
- Test: packages/data/tests/runtime/test_selectional_parquet.py (add production-data tests)
- [ ] Step 1: Run final merge across all corpora
cd /Users/jneumann/Repos/PhonoLex && uv run python research/2026-05-06-phon-94-corpus-parse/merge_shards.py \
/Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/shard_*.parquet \
/Volumes/ExternalData1/phonolex/raw_corpus_parses/childes/shard_*.parquet \
/Volumes/ExternalData1/phonolex/raw_corpus_parses/phonbank/shard_*.parquet \
--output data/runtime/selectional.parquet \
--min-count 5
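(Adjust glob to skip phonbank if gate failed. Note that shard_*.parquet also matches the shard_*.freq.parquet siblings from Task 11 when they sit in the same directory; confirm that merge_shards.py ignores them, or tighten the glob.)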
Expected wallclock: 5-10 min for the merge step (Polars handles this efficiently). Output size: ~1-2 GB.
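If the PhonBank gate failed, the phonbank glob has to be left out of the merge command. One way to avoid hand-editing is to assemble the shard list in Python. The sketch below assumes a hypothetical helper collect_shards and a phonbank_passed flag set from the gate outcome; it also filters out *.freq.parquet siblings, which is an assumption about what merge_shards.py expects as input.

```python
import tempfile
from pathlib import Path


def collect_shards(root, corpora, pattern="shard_*.parquet"):
    """Return sorted selectional shard paths for each corpus directory under root.

    shard_*.parquet also matches shard_*.freq.parquet, so those are skipped.
    """
    shards = []
    for corpus in corpora:
        for p in sorted(Path(root, corpus).glob(pattern)):
            if not p.name.endswith(".freq.parquet"):
                shards.append(str(p))
    return shards


# Demo on a throwaway directory that mimics the cold-storage layout.
with tempfile.TemporaryDirectory() as root:
    for corpus in ("fineweb_edu", "childes", "phonbank"):
        d = Path(root, corpus)
        d.mkdir()
        (d / "shard_0000.parquet").touch()
        (d / "shard_0000.freq.parquet").touch()

    phonbank_passed = False  # hypothetical flag: set from the smoke-gate outcome
    corpora = ["fineweb_edu", "childes"] + (["phonbank"] if phonbank_passed else [])
    shards = collect_shards(root, corpora)
    names = [Path(s).relative_to(root).as_posix() for s in shards]

print(names)
```

The resulting list could be passed to merge_shards.py as positional arguments in place of the shell globs.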
- [ ] Step 2: Add acceptance tests against the production data
Add to packages/data/tests/runtime/test_selectional_parquet.py:
import os

import polars as pl  # may already be imported at the top of this file; keep a single import
import pytest

PRODUCTION_PATH = "/Users/jneumann/Repos/PhonoLex/data/runtime/selectional.parquet"
PRODUCTION_AVAILABLE = os.path.exists(PRODUCTION_PATH) and os.path.getsize(PRODUCTION_PATH) > 1_000_000


@pytest.mark.skipif(not PRODUCTION_AVAILABLE,
                    reason="production selectional.parquet not yet generated")
class TestProductionSelectional:
    def test_known_verb_dobj_admits_cake(self):
        df = pl.read_parquet(PRODUCTION_PATH)
        cake = df.filter(
            (pl.col("verb") == "cut") & (pl.col("role") == "dobj")
            & (pl.col("filler") == "cake") & (pl.col("band") == "fineweb_adult")
        )
        assert cake.height >= 1, "(cut, dobj, cake) missing from fineweb_adult"
        assert cake["ppmi"][0] > 0, f"expected ppmi > 0, got {cake['ppmi'][0]}"

    def test_known_verb_dobj_admits_paper_meat(self):
        df = pl.read_parquet(PRODUCTION_PATH)
        for filler in ["paper", "meat"]:
            row = df.filter(
                (pl.col("verb") == "cut") & (pl.col("role") == "dobj")
                & (pl.col("filler") == filler) & (pl.col("band") == "fineweb_adult")
            )
            assert row.height >= 1, f"(cut, dobj, {filler}) missing"
            assert row["ppmi"][0] > 0

    def test_known_verb_dobj_rejects_thunder(self):
        df = pl.read_parquet(PRODUCTION_PATH)
        thunder = df.filter(
            (pl.col("verb") == "cut") & (pl.col("role") == "dobj")
            & (pl.col("filler") == "thunder") & (pl.col("band") == "fineweb_adult")
        )
        # thunder should NOT have ppmi > 0 for cut/dobj: it is either absent or has ppmi == 0
        if thunder.height > 0:
            assert thunder["ppmi"][0] == 0.0, f"(cut, dobj, thunder) has ppmi {thunder['ppmi'][0]}"

    def test_band_inventory_present(self):
        df = pl.read_parquet(PRODUCTION_PATH)
        bands = set(df["band"].unique().to_list())
        # Required bands
        assert "fineweb_adult" in bands
        assert "childes_general" in bands
        # PhonBank is conditional on the smoke gate, so it is not asserted here
        # FineWeb-Edu per-sentence F-K bins (these replaced the retired
        # fineweb_grade_* bands; see the 2026-05-06 revisions log entry)
        for fk_bin in ["fineweb_b1", "fineweb_b2", "fineweb_b3", "fineweb_b4", "fineweb_b5"]:
            assert fk_bin in bands, f"missing F-K band {fk_bin}"
        # Childes age bands
        for ab in ["childes_age_0_2", "childes_age_2_5", "childes_age_5_8", "childes_age_8_12"]:
            assert ab in bands, f"missing childes age band {ab}"

    def test_top_100_verbs_have_dobj_coverage(self):
        df = pl.read_parquet(PRODUCTION_PATH)
        adult = df.filter(pl.col("band") == "fineweb_adult")
        # Top 100 most-frequent verbs (by total dobj count_v_r_star)
        top100 = (
            adult.filter(pl.col("role") == "dobj")
            .group_by("verb")
            .agg(pl.col("count_v_r_star").max().alias("max_cvrstar"))
            .sort("max_cvrstar", descending=True)
            .head(100)
        )
        # Each top-100 verb should have count_v_r_star >= 50 (the consumer-side gate threshold)
        below_gate = top100.filter(pl.col("max_cvrstar") < 50)
        assert below_gate.height == 0, (
            f"{below_gate.height} top-100 verbs below count_v_r_star=50 gate"
        )
- [ ] Step 3: Run tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/runtime/test_selectional_parquet.py -v
Expected: all PASS. If test_known_verb_dobj_rejects_thunder fails (thunder somehow has ppmi > 0 for cut/dobj), investigate the parse — likely a parser error or a noisy training-data artifact.
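To see why a chance-level filler ends up with ppmi == 0, it helps to work a toy case. The sketch below assumes the standard definition, PPMI = max(0, log2(P(v,r,f) / (P(v,r) * P(f)))), with probabilities estimated from raw counts. merge_shards.py may use different marginals or smoothing, and the counts here are invented for illustration.

```python
import math
from collections import Counter


def ppmi_table(triples):
    """Compute PPMI for each (verb, role, filler) from a Counter of triple counts."""
    total = sum(triples.values())
    c_vr = Counter()  # count(v, r, *)
    c_f = Counter()   # count(*, *, f)
    for (v, r, f), n in triples.items():
        c_vr[(v, r)] += n
        c_f[f] += n
    out = {}
    for (v, r, f), n in triples.items():
        # PMI = log2( P(v,r,f) / (P(v,r) * P(f)) ), written with raw counts.
        pmi = math.log2(n * total / (c_vr[(v, r)] * c_f[f]))
        out[(v, r, f)] = max(0.0, pmi)  # clip negative association to zero
    return out


# Invented counts: "thunder" co-occurs with "cut" less often than chance predicts.
toy = Counter({
    ("cut", "dobj", "cake"): 8,
    ("cut", "dobj", "thunder"): 1,
    ("hear", "dobj", "thunder"): 9,
    ("hear", "dobj", "cake"): 2,
})
scores = ppmi_table(toy)
print(scores[("cut", "dobj", "cake")] > 0, scores[("cut", "dobj", "thunder")])
```

Under this definition a positive score for (cut, dobj, thunder) in production would mean the triple occurs more often than its marginals predict, which is why the test treats it as a signal to inspect the parses.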
- [ ] Step 4: Verify selectional.parquet is LFS-tracked
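cd /Users/jneumann/Repos/PhonoLex && git lfs track 'data/runtime/*.parquet' && grep parquet .gitattributes
git lfs track adds the pattern to .gitattributes if it is missing and leaves the file unchanged if the pattern is already tracked. The grep should print a data/runtime/*.parquet line carrying filter=lfs diff=lfs merge=lfs -text.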
- [ ] Step 5: Commit production parquet
git -C /Users/jneumann/Repos/PhonoLex add data/runtime/selectional.parquet packages/data/tests/runtime/test_selectional_parquet.py .gitattributes
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: populated selectional.parquet with banded PPMI from FineWeb+CHILDES corpus parses
Final merge of FineWeb-Edu (~25M rows pre-filter, ~Xm post-min_count) +
CHILDES (~Y rows) [+ PhonBank conditional] across {Z} bands. LFS-tracked.
Acceptance tests pass: (cut, dobj, cake) ppmi > 0; (cut, dobj, thunder)
absent/zero; top-100 verbs have count_v_r_star ≥ 50 in fineweb_adult.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
(Update commit message with actual numbers from the merge output.)
Task 15: Merge freq shards + regenerate words.parquet¶
Files:
- Create: research/2026-05-06-phon-94-corpus-parse/merge_freq_shards.py
- Create: research/2026-05-06-phon-94-corpus-parse/emit_frequency_tsv.py
- Modify: data/norms/phonolex_frequency.tsv
- Modify: data/runtime/words.parquet (regenerated by existing pipeline)
Task 11's parse already emitted *.freq.parquet siblings alongside the selectional shards (per Task 6's process_corpus dual-output behavior). This task merges those freq siblings into the SUBTLEX-compatible TSV and triggers the existing words.parquet regen.
- [ ] Step 1: Implement merge_freq_shards.py
Create research/2026-05-06-phon-94-corpus-parse/merge_freq_shards.py:
#!/usr/bin/env python3
"""Merge per-shard *.freq.parquet siblings into a single aggregated freq parquet.

Sums counts across shards per (kind, band, key, pos). Reads the .meta.json
sidecars for per-band totals; combines per-band token counts and doc counts.

Usage:
    python merge_freq_shards.py \\
        /Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/shard_*.freq.parquet \\
        --output /tmp/fineweb_freq_merged.parquet
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path

import polars as pl


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser()
    p.add_argument("shards", nargs="+")
    p.add_argument("--output", required=True)
    return p.parse_args()


def merge_freq(shard_paths: list[str], output_path: str) -> dict:
    """Aggregate per-shard freq parquets; return per-band metadata dict."""
    print(f"[merge_freq] reading {len(shard_paths)} shard(s) ...")
    df = (
        pl.scan_parquet(shard_paths)
        .group_by(["kind", "band", "key", "pos"])
        .agg([
            pl.col("count").sum().alias("count"),
            pl.col("cd_count").sum().alias("cd_count"),
        ])
        .collect()
    )

    # Aggregate metadata sidecars
    docs_total: dict[str, int] = {}
    tokens_total: dict[str, int] = {}
    for sp in shard_paths:
        # with_suffix swaps only the final ".parquet" for ".meta.json"
        meta_p = Path(sp).with_suffix(".meta.json")
        if not meta_p.exists():
            print(f"[warn] missing {meta_p}")
            continue
        meta = json.loads(meta_p.read_text())
        for band, n in meta.get("docs_per_band", {}).items():
            docs_total[band] = docs_total.get(band, 0) + n
        for band, n in meta.get("tokens_per_band", {}).items():
            tokens_total[band] = tokens_total.get(band, 0) + n

    out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(out_path)
    meta_out = out_path.with_suffix(".meta.json")
    meta_out.write_text(json.dumps({
        "docs_per_band": docs_total, "tokens_per_band": tokens_total,
    }, indent=2))
    print(f"[write] {out_path} ({df.height:,} rows) + {meta_out}")
    return {"docs": docs_total, "tokens": tokens_total}


def main() -> int:
    args = parse_args()
    merge_freq(args.shards, args.output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
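Because merge_freq derives each sidecar path with Path.with_suffix, only the final .parquet suffix is swapped. The short check below shows the file names the merge will look for and write; the shard name shard_0001 is a made-up example.

```python
from pathlib import Path

# Hypothetical shard name; only the trailing ".parquet" is replaced.
shard = Path("/Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/shard_0001.freq.parquet")
sidecar_name = shard.with_suffix(".meta.json").name
print(sidecar_name)  # shard_0001.freq.meta.json

merged = Path("/tmp/fineweb_freq_merged.parquet")
merged_sidecar_name = merged.with_suffix(".meta.json").name
print(merged_sidecar_name)  # fineweb_freq_merged.meta.json
```

If Task 6 writes sidecars under a different name (for example shard_0001.meta.json), the merge would log the missing-sidecar warning for every shard and the per-band totals would stay empty, so it is worth confirming the naming on one shard before the full run.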
- [ ] Step 2: Run merge over FineWeb shards
cd /Users/jneumann/Repos/PhonoLex && uv run python research/2026-05-06-phon-94-corpus-parse/merge_freq_shards.py \
/Volumes/ExternalData1/phonolex/raw_corpus_parses/fineweb_edu/shard_*.freq.parquet \
--output /tmp/fineweb_freq_merged.parquet
- [ ] Step 3: Implement emit_frequency_tsv.py
Create research/2026-05-06-phon-94-corpus-parse/emit_frequency_tsv.py:
#!/usr/bin/env python3
"""Emit phonolex_frequency.tsv from the merged FineWeb freq parquet.

Backwards-compatible with PHON-72's TSV schema:
    Word | FREQcount | CDcount | WF_per_million | Lg10WF | CD_pct | Lg10CD |
    Dom_PoS | Freq_Dom_PoS | Percentage_Dom_PoS | All_PoS | All_freqs

Plus PHON-94 additions in a parallel TSV (phonolex_lemma_frequency.tsv) for
lemma-keyed columns.
"""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path

import polars as pl


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser()
    p.add_argument("--merged-freq", required=True, help="Path to fineweb_freq_merged.parquet")
    p.add_argument("--output-surface-tsv", required=True,
                   help="Path to phonolex_frequency.tsv (PHON-72 schema)")
    p.add_argument("--output-lemma-tsv", required=True,
                   help="Path to phonolex_lemma_frequency.tsv (new)")
    p.add_argument("--target-band", default="fineweb_adult",
                   help="The band to use for the un-banded frequency columns")
    p.add_argument("--min-freq", type=int, default=3)
    return p.parse_args()


def emit_tsv_for_kind(
    df: pl.DataFrame, kind: str, band: str,
    docs_in_band: int, tokens_in_band: int,
    output_path: Path, min_freq: int,
) -> None:
    """Emit a SUBTLEX-shape TSV for either kind='surface' or kind='lemma'."""
    sub = df.filter((pl.col("kind") == kind) & (pl.col("band") == band))
    print(f"[emit] {kind} band={band}: {sub.height:,} (key, pos) rows")

    # Aggregate per-key: total freq + per-POS distribution
    per_key = (
        sub.group_by("key")
        .agg([
            pl.col("count").sum().alias("freq"),
            pl.col("cd_count").max().alias("cd"),  # cd is per-key; max is fine
            pl.struct(["pos", "count"]).alias("pos_counts"),
        ])
        .filter(pl.col("freq") >= min_freq)
    )
    print(f"[emit] {per_key.height:,} keys after min_freq={min_freq} filter")

    rows = []
    for r in per_key.iter_rows(named=True):
        key = r["key"]
        freq = r["freq"]
        cd = r["cd"]
        # Guard both denominators so an empty band cannot raise ZeroDivisionError
        wf_per_million = (freq / tokens_in_band) * 1_000_000 if tokens_in_band > 0 else 0
        lg10wf = math.log10(freq + 1)
        cd_pct = (cd / docs_in_band) * 100 if docs_in_band > 0 else 0
        lg10cd = math.log10(cd + 1)
        # POS distribution: pos_counts is a list of {pos, count} structs;
        # collapse it to a plain dict of per-POS totals
        pos_d = {}
        for pc in r["pos_counts"]:
            pos_d[pc["pos"]] = pos_d.get(pc["pos"], 0) + pc["count"]
        pos_sorted = sorted(pos_d.items(), key=lambda x: -x[1])
        dom_pos, freq_dom_pos = pos_sorted[0]
        pct_dom_pos = (freq_dom_pos / freq) * 100 if freq > 0 else 0
        all_pos = ".".join(p for p, _ in pos_sorted)
        all_freqs = ".".join(str(c) for _, c in pos_sorted)
        rows.append((
            key, freq, cd, wf_per_million, lg10wf, cd_pct, lg10cd,
            dom_pos, freq_dom_pos, pct_dom_pos, all_pos, all_freqs,
        ))

    # Most frequent keys first
    rows.sort(key=lambda x: -x[1])
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("Word\tFREQcount\tCDcount\tWF_per_million\tLg10WF\tCD_pct\tLg10CD\t"
                "Dom_PoS\tFreq_Dom_PoS\tPercentage_Dom_PoS\tAll_PoS\tAll_freqs\n")
        for r in rows:
            f.write(
                f"{r[0]}\t{r[1]}\t{r[2]}\t{r[3]:.4f}\t{r[4]:.4f}\t"
                f"{r[5]:.4f}\t{r[6]:.4f}\t{r[7]}\t{r[8]}\t{r[9]:.2f}\t"
                f"{r[10]}\t{r[11]}\n"
            )
    print(f"[write] {output_path}")


def main() -> int:
    args = parse_args()
    df = pl.read_parquet(args.merged_freq)
    meta = json.loads(Path(args.merged_freq).with_suffix(".meta.json").read_text())
    docs_in_band = meta["docs_per_band"][args.target_band]
    tokens_in_band = meta["tokens_per_band"][args.target_band]
    emit_tsv_for_kind(df, "surface", args.target_band,
                      docs_in_band, tokens_in_band,
                      Path(args.output_surface_tsv), args.min_freq)
    emit_tsv_for_kind(df, "lemma", args.target_band,
                      docs_in_band, tokens_in_band,
                      Path(args.output_lemma_tsv), args.min_freq)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
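As a quick sanity check on the column arithmetic, the sketch below recomputes the SUBTLEX-style fields for one invented key. The function name subtlex_fields and the toy counts are illustrative assumptions; the formulas follow emit_tsv_for_kind.

```python
import math


def subtlex_fields(pos_counts, cd, docs_in_band, tokens_in_band):
    """Derive the per-key TSV fields from per-POS counts and a document count."""
    freq = sum(pos_counts.values())
    pos_sorted = sorted(pos_counts.items(), key=lambda kv: -kv[1])
    dom_pos, freq_dom_pos = pos_sorted[0]
    return {
        "FREQcount": freq,
        "WF_per_million": freq / tokens_in_band * 1_000_000,
        "Lg10WF": math.log10(freq + 1),
        "CD_pct": cd / docs_in_band * 100,
        "Lg10CD": math.log10(cd + 1),
        "Dom_PoS": dom_pos,
        "Percentage_Dom_PoS": freq_dom_pos / freq * 100,
        "All_PoS": ".".join(p for p, _ in pos_sorted),
        "All_freqs": ".".join(str(c) for _, c in pos_sorted),
    }


# Invented counts: 90 noun uses and 9 verb uses, seen in 50 of 1,000 documents.
row = subtlex_fields({"NOUN": 90, "VERB": 9}, cd=50,
                     docs_in_band=1_000, tokens_in_band=1_000_000)
for name, value in row.items():
    print(name, value)
```

Note that All_freqs joins per-POS counts with ".", so the pair (90, 9) is written as 90.9; consumers need to split on "." and not parse the field as a decimal.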
- [ ] Step 4: Run emit + replace data/norms TSVs
The emit overwrites data/norms/phonolex_frequency.tsv with canonical-pass values, so back up the current file first:
cd /Users/jneumann/Repos/PhonoLex && cp data/norms/phonolex_frequency.tsv /tmp/phonolex_frequency_pre_phon94.tsv
Then run the emit:
cd /Users/jneumann/Repos/PhonoLex && uv run python research/2026-05-06-phon-94-corpus-parse/emit_frequency_tsv.py \
--merged-freq /tmp/fineweb_freq_merged.parquet \
--output-surface-tsv data/norms/phonolex_frequency.tsv \
--output-lemma-tsv data/norms/phonolex_lemma_frequency.tsv
Afterwards, compare the regenerated file against the backup:
diff <(head -50 /tmp/phonolex_frequency_pre_phon94.tsv) <(head -50 data/norms/phonolex_frequency.tsv) | head -30
Expected: small numeric drift in WF/Lg10WF values (parser-informed POS slightly redistributes counts between POS columns) but top words match qualitatively.
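A head-50 diff only covers the top of the file. For a slightly more quantitative view of the drift, something like the following could compare WF_per_million for words present in both files. It is a sketch that relies only on the Word and WF_per_million columns from the schema; the function names and the inline sample rows are hypothetical.

```python
import csv
import io


def load_wf(fh):
    """Map Word -> WF_per_million from a tab-separated file handle."""
    reader = csv.DictReader(fh, delimiter="\t", quoting=csv.QUOTE_NONE)
    return {row["Word"]: float(row["WF_per_million"]) for row in reader}


def relative_drift(old, new):
    """Relative change in WF_per_million for words present in both tables."""
    return {
        w: (new[w] - old[w]) / old[w]
        for w in old.keys() & new.keys()
        if old[w] > 0
    }


# Inline stand-ins for the backup and the regenerated TSV (invented values).
old_tsv = "Word\tWF_per_million\nthe\t50000.0\ncut\t120.0\n"
new_tsv = "Word\tWF_per_million\nthe\t50500.0\ncut\t114.0\nzeugma\t0.1\n"
drift = relative_drift(load_wf(io.StringIO(old_tsv)), load_wf(io.StringIO(new_tsv)))
worst = max(drift, key=lambda w: abs(drift[w]))
print(worst, round(drift[worst], 3))
```

With the real files, open(path, newline="", encoding="utf-8") handles would take the place of the StringIO objects.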
- [ ] Step 5: Add a lemma loader to mirror the surface loader pattern
Create packages/data/src/phonolex_data/loaders/phonolex_lemma_frequency.py that mirrors phonolex_frequency.py but loads phonolex_lemma_frequency.tsv. Read the existing phonolex_frequency.py to understand the loader signature, then write a parallel one keyed by lemma.
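The existing loader's signature is not reproduced in this plan, so the following is only a shape sketch: a hypothetical load_lemma_frequency that reads the TSV schema emitted in Step 3 into a dict keyed by lemma. The function name, the LemmaFreq dataclass, and its field names are assumptions to be replaced by whatever phonolex_frequency.py actually uses.

```python
import csv
import io
from dataclasses import dataclass


@dataclass(frozen=True)
class LemmaFreq:
    # Hypothetical container; field names are illustrative.
    freq_count: int
    wf_per_million: float
    lg10wf: float
    dom_pos: str


def load_lemma_frequency(fh):
    """Read a phonolex_lemma_frequency.tsv-shaped handle into {lemma: LemmaFreq}."""
    table = {}
    for row in csv.DictReader(fh, delimiter="\t", quoting=csv.QUOTE_NONE):
        # The emit step writes the key under the "Word" header for both TSVs.
        table[row["Word"]] = LemmaFreq(
            freq_count=int(row["FREQcount"]),
            wf_per_million=float(row["WF_per_million"]),
            lg10wf=float(row["Lg10WF"]),
            dom_pos=row["Dom_PoS"],
        )
    return table


# One invented row in the emitted schema.
sample = (
    "Word\tFREQcount\tCDcount\tWF_per_million\tLg10WF\tCD_pct\tLg10CD\t"
    "Dom_PoS\tFreq_Dom_PoS\tPercentage_Dom_PoS\tAll_PoS\tAll_freqs\n"
    "cut\t99\t50\t99.0000\t2.0000\t5.0000\t1.7076\tVERB\t90\t90.91\tVERB.NOUN\t90.9\n"
)
lemmas = load_lemma_frequency(io.StringIO(sample))
print(lemmas["cut"])
```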
- [ ] Step 6: Wire lemma loader into the pipeline
Read packages/data/scripts/build_runtime_parquet.py (or wherever the pipeline lives) and add a step that:
1. Loads phonolex_lemma_frequency.tsv via the new loader
2. For each word in words.parquet, looks up its lemma's per-million freq + log freq + grade-banded freqs
3. Populates the lemma, lemma_frequency, lemma_log_frequency, lemma_frequency_grade_* columns
The lookup is: word.lemma → lemma_freq_table[lemma]. Multiple surface words share a lemma, so the values replicate.
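The replication described above amounts to a left join from words onto the lemma table. A minimal dict-based sketch follows; the row layout, the helper name attach_lemma_freq, and the choice of None for lemmas missing from the table are assumptions, and the real pipeline would presumably do this in Polars.

```python
def attach_lemma_freq(words, lemma_table):
    """Copy lemma-level frequency fields onto each word row (left join on lemma)."""
    out = []
    for w in words:
        hit = lemma_table.get(w["lemma"])
        out.append({
            **w,
            "lemma_frequency": hit["wf_per_million"] if hit else None,
            "lemma_log_frequency": hit["lg10wf"] if hit else None,
        })
    return out


# Invented lemma table and word rows.
lemma_table = {"cut": {"wf_per_million": 99.0, "lg10wf": 2.0}}
words = [
    {"word": "cut", "lemma": "cut"},
    {"word": "cuts", "lemma": "cut"},
    {"word": "cutting", "lemma": "cut"},
    {"word": "blorp", "lemma": "blorp"},  # not in the lemma table
]
rows = attach_lemma_freq(words, lemma_table)
for r in rows:
    print(r["word"], r["lemma_frequency"], r["lemma_log_frequency"])
```

All three inflected forms receive the same lemma-level values, while the unmatched row keeps nulls instead of being dropped.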
- [ ] Step 7: Run the words.parquet regen pipeline
cd /Users/jneumann/Repos/PhonoLex && uv run python packages/data/scripts/build_runtime_parquet.py
Expected: data/runtime/words.parquet regenerated with PHON-94 columns populated.
- [ ] Step 8: Run all data tests
cd /Users/jneumann/Repos/PhonoLex && uv run python -m pytest packages/data/tests/ -v 2>&1 | tail -30
Expected: all PASS.
- [ ] Step 9: Regenerate d1-seed.sql
cd /Users/jneumann/Repos/PhonoLex && uv run python packages/web/workers/scripts/export-to-d1.py
- [ ] Step 10: Commit
git -C /Users/jneumann/Repos/PhonoLex add data/runtime/words.parquet packages/web/workers/scripts/d1-seed.sql data/norms/phonolex_frequency.tsv data/norms/phonolex_lemma_frequency.tsv packages/data/src/phonolex_data/loaders/phonolex_lemma_frequency.py packages/data/scripts/build_runtime_parquet.py research/2026-05-06-phon-94-corpus-parse/merge_freq_shards.py research/2026-05-06-phon-94-corpus-parse/emit_frequency_tsv.py
git -C /Users/jneumann/Repos/PhonoLex commit -m "$(cat <<'EOF'
PHON-94: words.parquet regenerated — canonical-pass freq+POS + lemma cols
FineWeb-Edu freq+POS columns refreshed from canonical pipeline (parser-
informed POS resolution). New columns: lemma, lemma_frequency,
lemma_log_frequency, plus 3 grade-banded equivalents.
D1 seed SQL regenerated to match.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
EOF
)"
Task 16: Documentation + PR¶
Files:
- Modify: CLAUDE.md
- Modify: data/SOURCES.md (add CHILDES + FineWeb-Edu selectional aggregation entries)
- Modify: MEMORY.md (mark PHON-94 done)
- [ ] Step 1: Update CLAUDE.md
Update the relevant sections of /Users/jneumann/Repos/PhonoLex/CLAUDE.md:
- "What This Is" — note ~150 cols → ~165 cols (or actual count after PHON-94)
- "Architecture" — under data pipeline, note that selectional.parquet is now populated
- "Generation Runtime Data Contract (PHON-93)" → rename or extend to reference PHON-94 population
- "Project Structure": add canonical_spacy.py + extract_triples.py modules to the layout
- [ ] Step 2: Update data/SOURCES.md
Add entries for:
- CHILDES selectional aggregation (citing CC BY-NC-SA 3.0; same posture as PHON-86/87 entries)
- FineWeb-Edu canonical-pass selectional (citing ODC-BY 1.0; same posture as PHON-72)
- [ ] Step 3: Update MEMORY.md
Add PHON-94 done entry. Inspect /Users/jneumann/.claude/projects/-Users-jneumann-Repos-PhonoLex/memory/MEMORY.md to find the right section. Don't write the memory file inline — use the Write tool to save a new memory file at project_phon94_selectional_population.md and add a one-line index entry to MEMORY.md.
- [ ] Step 4: File cold-storage policy ticket
Per the spec's "Open follow-ups", file a Jira ticket for broader cold-storage policy:
Use the mcp__plugin_atlassian_atlassian__createJiraIssue MCP tool with:
- cloudId="neumannsworkshop.atlassian.net"
- projectKey="PHON"
- summary="Cold-storage policy for raw corpora + intermediate parses"
- issueTypeName="Task"
- description: links to PHON-94's spec, lists ExternalData1 paths used, requests durable policy decision
Per feedback_verify_jira_state.md: JQL the next free PHON-X before promising a number.
- [ ] Step 5: Push branch + open PR
git -C /Users/jneumann/Repos/PhonoLex push -u origin feature/phon-94-corpus-dep-reannotation
Then use the mcp__plugin_github_github__create_pull_request MCP tool to open a PR targeting release/v5.2.0:
- title: PHON-94: corpus DEP reannotation + selectional.parquet population
- body: summary + spec link + acceptance test results + the "(cut, dobj, cake)" sanity-pass note + note that the cold-storage follow-up ticket is filed
- [ ] Step 6: Update task tracking
Update the Jira PHON-94 ticket:
- transition to "In Review"
- add comment with PR link
- add comment about smoke gate result for PhonBank
- if PhonBank was dropped, note "PhonBank smoke gate failed: bands dropped from inventory; documented in notebook.md"
Self-Review Checklist (run before marking plan complete)¶
- [ ] Each spec section has a corresponding task. (Cross-check below.)
- [ ] No "TBD", "TODO", or "fill in" placeholders.
- [ ] Type/method/property names match across tasks (e.g., band_resolver is consistent throughout).
- [ ] Each test step shows actual test code; no "write tests for the above".
- [ ] Each implementation step shows actual code or exact command + expected output.
- [ ] Commits land at frequent boundaries (1 per task).
- [ ] Operational tasks (Tasks 11/12/13) clearly mark themselves as "operational, no unit tests" and gate on user confirmation before spending money.
Spec coverage map:
| Spec section | Task |
|---|---|
| Schema extension (band column) | Task 2 |
| Canonical spaCy methodology | Task 3 |
| Triple extraction (9 roles, passive remap, V-rooted PP, PRON drop) | Task 4 |
| Probe (8 presumption checks) | Task 5 |
| build_selectional.py shard worker | Task 6 |
| merge_shards.py Polars stream-merge + PMI | Task 7 |
| WordStore derived views | Task 8 |
| End-to-end integration test | Task 9 |
| lemma + lemma-frequency PropertyDefs | Task 10 |
| FineWeb-Edu production parse (sharded H100×4) | Task 11 |
| CHILDES production parse | Task 12 |
| PhonBank smoke gate + conditional run | Task 13 |
| Final merge + acceptance tests | Task 14 |
| Words.parquet regen with lemma + freq+POS deltas | Task 15 |
| CLAUDE.md / SOURCES.md / MEMORY.md updates + PR | Task 16 |
| Cold-storage policy follow-up ticket | Task 16 step 4 |
All spec sections covered.