Файл генерируется автоматически из файлов в specs/rules/*
- 📖 Какую спецификацию читать для вашего сценария
- 📥 Установка
- 🚀 Запуск
- 🧪 Тестирование
- 💻 Разработка
- 📂 Структура проекта
- 📚 Документация
Архитектурный шаблон для сервисов GEN AI
Шаблон составлен с учетом опыта разработки сервисов в GEN AI.
Разработчики прошли через боль и слезы плохие решения, ошибки, оверинжиниринг и неудобный дизайн.
Чтоб не делать одни и те же ошибки, мы собрали оптимальные решения в этом проекте.
- 🔧 В этом проекте заложены необходимые инфраструктурные настройки для развертывания и работы сервиса внутри контура
- 🤖 Запуск Telegram бота с мониторингом HTTP запросов в Prometheus и необходимыми инфрастурктурными настройками контура
- ⚡ Запуск FastAPI с мониторингом HTTP запросов в Prometheus и необходимыми инфрастурктурными настройками контура
- 📐 Определена структура моделуй и их границы (что должно и не должно в них находится)
- 🔌 Готовые интеграции с нашими сервисами внутри контура с отслеживанием запросов через Prometheus (Клиенты-адаптеры для баз данных, аутентификации, llm моделей, OpenAI, keycloak и другие)
- 🛠️ Набор переиспользуемых утилит, декораторов и другого шаблонного кода
- 📝 Правильно настроенное логирование и получение переменных окружений с валидацией
- 🐳 Докер контейнер для быстрого локального развертывания с инфраструктурой
- ✅ Все необходимое для создания автотестов без боли
- 📚 Набор проверенных библиотек, с которыми мы работаем
- 🎨 Вайбкодинг с файлом AGENTS.md с подробными правилами и инструкциями для генерации кода через LLM агента
✨ Преимущества шаблона:
- 🎯 Позволит вам сосредоточиться на бизнес-логике
- 🔄 Стандартизирует повторяемый код в разных проектах
- 🚀 Разработчики будут быстрее развертывать проекты в контуре
📖 Какую спецификацию читать для вашего сценария
В директории specs/rules/ находятся спецификации с подробными правилами и рекомендациями по разработке:
✨ Создание нового функционала:
- 🔌 Создать адаптер к внешнему API → create-adapter.md
- 🤖 Создать обработчик Telegram бота → telegram-handlers.md
- 🌐 Создать API эндпоинт → fastapi-and-api-endpoints.md
- 🗄️ Добавить миграцию БД → alembic-db-migration.md
- 🧪 Создать авто-тест → auto-tests.md
- 🚨 Как создать своё исключение → exceptions.md
- Как создать ORM модель → create-orm-model.md
- 🎨 Вайбкодинг → development.md
🏛️ Работа с архитектурой:
- 📐 Понять структуру слоев и границы модулей → layers.md
- Доменно-специфичные типы → tiny-types.md
- 🔌 Узнать про существующие адаптеры → adapters.md
- ⚠️ Как избежать типичных ошибок и сохранить нервы и время, себе и другим → anti-patterns.md
- 🧱 Какие есть переиспользуемые модули в проекте → project-utils.md
⚙️ Инфраструктура и конфигурация:
- 🗄️ Как работать с бд через ORM, сессии и транзакции → database-sessions.md
- 🔧 Как добавить переменные окружуния и использовать их в проекте → settings-and-environments.md
- 📊 Настроить мониторинг для функции или участка кода → monitoring.md
- ⏱️ Использовать ленивую инициализацию → lazy-init-objects.md
- 🔍 Настроить линтеры → linters.md
- 📦 Выбор и использование внешних библиотек → python-libs.md
📥 Установка
Мы используем пакетный менеджер UV, как установить.
Основные зависимости
# Установка всех зависимостей
uv sync --all-groups
Группы зависимостей
Зависимости организованы по группам для более гибкой установки:
dev - зависимости для разработки
uv sync --locked --group dev
# или вместе с основными
uv sync --locked --all-extras
voice - обработка аудио (голос)
uv sync --locked --group voice
telegram - Telegram бот + uvloop + Flask
uv sync --locked --group telegram
restapi - FastAPI + uvicorn
uv sync --locked --group restapi
database - Alembic + PostgreSQL + SQLAlchemy
uv sync --locked --group database
Либо через pip
pip install -r requirements.txt -r requirements.dev.txt
Переменные окружения
Нужно создать файл с переменными окружения .env в директории,
пример файла с обязательными переменнами в env.example.
Ознакомьтесь с переменными окружения в settings.py.
Linux
cp .env.example .env
Windows
Copy-Item .env.example .env
Crossplatform
python3 -c "import shutil; shutil.copy('.env.example', '.env2')"
🚀 Запуск
Запускайте приложение с переменной окружения ENV=LOCAL, оно выключает проверку аутентификации и другие вещи, которые
не нужны в локальной раработке.
⚡ Fastapi
uv run uvicorn project.infrastructure.apps.api:app
🤖 Telegram Bot
uv run python -m project.infrastructure.apps.bot
Для продакшена написать запуск в модуле main.py и запустить таким образом:
python -m project.infrastructure.apps.main
TODO: как запустить через докер
🧪 Тестирование
Для запуска тестов используйте:
pytest --cov=project tests/
Уровень логирование в тестах настраивается в pytest.ini
💻 Разработка
Следуйте соглашению именования комитов.
Форматирование кода и линтеры выполняется автоматически при коммите через pre-commit хуки. Чтобы активировать pre-commit, выполните:
pre-commit install
Настройки линтеров находятся в pyproject.toml. Настройки линтеров для тестов находятся в ruff-tests.toml.
При коммите автоматически выполняются:
- ✅ ruff check --fix (линтер)
- 🎨 ruff format (форматирование кода)
- 💉 di-linter (проверка инъекций зависимостей)
- 🏛️ la-linter (проверка архитектурных слоев)
- 📦 обновление requirements.txt через UV
📂 Структура проекта
Модули объединяются по компонентам, так рекомендуются в чистой архитектуре, в DDD, это наиболее оптимально. Благодаря этому взглянув на структуру проекта, можно сразу понять, про что он. И не надо прыгать по директориям, искать модули относящиеся к одному компоненту.
Границы модулей:
project/infrastructure/apps/bot.py- Telegram botproject/infrastructure/apps/flask.py- Запускатеся на проде для параллельного запуска с Telegram bot для создания эндпоинтов health-check и prometheusproject/infrastructure/apps/api.py- FastAPI appproject/infrastructure/apps/main.py- Запуск приложения на проде, там их может быть несколько запущено параллельно, поэтому отдельный модульproject/infrastructure/adapters/*- интеграции к внешним системам (адаптеры, клиенты)project/infrastructure/utils- универсальный переиспользуемые код, не связанный с бизнес-логикой, относящиеся кproject/components/{component}/cli.py- обработчики CLI интерфейсаproject/components/{component}/endpoints.py- эндпоинты APIproject/components/{component}/handlers.py- обработчики ботаproject/components/{component}/models.py- модели данных ORMproject/components/{component}/repositories.py- Любое обращение к даннымproject/components/{component}/enums.py- Наборы значенийproject/components/{component}/usecases.py- точка входа в бизнес-логику (сценарии использования приложения)project/components/{component}/service.py- детали реализации бизнес-логики (когда бизнес-логика не влезает в
usecases.py, переноси сюда)project/components/{component}/exceptions- исключения бизнес-логики относящиеся к компонентуproject/components/{component}/schemas.py- схемы данных и/или валидация pydanticproject/components/{component}/ai/{agent_name}/exceptions.pyproject/components/{component}/ai/{agent_name}/schemas.py- схемы данных агента (pydantic модели)project/components/{component}/ai/{agent_name}/prompts.py- llm промпты, еще второй вариант ниже, где каждый промпт в отдельнос файле, в общей директории promptsproject/components/{component}/ai/{agent_name}/prompts/*.py- llm промптыproject/components/{component}/ai/{agent_name}/tools/*.py- инструменты ai агентаproject/components/{component}/ai/{agent_name}/agent.py- логика ai агентаproject/libs/*- универсальный переиспользуемые код, не связанный с бизнес-логикой и инфраструктуройproject/exceptions.py- базовые исключенияproject/logger.py- настройки логированияproject/settings.py- Переменные окруженияproject/container.py- Контейнер для внедрения зависимостей фреймворками. Почему не вproject/libs? Потому что там запрещен импорт из project.infrastructure.tests/test_*- Автотестыtests/conftest.py- Фикстурыtests/factories.py- Фабрики данныхalembic/versions/*- Миграции бдscripts/*- Скриптыspecs/rules/*- Документация и спецификации по разработке используя этот шаблон и заложенный в нем стильspecs/features/*- Спецификация по разработке фичи при вайбкодинге
Вайбкодинг
Для того, чтобы работать с внешними моделями, можно сгенерировать промпт с содержимом модулей проекта, через команду:
uv run python ./scripts/project_prompt.py # по умолчанию конфиг ./project-prompt.toml
uv run python ./scripts/project_prompt.py -c my-custom-config.toml
Управлять тем, какие файлы войдут в промпт, можно через конфиг project-prompt.toml
- Если модель не соблюдает правила игры, пропускает и не соблюдает какие то требования, то нужно добавить инструкции или исправить существующие в ./specs/rules
- Добавляйте в контекст модели весь файл AGENTS.md, он формируется автоматически из всех файлов в ./specs/rules
📚 Документация
В директории specs/rules/ находятся документация по использованию шаблона:
- adapters.md - Общие принципы работы с адаптерами и обзор существующих адаптеров
- alembic-db-migration.md - Создание и управление миграциями базы данных через Alembic
- anti-patterns.md - Антипаттерны, которых следует избегать в разработке
- database-sessions.md - Управление сессиями и транзакциями базы данных (asession, atransaction, current_atransaction)
- auto-tests.md - Правила написания и запуска автотестов
- create-adapter.md - Пошаговая инструкция по созданию адаптера к внешней системе
- exceptions.md - Правила работы с исключениями и создание собственных ошибок
- fastapi-and-api-endpoints.md - Best practices для создания API эндпоинтов
- layers.md - Архитектурные границы и слои приложения
- lazy-init-objects.md - Ленивая инициализация объектов вместо глобальных переменных
- linters.md - Запуск линтеров и инструментов проверки кода
- monitoring.md - Настройка мониторинга через Prometheus
- project-utils.md - Описание переиспользуемых утилит
- python-libs.md - Рекомендации по использованию библиотек Python
- settings-and-environments.md - Работа с переменными окружения через Settings
- development.md - Методология разработки на основе спецификаций
- telegram-handlers.md - Правила создания обработчиков Telegram бота
- create-orm-model.md - Правила создания ORM моделей
- tiny-types.md - Доменно-специфичные типы
Адаптеры (Adapters)
Адаптеры — это компоненты инфраструктурного слоя, которые инкапсулируют взаимодействие с внешними системами и сервисами.
Расположение: project/infrastructure/adapters/
Основные принципы:
- Изолируют внешние зависимости от бизнес-логики
- Предоставляют удобный интерфейс для работы с внешними сервисами
- Обрабатывают ошибки и специфику внешних API
- Упрощают замену внешних зависимостей на другие, в случае необходимости
- Позволяют подменять реализацию адаптера в тестах на заглушки
Существующие адаптеры
auth.py — Адаптер аутентификации
Адаптер для работы с сервисом аутентификации пользователей. Запросы отслеживаются через prometheus.
Основной функционал:
check_telegram_user(user_telegram_id: int) -> bool— проверка существования пользователя Telegramget_users_data() -> dict— получение данных всех пользователей
keycloak.py — Адаптер Keycloak
Адаптер для интеграции с Keycloak (система управления идентификацией и доступом).
voice.py — Адаптер транскрибации и синтеза речи
⭐ Качественное решение, проверенное на практике
Адаптер для работы с голосовыми данными: преобразование речи в текст (STT) и текста в речь (TTS).
Реализация: VoiceAdapter — использует OpenAI API
Основной функционал:
voice_to_text(voice: bytes | bytearray) -> str— преобразование голоса в текст- Поддерживает указание языка (по умолчанию
ru)
- Поддерживает указание языка (по умолчанию
text_to_voice(text: str, instructions: str, voice: str = "alloy") -> io.BytesIO— синтез речи из текста
Особенности:
- Использует retry-механизм с исключением определенных ошибок (BadRequestError, RateLimitError и др.)
- Интегрирован с Prometheus через декораторы
@action_tracking_decorator - Оптимизирован для работы с Telegram (поддержка
download_as_bytearray())
Пример использования:
# Для Telegram
ogg_data = await voice_file.download_as_bytearray()
text = await voice_adapter.voice_to_text(ogg_data)
llm.py — Адаптер для работы с LLM
Основной функционал:
llm_chat_client() -> ChatOpenAI— создание клиента LangChain для чатаllm_aclient() -> AsyncClient— создание асинхронного OpenAI клиента
Особенности:
- Поддержка Prometheus мониторинга
- Поддержка middleware proxy для работы в контуре
database.py / adatabase.py — Адаптеры базы данных
Адаптеры для работы с базой данных (синхронный и асинхронный варианты).
Основной функционал:
- Управление соединениями с БД
- Выполнение запросов
- Управление транзакциями
TODO: примеры использования сессий и транзакций
cache.py / acache.py — Адаптеры к Redis
Адаптеры для работы с системой кеширования (синхронный и асинхронный варианты).
Основной функционал:
- Сохранение и получение данных из кеша
- Управление TTL (время жизни)
- Инвалидация кеша
Антипаттерны
- хранить данные в файлах и в глобальных объектах, используйте базу данных, redis, postgres
- использовать pickle сериализацию, это не безопасно
- использовать для состояния ai агента просто словарь, надо минимум TypedDict, лучше pydantic схему, она с валидацией
- использовать одно состояния для всех агентов, это не безопасно, потому что при изменении поведения одного агента, есть риск повлиять на другой
- Запрещено использовать Pandas, Polars и тому подобное. По опыту никто не может и не хочет разбираться в их апи, оно плохо читается, требуется знание их апи. Предпочтительнее язык SQL или операции над нативными объектами, списками и словарями, аннотированными через TypedDict, dataclass и т.д.
- Связь Many-to-Many в ORM и БД через отдельную таблицу. Делайте через хранение идентификаторов в поля списках, без промежуточной таблицы
- Запрещено добавлять логику на стороне БД, только на стороне приложения!
Инъекция зависимостей
Видео про эту проблему https://www.youtube.com/watch?v=3Z_3yCgVKkM
Плохо: Создание зависимостей внутри классов или функций.
class AskUseCase:
def __init__(self):
# Жесткая связка с реализацией, внутри этого класса, могут быть еще много зависимостей.
self.chat = ChatService(...)
def ask(self, user_id: int, question: str) -> str:
# Тестирование потребует патча.
repo = DatabaseRepository() # прямое создание зависимости
...
При тестировании придется патчить все вложенные зависимости ChatService и DatabaseRepository:
from unittes import mock
def test_ask():
with mock.patch('path.to.ChatService'), \
mock.patch('path.to.ChatService.create_answer'), \
mock.patch('path.to.DatabaseRepository'):
use_case = AskUseCase()
...
Такие тесты делают рефакторинг болезненным. При изменении путей, имен объектов, потребуется вносить изменения во все тесты с патчами. Лучше такие зависимости проносить через аргументы методов, ниже будет пример.
Хорошо: Внедрение зависимостей через контейнер зависимостей (DI Container):
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ... import Repository, ChatService
class AskUseCase:
def __init__(
self,
repo: "Repository",
chat: "ChatService",
):
self.repo = repo
self.chat = chat
def ask(self, user_id: int, question: str) -> str:
...
class Repository: # Реализация
def get(self):
pass
class ChatService: # Реализация
def create_answer(self, user_id, question):
return "Answer"
class DIContainer:
def __init__(self, repo=None, chat_service=None):
self.repo = repo or Repository()
self._chat = chat_service or ChatService()
self.ask_use_case = AskUseCase(self.repo, self._chat)
container = DIContainer()
assert container.ask_use_case.ask(user_id=1, question="My question") == "Answer"
class OtherChatService: # Другая реализация
def create_answer(self, user_id, question):
return "Other Answer"
other_container = DIContainer(chat_service=OtherChatService)
assert other_container.ask_use_case.ask(user_id=1, question="My question") == "Other Answer"
Такой класс можно тестировать с разными реализациями зависимостей без патчей:
def test_ask_use_case():
user_id = 1
question_text = "Test question"
expected_answer = "Test answer"
class TestRepo:
def get(self):
return user_id
class TestChatService:
def create_answer(self, user_id, question):
return expected_answer
use_case = AskUseCase(
repository=TestRepo(),
chat=TestChatService(),
)
result = use_case.ask(user_id, question_text)
assert result == expected_answer
Преимущества подхода:
- Не требуется патчить внутренние зависимости
- Явно видны все используемые зависимости и данные
- Тест проверяет только публичное API класса
- При рефакторинге внутренней реализации тест останется рабочим
Автотесты
- Как запускать тесты написанно в README.md
- Фикстуры в conftest.py.
- При запуске тестов автоматически поднимаются контейнеры с базами данных через testconteiners (на время тестов)
- Уровень логирование в тестах настраивается в pytest.ini
Пример теста эндпоинта API test_endpoints.py.
Антипаттерны в тестах
- абстракции для тестов, это последнее, во что хочется вникать, максимально избегайте их. Идеально, когда всё, что относится к данному тесту, находится внутри него, без необходимости прыгать по модулям, чтоб понять, как он работает.
- данные для теста в фикстурах затрудяет чтение теста, добавление и изменение данных. Данные должны быть внутри теста, несмотря на то, что данные могут дублироваться между тестами.
- большое кол-во assert в тесте это плохо, скорее всего в тамком тесте проверяется много кейсов, наилучший вариант, в одном тесте проверять что-то одно
- не пишите юнит-тесты, потому что при рефакторинге приходится изменять тесты, а этим ни кто не хочет заниматься, поэтому будет затруднять рефакторинг. Тестируйте поведение программы. Тестируйте сверху, через UseCase или эндпоинты. Пишите функциональные тесты.
- запрещено использовать unittest.mock.patch, если он вам понадобился, значит реализация спроектирована неверно, перепишите код, следуя принципу dependency inversion. Подмените реализацию объекта, передав мок объект с другим поведением (заглушку) в конструктор или через аргументы функции.
- проверяйте в тестах поведение программы, а не объекты
Как создавать данные для теста
В проекте есть фабрики объектов ORM моделей, они позволяют подготавливать окружение для тестирования. Находятся в factories.py. Пример использования в test_use_case.py.
В тестах используется одна сессия SqlAlchemy с БД без завершения транзакции, поэтому объекты на самом деле не создаются фабрикой в БД, но благодаря внутреннему хранилищу ORM, программа видит эти объекты.
Где обычно создают данные для теста? В фикстурах. Потом их использует и в других тестах. Чтоб создать немного другие данные, создает еще фикстуру и так проект ими зарастает.
Фабрики позволяют создавать объекты с разными параметрами, что делает их более гибкими. Одна фабрика для создания объектов с разными настройками. Полезно, когда вам нужно создавать объекты с разными состояниями в тестах.
Фабрики позволяют изолировать данные тестов друг от друга, так как каждый тест может создавать свои собственные объекты.
Фабрики упрощают поддержку кода, так как логика создания объектов сосредоточена в одном месте. Если вам нужно изменить способ создания объекта, вы делаете это только в фабрике, а не в каждом тесте.
Фикстуры в pytest могут быть "магическими" — они автоматически подставляются в тесты,
что может затруднить понимание того, что именно происходит.
Фабрики, явно вызываются в коде, улучшая понимание теста.
Фабрики лучше подходят для сложных сценариев, где нужно создавать объекты с множеством зависимостей или выполнять дополнительные действия при создании. Фикстуры могут стать громоздкими в таких случаях.
Описание фикстур в conftest.py
Фикстуры настройки окружения
setup (scope="session", autouse=True)
- Автоматически применяется ко всем тестам
- Устанавливает тестовое окружение (ENV="AUTOTESTS", токены, ключи)
- Настраивает логирование
Фикстуры для работы с базой данных
init_database (scope="session")
- Поднимает PostgreSQL контейнер через testcontainers
- Создает схему БД
- Возвращает engine для подключения
- Автоматически очищается после всех тестов
session
- Создает синхронную сессию SQLAlchemy для теста
- Все изменения откатываются после теста (rollback)
- БД всегда остается чистой между тестами
Пример использования:
def test_create_user(session):
# После теста данные автоматически откатятся
...
asession
- Асинхронная версия
session
Пример использования:
async def test_create_user_async(asession):
# После теста данные автоматически откатятся
...
Фикстуры для работы с Redis
init_redis (scope="session")
- Поднимает Redis контейнер
- Возвращает синхронный клиент Redis
redis
- Синхронный Redis клиент для теста
- Автоматически очищает базу после теста (flushdb)
Пример использования:
def test_cache(redis):
# После теста Redis очищается
...
async_init_redis (scope="session")
- Асинхронная версия Redis контейнера
async_redis
- Асинхронный Redis клиент
- Очищается после теста
Пример использования:
async def test_cache_async(async_redis):
# После теста Redis очищается
...
Фикстуры для тестирования API
api_client
- TestClient для тестирования FastAPI эндпоинтов
- Автоматически добавляет API токен в заголовки
Пример использования:
def test_endpoint(api_client):
response = api_client.get("/api/users")
assert response.status_code == 200
assert response.json() == []
Фикстуры для мокирования HTTP запросов
httpx_responses
- Для мокирования синхронных HTTP запросов (httpx)
- Использует библиотеку respx
Пример использования:
def test_external_api(httpx_responses):
httpx_responses.add(
"GET",
"https://api.example.com/data",
json={"result": "success"},
status=200
)
# Теперь запросы на этот URL вернут mock-ответ
aiohttp_responses
- Для мокирования асинхронных HTTP запросов (aiohttp)
- Использует библиотеку aioresponses
Пример использования:
async def test_external_api_async(aiohttp_responses):
aiohttp_responses.add(
"https://api.example.com/data",
method="GET",
payload={"result": "success"},
status=200
)
# Асинхронные запросы на этот URL вернут mock-ответ
Фикстуры для работы с Keycloak
keycloak_client
- Синхронный клиент Keycloak с тестовыми настройками
- Для тестирования интеграции с Keycloak
mock_keycloak
- Мокирует ответы Keycloak для синхронных запросов
- Возвращает тестовый токен
Пример использования:
def test_keycloak_auth(keycloak_client, mock_keycloak):
# mock_keycloak автоматически мокирует запросы
token = keycloak_client.get_token()
assert token == "test_token"
keycloak_aclient
- Асинхронный клиент Keycloak
mock_async_keycloak
- Мокирует асинхронные запросы к Keycloak
Вспомогательные фикстуры
project_dir (scope="session")
- Возвращает путь к директории проекта
- Полезно для работы с файлами проекта в тестах
Пример использования:
def test_config_file(project_dir):
config_path = project_dir / "config.yaml"
assert config_path.exists()
Обработка дисконнектов клиентов в FastAPI
При работе с LLM и стриминговыми ответами важно корректно обрабатывать ситуации, когда клиент неожиданно закрывает соединение. Без обработки дисконнектов:
- GPU продолжает генерацию токенов впустую
- Транзакции в БД могут остаться незавершёнными
- Ресурсы сервера тратятся впустую
Расположение
project/infrastructure/utils/disconnect.py
Основные концепции
Как работает обнаружение дисконнекта
ASGI-сервер (Uvicorn) отправляет событие http.disconnect при закрытии соединения.
Для обнаружения нужно явно опрашивать await request.receive() и проверять тип сообщения.
Важно: Не используйте request.is_disconnected() — он некорректно работает с BaseHTTPMiddleware.
Автоматическая обработка через Middleware (рекомендуется)
Добавьте DisconnectMiddleware в приложение — он автоматически обрабатывает дисконнекты для обычных эндпоинтов.
from fastapi import FastAPI
from project.infrastructure.utils.disconnect import DisconnectMiddleware
app = FastAPI()
app.add_middleware(DisconnectMiddleware)
Как работает middleware
Middleware запускает две задачи параллельно:
handler_task— выполнение эндпоинтаdisconnect_task— ожидание событияhttp.disconnectот клиента
Затем ждёт первую завершённую задачу:
- Если эндпоинт выполнился быстрее — возвращает результат
- Если клиент отключился первым — отменяет эндпоинт и освобождает ресурсы
Для стриминговых ответов middleware пропускает обработку, так как StreamingResponse и EventSourceResponse имеют встроенную логику отмены через task groups. Вместо этого используйте detect_disconnect() или полагайтесь на sse-starlette.
Ограничения
- Не работает с
StreamingResponseиEventSourceResponse— нужно использовать другие инструменты - Требует, чтобы эндпоинты были async
Утилиты для ручной обработки
1. Декоратор @with_cancellation
Для обычных (не-стриминговых) эндпоинтов — автоматически отменяет обработчик при дисконнекте.
from project.infrastructure.utils.disconnect import with_cancellation
from fastapi import Request
@app.post("/process")
@with_cancellation
async def process_job(job: JobRequest, raw_request: Request):
try:
result = await llm.generate(job.prompt)
return {"result": result}
except asyncio.CancelledError:
# Логируем отмену, отправляем метрики
raise # Пробрасываем исключение дальше
finally:
# Очистка ресурсов с защитой от отмены
with shield_cancel_scope():
await cleanup()
2. Контекстный менеджер detect_disconnect
Для гранулярного контроля — защищает критичные операции и позволяет отменять выборочно.
from project.infrastructure.utils.disconnect import detect_disconnect, cancel_on_disconnect
@app.post("/process")
async def process_job(job: JobRequest, request: Request):
async with detect_disconnect(request) as disconnect_event:
# Защищено от отмены — выполнится всегда
await db.log_request(job.job_id)
# Проверяем, отключился ли клиент
if disconnect_event.is_set():
await db.log_cancellation(job.job_id)
return Response(status_code=499)
# Может быть отменено при дисконнекте
result = await cancel_on_disconnect(
expensive_operation(),
disconnect_event
)
return result
3. Функция shield_cancel_scope
Защита критичных операций от отмены в стриминговых ответах.
from project.infrastructure.utils.disconnect import shield_cancel_scope
async def stream_response():
try:
async with aclosing(generate_chunks()) as gen:
async for chunk in gen:
yield chunk
finally:
with shield_cancel_scope():
await cleanup() # Не будет прервано при отмене
4. Функция safe_async_generator_cleanup
Безопасная очистка вложенных асинхронных генераторов через aclosing().
Используется внутри стриминговых эндпоинтов для корректного освобождения ресурсов вложенного генератора.
from project.infrastructure.utils.disconnect import safe_async_generator_cleanup
from contextlib import aclosing
async def sse_endpoint():
# Внутренний генератор от LLM
async def llm_stream():
async for chunk in llm_client.stream(prompt):
yield chunk
# Прокидываем через safe_async_generator_cleanup
async for chunk in safe_async_generator_cleanup(llm_stream()):
yield {"data": chunk}
Зачем нужно:
- Немедленное освобождение ресурсов (без ожидания garbage collection)
- Правильная очистка контекстных переменных в async-контексте
Особенности реализации
Использование anyio.CancelScope для стриминга
При работе с sse-starlette и стриминговыми ответами используйте anyio.CancelScope
вместо asyncio.shield(), так как sse-starlette работает на anyio task groups.
import anyio
with anyio.CancelScope(shield=True):
await cleanup_operation()
Обработка CancelledError
Всегда явно обрабатывайте CancelledError:
- Логируйте событие отмены
- Отправляйте метрики
- Пробрасывайте исключение дальше
- Выполняйте очистку в
finallyблоке
try:
result = await operation()
except asyncio.CancelledError:
# Логируем метрику отмены
metrics.increment("request_cancelled")
raise # Пробрасываем
Избегайте двойной отмены
Не проверяйте дисконнект вручную внутри стриминговых генераторов, если уже
используете EventSourceResponse или StreamingResponse — фреймворки
уже обрабатывают отмену. Ручная проверка создаёт race condition.
Когда что использовать
| Сценарий | Инструмент |
|---|---|
| Все эндпоинты автоматически | DisconnectMiddleware |
| Простой endpoint без стриминга (без middleware) | @with_cancellation |
| Нужна защита БД-операций | detect_disconnect |
| Стриминговый ответ (SSE) | shield_cancel_scope + aclosing() |
| Вложенные генераторы в стриме | safe_async_generator_cleanup |
| Сложная логика с несколькими этапами | Комбинация инструментов |
Ссылки
Как добавить адаптер к внешней системе
Для создания адаптеров, работающих с внешними HTTP API, используйте классы из base_client.py.
Как замокать http запросы в тестах можно найти в спеке auto-tests.md.
Основные компоненты:
-
AsyncApi — базовый класс для асинхронных HTTP клиентов
- Поддерживает context manager для управления сессиями
- Автоматическая обработка ошибок (4xx, 5xx)
- Интеграция с Prometheus метриками
- Логирование запросов и ответов
-
SyncApi — базовый класс для синхронных HTTP клиентов
- Аналогичный интерфейс с AsyncApi
- Подходит для синхронного кода
-
IClient — Protocol, определяющий интерфейс клиента
- Определяет структуру клиентского класса
- Обеспечивает единообразие адаптеров
Пошаговая инструкция по созданию адаптера
Шаг 1: Создать файл адаптера
Создайте новый файл в project/infrastructure/adapters/, например my_service.py.
Шаг 2: Определить кастомные классы ошибок (опционально)
from project.exceptions import ExternalApiError, ServerError, ClientError
class MyServiceApiError(ExternalApiError):
pass
class MyServiceServerError(ServerError):
pass
class MyServiceClientError(ClientError):
pass
Шаг 3: Создать класс адаптера
Вариант А: Асинхронный адаптер
from project.infrastructure.utils.base_client import AsyncApi, IClient
class MyServiceClient(IClient):
class Api(AsyncApi):
ApiError = MyServiceApiError
ServerError = MyServiceServerError
ClientError = MyServiceClientError
# Опционально: кастомная сессия для мониторинга
# ClientSession = MyCustomHttpClient
def __init__(self, api_key: str):
self.api_root = "https://api.myservice.com"
self.api = self.Api(
self.api_root,
name_for_monitoring="my_service_api",
headers={"Authorization": f"Bearer {api_key}"},
request_settings={"timeout": 30},
)
async def get_items(self, resource_id: str) -> dict:
"""Получить данные ресурса."""
return await self.api.call_endpoint(
f"path/to/resource/{resource_id}",
method="GET",
request_settings={"timeout": 3} # example timeout for resource
)
async def create_items(self, data: dict) -> dict:
"""Создать новый ресурс."""
return await self.api.call_endpoint(
"path/to/resource/",
method="POST",
json=data,
)
Вариант Б: Синхронный адаптер
from project.infrastructure.utils.base_client import SyncApi
class MyServiceSyncClient:
class Api(SyncApi):
ApiError = MyServiceApiError
ServerError = MyServiceServerError
ClientError = MyServiceClientError
def __init__(self, api_key: str):
self.api_root = "https://api.myservice.com"
self.api = self.Api(
self.api_root,
name_for_monitoring="my_service_api",
headers={"Authorization": f"Bearer {api_key}"},
)
def get_items(self, resource_id: str) -> dict:
return self.api.call_endpoint(
f"resources/{resource_id}",
method="GET",
)
Шаг 4: Добавить фабричный метод (опционально) для ленивой инициализации
from functools import cache
from project.settings import Settings
@cache
def my_service_client():
return MyServiceClient(
api_key=Settings().MY_SERVICE_API_KEY.get_secret_value()
)
Шаг 5: Использование адаптера
from project.infrastructure.adapters.my_service import my_service_client
client = my_service_client()
data = await client.get_items("123")
# С переиспользованием сессии (для множественных запросов)
async with client.api.Session():
data1 = await client.get_items("123")
data2 = await client.get_items("456")
# Сессия будет переиспользована
Расширенные возможности
Кастомная обработка ответов
Если нужна специфическая обработка ответов, переопределите методы в Api классе:
class LimitError(MyServiceClientError):
pass
class MyServiceClient(IClient):
class Api(AsyncApi):
async def response_to_native(self, response):
# Кастомная десериализация
return await super().response_to_native(response)
async def error_handling(self, response, response_data):
# Кастомная обработка ошибок
if response.status == 429:
raise LimitError("LimitError: Too many requests.")
return await super().error_handling(response, response_data)
Retry-механизм
Для критичных операций добавьте retry:
from project.libs.retry import retry_on_exception
class MyServiceClient(IClient):
# ... (определение Api класса)
@retry_on_exception(
(LimitError,),
max_attempts=3,
backoff=2,
)
async def create_item(self, data: dict):
...
Правила создания моделей данных ORM и миграций
- Таблицы в бд необходимо называть в единственном числе.
- В проекте используется sqlalchemy>=2.0 версии.
- База данных Postgres.
- После нужно создания или изменения модели нужно создать миграцию через alembic.
- При создании ORM модели нужно создавать фабрику для генерации данных этой модели в factories.py
- Declarative Mapping 2.0 (использование
Mappedиmapped_column) - Строгая типизация. Запрещено использовать примитивы (
int,str) для идентификаторов и ключевых полей сущностей
1. Миксины моделей в проекте:
import datetime as dt
from sqlalchemy import func, MetaData
from sqlalchemy.orm import (
Mapped,
mapped_column,
declarative_base,
)
public_schema = MetaData()
Base = declarative_base(metadata=public_schema)
class TimeMixin:
created_at: Mapped[dt.datetime] = mapped_column(nullable=False, server_default=func.now())
updated_at: Mapped[dt.datetime] = mapped_column(nullable=False, server_default=func.now(), onupdate=func.now())
2. Доменно-специфичные типы (Domain Types)
Для устранения двусмысленности и повышения читаемости кода, вместо примитивных типов (int, str) необходимо создавать специальные типы для каждой сущности.
- Определение: Типы создаются через
typing.NewType, описание поля задается черезtyping.Annotated. - Расположение: Все типы должны находиться в файле
project/datatypes.py.
Пример (project/datatypes.py):
import typing as t
UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"])
OrderIdT = t.NewType("OrderIdT", t.Annotated[int, "Order ID"])
ProductNameT = t.NewType("ProductNameT", t.Annotated[str, "Product Name"])
3. Работа с Enum (Нативные типы Postgres)
В PostgreSQL следует использовать нативные ENUM типы.
- Python класс: Наследуйтесь от
strиenum.Enum. - SQLAlchemy поле: Используйте
sqlalchemy.Enum(..., name="..."). - Важно: Параметр
nameобязателен для создания типа в БД.
class UserRole(str, enum.Enum):
ADMIN = "admin"
USER = "user"
# В модели:
role: Mapped[UserRole] = mapped_column(Enum(UserRole, name="user_role_enum"))
4. Первичные ключи (Primary Keys)
4.1. Типы и BigInteger
- Для аннотации типа (
Mapped[...]) используйте доменный тип (например,UserIdT), а неint. - Для конфигурации колонки (
mapped_column(...)) всегда указывайтеBigInteger(аналогBIGSERIAL).
# Правильно:
id: Mapped[UserIdT] = mapped_column(BigInteger, primary_key=True)
4.2. Составные ключи
Если ключ составной, указывайте primary_key=True для каждого поля.
5. Реализация Many-to-Many (Через массивы)
Не создавайте промежуточные таблицы. Используйте нативный тип ARRAY для хранения списка идентификаторов.
- Тип колонки:
mapped_column(ARRAY(BigInteger)). - Аннотация:
Mapped[list[DomainIdT]].
# Пример: Статья хранит список ID тегов
tag_ids: Mapped[list[TagIdT]] = mapped_column(ARRAY(BigInteger), default=list)
6. Индексы
- Простые:
index=Trueвнутриmapped_column. - Составные:
Index("name", "col1", "col2")в__table_args__. - Когда добавлять: Для полей фильтрации, сортировки и внешних ключей.
7. Пример (Golden Sample)
Генерируй код, строго следуя этому шаблону. Обрати внимание на импорт типов из project.datatypes.
import enum
import typing as t
from datetime import datetime
from sqlalchemy import (
BigInteger,
String,
ForeignKey,
func,
Index,
Enum
)
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# Предполагается, что этот код находится в project/datatypes.py
# Но для генерации моделей импортируй их:
# from project.datatypes import ProductIdT, ProductNameT, OrderIdT, OrderStatusT
# --- MOCK DATATYPES (для примера) ---
ProductIdT = t.NewType("ProductIdT", t.Annotated[int, "Product ID"])
ProductNameT = t.NewType("ProductNameT", t.Annotated[str, "Product Name"])
OrderIdT = t.NewType("OrderIdT", t.Annotated[int, "Order ID"])
# ------------------------------------
# 1. Base
class Base(DeclarativeBase):
pass
# 2. Enums
class OrderStatus(str, enum.Enum):
CREATED = "created"
PROCESSING = "processing"
COMPLETED = "completed"
# 3. Models
class Product(Base):
__tablename__ = "products"
# Использование доменного типа + BigInteger
id: Mapped[ProductIdT] = mapped_column(BigInteger, primary_key=True)
# Доменный тип для строки
name: Mapped[ProductNameT] = mapped_column(String(150), index=True)
price: Mapped[int] = mapped_column(BigInteger) # Для простых значений можно int
# Many-to-Many via Array of IDs
related_product_ids: Mapped[list[ProductIdT]] = mapped_column(ARRAY(BigInteger), default=list)
class Order(Base):
__tablename__ = "orders"
id: Mapped[OrderIdT] = mapped_column(BigInteger, primary_key=True)
# Native Postgres Enum
status: Mapped[OrderStatus] = mapped_column(
Enum(OrderStatus, name="order_status_enum"),
default=OrderStatus.CREATED,
index=True
)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
class OrderLog(Base):
__tablename__ = "order_logs"
# Composite Primary Key Example with Domain Types
order_id: Mapped[OrderIdT] = mapped_column(
ForeignKey("orders.id", ondelete="CASCADE"),
primary_key=True
)
log_index: Mapped[int] = mapped_column(BigInteger, primary_key=True)
message: Mapped[str] = mapped_column(String)
Создание миграций базы данных Alembic
Проект использует Alembic для управления миграциями базы данных.
Создание миграций
# Автоматически создать миграцию на основе изменений в моделях
alembic revision --autogenerate -m "Описание изменений"
# Или создать пустую миграцию
alembic revision -m "Описание изменений"
Применение миграций
# Применить все ожидающие миграции
alembic upgrade head
# Откатить последнюю миграцию
alembic downgrade -1
При переключение веток не забудьте откатывать миграции, если в ваших ветках они ушли дальше, чем на ветке, в которую вы переключились.
Создание CacheRepository
CacheRepository — это базовый класс для работы с Redis в качестве кеша. Он предоставляет унифицированный интерфейс для кеширования данных с поддержкой TTL.
Расположение: project/components/{component}/repositories.py
Обязательные атрибуты класса
key_template
Тип: t.ClassVar[str]
Шаблон для формирования ключей кеша. Должен содержать плейсхолдер {} для подстановки идентификатора:
# Правильно
key_template = "user:{}"
# Неправильно
key_template = "user" # Нет плейсхолдера
ttl
Тип: t.ClassVar[timedelta]
Время жизни записи в кеше:
# Примеры TTL
ttl = timedelta(days=7) # 7 дней
ttl = timedelta(seconds=60) # 60 секунд
Атрибуты класса
client
Тип: t.ClassVar[redis_client]
Клиент для работы с Redis. Уже определен в базовом классе:
from project.infrastructure.adapters.acache import redis_client
class CacheRepository:
client = redis_client
Используйте метод cls.client() для получения экземпляра клиента внутри методов.
Схемы данных (Pydantic)
Для данных кеша рекомендуется использовать Pydantic схемы:
from pydantic import BaseModel
class UserCacheSchema(BaseModel):
id: int
name: str
email: str
created_at: datetime
class Config:
from_attributes = True
Правила использования
1. Именование класса
Имя класса должно заканчиваться на CacheRepository и наследоваться от CacheRepository:
class UserCacheRepository(CacheRepository):
...
class ProductCacheRepository(CacheRepository):
...
2. Доменные типы для ключей
Используйте доменные типы из project/datatypes.py для аннотации ключей:
from project.datatypes import UserIdT, ProductIdT
async def save(cls, user_id: UserIdT, data: "BaseModel"): ...
async def get(cls, product_id: ProductIdT): ...
3. Сериализация данных
- Используйте
orjsonдля быстрой сериализации - Используйте
data.model_dump(exclude_unset=True)для получения словаря - Для десериализации используйте
orjson.loads()
Полный актуальный пример в repositories.py:
class UserCacheRepository(CacheRepository):
key_template = "user:{}"
ttl = timedelta(days=7)
@classmethod
async def save(cls, user_id: UserIdT, data: "BaseModel"):
async with redis_atransaction() as tr:
content = orjson.dumps(data.model_dump(exclude_unset=True))
tr.set(cls.key_template.format(user_id), content, ex=cls.ttl)
@classmethod
async def get(cls, user_id: UserIdT):
content = await cls.client().get(cls.key_template.format(user_id))
if content:
data = orjson.loads(content)
return UserCacheSchema(**data)
return content
@classmethod
async def delete(cls, user_id: UserIdT):
async with redis_atransaction() as tr:
tr.delete(cls.key_template.format(user_id))
Дополнительные методы
При необходимости можно добавить дополнительные методы для работы с кешем:
class UserCacheRepository(CacheRepository):
...
@classmethod
async def exists(cls, user_id: UserIdT) -> bool:
"""Проверка существования ключа в кеше."""
return await cls.client().exists(cls.key_template.format(user_id)) > 0
Управление сессиями базы данных (Database Sessions)
Расположение: project/infrastructure/adapters/adatabase.py
asession() — Управление сессиями
Для чтения данных. Переиспользует существующую сессию или создает новую.
# Простое чтение
async with asession() as session:
result = await session.execute(select(User).where(User.id == user_id))
user = resulscalar_one_or_none()
# Вложенные вызовы используют ту же сессию
async with asession() as session1:
async with asession() as session2:
# session1 === session2
⚠️ Не создает транзакцию. Для изменений используйте atransaction().
atransaction() — Управление транзакциями
Для изменения данных. Создает транзакцию с автоматическим commit/rollback.
# Простая транзакция
async with atransaction() as session:
user = User(name="John")
session.add(user)
# Автоматический commit
# Вложенные транзакции создают SavePoint
async with atransaction() as session:
user = User(name="John")
session.add(user)
try:
async with atransaction() as s:
# Создается SavePoint
post = Post(title="Test", user=user)
s.add(post)
raise ValueError()
except ValueError:
pass # SavePoint откатился, но user сохранится
Поведение при вложенности:
- Сессия в транзакции → создает
begin_nested()(SavePoint) - Сессия без транзакции → создает
begin() - Нет сессии → создает сессию и транзакцию
current_atransaction() — Текущая или новая транзакция
Для переиспользуемых функций. Возвращает активную транзакцию или создает новую.
async def reusable_operation():
async with current_atransaction() as session:
# Работает и внутри, и вне существующей транзакции
user = User(name="John")
session.add(user)
# Вариант 1: создаст транзакцию
await reusable_operation()
# Вариант 2: использует существующую
async with atransaction():
await reusable_operation()
Отличие от atransaction():
atransaction()— всегда создает новый уровень (SavePoint)current_atransaction()— переиспользует текущую транзакцию без SavePoint
Использование сессий и транзакций в классах репозиториях
Расположение: project/components/base/repositories.py
ORMRepository — Базовый класс репозиториев
Дает доступ к открытие сессии и транзакции. Область применения, внутри методов классов, во вне лучше использовать обертки через project.container.AllRepositories
Вне классов, их использовать не надо!
class ORMRepository(Generic[T]):
@classmethod
@contextmanager
def get_session(cls):
with Session() as session:
yield session
@classmethod
@contextmanager
def get_transaction(cls):
with transaction() as session:
yield session
@classmethod
@contextmanager
def get_current_transaction(cls):
with current_transaction() as session:
yield session
class ORMModelRepository(ORMRepository[T]):
# Наследует методы от ORMRepository.
...
Использование через контейнер
AllRepositories — Точка доступа к репозиториям
Расположение: project/container.py
Класс AllRepositories предоставляет централизованный доступ к транзакциям.
Его надо использовать, когда на уровне бизнес-логики
нужно обернуть вызов методов из нескольких репозиториев.
class AllRepositories:
def __init__(self):
self.user = UserRepository()
...
@classmethod
@contextmanager
def transaction(cls) -> Generator["ORMSession", Any, None]:
with transaction() as session:
yield session
@classmethod
@contextmanager
def current_transaction(cls) -> Generator["ORMSession", Any, None]:
with current_transaction() as session:
yield session
Использование:
from project.container import Repositories
# Через экземпляр репозитория
user = Repositories().user.get(user_id)
# Через контейнер транзакций
with Repositories.transaction() as session:
Repositories.user.save(user_data)
Repositories.employee.save(employee_data)
# Переиспользование текущей транзакции
with Repositories.current_transaction() as session:
...
Использование в тестах
В tests/conftespy определены фикстуры с автоматическим rollback:
Синхронная фикстура session
@pytesfixture
def session(init_database):
with database.Session() as session:
with session.begin() as t:
with session.begin_nested():
yield session
# Данные откатываются после теста
rollback()
database.engine_factory.cache_clear()
database.scoped_session_factory.cache_clear()
Асинхронная фикстура asession
@pytest_asyncio.fixture
async def asession(init_database):
async with adatabase.asession() as asession:
async with asession.begin() as t:
async with asession.begin_nested():
yield asession
# Данные откатываются после теста
await rollback()
adatabase.aengine_factory.cache_clear()
adatabase.async_sessionmaker_factory.cache_clear()
Как это работает:
- Открывается сессия через
adatabase.asession() - Создается основная транзакция через
begin() - Создается вложенная транзакция (SavePoint) через
begin_nested() - Тест работает внутри SavePoint
- После теста данные откатываются через
rollback() - Очищаются кеши фабрик для изоляции между тестами
Это обеспечивает чистую БД для каждого теста без необходимости пересоздания схемы.
- При удалении и переименовании объектов в коде, их также нужно изменять в документации находящихся в ./specs
- Не создавай обработку исключений для импортов пакетов
- Не используй пакет future
- Не делай импорты в
__init__.pyфайлах пакетов — импортируй напрямую из модулей - Создавай асинхронные контекстные менеджеры через декоратор
@asynccontextmanagerизcontextlib, а не через класс с__aenter__/__aexit__
Exceptions
Свои собственные исключения наследуйте от project.exceptions.AppError
API Best practices
uvicorn запускается с циклом uvloop
uvicorn project.presentation.api:app -host 0.0.0.0 --loop uvloop
Эндпоинты должны располагаться в project.components.{name}.endpoints.
Формат ответов ресурсов API
Лучше возвращать в виде словаря.
Тогда при необходимости добавление новых данных в ответе ручки, нужно будет добавить только новое поле.
Используйте готовую схему для этого project.components.base.schemas.ApiResponse.
Если указать в аргументе response_model, в swagger появится документация по выводу.
Но в случае тяжелых данных это может быть затратно по времени,
потому что данные буду валидироваться через pydantic, это замедляет 2.5 раза по сравнению с обычным dict/dataclass.
from project.components.base.schemas import ApiResponseSchema
@app.get("/my", response_model=ApiResponseSchema[list[int]])
async def my_resource():
return {"data": [1, 2]}
Аутентификация по токену в API
Проверка уже зашита в эндпоинты.
def auth_by_token(auth_token: str = Header(alias="Api-Token")):
if auth_token != Settings().API_TOKEN:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Token")
return auth_token
app = FastAPI(..., dependencies=[Depends(auth_by_token)])
Быстрая сериализация в JSON
ORJSONResponse использует пакет orjson написанный на RUST, очень быстрый.
Можно указывать в самой ручке через аргумент response_class.
from fastapi.responses import ORJSONResponse
@app.get("/health", response_class=ORJSONResponse)
async def health_check():
return {}
Версионирование
- Версионирование через URL путь
/user/v1/list - Изменение ручек делаем через добавление новой ручки с новой версией, а предыдущую помечаем
deprecated.
@app.get("/user/v1/list", deprecated=True)
Обработка исключений в FastAPI
Правильный подход к обработке исключений
Поток обработки ошибок: Бизнес-логика → raise Exception → FastAPI Exception Handler → HTTP Response
В бизнес-логике поднимать обычные исключения (наследники AppError):
# В use cases, services, repositories
def process_user(user_id: int):
user = user_repo.get(user_id)
if not user:
raise UserNotFoundError(f"User {user_id} not found")
return user
В FastAPI эндпоинтах использовать обработчики исключений FastAPI для преобразования бизнес-исключений в HTTP ответы:
app = FastAPI(...)
@app.exception_handler(Exception)
async def custom_exception_handler(request, exc: Exception):
message = f"Unexpected Error: {exc}"
logger.exception(message)
return ORJSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Internal Server Error"},
)
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc: HTTPException):
message = f"{request.method} {request.url} {exc.status_code}"
if exc.detail:
message = f"{message} ({exc.detail})"
if Settings().is_local():
message = f"{message} headers={request.headers}"
logger.error(message)
return await http_exception_handler(request, exc)
@app.exception_handler(RequestValidationError)
async def custom_validation_exception_handler(request: Request, exc: RequestValidationError):
message = f"{request.method} {request.url} {status.HTTP_422_UNPROCESSABLE_ENTITY} ({exc.errors()})"
logger.error(message)
return await request_validation_exception_handler(request, exc)
@app.exception_handler(exceptions.NotFoundError)
async def not_found_error_handler(request: Request, exc: exceptions.NotFoundError):
message = f"{request.method} {request.url} {status.HTTP_404_NOT_FOUND} ({exc})"
logger.error(message)
return ORJSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"detail": str(exc)},
)
@app.exception_handler(exceptions.AuthError)
async def auth_error_handler(request: Request, exc: exceptions.NotFoundError):
message = f"{request.method} {request.url} {status.HTTP_401_UNAUTHORIZED} ({exc})"
logger.error(message)
return ORJSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": str(exc)},
)
@app.exception_handler(exceptions.ExternalApiError)
async def integration_error_handler(request: Request, exc: exceptions.NotFoundError):
message = f"{request.method} {request.url} {status.HTTP_500_INTERNAL_SERVER_ERROR} ({exc})"
logger.error(message)
return ORJSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": str(exc)},
)
Почему так правильно?
- Разделение ответственностей: Бизнес-логика не знает о HTTP, фреймворк обрабатывает HTTP аспекты
- Переиспользование: Одна бизнес-логика может использоваться в разных контекстах (API, CLI, тесты)
- Тестируемость: Легче тестировать бизнес-логику без HTTP зависимостей
- Читаемость: Код бизнес-логики фокусируется на логике, а не на HTTP деталях
Антипаттерны
❌ Не использовать HTTPException в бизнес-логике:
# ПЛОХО: бизнес-логика зависит от FastAPI
def process_user(user_id: int):
user = user_repo.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found") # ❌
return user
❌ Не подавлять исключения в эндпоинтах:
# ПЛОХО: скрывает ошибки
@app.get("/users/{user_id}")
async def get_user(user_id: int):
try:
user = await user_service.get_user(user_id)
return user
except Exception as e:
return {"error": "Something went wrong"} # ❌
Архитектурные границы
TODO: переработать
Основная цель слоистых архитектур - отделить бизнес-логику от фреймворков, инфраструктуры и интерфейсов ввода/вывода.
Инфраструктура - это подключение к базе данных, ORM, обращение к внешнему сервису, шина сообщений, отправка уведомлений, сбор метрик и т.д.
Главное правило - бизнес-логика не должна зависеть от деталей реализации и быть подальше от побочных эффектов. Для этого нужно соблюдать правило инверсии зависимостей, нижележащие слои не могут зависеть от вышестоящих.
Конкретно в Python это означает, что в слой бизнес-логики не должно быть импортов
из слоя инфраструктуры и ввода/вывода.
Чтобы обозначить ожидаемый интерфейс объектов из слоя инфраструктуры в аннотациях типов, не импортируя их,
можно создавать заглушки объектов наследуясь от класса Protocol.
Пример в interfaces.py
Из-за того, что бизнес-логика не зависит от деталей реализации, упрощается тестирование. Пример теста бизнес-логики посмотрите тут test_ask.py
Пример, как надо писать модули смотрите на примере домена chat
Слои
Считаю, что абстракции UseCase, Service, Repository могут быть достаточными для скрытия сложности на ранней стадии проекта. Вводить новые сущности можно по мере увеличения сложности проекта. Поэтому ограничимся описанием этих паттернов.
UseCase - точка входа в бизнес-сценарии. Пример use_cases.py.
Слой, через который интерфейсы ввода/вывода запускают бизнес-логику.
Здесь содержится валидация данных, авторизация, проверка квоты, лимитов и т.д.
Поэтому другие домены и поддомены бизнес-логики не должны использовать UseCase.
Реализация бизнес-процессов должна находится в Service-ах.
В UseCase не должно быть того, что потребуется в Service-ах в других доменах и поддоменах.
Имя сценария должно отражать бизнес-функцию.
Ожидается, что UseCase должен быть очень простым (мало строк) и понятным для чтения.
Сам код, нейминг классов и методов должен описывать, что происходит в терминах бизнеса -
провалидировать данные, проверить авторизацию, квоту, запустить бизнес-процесс.
Валидацию данных лучше использовать на этом слое,
но детали реализации (функции валидаций) выносить в модуль validation.py
UseCase - это объект без состояния.
Service - скрывает детали реализации бизнес-процесса. Пример service.py
Может объединять в себе работу одного или нескольких доменов.
Объект без состояния.
Repository - нужны, чтобы отделить доступ к данным от ORM. Пример [repositories.py]
(project/components/chat/repositories.py)
Объект без состояния.
- доступ к данным изолируйте в классах Repository, в бизнес-логике извлечение данных из бд затрудняет читать и понимать саму бизнес-логику. Ищите примеры в модулях repositories.py
- Есть generic базовый класс с базовыми методами, наследуйте ваши Repository от него, пример в repositories.py
# Когда мы смотрим на бизнес-логику, лучше увидеть такое
UserRepo.get_users(user_ids=[1, 2, 3])
# чем такое
query = select(User).where(User.id.in_([1, 2, 3]))
async with Session() as session:
result = session.execute(query)
data = await result.scalars().all()
Interface - это объект, показывающий ожидаемый интерфейс, используется только в аннотациях типов. Пример interfaces.py. Избавляет от необходимости импорта реального объекта, чтобы не нарушать правило инверсии зависимостей.
Adapter - реализация интерфейса. Пример llm.py. Чтобы не зависеть от конкретных фреймворков и других зависимостей, мы взаимодействуем с ними через фасад. Благодаря этому можно заменить технологию, находящуюся за фасадом.
DIContainer - контейнер, в котором разрешаются зависимости, создается один раз. Пример container.py. Знает кому, какие зависимости нужны и откуда их взять. Избавляет нас от необходимости думать, как создать объект. Если нужно в одной транзакции изменить несколько моделей, т.е. вне границ репозитория домена, лучше создайте еще один репозиторий для этого.
Линтер слоев
Есть линтер, который проверяет направление зависимостей, настроенных в конфиге layers.toml. Запускается через
layers-linter project
Не создавайте глобальные объекты
глобальные объекты это топ антипаттерн, далующий программу крайне плохой (TODO: написать подробнее почему).
Вместо них создавайте объекты с ленивой инициализацией (в момент реального использования объекта)
Самый простой и частый вариант это создать фнукцию с кешом. Там где используете клиент, вы вызываете эту функцию и получаете объект, это позволяет отложить инициализацию до момента реально использования.
from functools import cache
from langchain_openai import ChatOpenAI
from project.settings import Settings
@cache
def client():
return ChatOpenAI(
api_key=Settings().LLM_API_KEY.get_secret_value(),
)
async def llm_logic():
result = await client().ainvoke()
Если в тестах нужно инициализировать объект с другими параметрами, тогда можете использовать
LazyInit из structures.py, он предоставляют механизм ленивой инициализации и
контекстный менеджер для инициализации объекта с другими параметрами.
Пример такого использования для класса Settings, который под капотом получает переменные окружения, а в тестах мы хотим заменять переменные окружения.
# ✅ ПРАВИЛЬНО - объявление класса
class MyServiceClass:
def __init__(self, param):
self.param = param
def do_something(self):
return self.param
MyService = LazyInit(MyServiceClass, kwargs_func=lambda: {"param": Settings().PARAM})
# ❌ НЕПРАВИЛЬНО - инициализация в глобальной области
result = MyService().do_something()
# ❌ НЕПРАВИЛЬНО - сохранение экземпляра в глобальной области
my_service = MyService() # ❌ ПЛОХО
my_service.do_something()
# ✅ ПРАВИЛЬНО - сохранение в атрибутах экземпляра допускается, потому что экземпляр создается лениво.
class MyClass:
def __init__(self):
self.adapter = GitLabAdapter()
# ✅ ПРАВИЛЬНО - Вызов внутри функции будет инициализироваться лениво
def myfunc():
x = MyService().do_something()
Запуск линтеров
- ruff check --fix (линтер)
- ruff format (форматирование)
- di-linter (проверка инъекций зависимостей)
- la-linter (проверка архитектурных слоев)
- обновление requirements.txt через UV
Проверка слоистой архитектуры с layers-linter
Проект использует layers-linter для автоматической проверки соблюдения архитектурных границ между слоями.
Конфигурация находится в файле layers.toml, где определены:
- Слои приложения (dicontainer, usecases, services, repo, orm, adapters и т.д.)
- Модули, которые входят в каждый слой
- Направление зависимостей между слоями (перечисление разрешенных слоев для импорта для каждого слоя)
- Ограничения на использование некоторых внешних библиотек в слоях
Запуск проверки:
layers-linter project
Линтер анализирует импорты в коде и выявляет нарушения архитектурных границ:
- Когда модуль из одного слоя импортирует модуль из слоя, от которого ему не разрешено зависеть
- Когда модуль использует внешнюю библиотеку, которую для него не разрешено использовать
Это помогает поддерживать чистоту архитектуры и предотвращает появление нежелательных зависимостей между слоями.
Мониторинг
Есть готовые настроенные дашборды для метрик собираемых в этом проекте через Prometheus. Можно узнать у коллег разработчиков и девопсов. Также на эти метрики в дашбордах можно настроить алерты в телеграм
Для критичных секций кода. Для Telegram обработчиков и callback если это telegram бот.
Отслеживание времени и статуса выполнения секций кода
Для этого есть контекстный менеджер и декоратор.
Чтобы эти примитивы смогли отследить возниклования exception, внутри этих контекстного менеджера и декоратора не должны подавляться exception.
from llm_common.prometheus import action_tracking, action_tracking_decorator
# Использование контекст-менеджера
with action_tracking("data_processing") as tracker:
# Ваш код
processed_data = process_data()
# Опционально: трекинг размера данных
tracker.size(len(processed_data))
# Опционально: зафиксировать, как ошибку
tracker.to_fail()
# Использование декоратора
@action_tracking_decorator("myfeature_llm_call")
async def make_llm_request():
# Ваш код
return result
Боты Telegram
Хорошей практикой является отслеживания всех хендлеров.
Применяйется на хендлеры и обработки callback кнопок декоратор или контекстный менеджеры action_tracking и action_tracking_decorator В качестве имени указывайте суффикс "_handler" action_tracking(name="menu_handler"), это позволит офильтровать на графике только метрики для хэндлеров
Именование отслеживаемых action
Для обработчиков Telegram, суффикс "_handler" Для регулярных задач, суффикс "_task" Для вызовов llm, суффикс "_llm_call" Для запуска агента llm, суффикс "_agent"
Разделитель для имен: "_"
📖 API Документация
action_tracking(name: str)
Контекст-менеджер для отслеживания действий:
- Автоматически измеряет время выполнения
- Подсчитывает успешные и ошибочные выполнения
- Позволяет трекить размер обработанных данных
action_tracking_decorator(name: str)
Декоратор для функций и корутин, поддерживает все возможности action_tracking.
🔍 Метрики и мониторинг
Доступные метрики
Все метрики имеют префикс genapp_:
HTTP метрики:
genapp_http_requests_total- Общее количество HTTP запросовgenapp_http_request_duration_sec- Гистограмма времени выполненияgenapp_http_request_size_bytes- Размер запросов/ответов
Метрики действий:
genapp_action_count_total- Количество выполненных действийgenapp_action_duration_sec- Время выполнения действийgenapp_action_size_total- Размер обработанных данных
Labels (теги)
Метрики содержат labels:
- http_requests_total → method, status, resource, app_type, env, app
- http_request_duration_sec → method, status, resource, app_type, env, app
- http_request_size_bytes → resource, status, method, direction, app_type, env, app
- action_count_total → name, status, env, app
- action_duration_sec → name, env, app
- action_size_total → name, env, app
Про утилиты
- Минималистичный конечный автомат fsm.py (машина состояний, FSM). TODO: Добавить примеры использования
TODO: описать другие утилиты
Про библиотеки
Alembic
- Alembic для создания миграций базы данных
uvloop
Асинхронные приожения запускаются через uvloop. Он гораздо быстрее.
Линтеры
про них написано в linters.md
python-telegram-bot
Используется для создания ботов
FastAPI
Используется для создания API
TODO: добавить про другие библиотеки
Правила использования переменных окружения через project.settings.Settings
TODO: рассказать про SecretStr
Правильное использование Settings
Объект Settings в файле project/settings.py реализован через LazyInit
и должен использоваться только через вызов класса Settings().param_name.
Это обеспечивает правильную работу ленивой инициализации, потокобезопасность и возможность динамического переопределения настроек для тестирования.
1. Ленивая инициализация
LazyInit создает экземпляр настроек только при первом обращении
и переиспользует его в рамках одного контекста выполнения. Это обеспечивает:
- Отложенную загрузку конфигурации!
- Экономию памяти
- Переопределять настройки для тестов через
Settings.local(**kwargs)
❌ АНТИПАТТЕРНЫ - что НЕ надо делать
1. Создание локальных переменных
# ❌ НЕПРАВИЛЬНО
settings = Settings()
model_name = settings.LLM_MODEL_NAME
Почему плохо:
- Нарушает паттерн ленивой инициализации!
- Создает ненужную ссылку на объект
- Может привести к использованию устаревшего экземпляра при изменении контекста
- Усложняет тестирование с переопределением настроек
2. Использование в аргументах функций
# ❌ НЕПРАВИЛЬНО
def process_data(settings=Settings()):
return settings.MAX_TOKENS
# ❌ НЕПРАВИЛЬНО
def create_agent(config: SettingsValidator = Settings()):
pass
Почему плохо:
- Создает экземпляр на момент определения функции, а не вызова
- Нарушает принцип единственного источника истины
- Делает функцию менее тестируемой
- Может привести к использованию устаревших настроек
3. Сохранение в атрибутах класса
# ❌ НЕПРАВИЛЬНО
class MyClass:
def __init__(self):
self.settings = Settings() # ❌ ПЛОХО
def process(self):
return self.settings.MAX_TOKENS
Почему плохо:
- Фиксирует экземпляр настроек на момент создания объекта!
- Препятствует динамическому изменению настроек
- Усложняет тестирование с мокированием настроек
Тестирование с переопределением настроек
# ✅ ПРАВИЛЬНО для тестов
def some_function():
print(Settings().MAX_TOKENS)
with Settings.local(MAX_TOKENS=1000, TEMPERATURE=0.5):
result = some_function()
# В этом контексте some_function и Settings().MAX_TOKENS вернет 1000
Переменные окружение проекта
import os
import typing as t
from enum import Enum
from pathlib import Path
from pydantic import PostgresDsn, AfterValidator, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
from project.libs.structures import LazyInit
def not_empty_validator(value):
if not value:
error_msg = "Field cannot be empty"
raise ValueError(error_msg)
return value
NotEmptyStrT = t.Annotated[str, AfterValidator(not_empty_validator)]
NotEmptySecretStrT = t.Annotated[SecretStr, AfterValidator(not_empty_validator)]
class Constants:
MONITORING_APP_NAME = ""
API_ROOT_PATH = os.getenv("API_ROOT_PATH", "/api")
class Envs(Enum):
PROD = "PROD" # to work at a prod stand
LAMBDA = "LAMBDA" # to work at a stable stand
SANDBOX = "SANDBOX" # to work on a test stand
TEST = "AUTOTEST" # for run testing
LOCAL = "LOCAL" # for local development
class SettingsValidator(BaseSettings):
# Application
ENV: Envs = Envs.PROD
API_TOKEN: NotEmptySecretStrT
HISTORY_WINDOW: int = 20
# Keycloak
KEYCLOAK_URL: str = ""
KEYCLOAK_CLIENT_ID: str = ""
KEYCLOAK_USERNAME: str = ""
KEYCLOAK_PASSWORD: SecretStr | None = None
# Auth service
BOT_AUTH_SERVICE_URL: str = ""
# Database
SQLALCHEMY_DATABASE_DSN: PostgresDsn # Example: postgresql+psycopg2://user:password@localhost:5432/database
DATABASE_PRE_PING: t.Annotated[bool, "Checks and creates connection if closed before requesting"] = False
# Telegram
TELEGRAM_BOT_TOKEN: NotEmptySecretStrT
TELEGRAM_BASE_URL: str = ""
TELEGRAM_FILE_BASE_URL: str = ""
# Redis
REDIS_HOST: str = ""
REDIS_PORT: str = ""
REDIS_DB: str = ""
# LLM
LLM_MODEL: NotEmptyStrT
LLM_API_KEY: NotEmptySecretStrT
LLM_MIDDLE_PROXY_URL: str = ""
LLM_TEMPERATURE: float = 0.3
LLM_MAX_TOKENS: int = 8192
LLM_TIMEOUT: float | None = None
# Langfuse
LANGFUSE_TRACING_ENABLED: bool = "false"
LANGFUSE_PUBLIC_KEY: str | None = None
LANGFUSE_SECRET_KEY: str | None = None
LANGFUSE_HOST: str | None = None
# Logging
WRITE_LOGS_TO_FILE: bool = False
LOG_LEVEL: str = "INFO"
FASTAPI_LOG_LEVEL: str = "INFO"
TELEGRAM_LOG_LEVEL: str = "INFO"
HTTP_REQUESTS_LOG_LEVEL: str = "ERROR"
SQLALCHEMY_LOG_LEVEL: str = "ERROR"
REDIS_LOG_LEVEL: str = "ERROR"
FLASK_LOG_LEVEL: str = "ERROR"
# Loading local settings for development environment.
model_config = SettingsConfigDict(env_file=Path(__file__).parent.parent / ".env", extra="allow")
def is_local(self):
return self.ENV == Envs.LOCAL
def is_production(self):
return self.ENV == Envs.PROD
def is_testable_stand(self):
return self.ENV in (Envs.LAMBDA, Envs.SANDBOX)
def is_any_stand(self):
return self.ENV in (Envs.PROD, Envs.LAMBDA, Envs.SANDBOX)
Settings = LazyInit(SettingsValidator)
Создавайте обработчики телеграма в таком виде.
Не нужно подавлять ошибки внутри обработчика, потому что декоратор processing_errors должен его перехватить и он является предпочтительным местом обработчик ошибок, являясь централизованным местом обработки ошибок, чтоб избежать дублирования кода по обработке ошибок в каждом обработчике.
{Объяснить каждый декоратор для телеграмма из project/infrastructure/utils/telegram.py}
Порядок декораторов важен!
@check_auth
@timeout_with_retry
@processing_errors
@action_tracking_decorator("start_handler")
async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
message = f"Привет {user_id}! Это пример обработчика Телеграм!"
await update.message.reply_text(message)
Доменно-специфичные типы.
Не используйте в аннотациях примитивные типы str, int и т.п. Вместо этого создавайте типы для каждой сущности с именем, которое будет отражать сущность этого объекта. Код с такими типами лучше читается, устраняет двусмысленность и устраняет риск перепутать объекты.
Пример:
import typing as t
UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"])
Такие типы размещаются в datatypes.py
Список всех таких типов в проекте:
import typing as t
UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"])
ChatIdT = t.NewType("ChatIdT", t.Annotated[int, "Chat ID"])
MessageIdT = t.NewType("MessageIdT", t.Annotated[int, "Message ID"])
AnswerT = t.NewType("AnswerT", t.Annotated[str, "Ответ пользователю"])
QuestionT = t.NewType("QuestionT", t.Annotated[str, "Вопрос пользвоателя"])