---
name: data-pipeline-patterns
description: ETL/ELT patterns, batch vs streaming, idempotency, data quality framework, and pipeline orchestration
---
# Data Pipeline Patterns

## ETL vs ELT Decision
| Criterion | ETL | ELT |
|---|---|---|
| Transform location | In the pipeline | In the data warehouse |
| Data volume | Small to medium | Large |
| Flexibility | Low | High |
| Cost | Compute-heavy | Storage-heavy |
| Use case | Legacy, compliance | Modern analytics |
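
To make the split concrete, below is a minimal ELT sketch: raw rows are loaded as-is and the transform runs as SQL inside the warehouse engine. This is an illustration only; sqlite3 stands in for the warehouse, and the tables and columns are made up.

```python
import sqlite3

# sqlite3 stands in for the warehouse engine; in a real ELT stack this
# would be BigQuery/Snowflake/Redshift, with the transform managed by dbt
wh = sqlite3.connect(":memory:")

# L: load raw data untransformed
wh.execute("CREATE TABLE raw_orders (id INTEGER, amount_cents INTEGER, status TEXT)")
wh.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, 1999, "paid"), (2, 500, "refunded"), (3, 7450, "paid")],
)

# T: transform inside the warehouse with SQL
wh.execute("""
    CREATE TABLE orders AS
    SELECT id, amount_cents / 100.0 AS amount, status
    FROM raw_orders
    WHERE status = 'paid'
""")
print(wh.execute("SELECT * FROM orders").fetchall())
```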
## Batch vs Streaming
| Criterion | Batch | Streaming |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Complexity | Low | High |
| Cost | Low | High |
| Use case | Reporting, ETL | Real-time alerts, dashboards |
| Tools | Airflow, dbt | Kafka Streams, Flink |
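
For the streaming column, here is a minimal consumer-side sketch using the kafka-python client; the broker address, topic name, and alert threshold are assumptions for illustration, and a running Kafka broker is required.

```python
import json

from kafka import KafkaConsumer  # pip install kafka-python

# Broker, topic, and group id are placeholders for this sketch
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="alerting",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Each message is handled as it arrives (second-level latency),
# instead of waiting for a scheduled batch window
for message in consumer:
    order = message.value
    if order.get("amount", 0) > 10_000:
        print(f"ALERT: large order {order['id']}")
```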
## Idempotency Patterns
```sql
-- Pattern 1: Upsert (PostgreSQL; %(...)s are psycopg2-style named parameters)
INSERT INTO target (id, name, updated_at)
VALUES (%(id)s, %(name)s, %(ts)s)
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    updated_at = EXCLUDED.updated_at;

-- Pattern 2: Partition overwrite (delete and reload in one transaction)
BEGIN;
DELETE FROM target WHERE partition_date = '2026-03-14';
INSERT INTO target SELECT * FROM staging WHERE partition_date = '2026-03-14';
COMMIT;
```
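
Pattern 1's `%(...)s` placeholders are psycopg2-style named parameters, so the upsert can be driven from Python roughly as below; the connection string and rows are purely illustrative.

```python
import psycopg2  # pip install psycopg2-binary

conn = psycopg2.connect("dbname=analytics user=etl")  # placeholder DSN

UPSERT_SQL = """
    INSERT INTO target (id, name, updated_at)
    VALUES (%(id)s, %(name)s, %(ts)s)
    ON CONFLICT (id) DO UPDATE SET
        name = EXCLUDED.name,
        updated_at = EXCLUDED.updated_at
"""

with conn, conn.cursor() as cur:
    # Safe to re-run: replayed rows simply overwrite themselves
    cur.executemany(UPSERT_SQL, [
        {"id": 1, "name": "alice", "ts": "2026-03-14T06:00:00"},
        {"id": 2, "name": "bob", "ts": "2026-03-14T06:00:00"},
    ])
```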
```python
# Pattern 3: Checkpoint (incremental extraction)
last_checkpoint = get_checkpoint('pipeline_x')
# Bind the cutoff as a parameter instead of interpolating it into the SQL
new_data = source.query("WHERE updated_at > %(ts)s", {"ts": last_checkpoint})
process(new_data)
# Advance the checkpoint only after processing succeeds
save_checkpoint('pipeline_x', max(new_data.updated_at))
```
## Data Quality Framework
```python
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "user_id": pa.Column(int, pa.Check.gt(0), nullable=False),
    "email": pa.Column(str, pa.Check.str_matches(r'^.+@.+\..+$')),
    "age": pa.Column(int, pa.Check.in_range(0, 150), nullable=True),
    "created_at": pa.Column(pa.DateTime,
                            pa.Check.less_than_or_equal_to(pd.Timestamp.now())),
})

validated_df = schema.validate(df)  # raises SchemaError on invalid data
```
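
When triaging a new source it is often more useful to collect every violation in one pass than to stop at the first; pandera supports this via lazy validation. A short sketch against the `schema` above, with a small made-up frame that violates several checks:

```python
import pandas as pd
import pandera as pa

# Deliberately invalid sample data (illustrative only)
df = pd.DataFrame({
    "user_id": [1, -2],
    "email": ["a@example.com", "not-an-email"],
    "age": [30, 200],
    "created_at": pd.to_datetime(["2020-01-01", "2020-01-02"]),
})

try:
    schema.validate(df, lazy=True)  # collect all failures, not just the first
except pa.errors.SchemaErrors as err:
    print(err.failure_cases)  # one row per failed check
```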
### Quality Dimensions
| Dimension | Check | Tool |
|---|---|---|
| Completeness | NULL ratio < threshold | Great Expectations |
| Accuracy | Value range checks | pandera |
| Freshness | Update lag < SLA | Airflow sensor |
| Uniqueness | Duplicate check | SQL DISTINCT |
| Consistency | Cross-table referential integrity | dbt test |
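
Most of these dimensions reduce to a simple assertion once the metric is computed; freshness, for instance, is just a lag comparison against the SLA. The function name and SLA value below are hypothetical:

```python
import datetime as dt

FRESHNESS_SLA = dt.timedelta(hours=2)  # illustrative SLA

def assert_fresh(last_update: dt.datetime) -> None:
    """Raise if the table has not been updated within the SLA."""
    lag = dt.datetime.now(dt.timezone.utc) - last_update
    if lag > FRESHNESS_SLA:
        raise RuntimeError(f"stale table: last update {lag} ago (SLA {FRESHNESS_SLA})")

# Passes: simulated update 30 minutes ago
assert_fresh(dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=30))
```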
## Pipeline Orchestration
```python
# Airflow DAG (extract_fn etc. are assumed to be defined elsewhere)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('daily_etl', schedule='0 6 * * *',  # daily at 06:00
         start_date=datetime(2026, 1, 1), catchup=False) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_fn)
    transform = PythonOperator(task_id='transform', python_callable=transform_fn)
    load = PythonOperator(task_id='load', python_callable=load_fn)
    validate = PythonOperator(task_id='validate', python_callable=validate_fn)

    extract >> transform >> load >> validate
```
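
Retries and failure notifications usually belong in `default_args`, which Airflow applies to every task in the DAG, rather than being repeated per operator. The values below are illustrative; the dict would be passed as `default_args=` to the DAG constructor above.

```python
from datetime import timedelta

# Applied to every task in the DAG; values are illustrative
default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}
```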
## Checklist

### Anti-Patterns
- Hardcoded credentials in the pipeline
- Non-idempotent transforms
- Loading without data quality checks
- Monolithic pipelines (break them into smaller tasks)
- Silent failures (error swallowing; see the fail-loud sketch below)
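
As a counterexample to the last item, here is a minimal fail-loud sketch: log the full traceback, then re-raise so the orchestrator marks the task failed. The function names and simulated failure are hypothetical.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

def load_to_warehouse() -> None:
    raise ConnectionError("warehouse unreachable")  # simulated failure

def load_step() -> None:
    try:
        load_to_warehouse()
    except Exception:
        # Log the traceback, then re-raise so the run is marked failed
        # instead of silently continuing
        logger.exception("load step failed")
        raise
```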