Source code for spark_bestfit.utils
"""Utility functions for spark-bestfit."""
from typing import Optional
# PySpark is optional - only import if available
try:
from pyspark.sql import SparkSession
_PYSPARK_AVAILABLE = True
except ImportError:
SparkSession = None # type: ignore[assignment,misc]
_PYSPARK_AVAILABLE = False
[docs]
def get_spark_session(spark: Optional["SparkSession"] = None) -> "SparkSession":
"""Get or create a SparkSession.
If a SparkSession is provided, it is returned as-is.
If None is provided, attempts to get the active SparkSession.
Args:
spark: Optional SparkSession. If None, gets the active session.
Returns:
SparkSession instance
Raises:
RuntimeError: If no SparkSession is provided and no active session exists
Example:
>>> # Use existing session
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> session = get_spark_session(spark)
>>>
>>> # Use active session
>>> session = get_spark_session() # Gets active session
"""
if not _PYSPARK_AVAILABLE:
raise ImportError(
"PySpark is required but not installed. "
"Install with: pip install spark-bestfit[spark]\n"
"Or use a non-Spark backend: LocalBackend() or RayBackend()"
)
if spark is not None:
return spark
active_session = SparkSession.getActiveSession()
if active_session is not None:
return active_session
raise RuntimeError(
"No SparkSession provided and no active session found. "
"Please create a SparkSession first:\n"
" spark = SparkSession.builder.appName('my-app').getOrCreate()"
)