---
name: manage-conversation-db
description: This skill should be used when implementing stateless conversation persistence, creating/loading conversations by ID, saving user/assistant messages, fetching history for agent input, and handling async database queries.
---
Manage Conversation DB Skill
This skill provides guidance for implementing conversation persistence in the database.
Purpose
Handle stateless conversation persistence:
- Create/load conversation by ID
- Save user/assistant messages with role and content
- Fetch history for agent input
- Async queries for performance
When to Use
Use this skill when:
- Implementing conversation storage layer
- Building message history retrieval
- Creating conversation management utilities
- Setting up async database operations for chat
Capabilities
- Conversation Lifecycle: Create new or load existing conversations
- Message Storage: Save messages with role, content, and metadata
- History Retrieval: Fetch conversation messages in chronological order
- Async Operations: Non-blocking database queries
- User Isolation: Conversations tied to user_id
Database Schema
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, List
# Table model for one chat thread. Each row is tied to a single user through
# user_id and is linked to its Message rows through the `messages` relationship.
class Conversation(SQLModel, table=True):
    # Identity and ownership. The primary key is a string supplied by the
    # caller (no default is generated here); user_id is indexed.
    id: str = Field(primary_key=True)
    user_id: str = Field(index=True)

    # Optional display title.
    title: Optional[str] = Field(default=None)

    # Both timestamps default to datetime.utcnow() when the object is built.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    # Counterpart of Message.conversation.
    messages: List["Message"] = Relationship(back_populates="conversation")
# Table model for a single message inside a conversation.
class Message(SQLModel, table=True):
    # Database-generated integer key. It is Optional with a None default so a
    # Message can be constructed before the row exists; the value is filled in
    # once the row is inserted. (`autoincrement` is not a keyword accepted by
    # SQLModel's Field, so it must not be passed here.)
    id: Optional[int] = Field(default=None, primary_key=True)

    # Owning conversation; indexed foreign key to conversation.id.
    conversation_id: str = Field(foreign_key="conversation.id", index=True)

    role: str = Field(index=True)  # "user", "assistant", "tool"
    content: str

    # Optional tool metadata stored alongside the message.
    tool_name: Optional[str] = None
    tool_call_id: Optional[str] = None

    # Defaults to datetime.utcnow() when the object is built.
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # Counterpart of Conversation.messages.
    conversation: Optional[Conversation] = Relationship(back_populates="messages")
Implementation Pattern
Conversation Service
import uuid
from datetime import datetime

