Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Tutorial: ETL Branch Testing

Learn how to safely test ETL (Extract, Transform, Load) changes using Horizon Epoch branches before deploying to production.

Prerequisites

  • Completed the Environment Promotion tutorial
  • Understanding of ETL workflows
  • Python environment with Horizon Epoch SDK

The Problem

ETL changes are risky:

  • Transformations might produce incorrect results
  • Schema changes might break downstream systems
  • Volume changes might cause performance issues

Traditional approaches each have drawbacks:

  • Test on copied data (expensive, often stale)
  • Test in staging (still risky, shared environment)
  • Hope for the best (not recommended)

The Solution: Branch-Based ETL Testing

With Horizon Epoch, you can:

  1. Create a branch from production data
  2. Run your ETL changes against the branch
  3. Validate the results
  4. Merge only when confident

Step-by-Step Guide

1. Set Up Your Repository

import asyncio
from horizon_epoch import Client, StorageBackend

async def setup():
    """Initialize ``my-repo`` and register the ``warehouse`` storage backend.

    ``Client.connect`` is used as an async context manager, so ``client`` is
    only meant to be used inside the ``async with`` block. It is deliberately
    not returned: a caller would receive a client whose context has already
    exited. Run the remaining tutorial steps inside this same block (in
    practice, keep one client connection open for the whole session).
    """
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        await client.init("my-repo")

        # Add warehouse storage
        await client.add_storage(
            name="warehouse",
            backend=StorageBackend.POSTGRESQL,
            config={"url": "postgresql://localhost/data_warehouse"}
        )

        # ...continue with the steps below here, while `client` is still open.

2. Create a Test Branch

# Create branch from production
test_branch = await client.branch(
    name="etl-test/sales-aggregation-v2",
    start_point="main"  # production
)
print(f"Created test branch: {test_branch.name}")

3. Run Your ETL Transformation

import pandas as pd
from horizon_epoch import Author

# Checkout the test branch
await client.checkout("etl-test/sales-aggregation-v2")

# Your ETL logic here
df = pd.read_sql("""
    SELECT
        date_trunc('month', sale_date) as month,
        product_category,
        SUM(amount) as total_sales
    FROM sales
    GROUP BY 1, 2
""", connection)

# Write transformed data
df.to_sql(
    "monthly_sales_summary",
    connection,
    if_exists="replace",
    index=False
)

# Stage and commit the transformation
await client.stage_all()
await client.commit(
    message="Apply new sales aggregation logic v2",
    author=Author(name="ETL Pipeline", email="etl@example.com")
)

4. Validate the Results

# Compare with expected results
async def validate_transformation():
    """Sanity-check ``monthly_sales_summary`` on the checked-out test branch.

    Queries go through the module-level ``connection`` (assumed to be an open
    warehouse connection whose ``execute(...)`` result supports ``.scalar()``,
    e.g. SQLAlchemy). Three checks are made:

    1. the summary table is not empty;
    2. no row has a NULL ``month`` or ``total_sales``;
    3. ``SUM(total_sales)`` matches ``SUM(amount)`` over ``sales`` to within
       0.01.

    Raises:
        AssertionError: if any check fails. The checks raise explicitly
            rather than using ``assert`` statements, which ``python -O``
            strips out and would turn every check into a silent pass.
    """

    def check(condition, message):
        # Explicit raise so the check survives `python -O`.
        if not condition:
            raise AssertionError(message)

    # Already on the test branch from checkout
    # Check row counts
    result = connection.execute(
        "SELECT COUNT(*) FROM monthly_sales_summary"
    ).scalar()
    check(result > 0, "No rows in summary table")

    # Check for nulls in required fields
    nulls = connection.execute("""
        SELECT COUNT(*) FROM monthly_sales_summary
        WHERE month IS NULL OR total_sales IS NULL
    """).scalar()
    check(nulls == 0, f"Found {nulls} rows with null values")

    # Verify totals match
    branch_total = connection.execute(
        "SELECT SUM(total_sales) FROM monthly_sales_summary"
    ).scalar()

    # Compare with the source table. The ETL only rewrites
    # monthly_sales_summary, so `sales` on this branch is unchanged from main.
    prod_total = connection.execute(
        "SELECT SUM(amount) FROM sales"
    ).scalar()

    check(
        abs(branch_total - prod_total) < 0.01,
        f"Total mismatch: {branch_total} vs {prod_total}",
    )

    print("All validations passed!")

