name: fastapi-pro
type: reference
description: "Production FastAPI patterns — async endpoints, SQLAlchemy 2.0 async, Pydantic V2, dependency injection, JWT auth, testing. Use for Python 3.11+ FastAPI backends. NOT for Django (→ django-patterns) or Node.js (→ backend-patterns)."
paths: ["/*.py", "/requirements*.txt", "/pyproject.toml", "/main.py", "/app//*.py"]
effort: 3
allowed-tools: Read, Glob, Grep, Write, Edit, Bash
user-invocable: true
when_to_use: "When building async FastAPI APIs with SQLAlchemy 2.0 async + Pydantic V2 — endpoints, auth, DB session handling, testing, deployment"
FastAPI Production Patterns
Critical rules (non-obvious)
async defendpoint blocking sync DB call → blocks entire event loop. Either useasyncDB driver (asyncpg/aiomysql) throughout OR switch endpoint to plaindef(FastAPI runs it in threadpool).- Pydantic V2
model_config = ConfigDict(...)replaces V1class Config. Forgetting this silently loses settings likefrom_attributes=Trueneeded for ORM → DTO conversion. Depends()caches per-request: same dependency called twice in one request returns same instance. Don't rely on this for cross-request state — use app state / Redis instead.- SQLAlchemy 2.0 async session must not leak across requests: always scope via
Dependswithasync with AsyncSession(...)— raw module-level session causesGreenletErrorunder load. BackgroundTasksruns AFTER response sent in the same worker process: if worker dies mid-task the work is lost. For durable background jobs use Celery / Dramatiq / ARQ.- Uvicorn
--workers Nforks processes — can't share in-memory state. Use Redis or DB for any shared state (rate-limit counters, cache).
Project layout
app/
├── main.py # FastAPI() instance + lifespan
├── api/
│ ├── deps.py # shared Depends (get_db, get_current_user)
│ └── v1/
│ ├── users.py # APIRouter
│ └── products.py
├── core/
│ ├── config.py # Pydantic Settings
│ ├── security.py # JWT encode/decode, password hashing
│ └── db.py # engine + AsyncSession factory
├── models/ # SQLAlchemy ORM models
├── schemas/ # Pydantic DTOs (Request/Response)
├── services/ # business logic (no framework coupling)
└── tests/
Pydantic V2 settings + config
# app/core/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_prefix="APP_")
database_url: str = Field(..., description="postgresql+asyncpg://...")
jwt_secret: str = Field(..., min_length=32)
jwt_algorithm: str = "HS256"
jwt_exp_minutes: int = 30
cors_origins: list[str] = Field(default_factory=list)
settings = Settings() # fails fast at import if required vars missing
SQLAlchemy 2.0 async session (per-request)
# app/core/db.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
engine = create_async_engine(
settings.database_url,
pool_size=20,
max_overflow=10,
pool_pre_ping=True, # reconnect on stale conns (LB idle timeout)
echo=False,
)
SessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# app/api/deps.py
from typing import AsyncIterator
from fastapi import Depends
async def get_db() -> AsyncIterator[AsyncSession]:
async with SessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# close is automatic via `async with`
Lifespan + startup/shutdown
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: warm caches, test DB
async with engine.begin() as conn:
await conn.execute(text("SELECT 1"))
yield
# Shutdown: drain connections
await engine.dispose()
app = FastAPI(title="My API", lifespan=lifespan)
JWT auth with OAuth2PasswordBearer
# app/core/security.py
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
pwd = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(p: str) -> str: return pwd.hash(p)
def verify_password(p: str, h: str) -> bool: return pwd.verify(p, h)
def create_access_token(sub: str) -> str:
exp = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_exp_minutes)
return jwt.encode({"sub": sub, "exp": exp}, settings.jwt_secret, settings.jwt_algorithm)
# app/api/deps.py
from fastapi import HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
try:
payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
user_id: str = payload.get("sub")
except JWTError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
user = await db.get(User, user_id)
if not user:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
return user
Endpoint pattern (thin controller, service below)
# app/api/v1/users.py
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter(prefix="/users", tags=["users"])
@router.post("", response_model=UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(
payload: UserCreate,
db: AsyncSession = Depends(get_db),
) -> UserOut:
try:
user = await user_service.create(db, payload)
except DuplicateEmailError:
raise HTTPException(status.HTTP_409_CONFLICT, "Email taken")
return UserOut.model_validate(user) # V2 from-attributes
@router.get("/{user_id}", response_model=UserOut)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current: User = Depends(get_current_user),
) -> UserOut:
user = await db.get(User, user_id)
if not user:
raise HTTPException(status.HTTP_404_NOT_FOUND)
if user.id != current.id and not current.is_admin:
raise HTTPException(status.HTTP_403_FORBIDDEN)
return UserOut.model_validate(user)
Pydantic V2 schemas with from_attributes
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserBase(BaseModel):
model_config = ConfigDict(from_attributes=True)
email: EmailStr
full_name: str = Field(min_length=1, max_length=100)
class UserCreate(UserBase):
password: str = Field(min_length=8, max_length=128)
class UserOut(UserBase):
id: int
created_at: datetime
Global exception handler
# app/main.py
from fastapi import Request
from fastapi.responses import JSONResponse
class AppError(Exception):
def __init__(self, msg: str, status_code: int = 400):
self.msg, self.status_code = msg, status_code
@app.exception_handler(AppError)
async def app_error_handler(req: Request, exc: AppError):
return JSONResponse(status_code=exc.status_code, content={"error": exc.msg})
@app.exception_handler(Exception)
async def unhandled(req: Request, exc: Exception):
logger.exception("Unhandled error", extra={"path": req.url.path})
return JSONResponse(status_code=500, content={"error": "Internal server error"})
Testing with pytest-asyncio
# tests/conftest.py
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
@pytest_asyncio.fixture
async def client():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c
@pytest_asyncio.fixture
async def db_session():
async with SessionLocal() as s:
yield s
await s.rollback() # isolate test
# tests/test_users.py
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
r = await client.post("/api/v1/users", json={"email": "a@b.com", "full_name": "A", "password": "pw12345678"})
assert r.status_code == 201
assert r.json()["email"] == "a@b.com"
Production deployment (Uvicorn + Gunicorn)
# Dockerfile CMD — recommended for production
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 60 \
--keep-alive 5 \
--access-logfile -
Workers = (2 × CPU) + 1 for CPU-bound; lower for IO-heavy async (async workers share event loop already).
Common pitfalls
| Pitfall | Fix |
|---|---|
async def + sync psycopg2/pymysql | Use asyncpg / aiomysql OR drop async on endpoint |
BaseSettings V1 pattern (class Config) | V2 uses model_config = SettingsConfigDict(...) |
from_attributes=True missing → validation error from ORM instance | Add to ConfigDict on every DTO reading from ORM |
Forgetting await on SQLAlchemy 2.0 async query | Linter — use sqlalchemy[asyncio] type stubs + mypy |
| Returning ORM object → leaks relationships (N+1 on serialize) | Always model_validate(orm_obj) to scoped Pydantic DTO |
BackgroundTasks for durable work | Switch to Celery / Dramatiq / ARQ |
CORSMiddleware added AFTER auth middleware | CORS must be OUTERMOST — added first |
response_model + return extra fields → silently stripped | Use response_model_exclude_unset=True consciously |
Observability hooks
# Structured logging
import structlog
logger = structlog.get_logger()
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time.monotonic()
response = await call_next(request)
logger.info("http_request",
method=request.method, path=request.url.path,
status=response.status_code,
duration_ms=(time.monotonic() - start) * 1000,
)
return response
- Add
prometheus-fastapi-instrumentatorfor/metrics. - Health check: GET
/healthzreturns{"status": "ok"}+ DBSELECT 1. - Request ID:
X-Request-IDheader middleware → bind to contextvars for logging.