from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
class ConversationService:
    """Create, load and extend conversations through one AsyncSession.

    Every method that takes a ``user_id`` only operates on conversations whose
    ``user_id`` column matches it; a mismatch (or a missing row) is reported
    as ``PermissionError("Conversation not found")``.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def _get_owned_conversation(
        self,
        conversation_id: str,
        user_id: str
    ) -> Conversation:
        """Return the conversation with this id if ``user_id`` owns it.

        Raises:
            PermissionError: the conversation does not exist or belongs to a
                different user. The same message is used for both cases.
        """
        result = await self.session.execute(
            select(Conversation).where(Conversation.id == conversation_id)
        )
        conversation = result.scalar_one_or_none()
        if conversation is None or conversation.user_id != user_id:
            raise PermissionError("Conversation not found")
        return conversation

    async def get_or_create_conversation(
        self,
        conversation_id: Optional[str],
        user_id: str
    ) -> Conversation:
        """Get an existing conversation or create a new one.

        Args:
            conversation_id: Id to load. When falsy, a new conversation with a
                random UUID4 id is created. When given but not present in the
                database, a new conversation is created with that id.
            user_id: Owner of the conversation.

        Returns:
            The loaded or newly committed conversation.

        Raises:
            PermissionError: ``conversation_id`` exists but belongs to another
                user. (Creating a row in that case would collide with the
                existing primary key.)
        """
        if conversation_id:
            # Look the id up without the user filter so that an id owned by
            # someone else is detected instead of being re-inserted.
            result = await self.session.execute(
                select(Conversation).where(Conversation.id == conversation_id)
            )
            existing = result.scalar_one_or_none()
            if existing is not None:
                if existing.user_id != user_id:
                    raise PermissionError("Conversation not found")
                return existing

        conversation = Conversation(
            id=conversation_id or str(uuid.uuid4()),
            user_id=user_id,
            title="New Chat"
        )
        self.session.add(conversation)
        await self.session.commit()
        await self.session.refresh(conversation)
        return conversation

    async def save_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        tool_name: Optional[str] = None,
        tool_call_id: Optional[str] = None
    ) -> Message:
        """Insert one message, commit, and return the refreshed row.

        No ownership check is performed here; callers are expected to have
        resolved ``conversation_id`` for the current user already.
        """
        message = Message(
            conversation_id=conversation_id,
            role=role,
            content=content,
            tool_name=tool_name,
            tool_call_id=tool_call_id
        )
        self.session.add(message)
        await self.session.commit()
        await self.session.refresh(message)
        return message

    async def get_conversation_history(
        self,
        conversation_id: str,
        user_id: str,
        limit: int = 50
    ) -> List[Message]:
        """Fetch the most recent messages of a conversation for agent input.

        Args:
            conversation_id: Conversation to read.
            user_id: Must own the conversation.
            limit: Maximum number of messages to return.

        Returns:
            At most ``limit`` messages, oldest first. When the conversation
            holds more than ``limit`` messages, the newest ``limit`` are kept.

        Raises:
            PermissionError: the conversation does not exist or belongs to a
                different user.
        """
        # Check ownership before reading any message rows.
        await self._get_owned_conversation(conversation_id, user_id)

        # Select newest-first so LIMIT keeps the latest messages; id breaks
        # ties between rows that share a created_at value.
        result = await self.session.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(desc(Message.created_at), desc(Message.id))
            .limit(limit)
        )
        newest_first = result.scalars().all()
        # Flip back to chronological order for the caller.
        return list(reversed(newest_first))

    async def save_conversation_messages(
        self,
        conversation_id: str,
        user_id: str,
        messages: List[dict]
    ) -> None:
        """Save multiple messages in a single commit.

        Each dict must contain ``"role"`` and ``"content"``; ``"tool_name"``
        and ``"tool_call_id"`` are optional. The conversation's ``updated_at``
        is set in the same commit. An empty ``messages`` list is a no-op.

        Raises:
            PermissionError: the conversation does not exist or belongs to a
                different user.
            KeyError: a message dict lacks ``"role"`` or ``"content"``; nothing
                is added to the session in that case.
        """
        if not messages:
            return

        conversation = await self._get_owned_conversation(conversation_id, user_id)

        # Build every row before touching the session so that a malformed
        # dict cannot leave a partial batch pending.
        rows = [
            Message(
                conversation_id=conversation_id,
                role=msg["role"],
                content=msg["content"],
                tool_name=msg.get("tool_name"),
                tool_call_id=msg.get("tool_call_id")
            )
            for msg in messages
        ]

        try:
            self.session.add_all(rows)
            # Same naive-UTC clock as the model's default_factory.
            conversation.updated_at = datetime.utcnow()
            self.session.add(conversation)
            await self.session.commit()
        except Exception:
            # Discard the whole batch so either all rows are stored or none.
            await self.session.rollback()
            raise
Helper Functions
async def build_message_array(
    db_messages: List[Message],
    user_message: str
) -> List[dict]:
    """Assemble the agent's input list from stored history and a new message.

    The result starts with a system entry carrying SYSTEM_PROMPT, continues
    with one ``{"role", "content"}`` entry per item of ``db_messages`` in the
    given order, and ends with ``user_message`` as a ``"user"`` entry.
    """
    system_entry = {"role": "system", "content": SYSTEM_PROMPT}
    history_entries = [
        {"role": stored.role, "content": stored.content}
        for stored in db_messages
    ]
    latest_entry = {"role": "user", "content": user_message}
    return [system_entry, *history_entries, latest_entry]
def message_to_dict(message: Message) -> dict:
    """Return a plain-dict view of ``message``.

    The id, role, content and tool fields are copied as they are;
    ``created_at`` is rendered with ``isoformat()``.
    """
    copied_fields = ("id", "role", "content", "tool_name", "tool_call_id")
    payload = {field: getattr(message, field) for field in copied_fields}
    payload["created_at"] = message.created_at.isoformat()
    return payload
Async Session Management
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Module-level async engine created from DATABASE_URL.
engine = create_async_engine(DATABASE_URL)

# Factory producing AsyncSession objects bound to the engine above.
# expire_on_commit=False is passed so loaded objects are not expired when the
# session commits.
async_session_maker = async_sessionmaker(
    bind=engine, expire_on_commit=False, class_=AsyncSession
)
@asynccontextmanager
async def get_db_session():
    """Yield a session from ``async_session_maker`` for one unit of work.

    The session is committed when the ``async with`` body finishes normally.
    If the body (or the commit) raises an ``Exception``, the session is rolled
    back and the exception is re-raised.
    """
    # Leaving this `async with` closes the session, so no explicit close()
    # call is needed afterwards.
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Verification Checklist
- Conversations can be created and loaded by ID
- Messages saved with correct role and content
- History retrieved in chronological order
- User isolation enforced on conversations
- Async queries work without blocking
- Tool calls stored with metadata