Week 1: Parallel Agent Execution Plan
Overview
Launch 10 independent agents to build the cognitive memory schema foundation. All agents work in the same worktree with file-level isolation.
Epic: cognitive-memory-schema
Worktree: ../epic-cognitive-memory
Branch: epic/cognitive-memory
Infrastructure
- Canonical Truth: Supabase Storage (JSON files)
- Graph Index: Neo4j Aura Free
- Authentication: Supabase Auth
- Session State: Supabase Postgres (sessions, prompts, sync metadata only)
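The task prompts below have Agents 6, 9, and 10 extend backend/config.py with keys for these services. A minimal settings sketch of where those keys end up, assuming pydantic-settings is used (the defaults shown are illustrative, not the repo's actual config):

```python
# Hypothetical shape of backend/config.py after Agents 6, 9, and 10; values are illustrative.
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # LLM (Agent 6)
    llm_provider: str = "openai"        # openai or ollama
    ollama_host: str = "http://localhost:11434"
    ollama_model: str = "llama3"        # placeholder model name
    # Canonical storage (Agent 9)
    storage_provider: str = "supabase"  # supabase or s3
    supabase_url: str = ""
    supabase_key: str = ""
    storage_bucket: str = "canonical"
    # Graph index (Agent 10)
    neo4j_uri: str = ""
    neo4j_username: str = "neo4j"
    neo4j_password: str = ""

settings = Settings()
```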
Pre-Launch Checklist
# 1. Ensure main is up to date
git checkout main
git pull origin main
# 2. Create worktree with branch
git worktree add ../epic-cognitive-memory -b epic/cognitive-memory
# 3. Push branch to remote
cd ../epic-cognitive-memory
git push -u origin epic/cognitive-memory
# 4. Verify worktree
git worktree list
External Services Setup
Supabase:
- Create project at https://supabase.com
- Create storage bucket named canonical (private)
- Note: Project URL, Anon Key, Service Role Key
Neo4j Aura:
- Create free instance at https://neo4j.com/cloud/aura-free/
- Note: Connection URI, Username, Password
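Before launching agents, it is worth confirming both services are reachable with the noted credentials. A rough smoke-test sketch using the same client libraries the services below rely on (the environment variable names are assumptions):

```python
# Hypothetical connectivity check; env var names are illustrative, not prescribed.
import os
from supabase import create_client
from neo4j import GraphDatabase

def check_supabase() -> None:
    client = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_SERVICE_ROLE_KEY"])
    buckets = [b.name for b in client.storage.list_buckets()]
    assert "canonical" in buckets, "create the private 'canonical' bucket first"

def check_neo4j() -> None:
    driver = GraphDatabase.driver(
        os.environ["NEO4J_URI"],
        auth=(os.environ["NEO4J_USERNAME"], os.environ["NEO4J_PASSWORD"]),
    )
    driver.verify_connectivity()  # raises if the Aura instance is unreachable
    driver.close()

if __name__ == "__main__":
    check_supabase()
    check_neo4j()
    print("Supabase and Neo4j reachable")
```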
Agent Assignments
Agent 1: Preference Memory Schemas
Scope: Create preference and core values schemas as Pydantic models
Files:
- backend/memory/schemas/preference.py (NEW)
Deliverables:
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class PreferenceRecord(CanonicalRecord):
    """User preferences - likes, dislikes, habits. Serializes to JSON in Storage."""
    type_: str = Field(alias='@type', default='Preference')
    preference_type: str  # like, dislike, value, habit, aversion
    category: str  # food, communication, work, leisure, relationships
    subject: str  # what the preference is about
    description: str  # detailed description
    strength: str  # strong, moderate, mild
    confidence: float  # 0.0-1.0
    source_type: str  # stated, inferred, observed
    is_verified: bool  # user confirmed
    last_accessed_at: Optional[datetime] = None
    access_count: int = 0
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class CoreValueRecord(CanonicalRecord):
    """User's core values and priorities. Serializes to JSON in Storage."""
    type_: str = Field(alias='@type', default='CoreValue')
    value_name: str  # honesty, family, achievement, creativity
    description: str
    priority_rank: int  # 1 = highest priority
    confidence: float
    source_type: str
    is_verified: bool
    supporting_evidence: List[RefLink] = []
    claims: List[RefLink] = []
    embedding: Optional[List[float]] = None
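For reference, a record like this should round-trip through the JSON-LD aliases. A usage sketch, assuming the CanonicalRecord base and create_urn helper from Agent 8 are in place (example values are illustrative):

```python
# Illustrative only; assumes the Agent 8 base types exist.
from backend.memory.schemas.base import create_urn
from backend.memory.schemas.preference import PreferenceRecord

record = PreferenceRecord(
    id_=create_urn("Preference"),
    user_id="user-123",
    preference_type="like",
    category="food",
    subject="Thai food",
    description="Prefers Thai restaurants for team lunches",
    strength="moderate",
    confidence=0.8,
    source_type="inferred",
    is_verified=False,
)

# by_alias=True emits the JSON-LD keys: @context, @type, @id
print(record.model_dump_json(by_alias=True, indent=2))
print(record.to_storage_path())  # users/user-123/memories/preference/<uuid>.json
```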
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Preference memory layer schemas following the Technical Spec.
1. Read the base schema for patterns:
- backend/memory/schemas/base.py (CanonicalRecord base class)
2. Create backend/memory/schemas/preference.py with:
- PreferenceRecord model (likes, dislikes, habits)
- CoreValueRecord model (user's core values)
- All fields from Technical Spec
- Pydantic model that serializes to JSON-LD format
- RefLink references for claims and evidence
- Optional embedding field for vector indexing
3. Update backend/memory/schemas/__init__.py to export new schemas
4. Commit: "Add Preference memory layer schemas"
Agent 2: Decision Memory Schemas
Scope: Create decision tracking schemas as Pydantic models
Files:
- backend/memory/schemas/decision.py (NEW)
Deliverables:
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class DecisionRecord(CanonicalRecord):
    """Records of user decisions with context and reasoning."""
    type_: str = Field(alias='@type', default='Decision')
    decision_date: datetime
    domain: str  # career, financial, personal, health, relationships
    summary: str  # brief description
    decision_context: str  # situation that led to decision (named to avoid shadowing the base @context field)
    decision_made: str  # what was decided
    alternatives_considered: Dict[str, Any] = {}
    reasoning: str  # why this was chosen
    outcome: Optional[str] = None  # what happened
    outcome_evaluation: Optional[str] = None  # positive, negative, neutral, mixed
    lessons_learned: Optional[str] = None
    significance: str  # major, moderate, minor
    confidence: float
    source_type: str
    is_verified: bool
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class DecisionPatternRecord(CanonicalRecord):
    """Patterns identified across multiple decisions."""
    type_: str = Field(alias='@type', default='DecisionPattern')
    domain: str
    pattern_name: str
    pattern_description: str
    supporting_decisions: List[RefLink] = []  # Decision URN references
    confidence: float
    first_observed: datetime
    last_observed: datetime
    occurrence_count: int
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
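The pattern record points back at its supporting decisions through RefLink URNs rather than embedded objects. A hedged sketch of how that wiring is expected to look (all values are illustrative):

```python
# Illustrative only; assumes the Agent 8 base types exist.
from datetime import datetime
from backend.memory.schemas.base import RefLink, create_urn
from backend.memory.schemas.decision import DecisionPatternRecord

decision_urns = [
    "urn:continuum:decision:example-id-1",  # ids of existing DecisionRecords
    "urn:continuum:decision:example-id-2",
]

pattern = DecisionPatternRecord(
    id_=create_urn("DecisionPattern"),
    user_id="user-123",
    domain="career",
    pattern_name="prefers reversible choices",
    pattern_description="Tends to pick options that can be undone cheaply",
    supporting_decisions=[RefLink(ref=urn) for urn in decision_urns],
    confidence=0.6,
    first_observed=datetime(2023, 1, 10),
    last_observed=datetime(2024, 6, 2),
    occurrence_count=4,
)
```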
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Decision memory layer schemas following the Technical Spec.
1. Read the base schema for patterns:
- backend/memory/schemas/base.py (CanonicalRecord base class)
2. Create backend/memory/schemas/decision.py with:
- DecisionRecord model (individual decisions)
- DecisionPatternRecord model (patterns across decisions)
- Dict field for alternatives_considered
- RefLink references for related decisions
- Pydantic model that serializes to JSON-LD format
3. Update backend/memory/schemas/__init__.py to export new schemas
4. Commit: "Add Decision memory layer schemas"
Agent 3: Semantic Knowledge Schemas
Scope: Create semantic memory schemas for facts and knowledge domains
Files:
- backend/memory/schemas/semantic.py (NEW)
Deliverables:
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class SemanticKnowledgeRecord(CanonicalRecord):
    """Facts and knowledge the user possesses."""
    type_: str = Field(alias='@type', default='SemanticKnowledge')
    domain: str  # technology, finance, cooking, music, etc.
    topic: str  # specific topic within domain
    fact: str  # the knowledge statement
    fact_type: str  # definition, relationship, procedure, rule
    confidence: float
    source_type: str  # learned, inferred, stated, extracted
    is_verified: bool
    related_knowledge: List[RefLink] = []  # URN references to related knowledge
    last_accessed_at: Optional[datetime] = None
    access_count: int = 0
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class KnowledgeDomainRecord(CanonicalRecord):
    """Aggregated view of user expertise in a domain."""
    type_: str = Field(alias='@type', default='KnowledgeDomain')
    domain_name: str
    description: str
    expertise_level: str  # novice, familiar, proficient, expert
    knowledge_count: int
    first_learned: datetime
    last_updated: datetime
    confidence: float
    knowledge_items: List[RefLink] = []  # URN references to SemanticKnowledge
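The access-tracking fields are what downstream retrieval is expected to bump on every read. A minimal sketch of that update; the helper name is hypothetical, since the real update path belongs to the later Memory Coordinator phase:

```python
# Hypothetical helper; shown only to illustrate the temporal-dynamics fields.
from datetime import datetime, timezone
from backend.memory.schemas.semantic import SemanticKnowledgeRecord

def mark_accessed(record: SemanticKnowledgeRecord) -> SemanticKnowledgeRecord:
    """Bump access tracking after the record is used in an answer."""
    now = datetime.now(timezone.utc)
    record.last_accessed_at = now
    record.access_count += 1
    record.updated_at = now
    return record
```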
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Semantic memory layer schemas following the Technical Spec.
1. Read the base schema for patterns:
- backend/memory/schemas/base.py (CanonicalRecord base class)
2. Create backend/memory/schemas/semantic.py with:
- SemanticKnowledgeRecord model (individual facts)
- KnowledgeDomainRecord model (expertise areas)
- RefLink references for related knowledge
- Temporal dynamics fields (access tracking)
- Pydantic model that serializes to JSON-LD format
3. Update backend/memory/schemas/__init__.py to export new schemas
4. Commit: "Add Semantic memory layer schemas"
Agent 4: Autobiographical Narrative Schemas
Scope: Create narrative synthesis schemas
Files:
- backend/memory/schemas/narrative.py (NEW)
Deliverables:
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class AutobiographicalNarrativeRecord(CanonicalRecord):
    """Synthesized identity narrative from all memory layers."""
    type_: str = Field(alias='@type', default='AutobiographicalNarrative')
    version: int  # increments with each update
    identity_summary: str  # "Who am I?" synthesis
    life_chapters: Dict[str, Any] = {}  # major life periods
    key_relationships: Dict[str, Any] = {}  # important people
    core_values: Dict[str, Any] = {}  # derived from CoreValue
    defining_experiences: Dict[str, Any] = {}  # formative events
    current_focus: str  # what user is focused on now
    aspirations: str  # future goals
    challenges: str  # current struggles
    is_current: bool  # most recent version
    generated_at: datetime
    source_data_cutoff: datetime  # data up to this point
    confidence: float
    chapter_refs: List[RefLink] = []  # URN references to NarrativeChapter

class NarrativeChapterRecord(CanonicalRecord):
    """Individual life chapter within narrative."""
    type_: str = Field(alias='@type', default='NarrativeChapter')
    narrative_ref: RefLink  # URN reference to parent narrative
    chapter_name: str
    time_period: str  # e.g., "2020-2023"
    start_date: datetime
    end_date: Optional[datetime] = None  # nullable for current chapter
    summary: str
    key_events: List[RefLink] = []  # URN references to Episodes
    key_people: List[RefLink] = []  # URN references to Persons
    themes: List[str] = []
    lessons: str
    order_index: int
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Autobiographical Narrative schemas following the Technical Spec.
1. Read the base schema for patterns:
- backend/memory/schemas/base.py (CanonicalRecord base class)
2. Create backend/memory/schemas/narrative.py with:
- AutobiographicalNarrativeRecord model (identity synthesis)
- NarrativeChapterRecord model (life periods)
- Dict fields for complex nested data
- Version tracking for narrative updates
- RefLink references to Person, Episode records
- Pydantic model that serializes to JSON-LD format
3. Update backend/memory/schemas/__init__.py to export new schemas
4. Commit: "Add Autobiographical Narrative schemas"
Agent 5: Reasoning Pattern Schemas
Scope: Create reasoning and communication profile schemas
Files:
- backend/memory/schemas/reasoning.py (NEW)
Deliverables:
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class ReasoningPatternRecord(CanonicalRecord):
    """Patterns in how user thinks and solves problems."""
    type_: str = Field(alias='@type', default='ReasoningPattern')
    pattern_type: str  # argumentation, problem_solving, risk_assessment
    pattern_name: str
    description: str
    examples: List[str] = []  # example instances
    counter_examples: List[str] = []  # exceptions
    confidence: float
    source_type: str
    is_verified: bool
    first_observed: datetime
    last_observed: datetime
    occurrence_count: int
    claims: List[RefLink] = []
    evidence: List[RefLink] = []

class CommunicationProfileRecord(CanonicalRecord):
    """User's communication style and preferences."""
    type_: str = Field(alias='@type', default='CommunicationProfile')
    formality_level: str  # formal, casual, adaptive
    directness: str  # direct, diplomatic, balanced
    detail_preference: str  # high_detail, summary, context_dependent
    tone: List[str] = []  # warm, professional, analytical
    vocabulary_level: str  # simple, moderate, advanced, technical
    preferred_channels: List[str] = []  # email, chat, call
    response_speed: str  # quick, thoughtful, varies
    confidence: float
    is_verified: bool
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Reasoning Pattern schemas following the Technical Spec.
1. Read the base schema for patterns:
- backend/memory/schemas/base.py (CanonicalRecord base class)
2. Create backend/memory/schemas/reasoning.py with:
- ReasoningPatternRecord model (thinking patterns)
- CommunicationProfileRecord model (communication style)
- List fields for examples and preferences
- Temporal tracking for pattern observation
- Pydantic model that serializes to JSON-LD format
3. Update backend/memory/schemas/__init__.py to export new schemas
4. Commit: "Add Reasoning Pattern schemas"
Agent 6: LLM Abstraction Layer
Scope: Create LLM provider abstraction for cloud + local support
Files:
- backend/llm/__init__.py (NEW directory init)
- backend/llm/provider.py (NEW)
- backend/llm/providers/__init__.py (NEW)
- backend/llm/providers/openai.py (NEW)
- backend/llm/providers/ollama.py (NEW)
Deliverables:
# provider.py
from abc import ABC, abstractmethod
from typing import List, Optional, AsyncIterator

class LLMProvider(ABC):
    """Abstract base class for LLM providers."""

    @abstractmethod
    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
    ) -> str:
        """Generate a completion."""
        pass

    @abstractmethod
    async def generate_stream(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
    ) -> AsyncIterator[str]:
        """Stream a completion."""
        pass

    @abstractmethod
    async def embed(self, text: str) -> List[float]:
        """Generate embeddings."""
        pass

def get_llm_provider(provider: Optional[str] = None) -> LLMProvider:
    """Factory to get configured LLM provider (see sketch after this block)."""
    pass

# providers/openai.py
class OpenAIProvider(LLMProvider):
    """OpenAI API implementation."""
    pass

# providers/ollama.py
class OllamaProvider(LLMProvider):
    """Ollama local model implementation."""
    pass
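get_llm_provider() is left as a stub above; the intent is a config-driven switch between the two implementations. A hedged sketch, using the config keys from step 6 of the task prompt; the constructor arguments and the settings object are assumptions:

```python
# Hypothetical factory body; config access and provider constructors are assumed, not confirmed.
from typing import Optional
from backend.config import settings
from backend.llm.provider import LLMProvider
from backend.llm.providers.openai import OpenAIProvider
from backend.llm.providers.ollama import OllamaProvider

def get_llm_provider(provider: Optional[str] = None) -> LLMProvider:
    """Return the provider named by the argument, falling back to config."""
    name = (provider or settings.llm_provider).lower()
    if name == "openai":
        return OpenAIProvider()  # assumed to read its API key from config
    if name == "ollama":
        return OllamaProvider(host=settings.ollama_host, model=settings.ollama_model)
    raise ValueError(f"Unknown LLM provider: {name}")
```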
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the LLM abstraction layer for hybrid cloud/local support.
1. Read existing config for patterns:
- backend/config.py (settings patterns)
2. Create directory structure:
- backend/llm/__init__.py
- backend/llm/providers/__init__.py
3. Create backend/llm/provider.py with:
- Abstract LLMProvider base class
- generate() - single completion
- generate_stream() - streaming completion
- embed() - embeddings (for vector indexing)
- Factory function get_llm_provider()
4. Create backend/llm/providers/openai.py with:
- OpenAIProvider class
- Uses openai async client
- Reads API key from config
5. Create backend/llm/providers/ollama.py with:
- OllamaProvider class
- HTTP calls to local Ollama server
- Configurable model and host
6. Update backend/config.py to add:
- llm_provider: str (openai, ollama)
- ollama_host: str
- ollama_model: str
7. Commit: "Add LLM abstraction layer with OpenAI and Ollama support"
Agent 7: Frontend Shell
Scope: Create chat and dashboard page layouts with mock data
Files:
- frontend/src/pages/chat.tsx (NEW)
- frontend/src/pages/dashboard.tsx (MODIFY - add memory stats)
- frontend/src/components/chat/ChatMessage.tsx (NEW)
- frontend/src/components/chat/EvidenceCard.tsx (NEW)
- frontend/src/components/chat/ChatInput.tsx (NEW)
Deliverables:
// chat.tsx - Main chat interface
export default function ChatPage() {
  // Message list with evidence citations
  // Input field with send button
  // Streaming response display
  // Evidence cards (expandable sources)
}

// ChatMessage.tsx - Individual message component
export function ChatMessage({ role, content, sources }) {
  // User/Assistant message styling
  // Source citations inline
  // Expandable evidence cards
}

// EvidenceCard.tsx - Source evidence display
export function EvidenceCard({ source }) {
  // Source type icon
  // Title and snippet
  // Confidence indicator
  // Link to original
}
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the frontend chat and dashboard shell with mock data.
1. Read existing frontend patterns:
- frontend/src/pages/onboarding.tsx (page patterns)
- frontend/src/components/ (component patterns)
2. Create frontend/src/pages/chat.tsx with:
- Full-height chat layout
- Message list component
- Input field with send button
- Mock conversation data
- Evidence cards for sources
- Streaming response placeholder
3. Create frontend/src/components/chat/:
- ChatMessage.tsx - message display
- EvidenceCard.tsx - source citation card
- ChatInput.tsx - input with send
4. Update frontend/src/pages/dashboard.tsx to add:
- Memory stats cards (People, Events, Knowledge)
- Quick query input
- Recent prompts section (mock)
5. Add routes in App.tsx for /chat
6. Commit: "Add chat and dashboard UI shell"
Agent 8: Canonical Base Types
Scope: Create base Pydantic models and utility functions for canonical JSON-LD format
Files:
- backend/memory/__init__.py (NEW directory init)
- backend/memory/schemas/__init__.py (NEW directory init)
- backend/memory/schemas/base.py (NEW)
- backend/memory/schemas/evidence.py (NEW)
- backend/memory/schemas/claim.py (NEW)
Deliverables:
# base.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from uuid import uuid4

SCHEMA_CONTEXT = "https://continuum.dev/schema/v1"

def create_urn(record_type: str, record_id: str = None) -> str:
    """Create a URN identifier for a record."""
    if record_id is None:
        record_id = str(uuid4())
    return f"urn:continuum:{record_type.lower()}:{record_id}"

def extract_id_from_urn(urn: str) -> str:
    """Extract the ID portion from a URN."""
    return urn.split(':')[-1]

class RefLink(BaseModel):
    """Reference link to another record."""
    ref: str = Field(alias='@ref')

    class Config:
        populate_by_name = True

class EvidenceRefLink(RefLink):
    """Reference link to evidence with type."""
    type: str

class CanonicalRecord(BaseModel):
    """Base class for all canonical memory records."""
    context: str = Field(alias='@context', default=SCHEMA_CONTEXT)
    type_: str = Field(alias='@type')
    id_: str = Field(alias='@id')
    user_id: str
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    version: int = 1

    class Config:
        populate_by_name = True

    def to_storage_path(self) -> str:
        """Generate the storage path for this record."""
        record_type = self.type_.lower()
        record_id = extract_id_from_urn(self.id_)
        return f"users/{self.user_id}/memories/{record_type}/{record_id}.json"

# evidence.py
from typing import Optional
from datetime import datetime
from pydantic import Field
from backend.memory.schemas.base import CanonicalRecord

class EvidenceRecord(CanonicalRecord):
    """Link to source data that supports a claim."""
    type_: str = Field(alias='@type', default='Evidence')
    source_type: str  # email, calendar, contact, document, user_input
    source_integration: str  # google, microsoft, manual
    source_id: str  # ID from the source system
    raw_data_ref: str  # Path to raw file in storage
    extracted_text: Optional[str] = None
    occurred_at: datetime
    metadata: dict = {}

# claim.py
from typing import Optional, List
from datetime import datetime
from pydantic import Field
from backend.memory.schemas.base import CanonicalRecord, EvidenceRefLink

class ClaimRecord(CanonicalRecord):
    """An atomic assertion about a memory with evidence."""
    type_: str = Field(alias='@type', default='Claim')
    claim_type: str  # fact, relationship, preference, event, decision, pattern
    subject_type: str  # Person, Episode, etc.
    subject_id: str  # URN of the subject
    claim_text: str  # The assertion
    confidence: float  # 0.0-1.0
    verification_status: str  # inferred, suggested, verified, rejected, corrected
    evidence: List[EvidenceRefLink] = []
    inference_chain: Optional[str] = None  # How this was derived
    verified_at: Optional[datetime] = None
    verified_by: Optional[str] = None  # user, system
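A quick sanity check of the pieces above: a subclass only needs to override the @type default, and the URN id drives the storage path. A sketch with illustrative values (the DemoRecord class is hypothetical, used only to exercise the base class):

```python
# Illustrative check of create_urn / to_storage_path behaviour.
from pydantic import Field
from backend.memory.schemas.base import CanonicalRecord, create_urn, extract_id_from_urn

class DemoRecord(CanonicalRecord):
    """Throwaway subclass used only to exercise the base class."""
    type_: str = Field(alias='@type', default='Demo')
    note: str

urn = create_urn("Demo")                       # urn:continuum:demo:<uuid4>
record = DemoRecord(id_=urn, user_id="user-123", note="hello")

assert extract_id_from_urn(urn) == urn.split(":")[-1]
print(record.to_storage_path())                # users/user-123/memories/demo/<uuid4>.json
print(record.model_dump_json(by_alias=True))   # keys appear as @context / @type / @id
```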
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the canonical base types that all memory schemas inherit from.
1. Create directory structure:
- backend/memory/__init__.py
- backend/memory/schemas/__init__.py
2. Create backend/memory/schemas/base.py with:
- SCHEMA_CONTEXT constant
- create_urn() function to generate URN identifiers
- extract_id_from_urn() function
- RefLink model for references between records
- EvidenceRefLink model with type field
- CanonicalRecord base class with:
- @context, @type, @id fields (JSON-LD)
- user_id, created_at, updated_at, version
- to_storage_path() method
- Pydantic config for JSON-LD serialization
3. Create backend/memory/schemas/evidence.py with:
- EvidenceRecord model for source data links
4. Create backend/memory/schemas/claim.py with:
- ClaimRecord model for provenance assertions
5. Commit: "Add canonical base types for JSON-LD serialization"
Agent 9: Storage Abstraction Service
Scope: Create storage abstraction layer for Supabase Storage with future S3 migration support
Files:
- backend/storage/__init__.py (NEW directory init)
- backend/storage/interface.py (NEW)
- backend/storage/supabase.py (NEW)
- backend/storage/s3.py (NEW - stub for future)
Deliverables:
# interface.py
from abc import ABC, abstractmethod
from typing import List, AsyncGenerator, Optional, TypeVar, Type
from backend.memory.schemas.base import CanonicalRecord

T = TypeVar('T', bound=CanonicalRecord)

class StorageService(ABC):
    """Abstract interface for canonical storage. Swappable between Supabase and S3."""

    @abstractmethod
    async def write_memory(self, record: CanonicalRecord) -> str:
        """Write a memory record to storage. Returns storage path."""
        pass

    @abstractmethod
    async def read_memory(
        self,
        user_id: str,
        memory_type: str,
        memory_id: str,
        schema_class: Type[T]
    ) -> Optional[T]:
        """Read a memory record from storage."""
        pass

    @abstractmethod
    async def list_memories(
        self,
        user_id: str,
        memory_type: str
    ) -> List[str]:
        """List all memory IDs of a type for a user."""
        pass

    @abstractmethod
    async def delete_memory(
        self,
        user_id: str,
        memory_type: str,
        memory_id: str
    ) -> bool:
        """Delete a memory record."""
        pass

    @abstractmethod
    async def write_event_log(
        self,
        user_id: str,
        event: dict
    ) -> str:
        """Write to the immutable event log."""
        pass

    @abstractmethod
    async def export_user_data(
        self,
        user_id: str
    ) -> AsyncGenerator[dict, None]:
        """Export all user data as JSON objects."""
        pass

def get_storage_service() -> StorageService:
    """Factory to get configured storage service (see sketch after this block)."""
    pass

# supabase.py
from supabase import create_client, Client
from backend.memory.schemas.base import CanonicalRecord
from backend.storage.interface import StorageService

class SupabaseStorageService(StorageService):
    """Supabase Storage implementation."""

    def __init__(self, supabase_url: str, supabase_key: str, bucket: str = "canonical"):
        self.client: Client = create_client(supabase_url, supabase_key)
        self.bucket = bucket

    async def write_memory(self, record: CanonicalRecord) -> str:
        path = record.to_storage_path()
        data = record.model_dump_json(by_alias=True, indent=2)
        self.client.storage.from_(self.bucket).upload(
            path,
            data.encode(),
            {"content-type": "application/json", "upsert": "true"}
        )
        # Write event log entry
        await self._write_event(record.user_id, "MemoryCreated", record)
        return path

    # ... implement all abstract methods

# s3.py (stub for future migration)
from backend.storage.interface import StorageService

class S3StorageService(StorageService):
    """AWS S3 implementation - for future migration."""

    def __init__(self, bucket: str, region: str = "us-east-1"):
        # To be implemented when migrating to S3
        raise NotImplementedError("S3 storage not yet implemented")
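As with the LLM layer, get_storage_service() is only declared in the interface. The expected shape is a config-driven factory; a sketch under the same assumptions about backend/config.py as step 5 of the task prompt (the settings object is assumed):

```python
# Hypothetical factory body; config field names follow step 5 of the task prompt below.
from backend.config import settings
from backend.storage.interface import StorageService
from backend.storage.supabase import SupabaseStorageService
from backend.storage.s3 import S3StorageService

def get_storage_service() -> StorageService:
    """Return the canonical-storage backend selected in config."""
    if settings.storage_provider == "supabase":
        return SupabaseStorageService(
            supabase_url=settings.supabase_url,
            supabase_key=settings.supabase_key,
            bucket=settings.storage_bucket,
        )
    if settings.storage_provider == "s3":
        return S3StorageService(bucket=settings.storage_bucket)
    raise ValueError(f"Unknown storage provider: {settings.storage_provider}")
```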
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the storage abstraction layer for canonical JSON files.
1. Create directory structure:
- backend/storage/__init__.py
2. Create backend/storage/interface.py with:
- StorageService abstract base class
- write_memory() - write record to storage
- read_memory() - read record from storage
- list_memories() - list all records of a type
- delete_memory() - delete a record
- write_event_log() - append to event log
- export_user_data() - export all user data
- get_storage_service() factory function
3. Create backend/storage/supabase.py with:
- SupabaseStorageService implementation
- Uses Supabase Storage client
- Writes JSON with proper content-type
- Implements event logging
- Handles paths: users/{user_id}/memories/{type}/{id}.json
4. Create backend/storage/s3.py with:
- S3StorageService stub (raises NotImplementedError)
- Document the migration path to AWS S3
5. Update backend/config.py to add:
- storage_provider: str (supabase, s3)
- supabase_url: str
- supabase_key: str
- storage_bucket: str
6. Commit: "Add storage abstraction layer with Supabase implementation"
Agent 10: Neo4j Graph Service
Scope: Create Neo4j service for graph indexing and Cypher queries
Files:
- backend/graph/__init__.py (NEW directory init)
- backend/graph/service.py (NEW)
- backend/graph/queries.py (NEW)
Deliverables:
# service.py
from neo4j import AsyncGraphDatabase, AsyncDriver
from typing import List, Dict, Any, Optional
from backend.memory.schemas.base import CanonicalRecord

class Neo4jService:
    """Neo4j graph database service for indexing and queries."""

    def __init__(self, uri: str, username: str, password: str):
        self.driver: AsyncDriver = AsyncGraphDatabase.driver(
            uri, auth=(username, password)
        )

    async def close(self):
        await self.driver.close()

    async def setup_indexes(self):
        """Create indexes and constraints."""
        async with self.driver.session() as session:
            # Node indexes
            await session.run("CREATE INDEX user_id IF NOT EXISTS FOR (u:User) ON (u.id)")
            await session.run("CREATE INDEX person_user IF NOT EXISTS FOR (p:Person) ON (p.user_id)")
            await session.run("CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.canonical_name)")
            # Vector indexes (Neo4j 5.11+)
            await session.run("""
                CREATE VECTOR INDEX person_embeddings IF NOT EXISTS
                FOR (p:Person) ON p.embedding
                OPTIONS {indexConfig: {
                    `vector.dimensions`: 1536,
                    `vector.similarity_function`: 'cosine'
                }}
            """)

    async def index_record(self, record: CanonicalRecord) -> None:
        """Index a canonical record as a graph node."""
        record_type = record.type_
        # exclude by field name ('context'), not by its alias ('@context')
        properties = record.model_dump(by_alias=True, exclude={'context'})
        async with self.driver.session() as session:
            await session.run(
                f"""
                MERGE (n:{record_type} {{id: $id}})
                SET n += $properties
                WITH n
                MATCH (u:User {{id: $user_id}})
                MERGE (u)-[:OWNS]->(n)
                """,
                id=record.id_,
                user_id=record.user_id,
                properties=properties
            )

    async def create_relationship(
        self,
        from_id: str,
        to_id: str,
        relationship_type: str,
        properties: Optional[Dict[str, Any]] = None
    ) -> None:
        """Create a relationship between two nodes."""
        async with self.driver.session() as session:
            await session.run(
                f"""
                MATCH (a {{id: $from_id}})
                MATCH (b {{id: $to_id}})
                MERGE (a)-[r:{relationship_type}]->(b)
                SET r += $properties
                """,
                from_id=from_id,
                to_id=to_id,
                properties=properties or {}
            )

    async def query(self, cypher: str, params: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """Execute a Cypher query and return results."""
        async with self.driver.session() as session:
            result = await session.run(cypher, params or {})
            return [record.data() async for record in result]

    async def semantic_search(
        self,
        user_id: str,
        embedding: List[float],
        node_type: str,
        limit: int = 10
    ) -> List[Dict]:
        """Search for similar nodes by embedding."""
        async with self.driver.session() as session:
            result = await session.run(
                f"""
                MATCH (u:User {{id: $user_id}})-[:OWNS]->(n:{node_type})
                WHERE n.embedding IS NOT NULL
                WITH n, vector.similarity.cosine(n.embedding, $embedding) AS score
                WHERE score > 0.7
                RETURN n, score
                ORDER BY score DESC
                LIMIT $limit
                """,
                user_id=user_id,
                embedding=embedding,
                limit=limit
            )
            return [record.data() async for record in result]

    async def rebuild_from_storage(
        self,
        user_id: str,
        records: List[CanonicalRecord]
    ) -> Dict[str, int]:
        """Rebuild graph index from canonical storage records."""
        node_count = 0
        rel_count = 0
        # Clear existing user data
        async with self.driver.session() as session:
            await session.run(
                "MATCH (u:User {id: $user_id})-[:OWNS]->(n) DETACH DELETE n",
                user_id=user_id
            )
        # Reindex all records
        for record in records:
            await self.index_record(record)
            node_count += 1
        return {"nodes": node_count, "relationships": rel_count}
# queries.py - Common query templates
WHO_IS_QUERY = """
MATCH (u:User {id: $user_id})-[:OWNS]->(p:Person)
WHERE toLower(p.canonical_name) CONTAINS toLower($name)
OPTIONAL MATCH (u)-[:OWNS]->(e:Episode)<-[:PARTICIPATED_IN]-(p)
WITH p, e
ORDER BY e.occurred_at DESC
RETURN p, collect(e)[0..5] as recent_episodes
LIMIT 1
"""
HOW_DO_I_KNOW_QUERY = """
MATCH (u:User {id: $user_id})-[:OWNS]->(p:Person {id: $person_id})
MATCH (u)-[r:KNOWS]->(p)
OPTIONAL MATCH (u)-[:OWNS]->(first:Episode)<-[:HOW_MET]-(r)
OPTIONAL MATCH (u)-[:OWNS]->(e:Episode)<-[:PARTICIPATED_IN]-(p)
RETURN p, r, first, collect(e) as all_episodes
"""
Task Prompt:
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory
Create the Neo4j graph service for indexing and Cypher queries.
1. Create directory structure:
- backend/graph/__init__.py
2. Create backend/graph/service.py with:
- Neo4jService class
- Async Neo4j driver connection
- setup_indexes() - create node and vector indexes
- index_record() - index a canonical record as node
- create_relationship() - create edges between nodes
- query() - execute arbitrary Cypher
- semantic_search() - vector similarity search
- rebuild_from_storage() - rebuild index from canonical files
3. Create backend/graph/queries.py with:
- WHO_IS_QUERY - find person by name with episodes
- HOW_DO_I_KNOW_QUERY - relationship with history
- Common query templates as constants
4. Update backend/config.py to add:
- neo4j_uri: str
- neo4j_username: str
- neo4j_password: str
5. Commit: "Add Neo4j graph service with indexing and queries"
Execution Status Template
Create .claude/epics/cognitive-memory/execution-status.md:
---
started: {datetime}
worktree: ../epic-cognitive-memory
branch: epic/cognitive-memory
---
# Execution Status: Cognitive Memory Schema
## Active Agents
| Agent | Task | Files | Status |
|-------|------|-------|--------|
| Agent-1 | Preference Schemas | memory/schemas/preference.py | 🔄 In Progress |
| Agent-2 | Decision Schemas | memory/schemas/decision.py | 🔄 In Progress |
| Agent-3 | Semantic Schemas | memory/schemas/semantic.py | 🔄 In Progress |
| Agent-4 | Narrative Schemas | memory/schemas/narrative.py | 🔄 In Progress |
| Agent-5 | Reasoning Schemas | memory/schemas/reasoning.py | 🔄 In Progress |
| Agent-6 | LLM Abstraction | llm/*.py | 🔄 In Progress |
| Agent-7 | Frontend Shell | frontend/src/pages/*.tsx | 🔄 In Progress |
| Agent-8 | Base Types | memory/schemas/base.py | 🔄 In Progress |
| Agent-9 | Storage Service | storage/*.py | 🔄 In Progress |
| Agent-10 | Neo4j Service | graph/*.py | 🔄 In Progress |
## Completed
- None yet
## Dependencies
- Agent 8 (Base Types) should complete before Agents 1-5
- Agent 9 (Storage) and Agent 10 (Neo4j) are independent
## Next Phase (After Schema)
- Core memory schemas (Person, Episode, Relationship)
- Memory Coordinator service
- Google sync services
Launch Commands
# Launch all 10 agents in parallel using Task tool
# Each agent works on independent files in the same worktree
Single message with 10 Task tool calls:
# Agent 8 first (base dependency)
Task:
  description: "Canonical base types"
  subagent_type: "general-purpose"
  prompt: |
    {Agent 8 prompt from above}

# Agents 1-5 in parallel (depend on Agent 8)
# Agents 6-7 in parallel (no dependencies)
# Agents 9-10 in parallel (no dependencies)
Coordination Rules
- File Isolation: Each agent only modifies their assigned files
- Dependency: Agent 8 (Base Types) should complete before Agents 1-5 start
- Shared File: backend/memory/schemas/__init__.py - coordinate via sequential commits (see the sketch after this list)
- Commit Often: Each agent commits after completing their work
- Push Regularly: Push to remote for visibility
- No Conflicts: File-level isolation means no merge conflicts
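Since backend/memory/schemas/__init__.py is the one file several agents touch, each agent should append only its own exports and commit immediately so the next agent rebases on top. The finished file is expected to look roughly like this (exact export list depends on what each agent ships):

```python
# backend/memory/schemas/__init__.py - expected shape after Agents 1-5 and 8 land.
from backend.memory.schemas.base import CanonicalRecord, RefLink, EvidenceRefLink, create_urn
from backend.memory.schemas.evidence import EvidenceRecord
from backend.memory.schemas.claim import ClaimRecord
from backend.memory.schemas.preference import PreferenceRecord, CoreValueRecord
from backend.memory.schemas.decision import DecisionRecord, DecisionPatternRecord
from backend.memory.schemas.semantic import SemanticKnowledgeRecord, KnowledgeDomainRecord
from backend.memory.schemas.narrative import AutobiographicalNarrativeRecord, NarrativeChapterRecord
from backend.memory.schemas.reasoning import ReasoningPatternRecord, CommunicationProfileRecord

__all__ = [
    "CanonicalRecord", "RefLink", "EvidenceRefLink", "create_urn",
    "EvidenceRecord", "ClaimRecord",
    "PreferenceRecord", "CoreValueRecord",
    "DecisionRecord", "DecisionPatternRecord",
    "SemanticKnowledgeRecord", "KnowledgeDomainRecord",
    "AutobiographicalNarrativeRecord", "NarrativeChapterRecord",
    "ReasoningPatternRecord", "CommunicationProfileRecord",
]
```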
Post-Completion
After all 10 agents complete:
- Verify all schemas created:
  cd ../epic-cognitive-memory
  ls backend/memory/schemas/
  ls backend/storage/
  ls backend/graph/
- Run schema validation:
  cd ../epic-cognitive-memory
  uv run python -c "from backend.memory.schemas import *; print('All schemas imported successfully')"
- Test storage service:
  cd ../epic-cognitive-memory
  uv run python -c "from backend.storage import get_storage_service; print('Storage service ready')"
- Test Neo4j connection:
  cd ../epic-cognitive-memory
  uv run python -c "from backend.graph import Neo4jService; print('Neo4j service ready')"
- No database migration needed - canonical storage is file-based
- Merge to main or continue to Phase 2