Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Python SDK Reference

Complete reference for the Horizon Epoch Python SDK.

Installation

The Python SDK must be built from source (see the Installation Guide):

# From project root: sync the Python project's environment with uv
cd python && uv sync
# Build the Rust extension crate (horizon-epoch-py) and install it into the uv environment.
# This runs from python/, hence the ../crates relative manifest path.
uv run maturin develop --manifest-path ../crates/horizon-epoch-py/Cargo.toml

Quick Start

import asyncio

from horizon_epoch import Author, Client

# Metadata database URL (see ClientConfig for the other client settings).
METADATA_URL = "postgresql://localhost/horizon_epoch"


async def main():
    # The async context manager opens the connection and closes it on exit.
    async with Client.connect(METADATA_URL) as client:
        # Initialize a repository
        await client.init("my-data-repo")

        # Create a branch
        await client.branch("feature-x")

        # Make changes, stage them, and commit
        # (commit() commits staged tables by default).
        await client.stage_all()
        await client.commit(
            message="Initial commit",
            author=Author(name="Dev", email="dev@example.com"),
        )


# `async with` / `await` are only valid inside a coroutine, so drive it with asyncio.
asyncio.run(main())

Classes

ClientConfig

Configuration for the Horizon Epoch client.

from horizon_epoch import ClientConfig, Author

# Only metadata_url is required. Every other value shown here equals its
# documented default, except author (which defaults to None).
config = ClientConfig(
    metadata_url="postgresql://user:pass@localhost/horizon_epoch",
    default_branch="main",
    author=Author(name="Dev", email="dev@example.com"),  # default commit author
    timeout_seconds=30.0,  # operation timeout, in seconds
    retry_count=3,         # retries for transient failures
    log_level="INFO",
)
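| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `metadata_url` | `str` | required | Metadata database URL |
| `default_branch` | `str` | `"main"` | Default branch name |
| `author` | `Author` | `None` | Default commit author |
| `timeout_seconds` | `float` | `30.0` | Operation timeout in seconds |
| `retry_count` | `int` | `3` | Retry count for transient failures |
| `log_level` | `str` | `"INFO"` | Logging level |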

Client

Main client for interacting with Horizon Epoch.

Connection Management

from horizon_epoch import Client

# Both options must run inside an async function (or an asyncio-aware REPL).

# Option 1: Async context manager (recommended)
async with Client.connect("postgresql://...") as client:
    await client.init("my-repo")

# Option 2: Manual connection management
# close() runs in `finally`, so the connection is released even if init() raises.
client = Client.from_url("postgresql://...")
await client.open()
try:
    await client.init("my-repo")
finally:
    await client.close()

Repository Operations

# Assumes `client` is an open Client (see Connection Management) and that
# this code runs inside an async function.

# Initialize a new repository
config = await client.init(
    name="my-repo",
    # Named storage backends for the repository; "..." is a placeholder URL
    storage_configs={"primary": {"backend": "postgresql", "url": "..."}},
    description="My data repository"
)

# Open an existing repository
config = await client.open_repository("my-repo")

# Get repository status
status = await client.status(branch="main")

Branch Operations

# Assumes `client` is an open Client inside an async function.

# Create a branch
branch = await client.branch(
    name="feature/new-schema",
    start_point="main",  # Optional, defaults to default branch
    checkout=True        # Optional, defaults to True
)

# List all branches
branches = await client.branches()

# Get a specific branch
branch = await client.get_branch("main")

# Checkout a branch
status = await client.checkout("feature/new-schema")

# Delete a branch
await client.delete_branch("old-feature", force=False)

# Compare branches
# NOTE(review): ahead/behind presumably describe the first branch relative to
# the second (feature-x vs main) — confirm the argument order.
comparison = await client.compare_branches("feature-x", "main")
print(f"Ahead: {comparison.ahead_count}, Behind: {comparison.behind_count}")

Commit Operations

from horizon_epoch import Author

# Assumes `client` is an open Client inside an async function.

# Create a commit
commit = await client.commit(
    message="Add user preferences",
    author=Author(name="Dev", email="dev@example.com"),
    branch="feature-x",      # Optional
    tables=["users"]         # Optional, defaults to all staged
)

# View commit history (limit/skip page through the log)
log = await client.log(ref="main", limit=10, skip=0)
for entry in log.entries:
    print(f"{entry.commit.short_id}: {entry.commit.message}")

# Get commit details
commit = await client.show("abc123")

Diff Operations

# Assumes `client` is an open Client inside an async function.

# Diff between branches
diff = await client.diff(
    base="main",
    target="feature-x",
    tables=["users"],           # Optional
    include_record_diff=True    # Optional
)

# One entry per changed table
for table_diff in diff.table_diffs:
    print(f"{table_diff.table_name}: {table_diff.total_changes} changes")

Merge Operations

from horizon_epoch import MergeStrategy

# Assumes `client` is an open Client inside an async function.

# Merge a branch
result = await client.merge(
    source="feature-x",
    target="main",                      # Optional, defaults to default branch
    strategy=MergeStrategy.THREE_WAY,   # Optional
    message="Merge feature-x",          # Optional
    dry_run=False                       # Optional
)

if result.has_conflicts:
    print(f"Conflicts: {result.conflict_count}")
else:
    # result_commit_id is Optional[str] (see MergeResult)
    print(f"Merged: {result.result_commit_id}")

# Abort a merge
await client.abort_merge(conflict_id="...")

Storage Operations

Horizon Epoch supports 8 storage backend types across databases, object storage, and filesystems.

from horizon_epoch import StorageBackend

# Assumes `client` is an open Client inside an async function.
# NOTE: credentials are inlined in the URLs below for brevity only.

# =============================================================================
# DATABASE BACKENDS
# =============================================================================

# PostgreSQL
backend_id = await client.add_storage(
    name="postgres-warehouse",
    backend=StorageBackend.POSTGRESQL,
    config={"url": "postgresql://user:pass@localhost:5432/mydb"},
    description="PostgreSQL data warehouse"
)

# MySQL
backend_id = await client.add_storage(
    name="mysql-analytics",
    backend=StorageBackend.MYSQL,
    config={"url": "mysql://user:pass@localhost:3306/analytics"},
    description="MySQL analytics database"
)

# Microsoft SQL Server
backend_id = await client.add_storage(
    name="mssql-legacy",
    backend=StorageBackend.MSSQL,
    config={"url": "mssql://user:pass@localhost:1433/legacy_db"},
    description="SQL Server legacy system"
)

# SQLite
backend_id = await client.add_storage(
    name="sqlite-local",
    backend=StorageBackend.SQLITE,
    config={"path": "/data/local.db"},
    description="Local SQLite database"
)

# =============================================================================
# OBJECT STORAGE BACKENDS
# =============================================================================

# AWS S3 / S3-compatible storage
backend_id = await client.add_storage(
    name="s3-datalake",
    backend=StorageBackend.S3,
    config={
        "bucket": "my-datalake",
        "region": "us-west-2",
        "endpoint": None,  # Use AWS default, or specify for MinIO/LocalStack
    },
    description="S3 data lake"
)

# Azure Blob Storage
backend_id = await client.add_storage(
    name="azure-warehouse",
    backend=StorageBackend.AZURE,
    config={
        "account": "mystorageaccount",
        "container": "data-warehouse",
    },
    description="Azure Blob data warehouse"
)

# Google Cloud Storage
backend_id = await client.add_storage(
    name="gcs-analytics",
    backend=StorageBackend.GCS,
    config={
        "bucket": "my-analytics-bucket",
        "project": "my-gcp-project",
    },
    description="GCS analytics storage"
)

# =============================================================================
# FILESYSTEM BACKEND
# =============================================================================

# Local filesystem
backend_id = await client.add_storage(
    name="local-dev",
    backend=StorageBackend.LOCAL,
    config={"path": "/data/epoch-storage"},
    description="Local development storage"
)

# =============================================================================
# STORAGE MANAGEMENT
# =============================================================================

# List all storage backends (mapping of name -> StorageBackend)
backends = await client.list_storage()
for name, backend_type in backends.items():
    print(f"{name}: {backend_type.value}")

# Get storage details
config = await client.get_storage("s3-datalake")
print(f"Backend: {config.name}, Type: {config.backend_type}")

# Get detailed storage info
backends_info = await client.list_storage_info()
for backend in backends_info:
    status = "default" if backend.is_default else "enabled"
    print(f"{backend.name} ({backend.backend_type}): {status}")

# Remove storage backend
await client.remove_storage("old-storage")

Table Operations

Tables are tracked using StorageLocation objects that specify where the data lives. The native module provides factory methods for all 8 storage backends.

# Access native module for StorageLocation
from horizon_epoch.client import _native

# =============================================================================
# DATABASE STORAGE LOCATIONS
# =============================================================================

# PostgreSQL: connection_name, schema, table
loc = _native.StorageLocation.postgresql("main", "public", "users")
reg = await client.track_table("users", loc)

# MySQL: connection_name, database, table
loc = _native.StorageLocation.mysql("analytics", "sales_db", "orders")
reg = await client.track_table("orders", loc)

# Microsoft SQL Server: connection_name, database, schema, table
loc = _native.StorageLocation.mssql("legacy", "warehouse", "dbo", "customers")
reg = await client.track_table("customers", loc)

# SQLite: connection_name, table
loc = _native.StorageLocation.sqlite("local-db", "products")
reg = await client.track_table("products", loc)

# =============================================================================
# OBJECT STORAGE LOCATIONS
# =============================================================================

# S3: bucket, prefix, format (parquet, delta, csv)
loc = _native.StorageLocation.s3("my-datalake", "tables/events/", format="parquet")
reg = await client.track_table("events", loc)

# S3 with Delta Lake format
loc = _native.StorageLocation.s3("my-datalake", "delta/transactions/", format="delta")
reg = await client.track_table("transactions", loc)

# Azure Blob Storage: account, container, prefix, format
loc = _native.StorageLocation.azure_blob(
    "mystorageaccount", "warehouse", "data/metrics/", format="parquet"
)
reg = await client.track_table("metrics", loc)

# Google Cloud Storage: bucket, prefix, format
loc = _native.StorageLocation.gcs("my-analytics", "tables/sessions/", format="parquet")
reg = await client.track_table("sessions", loc)

# =============================================================================
# FILESYSTEM STORAGE LOCATIONS
# =============================================================================

# Local filesystem: path, format
loc = _native.StorageLocation.local_filesystem("/data/exports/reports/", format="parquet")
reg = await client.track_table("reports", loc)

# =============================================================================
# TABLE MANAGEMENT
# =============================================================================

# View registration result
print(f"Tracked: {reg.table_name}")
print(f"Content hash: {reg.content_hash}")
print(f"Record count: {reg.record_count}")
print(f"Location: {reg.storage_location_uri}")

# List all tracked tables
tables = await client.list_tables()
for table in tables:
    print(f"  - {table}")

# Untrack a table (data is preserved, only tracking is removed)
await client.untrack_table("old-table")

Staging Operations

# Assumes `client` is an open Client inside an async function.

# Stage all changes
result = await client.stage_all(branch="main")
print(f"Staged {result.tables_affected} tables")

# Stage specific table
await client.stage("users", branch="main")

# Unstage all
result = await client.unstage_all(branch="main")

# Unstage specific table
await client.unstage("users", branch="main")

Constraint Operations

# Get constraints for a table
constraints = await client.get_constraints("users", branch="main")
print(f"Foreign keys: {len(constraints.foreign_keys)}")
print(f"Unique: {len(constraints.unique_constraints)}")

# Add a constraint (using native types)
from horizon_epoch.client import _native
constraint = _native.UniqueConstraint("uq_email", ["email"])
await client.add_constraint("users", constraint)

# Remove a constraint
await client.remove_constraint("users", "uq_email")

# Diff constraints between branches
diff = await client.diff_constraints("main", "feature-x", table_name="users")

# Add an index
col = _native.IndexColumn("email")
idx = _native.IndexDefinition("idx_email", [col])
await client.add_index("users", idx)

# Remove an index
await client.remove_index("users", "idx_email")

Models

Author

from horizon_epoch import Author

# Commit author identity; accepted by ClientConfig(author=...) and client.commit(author=...)
author = Author(name="Jane Developer", email="jane@example.com")

Branch

from horizon_epoch import Branch

# Returned from client.branch(), client.branches(), client.get_branch()
# Attribute reference (`branch` is such a returned instance):
branch.name           # str
branch.head_commit_id # str
branch.created_at     # datetime
branch.is_default     # bool

Commit

from horizon_epoch import Commit

# Returned from client.commit(), client.show()
# Attribute reference (`commit` is such a returned instance):
commit.id             # str (full commit ID)
commit.short_id       # str (abbreviated)
commit.message        # str
commit.author         # Author
commit.timestamp      # datetime
commit.parent_ids     # List[str]

MergeResult

from horizon_epoch import MergeResult

# Returned from client.merge()
# Attribute reference (`result` is such a returned instance):
result.source_branch     # str
result.target_branch     # str
result.is_fast_forward   # bool
result.has_conflicts     # bool
result.conflict_count    # int
result.result_commit_id  # Optional[str]
result.conflicts         # List[RecordConflict]

Status

from horizon_epoch import Status

# Presumably returned from client.status() and client.checkout() — confirm.
# Attribute reference (`status` is such a returned instance):
status.branch            # Optional[str]
status.commit_id         # Optional[str]
status.is_detached       # bool
status.staged_tables     # List[str]
status.unstaged_tables   # List[str]

Enums

from horizon_epoch import MergeStrategy, StorageBackend

# Merge strategies (passed as client.merge(strategy=...))
MergeStrategy.THREE_WAY   # Standard three-way merge
MergeStrategy.OURS        # Keep target branch version
MergeStrategy.THEIRS      # Keep source branch version
MergeStrategy.MANUAL      # Fail on conflicts

# Storage backends (passed as client.add_storage(backend=...)) - Database
StorageBackend.POSTGRESQL  # PostgreSQL database
StorageBackend.MYSQL       # MySQL database
StorageBackend.MSSQL       # Microsoft SQL Server
StorageBackend.SQLITE      # SQLite database

# Storage backends - Object Storage
StorageBackend.S3          # AWS S3 / S3-compatible storage
StorageBackend.AZURE       # Azure Blob Storage
StorageBackend.GCS         # Google Cloud Storage

# Storage backends - Filesystem
StorageBackend.LOCAL       # Local filesystem

Exceptions

# HorizonEpochError is documented as the base exception; catch it to handle
# SDK errors generically, or catch a specific subclass below.
from horizon_epoch import (
    HorizonEpochError,           # Base exception
    RepositoryNotInitializedError,
    BranchNotFoundError,
    BranchAlreadyExistsError,
    CommitNotFoundError,
    MergeConflictError,
    ConnectionFailedError,
    StorageError,
    ValidationError,
)

Async Support

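All client methods are async and must be awaited. For brevity, the shorter snippets on this page use top-level `await`. In a script, run them inside an `async def` function driven by `asyncio.run()`, as shown below. Alternatively, use an asyncio-aware REPL such as `python -m asyncio` or Jupyter.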

import asyncio
from horizon_epoch import Client

async def main():
    # The context manager opens the connection and closes it on exit.
    async with Client.connect("postgresql://...") as client:
        await client.init("my-repo")
        await client.branch("feature")
        # Stage changes first: commit() commits staged tables by default.
        await client.stage_all()
        await client.commit("Initial commit")

# asyncio.run() creates the event loop, runs main() to completion, and closes the loop.
asyncio.run(main())

Native Module Access

For advanced operations, access the native Rust module directly:

from horizon_epoch.client import (
    _native,                  # Native module (if available)
    is_native_available,      # Check if native bindings loaded
    get_native_version,       # Get native module version
    get_native_info,          # Get diagnostic info
    require_native,           # Get native module or raise ImportError
)

# Guard on is_native_available() because `_native` is only usable when the
# Rust bindings loaded; use require_native() instead to fail with ImportError.
if is_native_available():
    print(f"Native version: {get_native_version()}")

    # StorageLocation factory methods for all 8 backends
    # Database backends
    pg_loc = _native.StorageLocation.postgresql("conn", "schema", "table")
    mysql_loc = _native.StorageLocation.mysql("conn", "database", "table")
    mssql_loc = _native.StorageLocation.mssql("conn", "database", "schema", "table")
    sqlite_loc = _native.StorageLocation.sqlite("conn", "table")

    # Object storage backends
    s3_loc = _native.StorageLocation.s3("bucket", "prefix/", format="parquet")
    azure_loc = _native.StorageLocation.azure_blob("account", "container", "prefix/", format="parquet")
    gcs_loc = _native.StorageLocation.gcs("bucket", "prefix/", format="parquet")

    # Filesystem backend
    local_loc = _native.StorageLocation.local_filesystem("/path/to/data/", format="parquet")

    # Constraint types
    unique = _native.UniqueConstraint("uq_email", ["email"])
    check = _native.CheckConstraint("ck_age", "age >= 0")
    # name, local columns, referenced table, referenced columns
    fk = _native.ForeignKeyConstraint("fk_user", ["user_id"], "users", ["id"])

    # Index types
    col = _native.IndexColumn("email")
    idx = _native.IndexDefinition("idx_email", [col])

Example Usage

import asyncio
from horizon_epoch import Client, Author, MergeStrategy

# End-to-end workflow: init -> branch -> stage/commit -> merge.
async def main():
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        # Initialize repository
        await client.init("my-data-repo")

        # Create and checkout feature branch
        await client.branch("feature/new-schema")

        # Make changes (via your data pipeline)
        # ...

        # Stage and commit
        await client.stage_all()
        await client.commit(
            message="Add user preferences",
            author=Author(name="Dev", email="dev@example.com")
        )

        # Merge back to main
        # (target is omitted, so it defaults to the default branch — "main" unless configured otherwise)
        result = await client.merge(
            source="feature/new-schema",
            strategy=MergeStrategy.THREE_WAY
        )

        if result.has_conflicts:
            print(f"Resolve {result.conflict_count} conflicts")
        else:
            print(f"Merged successfully: {result.result_commit_id}")

asyncio.run(main())