Corpus Retrieval Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add a real-corpus retrieval service that ships attested sentences from CoLA-pos + UD-EWT + GUM + CHILDES-adult + Tatoeba-en, filtered by the same CSP constraint schema, surfaced alongside CSP synthetic output via a new /api/sentences orchestrator endpoint.

Architecture: Offline ingest produces two LFS-tracked Parquet files: a per-sentence index and a per-(sentence, content-word) detail table. The runtime loads both once at FastAPI cold-start. The orchestrator endpoint runs corpus retrieval (a Polars filter) and CSP (the existing solver + rerank) in parallel and returns both ranked lists in a single envelope. Constraint semantics match CSP exactly: every in-vocab content word must satisfy the filter, mirroring CSP's per-slot enforcement. The runtime reuses hard_filter_expr and _load_pairs_for_request from the CSP package unchanged.
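
The parallel fan-out described above can be sketched with asyncio. This is a minimal illustration under assumptions, not the planned route: retrieve_corpus, run_csp, the placeholder constraint dict, and the envelope keys "corpus" and "synthetic" are hypothetical stand-ins for the Polars filter, the existing solver + rerank, and the eventual SentencesResponse fields.

```python
import asyncio


def retrieve_corpus(constraints: dict) -> list[str]:
    # Hypothetical stand-in for the Polars corpus filter.
    return ["The cat sat on the mat today"]


def run_csp(constraints: dict) -> list[str]:
    # Hypothetical stand-in for the existing CSP solver + rerank.
    return ["A cat sat"]


async def sentences(constraints: dict) -> dict[str, list[str]]:
    # Both calls are blocking, so run each in a worker thread and await both.
    corpus, synthetic = await asyncio.gather(
        asyncio.to_thread(retrieve_corpus, constraints),
        asyncio.to_thread(run_csp, constraints),
    )
    # One envelope carrying two independently ranked lists.
    return {"corpus": corpus, "synthetic": synthetic}


# Placeholder constraints; the real schema is the shared CSP constraint schema.
envelope = asyncio.run(sentences({"placeholder": True}))
```

Running the two branches in threads keeps the slower branch from serializing behind the faster one; whether the real endpoint uses threads or another mechanism is left to the later tasks.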

Tech Stack: Python 3.13 + Polars + spaCy (en_core_web_sm) + sentence-transformers (Qwen3-Embedding-0.6B) + FastAPI + Pydantic v2; TypeScript + Hono + React + MUI on the web side; Parquet over LFS for data artifacts.

Reference spec: docs/superpowers/specs/2026-05-11-corpus-retrieval-design.md

Branching: Create branch feature/corpus-retrieval off develop. If feature/csp-iteration has not merged to develop yet, branch off feature/csp-iteration and rebase onto develop once that lands. Do not use git worktrees — work in the main checkout.


File Structure

Created:
- packages/data/scripts/build_corpus_sentences.py: ingest pipeline (CLI entry-point)
- packages/data/src/phonolex_data/runtime/corpus_schema.py: Polars schemas + retention constants for the two corpus parquets
- packages/data/tests/test_corpus_ingest.py: unit tests for the ingest pipeline
- packages/generation/server/corpus.py: runtime CorpusStore + match_corpus
- packages/generation/server/routes/sentences.py: /api/sentences orchestrator
- packages/generation/server/validation.py: shared validate_constraints lifted from routes/generate.py
- packages/generation/server/tests/test_corpus.py
- packages/generation/server/tests/test_sentences_orchestrator.py
- data/runtime/corpus_sentences.parquet (LFS)
- data/runtime/corpus_sentences_index.parquet (LFS)

Modified:
- packages/generation/server/main.py: load CorpusStore at cold-start
- packages/generation/server/schemas.py: add CorpusMatch, SentencesRequest, SentencesResponse; add SyntheticMatch as a TypeAlias for SentenceCandidate
- packages/generation/server/routes/generate.py: drop the inline _validate_constraints, import from ..validation
- packages/web/workers/src/routes/generation.ts: add /sentences proxy route
- packages/web/frontend/src/types/governance.ts: add CorpusMatch, SyntheticMatch, SentencesResponse types (path to verify at Task 17)
- packages/web/frontend/src/pages/Generate.tsx or equivalent (path to verify at Task 18): switch endpoint, render two stacked sections
- .gitattributes: add LFS patterns for the two new Parquet files
- CLAUDE.md: extend Generation Runtime Data Contract with the corpus artifacts and add /api/sentences to the endpoints list


Task 1: Ingest scaffolding — source loaders

Files: - Create: packages/data/scripts/build_corpus_sentences.py - Create: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import (
    load_cola_positive, load_ud_ewt, load_gum, load_childes_adult, load_tatoeba_english,
)

def test_loaders_yield_text_and_source_id():
    for loader in (load_cola_positive, load_ud_ewt, load_gum,
                   load_childes_adult, load_tatoeba_english):
        first = next(iter(loader(limit=1)))
        assert isinstance(first, tuple) and len(first) == 2
        text, src_id = first
        assert isinstance(text, str) and len(text) > 0
        assert isinstance(src_id, str)
  • [ ] Step 2: Run test to verify it fails

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_loaders_yield_text_and_source_id -v
Expected: FAIL (module does not exist).

  • [ ] Step 3: Write minimal script with loaders
# packages/data/scripts/build_corpus_sentences.py
"""Build corpus retrieval Parquets — corpus_sentences.parquet + corpus_sentences_index.parquet.

Reuses CoLA/UD-EWT/GUM loaders from build_naturalness_reference.py and
adds CHILDES adult-input and Tatoeba English. Output schema is defined in
phonolex_data.runtime.corpus_schema.
"""
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Iterator

REPO_ROOT = Path(__file__).resolve().parents[3]
RUNTIME = REPO_ROOT / "data" / "runtime"


def load_cola_positive(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation"):
        ds = load_dataset("glue", "cola", split=split)
        for i, row in enumerate(ds):
            if row["label"] != 1:
                continue
            yield row["sentence"], f"cola:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_ud_ewt(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation", "test"):
        ds = load_dataset("universal_dependencies", "en_ewt", split=split)
        for i, row in enumerate(ds):
            yield row["text"], f"ud_ewt:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_gum(limit: int | None = None) -> Iterator[tuple[str, str]]:
    from datasets import load_dataset
    n = 0
    for split in ("train", "validation", "test"):
        try:
            ds = load_dataset("universal_dependencies", "en_gum", split=split)
        except ValueError:
            continue
        for i, row in enumerate(ds):
            yield row["text"], f"gum:{split}:{i}"
            n += 1
            if limit and n >= limit:
                return


def load_childes_adult(limit: int | None = None) -> Iterator[tuple[str, str]]:
    """Reuse PHON-94's CHILDES TalkBank XML reader. Filter to adult speakers
    (MOT/FAT/INV) in English locales (Eng-NA, Eng-UK). Path to the cached
    corpus is configured in the existing build_frequency_corpus pipeline.
    """
    from phonolex_data.loaders.childes import iter_adult_utterances  # added in Task 1.5
    n = 0
    for utt in iter_adult_utterances(locales=("Eng-NA", "Eng-UK")):
        yield utt.text, f"childes:{utt.locale}:{utt.session_id}:{utt.utt_idx}"
        n += 1
        if limit and n >= limit:
            return


def load_tatoeba_english(limit: int | None = None) -> Iterator[tuple[str, str]]:
    """Tatoeba English sentences (CC-BY 2.0 FR). Expects the dump at
    data/raw/tatoeba/sentences.csv (downloaded once by the build script).
    """
    import csv
    path = REPO_ROOT / "data" / "raw" / "tatoeba" / "sentences.csv"
    if not path.exists():
        raise FileNotFoundError(
            f"Tatoeba dump not found at {path}. Download from "
            "https://tatoeba.org/en/downloads and extract the English subset."
        )
    n = 0
    with path.open(encoding="utf-8") as f:
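        # The dump is plain tab-separated text; QUOTE_NONE keeps double quotes
        # inside sentences as literal characters instead of field delimiters.
        reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_NONE)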
        for row in reader:
            if len(row) < 3 or row[1] != "eng":
                continue
            sentence_id, _, text = row[0], row[1], row[2]
            yield text, f"tatoeba:{sentence_id}"
            n += 1
            if limit and n >= limit:
                return


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--limit-per-source", type=int, default=None)
    args = parser.parse_args()
    # Task 8 wires in the rest of the pipeline.
    raise SystemExit("Pipeline not yet wired — see Task 8.")


if __name__ == "__main__":
    main()
  • [ ] Step 4: Add minimal CHILDES iterator stub
# packages/data/src/phonolex_data/loaders/childes.py (new or extended)
"""Adult-speaker utterance reader for CHILDES TalkBank XML."""
from dataclasses import dataclass
from typing import Iterator

ADULT_CODES = frozenset({"MOT", "FAT", "INV"})


@dataclass(frozen=True)
class AdultUtterance:
    text: str
    locale: str
    session_id: str
    utt_idx: int
    speaker_code: str


def iter_adult_utterances(locales: tuple[str, ...] = ("Eng-NA", "Eng-UK")) -> Iterator[AdultUtterance]:
    """Stream adult-speaker utterances from cached CHILDES XML.

    Reuses the cache directory established by PHON-94's
    research/2026-04-30-frequency-corpus-build pipeline. If the cache is
    absent, raise FileNotFoundError with a clear message — building it is
    a separate, one-time operation.
    """
    raise NotImplementedError(
        "CHILDES adult iterator: lift implementation from "
        "research/2026-04-30-frequency-corpus-build/build_frequency_corpus.py "
        "where the same XML/speaker-code parsing is already proven. "
        "Filter to ADULT_CODES; emit AdultUtterance per <u> tag."
    )
  • [ ] Step 5: Run test to verify it passes

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_loaders_yield_text_and_source_id -v
Expected: FAIL as written. The single test loops over all five loaders, so it stops at the first one that raises: childes (NotImplementedError), and after that tatoeba (FileNotFoundError), until those sources are provisioned. Split the test so the three working loaders (cola, ud_ewt, gum) are parametrized and must PASS, and give childes and tatoeba separate xfail cases with a clear reason.

Update the test:

import pytest

CORE_LOADERS = [load_cola_positive, load_ud_ewt, load_gum]

@pytest.mark.parametrize("loader", CORE_LOADERS)
def test_core_loaders_yield_text_and_source_id(loader):
    first = next(iter(loader(limit=1)))
    text, src_id = first
    assert isinstance(text, str) and len(text) > 0
    assert isinstance(src_id, str)


@pytest.mark.xfail(reason="CHILDES requires PHON-94 cache + adult iterator (Task 1 Step 4)")
def test_childes_loader():
    next(iter(load_childes_adult(limit=1)))


@pytest.mark.xfail(reason="Tatoeba dump must be provisioned at data/raw/tatoeba/sentences.csv")
def test_tatoeba_loader():
    next(iter(load_tatoeba_english(limit=1)))
  • [ ] Step 6: Commit
git add packages/data/scripts/build_corpus_sentences.py \
        packages/data/src/phonolex_data/loaders/childes.py \
        packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: source loaders for CoLA/UD-EWT/GUM/CHILDES/Tatoeba"
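
A note on parsing the Tatoeba dump: the loader above reads tab-separated rows with csv.reader, whose default dialect treats a leading double quote as the start of a quoted field. The sketch below, using a made-up sample row, shows how that default alters sentence text and how csv.QUOTE_NONE avoids it. It assumes the dump is unquoted tab-separated text, which should be verified against the downloaded file.

```python
import csv
import io

# One Tatoeba-style row: id <TAB> language <TAB> text, where the text
# itself begins with a double quote. The row is invented for illustration.
SAMPLE = '1\teng\t"Hello," she said.\n'


def read_rows(data: str, quoting: int) -> list[list[str]]:
    """Parse tab-separated data with the given csv quoting mode."""
    return list(csv.reader(io.StringIO(data), delimiter="\t", quoting=quoting))


raw_rows = read_rows(SAMPLE, csv.QUOTE_NONE)         # quotes kept as literal text
default_rows = read_rows(SAMPLE, csv.QUOTE_MINIMAL)  # csv.reader's default mode
```

With the default mode the quote characters are consumed as field syntax, so the stored sentence no longer matches the source text.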

Task 2: Sentence-level pre-filter

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import (
    prefilter_sentence, dedup_stream,
)

def test_prefilter_length():
    assert prefilter_sentence("Hello world this is good") is True   # 5 tokens
    assert prefilter_sentence("Two words") is False                  # 2 < 5
    assert prefilter_sentence("word " * 30) is False                 # 30 > 25

def test_prefilter_ascii():
    assert prefilter_sentence("Café au lait morning daily") is False  # non-ASCII

def test_dedup_lowercased():
    stream = [("Hello world friend mine pal", "a"),
              ("HELLO WORLD FRIEND MINE PAL", "b"),
              ("Different sentence entirely now today", "c")]
    out = list(dedup_stream(iter(stream)))
    assert len(out) == 2
  • [ ] Step 2: Run test to verify it fails

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_prefilter_length -v
Expected: FAIL (functions not defined).

  • [ ] Step 3: Implement pre-filter + dedup
# packages/data/scripts/build_corpus_sentences.py — add:

MIN_TOKENS = 5
MAX_TOKENS = 25


def prefilter_sentence(text: str) -> bool:
    """Whitespace-tokenized length 5..25 and ASCII-printable."""
    if not text:
        return False
    if not text.isascii() or not text.isprintable():
        return False
    n = len(text.split())
    return MIN_TOKENS <= n <= MAX_TOKENS


def dedup_stream(stream: Iterator[tuple[str, str]]) -> Iterator[tuple[str, str]]:
    """Yield (text, source_record_id) pairs whose lowercased text has not been seen."""
    seen: set[str] = set()
    for text, src in stream:
        key = text.lower().strip()
        if key in seen:
            continue
        seen.add(key)
        yield text, src
  • [ ] Step 4: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
Expected: 3 new tests PASS.

  • [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: sentence pre-filter (length + ASCII + dedup)"
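
How the two helpers compose can be sketched on a toy stream. The functions below are condensed copies of the Step 3 code; the four input sentences are invented. Filtering before dedup, as Task 8 also does, means rejected sentences never occupy the seen-set.

```python
from typing import Iterable, Iterator

MIN_TOKENS, MAX_TOKENS = 5, 25


def prefilter_sentence(text: str) -> bool:
    """Whitespace-tokenized length 5..25 and ASCII-printable."""
    if not text or not text.isascii() or not text.isprintable():
        return False
    return MIN_TOKENS <= len(text.split()) <= MAX_TOKENS


def dedup_stream(stream: Iterable[tuple[str, str]]) -> Iterator[tuple[str, str]]:
    """Yield pairs whose lowercased, stripped text has not been seen."""
    seen: set[str] = set()
    for text, src in stream:
        key = text.lower().strip()
        if key not in seen:
            seen.add(key)
            yield text, src


raw = [
    ("The cat sat on the mat", "a"),
    ("THE CAT SAT ON THE MAT", "b"),          # case-insensitive duplicate of "a"
    ("Too short", "c"),                       # fewer than 5 tokens
    ("Dogs run quickly through parks", "d"),
]
# Filter first, then deduplicate the survivors.
kept = list(dedup_stream(pair for pair in raw if prefilter_sentence(pair[0])))
```

The first occurrence wins, so the source_record_id attached to a deduplicated sentence depends on source iteration order.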

Task 3: spaCy content-token extraction

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import extract_content_tokens, ContentToken

def test_extract_content_tokens():
    # "The big red ball rolled" → big(ADJ), red(ADJ), ball(NOUN), rolled(VERB)
    tokens = extract_content_tokens("The big red ball rolled")
    poss = [t.pos for t in tokens]
    surfaces = [t.surface for t in tokens]
    assert poss == ["ADJ", "ADJ", "NOUN", "VERB"]
    assert surfaces == ["big", "red", "ball", "rolled"]
    # Lemmas: rolled → roll
    assert any(t.lemma == "roll" for t in tokens)
    # Position is 0-indexed across the full token stream (not content-only)
    # i.e., "big" at position 1, "rolled" at position 4
    pos_by_lemma = {t.lemma: t.position for t in tokens}
    assert pos_by_lemma["big"] == 1
    assert pos_by_lemma["roll"] == 4
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL.

  • [ ] Step 3: Implement extractor
# packages/data/scripts/build_corpus_sentences.py — add:

from dataclasses import dataclass
from functools import lru_cache

CONTENT_POS = frozenset({"NOUN", "VERB", "ADJ", "ADV"})


@dataclass(frozen=True)
class ContentToken:
    surface: str
    lemma: str
    pos: str
    position: int  # 0-indexed position in the full token stream


@lru_cache(maxsize=1)
def _spacy_nlp():
    import spacy
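    # Keep attribute_ruler: in the English pipelines it maps tag_ to pos_, which
    # the lemmatizer also relies on. Keep ner for the ent_type_ check below.
    nlp = spacy.load("en_core_web_sm", disable=["parser"])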
    return nlp


def _content_tokens_from_doc(doc) -> list[ContentToken]:
    """Keep NOUN/VERB/ADJ/ADV tokens that are not named entities."""
    out: list[ContentToken] = []
    for i, tok in enumerate(doc):
        if tok.pos_ not in CONTENT_POS:
            continue
        if tok.ent_type_:  # drop spaCy-flagged named entities
            continue
        if not tok.lemma_:
            continue
        out.append(ContentToken(
            surface=tok.text,
            lemma=tok.lemma_.lower(),
            pos=tok.pos_,
            position=i,  # index in the full spaCy token stream
        ))
    return out


def extract_content_tokens(text: str) -> list[ContentToken]:
    return _content_tokens_from_doc(_spacy_nlp()(text))


def extract_content_tokens_batch(texts: list[str], batch_size: int = 256) -> Iterator[list[ContentToken]]:
    """Pipe `texts` through spaCy nlp.pipe for batch throughput. Yields one
    list of ContentToken per input text, in input order."""
    for doc in _spacy_nlp().pipe(texts, batch_size=batch_size, n_process=1):
        yield _content_tokens_from_doc(doc)
  • [ ] Step 4: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_extract_content_tokens -v
Expected: PASS. If spaCy en_core_web_sm is not installed, install with uv run python -m spacy download en_core_web_sm and re-run.

  • [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: spaCy content-token extractor (POS+lemma)"
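
The position convention tested above (index in the full token stream, not among content tokens only) can be illustrated without loading spaCy. The tagged triples below are hand-written stand-ins for a parsed Doc, not real model output.

```python
from dataclasses import dataclass

CONTENT_POS = frozenset({"NOUN", "VERB", "ADJ", "ADV"})


@dataclass(frozen=True)
class ContentToken:
    surface: str
    lemma: str
    pos: str
    position: int


# Hand-written (surface, lemma, POS) triples standing in for a spaCy Doc.
TAGGED = [
    ("The", "the", "DET"),
    ("big", "big", "ADJ"),
    ("red", "red", "ADJ"),
    ("ball", "ball", "NOUN"),
    ("rolled", "roll", "VERB"),
]

tokens = [
    ContentToken(surface, lemma, pos, i)  # i counts every token, kept or not
    for i, (surface, lemma, pos) in enumerate(TAGGED)
    if pos in CONTENT_POS
]
positions = {t.lemma: t.position for t in tokens}
```

Because the determiner at index 0 is skipped but still counted, the first content token lands at position 1.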

Task 4: Lemma-join + retention rule

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import polars as pl
from packages.data.scripts.build_corpus_sentences import (
    build_lemma_lookup, join_content_tokens, MIN_CONTENT_IN_VOCAB,
)

def test_lemma_lookup_returns_dict():
    df = pl.DataFrame({
        "lemma": ["run", "ball", "red"],
        "phonemes_str": ["|ɹ|ʌ|n|", "|b|ɔ|l|", "|ɹ|ɛ|d|"],
        "frequency_log_zipf": [4.0, 5.0, 4.5],
    })
    lookup = build_lemma_lookup(df)
    assert lookup["run"]["phonemes_str"] == "|ɹ|ʌ|n|"


def test_join_drops_oov_tokens():
    from packages.data.scripts.build_corpus_sentences import ContentToken
    tokens = [
        ContentToken("running", "run", "VERB", 0),
        ContentToken("widget", "widget", "NOUN", 1),   # not in lookup
        ContentToken("balls", "ball", "NOUN", 2),
    ]
    lookup = {"run": {"phonemes_str": "|ɹ|ʌ|n|"},
              "ball": {"phonemes_str": "|b|ɔ|l|"}}
    survivors = list(join_content_tokens(tokens, lookup))
    assert len(survivors) == 2
    # join_content_tokens yields (token, words_row) pairs
    assert {tok.lemma for tok, _row in survivors} == {"run", "ball"}


def test_retention_threshold():
    assert MIN_CONTENT_IN_VOCAB == 2
  • [ ] Step 2: Run tests to verify they fail

Expected: FAIL.

  • [ ] Step 3: Implement lemma lookup + join + retention
# packages/data/scripts/build_corpus_sentences.py — add:

MIN_CONTENT_IN_VOCAB = 2


def build_lemma_lookup(words_df: pl.DataFrame) -> dict[str, dict]:
    """Build {lemma -> row_dict} for fast in-process lookup during ingest.
    Row dict carries all columns from words.parquet (denormalized at emit time).
    """
    return {
        row["lemma"]: row
        for row in words_df.iter_rows(named=True)
        if row.get("lemma")
    }


def join_content_tokens(
    tokens: list[ContentToken],
    lookup: dict[str, dict],
) -> Iterator[tuple[ContentToken, dict]]:
    """Yield (token, words_row) pairs for content tokens whose lemma joins."""
    for t in tokens:
        row = lookup.get(t.lemma)
        if row is None:
            continue
        yield t, row

Add import at top of file:

import polars as pl
  • [ ] Step 4: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
Expected: 3 new tests PASS.

  • [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: lemma lookup + content-token join + retention rule"
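
The retention rule itself is only exercised once Task 8 wires the pipeline, so a small sketch may help here. retention_decision is a hypothetical helper, not part of the planned module; it applies the same threshold Task 8 uses (drop a sentence when fewer than MIN_CONTENT_IN_VOCAB content lemmas join) and reports the OOV count stored in the index.

```python
MIN_CONTENT_IN_VOCAB = 2


def retention_decision(lemmas: list[str], lookup: dict[str, dict]) -> tuple[bool, int, int]:
    """Return (keep, n_in_vocab, n_oov) for one sentence's content lemmas."""
    n_in_vocab = sum(1 for lemma in lemmas if lemma in lookup)
    n_oov = len(lemmas) - n_in_vocab
    return n_in_vocab >= MIN_CONTENT_IN_VOCAB, n_in_vocab, n_oov


lookup = {"run": {"phonemes_str": "|ɹ|ʌ|n|"}, "ball": {"phonemes_str": "|b|ɔ|l|"}}
kept = retention_decision(["run", "widget", "ball"], lookup)   # two lemmas join
dropped = retention_decision(["widget", "ball"], lookup)       # only one joins
```

A sentence with OOV content words is still retained as long as enough in-vocab words remain; the OOV words simply have no detail rows.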

Task 5: Profanity filter

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
from packages.data.scripts.build_corpus_sentences import contains_profanity

def test_clean_sentence_passes():
    assert contains_profanity("The cat sat on the warm mat") is False

def test_profanity_filtered():
    # Avoid hard-coding slurs in the test: pull one entry from
    # better-profanity's own word list and assert that the filter flags a
    # sentence containing it.
    from better_profanity import profanity
    profanity.load_censor_words()
    flagged = next(iter(profanity.CENSOR_WORDSET))
    assert contains_profanity(f"the {flagged} stuff word value here") is True
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL.

  • [ ] Step 3: Implement profanity check via better-profanity
# packages/data/scripts/build_corpus_sentences.py — add:

@lru_cache(maxsize=1)
def _profanity_engine():
    from better_profanity import profanity
    profanity.load_censor_words()
    return profanity


def contains_profanity(text: str) -> bool:
    return _profanity_engine().contains_profanity(text)

Add the dependency to the dependencies list under [project] in packages/data/pyproject.toml:

"better-profanity>=0.7.0",

Then:

uv pip install -e packages/data
  • [ ] Step 4: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py -v
Expected: 2 new tests PASS.

  • [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py \
        packages/data/tests/test_corpus_ingest.py \
        packages/data/pyproject.toml
git commit -m "corpus retrieval: profanity filter via better-profanity"

Task 6: Parquet schemas + emit

Files: - Create: packages/data/src/phonolex_data/runtime/corpus_schema.py - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import polars as pl
import tempfile
from pathlib import Path

from packages.data.scripts.build_corpus_sentences import emit_parquets, IngestedSentence
from phonolex_data.runtime.corpus_schema import (
    index_columns, words_columns,
)


def test_emit_round_trip():
    sample = IngestedSentence(
        sentence_id=0,
        text="The cat sat on the mat",
        source="cola",
        source_record_id="cola:train:0",
        n_tokens=6,
        rows=[
            # (content_token_dict, words_row_dict) tuples
            ({"surface": "cat", "lemma": "cat", "pos": "NOUN", "position": 1},
             {"phonemes_str": "|k|æ|t|", "lemma": "cat", "AoA": 3.5}),
            ({"surface": "sat", "lemma": "sit", "pos": "VERB", "position": 2},
             {"phonemes_str": "|s|ɪ|t|", "lemma": "sit", "AoA": 3.0}),
            ({"surface": "mat", "lemma": "mat", "pos": "NOUN", "position": 5},
             {"phonemes_str": "|m|æ|t|", "lemma": "mat", "AoA": 4.0}),
        ],
    )
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp)
        emit_parquets([sample], out_dir=out, words_columns=["phonemes_str", "lemma", "AoA"])
        idx = pl.read_parquet(out / "corpus_sentences_index.parquet")
        words = pl.read_parquet(out / "corpus_sentences.parquet")

        assert idx.height == 1
        assert idx["n_content_in_vocab"][0] == 3
        assert words.height == 3
        assert set(words.columns) >= {"sentence_id", "position", "surface", "lemma",
                                       "pos", "phonemes_str", "AoA"}
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL.

  • [ ] Step 3: Define corpus schema
# packages/data/src/phonolex_data/runtime/corpus_schema.py
"""Parquet schemas for corpus retrieval artifacts."""
from __future__ import annotations
import polars as pl


def index_columns() -> dict[str, pl.DataType]:
    return {
        "sentence_id": pl.UInt32,
        "text": pl.Utf8,
        "source": pl.Utf8,
        "source_record_id": pl.Utf8,
        "n_tokens": pl.UInt8,
        "n_content_in_vocab": pl.UInt8,
        "n_content_oov": pl.UInt8,
        "naturalness_score": pl.Float32,  # populated in Task 7
    }


def words_columns(extra_norm_cols: list[str]) -> dict[str, pl.DataType]:
    """Base corpus-words schema. extra_norm_cols are the 167 norm columns
    inlined from words.parquet; their dtypes follow Polars' inferred types
    when emit_parquets writes the DataFrame, so we don't enumerate them here.
    """
    base = {
        "sentence_id": pl.UInt32,
        "position": pl.UInt8,
        "surface": pl.Utf8,
        "lemma": pl.Utf8,
        "pos": pl.Utf8,
        "phonemes_str": pl.Utf8,
    }
    return base
  • [ ] Step 4: Implement emit_parquets
# packages/data/scripts/build_corpus_sentences.py — add:

from dataclasses import dataclass, field

@dataclass
class IngestedSentence:
    sentence_id: int
    text: str
    source: str
    source_record_id: str
    n_tokens: int
    rows: list[tuple[dict, dict]]  # (content_token_dict, words_row_dict)
    n_content_oov: int = 0


def emit_parquets(
    sentences: list[IngestedSentence],
    out_dir: Path,
    words_columns: list[str],
) -> None:
    """Write corpus_sentences_index.parquet + corpus_sentences.parquet."""
    out_dir.mkdir(parents=True, exist_ok=True)

    idx_rows = []
    word_rows = []
    for s in sentences:
        idx_rows.append({
            "sentence_id": s.sentence_id,
            "text": s.text,
            "source": s.source,
            "source_record_id": s.source_record_id,
            "n_tokens": s.n_tokens,
            "n_content_in_vocab": len(s.rows),
            "n_content_oov": s.n_content_oov,
            "naturalness_score": None,  # filled by Task 7
        })
        for tok, words_row in s.rows:
            row = {
                "sentence_id": s.sentence_id,
                "position": tok["position"],
                "surface": tok["surface"],
                "lemma": tok["lemma"],
                "pos": tok["pos"],
                "phonemes_str": words_row["phonemes_str"],
            }
            for col in words_columns:
                if col in ("phonemes_str", "lemma"):
                    continue
                row[col] = words_row.get(col)
            word_rows.append(row)

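    # Apply the declared index schema so naturalness_score is typed Float32
    # rather than inferred from all-None values before Task 7 fills it.
    from phonolex_data.runtime.corpus_schema import index_columns
    pl.DataFrame(idx_rows, schema=index_columns()).write_parquet(
        out_dir / "corpus_sentences_index.parquet")
    pl.DataFrame(word_rows).write_parquet(out_dir / "corpus_sentences.parquet")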
  • [ ] Step 5: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_emit_round_trip -v
Expected: PASS.

  • [ ] Step 6: Commit
git add packages/data/src/phonolex_data/runtime/corpus_schema.py \
        packages/data/scripts/build_corpus_sentences.py \
        packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: Parquet schemas + emit (index + words tables)"
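
The split into an index table and a per-word detail table is what makes the "every in-vocab content word must satisfy the filter" semantics cheap to express. The pure-Python sketch below shows the idea on a made-up miniature of the two tables; the AoA values and the matching_sentences helper are illustrative assumptions, and the runtime is expected to do the equivalent with a Polars filter.

```python
from collections import defaultdict
from typing import Callable

# Hypothetical miniature of the two tables: one index row per sentence,
# one detail row per (sentence, in-vocab content word). Values are invented.
index_rows = [
    {"sentence_id": 0, "text": "The cat sat on the mat"},
    {"sentence_id": 1, "text": "The elephant sat on the mat"},
]
word_rows = [
    {"sentence_id": 0, "lemma": "cat", "AoA": 3.5},
    {"sentence_id": 0, "lemma": "sit", "AoA": 3.0},
    {"sentence_id": 0, "lemma": "mat", "AoA": 4.0},
    {"sentence_id": 1, "lemma": "elephant", "AoA": 5.5},
    {"sentence_id": 1, "lemma": "sit", "AoA": 3.0},
    {"sentence_id": 1, "lemma": "mat", "AoA": 4.0},
]


def matching_sentences(passes: Callable[[dict], bool]) -> list[str]:
    """Return texts of sentences whose detail rows all satisfy `passes`."""
    by_sentence: dict[int, list[dict]] = defaultdict(list)
    for row in word_rows:
        by_sentence[row["sentence_id"]].append(row)
    return [
        s["text"]
        for s in index_rows
        # Require at least one detail row, and every row must pass.
        if by_sentence[s["sentence_id"]]
        and all(passes(r) for r in by_sentence[s["sentence_id"]])
    ]


easy = matching_sentences(lambda row: row["AoA"] <= 4.0)
```

A single failing word excludes the whole sentence, which mirrors CSP rejecting a candidate when any slot violates the constraint.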

Task 7: Naturalness pre-score with self-row exclusion

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py

  • [ ] Step 1: Write the failing test
# add to packages/data/tests/test_corpus_ingest.py
import numpy as np

from packages.data.scripts.build_corpus_sentences import (
    score_naturalness, _exclude_self_row_idx,
)


def test_exclude_self_row_idx_match():
    ref_texts = ["alpha sentence", "beta sentence", "gamma sentence"]
    assert _exclude_self_row_idx("beta sentence", ref_texts) == 1
    assert _exclude_self_row_idx("absent", ref_texts) is None


def test_score_naturalness_excludes_self():
    # Synthetic test: build a 3x4 ref matrix; query identical to row 1 must
    # NOT see cosine=1.0 contribute to its score.
    ref = np.array([
        [1.0, 0.0, 0.0, 0.0],
        [0.5, 0.5, 0.5, 0.5],
        [0.0, 1.0, 0.0, 0.0],
    ], dtype=np.float32)
    ref = ref / np.linalg.norm(ref, axis=1, keepdims=True)
    query = ref[1].copy()  # identical to row 1
    # With self-exclusion, top-K cosine should NOT be 1.0 — should pick the
    # next-best non-self row.
    score = score_naturalness(query, ref, self_row_idx=1, top_k=2)
    assert score < 0.99
  • [ ] Step 2: Run tests to verify they fail

Expected: FAIL.

  • [ ] Step 3: Implement self-exclusion + scoring
# packages/data/scripts/build_corpus_sentences.py — add:

import numpy as np


def _exclude_self_row_idx(sentence_text: str, ref_texts: list[str]) -> int | None:
    """Return the index of an exact-match row in the naturalness reference,
    or None if absent. Matches on lowercased stripped text.
    """
    needle = sentence_text.strip().lower()
    for i, t in enumerate(ref_texts):
        if t.strip().lower() == needle:
            return i
    return None


def score_naturalness(
    query_emb: np.ndarray,         # (D,) L2-normalized
    ref_emb: np.ndarray,           # (N, D) L2-normalized
    self_row_idx: int | None,
    top_k: int = 20,
) -> float:
    cos = ref_emb @ query_emb     # (N,)
    if self_row_idx is not None:
        cos = np.delete(cos, self_row_idx)
    if cos.size == 0:
        return 0.0
    k = min(top_k, cos.size)
    top = np.partition(cos, -k)[-k:]
    return float(top.mean())


def annotate_naturalness(
    index_path: Path,
    ref_npy: Path,
    ref_meta_jsonl: Path,
    model_name: str = "Qwen/Qwen3-Embedding-0.6B",
    batch_size: int = 32,
) -> None:
    """Read corpus_sentences_index.parquet, embed each sentence, compute
    self-excluded top-K cosine vs the ref matrix, write naturalness_score
    back to the same file."""
    import json
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer(model_name)
    ref_emb = np.load(ref_npy)
    ref_texts = [
        json.loads(line)["sentence"]
        for line in ref_meta_jsonl.read_text(encoding="utf-8").splitlines()
        if line.strip()  # tolerate blank lines in the JSONL file
    ]
    if len(ref_texts) != ref_emb.shape[0]:
        raise ValueError(f"ref matrix N={ref_emb.shape[0]} != meta N={len(ref_texts)}")

    idx = pl.read_parquet(index_path)
    texts = idx["text"].to_list()
    embs = model.encode(texts, batch_size=batch_size, normalize_embeddings=True,
                        show_progress_bar=True, convert_to_numpy=True)

    scores = []
    for text, e in zip(texts, embs):
        scores.append(score_naturalness(
            e.astype(np.float32), ref_emb,
            _exclude_self_row_idx(text, ref_texts),
        ))
    idx = idx.with_columns(pl.Series("naturalness_score", scores, dtype=pl.Float32))
    idx.write_parquet(index_path)
  • [ ] Step 4: Run tests to verify they pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_exclude_self_row_idx_match \
                       packages/data/tests/test_corpus_ingest.py::test_score_naturalness_excludes_self -v
Expected: PASS.

  • [ ] Step 5: Commit
git add packages/data/scripts/build_corpus_sentences.py packages/data/tests/test_corpus_ingest.py
git commit -m "corpus retrieval: naturalness pre-score with self-row exclusion"
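
One scaling note on the self-row lookup: _exclude_self_row_idx scans the whole reference list for every corpus sentence. If that becomes slow, a dictionary built once per run gives the same first-match answer in constant time. The sketch below is a possible alternative, not the planned code; build_self_index and top_k_mean are hypothetical names, and the scoring is restated with the standard library only so the arithmetic is easy to follow.

```python
from __future__ import annotations

import heapq


def build_self_index(ref_texts: list[str]) -> dict[str, int]:
    """Map normalized text to its first row index, built once per run."""
    index: dict[str, int] = {}
    for i, text in enumerate(ref_texts):
        index.setdefault(text.strip().lower(), i)  # keep the first match
    return index


def top_k_mean(cosines: list[float], self_row_idx: int | None, top_k: int) -> float:
    """Mean of the top_k cosines after dropping the sentence's own row."""
    kept = [c for i, c in enumerate(cosines) if i != self_row_idx]
    if not kept:
        return 0.0
    top = heapq.nlargest(min(top_k, len(kept)), kept)
    return sum(top) / len(top)


self_index = build_self_index(["alpha sentence", "beta sentence", "gamma sentence"])
row = self_index.get(" Beta sentence ".strip().lower())
# Cosines of the query against the three reference rows; row 1 is the query itself.
score = top_k_mean([0.5, 1.0, 0.5], self_row_idx=row, top_k=2)
```

Both this lookup and the Step 3 version exclude only one matching row, so a sentence that appears more than once in the reference set can still see a cosine of 1.0 from its other copies.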

Task 8: Wire the ingest pipeline end-to-end

Files: - Modify: packages/data/scripts/build_corpus_sentences.py - Modify: packages/data/tests/test_corpus_ingest.py - Modify: .gitattributes

  • [ ] Step 1: Write the failing integration test
# add to packages/data/tests/test_corpus_ingest.py
def test_pipeline_small_sample(tmp_path):
    """Run the full pipeline on a 5-sentence in-memory fixture and verify
    both Parquets are written with the expected shapes."""
    from packages.data.scripts.build_corpus_sentences import run_pipeline
    import polars as pl

    fixture_sentences = [
        ("The cat sat on the mat today", "fx:0"),
        ("Dogs run quickly through the park", "fx:1"),
        ("Two words", "fx:2"),  # filtered: < 5 tokens
        ("The cat sat on the mat today", "fx:3"),  # filtered: dup
        ("A red ball rolled down the hill", "fx:4"),
    ]

    # Minimal in-memory words.parquet covering the lemmas used above
    fake_words = pl.DataFrame({
        "lemma": ["cat", "sit", "mat", "dog", "run", "park",
                  "ball", "roll", "hill", "red"],
        "phonemes_str": ["|k|æ|t|"] * 10,
        "frequency_log_zipf": [4.0] * 10,
    })

    run_pipeline(
        sources={"fixture": iter(fixture_sentences)},
        words_df=fake_words,
        out_dir=tmp_path,
        skip_profanity=True,
        skip_naturalness=True,  # tested separately in Task 7
    )

    idx = pl.read_parquet(tmp_path / "corpus_sentences_index.parquet")
    assert idx.height == 3  # deduplicated + length-filtered survivors
    words = pl.read_parquet(tmp_path / "corpus_sentences.parquet")
    assert words.height >= 6
    assert words["lemma"].is_in(fake_words["lemma"].to_list()).all()
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL (run_pipeline not defined).

  • [ ] Step 3: Implement run_pipeline + CLI
# packages/data/scripts/build_corpus_sentences.py — replace main():

def run_pipeline(
    sources: dict[str, Iterator[tuple[str, str]]],
    words_df: pl.DataFrame,
    out_dir: Path,
    skip_profanity: bool = False,
    skip_naturalness: bool = False,
    ref_npy: Path | None = None,
    ref_meta_jsonl: Path | None = None,
) -> None:
    """Run the ingest pipeline end-to-end.

    sources: {source_name: iterator of (text, source_record_id)}
    """
    lemma_lookup = build_lemma_lookup(words_df)
    inlined_cols = [c for c in words_df.columns if c != "lemma"]

    # 1. Pre-filter + profanity + dedup across all sources.
    # We can't use `dedup_stream` directly because it drops the source name;
    # do dedup inline here so we carry the triple through.
    seen: set[str] = set()
    pairs: list[tuple[str, str, str]] = []
    for src_name, it in sources.items():
        for text, src_id in it:
            if not prefilter_sentence(text):
                continue
            if not skip_profanity and contains_profanity(text):
                continue
            key = text.lower().strip()
            if key in seen:
                continue
            seen.add(key)
            pairs.append((text, src_id, src_name))

    # 2. spaCy POS+lemma batch parse
    texts = [t for t, _, _ in pairs]
    token_lists = list(extract_content_tokens_batch(texts, batch_size=256))

    # 3. Join + retain
    sentences: list[IngestedSentence] = []
    next_id = 0
    for (text, src_id, src_name), tokens in zip(pairs, token_lists):
        joined = list(join_content_tokens(tokens, lemma_lookup))
        if len(joined) < MIN_CONTENT_IN_VOCAB:
            continue
        oov_count = len(tokens) - len(joined)
        rows = [
            ({"surface": t.surface, "lemma": t.lemma, "pos": t.pos, "position": t.position}, row)
            for t, row in joined
        ]
        sentences.append(IngestedSentence(
            sentence_id=next_id,
            text=text,
            source=src_name,
            source_record_id=src_id,
            n_tokens=len(text.split()),
            rows=rows,
            n_content_oov=oov_count,
        ))
        next_id += 1

    # 4. Emit
    emit_parquets(sentences, out_dir=out_dir, words_columns=inlined_cols)

    # 5. Naturalness pre-score
    if not skip_naturalness:
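        # Default each path independently so a caller that overrides only
        # one of them keeps its override.
        if ref_npy is None:
            ref_npy = RUNTIME / "naturalness_reference.npy"
        if ref_meta_jsonl is None:
            ref_meta_jsonl = RUNTIME / "naturalness_reference_meta.jsonl"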
        annotate_naturalness(
            out_dir / "corpus_sentences_index.parquet", ref_npy, ref_meta_jsonl,
        )


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--limit-per-source", type=int, default=None)
    parser.add_argument("--out-dir", type=Path, default=RUNTIME)
    parser.add_argument("--no-profanity-filter", action="store_true")
    parser.add_argument("--skip-naturalness", action="store_true")
    args = parser.parse_args()

    words_df = pl.read_parquet(RUNTIME / "words.parquet")
    sources = {
        "cola": load_cola_positive(args.limit_per_source),
        "ud_ewt": load_ud_ewt(args.limit_per_source),
        "gum": load_gum(args.limit_per_source),
        "childes": load_childes_adult(args.limit_per_source),
        "tatoeba": load_tatoeba_english(args.limit_per_source),
    }
    run_pipeline(
        sources=sources,
        words_df=words_df,
        out_dir=args.out_dir,
        skip_profanity=args.no_profanity_filter,
        skip_naturalness=args.skip_naturalness,
    )
  • [ ] Step 4: Run integration test to verify pass

uv run python -m pytest packages/data/tests/test_corpus_ingest.py::test_pipeline_small_sample -v
Expected: PASS.

  • [ ] Step 5: Update .gitattributes for LFS

Append to .gitattributes:

data/runtime/corpus_sentences.parquet filter=lfs diff=lfs merge=lfs -text
data/runtime/corpus_sentences_index.parquet filter=lfs diff=lfs merge=lfs -text
  • [ ] Step 6: Commit
git add packages/data/scripts/build_corpus_sentences.py \
        packages/data/tests/test_corpus_ingest.py \
        .gitattributes
git commit -m "corpus retrieval: wire ingest pipeline end-to-end + LFS tracking"
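
The inline dedup in step 1 of run_pipeline can be tried in isolation. The sketch below is a stdlib-only approximation: dedup_with_source and MIN_TOKENS are hypothetical names, and the token-count check merely stands in for prefilter_sentence, whose real rules are not shown in this plan.

```python
from collections.abc import Iterable, Iterator

# Assumed threshold, taken from the "< 5 tokens" comment in the test fixture.
MIN_TOKENS = 5


def dedup_with_source(
    sources: dict[str, Iterable[tuple[str, str]]],
) -> Iterator[tuple[str, str, str]]:
    """Yield (text, source_record_id, source_name), dropping short and duplicate text."""
    seen: set[str] = set()  # shared across sources, so dedup is corpus-wide
    for src_name, records in sources.items():
        for text, src_id in records:
            if len(text.split()) < MIN_TOKENS:  # stand-in for prefilter_sentence
                continue
            key = text.lower().strip()  # same normalisation as run_pipeline
            if key in seen:
                continue
            seen.add(key)
            yield text, src_id, src_name
```

Because the seen set is shared, the iteration order of the sources dict decides which source is credited when the same sentence appears in more than one corpus.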

Task 9: Runtime CorpusStore + load_corpus

Files:
- Create: packages/generation/server/corpus.py
- Create: packages/generation/server/tests/test_corpus.py

  • [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_corpus.py
import polars as pl
from pathlib import Path

from packages.generation.server.corpus import load_corpus, CorpusStore


def test_load_corpus(tmp_path):
    # Synthesize tiny corpus parquets
    pl.DataFrame({
        "sentence_id": [0, 1],
        "text": ["The cat ran fast today", "Dogs run in the park"],
        "source": ["cola", "ud_ewt"],
        "source_record_id": ["cola:0", "ud_ewt:0"],
        "n_tokens": [5, 5],
        "n_content_in_vocab": [2, 2],
        "n_content_oov": [0, 0],
        "naturalness_score": [0.6, 0.55],
    }).write_parquet(tmp_path / "corpus_sentences_index.parquet")
    pl.DataFrame({
        "sentence_id": [0, 0, 1, 1],
        "position": [1, 2, 0, 1],
        "surface": ["cat", "ran", "Dogs", "run"],
        "lemma": ["cat", "run", "dog", "run"],
        "pos": ["NOUN", "VERB", "NOUN", "VERB"],
        "phonemes_str": ["|k|æ|t|", "|ɹ|ʌ|n|", "|d|ɔ|ɡ|", "|ɹ|ʌ|n|"],
    }).write_parquet(tmp_path / "corpus_sentences.parquet")

    store = load_corpus(tmp_path)
    assert isinstance(store, CorpusStore)
    idx = store.index_lf.collect()
    assert idx.height == 2
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL (module not found).

  • [ ] Step 3: Implement CorpusStore + load_corpus
# packages/generation/server/corpus.py
"""Runtime corpus retrieval for /api/sentences.

Loads two Parquet files at cold-start (Polars LazyFrames) and exposes a
`match_corpus` function that filters sentences by CSP constraints.

Constraint semantics are exact parity with CSP's per-slot filters:
every in-vocab content (NOUN/VERB/ADJ/ADV) word in the sentence must
satisfy the same `hard_filter_expr` CSP applies to fillers and the verb.
"""
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

import polars as pl


@dataclass(frozen=True)
class CorpusStore:
    index_lf: pl.LazyFrame
    words_lf: pl.LazyFrame


def load_corpus(runtime_dir: Path) -> CorpusStore:
    """Load corpus_sentences{,_index}.parquet as LazyFrames."""
    return CorpusStore(
        index_lf=pl.scan_parquet(runtime_dir / "corpus_sentences_index.parquet"),
        words_lf=pl.scan_parquet(runtime_dir / "corpus_sentences.parquet"),
    )
  • [ ] Step 4: Run test to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_corpus.py::test_load_corpus -v
Expected: PASS.

  • [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: CorpusStore + load_corpus (Polars LazyFrames)"
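
pl.scan_parquet is lazy, so a missing file or an un-pulled LFS pointer may only surface on the first collect(). A cold-start check along these lines could fail earlier. This is a sketch rather than part of the plan: check_parquet_artifact is a hypothetical helper, and it only inspects the leading bytes (Parquet files begin with PAR1, Git LFS pointer files begin with a version line).

```python
from pathlib import Path

PARQUET_MAGIC = b"PAR1"
LFS_POINTER_PREFIX = b"version https://git-lfs"


def check_parquet_artifact(path: Path) -> None:
    """Raise early if `path` is missing, is an LFS pointer, or is not Parquet."""
    if not path.is_file():
        raise FileNotFoundError(f"corpus artifact not found: {path}")
    with path.open("rb") as fh:
        head = fh.read(64)  # enough to see either signature
    if head.startswith(LFS_POINTER_PREFIX):
        raise RuntimeError(f"{path} is a Git LFS pointer; run `git lfs pull` first")
    if not head.startswith(PARQUET_MAGIC):
        raise RuntimeError(f"{path} does not look like a Parquet file")
```

Calling this on both corpus files before load_corpus would turn a confusing first-request failure into a clear startup error.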

Task 10: match_corpus — hard-constraint path

Files:
- Modify: packages/generation/server/corpus.py
- Modify: packages/generation/server/tests/test_corpus.py

  • [ ] Step 1: Write the failing test
# add to packages/generation/server/tests/test_corpus.py
from phonolex_generators.csp.constraints import ExcludeConstraint, BoundConstraint

from packages.generation.server.corpus import match_corpus


def _make_store(tmp_path):
    pl.DataFrame({
        "sentence_id": [0, 1, 2],
        "text": [
            "The cat sat on the mat",
            "The dog ran fast today",
            "Cars race on the road",
        ],
        "source": ["cola"] * 3,
        "source_record_id": ["c:0", "c:1", "c:2"],
        "n_tokens": [6, 5, 5],
        "n_content_in_vocab": [2, 2, 2],
        "n_content_oov": [0, 0, 0],
        "naturalness_score": [0.7, 0.6, 0.5],
    }).write_parquet(tmp_path / "corpus_sentences_index.parquet")
    pl.DataFrame({
        "sentence_id": [0, 0, 1, 1, 2, 2],
        "position": [1, 2, 1, 2, 0, 1],
        "surface": ["cat", "sat", "dog", "ran", "Cars", "race"],
        "lemma": ["cat", "sit", "dog", "run", "car", "race"],
        "pos": ["NOUN", "VERB"] * 3,
        "phonemes_str": [
            "|k|æ|t|", "|s|ɪ|t|",
            "|d|ɔ|ɡ|", "|ɹ|ʌ|n|",   # /ɹ/ appears here
            "|k|ɑ|ɹ|", "|ɹ|eɪ|s|",  # /ɹ/ appears here too
        ],
        "AoA": [3.0, 3.0, 3.0, 3.5, 4.0, 5.0],
    }).write_parquet(tmp_path / "corpus_sentences.parquet")
    return load_corpus(tmp_path)


def test_match_corpus_exclude(tmp_path):
    store = _make_store(tmp_path)
    matches = match_corpus(
        store, [ExcludeConstraint(phonemes=("ɹ",))],
        pairs_df=None, top_k=10,
    )
    # Only sentence 0 ("cat sat") has no /ɹ/ in any content word.
    assert [m.text for m in matches] == ["The cat sat on the mat"]


def test_match_corpus_bound(tmp_path):
    store = _make_store(tmp_path)
    matches = match_corpus(
        store, [BoundConstraint(norm="AoA", max_value=3.5)],
        pairs_df=None, top_k=10,
    )
    # Sentences 0 (AoA all ≤ 3.0) and 1 (AoA 3.0, 3.5) pass.
    texts = {m.text for m in matches}
    assert texts == {"The cat sat on the mat", "The dog ran fast today"}
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL (ImportError: match_corpus is not defined yet).

  • [ ] Step 3: Implement match_corpus hard path
# packages/generation/server/corpus.py — add:

from pydantic import BaseModel
from typing import Literal

from phonolex_generators.csp.constraints import (
    Constraint as CSPConstraint,
    MinpairConstraint, MaxoppConstraint, MultoppConstraint,
    hard_filter_expr,
)


class CorpusMatch(BaseModel):
    text: str
    source: Literal["cola", "ud_ewt", "gum", "childes", "tatoeba"]
    naturalness_score: float
    n_content_in_vocab: int


def match_corpus(
    store: CorpusStore,
    constraints: list[CSPConstraint],
    pairs_df: pl.DataFrame | None,
    top_k: int,
) -> list[CorpusMatch]:
    # Multopp is paragraph-only — caller must surface
    # corpus_skipped_reason="multopp_paragraph_only" before calling. We
    # defensively return [] here in case the caller forgets.
    if any(isinstance(c, MultoppConstraint) for c in constraints):
        return []

    # 1. Hard filter — same expression CSP applies per slot.
    expr = hard_filter_expr(constraints)
    words_lf = store.words_lf
    if expr is not None:
        words_lf = words_lf.with_columns(passes=expr)
    else:
        words_lf = words_lf.with_columns(passes=pl.lit(True))

    ok_sids = (
        words_lf
        .group_by("sentence_id")
        .agg(pl.col("passes").all().alias("ok"))
        .filter(pl.col("ok"))
        .select("sentence_id")
    )

    # 2. Contrastive (placeholder — Task 11 fills this in)
    contrast = [c for c in constraints if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
    if contrast:
        # Implemented in Task 11; for now, fail loudly if reached.
        raise NotImplementedError("Contrastive corpus matching arrives in Task 11.")

    # 3. Join survivors with index, sort by naturalness_score
    matches_df = (
        store.index_lf
        .join(ok_sids, on="sentence_id", how="inner")
        .sort("naturalness_score", descending=True, nulls_last=True)
        .head(top_k)
        .collect()
    )

    return [
        CorpusMatch(
            text=row["text"],
            source=row["source"],
            naturalness_score=row["naturalness_score"] or 0.0,
            n_content_in_vocab=row["n_content_in_vocab"],
        )
        for row in matches_df.iter_rows(named=True)
    ]
  • [ ] Step 4: Run tests to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_corpus.py -v
Expected: 2 new tests PASS.

  • [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: match_corpus hard-constraint path (Exclude/Bound/Pattern)"
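
The "every in-vocab content word must pass" rule is what the group_by + all() aggregation encodes. The stdlib sketch below replays it on the Task 10 fixture rows. word_passes and surviving_sentence_ids are hypothetical names, and the "|p|" substring test is only an assumption about how an exclude filter reads phonemes_str; the real logic lives in hard_filter_expr.

```python
from collections import defaultdict

# (sentence_id, phonemes_str) rows copied from the Task 10 fixture.
ROWS = [
    (0, "|k|æ|t|"), (0, "|s|ɪ|t|"),
    (1, "|d|ɔ|ɡ|"), (1, "|ɹ|ʌ|n|"),
    (2, "|k|ɑ|ɹ|"), (2, "|ɹ|eɪ|s|"),
]


def word_passes(phonemes_str: str, excluded: tuple[str, ...]) -> bool:
    # Assumption: a phoneme is present when "|p|" occurs in the delimited string.
    return not any(f"|{p}|" in phonemes_str for p in excluded)


def surviving_sentence_ids(
    rows: list[tuple[int, str]], excluded: tuple[str, ...]
) -> list[int]:
    """Keep a sentence only if all of its content-word rows pass."""
    verdicts: dict[int, list[bool]] = defaultdict(list)
    for sid, phonemes_str in rows:
        verdicts[sid].append(word_passes(phonemes_str, excluded))
    return sorted(sid for sid, flags in verdicts.items() if all(flags))
```

A single failing word is enough to drop the sentence, which is why excluding /ɹ/ leaves only sentence 0 in the fixture.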

Task 11: match_corpus — contrastive path

Files:
- Modify: packages/generation/server/corpus.py
- Modify: packages/generation/server/tests/test_corpus.py

  • [ ] Step 1: Write the failing test
# add to packages/generation/server/tests/test_corpus.py
from phonolex_generators.csp.constraints import MinpairConstraint


def _make_pairs_df():
    return pl.DataFrame({
        "word1": ["cat", "dog"],
        "word2": ["bat", "log"],
        "phoneme1": ["k", "d"],
        "phoneme2": ["b", "l"],
        "position_type": ["initial", "initial"],
        "feature_distance": [1.2, 1.5],
        "sonorant_diff": [0.0, 1.0],
    })


def test_match_corpus_minpair(tmp_path):
    # Sentence containing both "cat" and "bat" should pass an initial /k/-/b/ minpair.
    pl.DataFrame({
        "sentence_id": [0, 1],
        "text": ["The cat saw the bat clearly", "The dog chased the squirrel"],
        "source": ["cola", "cola"],
        "source_record_id": ["c:0", "c:1"],
        "n_tokens": [6, 5],
        "n_content_in_vocab": [3, 3],
        "n_content_oov": [0, 0],
        "naturalness_score": [0.7, 0.6],
    }).write_parquet(tmp_path / "corpus_sentences_index.parquet")
    pl.DataFrame({
        "sentence_id": [0, 0, 0, 1, 1, 1],
        "position": [1, 2, 4, 1, 2, 4],
        "surface": ["cat", "saw", "bat", "dog", "chased", "squirrel"],
        "lemma": ["cat", "see", "bat", "dog", "chase", "squirrel"],
        "pos": ["NOUN", "VERB", "NOUN", "NOUN", "VERB", "NOUN"],
        "phonemes_str": ["|k|æ|t|", "|s|ɔ|", "|b|æ|t|",
                          "|d|ɔ|ɡ|", "|tʃ|eɪ|s|t|", "|s|k|w|ɝ|l|"],
    }).write_parquet(tmp_path / "corpus_sentences.parquet")

    store = load_corpus(tmp_path)
    pairs_df = _make_pairs_df()
    matches = match_corpus(
        store,
        [MinpairConstraint(phoneme1="k", phoneme2="b", position="initial")],
        pairs_df=pairs_df,
        top_k=10,
    )
    assert [m.text for m in matches] == ["The cat saw the bat clearly"]
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL (NotImplementedError).

  • [ ] Step 3: Implement contrastive path

Replace the placeholder block in match_corpus:

# packages/generation/server/corpus.py — replace contrastive block:

    contrast = [c for c in constraints if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
    if contrast:
        if len(contrast) > 1:
            raise ValueError("at most one contrastive constraint per request")
        if pairs_df is None:
            raise ValueError("contrastive constraint requires pairs_df")

        from phonolex_generators.csp.skeleton import _load_pairs_for_request

        # Surviving lemmas from the hard-filter pass (per-row, already in ok_sids).
        survivors_lemmas = (
            words_lf
            .join(ok_sids, on="sentence_id", how="inner")
            .filter(pl.col("passes"))
            .select("lemma")
            .unique()
            .collect()
            ["lemma"].to_list()
        )
        pair_frame = _load_pairs_for_request(
            constraint=contrast[0],
            pairs_df=pairs_df,
            filtered_spec=frozenset(survivors_lemmas),
        )
        if pair_frame.height == 0:
            return []

        # Build set of (lemma, sentence_id) for hard-filter survivors.
        survivor_rows = (
            words_lf
            .join(ok_sids, on="sentence_id", how="inner")
            .filter(pl.col("passes"))
            .select(["sentence_id", "lemma"])
            .collect()
        )
        # Group lemmas per sentence.
        sentence_lemmas = (
            survivor_rows
            .group_by("sentence_id")
            .agg(pl.col("lemma").alias("lemmas"))
        )

        # Determine which sentences contain both halves of any surviving pair.
        # Index the pairs as frozensets, then probe each sentence's lemma
        # combinations against that index. A sentence holds only a handful of
        # content lemmas, so this costs O(k^2) set lookups per sentence rather
        # than a scan over every pair (~500K-1M sentences, ~500K pairs).
        from itertools import combinations

        pair_set: set[frozenset[str]] = {
            frozenset({row["filler_a"], row["filler_b"]})
            for row in pair_frame.iter_rows(named=True)
        }

        passing_sids: list[int] = []
        for row in sentence_lemmas.iter_rows(named=True):
            lemmas = sorted(set(row["lemmas"]))
            if any(frozenset(p) in pair_set for p in combinations(lemmas, 2)):
                passing_sids.append(row["sentence_id"])

        if not passing_sids:
            return []

        ok_sids = pl.LazyFrame({"sentence_id": passing_sids})
  • [ ] Step 4: Run tests to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_corpus.py -v
Expected: all 4 tests in the file PASS (1 new).

  • [ ] Step 5: Commit
git add packages/generation/server/corpus.py packages/generation/server/tests/test_corpus.py
git commit -m "corpus retrieval: match_corpus contrastive path (Minpair/Maxopp)"

Task 12: Shared validate_constraints helper

Files:
- Create: packages/generation/server/validation.py
- Modify: packages/generation/server/routes/generate.py
- Create: packages/generation/server/tests/test_validation.py

  • [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_validation.py
import pytest
import polars as pl
from fastapi import HTTPException

from packages.generation.server.validation import validate_constraints
from packages.generation.server.schemas import (
    BoundConstraint, MinpairConstraint, MultoppConstraint,
)


def _fake_df():
    return pl.DataFrame({"word": ["a"], "phonemes_str": ["|a|"], "AoA": [3.0]})


def test_unknown_bound_norm_raises_422():
    with pytest.raises(HTTPException) as exc:
        validate_constraints([BoundConstraint(type="bound", norm="nonexistent")], _fake_df())
    assert exc.value.status_code == 422


def test_multiple_contrastive_raises():
    with pytest.raises(ValueError):
        validate_constraints([
            MinpairConstraint(type="contrastive_minpair", phoneme1="k", phoneme2="b"),
            MinpairConstraint(type="contrastive_minpair", phoneme1="d", phoneme2="t"),
        ], _fake_df())


def test_multopp_in_sentences_raises():
    with pytest.raises(ValueError):
        validate_constraints(
            [MultoppConstraint(type="contrastive_multopp", substitute="ɹ", targets=["s", "t"])],
            _fake_df(),
            context="sentences",
        )
  • [ ] Step 2: Run tests to verify they fail

Expected: FAIL (the validation module does not exist yet).

  • [ ] Step 3: Lift validation into shared module
# packages/generation/server/validation.py
"""Shared constraint validation for /api/generate-sentences and /api/sentences.

Lifted from routes/generate.py:_validate_constraints. Same checks, exposed
to the orchestrator endpoint so both paths fail the same way.
"""
from __future__ import annotations
from typing import Literal

import polars as pl
from fastapi import HTTPException

from .schemas import Constraint, MinpairConstraint, MaxoppConstraint, MultoppConstraint


def validate_constraints(
    constraints: list[Constraint],
    df: pl.DataFrame,
    context: Literal["sentences", "paragraphs"] = "sentences",
) -> None:
    """Raise 422 (or ValueError for solver-side checks) on invalid constraints."""
    word_cols = set(df.columns)
    bad: list[tuple[str, str]] = []
    for c in constraints:
        if c.type == "bound" and c.norm not in word_cols:
            bad.append((c.norm, "bound.norm"))
    if bad:
        valid = sorted(
            col for col in df.columns
            if df[col].dtype.is_numeric() and not col.startswith("_")
        )
        unknown = ", ".join(f"{path}={n!r}" for n, path in bad)
        raise HTTPException(
            status_code=422,
            detail=f"Unknown norm(s): {unknown}. Valid norms (numeric columns): {valid}",
        )

    contrast = [c for c in constraints
                if isinstance(c, (MinpairConstraint, MaxoppConstraint))]
    if len(contrast) > 1:
        raise ValueError("at most one contrastive constraint per request")

    multopp = [c for c in constraints if isinstance(c, MultoppConstraint)]
    if multopp and context == "sentences":
        raise ValueError(
            "contrastive_multopp produces an N+1-sentence opposition set "
            "(substitute + N targets sharing verb+role). It is paragraph-only — "
            "use POST /api/generate-paragraphs."
        )


def has_multopp(constraints: list[Constraint]) -> bool:
    return any(isinstance(c, MultoppConstraint) for c in constraints)
  • [ ] Step 4: Update routes/generate.py to import the shared helper
# packages/generation/server/routes/generate.py — replace local _validate_constraints
# Remove the local definition and call sites; replace with:
from ..validation import validate_constraints

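# Update call sites:
#     _validate_constraints(req.constraints, request.app.state.store.df)
# becomes, in the sentences route:
#     validate_constraints(req.constraints, request.app.state.store.df)
# and, in the paragraphs route:
#     validate_constraints(req.constraints, request.app.state.store.df, context="paragraphs")

Find the two call sites in routes/generate.py (sentences route + paragraphs route) and update each. The paragraphs route must pass context="paragraphs": the default context="sentences" rejects contrastive_multopp, which is the one constraint that route exists to serve.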

  • [ ] Step 5: Run tests to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_validation.py -v
uv run python -m pytest server/tests/ -v  # same directory; ensure nothing regressed
Expected: PASS, and no existing test regressions.

  • [ ] Step 6: Commit
git add packages/generation/server/validation.py \
        packages/generation/server/routes/generate.py \
        packages/generation/server/tests/test_validation.py
git commit -m "corpus retrieval: lift validate_constraints into shared validation module"
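
The context parameter is the part of validate_constraints that is easiest to get wrong at a call site. The sketch below isolates the structural checks with a stand-in constraint type. FakeConstraint and check_structure are hypothetical names; the real function also performs the bound-norm check and works on the pydantic models.

```python
from dataclasses import dataclass
from typing import Literal

CONTRASTIVE = {"contrastive_minpair", "contrastive_maxopp"}


@dataclass(frozen=True)
class FakeConstraint:
    """Stand-in for the pydantic constraint models; only `type` matters here."""
    type: str


def check_structure(
    constraints: list[FakeConstraint],
    context: Literal["sentences", "paragraphs"] = "sentences",
) -> None:
    """Mirror the two structural checks: one contrastive max, multopp by context."""
    if sum(c.type in CONTRASTIVE for c in constraints) > 1:
        raise ValueError("at most one contrastive constraint per request")
    has_multopp = any(c.type == "contrastive_multopp" for c in constraints)
    if has_multopp and context == "sentences":
        raise ValueError("contrastive_multopp is paragraph-only")
```

With the default context, a multopp request is rejected; the same request passes when the caller states context="paragraphs" explicitly.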

Task 13: Add CorpusMatch/SentencesRequest/SentencesResponse schemas + SyntheticMatch alias

Files:
- Modify: packages/generation/server/schemas.py
- Create: packages/generation/server/tests/test_schemas.py

  • [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_schemas.py
from packages.generation.server.schemas import (
    CorpusMatch, SentencesRequest, SentencesResponse, SyntheticMatch, SentenceCandidate,
)


def test_corpus_match_validates():
    m = CorpusMatch(text="The cat sat.", source="cola",
                    naturalness_score=0.7, n_content_in_vocab=2)
    assert m.source == "cola"


def test_synthetic_match_is_sentence_candidate_alias():
    assert SyntheticMatch is SentenceCandidate


def test_sentences_request_defaults():
    req = SentencesRequest(constraints=[])
    assert req.top_k_corpus == 10
    assert req.top_k_synthetic == 10
    assert req.include_synthetic is True


def test_sentences_response_shape():
    resp = SentencesResponse(
        corpus_matches=[],
        synthetic_matches=[],
        corpus_skipped_reason=None,
        synthetic_skipped_reason=None,
        elapsed_ms={"corpus": 0, "synthetic": 0, "total": 0},
    )
    assert resp.elapsed_ms["total"] == 0
  • [ ] Step 2: Run test to verify it fails

Expected: FAIL (ImportError: the new schema names are not defined yet).

  • [ ] Step 3: Add the new schemas
# packages/generation/server/schemas.py — append:

from typing import TypeAlias

# Alias the existing CSP candidate type as SyntheticMatch so the orchestrator
# response uses the corpus/synthetic vocabulary. Field shapes unchanged.
SyntheticMatch: TypeAlias = SentenceCandidate


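# CorpusMatch is already defined in corpus.py (Task 10). Re-export it here
# instead of declaring a second class with the same fields: Pydantic v2 will
# not accept an instance of a different model class for this field type, so
# match_corpus and SentencesResponse must share one class.
from .corpus import CorpusMatch  # noqa: E402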


class SentencesRequest(BaseModel):
    constraints: list[Constraint] = Field(default_factory=list)
    spec: str = "all"
    band: str = "all"
    top_k_corpus: int = 10
    top_k_synthetic: int = 10
    include_synthetic: bool = True
    max_candidates: int = 5000


class SentencesResponse(BaseModel):
    corpus_matches: list[CorpusMatch]
    synthetic_matches: list[SyntheticMatch]
    corpus_skipped_reason: Literal["multopp_paragraph_only", "error"] | None = None
    synthetic_skipped_reason: Literal["disabled_by_caller", "error"] | None = None
    elapsed_ms: dict[str, int]
  • [ ] Step 4: Run tests to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_schemas.py -v
Expected: PASS.

  • [ ] Step 5: Commit
git add packages/generation/server/schemas.py packages/generation/server/tests/test_schemas.py
git commit -m "corpus retrieval: CorpusMatch/SentencesRequest/Response schemas + SyntheticMatch alias"

Task 14: /api/sentences orchestrator route

Files:
- Create: packages/generation/server/routes/sentences.py
- Create: packages/generation/server/tests/test_sentences_orchestrator.py

  • [ ] Step 1: Write the failing test
# packages/generation/server/tests/test_sentences_orchestrator.py
"""Orchestrator endpoint tests. Mocks corpus + CSP to keep this fast."""
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock


@pytest.fixture
def client(monkeypatch, tmp_path):
    # Build a tiny CorpusStore + minimal app.state stubs.
    # Detailed setup is verbose; the goal here is to exercise the response shape
    # and the parallel-execution behavior. Patch the inner workers so we don't
    # need real data on disk.
    from packages.generation.server.main import app

    fake_corpus_match = MagicMock(text="The cat sat.", source="cola",
                                   naturalness_score=0.7, n_content_in_vocab=2)
    fake_synthetic = MagicMock(sentence="The cat sees the dog.",
                                composite_score=0.65)

    monkeypatch.setattr(
        "packages.generation.server.routes.sentences.run_corpus_path",
        lambda *a, **kw: [fake_corpus_match],
    )
    monkeypatch.setattr(
        "packages.generation.server.routes.sentences.run_synthetic_path",
        lambda *a, **kw: [fake_synthetic],
    )
    return TestClient(app)


def test_sentences_endpoint_returns_both_lists(client):
    resp = client.post("/api/sentences", json={"constraints": [], "include_synthetic": True})
    assert resp.status_code == 200
    body = resp.json()
    assert "corpus_matches" in body and "synthetic_matches" in body
    assert len(body["corpus_matches"]) == 1
    assert len(body["synthetic_matches"]) == 1
    assert body["corpus_skipped_reason"] is None
    assert body["synthetic_skipped_reason"] is None


def test_include_synthetic_false_skips_csp(client):
    resp = client.post("/api/sentences", json={"constraints": [], "include_synthetic": False})
    body = resp.json()
    assert body["synthetic_matches"] == []
    assert body["synthetic_skipped_reason"] == "disabled_by_caller"


def test_multopp_rejected_with_422(client):
    multopp = {"type": "contrastive_multopp", "substitute": "ɹ", "targets": ["s", "t"]}
    # Multopp on /api/sentences should 422 because Multopp is paragraph-only;
    # the orchestrator validates before dispatching either path.
    resp = client.post("/api/sentences", json={"constraints": [multopp]})
    assert resp.status_code == 422
  • [ ] Step 2: Run tests to verify they fail

Expected: FAIL (routes/sentences.py does not exist yet).

  • [ ] Step 3: Implement the orchestrator route
# packages/generation/server/routes/sentences.py
"""Orchestrator endpoint that runs corpus retrieval and CSP synthetic
generation in parallel and returns both ranked lists in one envelope.

Routes:
    POST /api/sentences   — orchestrator
"""
from __future__ import annotations

import asyncio
import time

from fastapi import APIRouter, HTTPException, Request

from ..corpus import CorpusStore, match_corpus
from ..schemas import (
    Constraint, SentencesRequest, SentencesResponse,
    CorpusMatch, SyntheticMatch,
    MultoppConstraint,
)
from ..validation import validate_constraints, has_multopp

# Import CSP machinery (mirrors routes/generate.py)
from phonolex_generators.csp import solver
from phonolex_generators.csp.reranker.rerank import rerank_with_axes
from phonolex_generators.csp.skeleton import spec_lexicon
from phonolex_generators.csp.constraints import (
    ExcludeConstraint as CExclude, BoundConstraint as CBound,
    MinpairConstraint as CMinpair, MaxoppConstraint as CMaxopp,
    PatternConstraint as CPattern,
)


router = APIRouter()


def _to_csp_constraints(constraints: list[Constraint]) -> list:
    """Convert pydantic constraints to CSP dataclasses. Mirrors the
    conversion already done inside routes/generate.py — keep in sync."""
    out = []
    for c in constraints:
        if c.type == "exclude":
            out.append(CExclude(phonemes=tuple(c.phonemes)))
        elif c.type == "bound":
            out.append(CBound(norm=c.norm, min_value=c.min_value, max_value=c.max_value))
        elif c.type == "contrastive_minpair":
            out.append(CMinpair(phoneme1=c.phoneme1, phoneme2=c.phoneme2,
                                position=c.position, slots=c.slots))
        elif c.type == "contrastive_maxopp":
            out.append(CMaxopp(phoneme1=c.phoneme1, phoneme2=c.phoneme2,
                                position=c.position,
                                min_sonorant_diff=c.min_sonorant_diff,
                                slots=c.slots))
        elif c.type == "pattern":
            out.append(CPattern(pattern_type=c.pattern_type,
                                 phonemes=tuple(c.phonemes)))
    return out


def run_corpus_path(
    corpus_store: CorpusStore,
    constraints: list,
    pairs_df,
    top_k: int,
) -> list[CorpusMatch]:
    return match_corpus(corpus_store, constraints, pairs_df, top_k)


def run_synthetic_path(
    request: Request,
    csp_constraints: list,
    spec: str,
    band: str,
    top_k: int,
    max_candidates: int,
) -> list[SyntheticMatch]:
    """Run the same CSP+rerank path that /api/generate-sentences uses.
    Returns a list of SyntheticMatch (== SentenceCandidate)."""
    state = request.app.state
    spec_words = spec_lexicon(state.store, spec)
    candidates = solver.solve(
        spec_words=spec_words,
        word_df=state.store.df,
        sel_df=state.sel_df,
        pairs_df=state.pairs_df,
        skeletons_df=state.skeletons_df,
        band=band,
        constraints=csp_constraints,
        max_candidates=max_candidates,
    )
    if not candidates:
        return []
    reranked = rerank_with_axes(candidates, top_k=top_k)
    # rerank_with_axes returns the same shape as routes/generate.py converts
    # into SentenceCandidate. Reuse that conversion path:
    from .generate import _to_sentence_candidates  # type: ignore[attr-defined]
    return _to_sentence_candidates(reranked)


@router.post("/sentences", response_model=SentencesResponse)
async def post_sentences(req: SentencesRequest, request: Request) -> SentencesResponse:
    state = request.app.state
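    try:
        validate_constraints(req.constraints, state.store.df, context="sentences")
    except ValueError as exc:
        # Solver-side checks (multiple contrastive, Multopp) raise ValueError;
        # surface them as 422 so they match the bound-norm failure mode.
        raise HTTPException(status_code=422, detail=str(exc)) from exc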

    csp_constraints = _to_csp_constraints(req.constraints)

    corpus_skipped: str | None = None
    synthetic_skipped: str | None = None
    elapsed: dict[str, int] = {}

    t0 = time.perf_counter()

    # Corpus path always runs (validate_constraints already rejected Multopp).
    corpus_task = asyncio.create_task(
        asyncio.to_thread(
            run_corpus_path, state.corpus_store, csp_constraints,
            state.pairs_df, req.top_k_corpus,
        )
    )

    if req.include_synthetic:
        synthetic_task = asyncio.create_task(
            asyncio.to_thread(
                run_synthetic_path, request, csp_constraints,
                req.spec, req.band, req.top_k_synthetic, req.max_candidates,
            )
        )
    else:
        synthetic_task = None
        synthetic_skipped = "disabled_by_caller"

    try:
        corpus_matches = await corpus_task
    except Exception:
        corpus_matches = []
        # Don't blow up the whole response on corpus failure.
        corpus_skipped = "error"
    elapsed["corpus"] = int((time.perf_counter() - t0) * 1000)

    if synthetic_task:
        try:
            synthetic_matches = await synthetic_task
        except Exception:
            synthetic_matches = []
            synthetic_skipped = "error"
    else:
        synthetic_matches = []
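    # Both paths run concurrently, so this is the extra wait after the corpus
    # path finished, not the synthetic path's own wall-clock time.
    elapsed["synthetic"] = int((time.perf_counter() - t0) * 1000) - elapsed["corpus"]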
    elapsed["total"] = int((time.perf_counter() - t0) * 1000)

    return SentencesResponse(
        corpus_matches=corpus_matches,
        synthetic_matches=synthetic_matches,
        corpus_skipped_reason=corpus_skipped,  # type: ignore[arg-type]
        synthetic_skipped_reason=synthetic_skipped,  # type: ignore[arg-type]
        elapsed_ms=elapsed,
    )
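
The route above awaits two thread-backed tasks one after the other, so any timing taken around the awaits depends on await order. As a minimal sketch (not the route itself), per-branch timing can instead be measured inside each thread. The names `timed` and `run_both` are hypothetical, and the two callables stand in for `run_corpus_path` and `run_synthetic_path`:

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Callable


def timed(fn: Callable[..., Any], *args: Any) -> tuple[Any, int, str | None]:
    """Run fn(*args) and return (result, elapsed_ms, skipped_reason)."""
    start = time.perf_counter()
    try:
        result, reason = fn(*args), None
    except Exception:
        # Mirror the route: a failing branch degrades to an empty list.
        result, reason = [], "error"
    return result, int((time.perf_counter() - start) * 1000), reason


async def run_both(corpus_fn: Callable[[], list], synthetic_fn: Callable[[], list]) -> dict:
    """Run both blocking branches in worker threads and collect one envelope."""
    t0 = time.perf_counter()
    (corpus, c_ms, c_reason), (synth, s_ms, s_reason) = await asyncio.gather(
        asyncio.to_thread(timed, corpus_fn),
        asyncio.to_thread(timed, synthetic_fn),
    )
    return {
        "corpus_matches": corpus,
        "synthetic_matches": synth,
        "corpus_skipped_reason": c_reason,
        "synthetic_skipped_reason": s_reason,
        "elapsed_ms": {
            "corpus": c_ms,
            "synthetic": s_ms,
            "total": int((time.perf_counter() - t0) * 1000),
        },
    }
```

Because each branch times itself, `elapsed_ms["corpus"]` and `elapsed_ms["synthetic"]` stay independent of which task finishes first. Whether the extra wrapper is worth it over the simpler inline timing is a judgment call for the implementer.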
  • [ ] Step 4: Extract _to_sentence_candidates from generate.py

If routes/generate.py builds SentenceCandidate objects inline, extract that conversion into a named function _to_sentence_candidates(reranked) -> list[SentenceCandidate] and import it from the new orchestrator. Verify by reading lines that build SentenceCandidate(...) in routes/generate.py and lifting them into a single helper. Then both /api/generate-sentences and /api/sentences call the same function.

  • [ ] Step 5: Mount the route in main.py
# packages/generation/server/main.py — modify routes block:

from .routes import generate, sentences
app.include_router(generate.router, prefix="/api")
app.include_router(sentences.router, prefix="/api")
  • [ ] Step 6: Run orchestrator tests to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_sentences_orchestrator.py -v
Expected: PASS.

  • [ ] Step 7: Commit
git add packages/generation/server/routes/sentences.py \
        packages/generation/server/routes/generate.py \
        packages/generation/server/main.py \
        packages/generation/server/tests/test_sentences_orchestrator.py
git commit -m "corpus retrieval: /api/sentences orchestrator (corpus ∥ CSP)"

Task 15: Cold-start CorpusStore in main.py

Files:
- Modify: packages/generation/server/main.py

  • [ ] Step 1: Write the failing test (smoke)
# add to packages/generation/server/tests/test_sentences_orchestrator.py
def test_corpus_store_loaded_at_startup():
    from fastapi.testclient import TestClient
    from packages.generation.server.main import app
    # Entering the TestClient context runs the lifespan handler, which
    # should populate app.state.corpus_store.
    with TestClient(app):
        assert hasattr(app.state, "corpus_store")
        assert app.state.corpus_store is not None
  • [ ] Step 2: Run test to verify it fails

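cd packages/generation && uv run python -m pytest server/tests/test_sentences_orchestrator.py::test_corpus_store_loaded_at_startup -v
Expected: FAIL (corpus_store attribute missing).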

  • [ ] Step 3: Load corpus at cold-start
# packages/generation/server/main.py — extend lifespan:

from .corpus import load_corpus

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ...existing loads...
    print("[startup] loading corpus_sentences{,_index}.parquet…")
    app.state.corpus_store = load_corpus(DATA_RUNTIME)
    # Also cache pairs_df for the orchestrator (CSP loads it lazily inside
    # solve(); the orchestrator wants it at hand for match_corpus).
    print("[startup] loading pairs.parquet…")
    app.state.pairs_df = pl.read_parquet(DATA_RUNTIME / "pairs.parquet")
    print("[startup] ready.")
    yield
  • [ ] Step 4: Run test to verify pass

cd packages/generation && uv run python -m pytest server/tests/test_sentences_orchestrator.py::test_corpus_store_loaded_at_startup -v
Expected: PASS. If the test fails with FileNotFoundError on corpus_sentences.parquet, that means the ingest from Task 8 hasn't been run yet — run it on at least a tiny fixture (or wrap the load in a try/except for dev environments without the artifact).
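
If the try/except route is taken for dev environments, one possible shape is sketched below. `load_corpus_or_none` is a hypothetical helper, not part of the existing package; the loader is passed in so the sketch does not assume anything about `load_corpus` beyond it raising FileNotFoundError when the parquet files are absent.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any, Callable


def load_corpus_or_none(data_runtime: Path, loader: Callable[[Path], Any]) -> Any | None:
    """Return loader(data_runtime), or None when the corpus artifacts are absent."""
    try:
        return loader(data_runtime)
    except FileNotFoundError as exc:
        # Dev checkout without the LFS artifacts: keep the server bootable.
        print(f"[startup] corpus artifacts missing, corpus path disabled: {exc}")
        return None
```

In the lifespan handler this would read `app.state.corpus_store = load_corpus_or_none(DATA_RUNTIME, load_corpus)`. Two consequences follow from that choice: the orchestrator would have to treat a `None` store as a skipped corpus path, and the startup test from Step 1 would still fail without the artifact, which is arguably the right signal in CI.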

  • [ ] Step 5: Commit
git add packages/generation/server/main.py packages/generation/server/tests/test_sentences_orchestrator.py
git commit -m "corpus retrieval: load CorpusStore + pairs_df at FastAPI cold-start"

Task 16: Worker /sentences proxy route

Files:
- Modify: packages/web/workers/src/routes/generation.ts
- Modify: packages/web/workers/test/routes/generation.test.ts (path to verify)

  • [ ] Step 1: Write the failing test
// packages/web/workers/test/routes/generation.test.ts — add:
import { describe, it, expect } from 'vitest';
import { createApp } from '../../src/index';  // adjust import to match existing test pattern

describe('POST /sentences', () => {
  it('proxies to backend /api/sentences', async () => {
    const app = createApp({ /* env with mocked GENERATION_SERVICE */ });
    const res = await app.request('/sentences', {
      method: 'POST',
      body: JSON.stringify({ constraints: [], include_synthetic: true }),
      headers: { 'Content-Type': 'application/json' },
    });
    expect(res.status).toBe(200);
  });
});

(Adapt to the project's existing Worker test harness — read packages/web/workers/test/ for the convention used by the /generate-sentences test.)

  • [ ] Step 2: Run test to verify it fails

cd packages/web/workers && npm test -- --run generation.test
Expected: FAIL (route not registered).

  • [ ] Step 3: Add the proxy route
// packages/web/workers/src/routes/generation.ts — add after existing routes:

generation.post('/sentences', (c) => proxy(c, 'POST', '/api/sentences'));
  • [ ] Step 4: Run test to verify pass

cd packages/web/workers && npm test -- --run generation.test
Expected: PASS.

  • [ ] Step 5: Commit
git add packages/web/workers/src/routes/generation.ts \
        packages/web/workers/test/routes/generation.test.ts
git commit -m "corpus retrieval: Worker proxy for /api/sentences"

Task 17: Frontend types

Files:
- Modify: packages/web/frontend/src/types/governance.ts (path to verify by reading the file)

  • [ ] Step 1: Read the existing types file

head -n 100 packages/web/frontend/src/types/governance.ts
Confirm the file path and inspect the existing SentenceCandidate (or equivalent) type. Each entry in the orchestrator's synthetic_matches uses the same fields.

  • [ ] Step 2: Add CorpusMatch, SyntheticMatch, SentencesResponse types
// packages/web/frontend/src/types/governance.ts — append:

export type CorpusSource = 'cola' | 'ud_ewt' | 'gum' | 'childes' | 'tatoeba';

export interface CorpusMatch {
  text: string;
  source: CorpusSource;
  naturalness_score: number;
  n_content_in_vocab: number;
}

// SyntheticMatch is the corpus/synthetic-vocabulary alias for the existing
// candidate shape (sentence, composite_score, axis_scores, verb, fillers, ...).
// Re-export the existing type under the new name so call sites stay short.
export type SyntheticMatch = SentenceCandidate;  // adjust if the type is named differently

export interface SentencesResponse {
  corpus_matches: CorpusMatch[];
  synthetic_matches: SyntheticMatch[];
  corpus_skipped_reason: 'multopp_paragraph_only' | null;
  synthetic_skipped_reason: 'disabled_by_caller' | 'error' | null;
  elapsed_ms: { corpus: number; synthetic: number; total: number };
}
  • [ ] Step 3: Run TypeScript check

cd packages/web/frontend && npm run typecheck
Expected: no errors.

  • [ ] Step 4: Commit
git add packages/web/frontend/src/types/governance.ts
git commit -m "corpus retrieval: frontend types — CorpusMatch + SyntheticMatch + SentencesResponse"

Task 18: Frontend — two stacked sections on Generate page

Files:
- Modify: packages/web/frontend/src/pages/Generate.tsx or equivalent (path to verify by grepping the codebase)

  • [ ] Step 1: Find the Generate page

grep -rln "generate-sentences" packages/web/frontend/src --include="*.tsx" --include="*.ts"
Note the file that posts to /api/generate-sentences. That is the page to modify.

  • [ ] Step 2: Switch endpoint + add two-section render
// In the page file from Step 1 — switch the fetch URL and adjust the response handling.
// (Concrete diff depends on the existing code; the patterns below are the
// shape to land on — adapt to the page's existing API client + state model.)

// 1. Endpoint switch:
//   POST /api/generate-sentences  →  POST /api/sentences
//
// 2. Request body:
//   { constraints, spec, band, top_k: 8 }
//   becomes
//   { constraints, spec, band, top_k_corpus: 10, top_k_synthetic: 10, include_synthetic: true }
//
// 3. Response type:
//   GenerateSentencesResponse  →  SentencesResponse  (imported from types/governance.ts)
//
// 4. Render — replace the single result list with two sections:

import type { SentencesResponse, CorpusMatch, SyntheticMatch } from '../types/governance';

function CorpusSection({ matches, skipped }: { matches: CorpusMatch[]; skipped: 'multopp_paragraph_only' | null }) {
  if (skipped === 'multopp_paragraph_only') {
    return <SectionHeader title="Corpus matches"
                          subtitle="Multiple opposition is a paragraph property — see Synthetic matches below." />;
  }
  if (matches.length === 0) {
    return <SectionHeader title="Corpus matches"
                          subtitle="No attested sentences match these constraints. Try the synthetic matches below." />;
  }
  return (
    <>
      <SectionHeader title="Corpus matches"
                     subtitle={`${matches.length} attested sentences match your constraints`} />
      {matches.map((m, i) => (
        <CorpusRow key={i} match={m} />
      ))}
    </>
  );
}

function SyntheticSection({ matches, skipped }: { matches: SyntheticMatch[]; skipped: 'disabled_by_caller' | 'error' | null }) {
  if (skipped === 'error') {
    return <SectionHeader title="Synthetic matches"
                          subtitle="Synthetic generation failed — see corpus matches above." />;
  }
  if (matches.length === 0) {
    return <SectionHeader title="Synthetic matches"
                          subtitle="No synthetic matches — try loosening your constraints." />;
  }
  return (
    <>
      <SectionHeader title="Synthetic matches"
                     subtitle={`${matches.length} generated alternatives`} />
      {matches.map((m, i) => (
        <SyntheticRow key={i} match={m} />
      ))}
    </>
  );
}

// In the page body:
{result && (
  <>
    <CorpusSection matches={result.corpus_matches} skipped={result.corpus_skipped_reason} />
    <Divider />
    <SyntheticSection matches={result.synthetic_matches} skipped={result.synthetic_skipped_reason} />
  </>
)}

CorpusRow renders the sentence text with a small source pill (CoLA / UD-EWT / GUM / CHILDES / Tatoeba) and the naturalness_score on hover. SyntheticRow reuses the existing per-candidate row component already used on the page.

  • [ ] Step 3: Manual smoke test in the dev server
cd packages/web/frontend && npm run dev
# In another terminal, from the repository root: start the generation server
uv run uvicorn packages.generation.server.main:app --host 0.0.0.0 --port 8000

In the browser, run a query with Exclude(/ɹ/) and verify two stacked sections render: corpus matches (real attested text containing no /ɹ/) then synthetic matches. Test the empty-state copy by setting an over-restrictive bound (e.g., Bound(AoA, max=2) with Pattern(STARTS_WITH, /z/)) — corpus section should show "No attested sentences match…".
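
Before opening the browser, the endpoint can also be exercised from a script. The sketch below is a hedged convenience, not part of the plan's deliverables: `build_payload`, `summarize`, and `post_sentences` are hypothetical names, the payload uses the empty `constraints` list from the Worker test, and `spec` and `band` are omitted on the assumption that they are optional (add them if the request schema requires them).

```python
from __future__ import annotations

import json
import urllib.request


def build_payload(constraints: list, top_k: int = 10) -> dict:
    """Request body for POST /api/sentences (spec and band omitted; assumed optional)."""
    return {
        "constraints": constraints,
        "top_k_corpus": top_k,
        "top_k_synthetic": top_k,
        "include_synthetic": True,
    }


def summarize(resp: dict) -> str:
    """One-line summary of the two result sections in the response envelope."""
    corpus = resp.get("corpus_matches", [])
    synth = resp.get("synthetic_matches", [])
    return (
        f"corpus={len(corpus)} (skipped={resp.get('corpus_skipped_reason')}), "
        f"synthetic={len(synth)} (skipped={resp.get('synthetic_skipped_reason')})"
    )


def post_sentences(base_url: str, payload: dict) -> dict:
    """POST the payload to <base_url>/api/sentences and decode the JSON reply."""
    req = urllib.request.Request(
        f"{base_url}/api/sentences",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        return json.load(resp)
```

With the server from this step running on port 8000, `print(summarize(post_sentences("http://localhost:8000", build_payload([]))))` should report counts for both sections.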

  • [ ] Step 4: Commit
git add packages/web/frontend/src/
git commit -m "corpus retrieval: Generate page — two stacked sections (corpus + synthetic)"

Task 19: Documentation — CLAUDE.md update

Files:
- Modify: CLAUDE.md

  • [ ] Step 1: Update the Generation Runtime Data Contract section

Read the existing section in CLAUDE.md titled "Generation Runtime Data Contract (PHON-93 / PHON-106 / PHON-107 / PHON-109 / PHON-110)" and add two bullets to the list of runtime artifacts:

- `corpus_sentences.parquet` — per-(sentence, in-vocab content word) rows, ~1–3M rows × ~167 norm cols denormalized from words.parquet. Drives /api/sentences corpus matching; reuses CSP's hard_filter_expr exactly (every in-vocab content word must satisfy the filter, mirroring CSP's per-slot enforcement). Built by `packages/data/scripts/build_corpus_sentences.py`.
- `corpus_sentences_index.parquet` — per-sentence header rows with text, source, n_content_in_vocab, and precomputed naturalness_score (mean top-K Qwen3-Embedding cosine vs naturalness_reference.npy, self-row excluded).
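
For reference while writing the second bullet, the phrase "mean top-K cosine, self-row excluded" can be read as the following pure-Python sketch. It is an interpretation, not the ingest code: the real pipeline presumably works on embedding arrays rather than lists, the value of K is not fixed here, and `naturalness_score` with its `self_index` parameter is a hypothetical signature.

```python
from __future__ import annotations

import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def naturalness_score(
    vec: list[float],
    reference: list[list[float]],
    k: int,
    self_index: int | None = None,
) -> float:
    """Mean of the k highest cosines against the reference set.

    self_index, when given, is the row in `reference` that holds this same
    sentence; it is skipped so a sentence cannot score against itself.
    """
    sims = [cosine(vec, ref) for i, ref in enumerate(reference) if i != self_index]
    top = sorted(sims, reverse=True)[:k]
    return sum(top) / len(top) if top else 0.0
```

Excluding the self-row matters when a corpus sentence also appears in the reference set: without it, the guaranteed cosine of 1.0 would inflate the mean.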

Also update the architecture section:

  • Add /api/sentences to the endpoint list ("Four endpoints: …, /api/sentences (orchestrator)").
  • Add a short paragraph under Governed Generation explaining the corpus/synthetic split: "v5.3 (or successor) introduces a corpus retrieval path alongside CSP synthetic generation. The /api/sentences orchestrator runs both in parallel and returns ranked corpus matches first, ranked synthetic matches below. Real-world attested text reads naturally by definition and augments synthetic output, especially on permissive constraint combinations. Corpus retrieval reuses CSP's hard_filter_expr and _load_pairs_for_request directly — constraint semantics are exact parity across the two paths."
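
The "exact parity" claim rests on one rule: a sentence matches only if every one of its in-vocab content words passes the filter. A plain-Python sketch of that rule is below. The runtime path applies it as a Polars expression via `hard_filter_expr`; the function name and the `sentence_id` and `aoa` keys here are assumptions for illustration only.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Callable


def matching_sentence_ids(rows: list[dict], predicate: Callable[[dict], bool]) -> list:
    """Return ids of sentences whose every per-word row satisfies the predicate.

    rows are per-(sentence, content word) records, one dict per word.
    """
    all_pass: dict = defaultdict(lambda: True)
    for row in rows:
        sid = row["sentence_id"]
        # A single failing word disqualifies the whole sentence.
        all_pass[sid] = all_pass[sid] and predicate(row)
    return sorted(sid for sid, passed in all_pass.items() if passed)
```

For example, with an age-of-acquisition bound of 5, a sentence containing "cat" and "sat" passes, while one containing "dog" and "philosophy" is rejected because of the single late-acquired word.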

Add a Code Style / Key Patterns bullet:

**Corpus vs synthetic vocabulary** — Real-world attested text is called "corpus" everywhere; CSP-generated text is called "synthetic". Used in schemas (`CorpusMatch`, `SyntheticMatch`), response fields (`corpus_matches`, `synthetic_matches`), UI section headers ("Corpus matches", "Synthetic matches"), and docs. Don't introduce alternatives like "real-world" / "generated" / "examples" / "alternatives" — vocabulary drift makes the two surfaces harder to talk about consistently.
  • [ ] Step 2: Commit
git add CLAUDE.md
git commit -m "docs: corpus retrieval — Generation Runtime Data Contract + endpoints + terminology"

Self-review summary

This plan's task structure maps cleanly to the spec's sections:

| Spec section | Plan tasks |
| --- | --- |
| Data model (index + words parquets) | 6 (schemas + emit) |
| Ingest pipeline (5 stages) | 1 (loaders), 2 (pre-filter), 3 (spaCy), 4 (lemma-join), 5 (profanity), 6 (emit), 7 (naturalness), 8 (wire e2e) |
| Query path (hard + contrastive) | 9 (loader), 10 (hard), 11 (contrastive) |
| Orchestrator endpoint | 12 (validation), 13 (schemas), 14 (route), 15 (cold-start) |
| Worker proxy | 16 |
| Frontend | 17 (types), 18 (page) |
| Docs | 19 |

Open decisions §1 (SentenceCandidate rename) is resolved by aliasing SyntheticMatch = SentenceCandidate (Task 13) — no risky rename, but the orchestrator surface uses corpus/synthetic vocabulary externally.

Open decisions §2 (slots UX) is deferred: Task 18 leaves the slots picker untouched on the page; a small follow-up ticket can decide whether to annotate or hide it.

Open decisions §3 (SSE streaming) is deferred to a follow-up — Task 18 ships the wait-for-both behavior.

Open decisions §4 (denylist source) is resolved by Task 5 using better-profanity.

The plan does not include a git push step at the end of each task because pushing is a per-feature decision; once the full feature is green locally, the engineer pushes the branch and opens a PR against develop. Per feedback_finish_the_job.md, the engineer should push + open the PR once Task 19 lands.