Progress Tracking
spark-bestfit supports progress tracking for distribution fitting operations. This allows you to monitor long-running jobs and provide feedback to users.
Quick Start
The easiest way to enable progress tracking is with the built-in console_progress() utility:
from spark_bestfit import DistributionFitter
from spark_bestfit.progress import console_progress
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value", progress_callback=console_progress())
print() # Newline after progress
This displays progress like:
Progress: 45/100 tasks (45.0%)
You can customize the prefix:
results = fitter.fit(df, column="value", progress_callback=console_progress("Fitting distributions"))
# Output: Fitting distributions: 45/100 tasks (45.0%)
Custom Callbacks
For full control, pass any function matching the ProgressCallback signature:
from spark_bestfit import DistributionFitter
def on_progress(completed: int, total: int, percent: float) -> None:
    print(f"\rFitting: {completed}/{total} ({percent:.1f}%)", end="", flush=True)
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value", progress_callback=on_progress)
print() # Newline after progress
The callback receives three arguments:
completed: Number of tasks completed so far
total: Total number of tasks in the job
percent: Percentage complete (0.0 to 100.0)
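Because the callback receives raw counts as well as the percentage, it can derive other figures from them. The following is a minimal sketch, assuming a simple linear extrapolation; `EtaProgress` and its `clock` parameter are hypothetical names for illustration and are not part of spark-bestfit. Since the total can grow while a fit is running, the estimate should be treated as rough.

```python
import time
from typing import Callable, Optional


class EtaProgress:
    """Hypothetical callback that adds a rough time-remaining estimate."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock          # injectable so the estimate can be tested
        self._start = clock()        # timing starts when the object is created

    def eta_seconds(self, completed: int, total: int) -> Optional[float]:
        """Elapsed time per finished task, multiplied by the tasks still left."""
        elapsed = self._clock() - self._start
        if completed <= 0:
            return None  # nothing finished yet, so there is no rate to extrapolate
        return elapsed * (total - completed) / completed

    def __call__(self, completed: int, total: int, percent: float) -> None:
        eta = self.eta_seconds(completed, total)
        suffix = "" if eta is None else f", ~{eta:.0f}s left"
        print(f"\rFitting: {completed}/{total} ({percent:.1f}%){suffix}",
              end="", flush=True)
```

Create the object immediately before calling `fit` and pass it as `progress_callback`, so that the elapsed time covers the fit and nothing else.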
With tqdm
Integration with tqdm for progress bars:
from tqdm import tqdm
from spark_bestfit import DistributionFitter
pbar = None
def tqdm_callback(completed: int, total: int, percent: float) -> None:
    global pbar
    if pbar is None:
        pbar = tqdm(total=total, desc="Fitting distributions")
    pbar.total = total  # The total can grow as new Spark stages start
    pbar.n = completed
    pbar.refresh()
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value", progress_callback=tqdm_callback)
if pbar is not None:
    pbar.close()
Discrete Distribution Fitting
Progress tracking works the same way for discrete distributions:
from spark_bestfit import DiscreteDistributionFitter
fitter = DiscreteDistributionFitter(spark)
results = fitter.fit(
    df,
    column="counts",
    progress_callback=on_progress,
)
Multi-Column Fitting
When fitting multiple columns, progress reflects aggregate completion across all columns:
results = fitter.fit(
    df,
    columns=["col1", "col2", "col3"],
    progress_callback=on_progress,
)
# Progress shows total tasks across all 3 columns
Thread Safety
Warning
The callback is invoked from a background thread. Ensure your callback implementation is thread-safe. Avoid modifying shared state without proper synchronization.
For thread-safe progress tracking:
import threading
class ThreadSafeProgress:
    def __init__(self):
        self.lock = threading.Lock()
        self.last_percent = 0.0
    def __call__(self, completed: int, total: int, percent: float) -> None:
        with self.lock:
            if percent - self.last_percent >= 5.0:  # Update every 5%
                print(f"Progress: {percent:.1f}%")
                self.last_percent = percent
progress = ThreadSafeProgress()
results = fitter.fit(df, column="value", progress_callback=progress)
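An alternative to locking inside the callback is to have the callback do nothing except enqueue each update, leaving another thread (for example a UI thread) to consume the queue. The sketch below illustrates that pattern with the standard library only; `QueuedProgress` and `drain` are hypothetical names, and the worker thread merely simulates the background thread that would invoke the callback.

```python
import queue
import threading
from typing import List, Tuple

Update = Tuple[int, int, float]


class QueuedProgress:
    """Hypothetical callback that only enqueues updates for a consumer."""

    def __init__(self) -> None:
        self.updates: "queue.Queue[Update]" = queue.Queue()

    def __call__(self, completed: int, total: int, percent: float) -> None:
        # queue.Queue is thread-safe, so no explicit lock is needed here.
        self.updates.put((completed, total, percent))

    def drain(self) -> List[Update]:
        """Return every update received so far without blocking."""
        items: List[Update] = []
        while True:
            try:
                items.append(self.updates.get_nowait())
            except queue.Empty:
                return items


def simulate_updates(callback: QueuedProgress) -> None:
    # Stands in for the background thread that reports progress.
    for i in range(1, 5):
        callback(i, 4, i * 25.0)


progress = QueuedProgress()
worker = threading.Thread(target=simulate_updates, args=(progress,))
worker.start()
worker.join()
received = progress.drain()
print(received[-1])
```

The callback stays short and never blocks, which keeps the reporting thread responsive; any slow work such as rendering or logging happens in the consumer.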
How It Works
Progress tracking uses Spark’s built-in StatusTracker API:
When a progress_callback is provided, a ProgressTracker is created
The tracker sets a unique job group on SparkContext
A background thread polls Spark’s StatusTracker every 100ms
When stage/task progress changes, the callback is invoked
After fitting completes, the tracker is automatically stopped
This approach:
Has minimal overhead (~0.1% increase in runtime)
Works on all Spark environments (local, YARN, Kubernetes, Databricks)
Provides partition-level granularity (~16 updates for typical jobs)
Does not require any changes to the Spark job itself
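The polling steps described above can be sketched in plain Python. This is an illustration of the general pattern only, not spark-bestfit's actual implementation: `PollingTracker`, `read_counts`, and `make_fake_source` are hypothetical names, and a fake status source replaces Spark so the example runs on its own. In a real PySpark setup, `read_counts` could plausibly sum `numCompletedTasks` and `numTasks` over the stages of the jobs returned by `sc.statusTracker().getJobIdsForGroup(...)`.

```python
import threading
from typing import Callable, List, Optional, Tuple

Counts = Tuple[int, int]  # (completed_tasks, total_tasks)
ProgressFn = Callable[[int, int, float], None]


class PollingTracker:
    """Illustrative poller (hypothetical name, not the library's class)."""

    def __init__(self, read_counts: Callable[[], Counts], callback: ProgressFn,
                 poll_interval: float = 0.1) -> None:
        self._read_counts = read_counts
        self._callback = callback
        self._poll_interval = poll_interval
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._last: Optional[Counts] = None

    def poll_once(self) -> None:
        """Read the counters and invoke the callback only if they changed."""
        counts = self._read_counts()
        completed, total = counts
        if total > 0 and counts != self._last:
            self._last = counts
            self._callback(completed, total, 100.0 * completed / total)

    def _run(self) -> None:
        while not self._stop_event.is_set():
            self.poll_once()
            # wait() doubles as an interruptible sleep between polls.
            self._stop_event.wait(self._poll_interval)

    def __enter__(self) -> "PollingTracker":
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


def make_fake_source(readings: List[Counts],
                     done: threading.Event) -> Callable[[], Counts]:
    """Serve the readings in order, then keep repeating the last one."""
    remaining = list(readings)

    def read() -> Counts:
        if len(remaining) > 1:
            return remaining.pop(0)
        done.set()  # the final reading is now being served
        return remaining[0]

    return read


seen: List[Tuple[int, int, float]] = []
done = threading.Event()
source = make_fake_source([(0, 0), (4, 16), (4, 16), (16, 16)], done)
with PollingTracker(source, lambda c, t, p: seen.append((c, t, p)),
                    poll_interval=0.01):
    done.wait(timeout=5.0)  # stands in for the blocking fit() call
print(seen)
```

Comparing each reading with the previous one is what keeps the callback quiet between changes, and joining the thread on exit guarantees that no callback fires after the `with` block ends.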
Advanced Usage: ProgressTracker
For more control, you can use ProgressTracker directly:
from spark_bestfit import ProgressTracker
def on_progress(completed: int, total: int, percent: float) -> None:
    print(f"Progress: {percent:.1f}%")
# Using context manager
with ProgressTracker(spark, on_progress) as tracker:
    # Any Spark operations here will be tracked
    results = fitter.fit(df, column="value")
# Or manual start/stop
tracker = ProgressTracker(spark, on_progress, poll_interval=0.5)
tracker.start()
try:
    results = fitter.fit(df, column="value")
finally:
    tracker.stop()
ProgressTracker parameters:
spark: SparkSession instance (or None to use active session)
callback: Progress callback function
poll_interval: Seconds between status checks (default: 0.1)
job_group: Custom job group identifier (auto-generated if None)
Performance Notes
Progress tracking adds minimal overhead (~0.1% increase in runtime)
The polling interval of 100ms provides a good balance between responsiveness and overhead
No impact on Spark job execution - tracking is purely observational
Works with all Spark cluster managers (standalone, YARN, Kubernetes, Mesos)
Understanding Progress Output
Progress values may appear to fluctuate during fitting:
Progress: 34/85 tasks (40.0%)
Progress: 65/156 tasks (41.7%)
Progress: 115/216 tasks (53.2%)
This is expected behavior:
Total increases: Each distribution fit triggers Spark stages. As new stages start, the total task count grows (85 -> 156 -> 216 in the example above).
Percentage can decrease: When a new stage starts, the denominator increases before its tasks complete, temporarily lowering the percentage.
Final may not reach 100%: The job may complete between polling intervals (100ms), so the last captured progress might be less than 100%.
The key observation is that progress generally trends upward, and the job completes successfully. For long-running fits (many distributions or large datasets), you will see many incremental updates as stages complete.
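If a display that moves backward or stops short of 100% would confuse users, the callback can be wrapped so that it reports a high-water mark and emits one final update once fitting has returned. The following is a minimal sketch of that idea; `SmoothedProgress` and `finish` are hypothetical names, not part of spark-bestfit.

```python
from typing import Callable, List, Tuple

ProgressFn = Callable[[int, int, float], None]


class SmoothedProgress:
    """Hypothetical wrapper: the reported percentage never moves backward."""

    def __init__(self, inner: ProgressFn) -> None:
        self._inner = inner
        self._best = 0.0   # highest percentage seen so far
        self._total = 0    # most recent total, reused by finish()

    def __call__(self, completed: int, total: int, percent: float) -> None:
        # Hold the high-water mark when a new stage enlarges the denominator.
        self._best = max(self._best, percent)
        self._total = total
        self._inner(completed, total, self._best)

    def finish(self) -> None:
        """Report 100% after fit() returns, in case the last poll missed it."""
        self._best = 100.0
        self._inner(self._total, self._total, 100.0)


seen: List[Tuple[int, int, float]] = []
smoothed = SmoothedProgress(lambda c, t, p: seen.append((c, t, p)))
smoothed(34, 85, 40.0)
smoothed(40, 156, 25.6)   # a new stage started, so the raw percentage dropped
smoothed(115, 216, 53.2)
smoothed.finish()
print([p for _, _, p in seen])
```

The wrapper trades accuracy for a steadier display: while the total is growing, the reported percentage can overstate how much work is actually done. Call `finish()` only after `fit` has returned, so that it does not race with updates from the background thread.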