
RunPod Serverless Deployment — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Deploy the governed generation server (T5Gemma 9B-2B + constraint engine) to RunPod Serverless with scale-to-zero, proxied through the existing Cloudflare Worker so the frontend talks to a single API domain.

Architecture: RunPod Serverless handler wraps the existing generation pipeline. Model weights are baked into the Docker image for fast FlashBoot cold starts. The Cloudflare Worker at api.phonolex.com gets a /api/generate/* route that proxies to RunPod, adding the API key server-side. The handler uses RunPod's generator streaming so status updates flow back through the Worker as SSE events — preserving the existing frontend UX.
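The event contract is deliberately small. A minimal sketch of how the three event shapes become SSE frames (the `to_sse` helper is illustrative, not from the codebase):

```python
import json

def to_sse(event: dict) -> str:
    """Format a pipeline event ({status}/{result}/{error} dict) as one SSE frame."""
    return f"data: {json.dumps(event)}\n\n"

# The three shapes the frontend already parses:
frames = [
    to_sse({"status": "Resolving constraints..."}),
    to_sse({"result": {"text": "...", "compliant": True}}),
    to_sse({"error": "something went wrong"}),
]
```

Because the payload is the same dict whether it travels over FastAPI SSE or RunPod's stream API, the Worker proxy only has to re-wrap frames, never translate them.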

Tech Stack: RunPod Serverless (GPU), Docker (packaging), Hono streaming (Worker proxy), existing FastAPI generation code (reused as library)


File Structure

New Files

File Responsibility
packages/generation/rp_handler.py RunPod serverless handler — model loading + generation pipeline
packages/generation/Dockerfile Docker image definition — PyTorch base, model weights, app code
packages/generation/.dockerignore Excludes tests, dev files from Docker build
packages/web/workers/src/routes/generation.ts Cloudflare Worker proxy — RunPod API calls + SSE streaming

Modified Files

File Change
packages/generation/server/routes/generate.py Extract generate_pipeline() async generator from _generate_sse()
packages/web/workers/src/types.ts Add RUNPOD_API_KEY, RUNPOD_ENDPOINT_ID to Env
packages/web/workers/src/index.ts Mount generation proxy route, add staging-api CORS origin
packages/web/workers/wrangler.toml Document RunPod secrets (set via wrangler secret put)
packages/web/frontend/src/lib/generationApi.ts Default VITE_GENERATION_API_URL to VITE_API_URL for production
.github/workflows/deploy.yml Add VITE_GENERATION_API_URL to production frontend build
.github/workflows/deploy-staging.yml Add VITE_GENERATION_API_URL to staging frontend build

Task 1: Extract Generation Pipeline from SSE Formatting

Separate the generation logic (constraint resolution, trie tagging, generation, GUARD checking, compliance) from SSE string formatting. Both the FastAPI route and the RunPod handler will call the same pipeline generator.
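In miniature, the refactor looks like this (all names here are illustrative stand-ins, not the real pipeline): one async generator, consumed by a thin SSE formatter on the FastAPI side and passed through as raw dicts on the RunPod side.

```python
import asyncio
import json

async def pipeline():
    # Stand-in for generate_pipeline(): the shared core.
    yield {"status": "working"}
    yield {"result": {"text": "done"}}

async def sse_route():
    # FastAPI-side consumer: formats each event as an SSE string.
    return [f"data: {json.dumps(e)}\n\n" async for e in pipeline()]

async def runpod_consumer():
    # RunPod-side consumer: forwards the dicts unchanged.
    return [e async for e in pipeline()]

sse_frames = asyncio.run(sse_route())
dict_events = asyncio.run(runpod_consumer())
```

The point of the split: formatting lives entirely in the consumers, so neither consumer needs to know about the other.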

Files: - Modify: packages/generation/server/routes/generate.py

  • [ ] Step 1: Add generate_pipeline() async generator

Add this function above _generate_sse(). It yields dicts instead of SSE strings — the same shape the frontend already parses ({status: ...}, {result: ...}, {error: ...}).

async def generate_pipeline(req: GenerateSingleRequest):
    """Core generation pipeline. Yields status/result/error dicts.

    Used by both the FastAPI SSE route and the RunPod serverless handler.
    """
    try:
        # 1. Resolve constraints -> word lists
        yield {"status": "Resolving constraints..."}
        resolved = await resolve_constraints(req.constraints)
        yield {"status": f"Fetched word lists ({len(resolved)} constraints)"}

        # 2. Prepare BAN/BOOST — tag vocabulary trie, build boost lists
        tokenizer = model.get_tokenizer()
        yield {"status": "Tagging vocabulary trie..."}
        trie, boost_lists = await prepare_generation(resolved)

        # 3. Build processors
        has_bans = any(rc.mode == "ban" for rc in resolved)
        reranker = (
            Reranker(
                tokenizer=tokenizer,
                trie=trie if has_bans else None,
                boost_lists=boost_lists,
            )
            if (has_bans or boost_lists)
            else None
        )
        punct_boost = PunctuationBoostProcessor(tokenizer=tokenizer)

        # 4. Build CheckerConfig for GUARD
        checker_config = _build_guard_config(req.constraints, resolved)
        g2p_cache = G2PCache()

        # 5. Generate drafts with GUARD retry loop
        warnings: list[str] = []
        if has_bans and trie.root.total_below > 0:
            surviving_ratio = 1.0 - (trie.root.banned_below / trie.root.total_below)
            if surviving_ratio < 0.05:
                max_tokens = 48
                warnings.append(
                    f"Very restrictive constraint combination — only {surviving_ratio:.0%} of vocabulary survives. "
                    "Output quality may be degraded. Consider relaxing bounds or removing a constraint."
                )
            elif surviving_ratio < 0.2:
                max_tokens = 80
                warnings.append(
                    f"Restrictive constraints — {surviving_ratio:.0%} vocabulary survival. "
                    "Output will be shorter than usual."
                )
            else:
                max_tokens = 128
            yield {"status": f"Vocabulary survival: {surviving_ratio:.0%} → max {max_tokens} tokens"}
        else:
            max_tokens = 128

        n_batch = 4
        max_retries = 2
        gen_time = 0.0  # defensive default so the response build below never hits a NameError
        compliant_drafts: list[tuple[float, list[int], str]] = []
        best_fallback: tuple[int, list[int], str, float] | None = None
        bad_words_ids: list[list[int]] = []

        for attempt in range(max_retries + 1):
            yield {"status": f"Generating {n_batch} drafts (attempt {attempt + 1})..."}

            proc_list = []
            if reranker:
                proc_list.append(reranker)
            proc_list.append(punct_boost)
            processors = LogitsProcessorList(proc_list)
            batch_results = model.generate_batch(
                req.prompt,
                n=n_batch,
                logits_processor=processors,
                bad_words_ids=bad_words_ids or None,
                max_new_tokens=max_tokens,
            )

            yield {"status": f"Checking compliance on {len(batch_results)} drafts..."}
            for idx, (gen_ids, text, gen_time) in enumerate(batch_results):
                violations = _guard_check(text, checker_config, g2p_cache)
                score = model._score_draft(text)

                if not violations:
                    compliant_drafts.append((score, gen_ids, text))
                    yield {"status": f"  Draft {idx + 1}: compliant (score={score:.1f})"}
                else:
                    yield {"status": f"  Draft {idx + 1}: {len(violations)} violations"}
                    if best_fallback is None or len(violations) < best_fallback[0]:
                        best_fallback = (len(violations), gen_ids, text, gen_time)
                    for word in violations:
                        for variant in [word, " " + word]:
                            tids = tokenizer.encode(variant, add_special_tokens=False)
                            if tids and tids not in bad_words_ids:
                                bad_words_ids.append(tids)

            if compliant_drafts:
                break

        # 6. Escalation: targeted rollout if no compliant drafts
        if not compliant_drafts and req.constraints:
            try:
                all_ban_words: set[str] = set()
                all_allow_words: set[str] | None = None
                for rc in resolved:
                    if rc.mode == "ban" and rc.strategy == "direct":
                        all_ban_words.update(rc.words)
                    elif rc.mode == "ban" and rc.strategy == "complement":
                        if all_allow_words is None:
                            all_allow_words = set()
                        all_allow_words.update(rc.words)

                if all_ban_words or all_allow_words:
                    yield {"status": "Activating targeted rollout..."}
                    from phonolex_governors.generation.lookahead import TargetedRolloutProcessor

                    hf_model = model.get_model()
                    enc_outputs = None
                    if hasattr(hf_model, 'get_encoder'):
                        full_prompt = f"{model.SYSTEM_PROMPT_GENERATE}\n\n{req.prompt}"
                        enc_input = tokenizer(full_prompt, return_tensors="pt").to(model.DEVICE)
                        with torch.no_grad():
                            enc_outputs = hf_model.get_encoder()(**enc_input)

                    rollout = TargetedRolloutProcessor(
                        model=hf_model,
                        tokenizer=tokenizer,
                        ban_words=all_ban_words,
                        allow_words=all_allow_words,
                        rollout_depth=3,
                        top_k=50,
                        penalty=50.0,
                        encoder_outputs=enc_outputs,
                    )

                    rollout_processors = [rollout]
                    if reranker is not None:
                        rollout_processors.append(reranker)
                    rollout_processors.append(punct_boost)
                    processors = LogitsProcessorList(rollout_processors)

                    yield {"status": "Generating with targeted rollout..."}
                    gen_ids, text, gen_time = model.generate_single(
                        req.prompt, logits_processor=processors,
                        max_new_tokens=max_tokens,
                    )
                    violations = _guard_check(text, checker_config, g2p_cache)
                    if not violations:
                        score = model._score_draft(text)
                        compliant_drafts.append((score, gen_ids, text))
                        yield {"status": "Rollout draft: compliant"}
                    else:
                        yield {"status": f"Rollout draft: {len(violations)} violations remaining"}
                        score = model._score_draft(text)
                        if best_fallback is None or len(violations) < best_fallback[0]:
                            best_fallback = (len(violations), gen_ids, text, gen_time)
            except Exception as e:
                log.warning("Escalation failed: %s", e)
                yield {"status": f"Escalation skipped: {type(e).__name__}"}

        # 7. Select best draft
        if compliant_drafts:
            compliant_drafts.sort(key=lambda x: x[0], reverse=True)
            best_score, best_ids, best_text = compliant_drafts[0]
            yield {"status": f"Selected best of {len(compliant_drafts)} drafts (score={best_score:.1f})"}
        elif best_fallback is not None:
            _, best_ids, best_text, gen_time = best_fallback
            yield {"status": f"No compliant draft — using best with {best_fallback[0]} violations"}
        else:
            best_ids, best_text = [], ""
            yield {"status": "No output produced"}

        # 8. Compute compliance + coverage
        yield {"status": "Computing compliance details..."}
        all_words = re.findall(r"[a-zA-Z]+", best_text)
        bound_norms = [c.norm for c in req.constraints if isinstance(c, BoundConstraint)]

        from server.word_norms import get_word_norms
        norms_data = get_word_norms()

        word_violations: list[str] = []
        word_violation_details: list[WordViolation] = []
        word_compliance: list[WordComplianceDetail] = []

        if checker_config:
            for w in all_words:
                result = check_word(w, checker_config, g2p_cache)
                clean = w.strip().lower()
                word_norms_entry = norms_data.get(clean, {})
                relevant_values = {n: word_norms_entry.get(n) for n in bound_norms}

                if not result.passed:
                    word_violations.append(w)
                    details = [v.details for v in result.violations]
                    word_violation_details.append(WordViolation(word=w, details=details))
                    word_compliance.append(WordComplianceDetail(
                        word=w, passed=False, values=relevant_values, violations=details,
                    ))
                else:
                    word_compliance.append(WordComplianceDetail(
                        word=w, passed=True, values=relevant_values,
                    ))

        boost_coverage: list[BoostCoverage] = []
        for bl in boost_lists:
            hit_words = [w for w in all_words if w.lower() in bl.words]
            total = len(all_words) if all_words else 0
            actual_rate = len(hit_words) / total if total > 0 else 0.0
            boost_coverage.append(BoostCoverage(
                label=bl.label,
                target_rate=round(bl.target_rate * 100, 1),
                actual_rate=round(actual_rate * 100, 1),
                hit_words=hit_words,
                total_words=total,
            ))

        gen_tokens = model.enrich_tokens(best_ids, {}, [], tokenizer)

        response = SingleGenerationResponse(
            tokens=gen_tokens,
            text=best_text,
            gen_time_ms=gen_time,
            compliant=len(word_violations) == 0,
            violation_count=len(word_violations),
            violation_words=word_violations,
            word_violations=word_violation_details,
            word_compliance=word_compliance,
            boost_coverage=boost_coverage,
            warnings=warnings or None,
        )

        log.info("Generation complete", extra={"context": {
            "prompt": req.prompt,
            "text": best_text,
            "constraints": [c.model_dump() for c in req.constraints],
            "compliant": len(word_violations) == 0,
            "violation_count": len(word_violations),
            "drafts_produced": len(compliant_drafts),
        }})

        yield {"result": response.model_dump()}

    except Exception as e:
        log.exception("Generation failed", extra={"context": {
            "prompt": req.prompt,
            "constraints": [c.model_dump() for c in req.constraints],
        }})
        yield {"error": str(e)}
  • [ ] Step 2: Rewrite _generate_sse() as a thin SSE wrapper

Replace the entire body of _generate_sse() with:

async def _generate_sse(req: GenerateSingleRequest):
    async for event in generate_pipeline(req):
        if "status" in event:
            yield _emit(event["status"])
        elif "result" in event:
            yield _emit_result(event["result"])
        elif "error" in event:
            yield _emit_error(event["error"])
  • [ ] Step 3: Verify the refactor doesn't break existing tests

Run: cd /Users/jneumann/Repos/PhonoLex/packages/generation && uv run python -m pytest server/tests/ -v Expected: All existing tests pass (or skip if they require model weights).

  • [ ] Step 4: Commit
git add packages/generation/server/routes/generate.py
git commit -m "refactor: extract generate_pipeline() for reuse by RunPod handler"

Task 2: RunPod Serverless Handler

Create the RunPod handler that wraps the generation pipeline. It loads the model on cold start (cached for warm requests), then yields streaming events to RunPod's generator protocol.

Files: - Create: packages/generation/rp_handler.py

  • [ ] Step 1: Create the handler
"""RunPod Serverless handler for governed generation.

Wraps the existing generation pipeline for RunPod's serverless infrastructure.
Model loads on cold start and persists across warm requests.

Env vars (set in RunPod template):
    PHONOLEX_API_URL: PhonoLex Workers API (default: https://api.phonolex.com)
    HF_HOME:          HuggingFace cache dir (default: /root/.cache/huggingface)
"""
from __future__ import annotations

import asyncio
import logging
import os
import sys
import time

# Ensure server package is importable
sys.path.insert(0, os.path.dirname(__file__))

# Override device for CUDA (production) — MPS is macOS-only dev default
os.environ.setdefault("PHONOLEX_DEVICE", "cuda")

from server.logging_config import setup_logging
setup_logging()

log = logging.getLogger("phonolex.runpod")

# --- Model loading (runs once on cold start) ---

import server.model as model_module

# Patch device before loading
model_module.DEVICE = os.environ.get("PHONOLEX_DEVICE", "cuda")

log.info("Cold start: loading model...")
t0 = time.time()
model_module.load_model()
log.info("Model ready in %.1fs", time.time() - t0)

# --- Handler ---

import runpod
from server.schemas import GenerateSingleRequest


def handler(job: dict):
    """RunPod serverless handler. Yields streaming status/result dicts."""
    input_data = job["input"]
    action = input_data.get("action", "generate")

    if action == "status":
        return model_module.get_status()

    if action != "generate":
        return {"error": f"Unknown action: {action}"}

    # Parse and validate request
    try:
        req = GenerateSingleRequest(
            prompt=input_data["prompt"],
            constraints=input_data.get("constraints", []),
        )
    except Exception as e:
        return {"error": f"Invalid request: {e}"}

    # Drive the async pipeline from this sync generator, yielding each event
    # as it is produced so RunPod can stream incrementally (collecting all
    # events first would defeat streaming).
    from server.routes.generate import generate_pipeline

    loop = asyncio.new_event_loop()
    try:
        agen = generate_pipeline(req)
        while True:
            try:
                event = loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
            yield event
    finally:
        loop.close()


runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True,
})
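The handler has to drive an async generator from RunPod's synchronous generator protocol. A self-contained sketch of that bridging pattern, using a stand-in generator rather than the real pipeline:

```python
import asyncio

async def async_events():
    # Stand-in for generate_pipeline(): yields status dicts, then a result.
    yield {"status": "step 1"}
    yield {"status": "step 2"}
    yield {"result": {"text": "done"}}

def sync_handler():
    """Drive an async generator from sync code, yielding each event as it arrives."""
    loop = asyncio.new_event_loop()
    try:
        agen = async_events()
        while True:
            try:
                # StopAsyncIteration propagates cleanly through run_until_complete
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()

events = list(sync_handler())
```

Each call to `__anext__()` runs the pipeline only far enough to produce the next event, which is what lets RunPod's `/stream` endpoint surface status updates while generation is still in flight.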
  • [ ] Step 2: Verify handler imports resolve

Run: cd /Users/jneumann/Repos/PhonoLex/packages/generation && uv run python -c "from server.routes.generate import generate_pipeline; print('OK')" Expected: OK

  • [ ] Step 3: Commit
git add packages/generation/rp_handler.py
git commit -m "feat: add RunPod serverless handler for governed generation"

Task 3: Dockerfile and Docker Ignore

Build a Docker image with T5Gemma weights baked in. Uses RunPod's PyTorch base image. Model downloads during build so FlashBoot caches it for fast cold starts.

Files: - Create: packages/generation/Dockerfile - Create: packages/generation/.dockerignore

  • [ ] Step 1: Create .dockerignore
# Test and dev files
**/__pycache__
**/*.pyc
**/.pytest_cache
**/tests/
**/.ruff_cache

# Git
.git
.gitignore

# Dev configs
*.md
.env*

# Coverage reports (large)
lookups/coverage_report_*.json

# Models dir (weights baked in via HuggingFace download)
models/
  • [ ] Step 2: Create Dockerfile
# ============================================================================
# PhonoLex Governed Generation — RunPod Serverless
#
# Bakes T5Gemma 9B-2B weights into the image for fast FlashBoot cold starts.
# GPU requirement: 24 GB+ VRAM (L4, A40, A100)
# ============================================================================
FROM runpod/pytorch:2.6.0-py3.12-cuda12.6.3-devel-ubuntu22.04

WORKDIR /app

# System deps
RUN apt-get update && apt-get install -y --no-install-recommends \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python deps — install before copying source for layer caching
COPY packages/generation/pyproject.toml /app/packages/generation/pyproject.toml
COPY packages/governors/pyproject.toml /app/packages/governors/pyproject.toml
COPY packages/data/pyproject.toml /app/packages/data/pyproject.toml
COPY pyproject.toml /app/pyproject.toml

# Install RunPod SDK (quote the spec so the shell doesn't treat ">" as a redirect)
RUN pip install --no-cache-dir "runpod>=1.7.0"

# Install workspace packages (editable for import resolution)
COPY packages/data/ /app/packages/data/
COPY packages/governors/ /app/packages/governors/
COPY packages/generation/server/ /app/packages/generation/server/

RUN pip install --no-cache-dir \
    -e /app/packages/data \
    -e /app/packages/governors \
    -e /app/packages/generation

# Download model weights at build time (cached in Docker layer)
ARG HF_TOKEN
ENV HF_HOME=/root/.cache/huggingface
# huggingface_hub reads HF_TOKEN from the environment when the model is gated
RUN ${HF_TOKEN:+HF_TOKEN=$HF_TOKEN} python -c "\
from huggingface_hub import snapshot_download; \
snapshot_download('google/t5gemma-9b-2b-ul2-it', cache_dir='/root/.cache/huggingface')"

# Copy lookup file (55 MB, required at runtime)
COPY packages/generation/lookups/governor_lookup_google_t5gemma-2-4b-4b.json \
     /app/packages/generation/lookups/governor_lookup_google_t5gemma-2-4b-4b.json

# Copy handler
COPY packages/generation/rp_handler.py /app/packages/generation/rp_handler.py

# RunPod env defaults
ENV PHONOLEX_API_URL=https://api.phonolex.com
ENV PHONOLEX_DEVICE=cuda

WORKDIR /app/packages/generation
CMD ["python", "-u", "rp_handler.py"]
  • [ ] Step 3: Verify Dockerfile syntax

Run: cd /Users/jneumann/Repos/PhonoLex && docker build --check -f packages/generation/Dockerfile . (or just review — don't build the full image locally since it needs CUDA).

  • [ ] Step 4: Commit
git add packages/generation/Dockerfile packages/generation/.dockerignore
git commit -m "feat: add Dockerfile for RunPod serverless deployment"

Task 4: Cloudflare Worker Generation Proxy

Add /api/generate/* routes to the existing Hono Worker. These proxy to the RunPod serverless endpoint, adding the API key server-side. The proxy converts RunPod's streaming format to SSE so the frontend doesn't need to change.

Files: - Modify: packages/web/workers/src/types.ts - Create: packages/web/workers/src/routes/generation.ts - Modify: packages/web/workers/src/index.ts

  • [ ] Step 1: Add RunPod bindings to Env type

In packages/web/workers/src/types.ts, add to the Env interface:

export interface Env {
  DB: D1Database;
  RUNPOD_API_KEY: string;
  RUNPOD_ENDPOINT_ID: string;
}
  • [ ] Step 2: Create the generation proxy route

Create packages/web/workers/src/routes/generation.ts:

/**
 * Generation proxy — forwards requests to RunPod Serverless endpoint.
 *
 * Converts RunPod's streaming protocol to SSE so the frontend's existing
 * EventSource parsing works unchanged. The RunPod API key stays server-side.
 *
 * Routes:
 *   POST /generate-single → RunPod /run + /stream → SSE
 *   GET  /server/status   → RunPod /health
 */

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import type { Env } from '../types';
import { log } from '../lib/logger';

const generation = new Hono<{ Bindings: Env; Variables: { requestId: string } }>();

function runpodUrl(endpointId: string, path: string): string {
  return `https://api.runpod.ai/v2/${endpointId}${path}`;
}

function runpodHeaders(apiKey: string): Record<string, string> {
  return {
    'Authorization': `Bearer ${apiKey}`,
    'Content-Type': 'application/json',
  };
}

/**
 * POST /generate-single
 *
 * Starts a RunPod job, polls the /stream endpoint, and converts
 * each chunk to an SSE event. Falls back to /status polling if
 * the stream endpoint returns no data.
 */
generation.post('/generate-single', async (c) => {
  const { RUNPOD_API_KEY, RUNPOD_ENDPOINT_ID } = c.env;
  if (!RUNPOD_API_KEY || !RUNPOD_ENDPOINT_ID) {
    return c.json({ error: 'Generation service not configured' }, 503);
  }

  const body = await c.req.json();
  const rid = c.get('requestId');

  // Start the RunPod job
  let jobId: string;
  try {
    const startRes = await fetch(
      runpodUrl(RUNPOD_ENDPOINT_ID, '/run'),
      {
        method: 'POST',
        headers: runpodHeaders(RUNPOD_API_KEY),
        body: JSON.stringify({
          input: { action: 'generate', ...body },
        }),
      }
    );

    if (!startRes.ok) {
      const detail = await startRes.text();
      log('error', `RunPod /run failed: ${startRes.status}`, {
        request_id: rid,
        context: { status: startRes.status, detail },
      });
      return c.json({ error: `Generation service error: ${startRes.status}` }, 502);
    }

    const startData = await startRes.json() as { id: string; status: string };
    jobId = startData.id;
  } catch (e) {
    log('error', `RunPod /run fetch failed: ${(e as Error).message}`, {
      request_id: rid,
    });
    return c.json({ error: 'Generation service unreachable' }, 502);
  }

  // Stream SSE events by polling RunPod /stream
  return streamSSE(c, async (stream) => {
    await stream.writeSSE({ data: JSON.stringify({ status: 'Connecting to GPU...' }) });

    const maxPollMs = 300_000; // 5 minute timeout
    const pollIntervalMs = 300;
    const startTime = Date.now();
    let completed = false;

    while (!completed && (Date.now() - startTime) < maxPollMs) {
      try {
        const streamRes = await fetch(
          runpodUrl(RUNPOD_ENDPOINT_ID, `/stream/${jobId}`),
          { headers: runpodHeaders(RUNPOD_API_KEY) }
        );

        if (streamRes.ok) {
          const streamData = await streamRes.json() as {
            status: string;
            stream?: Array<{ output: Record<string, unknown> }>;
          };

          // Emit any new stream chunks as SSE events
          if (streamData.stream) {
            for (const chunk of streamData.stream) {
              await stream.writeSSE({
                data: JSON.stringify(chunk.output),
              });
            }
          }

          if (streamData.status === 'COMPLETED') {
            completed = true;
            break;
          }

          if (streamData.status === 'FAILED') {
            await stream.writeSSE({
              data: JSON.stringify({ error: 'Generation failed on GPU worker' }),
            });
            completed = true;
            break;
          }
        }
      } catch (e) {
        log('warn', `RunPod /stream poll error: ${(e as Error).message}`, {
          request_id: rid,
        });
      }

      await stream.sleep(pollIntervalMs);
    }

    if (!completed) {
      // Check final status if we timed out of stream polling
      try {
        const statusRes = await fetch(
          runpodUrl(RUNPOD_ENDPOINT_ID, `/status/${jobId}`),
          { headers: runpodHeaders(RUNPOD_API_KEY) }
        );
        if (statusRes.ok) {
          const statusData = await statusRes.json() as {
            status: string;
            output?: Record<string, unknown>;
          };
          if (statusData.status === 'COMPLETED' && statusData.output) {
            await stream.writeSSE({
              data: JSON.stringify(statusData.output),
            });
          } else {
            await stream.writeSSE({
              data: JSON.stringify({ error: 'Generation timed out' }),
            });
          }
        }
      } catch {
        await stream.writeSSE({
          data: JSON.stringify({ error: 'Generation timed out' }),
        });
      }
    }

    log('info', `Generation proxy complete for job ${jobId}`, {
      request_id: rid,
      context: { jobId, durationMs: Date.now() - startTime },
    });
  });
});

/**
 * GET /server/status
 *
 * Returns generation server health. Maps RunPod health endpoint to the
 * status shape the frontend expects. When no workers are active, returns
 * a "serverless" status indicating scale-to-zero is normal.
 */
generation.get('/server/status', async (c) => {
  const { RUNPOD_API_KEY, RUNPOD_ENDPOINT_ID } = c.env;
  if (!RUNPOD_API_KEY || !RUNPOD_ENDPOINT_ID) {
    return c.json({
      model: 'not-configured',
      vocab_size: 0,
      memory_gb: 0,
      status: 'error',
      error: 'Generation service not configured',
      lookup_entries: 0,
    });
  }

  try {
    const healthRes = await fetch(
      runpodUrl(RUNPOD_ENDPOINT_ID, '/health'),
      { headers: runpodHeaders(RUNPOD_API_KEY) }
    );

    if (!healthRes.ok) {
      return c.json({
        model: 'google/t5gemma-9b-2b-ul2-it',
        vocab_size: 256000,
        memory_gb: 0,
        status: 'error',
        error: `RunPod health check failed: ${healthRes.status}`,
        lookup_entries: 0,
      });
    }

    const health = await healthRes.json() as {
      jobs: { completed: number; failed: number; inProgress: number; inQueue: number; retried: number };
      workers: { idle: number; initializing: number; ready: number; running: number; throttled: number };
    };

    const totalWorkers = health.workers.idle + health.workers.ready + health.workers.running;
    const anyInitializing = health.workers.initializing > 0;

    return c.json({
      model: 'google/t5gemma-9b-2b-ul2-it',
      vocab_size: 256000,
      memory_gb: totalWorkers > 0 ? 24.6 : 0,
      status: anyInitializing ? 'loading' : (totalWorkers > 0 ? 'ready' : 'serverless'),
      error: null,
      lookup_entries: totalWorkers > 0 ? 256000 : 0,
      workers: health.workers,
    });
  } catch (e) {
    return c.json({
      model: 'google/t5gemma-9b-2b-ul2-it',
      vocab_size: 256000,
      memory_gb: 0,
      status: 'error',
      error: `RunPod unreachable: ${(e as Error).message}`,
      lookup_entries: 0,
    });
  }
});

export default generation;
  • [ ] Step 3: Mount the generation route in index.ts

In packages/web/workers/src/index.ts, add the import and route mount:

import generation from './routes/generation';

Add after the existing route mounts:

app.route('/api', generation);

Also add staging-api.phonolex.com to the CORS origins:

if (origin === 'https://staging-api.phonolex.com') return origin;
  • [ ] Step 4: Type check the Worker

Run: cd /Users/jneumann/Repos/PhonoLex/packages/web/workers && npm run type-check Expected: No type errors.

  • [ ] Step 5: Commit
git add packages/web/workers/src/types.ts packages/web/workers/src/routes/generation.ts packages/web/workers/src/index.ts
git commit -m "feat: add generation proxy route to Cloudflare Worker"

Task 5: Frontend Environment Config

Update the frontend to route generation requests through the Cloudflare Worker proxy in production, while preserving direct localhost access for local development.

Files: - Modify: packages/web/frontend/src/lib/generationApi.ts

  • [ ] Step 1: Update GENERATION_API_URL fallback

In packages/web/frontend/src/lib/generationApi.ts, change line 12:

// Old:
const GENERATION_API_URL = import.meta.env.VITE_GENERATION_API_URL || 'http://localhost:8000';

// New — in production, use the same API as the main app (Worker proxies to RunPod):
const GENERATION_API_URL = import.meta.env.VITE_GENERATION_API_URL
  || import.meta.env.VITE_API_URL
  || 'http://localhost:8000';
  • [ ] Step 2: Handle "serverless" status in the status hook

The status hook should treat "serverless" as a valid state (scale-to-zero; the endpoint cold-starts on the first request). Update the ServerStatus type in packages/web/frontend/src/types/governance.ts if needed. The existing useServerStatus hook already returns null when the server is unreachable, so the frontend should handle cold workers gracefully as-is. Verify by reading the type.

  • [ ] Step 3: Commit
git add packages/web/frontend/src/lib/generationApi.ts
git commit -m "feat: route generation API through Worker proxy in production"

Task 6: Deploy Workflow Updates

Add VITE_GENERATION_API_URL to the frontend build steps in both deploy workflows so production and staging builds route generation through their respective Worker proxies.

Files: - Modify: .github/workflows/deploy.yml - Modify: .github/workflows/deploy-staging.yml

  • [ ] Step 1: Update production deploy workflow

In .github/workflows/deploy.yml, update the "Build frontend" step to add the env var:

      - name: Build frontend
        run: npm run build
        working-directory: ./packages/web/frontend
        env:
          VITE_API_URL: https://api.phonolex.com
          VITE_GENERATION_API_URL: https://api.phonolex.com
  • [ ] Step 2: Update staging deploy workflow

In .github/workflows/deploy-staging.yml, update the "Build frontend (staging)" step:

      - name: Build frontend (staging)
        run: npm run build
        working-directory: ./packages/web/frontend
        env:
          VITE_API_URL: https://staging-api.phonolex.com
          VITE_GENERATION_API_URL: https://staging-api.phonolex.com
  • [ ] Step 3: Commit
git add .github/workflows/deploy.yml .github/workflows/deploy-staging.yml
git commit -m "feat: add VITE_GENERATION_API_URL to deploy workflows"

Task 7: Document Secrets and Deployment Steps

Add comments to wrangler.toml documenting the required secrets, and create a deployment checklist for first-time RunPod setup.

Files: - Modify: packages/web/workers/wrangler.toml

  • [ ] Step 1: Document secrets in wrangler.toml

Add comments to the production and staging sections of packages/web/workers/wrangler.toml:

After the [[d1_databases]] block:

# Generation proxy secrets (set via wrangler secret put):
#   RUNPOD_API_KEY       — RunPod API key (shared across envs)
#   RUNPOD_ENDPOINT_ID   — RunPod serverless endpoint ID (production)

After the [[env.staging.d1_databases]] block:

# Staging generation proxy secrets (set via wrangler secret put --env staging):
#   RUNPOD_API_KEY       — RunPod API key (shared across envs)
#   RUNPOD_ENDPOINT_ID   — RunPod serverless endpoint ID (staging)
  • [ ] Step 2: Commit
git add packages/web/workers/wrangler.toml
git commit -m "docs: document RunPod secrets in wrangler.toml"

Deployment Checklist (Manual — Run Once)

After all tasks are complete, deploy to RunPod:

1. Build and Push Docker Image

# From repo root — build context needs packages/data, packages/governors, packages/generation.
# Weights download from HuggingFace during the build; pass HF_TOKEN if the model is gated.
cd /Users/jneumann/Repos/PhonoLex

docker build --platform linux/amd64 \
    --build-arg HF_TOKEN=$HF_TOKEN \
    -f packages/generation/Dockerfile \
    -t harmfulhumanbloom/phonolex-generation:5.0.0 \
    -t harmfulhumanbloom/phonolex-generation:latest \
    .

# Push (PAT with write scope required; `docker login` first)
docker push harmfulhumanbloom/phonolex-generation:5.0.0
docker push harmfulhumanbloom/phonolex-generation:latest

2. Create RunPod Serverless Endpoint

In the RunPod dashboard (runpod.io/console/serverless):

  1. Create Template
  2. Name: phonolex-generation
  3. Image: harmfulhumanbloom/phonolex-generation:latest
  4. Container disk: 30 GB
  5. Env vars: PHONOLEX_API_URL=https://api.phonolex.com

  6. Create Endpoint

  7. Name: phonolex-gen-prod
  8. Template: phonolex-generation
  9. GPU: 24 GB+ (L4, A40, or A100)
  10. Min workers: 0 (scale to zero)
  11. Max workers: 2 (or your budget)
  12. Idle timeout: 60s (FlashBoot keeps image cached)
  13. Execution timeout: 300s

  14. Note the endpoint ID (e.g., abc123def456)

  15. Repeat for staging with PHONOLEX_API_URL=https://staging-api.phonolex.com

3. Set Cloudflare Worker Secrets

cd packages/web/workers

# Production
echo "YOUR_RUNPOD_API_KEY" | npx wrangler secret put RUNPOD_API_KEY
echo "YOUR_PROD_ENDPOINT_ID" | npx wrangler secret put RUNPOD_ENDPOINT_ID

# Staging
echo "YOUR_RUNPOD_API_KEY" | npx wrangler secret put RUNPOD_API_KEY --env staging
echo "YOUR_STAGING_ENDPOINT_ID" | npx wrangler secret put RUNPOD_ENDPOINT_ID --env staging

4. Deploy Workers

# The next push to main/develop will auto-deploy via GitHub Actions.
# Or deploy manually:
npx wrangler deploy              # production
npx wrangler deploy --env staging # staging

5. Verify

# Health check
curl https://api.phonolex.com/api/server/status

# Generate (will trigger cold start on first call)
curl -X POST https://api.phonolex.com/api/generate-single \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Write a short story about a cat."}'
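The response is an SSE stream of JSON frames. A small illustrative parser, not part of the codebase, showing what a client receives from the proxy:

```python
import json

def parse_sse(raw: str) -> list[dict]:
    """Parse 'data: {...}' SSE frames into pipeline event dicts."""
    events = []
    for line in raw.splitlines():
        if line.startswith("data: "):
            events.append(json.loads(line[len("data: "):]))
    return events

# Shape of a typical proxied stream: status updates, then the final result.
sample = (
    'data: {"status": "Connecting to GPU..."}\n\n'
    'data: {"result": {"text": "ok"}}\n\n'
)
events = parse_sse(sample)
# events[-1] carries the final result payload (or an {"error": ...} dict on failure)
```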