Source code for spark_bestfit.backends.factory
"""Backend factory for automatic and explicit backend selection.
This module provides a centralized factory for creating execution backends,
eliminating duplicate auto-detection logic across the codebase.
Example:
>>> from spark_bestfit.backends.factory import BackendFactory
>>>
>>> # Auto-detect from DataFrame type
>>> backend = BackendFactory.for_dataframe(df)
>>>
>>> # Explicit creation with options
>>> backend = BackendFactory.create("local", max_workers=4)
>>> backend = BackendFactory.create("spark", spark_session=spark)
"""
from typing import Any, Literal
from spark_bestfit.protocols import ExecutionBackend
BackendType = Literal["spark", "local", "ray"]
[docs]
class BackendFactory:
"""Factory for creating execution backends.
Provides centralized backend creation with:
- Auto-detection from DataFrame type
- Explicit string-based selection
- Optional dependency handling
Example:
>>> # Auto-detect from DataFrame
>>> backend = BackendFactory.for_dataframe(df)
>>> # Explicit creation
>>> backend = BackendFactory.create("local", max_workers=4)
>>> # Check availability
>>> if BackendFactory.is_available("spark"):
... backend = BackendFactory.create("spark")
"""
[docs]
@classmethod
def for_dataframe(cls, df: Any) -> ExecutionBackend:
"""Auto-detect and create backend based on DataFrame type.
Detection order:
1. Ray Dataset (duck typing: has select_columns and to_pandas)
2. pandas DataFrame (isinstance check)
3. Spark DataFrame (default fallback)
Args:
df: Input DataFrame (Spark, pandas, or Ray Dataset)
Returns:
Appropriate backend instance
Raises:
ImportError: If detected backend's dependencies not installed
"""
import pandas as pd
# Ray Dataset (duck typing - Ray Dataset has no common base class)
if hasattr(df, "select_columns") and hasattr(df, "to_pandas"):
return cls.create("ray")
# pandas DataFrame
elif isinstance(df, pd.DataFrame):
return cls.create("local")
# Spark DataFrame (default)
else:
return cls.create("spark")
[docs]
@classmethod
def create(
cls,
backend_type: BackendType,
**kwargs: Any,
) -> ExecutionBackend:
"""Create a specific backend by name.
Args:
backend_type: One of "spark", "local", "ray"
**kwargs: Backend-specific arguments:
- spark: spark_session (optional SparkSession)
- local: max_workers (optional int)
- ray: (no options currently)
Returns:
Backend instance
Raises:
ValueError: If backend_type is unknown
ImportError: If required dependencies not installed
"""
if backend_type == "spark":
from spark_bestfit.backends.spark import SparkBackend
return SparkBackend(kwargs.get("spark_session"))
elif backend_type == "local":
from spark_bestfit.backends.local import LocalBackend
return LocalBackend(kwargs.get("max_workers"))
elif backend_type == "ray":
from spark_bestfit.backends.ray import RayBackend
return RayBackend()
else:
raise ValueError(f"Unknown backend type: {backend_type}. " f"Valid options: 'spark', 'local', 'ray'")
[docs]
@classmethod
def is_available(cls, backend_type: BackendType) -> bool:
"""Check if a backend's dependencies are installed.
Args:
backend_type: Backend to check
Returns:
True if dependencies are available
"""
if backend_type == "local":
return True # No external dependencies
elif backend_type == "spark":
try:
import pyspark # noqa: F401
return True
except ImportError:
return False
elif backend_type == "ray":
try:
import ray # noqa: F401
return True
except ImportError:
return False
return False
[docs]
@classmethod
def get_available(cls) -> list[BackendType]:
"""Get list of available backends.
Returns:
List of backend types with installed dependencies.
Always includes "local" as it has no external deps.
"""
available: list[BackendType] = ["local"] # Always available
if cls.is_available("spark"):
available.append("spark")
if cls.is_available("ray"):
available.append("ray")
return available