EOM — Agents & Services
Every logical service in EOM is described here: its responsibility boundary, its inputs/outputs, its allowed dependencies, and its error contract. Claude Code agents must stay within their service boundary. Cross-service calls always go through the defined interface — never import internals.
Service Map
┌──────────────────────────────────────────────────────────────────┐
│                           EOM Platform                           │
│                                                                  │
│  Studio (React) ──► API Gateway (FastAPI)                        │
│  CLI (Click)    ──►           │                                  │
│                               ▼                                  │
│         ┌──────────────┬──────────────┬──────────────┐           │
│         │              │              │              │           │
│   SchemaService  QueryService   ActionService   GitService       │
│         │              │              │              │           │
│         ▼              ▼              ▼              ▼           │
│     FalkorDB       FalkorDB    OPA → FalkorDB     pygit2         │
│      (meta)        (data+ES)    (data write)    (worktrees)      │
│                        │                                         │
│                  Elasticsearch                                   │
└──────────────────────────────────────────────────────────────────┘
Agent 1 — Schema Service
File: apps/api/services/schema_service.py
Responsibility
Owns all schema compilation and publication operations. Takes YAML manifests from a Git worktree and produces:
- FalkorDB Cypher DDL (node labels, indices, UDF registrations)
- Elasticsearch index mappings (JSON)
- Compiled Cypher transaction templates for Action Types
- Meta-graph update operations (Cypher to update eom_meta)
Inputs
| Input | Type | Source |
|---|---|---|
| Worktree path | str | Git Service |
| Space ID | UUID | API request |
| Branch name | str | API request |
Outputs
| Output | Type | Destination |
|---|---|---|
| FalkorDB DDL statements | list[str] | FalkorDB client |
| ES index mapping | dict | Elasticsearch client |
| Compiled Cypher templates | dict[str, str] | Written back to worktree + stored in DB |
| Meta-graph operations | list[str] | FalkorDB eom_meta graph |
| SDK generation trigger | Event | SDK Generator |
Rules
- MUST NOT write directly to the production FalkorDB data graph — only DDL and meta-graph
- MUST NOT call OPA — schema compilation is pre-policy
- MUST validate all YAML against Pydantic models before compiling
- MUST produce idempotent operations (re-running compile produces identical output)
- MUST store compiled Cypher templates in the worktree alongside the YAML
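The idempotency rule above can be illustrated with a minimal sketch. It assumes a hypothetical, already-validated manifest shape (`ObjectTypeManifest`) and a hypothetical `compile_ddl` helper; neither name comes from the EOM codebase, and the index statement is only illustrative of FalkorDB-style DDL. The point is that sorting every collection before emitting statements makes the output independent of input order, so re-running the compile yields identical results.

```python
from dataclasses import dataclass
import hashlib


@dataclass(frozen=True)
class ObjectTypeManifest:
    """Hypothetical, already-validated manifest entry (not the real EOM model)."""
    api_name: str
    indexed_properties: tuple[str, ...]


def compile_ddl(manifests: list[ObjectTypeManifest]) -> list[str]:
    """Emit index DDL in a deterministic order so re-runs are identical."""
    statements = []
    for manifest in sorted(manifests, key=lambda m: m.api_name):
        # De-duplicate and sort properties so input order never leaks into output.
        for prop in sorted(set(manifest.indexed_properties)):
            statements.append(
                f"CREATE INDEX FOR (n:{manifest.api_name}) ON (n.{prop})"
            )
    return statements


def fingerprint(statements: list[str]) -> str:
    """Stable digest of the compiled output, useful for detecting drift."""
    return hashlib.sha256("\n".join(statements).encode("utf-8")).hexdigest()
```

Comparing `fingerprint(...)` across two compile runs of the same branch is one cheap way to assert idempotency in a test.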
Key Methods
class SchemaService:
    async def compile_branch(self, space_id: UUID, branch: str) -> CompileResult: ...
    async def validate_yaml(self, worktree_path: str) -> ValidationResult: ...
    async def apply_to_production(self, space_id: UUID) -> PublishResult: ...
    async def estimate_index_size(self, space_id: UUID, branch: str) -> SizeEstimate: ...
    async def generate_diff(self, space_id: UUID, base: str, head: str) -> SchemaDiff: ...
Agent 2 — Query Service
File: apps/api/services/query_service.py
Responsibility
All read operations on ontology instances. Routes queries to FalkorDB (graph traversal, multi-hop relationships) or Elasticsearch (full-text, vector, aggregations) based on query type. Never performs writes.
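The routing decision described above could be sketched as follows. The query-kind labels and the `route_query` function are hypothetical names chosen for illustration; the real service may classify queries differently.

```python
from enum import Enum


class Backend(Enum):
    FALKORDB = "falkordb"
    ELASTICSEARCH = "elasticsearch"


# Hypothetical query-kind labels; only the two groupings come from the spec.
GRAPH_KINDS = {"follow_link", "count_linked", "path"}
SEARCH_KINDS = {"full_text", "vector", "aggregation"}


def route_query(kind: str) -> Backend:
    """Pick a backend from the query kind; unknown kinds are rejected."""
    if kind in GRAPH_KINDS:
        return Backend.FALKORDB
    if kind in SEARCH_KINDS:
        return Backend.ELASTICSEARCH
    raise ValueError(f"unsupported query kind: {kind!r}")
```

Rejecting unknown kinds, rather than defaulting to one backend, keeps a misclassified query from silently hitting the wrong store.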
Inputs
| Input | Type |
|---|---|
| Object type API name | str |
| Filter criteria | FilterExpr |
| Graph traversal spec | TraversalSpec \| None |
| Search query (NL or structured) | SearchQuery \| None |
| Caller identity (for sensitivity filtering) | CallerContext |
Outputs
| Output | Type |
|---|---|
| Object instances | list[ObjectInstance] |
| Ranked search results | list[SearchResult] |
| Graph traversal paths | list[TraversalResult] |
| Aggregation results | AggregationResult |
Rules
- MUST NOT write to FalkorDB or Elasticsearch
- MUST apply sensitivity filters on every query (strip CONFIDENTIAL/RESTRICTED fields for callers without the ELEVATED_CLEARANCE role)
- MUST route purely structural queries (follow link, count linked, path) to FalkorDB
- MUST route text/vector search, aggregations to Elasticsearch
- MUST apply graph-topology boost after RRF fusion for semantic search
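The last rule can be made concrete with a small sketch of Reciprocal Rank Fusion (RRF) followed by a boost. The RRF part uses the standard formula, score(d) = Σ 1 / (k + rank), with the commonly used k = 60. The boost formula is an assumption: this document does not define how graph topology feeds into the score, so `apply_topology_boost` simply scales by a normalised link count, and `link_counts` is a hypothetical input.

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    """Reciprocal Rank Fusion over several ranked lists of document ids."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores


def apply_topology_boost(
    scores: dict[str, float],
    link_counts: dict[str, int],
    weight: float = 0.1,
) -> dict[str, float]:
    """Hypothetical boost: scale each score by 1 + weight * (links / max links)."""
    max_links = max(link_counts.values(), default=0)
    if max_links == 0:
        return dict(scores)
    return {
        doc_id: score * (1.0 + weight * link_counts.get(doc_id, 0) / max_links)
        for doc_id, score in scores.items()
    }


def ranked_ids(scores: dict[str, float]) -> list[str]:
    """Highest score first; ties broken by id so the order is deterministic."""
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))
```

Applying the boost after fusion, as the rule requires, means topology can only reorder results that text or vector search already surfaced.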
Key Methods
class QueryService:
    async def get_objects(
        self, space_id: UUID, object_type: str,
        filters: FilterExpr, caller: CallerContext,
        page: int, page_size: int
    ) -> Page[ObjectInstance]: ...

    async def get_object(
        self, space_id: UUID, object_type: str,
        object_id: str, caller: CallerContext
    ) -> ObjectInstance: ...

    async def semantic_search(
        self, space_id: UUID, object_type: str,
        query: SemanticSearchQuery, caller: CallerContext
    ) -> list[SearchResult]: ...

    async def traverse_links(
        self, space_id: UUID, from_object_id: str,
        link_type: str, depth: int, caller: CallerContext
    ) -> list[TraversalResult]: ...

    async def meta_graph_query(self, cypher: str) -> list[dict]: ...
Agent 3 — Action Service
File: apps/api/services/action_service.py
Responsibility
All write operations on ontology instances. Executes Action Types as atomic FalkorDB Cypher write transactions, gated by OPA.
Execution Pipeline (strict order — never skip a step)
1. Validate parameters (Pydantic)
2. Load action type definition from meta-graph
3. Call OPA → data.eom.action.invoke
   → DENY: raise HTTPException(403, deny_reason)
4. Evaluate submission_criteria (read-only Cypher)
   → FAIL: raise HTTPException(422, criteria_failure)
5. Run validation_rules (read-only Cypher checks)
   → FAIL: raise HTTPException(422, validation_errors)
6. Execute compiled Cypher template (atomic GRAPH.QUERY write)
   → ERROR: raise HTTPException(500, graph_error)
7. Write audit log entry to Elasticsearch (fire-and-forget async)
8. Update Elasticsearch object document (async, best-effort)
9. Emit side-effects (webhook, notification) — async, non-blocking
10. Return ActionResult
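The ordering of the gate steps above can be sketched with in-memory stand-ins. Everything here is hypothetical scaffolding: `ActionError` replaces FastAPI's HTTPException so the sketch has no framework dependency, and `FakeBackends` replaces the real OPA and FalkorDB clients. Steps 1, 2, 5 and 7 to 9 are omitted for brevity.

```python
import asyncio
from dataclasses import dataclass, field


class ActionError(Exception):
    """Stand-in for HTTPException (assumption: same status/detail shape)."""

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class FakeBackends:
    """Hypothetical in-memory stand-ins for OPA and FalkorDB."""
    opa_allow: bool = True
    criteria_ok: bool = True
    writes: list = field(default_factory=list)

    async def opa_check(self, action_type: str, caller: str) -> dict:
        reason = None if self.opa_allow else "policy denied"
        return {"allow": self.opa_allow, "deny_reason": reason}

    async def criteria_pass(self, action_type: str, object_id: str) -> bool:
        return self.criteria_ok

    async def graph_write(self, action_type: str, object_id: str, params: dict) -> None:
        self.writes.append((action_type, object_id, params))


async def invoke(backends, action_type, object_id, params, caller) -> dict:
    # Step 3: the policy gate runs before anything touches the graph.
    decision = await backends.opa_check(action_type, caller)
    if not decision["allow"]:
        raise ActionError(403, decision["deny_reason"])
    # Step 4: read-only submission criteria.
    if not await backends.criteria_pass(action_type, object_id):
        raise ActionError(422, "submission criteria failed")
    # Step 6: one atomic write; any failure fails the whole action.
    try:
        await backends.graph_write(action_type, object_id, params)
    except Exception as exc:
        raise ActionError(500, str(exc)) from exc
    return {"status": "ok", "object_id": object_id}


if __name__ == "__main__":
    print(asyncio.run(invoke(FakeBackends(), "approve", "o1", {}, "alice")))
```

Because each gate raises before the write is reached, a denied or invalid action leaves `writes` empty, which is the property the "never skip a step" rule protects.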
Rules
- MUST call OPA before ANY graph write — no exceptions
- MUST execute the Cypher template atomically (single GRAPH.QUERY call)
- MUST write audit log regardless of side-effect outcome
- MUST NOT partially apply edits — if Cypher fails, the whole action fails
- MUST NOT use FalkorDB UDFs for write operations
- Side-effects MUST be fire-and-forget — their failure must not fail the action
Key Methods
class ActionService:
    async def invoke(
        self, space_id: UUID, action_type: str,
        object_id: str, params: dict,
        caller: CallerContext
    ) -> ActionResult: ...

    async def get_action_log(
        self, space_id: UUID, action_type: str,
        page: int, page_size: int
    ) -> Page[ActionLogEntry]: ...

    async def revert(
        self, space_id: UUID, action_log_id: str,
        caller: CallerContext
    ) -> ActionResult: ...
Agent 4 — Git Service
File: apps/api/services/git_service.py
Responsibility
All Git and worktree operations. Manages the lifecycle of bare repos,
worktrees, branches, PRs (as metadata), and merge operations. Uses
pygit2 for all Git operations.
Worktree State Machine
┌─────────┐
│ PENDING │  (creation requested)
└────┬────┘
     │ worktree add + sandbox created
     ▼
┌─────────┐
│ ACTIVE  │  (author can edit YAML)
└────┬────┘
     │ eom pr open
     ▼
┌──────────┐
│ IN_REVIEW│  (CI running, approvals collecting)
└────┬─────┘
     │ all gates passed
     ▼
┌──────────┐
│ APPROVED │
└────┬─────┘
     │ eom merge
     ▼
┌──────────┐
│ MERGED   │  (worktree cleaned up)
└──────────┘
     OR
┌──────────┐
│ REJECTED │  (PR closed without merge)
└──────────┘
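The state machine above maps naturally onto a transition table. The sketch below is an illustration only: the diagram does not say which state REJECTED is reached from, so the table assumes it is reachable from IN_REVIEW, and the `transition` helper is a hypothetical name.

```python
from enum import Enum


class WorktreeState(str, Enum):
    PENDING = "PENDING"
    ACTIVE = "ACTIVE"
    IN_REVIEW = "IN_REVIEW"
    APPROVED = "APPROVED"
    MERGED = "MERGED"
    REJECTED = "REJECTED"


S = WorktreeState

# Assumption: REJECTED is entered from IN_REVIEW; MERGED and REJECTED are terminal.
ALLOWED_TRANSITIONS: dict[WorktreeState, set[WorktreeState]] = {
    S.PENDING: {S.ACTIVE},
    S.ACTIVE: {S.IN_REVIEW},
    S.IN_REVIEW: {S.APPROVED, S.REJECTED},
    S.APPROVED: {S.MERGED},
    S.MERGED: set(),
    S.REJECTED: set(),
}


def transition(current: WorktreeState, target: WorktreeState) -> WorktreeState:
    """Return the new state, or raise if the diagram has no such edge."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(
            f"illegal worktree transition: {current.value} -> {target.value}"
        )
    return target
```

Keeping the edges in one table makes it straightforward to reject shortcuts such as ACTIVE to MERGED before any Git operation runs.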
Rules
- MUST use pygit2 for all Git operations; never shell out to git
- MUST create a sandbox FalkorDB graph on branch creation (GRAPH.COPY)
- MUST create sandbox ES indices with branch-specific aliases on branch creation
- MUST clean up sandbox resources on branch delete/merge
- MUST store branch and PR metadata in the EOM PostgreSQL database
- MUST run the semantic merge tool on YAML conflicts (never expose raw diff3 to users)
- Direct commits to main MUST be blocked
Key Methods
class GitService:
    async def create_branch(
        self, space_id: UUID, branch_name: str, creator: CallerContext
    ) -> BranchInfo: ...

    async def delete_branch(self, space_id: UUID, branch_name: str) -> None: ...

    async def list_branches(self, space_id: UUID) -> list[BranchInfo]: ...

    async def open_pr(
        self, space_id: UUID, branch_name: str, title: str,
        description: str, caller: CallerContext
    ) -> ProposalInfo: ...

    async def merge_pr(
        self, space_id: UUID, pr_id: UUID, caller: CallerContext
    ) -> MergeResult: ...

    async def rebase_branch(
        self, space_id: UUID, branch_name: str, onto: str
    ) -> RebaseResult: ...

    async def get_diff(
        self, space_id: UUID, base: str, head: str
    ) -> SchemaDiff: ...
Agent 5 — OPA Service
File: apps/api/services/opa_service.py
Responsibility
Thin client that calls the OPA sidecar REST API (http://opa:8181/v1/data/...).
Provides typed Python wrappers for all policy decision points.
Rules
- MUST call OPA synchronously (await the decision before proceeding)
- MUST pass structured JSON input that exactly matches what Rego expects
- MUST surface deny_reason from OPA output in HTTP error details
- MUST NOT cache OPA decisions (policies can change on branch merge)
- Circuit breaker: if OPA is unreachable, DENY all writes (fail-safe)
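The fail-safe behaviour in the last rule can be sketched as a fail-closed wrapper. This is deliberately simpler than a full circuit breaker: it keeps no trip state and only converts an unreachable OPA into a DENY. The `Transport` callable is a hypothetical seam standing in for the real HTTP client, and the two fake transports exist only to exercise both paths.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical seam: any async callable that posts the input and returns OPA's result.
Transport = Callable[[dict], Awaitable[dict]]


async def check_write_allowed(
    transport: Transport, payload: dict, timeout_s: float = 2.0
) -> dict:
    """Return OPA's decision, or a synthetic DENY when OPA cannot be reached."""
    try:
        result = await asyncio.wait_for(transport(payload), timeout=timeout_s)
    except (OSError, asyncio.TimeoutError) as exc:
        # Fail closed: connection errors and timeouts both deny the write.
        reason = f"OPA unreachable: {exc.__class__.__name__}"
        return {"allow": False, "deny_reason": reason}
    # A missing "allow" key is also treated as a denial.
    return {
        "allow": bool(result.get("allow", False)),
        "deny_reason": result.get("deny_reason"),
    }


async def fake_opa_up(payload: dict) -> dict:
    return {"allow": True}


async def fake_opa_down(payload: dict) -> dict:
    raise ConnectionError("connection refused")


if __name__ == "__main__":
    print(asyncio.run(check_write_allowed(fake_opa_down, {"action": "approve"})))
```

With a real HTTP client, the `except` clause would need to cover that library's own transport exceptions as well.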
Key Methods
class OPAService:
    async def check_schema_change(
        self, diff: SchemaDiff, approvals: ApprovalState
    ) -> SchemaChangeDecision: ...  # {allow: bool, change_class: str, violations: list}

    async def check_action_invoke(
        self, action_type: str, caller: CallerContext,
        object_id: str, object_sensitivity: str,
        submission_criteria_passed: bool
    ) -> ActionInvokeDecision: ...  # {allow: bool, deny_reason: str | None}

    async def check_access(
        self, space_id: UUID, resource: str,
        operation: str, caller: CallerContext
    ) -> AccessDecision: ...  # {allow: bool, masked_fields: list[str]}

    async def check_naming(
        self, api_name: str, name_type: str
    ) -> NamingDecision: ...  # {allow: bool, violation: str | None}
Agent 6 — Embedding Service
File: apps/api/services/embedding_service.py
Responsibility
Generates vector embeddings for Object Type instances and ontology schema
nodes. Writes embeddings to Elasticsearch _embedding field. Runs as
a background task — never blocks request path.
Rules
- MUST be entirely async and background-only (FastAPI BackgroundTasks or Celery)
- MUST handle a missing EMBEDDING_MODEL_ENDPOINT gracefully (skip, log warning)
- MUST batch embedding requests (max 100 texts per API call)
- MUST update only the _embedding field in ES (partial update, not full reindex)
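The batching and partial-update rules can be sketched with two small helpers. Both function names are hypothetical. The second builds action/document pairs in the shape the Elasticsearch bulk API expects for partial updates, without calling any client.

```python
from typing import Iterator

MAX_TEXTS_PER_CALL = 100  # limit stated in the rules above


def batched(texts: list[str], size: int = MAX_TEXTS_PER_CALL) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` texts."""
    if size < 1:
        raise ValueError("batch size must be at least 1")
    for start in range(0, len(texts), size):
        yield texts[start:start + size]


def bulk_partial_updates(index: str, vectors: dict[str, list[float]]) -> list[dict]:
    """Build bulk 'update' actions that touch only the _embedding field."""
    lines: list[dict] = []
    for doc_id, vector in vectors.items():
        lines.append({"update": {"_index": index, "_id": doc_id}})
        lines.append({"doc": {"_embedding": vector}})
    return lines
```

Sending a `doc` fragment through `update` leaves every other field of the stored document untouched, which is what distinguishes this from a full reindex.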
Key Methods
class EmbeddingService:
    async def embed_object_type_schema(self, space_id: UUID, object_type: str) -> None: ...

    async def embed_instances_batch(
        self, space_id: UUID, object_type: str, object_ids: list[str]
    ) -> None: ...

    async def run_duplication_radar(self, space_id: UUID) -> list[DuplicatePair]: ...
Agent 7 — SDK Generator
File: packages/eom-sdk-generator/
Responsibility
Auto-generates typed Python and TypeScript clients from the published
ontology schema on every merge to main. Publishes to the configured
package registries.
Rules
- MUST generate from the compiled ontology in the meta-graph (not from YAML)
- MUST pin the generated SDK to the exact schema version (e.g. 2.4.1)
- MUST generate type stubs and docstrings from Object Type description fields
- MUST publish only after a successful production publish (never on branch merge)
Key Methods
class SDKGenerator:
    async def generate_python(self, space_id: UUID, version: str, output_dir: str) -> None: ...
    async def generate_typescript(self, space_id: UUID, version: str, output_dir: str) -> None: ...
    async def publish_python(self, package_dir: str, registry_url: str) -> None: ...
    async def publish_typescript(self, package_dir: str, registry_url: str) -> None: ...
Agent 8 — Health Agent (Background)
File: apps/api/services/health_agent.py
Responsibility
Runs on a nightly schedule. Computes Ontology Health Score (OHS) for every
Space and writes results to Elasticsearch (eom_metrics index). Emits
alerts for stewards.
OHS Dimensions (see DATA_MODELS.md for full scoring logic)
- Completeness (25%)
- DRY Conformance (20%)
- Change Governance (20%)
- Freshness (20%)
- Utilisation (15%)
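As a rough illustration of how these weights combine, the sketch below assumes each dimension is already scored on a 0 to 100 scale and that the OHS is a plain weighted sum. Both are assumptions; the authoritative scoring logic lives in DATA_MODELS.md, and the dictionary keys are hypothetical identifiers for the five dimensions.

```python
OHS_WEIGHTS: dict[str, float] = {
    "completeness": 0.25,
    "dry_conformance": 0.20,
    "change_governance": 0.20,
    "freshness": 0.20,
    "utilisation": 0.15,
}


def ontology_health_score(dimension_scores: dict[str, float]) -> float:
    """Weighted sum of per-dimension scores (each assumed to be 0-100)."""
    missing = OHS_WEIGHTS.keys() - dimension_scores.keys()
    if missing:
        raise ValueError(f"missing dimensions: {sorted(missing)}")
    total = sum(OHS_WEIGHTS[name] * dimension_scores[name] for name in OHS_WEIGHTS)
    return round(total, 2)
```

For example, a Space scoring 80 on completeness and 50 on everything else would come out at 0.25 × 80 + 0.75 × 50 = 57.5 under these assumptions.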
Rules
- MUST run as an APScheduler cron job at 02:00 UTC daily
- MUST NOT block the API — runs entirely in a background thread pool
- MUST write OHS metrics to ES regardless of alert outcome
- MUST send steward alerts via the configured notification channel (webhook/email)
Inter-Service Dependency Matrix
| Service | May call | Must NOT call |
|---|---|---|
| SchemaService | FalkorDB (meta write), OPA (naming check) | ActionService, QueryService |
| QueryService | FalkorDB (read), Elasticsearch (read) | ActionService, SchemaService |
| ActionService | OPA, FalkorDB (write), Elasticsearch (async write) | SchemaService, GitService |
| GitService | SchemaService (compile), OPA (naming) | ActionService, QueryService |
| OPAService | OPA REST API | Any other service |
| EmbeddingService | Elasticsearch (write) | FalkorDB, OPA |
| SDKGenerator | meta-graph via QueryService | Any data write |
| HealthAgent | QueryService, Elasticsearch (write) | ActionService |
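The service-to-service rows of the matrix could also be encoded as data and checked in a test or lint step. The sketch below is one hypothetical way to do that; it transcribes only rows whose "Must NOT call" column names concrete services, and `violations` is an illustrative helper name.

```python
# Transcribed from the matrix above (service-to-service prohibitions only).
MUST_NOT_CALL: dict[str, frozenset[str]] = {
    "SchemaService": frozenset({"ActionService", "QueryService"}),
    "QueryService": frozenset({"ActionService", "SchemaService"}),
    "ActionService": frozenset({"SchemaService", "GitService"}),
    "GitService": frozenset({"ActionService", "QueryService"}),
    "HealthAgent": frozenset({"ActionService"}),
}


def violations(caller: str, callees: set[str]) -> set[str]:
    """Return the callees that the matrix forbids for this caller."""
    return set(callees) & MUST_NOT_CALL.get(caller, frozenset())
```

How the set of callees is collected (for example, by scanning a module's imports) is left open here.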
Shared Infrastructure Clients
All services share singleton clients defined in apps/api/dependencies.py:
from falkordb import FalkorDB
from elasticsearch import AsyncElasticsearch
import httpx
# Injected via FastAPI Depends()
async def get_falkordb() -> FalkorDB: ...
async def get_elasticsearch() -> AsyncElasticsearch: ...
async def get_opa_client() -> httpx.AsyncClient: ...
Important: FalkorDB is not async-native. Use asyncio.to_thread() to
wrap FalkorDB calls so they don't block the event loop:
import asyncio
result = await asyncio.to_thread(graph.query, cypher, params)
Agent 9 — Space Service
File: apps/api/services/space_service.py
Responsibility: CRUD for Spaces and Folders, permission inheritance bootstrapping.
Input / Output Contract
# Inputs
create_space(owner_id: str, name: str, visibility: SpaceVisibility, description: str | None) -> Space
get_space(space_id: str) -> Space
list_spaces(owner_id: str, limit: int, offset: int) -> list[Space]
update_space(space_id: str, data: SpaceUpdate) -> Space
delete_space(space_id: str) -> None
create_folder(space_id: str, name: str, parent_folder_id: str | None) -> Folder
list_folders(space_id: str) -> list[Folder]
delete_folder(folder_id: str) -> None
Execution Pipeline
- OPA check: check_access(user_id, action, "SPACE"|"FOLDER", resource_id)
- FalkorDB write: CREATE (:OntMeta_Space {...}) or (:OntMeta_Folder {...})
- Relationship: (Folder)-[:BELONGS_TO]->(Space) or (Folder)-[:BELONGS_TO]->(ParentFolder)
- Bootstrap permissions: owner gets READ/WRITE/EDIT/DELETE via PermissionService
- Return Pydantic model
FalkorDB Nodes
// Space
CREATE (:OntMeta_Space {
id: $id, name: $name, description: $description,
visibility: $visibility, owner_id: $owner_id,
created_at: $ts, updated_at: $ts
})
// Folder
CREATE (:OntMeta_Folder {
id: $id, space_id: $space_id,
parent_folder_id: $parent_folder_id,
name: $name, created_at: $ts
})
MATCH (f:OntMeta_Folder {id: $id}), (s:OntMeta_Space {id: $space_id})
CREATE (f)-[:BELONGS_TO]->(s)
Allowed Dependencies
| May call | Must not call |
|---|---|
| OPAService | ActionService |
| FalkorDB (via execute_cypher) | GitService |
| PermissionService (bootstrap) | EmbeddingService |
Agent 10 — Ontology Orchestration Service
File: apps/api/services/ontology_service.py
Responsibility: Orchestrate ObjectTypes, Properties, and Relationships within an Ontology. Manages the compile → FalkorDB meta-graph publish pipeline.
Input / Output Contract
create_ontology(folder_id: str, name: str, description: str | None) -> Ontology
publish_ontology(ontology_id: str) -> Ontology
get_ontology(ontology_id: str) -> Ontology
list_ontologies(folder_id: str) -> list[Ontology]
create_object_type(ontology_id: str, data: ObjectTypeCreate) -> ObjectType
create_property(object_type_id: str, data: PropertyCreate) -> Property
create_relationship(ontology_id: str, data: RelationshipCreate) -> Relationship
Execution Pipeline (publish)
- OPA check: check_schema_change(change_type="PUBLISH", ontology_id=...)
- Load ontology: fetch all ObjectTypes + Properties + Relationships from meta-graph
- SchemaService.compile(): validate naming, generate FalkorDB DDL + ES mappings
- FalkorDB DDL: create label indexes on data graph eom_{spaceId}_data
- Elasticsearch: create versioned index eom_{spaceId}_{apiName}_v{N} with alias
- Update status: SET o.status = "PUBLISHED", o.version = $version
- Return updated Ontology
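The versioned-index step can be illustrated with a few naming helpers. Two things here are assumptions not stated in this document: the alias is taken to be the index name without its version suffix, and names are lower-cased because Elasticsearch requires lowercase index names. The returned dictionary follows the shape of the Elasticsearch aliases API request body.

```python
from typing import Optional


def versioned_index_name(space_id: str, api_name: str, version: int) -> str:
    """Build eom_{spaceId}_{apiName}_v{N}, lower-cased for Elasticsearch."""
    if version < 1:
        raise ValueError("version must be >= 1")
    return f"eom_{space_id}_{api_name}_v{version}".lower()


def alias_name(space_id: str, api_name: str) -> str:
    """Hypothetical alias convention: the index name without the version suffix."""
    return f"eom_{space_id}_{api_name}".lower()


def alias_swap_actions(
    space_id: str, api_name: str, old_version: Optional[int], new_version: int
) -> dict:
    """Request body that repoints the alias from the old index to the new one."""
    alias = alias_name(space_id, api_name)
    actions = []
    if old_version is not None:
        old_index = versioned_index_name(space_id, api_name, old_version)
        actions.append({"remove": {"index": old_index, "alias": alias}})
    new_index = versioned_index_name(space_id, api_name, new_version)
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}
```

Submitting the remove and add in a single request lets readers of the alias switch to the new index without a window where the alias points nowhere.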
FalkorDB Meta-Graph Nodes
// ObjectType
CREATE (:OntMeta_ObjectType {
id: $id, ontology_id: $ontology_id,
api_name: $api_name, display_name: $display_name,
primary_key: $primary_key,
is_skos_concept: $is_skos_concept,
is_skos_concept_scheme: $is_skos_concept_scheme,
created_at: $ts
})
MATCH (ot:OntMeta_ObjectType {id: $id}), (o:OntMeta_Ontology {id: $ontology_id})
CREATE (ot)-[:BELONGS_TO]->(o)
// Property
CREATE (:OntMeta_Property {
id: $id, object_type_id: $object_type_id,
api_name: $api_name, data_type: $data_type,
required: $required, skos_mapping: $skos_mapping,
created_at: $ts
})
MATCH (p:OntMeta_Property {id: $id}), (ot:OntMeta_ObjectType {id: $object_type_id})
CREATE (p)-[:BELONGS_TO]->(ot)
// Relationship
CREATE (:OntMeta_Relationship {
id: $id, ontology_id: $ontology_id,
api_name: $api_name, cardinality: $cardinality,
created_at: $ts
})
MATCH (r:OntMeta_Relationship {id: $id}),
(src:OntMeta_ObjectType {id: $source_object_type_id}),
(tgt:OntMeta_ObjectType {id: $target_object_type_id})
CREATE (r)-[:SOURCE]->(src), (r)-[:TARGET]->(tgt)
Allowed Dependencies
| May call | Must not call |
|---|---|
| OPAService | ActionService (direct write) |
| SchemaService | GitService |
| FalkorDB (via execute_cypher) | EmbeddingService (synchronously) |
| Elasticsearch (via es_client) | HealthAgent |