Corpus Retrieval Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add a real-corpus retrieval service that ships attested sentences from CoLA-pos + UD-EWT + GUM + CHILDES-adult + Tatoeba-en, filtered by the same CSP constraint schema, surfaced alongside CSP synthetic output via a new /api/sentences orchestrator endpoint.
Architecture: Offline ingest produces two LFS-tracked Parquet files: a per-sentence index and a per-(sentence, content-word) detail table. The runtime loads them once at FastAPI cold-start. The orchestrator endpoint runs corpus retrieval (a Polars filter) and CSP (the existing solver + rerank) in parallel and returns both ranked lists in a single envelope. Constraint semantics match CSP exactly: every in-vocab content word must satisfy the filter, mirroring CSP's per-slot enforcement. The corpus path reuses hard_filter_expr and _load_pairs_for_request from the CSP package unchanged.
Tech Stack: Python 3.13 + Polars + spaCy (en_core_web_sm) + sentence-transformers (Qwen3-Embedding-0.6B) + FastAPI + Pydantic v2; TypeScript + Hono + React + MUI on the web side; Parquet over LFS for data artifacts.
Reference spec: docs/superpowers/specs/2026-05-11-corpus-retrieval-design.md
Branching: Create branch feature/corpus-retrieval off develop. If feature/csp-iteration has not merged to develop yet, branch off feature/csp-iteration and rebase onto develop once that lands. Do not use git worktrees — work in the main checkout.
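The "run both in parallel, return one envelope" shape described under Architecture can be sketched with asyncio. Everything here is hypothetical scaffolding: retrieve_corpus and solve_csp are stand-ins for the real Polars filter and CSP solver, and the envelope keys are assumed rather than taken from the spec.

```python
import asyncio
from typing import Any


def retrieve_corpus(constraints: dict[str, Any]) -> list[str]:
    # Hypothetical stand-in for the blocking Polars corpus filter.
    return ["The cat sat on the mat today"]


def solve_csp(constraints: dict[str, Any]) -> list[str]:
    # Hypothetical stand-in for the existing CSP solver + rerank.
    return ["A red ball rolled down the hill"]


async def build_envelope(constraints: dict[str, Any]) -> dict[str, list[str]]:
    # Both calls are blocking, so each runs in a worker thread; gather()
    # waits for both and preserves argument order in its result.
    corpus, synthetic = await asyncio.gather(
        asyncio.to_thread(retrieve_corpus, constraints),
        asyncio.to_thread(solve_csp, constraints),
    )
    return {"corpus": corpus, "synthetic": synthetic}


envelope = asyncio.run(build_envelope({"max_aoa": 5.0}))
```

Whether threads, a process pool, or native async is the right vehicle depends on how much of each path releases the GIL; the sketch only shows the fan-out and join.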
File Structure
Created:
- packages/data/scripts/build_corpus_sentences.py — ingest pipeline (CLI entry-point)
- packages/data/src/phonolex_data/runtime/corpus_schema.py — Polars schemas + retention constants for the two corpus parquets
- packages/data/tests/test_corpus_ingest.py — unit tests for the ingest pipeline
- packages/generation/server/corpus.py — runtime CorpusStore + match_corpus
- packages/generation/server/routes/sentences.py — /api/sentences orchestrator
- packages/generation/server/validation.py — shared validate_constraints lifted from routes/generate.py
- packages/generation/server/tests/test_corpus.py
- packages/generation/server/tests/test_sentences_orchestrator.py
- data/runtime/corpus_sentences.parquet (LFS)
- data/runtime/corpus_sentences_index.parquet (LFS)
Modified:
- packages/generation/server/main.py — load CorpusStore at cold-start
- packages/generation/server/schemas.py — add CorpusMatch, SentencesRequest, SentencesResponse; add SyntheticMatch as a TypeAlias for SentenceCandidate
- packages/generation/server/routes/generate.py — drop the inline _validate_constraints, import from ..validation
- packages/web/workers/src/routes/generation.ts — add /sentences proxy route
- packages/web/frontend/src/types/governance.ts — add CorpusMatch, SyntheticMatch, SentencesResponse types (path to verify at Task 17)
- packages/web/frontend/src/pages/Generate.tsx or equivalent (path to verify at Task 18) — switch endpoint, render two stacked sections
- .gitattributes — add LFS patterns for the two new Parquet files
- CLAUDE.md — extend Generation Runtime Data Contract with the corpus artifacts and add /api/sentences to the endpoints list
Task 1: Ingest scaffolding (source loaders)
Files:
- Create: packages/data/scripts/build_corpus_sentences.py
- Create: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import (
    load_cola_positive, load_ud_ewt, load_gum, load_childes_adult, load_tatoeba_english,
)


def test_loaders_yield_text_and_source_id():
    for loader in (load_cola_positive, load_ud_ewt, load_gum,
                   load_childes_adult, load_tatoeba_english):
        first = next(iter(loader(limit=1)))
        assert isinstance(first, tuple) and len(first) == 2
        text, src_id = first
        assert isinstance(text, str) and len(text) > 0
        assert isinstance(src_id, str)
- [ ] Step 2: Run test to verify it fails
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_loaders_yield_text_and_source_id -v
- [ ] Step 3: Write minimal script with loaders
# packages/data/scripts/build_corpus_sentences.py
"""Build corpus retrieval Parquets: corpus_sentences.parquet + corpus_sentences_index.parquet.

Reuses CoLA/UD-EWT/GUM loaders from build_naturalness_reference.py and
adds CHILDES adult-input and Tatoeba English. Output schema is defined in
phonolex_data.runtime.corpus_schema.
"""
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Iterator

REPO_ROOT = Path(__file__).resolve().parents[3]
RUNTIME = REPO_ROOT / "data" / "runtime"


def load_cola_positive(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation"):
        ds = load_dataset("glue", "cola", split=split)
        for i, row in enumerate(ds):
            if row["label"] != 1:  # keep only acceptable (positive) sentences
                continue
            yield row["sentence"], f"cola:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_ud_ewt(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation", "test"):
        ds = load_dataset("universal_dependencies", "en_ewt", split=split)
        for i, row in enumerate(ds):
            yield row["text"], f"ud_ewt:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_gum(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation", "test"):
        try:
            ds = load_dataset("universal_dependencies", "en_gum", split=split)
        except ValueError:
            # Split not available for this config; skip it.
            continue
        for i, row in enumerate(ds):
            yield row["text"], f"gum:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_childes_adult(limit: int | None = None) -> Iterator[tuple[str, str]]:
    """Reuse PHON-94's CHILDES TalkBank XML reader. Filter to adult speakers
    (MOT/FAT/INV) in English locales (Eng-NA, Eng-UK). Path to the cached
    corpus is configured in the existing build_frequency_corpus pipeline.
    """
    from phonolex_data.loaders.childes import iter_adult_utterances  # added in Task 1 Step 4
    n = 0
    for utt in iter_adult_utterances(locales=("Eng-NA", "Eng-UK")):
        yield utt.text, f"childes:{utt.locale}:{utt.session_id}:{utt.utt_idx}"
        n += 1
        if limit and n >= limit:
            return


def load_tatoeba_english(limit: int | None = None) -> Iterator[tuple[str, str]]:
    """Tatoeba English sentences (CC-BY 2.0 FR). Expects the dump to be
    provisioned at data/raw/tatoeba/sentences.csv; this loader does not
    download it.
    """
    import csv
    path = REPO_ROOT / "data" / "raw" / "tatoeba" / "sentences.csv"
    if not path.exists():
        raise FileNotFoundError(
            f"Tatoeba dump not found at {path}. Download from "
            "https://tatoeba.org/en/downloads and extract the English subset."
        )
    n = 0
    with path.open(encoding="utf-8", newline="") as f:
        # The dump is plain tab-separated text without CSV quoting, so
        # QUOTE_NONE keeps literal double quotes inside sentences intact.
        reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
        for row in reader:
            if len(row) < 3 or row[1] != "eng":
                continue
            sentence_id, text = row[0], row[2]
            yield text, f"tatoeba:{sentence_id}"
            n += 1
            if limit and n >= limit:
                return


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--limit-per-source", type=int, default=None)
    args = parser.parse_args()
    # Task 8 wires in the rest of the pipeline.
    raise SystemExit("Pipeline not yet wired; see Task 8.")


if __name__ == "__main__":
    main()
- [ ] Step 4: Add minimal CHILDES iterator stub
# packages/data/src/phonolex_data/loaders/childes.py (new or extended)
"""Adult-speaker utterance reader for CHILDES TalkBank XML."""
from dataclasses import dataclass
from typing import Iterator

ADULT_CODES = frozenset({"MOT", "FAT", "INV"})


@dataclass(frozen=True)
class AdultUtterance:
    text: str
    locale: str
    session_id: str
    utt_idx: int
    speaker_code: str


def iter_adult_utterances(locales: tuple[str, ...] = ("Eng-NA", "Eng-UK")) -> Iterator[AdultUtterance]:
    """Stream adult-speaker utterances from cached CHILDES XML.

    Reuses the cache directory established by PHON-94's
    research/2026-04-30-frequency-corpus-build pipeline. If the cache is
    absent, raise FileNotFoundError with a clear message; building it is
    a separate, one-time operation.
    """
    raise NotImplementedError(
        "CHILDES adult iterator: lift implementation from "
        "research/2026-04-30-frequency-corpus-build/build_frequency_corpus.py "
        "where the same XML/speaker-code parsing is already proven. "
        "Filter to ADULT_CODES; emit AdultUtterance per <u> tag."
    )
- [ ] Step 5: Run test to verify it passes
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_loaders_yield_text_and_source_id -v
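If the CHILDES and Tatoeba loaders fail here (the CHILDES iterator is still a stub and the Tatoeba dump may not be provisioned), mark those two clearly: adjust the test to call only the three working loaders and add separate xfail cases for CHILDES and Tatoeba.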
Update the test:
import pytest

CORE_LOADERS = [load_cola_positive, load_ud_ewt, load_gum]


@pytest.mark.parametrize("loader", CORE_LOADERS)
def test_core_loaders_yield_text_and_source_id(loader):
    first = next(iter(loader(limit=1)))
    text, src_id = first
    assert isinstance(text, str) and len(text) > 0
    assert isinstance(src_id, str)


@pytest.mark.xfail(reason="CHILDES requires PHON-94 cache + adult iterator (Task 1 Step 4)")
def test_childes_loader():
    next(iter(load_childes_adult(limit=1)))


@pytest.mark.xfail(reason="Tatoeba dump must be provisioned at data/raw/tatoeba/sentences.csv")
def test_tatoeba_loader():
    next(iter(load_tatoeba_english(limit=1)))
- [ ] Step 6: Commit
git add packages/data/scripts/build_corpus_sentences.py \
packages/data/src/phonolex_data/loaders/childes.py \
packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: source loaders for CoLA/UD-EWT/GUM/CHILDES/Tatoeba"
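The Step 4 stub defers the real CHILDES parsing to the PHON-94 code. For orientation only, here is a rough sketch of what "filter to ADULT_CODES; emit one record per <u> tag" might look like with the standard library. The XML namespace, the who attribute, and the <w> word elements reflect the TalkBank XML layout as assumed here and should be checked against the cached files; SAMPLE_XML and iter_adult_texts are invented for the illustration.

```python
import xml.etree.ElementTree as ET
from typing import Iterator

ADULT_CODES = frozenset({"MOT", "FAT", "INV"})
# Assumption: TalkBank XML namespace; verify against the cached corpus.
NS = {"tb": "http://www.talkbank.org/ns/talkbank"}

# Invented miniature transcript, not real CHILDES data.
SAMPLE_XML = """<CHAT xmlns="http://www.talkbank.org/ns/talkbank">
  <u who="MOT" uID="u0"><w>look</w><w>at</w><w>the</w><w>ball</w></u>
  <u who="CHI" uID="u1"><w>ball</w></u>
  <u who="FAT" uID="u2"><w>good</w><w>job</w></u>
</CHAT>"""


def iter_adult_texts(xml_text: str) -> Iterator[tuple[int, str, str]]:
    """Yield (utt_idx, speaker_code, text) for adult-speaker utterances."""
    root = ET.fromstring(xml_text)
    # utt_idx counts every utterance, so adult indices keep their position
    # in the full transcript rather than being renumbered.
    for utt_idx, u in enumerate(root.iterfind("tb:u", NS)):
        who = u.get("who", "")
        if who not in ADULT_CODES:
            continue
        words = [w.text for w in u.iterfind("tb:w", NS) if w.text]
        if words:
            yield utt_idx, who, " ".join(words)


utterances = list(iter_adult_texts(SAMPLE_XML))
```

Real transcripts carry nested annotations and non-word events inside <u>, so the proven parser referenced in the stub remains the source of truth.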
Task 2: Sentence-level pre-filter
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import (
    prefilter_sentence, dedup_stream,
)


def test_prefilter_length():
    assert prefilter_sentence("Hello world this is good") is True  # 5 tokens
    assert prefilter_sentence("Two words") is False  # 2 < 5
    assert prefilter_sentence("word " * 30) is False  # 30 > 25


def test_prefilter_ascii():
    assert prefilter_sentence("Café au lait morning daily") is False  # non-ASCII


def test_dedup_lowercased():
    stream = [("Hello world friend mine pal", "a"),
              ("HELLO WORLD FRIEND MINE PAL", "b"),
              ("Different sentence entirely now today", "c")]
    out = list(dedup_stream(iter(stream)))
    assert len(out) == 2
- [ ] Step 2: Run test to verify it fails
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_prefilter_length -v
- [ ] Step 3: Implement pre-filter + dedup
# packages/data/scripts/build_corpus_sentences.py (add):
MIN_TOKENS = 5
MAX_TOKENS = 25


def prefilter_sentence(text: str) -> bool:
    """Whitespace-tokenized length 5..25 and ASCII-printable."""
    if not text:
        return False
    if not text.isascii() or not text.isprintable():
        return False
    n = len(text.split())
    return MIN_TOKENS <= n <= MAX_TOKENS


def dedup_stream(stream: Iterator[tuple[str, str]]) -> Iterator[tuple[str, str]]:
    """Yield (text, source_record_id) pairs whose lowercased text has not been seen."""
    seen: set[str] = set()
    for text, src in stream:
        key = text.lower().strip()
        if key in seen:
            continue
        seen.add(key)
        yield text, src
- [ ] Step 4: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
- [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: sentence pre-filter (length + ASCII + dedup)"
Task 3: spaCy content-token extraction
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import extract_content_tokens, ContentToken


def test_extract_content_tokens():
    # "The big red ball rolled" → big(ADJ), red(ADJ), ball(NOUN), rolled(VERB)
    tokens = extract_content_tokens("The big red ball rolled")
    poss = [t.pos for t in tokens]
    surfaces = [t.surface for t in tokens]
    assert poss == ["ADJ", "ADJ", "NOUN", "VERB"]
    assert surfaces == ["big", "red", "ball", "rolled"]
    # Lemmas: rolled → roll
    assert any(t.lemma == "roll" for t in tokens)
    # Position is 0-indexed across the full token stream (not content-only),
    # i.e., "big" at position 1, "rolled" at position 4
    pos_by_lemma = {t.lemma: t.position for t in tokens}
    assert pos_by_lemma["big"] == 1
    assert pos_by_lemma["roll"] == 4
- [ ] Step 2: Run test to verify it fails
Expected: FAIL.
- [ ] Step 3: Implement extractor
# packages/data/scripts/build_corpus_sentences.py (add):
from dataclasses import dataclass
from functools import lru_cache

CONTENT_POS = frozenset({"NOUN", "VERB", "ADJ", "ADV"})


@dataclass(frozen=True)
class ContentToken:
    surface: str
    lemma: str
    pos: str
    position: int  # 0-indexed position in the full token stream


@lru_cache(maxsize=1)
def _spacy_nlp():
    import spacy
    # Disable only the parser. In en_core_web_sm the attribute_ruler maps the
    # tagger's fine-grained tags to tok.pos_, and the rule-based lemmatizer
    # relies on that POS, so attribute_ruler has to stay enabled.
    return spacy.load("en_core_web_sm", disable=["parser"])


def _content_tokens_from_doc(doc) -> list[ContentToken]:
    """Keep NOUN/VERB/ADJ/ADV tokens that are not named entities and have a lemma."""
    out: list[ContentToken] = []
    for i, tok in enumerate(doc):
        if tok.pos_ not in CONTENT_POS:
            continue
        if tok.ent_type_:  # drop spaCy-flagged named entities
            continue
        if not tok.lemma_:
            continue
        out.append(ContentToken(
            surface=tok.text,
            lemma=tok.lemma_.lower(),
            pos=tok.pos_,
            position=i,
        ))
    return out


def extract_content_tokens(text: str) -> list[ContentToken]:
    return _content_tokens_from_doc(_spacy_nlp()(text))


def extract_content_tokens_batch(texts: list[str], batch_size: int = 256) -> Iterator[list[ContentToken]]:
    """Pipe `texts` through spaCy nlp.pipe for batch throughput. Yields one
    list of ContentToken per input text, in input order."""
    for doc in _spacy_nlp().pipe(texts, batch_size=batch_size, n_process=1):
        yield _content_tokens_from_doc(doc)
- [ ] Step 4: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_extract_content_tokens -v
If en_core_web_sm is not installed, install it with uv run python -m spacy download en_core_web_sm and re-run.
- [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: spaCy content-token extractor (POS+lemma)"
Task 4: Lemma-join + retention rule
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import polars as pl
from packages.data.scripts.build_corpus_sentences import (
    build_lemma_lookup, join_content_tokens, MIN_CONTENT_IN_VOCAB,
)


def test_lemma_lookup_returns_dict():
    df = pl.DataFrame({
        "lemma": ["run", "ball", "red"],
        "phonemes_str": ["|ɹ|ʌ|n|", "|b|ɔ|l|", "|ɹ|ɛ|d|"],
        "frequency_log_zipf": [4.0, 5.0, 4.5],
    })
    lookup = build_lemma_lookup(df)
    assert lookup["run"]["phonemes_str"] == "|ɹ|ʌ|n|"


def test_join_drops_oov_tokens():
    from packages.data.scripts.build_corpus_sentences import ContentToken
    tokens = [
        ContentToken("running", "run", "VERB", 0),
        ContentToken("widget", "widget", "NOUN", 1),  # not in lookup
        ContentToken("balls", "ball", "NOUN", 2),
    ]
    lookup = {"run": {"phonemes_str": "|ɹ|ʌ|n|"},
              "ball": {"phonemes_str": "|b|ɔ|l|"}}
    survivors = list(join_content_tokens(tokens, lookup))
    assert len(survivors) == 2
    # join_content_tokens yields (token, words_row) pairs, so unpack them.
    assert {tok.lemma for tok, _row in survivors} == {"run", "ball"}


def test_retention_threshold():
    assert MIN_CONTENT_IN_VOCAB == 2
- [ ] Step 2: Run tests to verify they fail
Expected: FAIL.
- [ ] Step 3: Implement lemma lookup + join + retention
# packages/data/scripts/build_corpus_sentences.py (add):
MIN_CONTENT_IN_VOCAB = 2


def build_lemma_lookup(words_df: pl.DataFrame) -> dict[str, dict]:
    """Build {lemma -> row_dict} for fast in-process lookup during ingest.

    Row dict carries all columns from words.parquet (denormalized at emit time).
    """
    return {
        row["lemma"]: row
        for row in words_df.iter_rows(named=True)
        if row.get("lemma")
    }


def join_content_tokens(
    tokens: list[ContentToken],
    lookup: dict[str, dict],
) -> Iterator[tuple[ContentToken, dict]]:
    """Yield (token, words_row) pairs for content tokens whose lemma joins."""
    for t in tokens:
        row = lookup.get(t.lemma)
        if row is None:
            continue
        yield t, row
Add import at top of file:
import polars as pl
- [ ] Step 4: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
- [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: lemma lookup + content-token join + retention rule"
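Task 4 defines MIN_CONTENT_IN_VOCAB but the rule itself is only applied later, when Task 8's pipeline drops sentences with fewer joined tokens than the threshold. As a small self-contained sketch of that decision (retain_sentence is a hypothetical helper, and a plain set of lemmas stands in for the lemma lookup):

```python
MIN_CONTENT_IN_VOCAB = 2


def retain_sentence(content_lemmas: list[str], vocab: set[str]) -> tuple[bool, int, int]:
    """Return (keep, n_content_in_vocab, n_content_oov) for one sentence."""
    in_vocab = sum(1 for lemma in content_lemmas if lemma in vocab)
    oov = len(content_lemmas) - in_vocab
    # A sentence survives only if enough of its content words joined.
    return in_vocab >= MIN_CONTENT_IN_VOCAB, in_vocab, oov


vocab = {"run", "ball", "cat"}
kept = retain_sentence(["run", "widget", "ball"], vocab)   # two lemmas join
dropped = retain_sentence(["widget", "cat"], vocab)        # only one joins
```

Out-of-vocabulary content words do not disqualify a sentence on their own; they are just counted, which matches the n_content_oov column the index later records.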
Task 5: Profanity filter
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import contains_profanity


def test_clean_sentence_passes():
    assert contains_profanity("The cat sat on the warm mat") is False


def test_profanity_filtered():
    # Hard-coding actual slurs in tests is avoided; instead we take a
    # known-flagged word from better-profanity's internal list. This asserts
    # that the function actually catches *something* the library flags.
    from better_profanity import profanity
    profanity.load_censor_words()
    flagged = next(w for w in profanity.CENSOR_WORDSET)
    assert contains_profanity(f"the {flagged} stuff word value here") is True
- [ ] Step 2: Run test to verify it fails
Expected: FAIL.
- [ ] Step 3: Implement profanity check via better-profanity
# packages/data/scripts/build_corpus_sentences.py (add):
@lru_cache(maxsize=1)
def _profanity_engine():
    from better_profanity import profanity
    profanity.load_censor_words()
    return profanity


def contains_profanity(text: str) -> bool:
    return _profanity_engine().contains_profanity(text)
Add the dependency to the dependencies array under [project] in packages/data/pyproject.toml:
"better-profanity>=0.7.0",
Then:
uv pip install -e packages/data
- [ ] Step 4: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
- [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py \
packages/data/tests/test_corpus_ingest.py \
packages/data/pyproject.toml
git commit -m "corpus retrieval: profanity filter via better-profanity"
Task 6: Parquet schemas + emit
Files:
- Create: packages/data/src/phonolex_data/runtime/corpus_schema.py
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import polars as pl
import tempfile
from pathlib import Path
from packages.data.scripts.build_corpus_sentences import emit_parquets, IngestedSentence
from packages.data.src.phonolex_data.runtime.corpus_schema import (
    index_columns, words_columns,
)


def test_emit_round_trip():
    sample = IngestedSentence(
        sentence_id=0,
        text="The cat sat on the mat",
        source="cola",
        source_record_id="cola:train:0",
        n_tokens=6,
        rows=[
            # (content_token_dict, words_row_dict) tuples
            ({"surface": "cat", "lemma": "cat", "pos": "NOUN", "position": 1},
             {"phonemes_str": "|k|æ|t|", "lemma": "cat", "AoA": 3.5}),
            ({"surface": "sat", "lemma": "sit", "pos": "VERB", "position": 2},
             {"phonemes_str": "|s|ɪ|t|", "lemma": "sit", "AoA": 3.0}),
            ({"surface": "mat", "lemma": "mat", "pos": "NOUN", "position": 5},
             {"phonemes_str": "|m|æ|t|", "lemma": "mat", "AoA": 4.0}),
        ],
    )
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp)
        emit_parquets([sample], out_dir=out, words_columns=["phonemes_str", "lemma", "AoA"])
        idx = pl.read_parquet(out / "corpus_sentences_index.parquet")
        words = pl.read_parquet(out / "corpus_sentences.parquet")
    assert idx.height == 1
    assert idx["n_content_in_vocab"][0] == 3
    assert words.height == 3
    assert set(words.columns) >= {"sentence_id", "position", "surface", "lemma",
                                  "pos", "phonemes_str", "AoA"}
- [ ] Step 2: Run test to verify it fails
Expected: FAIL.
- [ ] Step 3: Define corpus schema
# packages/data/src/phonolex_data/runtime/corpus_schema.py
"""Parquet schemas for corpus retrieval artifacts."""
from __future__ import annotations

import polars as pl


def index_columns() -> dict[str, pl.DataType]:
    return {
        "sentence_id": pl.UInt32,
        "text": pl.Utf8,
        "source": pl.Utf8,
        "source_record_id": pl.Utf8,
        "n_tokens": pl.UInt8,
        "n_content_in_vocab": pl.UInt8,
        "n_content_oov": pl.UInt8,
        "naturalness_score": pl.Float32,  # populated in Task 7
    }


def words_columns(extra_norm_cols: list[str]) -> dict[str, pl.DataType]:
    """Base corpus-words schema. extra_norm_cols are the 167 norm columns
    inlined from words.parquet; their dtypes follow Polars' inferred types
    when emit_parquets writes the DataFrame, so we don't enumerate them here.
    """
    base = {
        "sentence_id": pl.UInt32,
        "position": pl.UInt8,
        "surface": pl.Utf8,
        "lemma": pl.Utf8,
        "pos": pl.Utf8,
        "phonemes_str": pl.Utf8,
    }
    return base
- [ ] Step 4: Implement emit_parquets
# packages/data/scripts/build_corpus_sentences.py (add):
from dataclasses import dataclass, field


@dataclass
class IngestedSentence:
    sentence_id: int
    text: str
    source: str
    source_record_id: str
    n_tokens: int
    rows: list[tuple[dict, dict]]  # (content_token_dict, words_row_dict)
    n_content_oov: int = 0


def emit_parquets(
    sentences: list[IngestedSentence],
    out_dir: Path,
    words_columns: list[str],
) -> None:
    """Write corpus_sentences_index.parquet + corpus_sentences.parquet."""
    out_dir.mkdir(parents=True, exist_ok=True)
    idx_rows = []
    word_rows = []
    for s in sentences:
        idx_rows.append({
            "sentence_id": s.sentence_id,
            "text": s.text,
            "source": s.source,
            "source_record_id": s.source_record_id,
            "n_tokens": s.n_tokens,
            "n_content_in_vocab": len(s.rows),
            "n_content_oov": s.n_content_oov,
            "naturalness_score": None,  # filled by Task 7
        })
        for tok, words_row in s.rows:
            row = {
                "sentence_id": s.sentence_id,
                "position": tok["position"],
                "surface": tok["surface"],
                "lemma": tok["lemma"],
                "pos": tok["pos"],
                "phonemes_str": words_row["phonemes_str"],
            }
            # Inline the remaining words.parquet columns next to the token.
            for col in words_columns:
                if col in ("phonemes_str", "lemma"):
                    continue
                row[col] = words_row.get(col)
            word_rows.append(row)
    pl.DataFrame(idx_rows).write_parquet(out_dir / "corpus_sentences_index.parquet")
    pl.DataFrame(word_rows).write_parquet(out_dir / "corpus_sentences.parquet")
- [ ] Step 5: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_emit_round_trip -v
- [ ] Step 6: Commit
git add packages/data/src/phonolex_data/runtime/corpus_schema.py \
packages/data/scripts/build_corpus_sentences.py \
packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: Parquet schemas + emit (index + words tables)"
Task 7: Naturalness pre-score with self-row exclusion
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import numpy as np
from packages.data.scripts.build_corpus_sentences import (
    score_naturalness, _exclude_self_row_idx,
)


def test_exclude_self_row_idx_match():
    ref_texts = ["alpha sentence", "beta sentence", "gamma sentence"]
    assert _exclude_self_row_idx("beta sentence", ref_texts) == 1
    assert _exclude_self_row_idx("absent", ref_texts) is None


def test_score_naturalness_excludes_self():
    # Synthetic test: build a 3x4 ref matrix; query identical to row 1 must
    # NOT see cosine=1.0 contribute to its score.
    ref = np.array([
        [1.0, 0.0, 0.0, 0.0],
        [0.5, 0.5, 0.5, 0.5],
        [0.0, 1.0, 0.0, 0.0],
    ], dtype=np.float32)
    ref = ref / np.linalg.norm(ref, axis=1, keepdims=True)
    query = ref[1].copy()  # identical to row 1
    # With self-exclusion, top-K cosine should NOT be 1.0; it should pick the
    # next-best non-self rows.
    score = score_naturalness(query, ref, self_row_idx=1, top_k=2)
    assert score < 0.99
- [ ] Step 2: Run tests to verify they fail
Expected: FAIL.
- [ ] Step 3: Implement self-exclusion + scoring
# packages/data/scripts/build_corpus_sentences.py (add):
import numpy as np


def _exclude_self_row_idx(sentence_text: str, ref_texts: list[str]) -> int | None:
    """Return the index of an exact-match row in the naturalness reference,
    or None if absent. Matches on lowercased stripped text.
    """
    needle = sentence_text.strip().lower()
    for i, t in enumerate(ref_texts):
        if t.strip().lower() == needle:
            return i
    return None


def score_naturalness(
    query_emb: np.ndarray,  # (D,) L2-normalized
    ref_emb: np.ndarray,  # (N, D) L2-normalized
    self_row_idx: int | None,
    top_k: int = 20,
) -> float:
    cos = ref_emb @ query_emb  # (N,) cosine similarities
    if self_row_idx is not None:
        cos = np.delete(cos, self_row_idx)
    if cos.size == 0:
        return 0.0
    k = min(top_k, cos.size)
    top = np.partition(cos, -k)[-k:]  # the k largest values, unordered
    return float(top.mean())


def annotate_naturalness(
    index_path: Path,
    ref_npy: Path,
    ref_meta_jsonl: Path,
    model_name: str = "Qwen/Qwen3-Embedding-0.6B",
    batch_size: int = 32,
) -> None:
    """Read corpus_sentences_index.parquet, embed each sentence, compute
    self-excluded top-K cosine vs the ref matrix, write naturalness_score
    back to the same file."""
    import json
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer(model_name)
    ref_emb = np.load(ref_npy)
    ref_texts = [json.loads(line)["sentence"] for line in ref_meta_jsonl.read_text().splitlines()]
    if len(ref_texts) != ref_emb.shape[0]:
        raise ValueError(f"ref matrix N={ref_emb.shape[0]} != meta N={len(ref_texts)}")
    idx = pl.read_parquet(index_path)
    texts = idx["text"].to_list()
    embs = model.encode(texts, batch_size=batch_size, normalize_embeddings=True,
                        show_progress_bar=True, convert_to_numpy=True)
    scores = []
    for text, e in zip(texts, embs):
        scores.append(score_naturalness(
            e.astype(np.float32), ref_emb,
            _exclude_self_row_idx(text, ref_texts),
        ))
    idx = idx.with_columns(pl.Series("naturalness_score", scores, dtype=pl.Float32))
    idx.write_parquet(index_path)
- [ ] Step 4: Run tests to verify they pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_exclude_self_row_idx_match \
packages/data/tests/test_corpus_ingest.py::test_score_naturalness_excludes_self -v
- [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: naturalness pre-score with self-row exclusion"
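In Step 3, annotate_naturalness calls _exclude_self_row_idx once per corpus sentence, and each call scans the whole reference list. If that scan becomes a bottleneck on the full corpus, one possible alternative is to build a normalized-text index once and look each sentence up in it. The helper names below are hypothetical, and the sketch keeps the same matching rule (lowercased, stripped, first match wins).

```python
from __future__ import annotations


def build_self_row_index(ref_texts: list[str]) -> dict[str, int]:
    """Map normalized reference text to its first row index."""
    index: dict[str, int] = {}
    for i, text in enumerate(ref_texts):
        # setdefault keeps the earliest row, matching the linear scan's
        # "return on first match" behavior.
        index.setdefault(text.strip().lower(), i)
    return index


def lookup_self_row(sentence_text: str, index: dict[str, int]) -> int | None:
    """Return the reference row to exclude, or None if the text is absent."""
    return index.get(sentence_text.strip().lower())


ref_texts = ["alpha sentence", "Beta Sentence ", "beta sentence"]
self_index = build_self_row_index(ref_texts)
hit = lookup_self_row("beta sentence", self_index)
miss = lookup_self_row("absent", self_index)
```

Like the original helper, this excludes at most one row, so duplicate reference rows beyond the first would still contribute to the score.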
Task 8: Wire the ingest pipeline end-to-end
Files:
- Modify: packages/data/scripts/build_corpus_sentences.py
- Modify: packages/data/tests/test_corpus_ingest.py
- Modify: .gitattributes
- [ ] Step 1: Write the failing integration test
# add to packages/data/tests/test_corpus_ingest.py
def test_pipeline_small_sample(tmp_path, monkeypatch):
    """Run the full pipeline on a 5-sentence in-memory fixture and verify
    both Parquets are written with the expected shapes."""
    from packages.data.scripts.build_corpus_sentences import (
        run_pipeline, ContentToken,
    )
    import polars as pl

    fixture_sentences = [
        ("The cat sat on the mat today", "fx:0"),
        ("Dogs run quickly through the park", "fx:1"),
        ("Two words", "fx:2"),  # filtered: < 5 tokens
        ("The cat sat on the mat today", "fx:3"),  # filtered: dup
        ("A red ball rolled down the hill", "fx:4"),
    ]
    # Minimal in-memory words.parquet covering the lemmas used above
    fake_words = pl.DataFrame({
        "lemma": ["cat", "sit", "mat", "dog", "run", "park",
                  "ball", "roll", "hill", "red"],
        "phonemes_str": ["|k|æ|t|"] * 10,
        "frequency_log_zipf": [4.0] * 10,
    })
    run_pipeline(
        sources={"fixture": iter(fixture_sentences)},
        words_df=fake_words,
        out_dir=tmp_path,
        skip_profanity=True,
        skip_naturalness=True,  # tested separately in Task 7
    )
    idx = pl.read_parquet(tmp_path / "corpus_sentences_index.parquet")
    assert idx.height == 3  # deduped + length-filtered survivors
    words = pl.read_parquet(tmp_path / "corpus_sentences.parquet")
    assert words.height >= 6
    assert words["lemma"].is_in(fake_words["lemma"].to_list()).all()
- [ ] Step 2: Run test to verify it fails
Expected: FAIL (run_pipeline not defined).
- [ ] Step 3: Implement run_pipeline + CLI
# packages/data/scripts/build_corpus_sentences.py (replace main()):
def run_pipeline(
    sources: dict[str, Iterator[tuple[str, str]]],
    words_df: pl.DataFrame,
    out_dir: Path,
    skip_profanity: bool = False,
    skip_naturalness: bool = False,
    ref_npy: Path | None = None,
    ref_meta_jsonl: Path | None = None,
) -> None:
    """Run the ingest pipeline end-to-end.

    sources: {source_name: iterator of (text, source_record_id)}
    """
    lemma_lookup = build_lemma_lookup(words_df)
    inlined_cols = [c for c in words_df.columns if c != "lemma"]

    # 1. Pre-filter + profanity + dedup across all sources.
    # We can't use `dedup_stream` directly because it drops the source name;
    # do dedup inline here so we carry the triple through.
    seen: set[str] = set()
    pairs: list[tuple[str, str, str]] = []
    for src_name, it in sources.items():
        for text, src_id in it:
            if not prefilter_sentence(text):
                continue
            if not skip_profanity and contains_profanity(text):
                continue
            key = text.lower().strip()
            if key in seen:
                continue
            seen.add(key)
            pairs.append((text, src_id, src_name))

    # 2. spaCy POS+lemma batch parse
    texts = [t for t, _, _ in pairs]
    token_lists = list(extract_content_tokens_batch(texts, batch_size=256))

    # 3. Join + retain
    sentences: list[IngestedSentence] = []
    next_id = 0
    for (text, src_id, src_name), tokens in zip(pairs, token_lists):
        joined = list(join_content_tokens(tokens, lemma_lookup))
        if len(joined) < MIN_CONTENT_IN_VOCAB:
            continue
        oov_count = len(tokens) - len(joined)
        rows = [
            ({"surface": t.surface, "lemma": t.lemma, "pos": t.pos, "position": t.position}, row)
            for t, row in joined
        ]
        sentences.append(IngestedSentence(
            sentence_id=next_id,
            text=text,
            source=src_name,
            source_record_id=src_id,
            n_tokens=len(text.split()),
            rows=rows,
            n_content_oov=oov_count,
        ))
        next_id += 1

    # 4. Emit
    emit_parquets(sentences, out_dir=out_dir, words_columns=inlined_cols)

    # 5. Naturalness pre-score
    if not skip_naturalness:
        if ref_npy is None or ref_meta_jsonl is None:
            ref_npy = RUNTIME / "naturalness_reference.npy"
            ref_meta_jsonl = RUNTIME / "naturalness_reference_meta.jsonl"
        annotate_naturalness(
            out_dir / "corpus_sentences_index.parquet", ref_npy, ref_meta_jsonl,
        )


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--limit-per-source", type=int, default=None)
    parser.add_argument("--out-dir", type=Path, default=RUNTIME)
    parser.add_argument("--no-profanity-filter", action="store_true")
    parser.add_argument("--skip-naturalness", action="store_true")
    args = parser.parse_args()

    words_df = pl.read_parquet(RUNTIME / "words.parquet")
    sources = {
        "cola": load_cola_positive(args.limit_per_source),
        "ud_ewt": load_ud_ewt(args.limit_per_source),
        "gum": load_gum(args.limit_per_source),
        "childes": load_childes_adult(args.limit_per_source),
        "tatoeba": load_tatoeba_english(args.limit_per_source),
    }
    run_pipeline(
        sources=sources,
        words_df=words_df,
        out_dir=args.out_dir,
        skip_profanity=args.no_profanity_filter,
        skip_naturalness=args.skip_naturalness,
    )
- [ ] Step 4: Run integration test to verify pass
uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_pipeline_small_sample -v
- [ ] Step 5: Update .gitattributes for LFS
Append to .gitattributes:
data/runtime/corpus_sentences.parquet filter=lfs diff=lfs merge=lfs -text
data/runtime/corpus_sentences_index.parquet filter=lfs diff=lfs merge=lfs -text
- [ ] Step 6: Commit
git add packages/data/scripts/build_corpus_sentences.py \
packages/data/tests/test_corpus_ingest.py \
.gitattributes
git commit -m "corpus retrieval: wire ingest pipeline end-to-end + LFS tracking"
Task 9: Runtime CorpusStore + load_corpus
Files:
- Create: packages/generation/server/corpus.py
- Create: packages/generation/server/tests/test_corpus.py
- [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_corpus.py
import polars as pl
from pathlib import Path
from packages.generation.server.corpus import load_corpus, CorpusStore
def test_load_corpus(tmp_path):
# Synthesize tiny corpus parquets
pl.DataFrame({
"sentence_id": [0, 1],
"text": ["The cat ran fast today", "Dogs run in the park"],
"source": ["cola", "ud_ewt"],
"source_record_id": ["cola:0", "ud_ewt:0"],
"n_tokens": [5, 5],
"n_content_in_vocab": [2, 2],
"n_content_oov": [0, 0],
"naturalness_score": [0.6, 0.55],
}).write_parquet(tmp_path / "corpus_sentences_index.parquet")
pl.DataFrame({
"sentence_id": [0, 0, 1, 1],
"position": [1, 2, 0, 1],
"surface": ["cat", "ran", "Dogs", "run"],
"lemma": ["cat", "run", "dog", "run"],
"pos": ["NOUN", "VERB", "NOUN", "VERB"],
"phonemes_str": ["|k|æ|t|", "|ɹ|ʌ|n|", "|d|ɔ|ɡ|", "|ɹ|ʌ|n|"],
}).write_parquet(tmp_path / "corpus_sentences.parquet")
store = load_corpus(tmp_path)
assert isinstance(store, CorpusStore)
idx = store.index_lf.collect()
assert idx.height == 2
- [ ] Step 2: Run test to verify it fails
Expected: FAIL (module not found).
- [ ] Step 3: Implement CorpusStore + load_corpus
# packages/generation/server/corpus.py
"""Runtime corpus retrieval for /api/sentences.
Loads two Parquet files at cold-start (Polars LazyFrames) and exposes a
`match_corpus` function that filters sentences by CSP constraints.
Constraint semantics are exact parity with CSP's per-slot filters:
every in-vocab content (NOUN/VERB/ADJ/ADV) word in the sentence must
satisfy the same `hard_filter_expr` CSP applies to fillers and the verb.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import polars as pl
@dataclass(frozen=True)
class CorpusStore:
index_lf: pl.LazyFrame
words_lf: pl.LazyFrame
def load_corpus(runtime_dir: Path) -> CorpusStore:
"""Load corpus_sentences{,_index}.parquet as LazyFrames."""
return CorpusStore(
index_lf=pl.scan_parquet(runtime_dir / "corpus_sentences_index.parquet"),
words_lf=pl.scan_parquet(runtime_dir / "corpus_sentences.parquet"),
)
- [ ] Step 4: Run test to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_corpus.py::test_load_corpus -v
- [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: CorpusStore + load_corpus (Polars LazyFrames)"
Task 10: match_corpus, hard-constraint path
Files:
- Modify: packages/generation/server/corpus.py
- Modify: packages/generation/server/tests/test_corpus.py
- [ ] Step 1: Write the failing test
# add to packages/generation/server/tests/test_corpus.py
from phonolex_generators.csp.constraints import ExcludeConstraint, BoundConstraint
from packages.generation.server.corpus import match_corpus
def _make_store(tmp_path):
pl.DataFrame({
"sentence_id": [0, 1, 2],
"text": [
"The cat sat on the mat",
"The dog ran fast today",
"Cars race on the road",
],
"source": ["cola"] * 3,
"source_record_id": ["c:0", "c:1", "c:2"],
"n_tokens": [6, 5, 5],
"n_content_in_vocab": [2, 2, 2],
"n_content_oov": [0, 0, 0],
"naturalness_score": [0.7, 0.6, 0.5],
}).write_parquet(tmp_path / "corpus_sentences_index.parquet")
pl.DataFrame({
"sentence_id": [0, 0, 1, 1, 2, 2],
"position": [1, 2, 1, 2, 0, 1],
"surface": ["cat", "sat", "dog", "ran", "Cars", "race"],
"lemma": ["cat", "sit", "dog", "run", "car", "race"],
"pos": ["NOUN", "VERB"] * 3,
"phonemes_str": [
"|k|æ|t|", "|s|ɪ|t|",
"|d|ɔ|ɡ|", "|ɹ|ʌ|n|", # /ɹ/ appears here
"|k|ɑ|ɹ|", "|ɹ|eɪ|s|", # /ɹ/ appears here too
],
"AoA": [3.0, 3.0, 3.0, 3.5, 4.0, 5.0],
}).write_parquet(tmp_path / "corpus_sentences.parquet")
return load_corpus(tmp_path)
def test_match_corpus_exclude(tmp_path):
store = _make_store(tmp_path)
matches = match_corpus(
store, [ExcludeConstraint(phonemes=("ɹ",))],
pairs_df=None, top_k=10,
)
# Only sentence 0 ("cat sat") has no /ɹ/ in any content word.
assert [m.text for m in matches] == ["The cat sat on the mat"]
def test_match_corpus_bound(tmp_path):
store = _make_store(tmp_path)
matches = match_corpus(
store, [BoundConstraint(norm="AoA", max_value=3.5)],
pairs_df=None, top_k=10,
)
# Sentences 0 (AoA all ≤ 3.0) and 1 (AoA 3.0, 3.5) pass.
texts = {m.text for m in matches}
assert texts == {"The cat sat on the mat", "The dog ran fast today"}
- [ ] Step 2: Run test to verify it fails
Expected: FAIL.
- [ ] Step 3: Implement match_corpus hard path
# packages/generation/server/corpus.py — add:
from pydantic import BaseModel
from typing import Literal
from phonolex_generators.csp.constraints import (
Constraint as CSPConstraint,
MinpairConstraint, MaxoppConstraint, MultoppConstraint,
hard_filter_expr,
)
class CorpusMatch(BaseModel):
text: str
source: Literal["cola", "ud_ewt", "gum", "childes", "tatoeba"]
naturalness_score: float
n_content_in_vocab: int
def match_corpus(
store: CorpusStore,
constraints: list[CSPConstraint],
pairs_df: pl.DataFrame | None,
top_k: int,
) -> list[CorpusMatch]:
# Multopp is paragraph-only — caller must surface
# corpus_skipped_reason="multopp_paragraph_only" before calling. We
# defensively return [] here in case the caller forgets.
if any(isinstance(c, MultoppConstraint) for c in constraints):
return []
# 1. Hard filter — same expression CSP applies per slot.
expr = hard_filter_expr(constraints)
words_lf = store.words_lf
if expr is not None:
words_lf = words_lf.with_columns(passes=expr)
else:
words_lf = words_lf.with_columns(passes=pl.lit(True))
ok_sids = (
words_lf
.group_by("sentence_id")
.agg(pl.col("passes").all().alias("ok"))
.filter(pl.col("ok"))
.select("sentence_id")
)
# 2. Contrastive (placeholder — Task 11 fills this in)
contrast = [c for c in constraints if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
if contrast:
# Implemented in Task 11; for now, fail loudly if reached.
raise NotImplementedError("Contrastive corpus matching arrives in Task 11.")
# 3. Join survivors with index, sort by naturalness_score
matches_df = (
store.index_lf
.join(ok_sids, on="sentence_id", how="inner")
.sort("naturalness_score", descending=True, nulls_last=True)
.head(top_k)
.collect()
)
return [
CorpusMatch(
text=row["text"],
source=row["source"],
naturalness_score=row["naturalness_score"] or 0.0,
n_content_in_vocab=row["n_content_in_vocab"],
)
for row in matches_df.iter_rows(named=True)
]
- [ ] Step 4: Run tests to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_corpus.py -v
- [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: match_corpus hard-constraint path (Exclude/Bound/Pattern)"
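The per-sentence rule in this task ("every in-vocab content word must pass") can be illustrated without Polars. The sketch below is plain Python over the Step 1 fixture rows. It is not the `hard_filter_expr` implementation, and the pipe-delimited containment check in `word_passes` is an assumption about how an exclusion could be evaluated against `phonemes_str`.

```python
from collections import defaultdict

# (sentence_id, phonemes_str) rows, mirroring the Step 1 fixture.
ROWS = [
    (0, "|k|æ|t|"), (0, "|s|ɪ|t|"),
    (1, "|d|ɔ|ɡ|"), (1, "|ɹ|ʌ|n|"),
    (2, "|k|ɑ|ɹ|"), (2, "|ɹ|eɪ|s|"),
]


def word_passes(phonemes_str: str, excluded: tuple[str, ...]) -> bool:
    """Assumed exclusion check: no excluded phoneme appears between pipes."""
    return not any(f"|{p}|" in phonemes_str for p in excluded)


def surviving_sentence_ids(rows, excluded):
    """Keep a sentence only if every one of its content words passes."""
    per_sentence = defaultdict(list)
    for sentence_id, phonemes_str in rows:
        per_sentence[sentence_id].append(word_passes(phonemes_str, excluded))
    # all(flags) plays the role of the group_by(...).agg(passes.all()) step.
    return sorted(sid for sid, flags in per_sentence.items() if all(flags))


print(surviving_sentence_ids(ROWS, ("ɹ",)))  # [0]
```

A single failing content word removes the whole sentence, which is why sentences 1 and 2 drop out even though "dog" and "car" differ in where /ɹ/ occurs.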
Task 11: match_corpus, contrastive path
Files:
- Modify: packages/generation/server/corpus.py
- Modify: packages/generation/server/tests/test_corpus.py
- [ ] Step 1: Write the failing test
# add to packages/generation/server/tests/test_corpus.py
from phonolex_generators.csp.constraints import MinpairConstraint
def _make_pairs_df():
return pl.DataFrame({
"word1": ["cat", "dog"],
"word2": ["bat", "log"],
"phoneme1": ["k", "d"],
"phoneme2": ["b", "l"],
"position_type": ["initial", "initial"],
"feature_distance": [1.2, 1.5],
"sonorant_diff": [0.0, 1.0],
})
def test_match_corpus_minpair(tmp_path):
# Sentence containing both "cat" and "bat" should pass an initial /k/-/b/ minpair.
pl.DataFrame({
"sentence_id": [0, 1],
"text": ["The cat saw the bat clearly", "The dog chased the squirrel"],
"source": ["cola", "cola"],
"source_record_id": ["c:0", "c:1"],
"n_tokens": [6, 5],
"n_content_in_vocab": [3, 3],
"n_content_oov": [0, 0],
"naturalness_score": [0.7, 0.6],
}).write_parquet(tmp_path / "corpus_sentences_index.parquet")
pl.DataFrame({
"sentence_id": [0, 0, 0, 1, 1, 1],
"position": [1, 2, 4, 1, 2, 4],
"surface": ["cat", "saw", "bat", "dog", "chased", "squirrel"],
"lemma": ["cat", "see", "bat", "dog", "chase", "squirrel"],
"pos": ["NOUN", "VERB", "NOUN", "NOUN", "VERB", "NOUN"],
"phonemes_str": ["|k|æ|t|", "|s|ɔ|", "|b|æ|t|",
"|d|ɔ|ɡ|", "|tʃ|eɪ|s|t|", "|s|k|w|ɝ|l|"],
}).write_parquet(tmp_path / "corpus_sentences.parquet")
store = load_corpus(tmp_path)
pairs_df = _make_pairs_df()
matches = match_corpus(
store,
[MinpairConstraint(phoneme1="k", phoneme2="b", position="initial")],
pairs_df=pairs_df,
top_k=10,
)
assert [m.text for m in matches] == ["The cat saw the bat clearly"]
- [ ] Step 2: Run test to verify it fails
Expected: FAIL (NotImplementedError).
- [ ] Step 3: Implement contrastive path
Replace the placeholder block in match_corpus:
# packages/generation/server/corpus.py — replace contrastive block:
contrast = [c for c in constraints if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
if contrast:
if len(contrast) > 1:
raise ValueError("at most one contrastive constraint per request")
if pairs_df is None:
raise ValueError("contrastive constraint requires pairs_df")
from phonolex_generators.csp.skeleton import _load_pairs_for_request
# Surviving lemmas from the hard-filter pass (per-row, already in ok_sids).
survivors_lemmas = (
words_lf
.join(ok_sids, on="sentence_id", how="inner")
.filter(pl.col("passes"))
.select("lemma")
.unique()
.collect()
["lemma"].to_list()
)
pair_frame = _load_pairs_for_request(
constraint=contrast[0],
pairs_df=pairs_df,
filtered_spec=frozenset(survivors_lemmas),
)
if pair_frame.height == 0:
return []
# Build set of (lemma, sentence_id) for hard-filter survivors.
survivor_rows = (
words_lf
.join(ok_sids, on="sentence_id", how="inner")
.filter(pl.col("passes"))
.select(["sentence_id", "lemma"])
.collect()
)
# Group lemmas per sentence.
sentence_lemmas = (
survivor_rows
.group_by("sentence_id")
.agg(pl.col("lemma").alias("lemmas"))
)
# Determine which sentences contain both halves of any surviving pair.
# Brute force is acceptable at corpus scale (~500K-1M sentences, ~500K pairs):
# build a {frozenset({w1,w2}) for each pair} and check per-sentence.
pair_set: set[frozenset[str]] = set()
for row in pair_frame.iter_rows(named=True):
pair_set.add(frozenset({row["filler_a"], row["filler_b"]}))
passing_sids: list[int] = []
for row in sentence_lemmas.iter_rows(named=True):
lemmas = set(row["lemmas"])
for pair in pair_set:
if pair <= lemmas:
passing_sids.append(row["sentence_id"])
break
if not passing_sids:
return []
ok_sids = pl.LazyFrame({"sentence_id": passing_sids})
- [ ] Step 4: Run tests to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_corpus.py -v
- [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: match_corpus contrastive path (Minpair/Maxopp)"
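The loop in Step 3 tests every surviving pair against every sentence, which grows with sentences × pairs. A sentence holds only a handful of content lemmas, so an alternative is to enumerate the lemma pairs inside each sentence and look them up in the pair set. The sketch below is one way to do that under stated assumptions: the function name and the plain `dict`/`list` inputs are hypothetical stand-ins for `sentence_lemmas` and `pair_frame`, and it assumes each pair consists of two distinct words.

```python
from itertools import combinations


def sentences_with_pair(
    sentence_lemmas: dict[int, list[str]],
    pairs: list[tuple[str, str]],
) -> list[int]:
    """Return ids of sentences that contain both halves of any pair."""
    # Unordered pairs, so (cat, bat) and (bat, cat) are the same key.
    pair_set = {frozenset(p) for p in pairs if p[0] != p[1]}
    passing: list[int] = []
    for sentence_id, lemmas in sentence_lemmas.items():
        unique = sorted(set(lemmas))
        # k lemmas give k*(k-1)/2 set lookups, independent of len(pairs).
        if any(frozenset(c) in pair_set for c in combinations(unique, 2)):
            passing.append(sentence_id)
    return passing


print(sentences_with_pair(
    {0: ["cat", "see", "bat"], 1: ["dog", "chase", "squirrel"]},
    [("cat", "bat"), ("dog", "log")],
))  # [0]
```

For pairs of two distinct words this returns the same sentences as the subset test `pair <= lemmas`, while the work per sentence no longer depends on the number of pairs.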
Task 12: Shared validate_constraints helper
Files:
- Create: packages/generation/server/validation.py
- Modify: packages/generation/server/routes/generate.py
- Create: packages/generation/server/tests/test_validation.py
- [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_validation.py
import pytest
import polars as pl
from fastapi import HTTPException
from packages.generation.server.validation import validate_constraints
from packages.generation.server.schemas import (
BoundConstraint, MinpairConstraint, MultoppConstraint,
)
def _fake_df():
return pl.DataFrame({"word": ["a"], "phonemes_str": ["|a|"], "AoA": [3.0]})
def test_unknown_bound_norm_raises_422():
with pytest.raises(HTTPException) as exc:
validate_constraints([BoundConstraint(type="bound", norm="nonexistent")], _fake_df())
assert exc.value.status_code == 422
def test_multiple_contrastive_raises():
with pytest.raises(ValueError):
validate_constraints([
MinpairConstraint(type="contrastive_minpair", phoneme1="k", phoneme2="b"),
MinpairConstraint(type="contrastive_minpair", phoneme1="d", phoneme2="t"),
], _fake_df())
def test_multopp_in_sentences_raises():
with pytest.raises(ValueError):
validate_constraints(
[MultoppConstraint(type="contrastive_multopp", substitute="ɹ", targets=["s", "t"])],
_fake_df(),
context="sentences",
)
- [ ] Step 2: Run tests to verify they fail
Expected: FAIL.
- [ ] Step 3: Lift validation into shared module
# packages/generation/server/validation.py
"""Shared constraint validation for /api/generate-sentences and /api/sentences.
Lifted from routes/generate.py:_validate_constraints. Same checks, exposed
to the orchestrator endpoint so both paths fail the same way.
"""
from __future__ import annotations
from typing import Literal
import polars as pl
from fastapi import HTTPException
from .schemas import Constraint, MinpairConstraint, MaxoppConstraint, MultoppConstraint
def validate_constraints(
constraints: list[Constraint],
df: pl.DataFrame,
context: Literal["sentences", "paragraphs"] = "sentences",
) -> None:
"""Raise 422 (or ValueError for solver-side checks) on invalid constraints."""
word_cols = set(df.columns)
bad: list[tuple[str, str]] = []
for c in constraints:
if c.type == "bound" and c.norm not in word_cols:
bad.append((c.norm, "bound.norm"))
if bad:
valid = sorted(
col for col in df.columns
if df[col].dtype.is_numeric() and not col.startswith("_")
)
unknown = ", ".join(f"{path}={n!r}" for n, path in bad)
raise HTTPException(
status_code=422,
detail=f"Unknown norm(s): {unknown}. Valid norms (numeric columns): {valid}",
)
contrast = [c for c in constraints
if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
if len(contrast) > 1:
raise ValueError("at most one contrastive constraint per request")
multopp = [c for c in constraints if isinstance(c, MultoppConstraint)]
if multopp and context == "sentences":
raise ValueError(
"contrastive_multopp produces an N+1-sentence opposition set "
"(substitute + N targets sharing verb+role). It is paragraph-only — "
"use POST /api/generate-paragraphs."
)
def has_multopp(constraints: list[Constraint]) -> bool:
return any(isinstance(c, MultoppConstraint) for c in constraints)
- [ ] Step 4: Update routes/generate.py to import the shared helper
# packages/generation/server/routes/generate.py — replace local _validate_constraints
# Remove the local definition and call sites; replace with:
from ..validation import validate_constraints
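# Update call sites:
# _validate_constraints(req.constraints, request.app.state.store.df)
# becomes, in the sentences route:
# validate_constraints(req.constraints, request.app.state.store.df, context="sentences")
# and, in the paragraphs route (the default context would reject Multopp there):
# validate_constraints(req.constraints, request.app.state.store.df, context="paragraphs")
Find the two call sites in routes/generate.py (sentences route + paragraphs route) and update each, passing the matching context.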
- [ ] Step 5: Run tests to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_validation.py -v
cd packages/generation && uv run python -m pytest server/tests/ -v # ensure nothing regressed
- [ ] Step 6: Commit
git add packages/generation/server/validation.py \
packages/generation/server/routes/generate.py \
packages/generation/server/tests/test_validation.py
git commit -m "corpus retrieval: lift validate_constraints into shared validation module"
Task 13: Add CorpusMatch/SentencesRequest/SentencesResponse schemas + SyntheticMatch alias
Files:
- Modify: packages/generation/server/schemas.py
- Create: packages/generation/server/tests/test_schemas.py
- [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_schemas.py
from packages.generation.server.schemas import (
CorpusMatch, SentencesRequest, SentencesResponse, SyntheticMatch, SentenceCandidate,
)
def test_corpus_match_validates():
m = CorpusMatch(text="The cat sat.", source="cola",
naturalness_score=0.7, n_content_in_vocab=2)
assert m.source == "cola"
def test_synthetic_match_is_sentence_candidate_alias():
assert SyntheticMatch is SentenceCandidate
def test_sentences_request_defaults():
req = SentencesRequest(constraints=[])
assert req.top_k_corpus == 10
assert req.top_k_synthetic == 10
assert req.include_synthetic is True
def test_sentences_response_shape():
resp = SentencesResponse(
corpus_matches=[],
synthetic_matches=[],
corpus_skipped_reason=None,
synthetic_skipped_reason=None,
elapsed_ms={"corpus": 0, "synthetic": 0, "total": 0},
)
assert resp.elapsed_ms["total"] == 0
- [ ] Step 2: Run test to verify it fails
Expected: FAIL.
- [ ] Step 3: Add the new schemas
# packages/generation/server/schemas.py — append:
from typing import TypeAlias
# Alias the existing CSP candidate type as SyntheticMatch so the orchestrator
# response uses the corpus/synthetic vocabulary. Field shapes unchanged.
SyntheticMatch: TypeAlias = SentenceCandidate
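# Re-export the CorpusMatch defined in corpus.py (Task 10) instead of declaring a
# second class with the same fields. Pydantic v2 does not coerce an instance of one
# model class into another, so match_corpus results would otherwise fail validation
# when placed into SentencesResponse.corpus_matches.
from .corpus import CorpusMatch  # noqa: E402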
class SentencesRequest(BaseModel):
constraints: list[Constraint] = Field(default_factory=list)
spec: str = "all"
band: str = "all"
top_k_corpus: int = 10
top_k_synthetic: int = 10
include_synthetic: bool = True
max_candidates: int = 5000
class SentencesResponse(BaseModel):
corpus_matches: list[CorpusMatch]
synthetic_matches: list[SyntheticMatch]
# "error" is what the orchestrator route sets when the corpus path raises.
corpus_skipped_reason: Literal["multopp_paragraph_only", "error"] | None = None
synthetic_skipped_reason: Literal["disabled_by_caller", "error"] | None = None
elapsed_ms: dict[str, int]
- [ ] Step 4: Run tests to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_schemas.py -v
- [ ] Step 5: Commit
git add packages/generation/server/schemas.py packages/generation/server/tests/test_schemas.py
git commit -m "corpus retrieval: CorpusMatch/SentencesRequest/Response schemas + SyntheticMatch alias"
Task 14: /api/sentences orchestrator route
Files:
- Create: packages/generation/server/routes/sentences.py
- Create: packages/generation/server/tests/test_sentences_orchestrator.py
- [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_sentences_orchestrator.py
"""Orchestrator endpoint tests. Mocks corpus + CSP to keep this fast."""
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock
@pytest.fixture
def client(monkeypatch, tmp_path):
# Build a tiny CorpusStore + minimal app.state stubs.
# Detailed setup is verbose; the goal here is to exercise the response shape
# and the parallel-execution behavior. Patch the inner workers so we don't
# need real data on disk.
from packages.generation.server.main import app
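# Use a real CorpusMatch: Pydantic rejects a MagicMock when SentencesResponse is built.
from packages.generation.server.schemas import CorpusMatch
fake_corpus_match = CorpusMatch(text="The cat sat.", source="cola",
naturalness_score=0.7, n_content_in_vocab=2)
# NOTE: the synthetic stub has the same problem. Replace it with a real
# SentenceCandidate once its required fields are confirmed in schemas.py.
fake_synthetic = MagicMock(sentence="The cat sees the dog.",
composite_score=0.65)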
monkeypatch.setattr(
"packages.generation.server.routes.sentences.run_corpus_path",
lambda *a, **kw: [fake_corpus_match],
)
monkeypatch.setattr(
"packages.generation.server.routes.sentences.run_synthetic_path",
lambda *a, **kw: [fake_synthetic],
)
return TestClient(app)
def test_sentences_endpoint_returns_both_lists(client):
resp = client.post("/api/sentences", json={"constraints": [], "include_synthetic": True})
assert resp.status_code == 200
body = resp.json()
assert "corpus_matches" in body and "synthetic_matches" in body
assert len(body["corpus_matches"]) == 1
assert len(body["synthetic_matches"]) == 1
assert body["corpus_skipped_reason"] is None
assert body["synthetic_skipped_reason"] is None
def test_include_synthetic_false_skips_csp(client):
resp = client.post("/api/sentences", json={"constraints": [], "include_synthetic": False})
body = resp.json()
assert body["synthetic_matches"] == []
assert body["synthetic_skipped_reason"] == "disabled_by_caller"
def test_multopp_rejected_with_422(client):
multopp = {"type": "contrastive_multopp", "substitute": "ɹ", "targets": ["s", "t"]}
# Multopp on /api/sentences should 422 because Multopp is paragraph-only;
# the orchestrator validates before dispatching either path.
resp = client.post("/api/sentences", json={"constraints": [multopp]})
assert resp.status_code == 422
- [ ] Step 2: Run tests to verify they fail
Expected: FAIL.
- [ ] Step 3: Implement the orchestrator route
# packages/generation/server/routes/sentences.py
"""Orchestrator endpoint that runs corpus retrieval and CSP synthetic
generation in parallel and returns both ranked lists in one envelope.
Routes:
POST /api/sentences — orchestrator
"""
from __future__ import annotations
import asyncio
import time
from fastapi import APIRouter, HTTPException, Request
from ..corpus import CorpusStore, match_corpus
from ..schemas import (
Constraint, SentencesRequest, SentencesResponse,
CorpusMatch, SyntheticMatch,
MultoppConstraint,
)
from ..validation import validate_constraints, has_multopp
# Import CSP machinery (mirrors routes/generate.py)
from phonolex_generators.csp import solver
from phonolex_generators.csp.reranker.rerank import rerank_with_axes
from phonolex_generators.csp.skeleton import spec_lexicon
from phonolex_generators.csp.constraints import (
ExcludeConstraint as CExclude, BoundConstraint as CBound,
MinpairConstraint as CMinpair, MaxoppConstraint as CMaxopp,
PatternConstraint as CPattern,
)
router = APIRouter()
def _to_csp_constraints(constraints: list[Constraint]) -> list:
"""Convert pydantic constraints to CSP dataclasses. Mirrors the
conversion already done inside routes/generate.py — keep in sync."""
out = []
for c in constraints:
if c.type == "exclude":
out.append(CExclude(phonemes=tuple(c.phonemes)))
elif c.type == "bound":
out.append(CBound(norm=c.norm, min_value=c.min_value, max_value=c.max_value))
elif c.type == "contrastive_minpair":
out.append(CMinpair(phoneme1=c.phoneme1, phoneme2=c.phoneme2,
position=c.position, slots=c.slots))
elif c.type == "contrastive_maxopp":
out.append(CMaxopp(phoneme1=c.phoneme1, phoneme2=c.phoneme2,
position=c.position,
min_sonorant_diff=c.min_sonorant_diff,
slots=c.slots))
elif c.type == "pattern":
out.append(CPattern(pattern_type=c.pattern_type,
phonemes=tuple(c.phonemes)))
return out
def run_corpus_path(
corpus_store: CorpusStore,
constraints: list,
pairs_df,
top_k: int,
) -> list[CorpusMatch]:
return match_corpus(corpus_store, constraints, pairs_df, top_k)
def run_synthetic_path(
request: Request,
csp_constraints: list,
spec: str,
band: str,
top_k: int,
max_candidates: int,
) -> list[SyntheticMatch]:
"""Run the same CSP+rerank path that /api/generate-sentences uses.
Returns a list of SyntheticMatch (== SentenceCandidate)."""
state = request.app.state
spec_words = spec_lexicon(state.store, spec)
candidates = solver.solve(
spec_words=spec_words,
word_df=state.store.df,
sel_df=state.sel_df,
pairs_df=state.pairs_df,
skeletons_df=state.skeletons_df,
band=band,
constraints=csp_constraints,
max_candidates=max_candidates,
)
if not candidates:
return []
reranked = rerank_with_axes(candidates, top_k=top_k)
# rerank_with_axes returns the same shape as routes/generate.py converts
# into SentenceCandidate. Reuse that conversion path:
from .generate import _to_sentence_candidates # type: ignore[attr-defined]
return _to_sentence_candidates(reranked)
@router.post("/sentences", response_model=SentencesResponse)
async def post_sentences(req: SentencesRequest, request: Request) -> SentencesResponse:
state = request.app.state
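try:
validate_constraints(req.constraints, state.store.df, context="sentences")
except ValueError as exc:
# Solver-side checks (multiple contrastive, Multopp on sentences) raise
# ValueError; surface them as 422 so the client sees a validation error, not a 500.
raise HTTPException(status_code=422, detail=str(exc)) from exc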
csp_constraints = _to_csp_constraints(req.constraints)
corpus_skipped: str | None = None
synthetic_skipped: str | None = None
elapsed: dict[str, int] = {}
t0 = time.perf_counter()
# Corpus path always runs (validate_constraints already rejected Multopp).
corpus_task = asyncio.create_task(
asyncio.to_thread(
run_corpus_path, state.corpus_store, csp_constraints,
state.pairs_df, req.top_k_corpus,
)
)
if req.include_synthetic:
synthetic_task = asyncio.create_task(
asyncio.to_thread(
run_synthetic_path, request, csp_constraints,
req.spec, req.band, req.top_k_synthetic, req.max_candidates,
)
)
else:
synthetic_task = None
synthetic_skipped = "disabled_by_caller"
try:
corpus_matches = await corpus_task
except Exception as e:
corpus_matches = []
# Don't blow up the whole response on corpus failure.
corpus_skipped = "error"
elapsed["corpus"] = int((time.perf_counter() - t0) * 1000)
if synthetic_task:
try:
synthetic_matches = await synthetic_task
except Exception:
synthetic_matches = []
synthetic_skipped = "error"
else:
synthetic_matches = []
elapsed["synthetic"] = int((time.perf_counter() - t0) * 1000) - elapsed["corpus"]
elapsed["total"] = int((time.perf_counter() - t0) * 1000)
return SentencesResponse(
corpus_matches=corpus_matches,
synthetic_matches=synthetic_matches,
corpus_skipped_reason=corpus_skipped, # type: ignore[arg-type]
synthetic_skipped_reason=synthetic_skipped, # type: ignore[arg-type]
elapsed_ms=elapsed,
)
- [ ] Step 4: Extract _to_sentence_candidates from generate.py
If routes/generate.py builds SentenceCandidate objects inline, extract that conversion into a named function _to_sentence_candidates(reranked) -> list[SentenceCandidate] and import it from the new orchestrator. Verify by reading lines that build SentenceCandidate(...) in routes/generate.py and lifting them into a single helper. Then both /api/generate-sentences and /api/sentences call the same function.
- [ ] Step 5: Mount the route in main.py
# packages/generation/server/main.py — modify routes block:
from .routes import generate, sentences
app.include_router(generate.router, prefix="/api")
app.include_router(sentences.router, prefix="/api")
- [ ] Step 6: Run orchestrator tests to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_sentences_orchestrator.py -v
- [ ] Step 7: Commit
git add packages/generation/server/routes/sentences.py \
packages/generation/server/routes/generate.py \
packages/generation/server/main.py \
packages/generation/server/tests/test_sentences_orchestrator.py
git commit -m "corpus retrieval: /api/sentences orchestrator (corpus ∥ CSP)"
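In the route above, `elapsed["synthetic"]` is derived by subtracting the corpus time from a shared clock, even though the two paths run concurrently. One alternative is to time each path inside its own task. The sketch below shows that pattern with `asyncio.to_thread` and `asyncio.gather`; the names `timed_to_thread`, `unpack` and `run_both` are hypothetical, the two callables stand in for `run_corpus_path` and `run_synthetic_path`, and the returned dict only imitates the response envelope.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Callable


async def timed_to_thread(fn: Callable[[], Any]) -> tuple[Any, int]:
    """Run a blocking callable in a worker thread; return (result, elapsed ms)."""
    start = time.perf_counter()
    result = await asyncio.to_thread(fn)
    return result, int((time.perf_counter() - start) * 1000)


def unpack(outcome: Any) -> tuple[list, int, str | None]:
    """Map one gather() outcome to (matches, elapsed_ms, skipped_reason)."""
    if isinstance(outcome, BaseException):
        return [], 0, "error"
    matches, elapsed_ms = outcome
    return matches, elapsed_ms, None


async def run_both(corpus_fn: Callable[[], Any], synthetic_fn: Callable[[], Any]) -> dict[str, Any]:
    t0 = time.perf_counter()
    # return_exceptions=True: a failure in one path does not discard the other.
    corpus_out, synthetic_out = await asyncio.gather(
        timed_to_thread(corpus_fn),
        timed_to_thread(synthetic_fn),
        return_exceptions=True,
    )
    corpus, corpus_ms, corpus_skipped = unpack(corpus_out)
    synthetic, synthetic_ms, synthetic_skipped = unpack(synthetic_out)
    return {
        "corpus_matches": corpus,
        "synthetic_matches": synthetic,
        "corpus_skipped_reason": corpus_skipped,
        "synthetic_skipped_reason": synthetic_skipped,
        "elapsed_ms": {
            "corpus": corpus_ms,
            "synthetic": synthetic_ms,
            "total": int((time.perf_counter() - t0) * 1000),
        },
    }
```

With this shape each entry in `elapsed_ms` reflects that path's own wall-clock time, and `total` is roughly the slower of the two rather than their sum.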
Task 15: Cold-start CorpusStore in main.py
Files:
- Modify: packages/generation/server/main.py
- [ ] Step 1: Write the failing test (smoke)
# add to packages/generation/server/tests/test_sentences_orchestrator.py
def test_corpus_store_loaded_at_startup():
from packages.generation.server.main import app
# The lifespan handler should have populated app.state.corpus_store.
# Run a startup cycle by entering the lifespan context.
import asyncio
from fastapi.testclient import TestClient
with TestClient(app) as client:
assert hasattr(app.state, "corpus_store")
assert app.state.corpus_store is not None
- [ ] Step 2: Run test to verify it fails
Expected: FAIL (corpus_store attribute missing).
- [ ] Step 3: Load corpus at cold-start
# packages/generation/server/main.py — extend lifespan:
from .corpus import load_corpus
@asynccontextmanager
async def lifespan(app: FastAPI):
# ...existing loads...
print(f"[startup] loading corpus_sentences{{,_index}}.parquet…")
app.state.corpus_store = load_corpus(DATA_RUNTIME)
# Also cache pairs_df for the orchestrator (CSP loads it lazily inside
# solve(); the orchestrator wants it at hand for match_corpus).
print(f"[startup] loading pairs.parquet…")
app.state.pairs_df = pl.read_parquet(DATA_RUNTIME / "pairs.parquet")
print(f"[startup] ready.")
yield
- [ ] Step 4: Run test to verify pass
cd packages/generation && uv run python -m pytest server/tests/test_sentences_orchestrator.py::test_corpus_store_loaded_at_startup -v
If this fails with a FileNotFoundError on corpus_sentences.parquet, the ingest from Task 8 hasn't been run yet. Run it on at least a tiny fixture (or wrap the load in a try/except for dev environments without the artifact).
- [ ] Step 5: Commit
git add packages/generation/server/main.py packages/generation/server/tests/test_sentences_orchestrator.py
git commit -m "corpus retrieval: load CorpusStore + pairs_df at FastAPI cold-start"
Task 16: Worker /sentences proxy route
Files:
- Modify: packages/web/workers/src/routes/generation.ts
- Modify: packages/web/workers/test/routes/generation.test.ts (path to verify)
- [ ] Step 1: Write the failing test
// packages/web/workers/test/routes/generation.test.ts — add:
import { describe, it, expect } from 'vitest';
import { createApp } from '../../src/index'; // adjust import to match existing test pattern
describe('POST /sentences', () => {
it('proxies to backend /api/sentences', async () => {
const app = createApp({ /* env with mocked GENERATION_SERVICE */ });
const res = await app.request('/sentences', {
method: 'POST',
body: JSON.stringify({ constraints: [], include_synthetic: true }),
headers: { 'Content-Type': 'application/json' },
});
expect(res.status).toBe(200);
});
});
(Adapt to the project's existing Worker test harness — read packages/web/workers/test/ for the convention used by the /generate-sentences test.)
- [ ] Step 2: Run test to verify it fails
cd packages/web/workers && npm test -- --run generation.test
- [ ] Step 3: Add the proxy route
// packages/web/workers/src/routes/generation.ts — add after existing routes:
generation.post('/sentences', (c) => proxy(c, 'POST', '/api/sentences'));
- [ ] Step 4: Run test to verify pass
cd packages/web/workers && npm test -- --run generation.test
- [ ] Step 5: Commit
git add packages/web/workers/src/routes/generation.ts \
packages/web/workers/test/routes/generation.test.ts
git commit -m "corpus retrieval: Worker proxy for /api/sentences"
Task 17: Frontend types
Files:
- Modify: packages/web/frontend/src/types/governance.ts (path to verify by reading the file)
- [ ] Step 1: Read the existing types file
cat packages/web/frontend/src/types/governance.ts | head -100
Locate the SentenceCandidate (or equivalent) type. The orchestrator response uses the same fields.
- [ ] Step 2: Add CorpusMatch, SyntheticMatch, SentencesResponse types
// packages/web/frontend/src/types/governance.ts — append:
export type CorpusSource = 'cola' | 'ud_ewt' | 'gum' | 'childes' | 'tatoeba';
export interface CorpusMatch {
text: string;
source: CorpusSource;
naturalness_score: number;
n_content_in_vocab: number;
}
// SyntheticMatch is the corpus/synthetic-vocabulary alias for the existing
// candidate shape (sentence, composite_score, axis_scores, verb, fillers, ...).
// Re-export the existing type under the new name so call sites stay short.
export type SyntheticMatch = SentenceCandidate; // adjust if the type is named differently
export interface SentencesResponse {
corpus_matches: CorpusMatch[];
synthetic_matches: SyntheticMatch[];
corpus_skipped_reason: 'multopp_paragraph_only' | null;
synthetic_skipped_reason: 'disabled_by_caller' | 'error' | null;
elapsed_ms: { corpus: number; synthetic: number; total: number };
}
- [ ] Step 3: Run TypeScript check
cd packages/web/frontend && npm run typecheck
- [ ] Step 4: Commit
git add packages/web/frontend/src/types/governance.ts
git commit -m "corpus retrieval: frontend types — CorpusMatch + SyntheticMatch + SentencesResponse"
Task 18: Frontend, two stacked sections on Generate page
Files:
- Modify: packages/web/frontend/src/pages/Generate.tsx or equivalent (path to verify by grepping the codebase)
- [ ] Step 1: Find the Generate page
grep -rln "generate-sentences" packages/web/frontend/src --include="*.tsx" --include="*.ts"
Look for the component that calls /api/generate-sentences. That is the page to modify.
- [ ] Step 2: Switch endpoint + add two-section render
// In the page file from Step 1 — switch the fetch URL and adjust the response handling.
// (Concrete diff depends on the existing code; the patterns below are the
// shape to land on — adapt to the page's existing API client + state model.)
// 1. Endpoint switch:
// POST /api/generate-sentences → POST /api/sentences
//
// 2. Request body:
// { constraints, spec, band, top_k: 8 }
// becomes
// { constraints, spec, band, top_k_corpus: 10, top_k_synthetic: 10, include_synthetic: true }
//
// 3. Response type:
// GenerateSentencesResponse → SentencesResponse (imported from types/governance.ts)
//
// 4. Render — replace the single result list with two sections:
import type { SentencesResponse, CorpusMatch, SyntheticMatch } from '../types/governance';
function CorpusSection({ matches, skipped }: { matches: CorpusMatch[]; skipped: 'multopp_paragraph_only' | null }) {
  if (skipped === 'multopp_paragraph_only') {
    return <SectionHeader title="Corpus matches"
      subtitle="Multiple opposition is a paragraph property — see Synthetic matches below." />;
  }
  if (matches.length === 0) {
    return <SectionHeader title="Corpus matches"
      subtitle="No attested sentences match these constraints. Try the synthetic matches below." />;
  }
  return (
    <>
      <SectionHeader title="Corpus matches"
        subtitle={`${matches.length} attested sentences match your constraints`} />
      {matches.map((m, i) => (
        <CorpusRow key={i} match={m} />
      ))}
    </>
  );
}
function SyntheticSection({ matches, skipped }: { matches: SyntheticMatch[]; skipped: 'disabled_by_caller' | 'error' | null }) {
  if (skipped === 'error') {
    return <SectionHeader title="Synthetic matches"
      subtitle="Synthetic generation failed — see corpus matches above." />;
  }
  // Defensive branch: this page always sends include_synthetic: true, but if the
  // caller disabled synthetic output, the "loosen your constraints" copy would mislead.
  if (skipped === 'disabled_by_caller') {
    return null;
  }
  if (matches.length === 0) {
    return <SectionHeader title="Synthetic matches"
      subtitle="No synthetic matches — try loosening your constraints." />;
  }
  return (
    <>
      <SectionHeader title="Synthetic matches"
        subtitle={`${matches.length} generated alternatives`} />
      {matches.map((m, i) => (
        <SyntheticRow key={i} match={m} />
      ))}
    </>
  );
}
// In the page body:
{result && (
  <>
    <CorpusSection matches={result.corpus_matches} skipped={result.corpus_skipped_reason} />
    <Divider />
    <SyntheticSection matches={result.synthetic_matches} skipped={result.synthetic_skipped_reason} />
  </>
)}
CorpusRow renders the sentence text with a small source pill (CoLA / UD-EWT / GUM / CHILDES / Tatoeba) and the naturalness_score on hover. SyntheticRow reuses the existing per-candidate row component already used on the page.
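Items 1 to 3 in the comment block above (endpoint switch, request body, response type) can be sketched as two small helpers. This is a minimal sketch under assumptions: `toSentencesRequest`, `postSentences`, and the `PostJson` transport type are hypothetical names standing in for the page's existing API client. Only the field names and the defaults of 10 come from the plan.

```typescript
// Shape of the old /api/generate-sentences body, generic over the page's own
// constraint, spec, and band types (which are defined elsewhere in the codebase).
interface LegacyRequest<C, S, B> {
  constraints: C;
  spec: S;
  band: B;
  top_k: number;
}

// Shape of the new /api/sentences body as described in item 2.
interface SentencesRequest<C, S, B> {
  constraints: C;
  spec: S;
  band: B;
  top_k_corpus: number;
  top_k_synthetic: number;
  include_synthetic: boolean;
}

// Hypothetical transport: whatever the page's API client uses to POST JSON.
type PostJson = (url: string, body: unknown) => Promise<unknown>;

// Convert the old body to the new one. The legacy top_k is dropped and
// replaced by the two per-section limits.
function toSentencesRequest<C, S, B>(
  legacy: LegacyRequest<C, S, B>,
  topK: number = 10,
): SentencesRequest<C, S, B> {
  const { constraints, spec, band } = legacy;
  return {
    constraints,
    spec,
    band,
    top_k_corpus: topK,
    top_k_synthetic: topK,
    include_synthetic: true,
  };
}

// Send the new body to the orchestrator endpoint through the injected transport.
function postSentences<C, S, B>(
  body: SentencesRequest<C, S, B>,
  post: PostJson,
): Promise<unknown> {
  return post('/api/sentences', body);
}
```

Injecting the transport keeps the sketch independent of how the page currently issues requests. The resolved value is typed `unknown` on purpose, so the caller narrows it to `SentencesResponse` before rendering.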
- [ ] Step 3: Manual smoke test in the dev server
cd packages/web/frontend && npm run dev
# In another terminal: start the generation server
cd /Users/jneumann/Repos/PhonoLex && uv run uvicorn packages.generation.server.main:app --host 0.0.0.0 --port 8000
In the browser, run a query with Exclude(/ɹ/) and verify two stacked sections render: corpus matches (real attested text containing no /ɹ/) then synthetic matches. Test the empty-state copy by setting an over-restrictive bound (e.g., Bound(AoA, max=2) with Pattern(STARTS_WITH, /z/)) — corpus section should show "No attested sentences match…".
- [ ] Step 4: Commit
git add packages/web/frontend/src/
git commit -m "corpus retrieval: Generate page — two stacked sections (corpus + synthetic)"
Task 19: Documentation — CLAUDE.md update
Files:
- Modify: CLAUDE.md
- [ ] Step 1: Update the Generation Runtime Data Contract section
Read the existing section in CLAUDE.md titled "Generation Runtime Data Contract (PHON-93 / PHON-106 / PHON-107 / PHON-109 / PHON-110)" and add two bullets to the list of runtime artifacts:
- `corpus_sentences.parquet` — per-(sentence, in-vocab content word) rows, ~1–3M rows × ~167 norm cols denormalized from words.parquet. Drives /api/sentences corpus matching; reuses CSP's hard_filter_expr exactly (every in-vocab content word must satisfy the filter, mirroring CSP's per-slot enforcement). Built by `packages/data/scripts/build_corpus_sentences.py`.
- `corpus_sentences_index.parquet` — per-sentence header rows with text, source, n_content_in_vocab, and precomputed naturalness_score (mean top-K Qwen3-Embedding cosine vs naturalness_reference.npy, self-row excluded).
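The naturalness_score description in the second bullet (mean top-K cosine against a reference set, with the sentence's own row excluded) can be illustrated in a few lines. The sketch is written in TypeScript for consistency with the other examples in this section; the actual ingest is Python, and the function names, the `selfRow` parameter, and the toy vectors in the usage note are assumptions. The value of K is not stated here, so it is left as a parameter.

```typescript
// Cosine similarity between two equal-length vectors; returns 0 if either is all zeros.
function cosine(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  const denom = Math.sqrt(normA) * Math.sqrt(normB);
  return denom === 0 ? 0 : dot / denom;
}

// Mean of the k highest cosine similarities between `query` and the reference rows.
// `selfRow` (hypothetical) is the index of the query's own row in the reference
// set, or null when the query is not part of it; that row is skipped so a
// sentence cannot score itself.
function naturalnessScore(
  query: number[],
  reference: number[][],
  k: number,
  selfRow: number | null = null,
): number {
  const sims: number[] = [];
  reference.forEach((row, idx) => {
    if (idx !== selfRow) sims.push(cosine(query, row));
  });
  const top = sims.sort((x, y) => y - x).slice(0, k);
  if (top.length === 0) return 0;
  return top.reduce((sum, v) => sum + v, 0) / top.length;
}
```

With reference rows `[[1, 0], [0, 1], [1, 0]]`, query `[1, 0]`, and `k = 2`, excluding row 0 leaves similarities 0 and 1 and a score of 0.5. Without the exclusion, the query's own row would count as a perfect match and lift the score to 1.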
Also update the architecture section:
- Add /api/sentences to the endpoint list ("Four endpoints: …, /api/sentences (orchestrator)").
- Add a short paragraph under Governed Generation explaining the corpus/synthetic split: "v5.3 (or successor) introduces a corpus retrieval path alongside CSP synthetic generation. The /api/sentences orchestrator runs both in parallel and returns ranked corpus matches first, ranked synthetic matches below. Real-world attested text reads naturally by definition and augments synthetic output, especially on permissive constraint combinations. Corpus retrieval reuses CSP's hard_filter_expr and _load_pairs_for_request directly — constraint semantics are exact parity across the two paths."
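The parity rule (a sentence matches only if every in-vocab content word passes the filter) is easy to misread as "any word passes". A minimal sketch of the intended semantics follows, in TypeScript purely for illustration: the real path applies CSP's hard_filter_expr as a Polars expression, and the `WordRow` shape, the `aoa` norm key, and the `matchingSentenceIds` name are hypothetical.

```typescript
// One row per (sentence, in-vocab content word), loosely mirroring the layout
// described for corpus_sentences.parquet. Field names are illustrative only.
interface WordRow {
  sentenceId: number;
  word: string;
  norms: Record<string, number>;
}

type WordPredicate = (row: WordRow) => boolean;

// Return the ids of sentences whose rows ALL satisfy the predicate.
// A single failing content word disqualifies the whole sentence.
function matchingSentenceIds(rows: WordRow[], passes: WordPredicate): number[] {
  const allPass = new Map<number, boolean>();
  for (const row of rows) {
    const soFar = allPass.get(row.sentenceId) ?? true;
    allPass.set(row.sentenceId, soFar && passes(row));
  }
  return Array.from(allPass.entries())
    .filter(([, ok]) => ok)
    .map(([id]) => id);
}

// Example predicate: an upper bound on a hypothetical "aoa" norm.
// A word with no value for the norm is treated as failing the bound.
const aoaAtMost4: WordPredicate = (row) => (row.norms['aoa'] ?? Infinity) <= 4;
```

Given rows for sentence 1 (`cat` with aoa 3, `dog` with aoa 2) and sentence 2 (`cat` with aoa 3, `zebra` with aoa 6), only sentence 1 is returned: one out-of-bound word is enough to drop sentence 2.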
Add a Code Style / Key Patterns bullet:
**Corpus vs synthetic vocabulary** — Real-world attested text is called "corpus" everywhere; CSP-generated text is called "synthetic". Used in schemas (`CorpusMatch`, `SyntheticMatch`), response fields (`corpus_matches`, `synthetic_matches`), UI section headers ("Corpus matches", "Synthetic matches"), and docs. Don't introduce alternatives like "real-world" / "generated" / "examples" / "alternatives" — vocabulary drift makes the two surfaces harder to talk about consistently.
- [ ] Step 2: Commit
git add CLAUDE.md
git commit -m "docs: corpus retrieval — Generation Runtime Data Contract + endpoints + terminology"
Self-review summary
This plan's task structure maps cleanly to the spec's sections:
| Spec section | Plan tasks |
|---|---|
| Data model (index + words parquets) | 6 (schemas + emit) |
| Ingest pipeline (5 stages) | 1 (loaders), 2 (pre-filter), 3 (spaCy), 4 (lemma-join), 5 (profanity), 6 (emit), 7 (naturalness), 8 (wire e2e) |
| Query path (hard + contrastive) | 9 (loader), 10 (hard), 11 (contrastive) |
| Orchestrator endpoint | 12 (validation), 13 (schemas), 14 (route), 15 (cold-start) |
| Worker proxy | 16 |
| Frontend | 17 (types), 18 (page) |
| Docs | 19 |
Open decisions §1 (SentenceCandidate rename) is resolved by aliasing SyntheticMatch = SentenceCandidate (Task 13) — no risky rename, but the orchestrator surface uses corpus/synthetic vocabulary externally.
Open decisions §2 (slots UX) is deferred: Task 18 leaves the slots picker untouched on the page; a small follow-up ticket can decide whether to annotate or hide it.
Open decisions §3 (SSE streaming) is deferred to a follow-up — Task 18 ships the wait-for-both behavior.
Open decisions §4 (denylist source) is resolved by Task 5 using better-profanity.
The plan omits a per-task git push step because pushing is a per-feature decision. Once the full feature is green locally and Task 19 has landed, the engineer pushes the branch and opens a PR against develop, per feedback_finish_the_job.md.