---
name: ml-ops-engineer
description: >
  Expert MLOps engineering covering model deployment, ML pipelines, model
  monitoring, feature stores, and infrastructure automation. Use when deploying
  models to production, building training pipelines, setting up drift detection,
  configuring feature stores, or automating ML CI/CD workflows.
license: MIT + Commons Clause
metadata:
  version: 1.0.0
  author: borghei
  category: data-analytics
  updated: 2026-03-31
  tags: [mlops, deployment, pipelines, monitoring, feature-store]
---
MLOps Engineer
The agent operates as a senior MLOps engineer, deploying models to production, orchestrating training pipelines, monitoring model health, managing feature stores, and automating ML CI/CD.
Workflow
- Assess ML maturity -- Determine the current level (manual notebooks vs. automated pipelines vs. full CI/CD). Identify the highest-impact gap to close first.
- Build or extend training pipeline -- Define fetch-data, validate, preprocess, train, evaluate stages. Use Kubeflow, Airflow, or equivalent. Gate deployment on an accuracy threshold (e.g., > 0.85).
- Deploy model for serving -- Choose real-time (FastAPI + K8s) or batch (Spark/Parquet) based on latency requirements. Configure health checks, autoscaling, and resource limits.
- Register in model registry -- Log parameters, metrics, and artifacts in MLflow. Transition the winning version to Production stage; archive the previous version.
- Instrument monitoring -- Set up latency (P50/P95/P99), error rate, prediction-distribution, and feature-drift dashboards. Configure alerting thresholds.
- Validate end-to-end -- Run smoke tests against the serving endpoint. Confirm monitoring dashboards populate. Verify rollback procedure works.
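The deployment gate from the training-pipeline step can be sketched as a small pure function. This is a minimal illustration rather than the pipeline's actual implementation: the name `passes_evaluation_gate`, the shape of the metrics dictionary, and the 0.85 default (borrowed from the example threshold above) are all assumptions.

```python
from typing import Optional


def passes_evaluation_gate(
    metrics: dict[str, float],
    thresholds: Optional[dict[str, float]] = None,
) -> tuple[bool, list[str]]:
    """Return (ok, failures) for a candidate model's evaluation metrics.

    Hypothetical helper: a metric fails when it is missing or is not
    strictly above its threshold.
    """
    # Assumed default, taken from the "> 0.85" example in the workflow.
    thresholds = thresholds or {"accuracy": 0.85}
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing")
        elif value <= minimum:
            failures.append(f"{name}: {value:.3f} <= {minimum:.3f}")
    return (not failures, failures)


ok, failures = passes_evaluation_gate({"accuracy": 0.91})
print(ok, failures)
```

A pipeline stage could call this after evaluation and skip the deploy stage whenever `ok` is false, logging `failures` as the reason.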
MLOps Maturity Model
| Level | Capabilities | Key signals |
|---|---|---|
| 0 - Manual | Jupyter notebooks, manual deploy | No version control on models |
| 1 - Pipeline | Automated training, versioned models | MLflow tracking in use |
| 2 - CI/CD | Continuous training, automated tests | Feature store operational |
| 3 - Full MLOps | Auto-retraining on drift, A/B testing | SLA-backed monitoring |
Real-Time Serving Example
# model_server.py -- FastAPI model serving
import time

import mlflow.pyfunc
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
# Loaded once at import time; a load failure stops the process before it serves traffic.
model = mlflow.pyfunc.load_model("models:/fraud_detector/Production")


class PredictionRequest(BaseModel):
    features: list[float]


class PredictionResponse(BaseModel):
    prediction: float
    model_version: str
    latency_ms: float


@app.post("/predict", response_model=PredictionResponse)
def predict(req: PredictionRequest):
    # Plain `def` lets FastAPI run the blocking predict call in its threadpool.
    start = time.perf_counter()
    try:
        pred = model.predict([req.features])[0]
        return PredictionResponse(
            prediction=float(pred),  # cast NumPy scalars to a plain float
            model_version=model.metadata.run_id,
            latency_ms=(time.perf_counter() - start) * 1000,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
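For the end-to-end validation step, a smoke test can confirm that a /predict response carries the fields declared in PredictionResponse. The sketch below is illustrative only: `validate_prediction_response` is a hypothetical helper, and it checks an already-received JSON body instead of making the HTTP call itself.

```python
import json

# Field names and types mirror PredictionResponse in the serving example above.
EXPECTED_FIELDS = {
    "prediction": (int, float),
    "model_version": (str,),
    "latency_ms": (int, float),
}


def validate_prediction_response(body: str) -> list[str]:
    """Return a list of problems found in a /predict JSON response body."""
    try:
        payload = json.loads(body)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    if not isinstance(payload, dict):
        return ["response is not a JSON object"]

    problems = []
    for field, types in EXPECTED_FIELDS.items():
        if field not in payload:
            problems.append(f"missing field: {field}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif isinstance(payload[field], bool) or not isinstance(payload[field], types):
            problems.append(f"wrong type for {field}: {type(payload[field]).__name__}")
    if not problems and payload["latency_ms"] < 0:
        problems.append("latency_ms is negative")
    return problems
```

An empty list means the response shape is acceptable; the smoke test should fail the rollout on any non-empty result.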
Kubernetes Deployment
# k8s/model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
spec:
  replicas: 3
  selector:
    matchLabels: {app: model-server}
  template:
    metadata:
      labels: {app: model-server}
    spec:
      containers:
        - name: model-server
          image: gcr.io/project/model-server:v1.2.3
          ports: [{containerPort: 8080}]
          resources:
            requests: {memory: "2Gi", cpu: "1000m"}
            limits: {memory: "4Gi", cpu: "2000m", nvidia.com/gpu: 1}
          env:
            - {name: MODEL_URI, value: "s3://models/production/v1.2.3"}
          readinessProbe:
            httpGet: {path: /health, port: 8080}
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target: {type: Utilization, averageUtilization: 70}
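To reason about how the autoscaler above reacts to load, the core scaling rule documented for the Horizontal Pod Autoscaler (desired = ceil(current replicas x current metric / target metric), skipped when the ratio is within a tolerance of 1.0) can be sketched in a few lines. This is a simplified model, assuming the default 10% tolerance; it ignores pod readiness, missing metrics, and stabilization windows, and `desired_replicas` is a hypothetical name.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float = 70.0,  # averageUtilization in the manifest above
    min_replicas: int = 2,
    max_replicas: int = 10,
    tolerance: float = 0.1,  # assumed cluster default
) -> int:
    """Approximate the replica count the HPA would request."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        wanted = current_replicas  # close enough to target: no scaling
    else:
        wanted = math.ceil(current_replicas * ratio)
    # The HPA never scales outside [minReplicas, maxReplicas].
    return max(min_replicas, min(max_replicas, wanted))


print(desired_replicas(3, 140.0))
```

With three pods at 140% average CPU utilization against a 70% target, the rule doubles the replica count; at 72% it stays put because the ratio is inside the tolerance band.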
Drift Detection
# monitoring/drift_detector.py
from dataclasses import dataclass

import numpy as np
from scipy import stats


@dataclass
class DriftResult:
    feature: str
    drift_score: float
    is_drifted: bool
    p_value: float


def detect_drift(reference: np.ndarray, current: np.ndarray, threshold: float = 0.05) -> DriftResult:
    """Detect distribution drift using the two-sample Kolmogorov-Smirnov test."""
    statistic, p_value = stats.ks_2samp(reference, current)
    # `threshold` is the significance level applied to the p-value, not to the KS statistic.
    return DriftResult(feature="", drift_score=statistic, is_drifted=p_value < threshold, p_value=p_value)


def monitor_all_features(reference: dict, current: dict, threshold: float = 0.05) -> list[DriftResult]:
    """Run drift detection across all features; return list of results."""
    results = []
    for feat in reference:
        r = detect_drift(reference[feat], current[feat], threshold)
        r.feature = feat
        results.append(r)
    return results
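The tooling and success criteria in this document also refer to PSI (Population Stability Index) alongside the KS statistic. One common formulation is sketched below using only the standard library; it is not necessarily the approximation drift_detector.py uses. The function name, the quantile binning on the reference sample, and the `eps` floor for empty bins are assumptions.

```python
import math
from bisect import bisect_right


def population_stability_index(
    reference: list[float],
    current: list[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI = sum((cur_i - ref_i) * ln(cur_i / ref_i)) over shared bins."""
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    ordered = sorted(reference)
    # Interior bin edges at reference quantiles; duplicates collapse for discrete data.
    edges = sorted({ordered[(len(ordered) * i) // bins] for i in range(1, bins)})

    def proportions(sample: list[float]) -> list[float]:
        counts = [0] * (len(edges) + 1)
        for x in sample:
            counts[bisect_right(edges, x)] += 1
        # eps keeps empty bins from producing log(0) or a division by zero.
        return [max(c / len(sample), eps) for c in counts]

    ref_p = proportions(reference)
    cur_p = proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))


baseline = [float(x) for x in range(100)]
shifted = [x + 50.0 for x in baseline]
print(population_stability_index(baseline, shifted) > 0.2)
```

Each term of the sum is non-negative, so PSI is zero for identical distributions and grows as the current sample moves away from the reference.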
Alert Rules
ALERT_RULES = {
    "latency_p99": {"threshold": 200, "severity": "warning", "msg": "P99 latency exceeded 200 ms"},
    "error_rate": {"threshold": 0.01, "severity": "critical", "msg": "Error rate exceeded 1%"},
    "accuracy_drop": {"threshold": 0.05, "severity": "critical", "msg": "Accuracy dropped > 5%"},
    "drift_score": {"threshold": 0.15, "severity": "warning", "msg": "Feature drift detected"},
}
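A rule table like this needs a small evaluator to turn current metric values into alerts. The following is a minimal sketch, assuming every rule fires when the observed value is strictly greater than its threshold; `evaluate_alerts` and the shape of the returned alert dictionaries are hypothetical. The rules are repeated so the block runs on its own.

```python
ALERT_RULES = {
    "latency_p99": {"threshold": 200, "severity": "warning", "msg": "P99 latency exceeded 200 ms"},
    "error_rate": {"threshold": 0.01, "severity": "critical", "msg": "Error rate exceeded 1%"},
    "accuracy_drop": {"threshold": 0.05, "severity": "critical", "msg": "Accuracy dropped > 5%"},
    "drift_score": {"threshold": 0.15, "severity": "warning", "msg": "Feature drift detected"},
}


def evaluate_alerts(metrics: dict, rules: dict = ALERT_RULES) -> list[dict]:
    """Return the alerts fired by `metrics`, critical ones first."""
    fired = []
    for name, rule in rules.items():
        value = metrics.get(name)
        # Metrics that were not reported are skipped rather than treated as zero.
        if value is not None and value > rule["threshold"]:
            fired.append({
                "metric": name,
                "value": value,
                "severity": rule["severity"],
                "msg": rule["msg"],
            })
    # Stable sort: critical alerts move ahead, rule order is kept within a severity.
    fired.sort(key=lambda alert: alert["severity"] != "critical")
    return fired


for alert in evaluate_alerts({"latency_p99": 250, "error_rate": 0.02, "drift_score": 0.1}):
    print(alert["severity"], alert["msg"])
```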
Feature Store (Feast)
# features/customer_features.py
# Written against the older Feast definitions API (Feature, ValueType); newer
# releases use Field with feast.types and a schema= argument instead.
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

customer = Entity(name="customer_id", value_type=ValueType.INT64)

customer_stats = FeatureView(
    name="customer_stats",
    entities=["customer_id"],
    ttl=timedelta(days=1),  # online values older than one day are treated as expired
    features=[
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_order", dtype=ValueType.INT32),
        Feature(name="lifetime_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=FileSource(
        path="gs://features/customer_stats.parquet",
        timestamp_field="event_timestamp",
    ),
)
Online retrieval at serving time:
from feast import FeatureStore

store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=["customer_stats:total_purchases", "customer_stats:avg_order_value"],
    entity_rows=[{"customer_id": 1234}],
).to_dict()
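Before the retrieved features reach the model they usually need to be flattened into a fixed-order vector, with a policy for values that come back empty. The sketch below assumes the dictionary maps each feature name to a list with one entry per entity row, and that missing or expired values appear as `None`; check both assumptions against the installed Feast version. `to_feature_vector` is a hypothetical helper, not part of Feast.

```python
from typing import Optional


def to_feature_vector(
    online: dict,
    feature_order: list[str],
    defaults: Optional[dict] = None,
    row: int = 0,
) -> list[float]:
    """Build one model input row from a dict-of-lists feature payload."""
    defaults = defaults or {}
    vector = []
    for name in feature_order:
        values = online.get(name)
        value = values[row] if values is not None else None
        if value is None:  # feature absent, expired, or never materialized
            if name not in defaults:
                raise KeyError(f"no value or default for feature '{name}'")
            value = defaults[name]
        vector.append(float(value))
    return vector


payload = {"customer_id": [1234], "total_purchases": [12.0], "avg_order_value": [None]}
print(to_feature_vector(payload, ["total_purchases", "avg_order_value"],
                        defaults={"avg_order_value": 0.0}))
```

Raising on a missing value without a default makes stale-feature problems visible at serving time instead of silently feeding zeros to the model.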
Experiment Tracking (MLflow)
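import mlflow
import mlflow.sklearn
from sklearn.metrics import accuracy_score, f1_score

mlflow.set_tracking_uri("http://mlflow.company.com")
mlflow.set_experiment("fraud_detection")

# train_model, X_train, y_train, X_test, and y_test are assumed to be defined elsewhere.
with mlflow.start_run(run_name="xgboost_v2"):
    mlflow.log_params({"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1})
    model = train_model(X_train, y_train)
    preds = model.predict(X_test)  # predictions scored by the metrics below
    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, preds),
        "f1": f1_score(y_test, preds),
    })
    mlflow.sklearn.log_model(model, "model", registered_model_name="fraud_detector")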
For extended pipeline examples (Kubeflow, Airflow DAGs, full CI/CD workflows), see REFERENCE.md.
Reference Materials
- REFERENCE.md -- Extended patterns: Kubeflow pipelines, Airflow DAGs, CI/CD workflows, model registry operations
- references/deployment_patterns.md -- Model deployment strategies
- references/monitoring_guide.md -- ML monitoring best practices
- references/feature_store.md -- Feature store patterns
- references/pipeline_design.md -- ML pipeline architecture
Scripts
python scripts/model_registry.py register --name fraud_detector --version v2.3 --metrics '{"f1":0.91,"auc":0.95}' --params '{"n_estimators":200}'
python scripts/model_registry.py promote --name fraud_detector --version v2.3 --stage production
python scripts/model_registry.py list --stage production --json
python scripts/model_registry.py compare --name fraud_detector --versions v2.2 v2.3
python scripts/drift_detector.py --reference train_data.csv --current prod_data.csv
python scripts/drift_detector.py --reference baseline.csv --current latest.csv --threshold 0.1 --json
python scripts/pipeline_validator.py --pipeline pipeline.json --strict
python scripts/pipeline_validator.py --pipeline pipeline.json --json
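The promote command is expected to archive whichever version previously held the target stage. How model_registry.py does this internally is not shown here, so the following is only a sketch of that behavior under assumed conventions: the nested dictionary layout, the stage names "staging" and "archived", and the function name `promote` are all hypothetical.

```python
def promote(registry: dict, name: str, version: str, stage: str = "production") -> dict:
    """Move `version` into `stage` and archive the version that held it before.

    `registry` maps model name -> version string -> {"stage": ...} and is
    modified in place.
    """
    versions = registry.get(name, {})
    if version not in versions:
        raise KeyError(f"{name} {version} is not registered")
    for other, entry in versions.items():
        # Only one version per model may occupy a given stage.
        if other != version and entry["stage"] == stage:
            entry["stage"] = "archived"
    versions[version]["stage"] = stage
    return registry


state = {
    "fraud_detector": {
        "v2.2": {"stage": "production"},
        "v2.3": {"stage": "staging"},
    }
}
promote(state, "fraud_detector", "v2.3")
print(state["fraud_detector"])
```

Keeping the archived entry, rather than deleting it, is what makes a fast rollback possible: promoting v2.2 again swaps the two stages back.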
Tool Reference
| Tool | Purpose | Key Flags |
|---|---|---|
| model_registry.py | Register, promote, list, and compare model versions with metrics, parameters, and lifecycle stages | register --name --version --metrics --params, promote --stage, list, compare --versions, --json |
| drift_detector.py | Detect data/model drift between reference and current datasets using KS statistic, PSI, and chi-square | --reference <csv>, --current <csv>, --columns, --threshold, --json |
| pipeline_validator.py | Validate ML pipeline definitions for completeness, stage ordering, evaluation gates, and rollback config | --pipeline <json>, --strict, --json |
Troubleshooting
| Problem | Likely Cause | Resolution |
|---|---|---|
| Model latency exceeds P99 SLA (> 200 ms) | Model is too large, input preprocessing is slow, or pod resources are undersized | Profile the serving endpoint; consider model distillation, input caching, or increasing CPU/memory limits |
| drift_detector.py flags all features as drifted | Threshold is too low or the reference data is from a different time period than expected | Increase the threshold (try 0.15-0.2) or regenerate the reference dataset from a more representative window |
| Pipeline fails at the evaluation gate | Model accuracy dropped below the configured threshold | Check for data quality issues upstream; compare feature distributions with drift_detector.py; retrain with fresh data |
| Model registry shows "already registered" error | The exact name + version combination was previously registered | Use a new version string (e.g., v2.3.1) or remove the old entry if it was a test |
| Kubernetes pods crash-loop on model server | OOM kill due to model size exceeding memory limits, or health check timeout too short | Increase resources.limits.memory; extend initialDelaySeconds on readiness probe for large models |
| Feature store returns stale features | Materialization job failed or ran outside the TTL window | Check materialization logs; re-run materialize_features; consider reducing TTL or adding freshness alerts |
| pipeline_validator.py reports STAGE_ORDER error | Pipeline stages are defined out of the expected sequence (data -> transform -> train -> evaluate -> deploy) | Reorder stages to follow the canonical sequence; the validator expects data stages before training stages |
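The STAGE_ORDER rule in the last row can be illustrated with a short check against the canonical sequence. This is a sketch of the idea, not the validator's actual code: the function name, the message format, and the UNKNOWN_STAGE code are hypothetical, and the input is assumed to be a list of stage types in pipeline order.

```python
CANONICAL_ORDER = ["data", "transform", "train", "evaluate", "deploy"]


def check_stage_order(stage_types: list[str]) -> list[str]:
    """Return ordering errors for a pipeline's stage types, in pipeline order."""
    errors = []
    highest = -1  # rank of the latest canonical stage seen so far
    for position, stage in enumerate(stage_types):
        if stage not in CANONICAL_ORDER:
            errors.append(f"UNKNOWN_STAGE at {position}: {stage}")
            continue
        rank = CANONICAL_ORDER.index(stage)
        if rank < highest:
            errors.append(
                f"STAGE_ORDER at {position}: {stage} appears after {CANONICAL_ORDER[highest]}"
            )
        else:
            highest = rank  # repeated stages of the same type are allowed
    return errors


print(check_stage_order(["data", "train", "transform", "evaluate"]))
```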
Success Criteria
- All production models are registered in the model registry with version, metrics, and parameters before serving traffic.
- Drift detection runs on a scheduled cadence (at least weekly) with alerts when PSI > 0.2 or KS > 0.15.
- ML pipelines pass pipeline_validator.py --strict with zero errors before deployment.
- Model serving latency stays within SLA: P50 < 50 ms, P95 < 100 ms, P99 < 200 ms.
- Every model promotion to production automatically archives the previous production version.
- Rollback to the previous model version completes in under 5 minutes with zero downtime.
- Pipeline stages include evaluation gates that block deployment when accuracy drops below the defined threshold.
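The latency criterion above can be checked mechanically from a window of recorded request latencies. The sketch below uses the nearest-rank percentile definition, which is one of several in common use; monitoring systems that interpolate or use histograms will report slightly different values. `percentile` and `sla_violations` are hypothetical helpers.

```python
import math

# Percentile -> limit in milliseconds, from the success criteria above.
SLA_MS = {50: 50.0, 95: 100.0, 99: 200.0}


def percentile(samples: list[float], pct: int) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no latency samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def sla_violations(latencies_ms: list[float]) -> dict[str, float]:
    """Return the percentiles that are not strictly below their SLA limit."""
    violations = {}
    for pct, limit in SLA_MS.items():
        observed = percentile(latencies_ms, pct)
        if observed >= limit:
            violations[f"p{pct}"] = observed
    return violations


window = [10.0] * 98 + [300.0] * 2
print(sla_violations(window))
```

In this window only the two slowest requests breach a limit, so P50 and P95 pass while P99 is reported.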
Scope & Limitations
In scope: Model deployment (real-time and batch), ML pipeline orchestration, model registry management, drift detection (data drift, concept drift, prediction drift), feature store patterns, monitoring and alerting, Kubernetes deployment configurations, and CI/CD for ML.
Out of scope: Model architecture design and algorithm selection (see data-scientist), raw data ingestion pipelines, BI dashboard development, and business strategy.
Limitations: The Python tools use only the Python standard library. drift_detector.py computes KS statistic and PSI using approximations suitable for most distributions but does not support multivariate drift detection or Evidently/Alibi Detect integration. model_registry.py stores state in a local JSON file -- for production use, integrate with MLflow Model Registry or a similar platform. pipeline_validator.py validates structure and conventions but does not execute pipeline stages.
Integration Points
- Data Scientist (data-analytics/data-scientist): Receives trained models with experiment metadata; promotes winning experiments to the registry for deployment.
- Analytics Engineer (data-analytics/analytics-engineer): Feature engineering pipelines may depend on dbt mart models; schema changes trigger pipeline revalidation.
- Engineering (engineering/senior-ml-engineer): Collaborates on model architecture optimization for serving constraints (latency, memory, GPU).
- Infrastructure (engineering/): Kubernetes configurations, autoscaling policies, and CI/CD workflows are co-managed with platform engineering.
- Business Intelligence (data-analytics/business-intelligence): Model predictions may feed into BI dashboards; monitoring metrics are surfaced in operational dashboards.