---
name: http-service-patterns
description: >
  Use when building an HTTP service with FastAPI lifecycle management,
  background poll loops, SPA static file serving with API reverse proxy,
  bidirectional WebSocket relay, or SSE event streaming.
---
# HTTP Services & Proxies

## The Pattern
**Problem:** You need a web service that does more than serve requests — it has a background loop reconciling state, it proxies WebSocket connections to a backend process, and it must start reliably even when the previous instance didn't exit cleanly.

**Approach:** FastAPI lifespan for startup/shutdown, `asyncio.create_task` for background loops, bidirectional WebSocket relay for proxying, and pre-bind port cleanup to prevent systemd crash-loops.
Pattern proven in production across multiple Python CLI tools and web services.
## Key Design Decisions

### 1. Pre-bind port cleanup — preventing the restart crash-loop
When systemd restarts a service, the old process may still be holding the port (or the socket may linger in `TIME_WAIT`). The new process fails to bind, exits with `status=1`, systemd restarts it, and the cycle repeats. In one production deployment, 2,075+ systemd restarts occurred before manual intervention.
The fix runs before uvicorn.run():
```python
import os
import signal
import subprocess
import time


def _kill_stale_port_holder(port: int) -> None:
    """Kill any existing process on *port* to prevent EADDRINUSE crash-loops."""
    try:
        result = subprocess.run(
            ["lsof", "-ti", f":{port}"],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0 and result.stdout.strip():
            my_pid = os.getpid()
            for pid_str in result.stdout.strip().split("\n"):
                pid = int(pid_str.strip())
                if pid != my_pid:
                    os.kill(pid, signal.SIGTERM)
            time.sleep(1)  # Brief wait for the port to be released
    except Exception:
        pass  # lsof not available — proceed; uvicorn will fail naturally
```
Called right before server start:

```python
_kill_stale_port_holder(port)
```
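Whether cleanup is even needed can be checked without `lsof` — a minimal bind-probe sketch (the function name `port_is_free` is illustrative, not from the source):

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if we can bind the port right now.

    SO_REUSEADDR lets the probe succeed over TIME_WAIT sockets, but a
    live listener still causes bind() to fail with EADDRINUSE.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
            return True
        except OSError:
            return False
```

This distinguishes "stale socket in `TIME_WAIT`" (harmless, probe succeeds) from "another process is actually listening" (probe fails, cleanup needed).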
### 2. FastAPI lifespan with background poll loop
Use the FastAPI lifespan pattern to start background tasks at startup and clean them up at shutdown.
Starting a poll loop and an httpx client:

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    global _poll_task, _http_client
    await kill_orphan_processes()
    _poll_task = asyncio.create_task(_poll_loop())
    _http_client = httpx.AsyncClient(verify=False)
    app.state.http_client = _http_client
    yield
    # Shutdown
    _poll_task.cancel()
    await _http_client.aclose()
```
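One subtlety the snippet glosses over: `task.cancel()` only *requests* cancellation. If shutdown must not race the loop's cleanup, await the cancelled task and suppress `CancelledError`. A self-contained sketch of the idiom (`run_briefly` is illustrative):

```python
import asyncio
import contextlib


async def run_briefly() -> bool:
    """Start a background loop, cancel it, and confirm it actually finished."""
    async def poll_loop() -> None:
        while True:
            await asyncio.sleep(0.01)  # stand-in for reconciliation work

    task = asyncio.create_task(poll_loop())
    await asyncio.sleep(0.05)  # let the loop run a few iterations
    task.cancel()
    # Awaiting the cancelled task lets its cleanup run before shutdown continues.
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return task.cancelled()
```

Without the `await task`, shutdown can proceed while the loop's `finally` blocks are still pending.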
Starting both a monitor loop and a watchdog loop:
```python
# Example: dual-loop lifespan for services that need both monitoring and maintenance
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    client = await _try_create_client()  # graceful degradation if unavailable
    app.state.orchestrator = Orchestrator(client=client)
    monitor_instance = asyncio.create_task(monitor_loop(app))
    watchdog_instance = asyncio.create_task(
        app.state.orchestrator.watchdog_loop(app.state.instance_store))
    try:
        yield
    finally:
        watchdog_instance.cancel()
        monitor_instance.cancel()
        if client is not None:
            await client.shutdown()
```
### 3. Bidirectional WebSocket relay with auth + auto-spawn
When proxying browser WebSocket connections to a backend process, check auth and verify the backend is alive BEFORE accepting the browser WS:
```python
@app.websocket("/terminal/ws")
async def terminal_ws_proxy(websocket: WebSocket) -> None:
    # Auth check BEFORE accept — middleware doesn't cover WebSocket scope
    if not await _ws_auth_check(websocket):
        return
    # Ensure backend is reachable BEFORE accepting the browser WS
    if not _is_backend_alive():
        # Auto-spawn backend, wait for it to bind
        ...
    await websocket.accept(subprotocol="tty")
    async with websockets.connect(
        f"ws://localhost:{BACKEND_PORT}/ws",
        subprotocols=[Subprotocol("tty")],
    ) as backend_ws:
        # Two concurrent tasks: client→backend and backend→client
        async def client_to_backend():
            while True:
                msg = await websocket.receive()
                if msg.get("bytes"):
                    await backend_ws.send(msg["bytes"])
                elif msg.get("text"):
                    await backend_ws.send(msg["text"])

        async def backend_to_client():
            async for message in backend_ws:
                if isinstance(message, bytes):
                    await websocket.send_bytes(message)
                else:
                    await websocket.send_text(message)

        # Run both directions concurrently, cancel on first completion
        done, pending = await asyncio.wait(
            [asyncio.create_task(client_to_backend()),
             asyncio.create_task(backend_to_client())],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
```
Note: The subprotocol value (`"tty"` in this example) should match your backend's WebSocket protocol.
### 4. SSE streaming from file tailing
Stream events to the browser using Server-Sent Events read from a JSONL file:
@router.get("/instances/{instance_id}/events")
async def stream_events(request, instance_id) -> StreamingResponse:
async def _generate():
# Wait for events file to appear (container may be starting)
events_path = get_instance_dir(instance_id) / EVENTS_DIR / "events.jsonl"
for _ in range(60):
if events_path.exists():
break
await asyncio.sleep(1)
# Incremental read: track file position between polls
with open(events_path) as fh:
while True:
line = fh.readline()
if line:
yield f"data: {line}\n\n"
else:
# Check if instance is in terminal state → close stream
instance = store.get_instance(instance_id)
if instance and instance.status in TERMINAL_STATUSES:
return
await asyncio.sleep(1)
return StreamingResponse(_generate(), media_type="text/event-stream")
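The tailing logic itself is plain file I/O. A synchronous sketch of the incremental-read step, returning only complete lines plus the position to resume from (`tail_new_lines` is illustrative, not from the source):

```python
from pathlib import Path


def tail_new_lines(path: Path, start_pos: int = 0) -> tuple:
    """Return complete lines appended since start_pos, plus the new position."""
    lines = []
    pos = start_pos
    with open(path) as fh:
        fh.seek(start_pos)
        while True:
            line = fh.readline()
            if not line:
                break  # caught up — poll again later
            if not line.endswith("\n"):
                break  # partial write in progress; re-read it next poll
            lines.append(line.rstrip("\n"))
            pos = fh.tell()  # advance only past complete lines
    return lines, pos
```

Tracking the position (rather than re-reading the whole file) keeps each poll O(new data), and skipping the trailing partial line avoids emitting half a JSON object while the writer is mid-append.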
## Template / Starter Code
```python
# app.py — FastAPI with lifespan, background loop, and port cleanup
import asyncio
import os
import signal
import subprocess
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, WebSocket


async def _poll_loop():
    while True:
        # Your reconciliation logic here
        await asyncio.sleep(2.0)


@asynccontextmanager
async def lifespan(app: FastAPI):
    poll_task = asyncio.create_task(_poll_loop())
    yield
    poll_task.cancel()


app = FastAPI(lifespan=lifespan)


def kill_stale_port_holder(port: int) -> None:
    try:
        result = subprocess.run(["lsof", "-ti", f":{port}"],
                                capture_output=True, text=True, timeout=5)
        if result.returncode == 0 and result.stdout.strip():
            my_pid = os.getpid()
            for pid_str in result.stdout.strip().split("\n"):
                pid = int(pid_str.strip())
                if pid != my_pid:
                    os.kill(pid, signal.SIGTERM)
            time.sleep(1)
    except Exception:
        pass


@app.get("/health")
async def health():
    return {"status": "ok"}


@app.websocket("/ws")
async def ws_proxy(websocket: WebSocket):
    await websocket.accept()
    # ... bidirectional relay logic ...


if __name__ == "__main__":
    import uvicorn

    port = 8080
    kill_stale_port_holder(port)  # pre-bind cleanup before the server starts
    uvicorn.run(app, host="0.0.0.0", port=port)
```
## Gotchas & Lessons Learned
- **The reconnect-counter bounce bug.** In one production system, the browser WebSocket was accepted immediately, then the proxy tried to connect to the backend. If the backend was dead, the WS closed. The browser's `onopen` had already fired (resetting reconnect attempts to 0), so the backoff never kicked in, causing rapid reconnect floods. Fix: verify the backend BEFORE calling `websocket.accept()`.
- **`BaseHTTPMiddleware` doesn't cover WebSocket scope.** FastAPI's HTTP middleware is not invoked for WebSocket connections. Implement a separate auth check function for WebSocket endpoints.
- **Self-signed TLS + WebSocket requires `verify=False`.** When your WebSocket proxy connects to an upstream service that uses self-signed TLS, the SSL context must explicitly trust self-signed certs or the websockets library will reject the connection.
- **Orphan process cleanup on startup.** Kill orphaned child processes from a previous run during lifespan startup. Any service with child processes needs startup-time cleanup to avoid resource leaks.
- **SSE file tailing needs a wait-for-file phase.** The events file may not exist when the SSE connection opens (the container is still starting). Poll for up to 60 seconds. Without this, opening the file would fail immediately during the startup window.