Python SDK Reference
Complete reference for the Horizon Epoch Python SDK.
Installation
The Python SDK requires building from source (see Installation Guide).
# From project root
cd python && uv sync
uv run maturin develop --manifest-path ../crates/horizon-epoch-py/Cargo.toml
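After the build completes, a quick import check can confirm that the package and its native bindings are available (these helpers are documented under Native Module Access below):
# Sanity check after building: confirm the package imports and the native
# Rust bindings were compiled and loaded.
from horizon_epoch.client import is_native_available, get_native_version

if is_native_available():
    print(f"Native bindings loaded (version {get_native_version()})")
else:
    print("Native bindings not available; re-run the maturin develop step")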
Quick Start
from horizon_epoch import Client

# Use the async context manager for connection management
async with Client.connect("postgresql://localhost/horizon_epoch") as client:
    # Initialize a repository
    await client.init("my-data-repo")

    # Create a branch
    await client.branch("feature-x")

    # Make changes and commit
    await client.commit("Initial commit")
Classes
ClientConfig
Configuration for the Horizon Epoch client.
from horizon_epoch import ClientConfig, Author
config = ClientConfig(
    metadata_url="postgresql://user:pass@localhost/horizon_epoch",
    default_branch="main",
    author=Author(name="Dev", email="dev@example.com"),
    timeout_seconds=30.0,
    retry_count=3,
    log_level="INFO",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| metadata_url | str | required | Metadata database URL |
| default_branch | str | "main" | Default branch name |
| author | Author | None | Default commit author |
| timeout_seconds | float | 30.0 | Operation timeout |
| retry_count | int | 3 | Retry count for transient failures |
| log_level | str | "INFO" | Logging level |
Client
Main client for interacting with Horizon Epoch.
Connection Management
from horizon_epoch import Client
# Option 1: Async context manager (recommended)
async with Client.connect("postgresql://...") as client:
    await client.init("my-repo")

# Option 2: Manual connection management
client = Client.from_url("postgresql://...")
await client.open()
try:
    await client.init("my-repo")
finally:
    await client.close()
Repository Operations
# Initialize a new repository
config = await client.init(
    name="my-repo",
    storage_configs={"primary": {"backend": "postgresql", "url": "..."}},
    description="My data repository"
)
# Open an existing repository
config = await client.open_repository("my-repo")
# Get repository status
status = await client.status(branch="main")
Branch Operations
# Create a branch
branch = await client.branch(
    name="feature/new-schema",
    start_point="main",  # Optional, defaults to default branch
    checkout=True  # Optional, defaults to True
)
# List all branches
branches = await client.branches()
# Get a specific branch
branch = await client.get_branch("main")
# Checkout a branch
status = await client.checkout("feature/new-schema")
# Delete a branch
await client.delete_branch("old-feature", force=False)
# Compare branches
comparison = await client.compare_branches("feature-x", "main")
print(f"Ahead: {comparison.ahead_count}, Behind: {comparison.behind_count}")
Commit Operations
from horizon_epoch import Author
# Create a commit
commit = await client.commit(
    message="Add user preferences",
    author=Author(name="Dev", email="dev@example.com"),
    branch="feature-x",  # Optional
    tables=["users"]  # Optional, defaults to all staged
)
# View commit history
log = await client.log(ref="main", limit=10, skip=0)
for entry in log.entries:
    print(f"{entry.commit.short_id}: {entry.commit.message}")
# Get commit details
commit = await client.show("abc123")
Diff Operations
# Diff between branches
diff = await client.diff(
    base="main",
    target="feature-x",
    tables=["users"],  # Optional
    include_record_diff=True  # Optional
)
for table_diff in diff.table_diffs:
    print(f"{table_diff.table_name}: {table_diff.total_changes} changes")
Merge Operations
from horizon_epoch import MergeStrategy
# Merge a branch
result = await client.merge(
    source="feature-x",
    target="main",  # Optional, defaults to default branch
    strategy=MergeStrategy.THREE_WAY,  # Optional
    message="Merge feature-x",  # Optional
    dry_run=False  # Optional
)
if result.has_conflicts:
    print(f"Conflicts: {result.conflict_count}")
else:
    print(f"Merged: {result.result_commit_id}")
# Abort a merge
await client.abort_merge(conflict_id="...")
Storage Operations
Horizon Epoch supports 8 storage backend types across databases, object storage, and filesystems.
from horizon_epoch import StorageBackend
# =============================================================================
# DATABASE BACKENDS
# =============================================================================
# PostgreSQL
backend_id = await client.add_storage(
    name="postgres-warehouse",
    backend=StorageBackend.POSTGRESQL,
    config={"url": "postgresql://user:pass@localhost:5432/mydb"},
    description="PostgreSQL data warehouse"
)

# MySQL
backend_id = await client.add_storage(
    name="mysql-analytics",
    backend=StorageBackend.MYSQL,
    config={"url": "mysql://user:pass@localhost:3306/analytics"},
    description="MySQL analytics database"
)

# Microsoft SQL Server
backend_id = await client.add_storage(
    name="mssql-legacy",
    backend=StorageBackend.MSSQL,
    config={"url": "mssql://user:pass@localhost:1433/legacy_db"},
    description="SQL Server legacy system"
)

# SQLite
backend_id = await client.add_storage(
    name="sqlite-local",
    backend=StorageBackend.SQLITE,
    config={"path": "/data/local.db"},
    description="Local SQLite database"
)
# =============================================================================
# OBJECT STORAGE BACKENDS
# =============================================================================
# AWS S3 / S3-compatible storage
backend_id = await client.add_storage(
    name="s3-datalake",
    backend=StorageBackend.S3,
    config={
        "bucket": "my-datalake",
        "region": "us-west-2",
        "endpoint": None,  # Use AWS default, or specify for MinIO/LocalStack
    },
    description="S3 data lake"
)

# Azure Blob Storage
backend_id = await client.add_storage(
    name="azure-warehouse",
    backend=StorageBackend.AZURE,
    config={
        "account": "mystorageaccount",
        "container": "data-warehouse",
    },
    description="Azure Blob data warehouse"
)

# Google Cloud Storage
backend_id = await client.add_storage(
    name="gcs-analytics",
    backend=StorageBackend.GCS,
    config={
        "bucket": "my-analytics-bucket",
        "project": "my-gcp-project",
    },
    description="GCS analytics storage"
)
# =============================================================================
# FILESYSTEM BACKEND
# =============================================================================
# Local filesystem
backend_id = await client.add_storage(
    name="local-dev",
    backend=StorageBackend.LOCAL,
    config={"path": "/data/epoch-storage"},
    description="Local development storage"
)
# =============================================================================
# STORAGE MANAGEMENT
# =============================================================================
# List all storage backends
backends = await client.list_storage()
for name, backend_type in backends.items():
    print(f"{name}: {backend_type.value}")
# Get storage details
config = await client.get_storage("s3-datalake")
print(f"Backend: {config.name}, Type: {config.backend_type}")
# Get detailed storage info
backends_info = await client.list_storage_info()
for backend in backends_info:
    status = "default" if backend.is_default else "enabled"
    print(f"{backend.name} ({backend.backend_type}): {status}")
# Remove storage backend
await client.remove_storage("old-storage")
Table Operations
Tables are tracked using StorageLocation objects that specify where the data lives. The native module provides factory methods for all 8 storage backends.
# Access native module for StorageLocation
from horizon_epoch.client import _native
# =============================================================================
# DATABASE STORAGE LOCATIONS
# =============================================================================
# PostgreSQL: connection_name, schema, table
loc = _native.StorageLocation.postgresql("main", "public", "users")
reg = await client.track_table("users", loc)
# MySQL: connection_name, database, table
loc = _native.StorageLocation.mysql("analytics", "sales_db", "orders")
reg = await client.track_table("orders", loc)
# Microsoft SQL Server: connection_name, database, schema, table
loc = _native.StorageLocation.mssql("legacy", "warehouse", "dbo", "customers")
reg = await client.track_table("customers", loc)
# SQLite: connection_name, table
loc = _native.StorageLocation.sqlite("local-db", "products")
reg = await client.track_table("products", loc)
# =============================================================================
# OBJECT STORAGE LOCATIONS
# =============================================================================
# S3: bucket, prefix, format (parquet, delta, csv)
loc = _native.StorageLocation.s3("my-datalake", "tables/events/", format="parquet")
reg = await client.track_table("events", loc)
# S3 with Delta Lake format
loc = _native.StorageLocation.s3("my-datalake", "delta/transactions/", format="delta")
reg = await client.track_table("transactions", loc)
# Azure Blob Storage: account, container, prefix, format
loc = _native.StorageLocation.azure_blob(
    "mystorageaccount", "warehouse", "data/metrics/", format="parquet"
)
reg = await client.track_table("metrics", loc)
# Google Cloud Storage: bucket, prefix, format
loc = _native.StorageLocation.gcs("my-analytics", "tables/sessions/", format="parquet")
reg = await client.track_table("sessions", loc)
# =============================================================================
# FILESYSTEM STORAGE LOCATIONS
# =============================================================================
# Local filesystem: path, format
loc = _native.StorageLocation.local_filesystem("/data/exports/reports/", format="parquet")
reg = await client.track_table("reports", loc)
# =============================================================================
# TABLE MANAGEMENT
# =============================================================================
# View registration result
print(f"Tracked: {reg.table_name}")
print(f"Content hash: {reg.content_hash}")
print(f"Record count: {reg.record_count}")
print(f"Location: {reg.storage_location_uri}")
# List all tracked tables
tables = await client.list_tables()
for table in tables:
    print(f" - {table}")
# Untrack a table (data is preserved, only tracking is removed)
await client.untrack_table("old-table")
Staging Operations
# Stage all changes
result = await client.stage_all(branch="main")
print(f"Staged {result.tables_affected} tables")
# Stage specific table
await client.stage("users", branch="main")
# Unstage all
result = await client.unstage_all(branch="main")
# Unstage specific table
await client.unstage("users", branch="main")
Constraint Operations
# Get constraints for a table
constraints = await client.get_constraints("users", branch="main")
print(f"Foreign keys: {len(constraints.foreign_keys)}")
print(f"Unique: {len(constraints.unique_constraints)}")
# Add a constraint (using native types)
from horizon_epoch.client import _native
constraint = _native.UniqueConstraint("uq_email", ["email"])
await client.add_constraint("users", constraint)
# Remove a constraint
await client.remove_constraint("users", "uq_email")
# Diff constraints between branches
diff = await client.diff_constraints("main", "feature-x", table_name="users")
# Add an index
col = _native.IndexColumn("email")
idx = _native.IndexDefinition("idx_email", [col])
await client.add_index("users", idx)
# Remove an index
await client.remove_index("users", "idx_email")
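Check and foreign key constraints follow the same add_constraint pattern. Below is a short sketch using the constructors listed under Native Module Access; the table, column, and constraint names here are illustrative:
# Add a check constraint (name, SQL expression); names are illustrative
check = _native.CheckConstraint("ck_age", "age >= 0")
await client.add_constraint("users", check)

# Add a foreign key constraint (name, columns, referenced table, referenced columns)
fk = _native.ForeignKeyConstraint("fk_order_user", ["user_id"], "users", ["id"])
await client.add_constraint("orders", fk)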
Models
Author
from horizon_epoch import Author
author = Author(name="Jane Developer", email="jane@example.com")
Branch
from horizon_epoch import Branch
# Returned from client.branch(), client.branches(), client.get_branch()
branch.name # str
branch.head_commit_id # str
branch.created_at # datetime
branch.is_default # bool
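For example, a simple branch listing can be built from these attributes (a minimal sketch using client.branches() from Branch Operations above):
# List branches, marking the default branch with an asterisk
branches = await client.branches()
for branch in branches:
    marker = "*" if branch.is_default else " "
    print(f"{marker} {branch.name} @ {branch.head_commit_id} (created {branch.created_at})")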
Commit
from horizon_epoch import Commit
# Returned from client.commit(), client.show()
commit.id # str (full commit ID)
commit.short_id # str (abbreviated)
commit.message # str
commit.author # Author
commit.timestamp # datetime
commit.parent_ids # List[str]
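For example, to print the metadata of a single commit (a minimal sketch using client.show() from Commit Operations above):
# Inspect a single commit's metadata
commit = await client.show("abc123")
print(f"{commit.short_id} by {commit.author.name} <{commit.author.email}>")
print(f"  {commit.message} ({commit.timestamp}), parents: {commit.parent_ids}")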
MergeResult
from horizon_epoch import MergeResult
result.source_branch # str
result.target_branch # str
result.is_fast_forward # bool
result.has_conflicts # bool
result.conflict_count # int
result.result_commit_id # Optional[str]
result.conflicts # List[RecordConflict]
Status
from horizon_epoch import Status
status.branch # Optional[str]
status.commit_id # Optional[str]
status.is_detached # bool
status.staged_tables # List[str]
status.unstaged_tables # List[str]
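For example, a short working-state summary (a minimal sketch using client.status() from Repository Operations above):
# Summarize the working state of a branch
status = await client.status(branch="main")
head = status.commit_id or "(no commits yet)"
detached = " [detached]" if status.is_detached else ""
print(f"On branch {status.branch} at {head}{detached}")
print(f"Staged tables:   {', '.join(status.staged_tables) or 'none'}")
print(f"Unstaged tables: {', '.join(status.unstaged_tables) or 'none'}")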
Enums
from horizon_epoch import MergeStrategy, StorageBackend
# Merge strategies
MergeStrategy.THREE_WAY # Standard three-way merge
MergeStrategy.OURS # Keep target branch version
MergeStrategy.THEIRS # Keep source branch version
MergeStrategy.MANUAL # Fail on conflicts
# Storage backends - Database
StorageBackend.POSTGRESQL # PostgreSQL database
StorageBackend.MYSQL # MySQL database
StorageBackend.MSSQL # Microsoft SQL Server
StorageBackend.SQLITE # SQLite database
# Storage backends - Object Storage
StorageBackend.S3 # AWS S3 / S3-compatible storage
StorageBackend.AZURE # Azure Blob Storage
StorageBackend.GCS # Google Cloud Storage
# Storage backends - Filesystem
StorageBackend.LOCAL # Local filesystem
Exceptions
from horizon_epoch import (
    HorizonEpochError,  # Base exception
    RepositoryNotInitializedError,
    BranchNotFoundError,
    BranchAlreadyExistsError,
    CommitNotFoundError,
    MergeConflictError,
    ConnectionFailedError,
    StorageError,
    ValidationError,
)
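HorizonEpochError is the base exception, so it can serve as a catch-all after more specific handlers. Below is a sketch; the mapping of operations to specific exceptions is illustrative:
from horizon_epoch import Client, HorizonEpochError, BranchNotFoundError

async with Client.connect("postgresql://...") as client:
    try:
        await client.checkout("feature/new-schema")
    except BranchNotFoundError:
        # Specific failure: the requested branch does not exist
        print("Branch not found; create it first with client.branch(...)")
    except HorizonEpochError as exc:
        # Base-class handler catches any other SDK error
        print(f"Operation failed: {exc}")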
Async Support
All client methods are async and should be awaited:
import asyncio
from horizon_epoch import Client
async def main():
    async with Client.connect("postgresql://...") as client:
        await client.init("my-repo")
        await client.branch("feature")
        await client.commit("Initial commit")

asyncio.run(main())
Native Module Access
For advanced operations, access the native Rust module directly:
from horizon_epoch.client import (
    _native,              # Native module (if available)
    is_native_available,  # Check if native bindings loaded
    get_native_version,   # Get native module version
    get_native_info,      # Get diagnostic info
    require_native,       # Get native module or raise ImportError
)

if is_native_available():
    print(f"Native version: {get_native_version()}")
# StorageLocation factory methods for all 8 backends
# Database backends
pg_loc = _native.StorageLocation.postgresql("conn", "schema", "table")
mysql_loc = _native.StorageLocation.mysql("conn", "database", "table")
mssql_loc = _native.StorageLocation.mssql("conn", "database", "schema", "table")
sqlite_loc = _native.StorageLocation.sqlite("conn", "table")
# Object storage backends
s3_loc = _native.StorageLocation.s3("bucket", "prefix/", format="parquet")
azure_loc = _native.StorageLocation.azure_blob("account", "container", "prefix/", format="parquet")
gcs_loc = _native.StorageLocation.gcs("bucket", "prefix/", format="parquet")
# Filesystem backend
local_loc = _native.StorageLocation.local_filesystem("/path/to/data/", format="parquet")
# Constraint types
unique = _native.UniqueConstraint("uq_email", ["email"])
check = _native.CheckConstraint("ck_age", "age >= 0")
fk = _native.ForeignKeyConstraint("fk_user", ["user_id"], "users", ["id"])
# Index types
col = _native.IndexColumn("email")
idx = _native.IndexDefinition("idx_email", [col])
Example Usage
import asyncio
from horizon_epoch import Client, Author, MergeStrategy
async def main():
    async with Client.connect("postgresql://localhost/horizon_epoch") as client:
        # Initialize repository
        await client.init("my-data-repo")

        # Create and checkout feature branch
        await client.branch("feature/new-schema")

        # Make changes (via your data pipeline)
        # ...

        # Stage and commit
        await client.stage_all()
        await client.commit(
            message="Add user preferences",
            author=Author(name="Dev", email="dev@example.com")
        )

        # Merge back to main
        result = await client.merge(
            source="feature/new-schema",
            strategy=MergeStrategy.THREE_WAY
        )
        if result.has_conflicts:
            print(f"Resolve {result.conflict_count} conflicts")
        else:
            print(f"Merged successfully: {result.result_commit_id}")

asyncio.run(main())