validate_transformation()

5. Compare with Production

# Get detailed diff
diff = client.diff(
    source="main",
    target="etl-test/sales-aggregation-v2"
)

for table_diff in diff.table_diffs:
    print(f"\nTable: {table_diff.table_name}")
    print(f"  Added: {table_diff.added_count}")
    print(f"  Modified: {table_diff.modified_count}")
    print(f"  Deleted: {table_diff.deleted_count}")

6. Review and Merge

If all validations pass, merge the test branch into production:

# Merge to production
result = client.merge(
    source="etl-test/sales-aggregation-v2",
    target="main"
)

if result.status == "success":
    print("ETL changes merged to production!")
    # Clean up test branch
    client.delete_branch("etl-test/sales-aggregation-v2")
else:
    print(f"Merge failed: {result.conflicts}")

Complete Example Script

#!/usr/bin/env python3
"""
ETL Branch Testing Example

Usage:
    python etl_branch_test.py
"""

import asyncio
from horizon_epoch import Client, Author, StorageBackend
from datetime import datetime

async def main():
    """Run an ETL change on a throwaway branch, validate it, optionally merge.

    The test branch name carries a timestamp so repeated runs do not collide.
    The branch is removed in ``finally`` whether or not the run succeeds.
    """
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        # Add warehouse storage if needed
        await client.add_storage(
            name="warehouse",
            backend=StorageBackend.POSTGRESQL,
            config={"url": "postgresql://localhost/data_warehouse"}
        )

        branch_name = f"etl-test/transform-{datetime.now():%Y%m%d-%H%M%S}"

        try:
            # 1. Create test branch
            print(f"Creating branch: {branch_name}")
            await client.branch(branch_name, start_point="main")

            # 2. Run transformation
            print("Running ETL transformation...")
            await client.checkout(branch_name)
            await run_transformation()
            await client.stage_all()
            await client.commit(
                message="Apply transformation",
                author=Author(name="ETL", email="etl@example.com")
            )

            # 3. Validate
            print("Validating results...")
            await validate_results()

            # 4. Review diff
            print("\nChanges summary:")
            diff = await client.diff(base="main", target=branch_name)
            for td in diff.table_diffs:
                print(f"  {td.table_name}: {td.total_changes} changes")

            # 5. Prompt for merge
            if input("\nMerge to production? (y/n): ").lower() == 'y':
                # Name the target explicitly. The test branch is still checked
                # out here, so the merge must not depend on whatever default
                # target `merge` would otherwise pick.
                result = await client.merge(source=branch_name, target="main")
                if result.has_conflicts:
                    print(f"Conflicts: {result.conflict_count}")
                else:
                    print(f"Merged: {result.result_commit_id}")

        finally:
            # Clean up. Move off the test branch first so we never try to
            # delete the branch that is currently checked out.
            await client.checkout("main")
            branches = await client.branches()
            if any(b.name == branch_name for b in branches):
                await client.delete_branch(branch_name)
                print(f"Cleaned up branch: {branch_name}")


async def run_transformation():
    """Apply the ETL transformation to the checked-out branch (placeholder).

    Declared ``async`` because ``main`` awaits it.
    """
    # Your ETL logic here
    pass


async def validate_results():
    """Validate the transformed data; raise to abort the run (placeholder).

    Declared ``async`` because ``main`` awaits it.
    """
    # Your validation logic here
    pass


if __name__ == "__main__":
    # main() is a coroutine function: calling it bare only creates a
    # coroutine object, so hand it to an event loop to actually run it.
    asyncio.run(main())

Best Practices

  1. Name branches descriptively - Include ETL name and timestamp
  2. Automate validation - Write comprehensive checks
  3. Keep transformations atomic - One logical change per branch
  4. Document expected changes - Record what the ETL should do
  5. Time-box testing - Don’t let test branches linger

Next Steps