Tutorial: ETL Branch Testing
Learn how to safely test ETL (Extract, Transform, Load) changes using Horizon Epoch branches before deploying to production.
Prerequisites
- Completed Environment Promotion tutorial
- Understanding of ETL workflows
- Python environment with Horizon Epoch SDK
The Problem
ETL changes are risky:
- Transformations might produce incorrect results
- Schema changes might break downstream systems
- Volume changes might cause performance issues
Traditional approaches:
- Test on copied data (expensive, often stale)
- Test in staging (still risky, shared environment)
- Hope for the best (not recommended)
The Solution: Branch-Based ETL Testing
With Horizon Epoch, you can:
- Create a branch from production data
- Run your ETL changes against the branch
- Validate the results
- Merge only when confident
Step-by-Step Guide
1. Set Up Your Repository
import asyncio
from horizon_epoch import Client, StorageBackend

async def setup():
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        await client.init("my-repo")

        # Add warehouse storage
        await client.add_storage(
            name="warehouse",
            backend=StorageBackend.POSTGRESQL,
            config={"url": "postgresql://localhost/data_warehouse"}
        )
        return client

# Note: In practice, you'd keep the client connection open throughout
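The note above matters because the connection closes as soon as the async with block exits. A minimal sketch of the more typical pattern, running the whole workflow inside the block (the run_etl_test name and the step placeholder are illustrative, not part of the SDK):
async def run_etl_test():
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        await client.init("my-repo")
        # ... steps 2-6 below: branch, transform, validate, diff, merge ...

asyncio.run(run_etl_test())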
2. Create a Test Branch
# Create branch from production
test_branch = await client.branch(
    name="etl-test/sales-aggregation-v2",
    start_point="main"  # production
)
print(f"Created test branch: {test_branch.name}")
3. Run Your ETL Transformation
import pandas as pd
from horizon_epoch import Author
# Checkout the test branch
await client.checkout("etl-test/sales-aggregation-v2")
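# NOTE: the `connection` used below is assumed to be a database connection to
# the "warehouse" storage registered in step 1. The SDK calls for obtaining a
# branch-aware connection are not shown in this tutorial, so this is only an
# illustrative sketch using a plain SQLAlchemy engine:
from sqlalchemy import create_engine

engine = create_engine("postgresql://localhost/data_warehouse")
connection = engine.connect()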
# Your ETL logic here
df = pd.read_sql("""
    SELECT
        date_trunc('month', sale_date) as month,
        product_category,
        SUM(amount) as total_sales
    FROM sales
    GROUP BY 1, 2
""", connection)

# Write transformed data
df.to_sql(
    "monthly_sales_summary",
    connection,
    if_exists="replace",
    index=False
)

# Stage and commit the transformation
await client.stage_all()
await client.commit(
    message="Apply new sales aggregation logic v2",
    author=Author(name="ETL Pipeline", email="etl@example.com")
)
4. Validate the Results
# Compare with expected results
async def validate_transformation():
    # Already on the test branch from checkout

    # Check row counts
    result = connection.execute(
        "SELECT COUNT(*) FROM monthly_sales_summary"
    ).scalar()
    assert result > 0, "No rows in summary table"

    # Check for nulls in required fields
    nulls = connection.execute("""
        SELECT COUNT(*) FROM monthly_sales_summary
        WHERE month IS NULL OR total_sales IS NULL
    """).scalar()
    assert nulls == 0, f"Found {nulls} rows with null values"

    # Verify totals match
    branch_total = connection.execute(
        "SELECT SUM(total_sales) FROM monthly_sales_summary"
    ).scalar()

    # Sum the source data for comparison
    prod_total = connection.execute(
        "SELECT SUM(amount) FROM sales"
    ).scalar()
    assert abs(branch_total - prod_total) < 0.01, \
        f"Total mismatch: {branch_total} vs {prod_total}"

    print("All validations passed!")

await validate_transformation()
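If the previous version of the summary table already exists on main, you can also compare values across branches by checking out main, querying, and switching back. This is an illustrative sketch, assuming the warehouse connection always reflects the currently checked-out branch and that a prior summary table exists on main:
# Hypothetical cross-branch value comparison
await client.checkout("main")
old_total = connection.execute(
    "SELECT SUM(total_sales) FROM monthly_sales_summary"
).scalar()

await client.checkout("etl-test/sales-aggregation-v2")
new_total = connection.execute(
    "SELECT SUM(total_sales) FROM monthly_sales_summary"
).scalar()

print(f"main total: {old_total}, branch total: {new_total}")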
5. Compare with Production
# Get detailed diff
diff = await client.diff(
    source="main",
    target="etl-test/sales-aggregation-v2"
)

for table_diff in diff.table_diffs:
    print(f"\nTable: {table_diff.table_name}")
    print(f"  Added: {table_diff.added_count}")
    print(f"  Modified: {table_diff.modified_count}")
    print(f"  Deleted: {table_diff.deleted_count}")
6. Review and Merge
If validations pass:
# Merge to production
result = await client.merge(
    source="etl-test/sales-aggregation-v2",
    target="main"
)

if result.status == "success":
    print("ETL changes merged to production!")
    # Clean up test branch
    await client.delete_branch("etl-test/sales-aggregation-v2")
else:
    print(f"Merge failed: {result.conflicts}")
Complete Example Script
#!/usr/bin/env python3
"""
ETL Branch Testing Example
Usage:
python etl_branch_test.py
"""
import asyncio
from horizon_epoch import Client, Author, StorageBackend
from datetime import datetime
async def main():
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        # Add warehouse storage if needed
        await client.add_storage(
            name="warehouse",
            backend=StorageBackend.POSTGRESQL,
            config={"url": "postgresql://localhost/data_warehouse"}
        )

        branch_name = f"etl-test/transform-{datetime.now():%Y%m%d-%H%M%S}"

        try:
            # 1. Create test branch
            print(f"Creating branch: {branch_name}")
            await client.branch(branch_name, start_point="main")

            # 2. Run transformation
            print("Running ETL transformation...")
            await client.checkout(branch_name)
            await run_transformation()
            await client.stage_all()
            await client.commit(
                message="Apply transformation",
                author=Author(name="ETL", email="etl@example.com")
            )

            # 3. Validate
            print("Validating results...")
            await validate_results()

            # 4. Review diff
            print("\nChanges summary:")
            diff = await client.diff(source="main", target=branch_name)
            for td in diff.table_diffs:
                print(f"  {td.table_name}: {td.total_changes} changes")

            # 5. Prompt for merge
            if input("\nMerge to production? (y/n): ").lower() == 'y':
                result = await client.merge(source=branch_name, target="main")
                if result.has_conflicts:
                    print(f"Conflicts: {result.conflict_count}")
                else:
                    print(f"Merged: {result.result_commit_id}")
        finally:
            # Clean up
            branches = await client.branches()
            if any(b.name == branch_name for b in branches):
                await client.delete_branch(branch_name)
                print(f"Cleaned up branch: {branch_name}")

async def run_transformation():
    # Your ETL logic here
    pass

async def validate_results():
    # Your validation logic here
    pass

if __name__ == "__main__":
    asyncio.run(main())
Best Practices
- Name branches descriptively - Include ETL name and timestamp
- Automate validation - Write comprehensive checks
- Keep transformations atomic - One logical change per branch
- Document expected changes - Record what the ETL should do
- Time-box testing - Don't let test branches linger (see the cleanup sketch below)
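Because the timestamp is embedded in the branch name (as in the complete script above), stale test branches can be pruned by parsing that suffix. This is a hypothetical sketch; the etl-test/transform- prefix and the 7-day cutoff are assumptions for illustration, not SDK conventions:
from datetime import datetime, timedelta

async def prune_stale_test_branches(client, max_age_days=7):
    """Delete etl-test branches whose embedded timestamp is older than the cutoff."""
    cutoff = datetime.now() - timedelta(days=max_age_days)
    for b in await client.branches():
        if not b.name.startswith("etl-test/transform-"):
            continue
        stamp = b.name.removeprefix("etl-test/transform-")
        try:
            created = datetime.strptime(stamp, "%Y%m%d-%H%M%S")
        except ValueError:
            continue  # not a timestamped test branch; leave it alone
        if created < cutoff:
            await client.delete_branch(b.name)
            print(f"Deleted stale branch: {b.name}")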
Next Steps
- Branch-Aware Queries - Query across branches
- Multi-Backend Setup - ETL across storage types