AGENTS.md — 健康画像 ETL 项目 AI 协作指南
项目概述
本项目是一个基于 Python + Neo4j 的医疗健康画像数据 ETL 工程。从 SQL Server 的 ai_patients 表获取患者 ID,调用大数据平台 API 拉取患者健康画像数据,经图谱建模后写入 Neo4j,最终通过 Flask REST API 对外提供查询服务。
代码库结构速查
etl_neo4j/
├── run_api.py # ★ API 服务独立启动入口
├── main.py # ★ ETL 主入口(增量执行、重试)
├── core/ # 全局通用组件
│ ├── config.py # 全局配置(数据库、API、调度参数)
│ ├── logger.py # 日志处理
│ └── db.py # Neo4j 单例连接(线程安全)
├── api/ # REST API 包模块
│ ├── __init__.py # Flask App 注册
│ └── routes.py # API 路由节点实现
├── etl/ # 数据提取与转换逻辑
│ ├── extractors/ # 从外部抓取数据
│ │ ├── health_portrait_api.py # 大数据平台 HTTP 客户端
│ │ └── sqlserver.py # SQL Server 客户端
│ ├── core/ # ★ 图谱核心构建模块
│ │ └── etl_patient.py # 图谱节点/关系的 Cypher 写入逻辑
│ └── processors/
│ └── health_portrait.py # 协调 API 调用与事务提交
├── scheduler/ # 批处理调度
│ ├── job_manager.py # 批量调度 + 错误队列
│ └── scheduler.py # 定时调度器
├── data/ # 动态产生与运行时数据
│ ├── state/
│ │ └── etl_state.json # 增量记录(上次成功时间戳)
│ └── samples/ # 测试用 JSON 样例
├── tests/ # 单元与集成测试脚本集
├── scripts/ # 部署/环境验证运维工具
└── docs/ # 文档说明
数据流与核心调用链
main.py
└─ SQLServerConnection.load_patient_ids(last_update_time) # 增量拉取患者 ID
└─ JobManager.process_batch(empi_list)
└─ ThreadPoolExecutor → _process_single(empi)
├─ HealthPortraitAPI.get_health_portrait(empi) # 调用大数据平台
└─ HealthPortraitProcessor.process(patient_data)
└─ session.execute_write(import_patient_data_from_json, data)
└─ etl_patient.py 中各 import_* 函数(Cypher MERGE 写入)
写入完成后,etl_state.json 记录本次运行时间戳,供下次增量查询使用。
图谱数据模型(节点与关系)
主要节点标签
| 标签 | 说明 | 关键属性 |
|---|---|---|
Patient | 患者 | patientId, name, birthDate, gender, idType, idValue |
Encounter | 就诊记录 | encounterId, encounterType, typeName, visitStartTime |
Hospital | 医院 | hospitalId, name |
Department | 科室 | departmentId, name |
Provider | 医生 | providerId, name |
Condition | 疾病/诊断 | code, name |
Allergen | 过敏原 | name, allergenType |
LabTestReport | 检验报告 | reportId |
LabTestItem | 检验项目 | name, code |
Examination | 检查记录 | reportId, timestamp |
PastMedicalEvent | 既往史事件(多标签) | Surgery / Trauma / BloodTransfusion / Vaccination |
LifestyleFact | 生活方式事实 | type, value |
核心关系类型
| 关系 | 说明 |
|---|---|
HAD_ENCOUNTER | 患者 → 就诊 |
AT_HOSPITAL | 就诊 → 医院 |
IN_DEPARTMENT | 就诊 → 科室 |
TREATED_BY | 就诊 → 医生 |
RECORDED_DIAGNOSIS | 就诊 → 疾病 |
HAD_LAB_TEST | 就诊 → 检验报告 |
HAS_ITEM | 检验报告 → 检验项目(关系上存结果值) |
HAD_EXAMINATION | 就诊 → 检查记录 |
HAS_FINDING | 检查记录 → 疾病(发现) |
HAS_ALLERGY_TO | 患者 → 过敏原 |
HAS_FAMILY_HISTORY | 患者 → 疾病(家族史) |
HAS_LIFESTYLE_FACT | 患者 → 生活方式事实 |
PARENT_OF | 患者 → 患者(亲子关系) |
SPOUSE_OF | 患者 ↔ 患者(配偶关系) |
API 端点速查
所有端点均以 /api/patients/<patient_id>/ 为前缀:
| 路径 | 方法 | 说明 |
|---|---|---|
dashboard | GET | 患者仪表盘概览(基本信息 + 诊断 + 异常检验) |
encounters | GET | 就诊记录列表(支持分页 ?page=1&limit=10) |
history/medical | GET | 既往医疗史(手术/外伤/输血/疫苗) |
history/personal | GET | 生活方式/个人史 |
history/family | GET | 家族史 |
allergies | GET | 过敏史 |
marital_info | GET | 婚育史 |
family-graph | GET | 家族关系图谱(支持 ?depth=2) |
/api/docs | GET | 自动生成的接口清单页面 |
开发规范与约定
数据写入规范
- 全部使用
MERGE而非CREATE,以保证幂等性。Encounter、Patient、Condition等核心节点均设有唯一性约束。 - 批量写入优化:
import_diagnoses_from_encounter采用两阶段写入——先批量UNWIND创建节点,再批量创建关系,避免高并发下的锁竞争。 - 空值过滤:各
import_*函数内部均对关键字段进行空值检查,缺失时debug级别记录日志并跳过,不抛出异常。 - 家族成员处理:通过
idType + idValue作为家族成员的唯一标识,支持"认领"机制——若 Neo4j 中已有通过家族关系预建的节点,ETL 会将patientId赋给它,实现无缝合并。
日志规范
- 使用
setup_logger(name)获取 logger,禁止直接logging.getLogger()。 - 每个模块使用独立的 logger 名称(
etl_patient_core、health_portrait、job_manager等)。 - 正常业务跳过用
logger.debug(),异常用logger.error(),关键里程碑用logger.info()。
配置规范
- 所有配置均在
config/settings.py的Config类中集中管理。 - 不得在业务代码中硬编码 URI、密码、表名等敏感信息。
- 修改配置后,先运行
python check_config.py验证有效性。
错误处理规范
HealthPortraitProcessor.process()捕获异常后必须重新raise,以便JobManager将其计入错误队列并触发重试。JobManager的错误重试上限由Config.RETRY_TIMES控制,重试间隔由Config.RETRY_DELAY控制。- 若所有重试均失败,
etl_state.json的时间戳不会更新,下次运行会重新处理这些患者。
本地开发快速上手
# 1. 安装依赖
pip install -r requirements.txt
# 2. 检查环境配置
python check_config.py
# 3. 单次运行 ETL(使用本地 JSON 测试数据)
# 在 job_manager.py 中取消注释本地文件读取代码,注释 API 调用
python main.py
# 4. 启动 API 服务
python app.py
# 访问 http://localhost:5000/api/docs
# 5. 启动定时调度
python -c "from scheduler.scheduler import ETLScheduler; ETLScheduler().start(24)"
AI Agent 任务指引
新增实体/关系类型
- 在
etl/core/etl_patient.py中新增import_xxx(tx, patient_id, data_list)函数。 - 在
import_patient_data_from_json的末尾调用新函数。 - 在
etl/core/__init__.py的__all__列表中导出。 - 如需 API 查询,在
app.py中新增对应端点,参照已有端点的@neo4j_session装饰器模式。
修改 Cypher 查询
- 所有 Cypher 均写在各
import_*函数的局部变量query中,直接修改字符串即可。 - 修改后用
files/test_patient.json作为输入运行单次测试验证。
新增 API 端点
@app.route('/api/patients/<string:patient_id>/new_endpoint', methods=['GET'])
@neo4j_session
def get_new_endpoint(session, patient_id):
"""端点的中文描述(会显示在 /api/docs 页面)"""
query = "MATCH (p:Patient {patientId: $patientId}) ..."
results = session.execute_read(lambda tx: list(tx.run(query, patientId=patient_id)))
return jsonify([serialize_record(r) for r in results])
调试单个患者
# 临时修改 job_manager.py 的 _process_single 方法,
# 用本地 JSON 文件替代 API 调用:
with open("files/test_patient.json", 'r', encoding='utf-8') as f:
patient_data = json.load(f)
patient_data = patient_data.get('data', patient_data) # 兼容两种格式
已知限制与注意事项
MAX_WORKERS当前建议设为1(串行),多线程并发写入 Neo4j 在当前图模型下可能引发死锁(MERGE竞争)。archive/目录中的文件为历史版本,请勿在新功能中引用。app.py中已注释的部分端点(findings、bodypart等)依赖图模型中尚未实现的节点类型,暂不可用。etl_state.json存储的时间戳格式为2026-01-29 16:38:58 (Beijing),解析时需注意去除后缀。