API Reference

Core

Core distribution fitting engine for Spark - re-exports for backward compatibility.

This module provides backward-compatible imports for: - DistributionFitter (continuous distributions) - DiscreteDistributionFitter (discrete/count data) - TruncatedFrozenDist (truncated distribution wrapper) - Default exclusion constants

The actual implementations are in: - spark_bestfit.continuous_fitter (DistributionFitter) - spark_bestfit.discrete_fitter (DiscreteDistributionFitter) - spark_bestfit.truncated (TruncatedFrozenDist)

class spark_bestfit.core.DiscreteDistributionFitter(spark: SparkSession | None = None, excluded_distributions: Tuple[str, ...] | None = None, random_seed: int = 42, backend: ExecutionBackend | None = None)[source]

Bases: BaseFitter

Spark distribution fitting engine for discrete (count) data.

Efficiently fits scipy.stats discrete distributions to integer data using Spark’s parallel processing capabilities. Uses MLE optimization since scipy discrete distributions don’t have a built-in fit() method.

Metric Selection:

For discrete distributions, AIC is recommended for model selection: - aic: Proper model selection criterion with complexity penalty - bic: Similar to AIC but stronger penalty for complex models - ks_statistic: Valid for ranking, but p-values are not reliable - sse: Simple comparison metric

The K-S test assumes continuous distributions. For discrete data, the K-S statistic can rank fits, but p-values are conservative and should not be used for hypothesis testing.

Example

>>> from pyspark.sql import SparkSession
>>> from spark_bestfit import DiscreteDistributionFitter
>>>
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> df = spark.createDataFrame([(x,) for x in count_data], ['counts'])
>>>
>>> fitter = DiscreteDistributionFitter(spark)
>>> results = fitter.fit(df, column='counts')
>>>
>>> # Use AIC for model selection (recommended)
>>> best = results.best(n=1, metric='aic')[0]
>>> print(f"Best: {best.distribution} (AIC={best.aic:.2f})")
fit(df: DataFrame, column: str | None = None, columns: List[str] | None = None, config: FitterConfig | None = None, *, max_distributions: int | None = None, enable_sampling: bool = True, sample_fraction: float | None = None, max_sample_size: int = 1000000, sample_threshold: int = 10000000, num_partitions: int | None = None, progress_callback: Callable[[int, int, float], None] | None = None, bounded: bool = False, lower_bound: float | Dict[str, float] | None = None, upper_bound: float | Dict[str, float] | None = None, lazy_metrics: bool = False, prefilter: bool | str = False) EagerFitResults | LazyFitResults[source]

Fit discrete distributions to integer data column(s).

Parameters:
  • df – Spark DataFrame containing integer count data

  • column – Name of single column to fit distributions to

  • columns – List of column names for multi-column fitting

  • config – FitterConfig object (v2.2.0). Provides a cleaner way to configure fitting with many parameters. If provided, individual parameters below are ignored (except progress_callback which can override the config’s callback). Note: bins, use_rice_rule, support_at_zero, and prefilter in config are ignored for discrete fitting.

  • max_distributions – Limit number of distributions (for testing)

  • enable_sampling – Enable sampling for large datasets

  • sample_fraction – Fraction to sample (None = auto-determine)

  • max_sample_size – Maximum rows to sample when auto-determining

  • sample_threshold – Row count above which sampling is applied

  • num_partitions – Spark partitions (None = auto-determine)

  • progress_callback – Optional callback for progress updates. Called with (completed_tasks, total_tasks, percent_complete). Callback is invoked from background thread - ensure thread-safety.

  • bounded – Enable bounded distribution fitting. When True, bounds are auto-detected from data or use explicit lower_bound/upper_bound.

  • lower_bound – Lower bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s minimum.

  • upper_bound – Upper bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s maximum.

  • lazy_metrics – If True, defer computation of expensive KS metrics until accessed (v1.5.0). Improves fitting performance when only using AIC/BIC/SSE for model selection. Default False for backward compatibility.

  • prefilter – Pre-filter distributions (v1.6.0). Currently only supported for continuous distributions. For discrete, this parameter is accepted but ignored (logs a warning if enabled).

Returns:

FitResults object with fitted distributions

Raises:
  • ValueError – If column not found, DataFrame empty, or invalid params

  • TypeError – If column is not numeric

Example

>>> # Using FitterConfig (v2.2.0)
>>> from spark_bestfit import FitterConfigBuilder
>>> config = (FitterConfigBuilder()
...     .with_bounds(lower=0, upper=100)
...     .with_sampling(fraction=0.1)
...     .build())
>>> results = fitter.fit(df, column='counts', config=config)
>>>
>>> # Single column (backward compatible)
>>> results = fitter.fit(df, column='counts')
>>> best = results.best(n=1, metric='aic')
>>>
>>> # Multi-column
>>> results = fitter.fit(df, columns=['counts1', 'counts2'])
>>> best_per_col = results.best_per_column(n=1, metric='aic')
>>>
>>> # Bounded fitting
>>> results = fitter.fit(df, column='counts', bounded=True, lower_bound=0, upper_bound=100)
>>>
>>> # Lazy metrics for faster fitting when only using AIC/BIC (v1.5.0)
>>> results = fitter.fit(df, 'counts', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast, no KS computed
plot(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, title: str = '', xlabel: str = 'Value', ylabel: str = 'Probability', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.7, pmf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]

Plot fitted discrete distribution against data histogram.

Parameters:
  • result – DistributionFitResult to plot

  • df – DataFrame with data. If None, uses cached sample from result (v2.10.0). When a cached sample exists and force_recompute is False, the cached sample is used and df is ignored (a warning is emitted).

  • column – Column name. If None, uses column_name from result.

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency (0-1)

  • pmf_linewidth – Line width for PMF curve

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Path to save figure (optional)

  • save_format – Save format (png, pdf, svg)

  • force_recompute – If True, ignore cached sample and recompute from df. Default False (v3.0.2).

Returns:

Tuple of (figure, axis) from matplotlib

Example

>>> best = results.best(n=1)[0]
>>> # v3.0.2: instant plotting using cached sample (default)
>>> fitter.plot(best, title='Instant Plot')
>>> # Force recompute from DataFrame
>>> fitter.plot(best, df, 'value', title='Recomputed', force_recompute=True)
class spark_bestfit.core.DistributionFitter(spark: SparkSession | None = None, excluded_distributions: Tuple[str, ...] | None = None, random_seed: int = 42, backend: ExecutionBackend | None = None)[source]

Bases: BaseFitter

Modern Spark distribution fitting engine.

Efficiently fits ~90 scipy.stats distributions to data using Spark’s parallel processing capabilities. Uses broadcast variables and Pandas UDFs to avoid data collection and minimize serialization overhead.

Example

>>> from pyspark.sql import SparkSession
>>> from spark_bestfit import DistributionFitter
>>>
>>> # Create your own SparkSession
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> df = spark.createDataFrame([(float(x),) for x in data], ['value'])
>>>
>>> # Simple usage
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, column='value')
>>> best = results.best(n=1)[0]
>>> print(f"Best: {best.distribution} with SSE={best.sse}")
>>>
>>> # With custom parameters
>>> fitter = DistributionFitter(spark, random_seed=123)
>>> results = fitter.fit(df, 'value', bins=100, support_at_zero=True)
>>>
>>> # Plot the best fit
>>> fitter.plot(best, df, 'value', title='Best Fit')
fit(df: DataFrame, column: str | None = None, columns: List[str] | None = None, config: FitterConfig | None = None, *, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, support_at_zero: bool = False, max_distributions: int | None = None, enable_sampling: bool = True, sample_fraction: float | None = None, max_sample_size: int = 1000000, sample_threshold: int = 10000000, num_partitions: int | None = None, progress_callback: Callable[[int, int, float], None] | None = None, bounded: bool = False, lower_bound: float | Dict[str, float] | None = None, upper_bound: float | Dict[str, float] | None = None, lazy_metrics: bool = False, prefilter: bool | str = False, estimation_method: str = 'mle') EagerFitResults | LazyFitResults[source]

Fit distributions to data column(s).

Parameters:
  • df – Spark DataFrame containing data

  • column – Name of single column to fit distributions to

  • columns – List of column names for multi-column fitting

  • config – FitterConfig object (v2.2.0). Provides a cleaner way to configure fitting with many parameters. If provided, individual parameters below are ignored (except progress_callback which can override the config’s callback). Use FitterConfigBuilder for fluent configuration.

  • bins – Number of histogram bins or tuple of bin edges

  • use_rice_rule – Use Rice rule to auto-determine bin count

  • support_at_zero – Only fit non-negative distributions

  • max_distributions – Limit number of distributions (for testing)

  • enable_sampling – Enable sampling for large datasets

  • sample_fraction – Fraction to sample (None = auto-determine)

  • max_sample_size – Maximum rows to sample when auto-determining

  • sample_threshold – Row count above which sampling is applied

  • num_partitions – Spark partitions (None = auto-determine)

  • progress_callback – Optional callback for progress updates. Called with (completed_tasks, total_tasks, percent_complete). Callback is invoked from background thread - ensure thread-safety.

  • bounded – If True, fit truncated distributions (v1.4.0). When enabled, distributions are truncated to [lower_bound, upper_bound] using scipy.stats.truncate(). Requires scipy >= 1.14.0.

  • lower_bound – Lower bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s minimum.

  • upper_bound – Upper bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s maximum.

  • lazy_metrics – If True, defer computation of expensive KS/AD metrics until accessed (v1.5.0). Improves fitting performance when only using AIC/BIC/SSE for model selection. Default False for backward compatibility.

  • prefilter – Pre-filter distributions based on data characteristics (v1.6.0). Skips distributions that are mathematically incompatible with the data, reducing fitting time by 30-70% for non-normal data. - False (default): No pre-filtering, fit all distributions - True: Safe mode - filters by support bounds and skewness sign - ‘aggressive’: Also filters by kurtosis (may skip valid distributions) Pre-filtering uses scipy’s distribution support bounds (dist.a, dist.b) and sample moments. Filtered distributions are logged for transparency.

  • estimation_method

    Parameter estimation method (v2.5.0):

    • ”mle”: Maximum Likelihood Estimation (default). Fast and accurate for most distributions. Uses scipy.stats.fit().

    • ”mse”: Maximum Spacing Estimation. More robust for heavy-tailed distributions (Pareto, Cauchy, etc.) where MLE may fail.

    • ”auto”: Automatically select MSE for heavy-tailed data based on kurtosis and extreme value analysis, MLE otherwise.

Returns:

FitResults object with fitted distributions

Raises:
  • ValueError – If column not found, DataFrame empty, or invalid params

  • TypeError – If column is not numeric

Example

>>> # Using FitterConfig (recommended for complex configs, v2.2.0)
>>> from spark_bestfit import FitterConfigBuilder
>>> config = (FitterConfigBuilder()
...     .with_bins(100)
...     .with_bounds(lower=0, upper=100)
...     .with_sampling(fraction=0.1)
...     .build())
>>> results = fitter.fit(df, column='value', config=config)
>>>
>>> # Single column (backward compatible)
>>> results = fitter.fit(df, column='value')
>>> results = fitter.fit(df, 'value', bins=100, support_at_zero=True)
>>>
>>> # Multi-column
>>> results = fitter.fit(df, columns=['col1', 'col2', 'col3'])
>>> best_col1 = results.for_column('col1').best(n=1)[0]
>>> best_per_col = results.best_per_column(n=1)
>>>
>>> # Bounded fitting (v1.4.0)
>>> results = fitter.fit(df, 'value', bounded=True)  # Auto-detect bounds
>>> results = fitter.fit(df, 'value', bounded=True, lower_bound=0, upper_bound=100)
>>>
>>> # Lazy metrics for faster fitting when only using AIC/BIC (v1.5.0)
>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast, no KS/AD computed
get_custom_distributions() dict[source]

Get all registered custom distributions.

Returns:

Dict mapping distribution names to rv_continuous objects

plot(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, title: str = '', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]

Plot fitted distribution against data histogram.

Parameters:
  • result – DistributionFitResult to plot

  • df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.

  • column – Column name. If None, uses column_name from result.

  • bins – Number of histogram bins

  • use_rice_rule – Use Rice rule for bins

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency (0-1)

  • pdf_linewidth – Line width for PDF curve

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Path to save figure (optional)

  • save_format – Save format (png, pdf, svg)

  • force_recompute – If True, ignore cached sample and recompute histogram from df (requires df to be provided). Default False.

Returns:

Tuple of (figure, axis) from matplotlib

Example

>>> # Instant plot from cached sample (recommended)
>>> fitter.plot(best, title='Instant Plot')
>>> # Explicit DataFrame (recomputes histogram via Spark)
>>> fitter.plot(best, df, 'value', title='Best Fit', force_recompute=True)
plot_comparison(results: List[DistributionFitResult], df: DataFrame | None = None, column: str | None = None, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, title: str = 'Distribution Comparison', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]

Plot multiple distributions for comparison.

Parameters:
  • results – List of DistributionFitResult objects

  • df – DataFrame with data. Optional when results contain a cached sample. When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.

  • column – Column name. If None, uses column_name from the first result.

  • bins – Number of histogram bins

  • use_rice_rule – Use Rice rule for bins

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency

  • pdf_linewidth – PDF line width

  • title_fontsize – Title font size

  • label_fontsize – Label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency

  • save_path – Path to save figure

  • save_format – Save format

  • force_recompute – If True, ignore cached samples and recompute histogram from df (requires df to be provided). Default False.

Returns:

Tuple of (figure, axis)

Example

>>> top_3 = results.best(n=3)
>>> # Instant comparison from cached sample (recommended)
>>> fitter.plot_comparison(top_3)
>>> # Explicit DataFrame (recomputes histogram via Spark)
>>> fitter.plot_comparison(top_3, df, 'value', force_recompute=True)
plot_pp(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, max_points: int = 1000, title: str = '', xlabel: str = 'Theoretical Probabilities', ylabel: str = 'Sample Probabilities', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]

Create a P-P plot to assess goodness-of-fit.

A P-P (probability-probability) plot compares the empirical CDF of the sample data against the theoretical CDF of the fitted distribution. Points falling close to the reference line indicate a good fit, particularly in the center of the distribution.

Parameters:
  • result – DistributionFitResult to plot

  • df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.

  • column – Column name. If None, uses column_name from result.

  • max_points – Maximum data points to sample for plotting

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • marker – Marker style for data points

  • marker_size – Size of markers

  • marker_alpha – Marker transparency (0-1)

  • marker_color – Color of markers

  • line_color – Color of reference line

  • line_style – Style of reference line

  • line_width – Width of reference line

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Path to save figure (optional)

  • save_format – Save format (png, pdf, svg)

  • force_recompute – If True, ignore cached sample and resample from df (requires df to be provided). Default False.

Returns:

Tuple of (figure, axis) from matplotlib

Example

>>> best = results.best(n=1)[0]
>>> # Instant P-P plot from cached sample (recommended)
>>> fitter.plot_pp(best, title='Instant P-P Plot')
>>> # Explicit DataFrame (resamples via Spark)
>>> fitter.plot_pp(best, df, 'value', title='P-P Plot', force_recompute=True)
plot_qq(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, max_points: int = 1000, title: str = '', xlabel: str = 'Theoretical Quantiles', ylabel: str = 'Sample Quantiles', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]

Create a Q-Q plot to assess goodness-of-fit.

A Q-Q (quantile-quantile) plot compares sample quantiles against theoretical quantiles from the fitted distribution. Points falling close to the reference line indicate a good fit.

Parameters:
  • result – DistributionFitResult to plot

  • df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.

  • column – Column name. If None, uses column_name from result.

  • max_points – Maximum data points to sample for plotting

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • marker – Marker style for data points

  • marker_size – Size of markers

  • marker_alpha – Marker transparency (0-1)

  • marker_color – Color of markers

  • line_color – Color of reference line

  • line_style – Style of reference line

  • line_width – Width of reference line

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Path to save figure (optional)

  • save_format – Save format (png, pdf, svg)

  • force_recompute – If True, ignore cached sample and resample from df (requires df to be provided). Default False.

Returns:

Tuple of (figure, axis) from matplotlib

Example

>>> best = results.best(n=1)[0]
>>> # Instant Q-Q plot from cached sample (recommended)
>>> fitter.plot_qq(best, title='Instant Q-Q Plot')
>>> # Explicit DataFrame (resamples via Spark)
>>> fitter.plot_qq(best, df, 'value', title='Q-Q Plot', force_recompute=True)
register_distribution(name: str, distribution: rv_continuous, overwrite: bool = False) DistributionFitter[source]

Register a custom distribution for fitting.

Custom distributions must implement the scipy rv_continuous interface, specifically the fit(), pdf(), and cdf() methods. The distribution will be included in fitting alongside scipy.stats distributions.

Parameters:
  • name – Unique name for the distribution (used in results)

  • distribution – scipy rv_continuous instance or subclass. Must implement fit(), pdf(), cdf() methods.

  • overwrite – If True, overwrite existing distribution with same name. Default False raises ValueError if name exists.

Returns:

Self (for method chaining)

Raises:
  • ValueError – If name already exists (and overwrite=False) or conflicts with a scipy.stats distribution name

  • TypeError – If distribution doesn’t implement required interface

Example

>>> from scipy.stats import rv_continuous
>>>
>>> class PowerDistribution(rv_continuous):
...     def _pdf(self, x, alpha):
...         return alpha * x ** (alpha - 1)
...     def _cdf(self, x, alpha):
...         return x ** alpha
>>>
>>> fitter = DistributionFitter(spark)
>>> fitter.register_distribution("power", PowerDistribution(a=0, b=1))
>>> results = fitter.fit(df, "column")
>>> # Results will include "power" if it fits well
unregister_distribution(name: str) DistributionFitter[source]

Remove a custom distribution from the registry.

Parameters:

name – Name of the custom distribution to remove

Returns:

Self (for method chaining)

Raises:

KeyError – If distribution not found in registry

class spark_bestfit.core.TruncatedFrozenDist(frozen_dist, lb: float, ub: float, *, raise_on_empty: bool = True)[source]

Bases: object

Wrapper for frozen scipy distributions with truncation bounds.

Implements truncation for arbitrary scipy.stats frozen distributions using CDF inversion for sampling and proper normalization for PDF/CDF.

This is needed because scipy.stats.truncate() only works with the new distribution infrastructure (scipy 1.14+), not with traditional rv_frozen objects.

Parameters:
  • frozen_dist – Frozen scipy.stats distribution

  • lb – Lower bound (-np.inf for no lower bound)

  • ub – Upper bound (np.inf for no upper bound)

  • raise_on_empty – If True, raise ValueError when truncation has no probability mass. If False, methods return zeros/empty results silently. Default True.

Example

>>> from scipy import stats
>>> from spark_bestfit import TruncatedFrozenDist
>>> # Create a normal distribution truncated to [0, inf)
>>> frozen = stats.norm(loc=0, scale=1)
>>> truncated = TruncatedFrozenDist(frozen, lb=0, ub=np.inf)
>>> truncated.pdf(0.5)  # Evaluate PDF at x=0.5
property bounds: tuple

Return (lower_bound, upper_bound) tuple.

cdf(x)[source]

Evaluate cumulative distribution function.

Returns 0 for x < lower_bound, 1 for x > upper_bound.

logpdf(x)[source]

Evaluate log probability density function.

Returns -inf for values outside the truncation bounds.

mean()[source]

Compute mean of truncated distribution.

Uses analytical formulas for norm, expon, and uniform distributions. Falls back to Monte Carlo for other distributions.

pdf(x)[source]

Evaluate probability density function.

Returns 0 for values outside the truncation bounds.

ppf(q)[source]

Evaluate percent point function (inverse CDF).

Parameters:

q – Quantile(s) in [0, 1]

Returns:

Value(s) at the given quantile(s) within the truncated distribution

rvs(size=1, random_state=None)[source]

Generate random samples using inverse CDF method.

Parameters:
  • size – Number of samples to generate

  • random_state – Random seed or numpy Generator for reproducibility

Returns:

Array of random samples from the truncated distribution

std()[source]

Compute standard deviation of truncated distribution.

Uses analytical formulas for norm, expon, and uniform distributions. Falls back to Monte Carlo for other distributions.

spark_bestfit.core.DEFAULT_EXCLUDED_DISTRIBUTIONS = ('kstwobign', 'vonmises', 'dpareto_lognorm', 'mielke', 'exponpow', 'geninvgauss', 'ncf', 'studentized_range', 'ksone', 'gausshyper', 'ncx2', 'kstwo', 'vonmises_line', 'genhyperbolic', 'kappa4', 'nct', 'recipinvgauss', 'levy_stable', 'wald', 'tukeylambda')

Built-in immutable sequence.

If no argument is given, the constructor returns an empty tuple. If iterable is specified the tuple is initialized from iterable’s items.

If the argument is a tuple, the return value is the same object.

spark_bestfit.core.DEFAULT_EXCLUDED_DISCRETE_DISTRIBUTIONS = ('nchypergeom_fisher', 'randint', 'poisson_binom', 'bernoulli', 'nchypergeom_wallenius')

Built-in immutable sequence.

If no argument is given, the constructor returns an empty tuple. If iterable is specified the tuple is initialized from iterable’s items.

If the argument is a tuple, the return value is the same object.

Discrete Fitting

Discrete distribution fitting using MLE optimization and Pandas UDFs.

spark_bestfit.discrete_fitting.bootstrap_discrete_confidence_intervals(dist_name: str, data: ndarray, alpha: float = 0.05, n_bootstrap: int = 1000, random_seed: int | None = None) Dict[str, Tuple[float, float]][source]

Compute bootstrap confidence intervals for discrete distribution parameters.

Uses the percentile bootstrap method: resample data with replacement, refit the distribution using MLE, and compute confidence intervals from the empirical distribution of fitted parameters.

Parameters:
  • dist_name – Name of scipy.stats discrete distribution

  • data – Integer data array used for fitting

  • alpha – Significance level (default 0.05 for 95% CI)

  • n_bootstrap – Number of bootstrap samples (default 1000)

  • random_seed – Random seed for reproducibility

Returns:

Dictionary mapping parameter names to (lower, upper) bounds

Example

>>> data = np.random.poisson(lam=7, size=1000)
>>> ci = bootstrap_discrete_confidence_intervals("poisson", data, alpha=0.05)
>>> print(ci)
{'mu': (6.75, 7.25)}

Note

Bootstrap fitting may fail for some resamples. Failed fits are skipped.

spark_bestfit.discrete_fitting.compute_discrete_histogram(data: ndarray) Tuple[ndarray, ndarray][source]

Compute histogram for discrete (integer) data.

Unlike continuous histograms, discrete histograms use integer-aligned bins and compute empirical probability mass function (PMF).

Parameters:

data – Integer data array

Returns:

  • values: unique integer values in data

  • pmf: empirical probability mass at each value

Return type:

Tuple of (values, pmf) where

spark_bestfit.discrete_fitting.compute_discrete_information_criteria(dist: Any, params: Tuple[float, ...], data: ndarray, dist_name: str) Tuple[float, float][source]

Compute AIC and BIC for discrete distribution.

Parameters:
  • dist – scipy.stats discrete distribution object

  • params – Fitted distribution parameters

  • data – Original integer data

  • dist_name – Name of distribution

Returns:

Tuple of (aic, bic)

spark_bestfit.discrete_fitting.compute_discrete_ks_statistic(dist: Any, params: Tuple[float, ...], data: ndarray, dist_name: str) Tuple[float, float][source]

Compute Kolmogorov-Smirnov statistic for discrete distribution.

Computes the two-sided KS statistic D_n = max(D+, D-) which measures the maximum distance between empirical and theoretical CDFs.

Note

The standard KS test assumes continuous distributions. For discrete distributions, the KS statistic is valid for comparing fits, but p-values are conservative and should not be used for formal hypothesis testing. Use AIC/BIC for model selection instead.

Parameters:
  • dist – scipy.stats discrete distribution object

  • params – Fitted distribution parameters

  • data – Original integer data

  • dist_name – Name of distribution

Returns:

Tuple of (ks_statistic, pvalue) where pvalue is approximate only

spark_bestfit.discrete_fitting.compute_discrete_sse(dist: Any, params: Tuple[float, ...], x_values: ndarray, empirical_pmf: ndarray, dist_name: str) float[source]

Compute sum of squared errors between empirical and fitted PMF.

Parameters:
  • dist – scipy.stats discrete distribution object

  • params – Fitted distribution parameters

  • x_values – Integer values where PMF is evaluated

  • empirical_pmf – Empirical probability mass at each x value

  • dist_name – Name of distribution

Returns:

Sum of squared errors

spark_bestfit.discrete_fitting.compute_ks_ad_metrics_discrete(dist_name: str, params: List[float], data_sample: ndarray, lower_bound: float | None = None, upper_bound: float | None = None) Tuple[float | None, float | None, float | None, float | None][source]

Compute KS metrics for a fitted discrete distribution.

This is the core computation function used for lazy metric evaluation with discrete distributions.

Note: Anderson-Darling is not computed for discrete distributions (AD test is for continuous distributions only).

Parameters:
  • dist_name – Name of scipy.stats discrete distribution

  • params – Fitted distribution parameters

  • data_sample – Integer data sample for metric computation

  • lower_bound – Optional lower bound (unused for discrete, for API compatibility)

  • upper_bound – Optional upper bound (unused for discrete, for API compatibility)

Returns:

Tuple of (ks_statistic, pvalue, ad_statistic, ad_pvalue) ad_statistic and ad_pvalue are always None for discrete distributions. Returns (None, None, None, None) if computation fails.

spark_bestfit.discrete_fitting.create_discrete_fitting_udf(histogram_broadcast: Broadcast[Tuple[ndarray, ndarray]], data_sample_broadcast: Broadcast[ndarray], column_name: str | None = None, data_stats: Dict[str, float] | None = None, lower_bound: float | None = None, upper_bound: float | None = None, lazy_metrics: bool = False) Callable[[Series], DataFrame][source]

Factory function to create Pandas UDF for discrete distribution fitting.

Parameters:
  • histogram_broadcast – Broadcast variable containing (x_values, empirical_pmf)

  • data_sample_broadcast – Broadcast variable containing integer data sample

  • column_name – Name of the column being fitted (for result tracking)

  • data_stats – Pre-computed summary statistics (data_min, data_max, etc.)

  • lower_bound – Optional lower bound for truncated distribution

  • upper_bound – Optional upper bound for truncated distribution

  • lazy_metrics – If True, skip expensive KS computation during fitting. These metrics will be computed on-demand when accessed via FitResults.best() or DistributionFitResult properties. (v1.5.0)

Returns:

Pandas UDF function for fitting discrete distributions

spark_bestfit.discrete_fitting.create_discrete_sample_data(data_full: ndarray, sample_size: int = 10000, random_seed: int = 42) ndarray[source]

Create a sample of discrete data for distribution fitting.

Parameters:
  • data_full – Full integer dataset

  • sample_size – Target sample size

  • random_seed – Random seed for reproducibility

Returns:

Sampled integer data

spark_bestfit.discrete_fitting.evaluate_pmf(dist: Any, params: Tuple[float, ...], x: ndarray, dist_name: str) ndarray[source]

Evaluate probability mass function at given integer points.

Parameters:
  • dist – scipy.stats discrete distribution object

  • params – Distribution parameters

  • x – Integer points at which to evaluate PMF

  • dist_name – Name of distribution (for special handling)

Returns:

PMF values at x

spark_bestfit.discrete_fitting.fit_discrete_mle(dist_name: str, data: ndarray, initial_params: List[float], bounds: List[Tuple[float, float]]) Tuple[ndarray, float][source]

Fit a discrete distribution using maximum likelihood estimation.

Since scipy discrete distributions don’t have a fit() method, we use scipy.optimize.minimize to find parameters that maximize the likelihood.

Parameters:
  • dist_name – Name of the scipy.stats discrete distribution

  • data – Integer data to fit

  • initial_params – Initial parameter guesses

  • bounds – Parameter bounds as list of (min, max) tuples

Returns:

Tuple of (fitted_params, negative_log_likelihood)

Raises:

ValueError – If optimization fails to converge

spark_bestfit.discrete_fitting.fit_single_discrete_distribution(dist_name: str, data_sample: ndarray, x_values: ndarray, empirical_pmf: ndarray, registry: DiscreteDistributionRegistry, column_name: str | None = None, data_stats: Dict[str, float] | None = None, lower_bound: float | None = None, upper_bound: float | None = None, lazy_metrics: bool = False) Dict[str, Any][source]

Fit a single discrete distribution and compute goodness-of-fit metrics.

Parameters:
  • dist_name – Name of scipy.stats discrete distribution

  • data_sample – Sample of integer data for parameter fitting

  • x_values – Unique integer values in data

  • empirical_pmf – Empirical PMF at each x value

  • registry – DiscreteDistributionRegistry for parameter configs

  • column_name – Name of the column being fitted (for multi-column support)

  • data_stats – Pre-computed summary statistics (data_min, data_max, etc.)

  • lower_bound – Optional lower bound for truncated distribution

  • upper_bound – Optional upper bound for truncated distribution

  • lazy_metrics – If True, skip expensive KS computation. These metrics will be None in the result and computed on-demand later. (v1.5.0)

Returns:

Dictionary with fit result fields including data_min, data_max, etc.

spark_bestfit.discrete_fitting.get_discrete_param_names(dist_name: str) List[str][source]

Get parameter names for a discrete scipy distribution.

Parameters:

dist_name – Name of scipy.stats discrete distribution

Returns:

List of parameter names

Example

>>> get_discrete_param_names("poisson")
['mu']
>>> get_discrete_param_names("binom")
['n', 'p']
>>> get_discrete_param_names("nbinom")
['n', 'p']

Results

class spark_bestfit.results.DistributionFitResult(distribution: str, parameters: List[float], sse: float, column_name: str | None = None, aic: float | None = None, bic: float | None = None, ks_statistic: float | None = None, pvalue: float | None = None, ad_statistic: float | None = None, ad_pvalue: float | None = None, data_min: float | None = None, data_max: float | None = None, data_mean: float | None = None, data_stddev: float | None = None, data_count: float | None = None, data_kurtosis: float | None = None, data_skewness: float | None = None, cached_sample: ndarray | None = None, lower_bound: float | None = None, upper_bound: float | None = None)[source]

Bases: object

Result from fitting a single distribution.

distribution

Name of the scipy.stats distribution

Type:

str

parameters

Fitted parameters (shape params + loc + scale)

Type:

List[float]

sse

Sum of Squared Errors

Type:

float

column_name

Name of the column that was fitted (for multi-column support)

Type:

str | None

aic

Akaike Information Criterion (lower is better)

Type:

float | None

bic

Bayesian Information Criterion (lower is better)

Type:

float | None

ks_statistic

Kolmogorov-Smirnov statistic (lower is better)

Type:

float | None

pvalue

P-value from KS test (higher indicates better fit)

Type:

float | None

ad_statistic

Anderson-Darling statistic (lower is better)

Type:

float | None

ad_pvalue

P-value from A-D test (only for norm, expon, logistic, gumbel_r, gumbel_l)

Type:

float | None

data_min

Minimum value in the data used for fitting

Type:

float | None

data_max

Maximum value in the data used for fitting

Type:

float | None

data_mean

Mean of the data used for fitting

Type:

float | None

data_stddev

Standard deviation of the data used for fitting

Type:

float | None

data_count

Number of samples in the data used for fitting

Type:

float | None

data_kurtosis

Excess kurtosis of the data used for fitting (v2.3.0)

Type:

float | None

data_skewness

Skewness of the data used for fitting (v2.3.0)

Type:

float | None

cached_sample

Cached sample data for instant plotting (v2.10.0)

Type:

numpy.ndarray | None

lower_bound

Lower bound for truncated distribution fitting (v1.4.0). When set, the distribution is truncated at this lower limit.

Type:

float | None

upper_bound

Upper bound for truncated distribution fitting (v1.4.0). When set, the distribution is truncated at this upper limit.

Type:

float | None

Note

The p-value from the KS test is approximate when parameters are estimated from the same data being tested. It tends to be conservative (larger than it should be). Use it for rough guidance, not strict hypothesis testing. The ks_statistic is valid for ranking fits.

The ad_pvalue is only available for 5 distributions (norm, expon, logistic, gumbel_r, gumbel_l) where scipy has critical value tables. For other distributions, ad_pvalue will be None but ad_statistic is still valid for ranking fits.

When bounds are set (lower_bound and/or upper_bound), methods like sample(), pdf(), cdf(), and ppf() automatically use scipy.stats.truncate() to return values respecting the bounded domain.

ad_pvalue: float | None
ad_statistic: float | None
aic: float | None
bic: float | None
cached_sample: ndarray | None
cdf(x: ndarray) ndarray[source]

Evaluate cumulative distribution function at given points.

Parameters:

x – Points at which to evaluate CDF

Returns:

CDF values at x. If bounds are set, the CDF is adjusted for the truncated domain (0 at lower_bound, 1 at upper_bound).

column_name: str | None
confidence_intervals(df, column: str, alpha: float = 0.05, n_bootstrap: int = 1000, max_samples: int = 10000, random_seed: int | None = None) Dict[str, Tuple[float, float]][source]

Compute bootstrap confidence intervals for fitted parameters.

Uses the percentile bootstrap method: resample data with replacement, refit the distribution, and compute confidence intervals from the empirical distribution of fitted parameters.

Parameters:
  • df – DataFrame containing the data (Spark DataFrame, pandas DataFrame, or Ray Dataset)

  • column – Column name containing the data

  • alpha – Significance level (default 0.05 for 95% CI)

  • n_bootstrap – Number of bootstrap samples (default 1000)

  • max_samples – Maximum rows to collect from DataFrame (default 10000)

  • random_seed – Random seed for reproducibility

Returns:

Dictionary mapping parameter names to (lower, upper) bounds

Example

>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> ci = result.confidence_intervals(df, 'value', alpha=0.05, random_seed=42)
>>> print(result.distribution)
'gamma'
>>> for param, (lower, upper) in ci.items():
...     print(f"  {param}: [{lower:.4f}, {upper:.4f}]")
a: [2.35, 2.65]
loc: [-0.12, 0.08]
scale: [3.05, 3.35]

Note

Bootstrap computation can be slow for large n_bootstrap values. The default 1000 iterations provides reasonable precision.

data_count: float | None
data_kurtosis: float | None
data_max: float | None
data_mean: float | None
data_min: float | None
data_skewness: float | None
data_stddev: float | None
diagnostics(data: ndarray, y_hist: ndarray | None = None, x_hist: ndarray | None = None, bins: int = 50, title: str = '', figsize: Tuple[int, int] = (14, 12), dpi: int = 100, title_fontsize: int = 16, subplot_title_fontsize: int = 12, label_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png')[source]

Create a 2x2 diagnostic plot panel for assessing distribution fit quality.

Generates four diagnostic plots: - Q-Q Plot (top-left): Compares sample quantiles vs theoretical quantiles - P-P Plot (top-right): Compares empirical vs theoretical probabilities - Residual Histogram (bottom-left): Distribution of fit residuals - CDF Comparison (bottom-right): Empirical vs theoretical CDF overlay

Parameters:
  • data – Sample data array (1D numpy array)

  • y_hist – Optional pre-computed histogram density values. If None, computed from data using specified bins.

  • x_hist – Optional pre-computed histogram bin centers. If None, computed from data using specified bins.

  • bins – Number of histogram bins (used if y_hist/x_hist not provided)

  • title – Overall figure title

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • title_fontsize – Main title font size

  • subplot_title_fontsize – Subplot title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, array of axes)

Example

>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> fig, axes = result.diagnostics(data, title='Fit Diagnostics')
>>> plt.show()
distribution: str
get_param_names() List[str][source]

Get parameter names for this distribution.

Returns:

List of parameter names in order matching self.parameters

Example

>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> print(result.distribution)
'gamma'
>>> print(result.get_param_names())
['a', 'loc', 'scale']
>>> print(dict(zip(result.get_param_names(), result.parameters)))
{'a': 2.5, 'loc': 0.0, 'scale': 3.2}
get_scipy_dist(frozen: bool = True)[source]

Get scipy distribution object.

Parameters:

frozen – If True (default), return a frozen distribution with parameters applied. If False, return the unfrozen distribution class.

Returns:

scipy.stats distribution object. If bounds are set and frozen=True, returns a TruncatedFrozenDist wrapper that handles truncation.

Note

When bounds are set (lower_bound and/or upper_bound), the returned distribution is truncated. This ensures that sampling and PDF/CDF evaluation respect the bounds.

ks_statistic: float | None
classmethod load(path: str | Path) DistributionFitResult[source]

Load fitted distribution from file.

Reconstructs a DistributionFitResult from a previously saved file. The loaded result can be used for sampling, PDF/CDF evaluation, etc.

Parameters:

path – File path. Format is detected from extension (.json, .pkl, .pickle).

Returns:

Reconstructed DistributionFitResult

Raises:
  • SerializationError – If file format is invalid or distribution is unknown.

  • FileNotFoundError – If file does not exist.

Example

>>> loaded = DistributionFitResult.load("model.json")
>>> samples = loaded.sample(n=1000)
>>> pdf_values = loaded.pdf(np.linspace(0, 100, 100))

Warning

Only load pickle files from trusted sources.

lower_bound: float | None
parameters: List[float]
pdf(x: ndarray) ndarray[source]

Evaluate probability density function at given points.

Parameters:

x – Points at which to evaluate PDF

Returns:

PDF values at x. If bounds are set, the PDF is normalized to integrate to 1 over the bounded domain.

Example

>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> x = np.linspace(0, 10, 100)
>>> y = result.pdf(x)
ppf(q: ndarray) ndarray[source]

Evaluate percent point function (inverse CDF) at given quantiles.

Parameters:

q – Quantiles at which to evaluate PPF (0 to 1)

Returns:

PPF values at q. If bounds are set, values are guaranteed to be within [lower_bound, upper_bound].

pvalue: float | None
sample(size: int = 1000, random_state: int | None = None) ndarray[source]

Generate random samples from the fitted distribution.

Parameters:
  • size – Number of samples to generate

  • random_state – Random seed for reproducibility

Returns:

Array of random samples. If bounds are set, samples are guaranteed to be within [lower_bound, upper_bound].

Example

>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> samples = result.sample(size=10000, random_state=42)
save(path: str | Path, format: Literal['json', 'pickle'] | None = None, indent: int | None = 2) None[source]

Save fitted distribution to file.

Serializes the distribution parameters and metrics to JSON or pickle format. JSON is recommended for human-readable, version-safe output. Pickle is available for faster serialization when human-readability is not required.

Parameters:
  • path – File path. Format is detected from extension if not specified.

  • format – Output format - ‘json’ (human-readable) or ‘pickle’. If None, detected from file extension (.json, .pkl, .pickle).

  • indent – JSON indentation level (default 2). Use None for compact output. Ignored for pickle format.

Raises:

SerializationError – If format cannot be determined or write fails.

Example

>>> best = results.best(n=1)[0]
>>> best.save("model.json")
>>> best.save("model.pkl", format="pickle")
>>> best.save("compact.json", indent=None)
sse: float
to_dict() dict[source]

Convert result to dictionary.

Returns:

Dictionary representation of the result

upper_bound: float | None
class spark_bestfit.results.BaseFitResults(results_df: DataFrame | DataFrame, samples: Dict[str, ndarray] | None = None)[source]

Bases: ABC

Abstract base class for distribution fit results.

Provides convenient methods for accessing, filtering, and analyzing fitted distributions. Wraps a Spark DataFrame but provides pandas-like interface for common operations.

Subclasses:
  • EagerFitResults: All metrics pre-computed during fitting

  • LazyFitResults: KS/AD metrics computed on-demand

Example

>>> results = fitter.fit(df, 'value')
>>> # Get the best distribution
>>> best = results.best(n=1)[0]
>>> # Get top 5 by AIC
>>> top_aic = results.best(n=5, metric='aic')
>>> # Convert to pandas for analysis
>>> df_pandas = results.df.toPandas()
>>> # Filter by SSE threshold
>>> good_fits = results.filter(sse_threshold=0.01)
abstractmethod best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]

Get top n distributions by specified metric.

Parameters:
  • n – Number of results to return

  • metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’). Defaults to ‘ks_statistic’ (Kolmogorov-Smirnov statistic).

  • warn_if_poor – If True, emit a warning when the best fit has a p-value below pvalue_threshold, indicating a potentially poor fit.

  • pvalue_threshold – P-value threshold for poor fit warning (default 0.05). Only used when warn_if_poor=True.

Returns:

List of DistributionFitResult objects

Example

>>> best = results.best(n=1)[0]
>>> top_5 = results.best(n=5, metric='aic')
best_per_column(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic') Dict[str, List[DistributionFitResult]][source]

Get top n distributions for each column.

Parameters:
  • n – Number of results per column

  • metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)

Returns:

Dict mapping column_name -> List[DistributionFitResult]

Example

>>> results = fitter.fit(df, columns=["col1", "col2", "col3"])
>>> best_per_col = results.best_per_column(n=1)
>>> for col, fits in best_per_col.items():
...     print(f"{col}: {fits[0].distribution}")
property column_names: List[str]

Get list of unique column names in results.

Returns:

List of column names that have fit results

Example

>>> results = fitter.fit(df, columns=["col1", "col2"])
>>> print(results.column_names)
['col1', 'col2']
count() int[source]

Get number of fitted distributions.

Returns:

Count of distributions

property df: DataFrame

Get underlying Spark DataFrame.

Returns:

Spark DataFrame with results

abstractmethod filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) BaseFitResults[source]

Filter results by metric thresholds.

Parameters:
  • sse_threshold – Maximum SSE to include

  • aic_threshold – Maximum AIC to include

  • bic_threshold – Maximum BIC to include

  • ks_threshold – Maximum K-S statistic to include

  • pvalue_threshold – Minimum p-value to include (higher = better fit)

  • ad_threshold – Maximum A-D statistic to include

Returns:

New FitResults with filtered data (same type as self)

Example

>>> good_fits = results.filter(sse_threshold=0.01)
abstractmethod for_column(column_name: str) BaseFitResults[source]

Filter results to a single column.

Parameters:

column_name – Column to filter for

Returns:

New FitResults containing only results for the specified column (same type as self).

Example

>>> col1_results = results.for_column("col1")
abstract property is_lazy: bool

Check if lazy metrics are available for on-demand computation.

Returns:

True if this is a LazyFitResults with lazy contexts, False if this is an EagerFitResults with all metrics computed.

property is_spark_df: bool

Check if the underlying DataFrame is a Spark DataFrame.

Returns:

True if Spark DataFrame, False if pandas DataFrame.

abstractmethod materialize() EagerFitResults[source]

Force computation of all lazy metrics.

When lazy_metrics=True was used during fitting, this method computes KS and AD statistics for all distributions. Call this before unpersisting the source DataFrame if you need the metrics later.

Returns:

EagerFitResults with all metrics computed.

Raises:

RuntimeError – If the source DataFrame is no longer available (LazyFitResults only).

Example

>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> # Fast: only AIC/BIC/SSE computed
>>> best_aic = results.best(n=1, metric='aic')[0]
>>>
>>> # Before unpersisting, materialize all metrics
>>> materialized = results.materialize()
>>> df.unpersist()  # Safe now
>>>
>>> # Access KS on materialized results
>>> best_ks = materialized.best(n=1, metric='ks_statistic')[0]
quality_report(n: int = 5, pvalue_threshold: float = 0.05, ks_threshold: float = 0.1, ad_threshold: float = 2.0) Dict[str, List[DistributionFitResult] | Dict[str, float] | List[str]][source]

Generate a quality assessment report for the fitting results.

Provides a comprehensive view of fit quality including the top fits, summary statistics, and any quality concerns.

Parameters:
  • n – Number of top distributions to include (default 5)

  • pvalue_threshold – Minimum p-value for acceptable fit (default 0.05)

  • ks_threshold – Maximum K-S statistic for acceptable fit (default 0.10)

  • ad_threshold – Maximum A-D statistic for acceptable fit (default 2.0)

Returns:

  • ‘top_fits’: List of top n DistributionFitResult objects

  • ’summary’: Dict with summary statistics (min/max/mean for key metrics)

  • ’warnings’: List of warning messages about fit quality

  • ’n_acceptable’: Number of distributions meeting all thresholds

Return type:

Dictionary with

Example

>>> report = results.quality_report()
>>> print(f"Top fit: {report['top_fits'][0].distribution}")
>>> print(f"Warnings: {report['warnings']}")
>>> if report['warnings']:
...     print("Consider reviewing fit quality")
summary() DataFrame[source]

Get summary statistics of fit quality.

Returns:

DataFrame with min, mean, max for each metric

Example

>>> results.summary()
       min_sse  mean_sse  max_sse  min_ks  mean_ks  max_ks  min_ad  mean_ad  max_ad  count
0      0.001     0.15      2.34    0.02    0.08     0.25    0.10    0.50     2.0      95
unpersist(blocking: bool = False) BaseFitResults[source]

Release the cached DataFrame from memory.

Call this method when you no longer need the FitResults to free executor memory. This is especially useful in notebook sessions where multiple fits accumulate cached DataFrames.

Note

If lazy_metrics=True was used during fitting and you haven’t called materialize(), you should do so before unpersisting if you need KS/AD metrics later. After unpersisting, methods like best(), filter(), etc. may trigger recomputation from source.

Parameters:

blocking – If True, block until unpersist completes. Default False.

Returns:

Self for method chaining.

Example

>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=3)  # Get what you need
>>> results.unpersist()  # Release memory
>>>
>>> # With lazy metrics, materialize first
>>> lazy_results = fitter.fit(df, 'value', lazy_metrics=True)
>>> materialized = lazy_results.materialize()
>>> lazy_results.unpersist()  # Release lazy version
class spark_bestfit.results.EagerFitResults(results_df: DataFrame | DataFrame, samples: Dict[str, ndarray] | None = None)[source]

Bases: BaseFitResults

Fit results with all metrics pre-computed.

This class represents distribution fit results where all metrics (SSE, AIC, BIC, KS, AD) have been computed during fitting.

Example

>>> results = fitter.fit(df, 'value')  # Default: eager evaluation
>>> best = results.best(n=1)[0]
>>> print(f"KS: {best.ks_statistic:.4f}")
best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]

Get top n distributions by specified metric.

Parameters:
  • n – Number of results to return

  • metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)

  • warn_if_poor – If True, warn when best fit has poor p-value

  • pvalue_threshold – P-value threshold for poor fit warning

Returns:

List of DistributionFitResult objects

filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) EagerFitResults[source]

Filter results by metric thresholds.

Parameters:
  • sse_threshold – Maximum SSE to include

  • aic_threshold – Maximum AIC to include

  • bic_threshold – Maximum BIC to include

  • ks_threshold – Maximum K-S statistic to include

  • pvalue_threshold – Minimum p-value to include

  • ad_threshold – Maximum A-D statistic to include

Returns:

New EagerFitResults with filtered data

for_column(column_name: str) EagerFitResults[source]

Filter results to a single column.

Parameters:

column_name – Column to filter for

Returns:

New EagerFitResults for the specified column

property is_lazy: Literal[False]

Return False - eager results have all metrics computed.

materialize() EagerFitResults[source]

Return self - already materialized.

For eager results, this is a no-op since all metrics are already computed.

Returns:

Self (no copy needed).

class spark_bestfit.results.LazyFitResults(results_df: DataFrame | DataFrame, lazy_contexts: Dict[str, LazyMetricsContext], samples: Dict[str, ndarray] | None = None)[source]

Bases: BaseFitResults

Fit results with lazy KS/AD metric computation.

This class represents distribution fit results where only fast metrics (SSE, AIC, BIC) are pre-computed. KS and AD statistics are computed on-demand when first accessed via best() with those metrics.

Important

The source DataFrame must remain valid (not unpersisted) for lazy metric computation to work. Call materialize() before unpersisting the source DataFrame if you need the metrics later.

Example

>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast
>>> best_ks = results.best(n=1, metric='ks_statistic')[0]  # Computes on-demand
>>>
>>> # Before unpersisting source, materialize all metrics
>>> materialized = results.materialize()
>>> df.unpersist()  # Safe now
best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]

Get top n distributions by specified metric.

For KS and AD metrics, computation happens on-demand using the stored lazy context.

Parameters:
  • n – Number of results to return

  • metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)

  • warn_if_poor – If True, warn when best fit has poor p-value

  • pvalue_threshold – P-value threshold for poor fit warning

Returns:

List of DistributionFitResult objects

filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) LazyFitResults[source]

Filter results by metric thresholds.

Note

Filtering by KS/AD thresholds with lazy metrics will exclude all results since those metrics are None. Use AIC/BIC/SSE thresholds or call materialize() first.

Returns:

New LazyFitResults with filtered data (preserves lazy contexts)

for_column(column_name: str) LazyFitResults[source]

Filter results to a single column.

Parameters:

column_name – Column to filter for

Returns:

New LazyFitResults for the specified column (preserves lazy context)

property is_lazy: Literal[True]

Return True - lazy results have deferred metric computation.

is_source_available() bool[source]

Check if source DataFrames are still accessible.

Use this to verify that lazy metric computation can still succeed.

Returns:

True if all source DataFrames can be accessed, False otherwise.

materialize() EagerFitResults[source]

Force computation of all lazy metrics.

Computes KS and AD statistics for all distributions, returning an EagerFitResults that no longer depends on the source DataFrame.

Returns:

EagerFitResults with all metrics computed.

Raises:

RuntimeError – If the source DataFrame is no longer available.

property source_dataframes: Dict[str, DataFrame]

Get source DataFrames for lifecycle visibility.

Use this to understand what DataFrames the lazy computation depends on.

Returns:

Dict mapping column names to their source DataFrames.

spark_bestfit.results.create_fit_results(results_df: DataFrame | DataFrame, lazy_contexts: Dict[str, LazyMetricsContext] | None = None, samples: Dict[str, ndarray] | None = None) EagerFitResults | LazyFitResults[source]

Factory function for creating FitResults.

Creates the appropriate FitResults variant based on whether lazy contexts are provided.

Parameters:
  • results_df – Spark DataFrame or pandas DataFrame with fit results

  • lazy_contexts – Optional dict mapping column names to LazyMetricsContext for on-demand KS/AD computation

  • samples – Optional dict mapping column names to data samples

Returns:

LazyFitResults if lazy_contexts provided, EagerFitResults otherwise

Example

>>> # From fitter (automatic)
>>> results = fitter.fit(df, 'value')  # Returns EagerFitResults
>>> lazy = fitter.fit(df, 'value', lazy_metrics=True)  # Returns LazyFitResults
>>>
>>> # Direct construction (rare)
>>> eager = create_fit_results(df)  # EagerFitResults
>>> lazy = create_fit_results(df, lazy_contexts={...})  # LazyFitResults

Sampling

Distributed sampling for fitted distributions.

This module provides functions for generating samples from fitted distributions using the backend abstraction for distributed or local execution.

spark_bestfit.sampling.sample_distributed(distribution: str, parameters: List[float], n: int, backend: ExecutionBackend, num_partitions: int | None = None, random_seed: int | None = None, column_name: str = 'sample') Any[source]

Generate samples from a fitted distribution using backend abstraction.

Uses the backend’s parallelism to generate samples, enabling generation of millions of samples efficiently with SparkBackend or local execution with LocalBackend.

Parameters:
  • distribution – scipy.stats distribution name (e.g., “norm”, “expon”)

  • parameters – Distribution parameters (shape, loc, scale)

  • n – Total number of samples to generate

  • backend – Execution backend (SparkBackend, LocalBackend, etc.)

  • num_partitions – Number of partitions to use. Defaults to backend parallelism.

  • random_seed – Random seed for reproducibility. Each partition uses seed + partition_id.

  • column_name – Name for the output column (default: “sample”)

Returns:

Backend-specific DataFrame with single column containing samples (Spark DataFrame for SparkBackend, pandas DataFrame for LocalBackend)

Example

>>> from spark_bestfit.backends.spark import SparkBackend
>>> backend = SparkBackend(spark)
>>> df = sample_distributed("norm", [0.0, 1.0], n=1_000_000, backend=backend)
>>> df.show(5)
+-------------------+
|             sample|
+-------------------+
| 0.4691122931291924|
|-0.2828633018445851|
| 1.0093545783546243|
+-------------------+

Distributions

Distribution registry and management for scipy.stats distributions.

class spark_bestfit.distributions.DiscreteDistributionRegistry(custom_exclusions: Set[str] | None = None)[source]

Bases: object

Registry for managing scipy.stats discrete distributions.

Unlike continuous distributions, discrete distributions in scipy do not have a built-in fit() method. This registry provides parameter configuration (initial values, bounds, estimation functions) needed for MLE fitting via optimization.

Example

>>> registry = DiscreteDistributionRegistry()
>>> distributions = registry.get_distributions()
>>> len(distributions)
~15
>>> # Get parameter config for fitting
>>> config = registry.get_param_config("poisson")
>>> initial = config["initial"](data)
>>> bounds = config["bounds"](data)
ALL_DISTRIBUTIONS = ['bernoulli', 'betabinom', 'betanbinom', 'binom', 'boltzmann', 'dlaplace', 'geom', 'hypergeom', 'logser', 'nbinom', 'nchypergeom_fisher', 'nchypergeom_wallenius', 'nhypergeom', 'planck', 'poisson', 'poisson_binom', 'randint', 'skellam', 'yulesimon', 'zipf', 'zipfian']
DEFAULT_EXCLUSIONS = {'bernoulli', 'nchypergeom_fisher', 'nchypergeom_wallenius', 'poisson_binom', 'randint'}
add_exclusion(dist_name: str) None[source]

Add a distribution to the exclusion list.

get_distributions(additional_exclusions: List[str] | None = None) List[str][source]

Get filtered list of discrete distributions.

Only returns distributions that have parameter configurations defined.

Parameters:

additional_exclusions – Additional distribution names to exclude

Returns:

List of distribution names that can be fitted

get_exclusions() Set[str][source]

Get current set of excluded distributions.

get_param_config(dist_name: str) Dict[str, Any][source]

Get parameter configuration for a distribution.

Parameters:

dist_name – Name of the scipy discrete distribution

Returns:

Dictionary with ‘param_names’, ‘initial’, and ‘bounds’ keys

Raises:

ValueError – If distribution is not supported

remove_exclusion(dist_name: str) None[source]

Remove a distribution from the exclusion list.

reset_exclusions() None[source]

Reset exclusions to default set.

class spark_bestfit.distributions.DistributionRegistry(custom_exclusions: Set[str] | None = None)[source]

Bases: object

Registry for managing scipy.stats continuous distributions.

Handles filtering of distributions based on exclusions and support constraints. All scipy.stats continuous distributions are available by default, with sensible exclusions for slow-computing distributions.

Example

>>> registry = DistributionRegistry()
>>> distributions = registry.get_distributions()
>>> len(distributions)
~100
>>> # Only non-negative distributions
>>> pos_distributions = registry.get_distributions(support_at_zero=True)
>>> # Add custom exclusions
>>> distributions = registry.get_distributions(
...     additional_exclusions=["ncf", "ncx2"]
... )
ALL_DISTRIBUTIONS = ['alpha', 'anglit', 'arcsine', 'argus', 'beta', 'betaprime', 'bradford', 'burr', 'burr12', 'cauchy', 'chi', 'chi2', 'cosine', 'crystalball', 'dgamma', 'dpareto_lognorm', 'dweibull', 'erlang', 'expon', 'exponnorm', 'exponpow', 'exponweib', 'f', 'fatiguelife', 'fisk', 'foldcauchy', 'foldnorm', 'gamma', 'gausshyper', 'genexpon', 'genextreme', 'gengamma', 'genhalflogistic', 'genhyperbolic', 'geninvgauss', 'genlogistic', 'gennorm', 'genpareto', 'gibrat', 'gompertz', 'gumbel_l', 'gumbel_r', 'halfcauchy', 'halfgennorm', 'halflogistic', 'halfnorm', 'hypsecant', 'invgamma', 'invgauss', 'invweibull', 'irwinhall', 'jf_skew_t', 'johnsonsb', 'johnsonsu', 'kappa3', 'kappa4', 'ksone', 'kstwo', 'kstwobign', 'landau', 'laplace', 'laplace_asymmetric', 'levy', 'levy_l', 'levy_stable', 'loggamma', 'logistic', 'loglaplace', 'lognorm', 'loguniform', 'lomax', 'maxwell', 'mielke', 'moyal', 'nakagami', 'ncf', 'nct', 'ncx2', 'norm', 'norminvgauss', 'pareto', 'pearson3', 'powerlaw', 'powerlognorm', 'powernorm', 'rayleigh', 'rdist', 'recipinvgauss', 'reciprocal', 'rel_breitwigner', 'rice', 'semicircular', 'skewcauchy', 'skewnorm', 'studentized_range', 't', 'trapezoid', 'triang', 'truncexpon', 'truncnorm', 'truncpareto', 'truncweibull_min', 'tukeylambda', 'uniform', 'vonmises', 'vonmises_line', 'wald', 'weibull_max', 'weibull_min', 'wrapcauchy']
DEFAULT_EXCLUSIONS = {'dpareto_lognorm', 'exponpow', 'gausshyper', 'genhyperbolic', 'geninvgauss', 'kappa4', 'ksone', 'kstwo', 'kstwobign', 'levy_stable', 'mielke', 'ncf', 'nct', 'ncx2', 'recipinvgauss', 'studentized_range', 'tukeylambda', 'vonmises', 'vonmises_line', 'wald'}
SLOW_DISTRIBUTIONS: Set[str] = {'burr', 'burr12', 'exponweib', 'fisk', 'genexpon', 'gengamma', 'invweibull', 'jf_skew_t', 'johnsonsb', 'johnsonsu', 'norminvgauss', 'pearson3', 'powerlognorm', 'rice', 't', 'truncweibull_min'}
add_exclusion(dist_name: str) None[source]

Add a distribution to the exclusion list.

Parameters:

dist_name – Name of the distribution to exclude

get_custom_distributions() Dict[str, rv_continuous][source]

Get a copy of all registered custom distributions.

Returns:

Dict mapping distribution names to rv_continuous objects

Note

Returns a shallow copy - modifying the dict won’t affect the registry, but modifying distribution objects will.

get_distribution_object(name: str) rv_continuous[source]

Get a distribution object by name.

Looks up both scipy.stats built-in distributions and registered custom distributions.

Parameters:

name – Distribution name (scipy.stats name or custom registered name)

Returns:

scipy rv_continuous distribution object

Raises:

ValueError – If distribution not found

Example

>>> registry = DistributionRegistry()
>>> norm_dist = registry.get_distribution_object("norm")
>>> # Also works for custom distributions
>>> registry.register_distribution("custom", MyDist())
>>> my_dist = registry.get_distribution_object("custom")
get_distributions(support_at_zero: bool = False, additional_exclusions: List[str] | None = None, include_custom: bool = True) List[str][source]

Get filtered list of distributions based on criteria.

Parameters:
  • support_at_zero – If True, only include distributions with support at zero (non-negative distributions)

  • additional_exclusions – Additional distribution names to exclude

  • include_custom – If True, include registered custom distributions (default True)

Returns:

List of distribution names meeting the criteria

Example

>>> registry = DistributionRegistry()
>>> # Get all non-excluded distributions
>>> dists = registry.get_distributions()
>>> # Get only non-negative distributions
>>> pos_dists = registry.get_distributions(support_at_zero=True)
>>> # Exclude more distributions
>>> filtered = registry.get_distributions(
...     additional_exclusions=["norm", "expon"]
... )
>>> # Register and include custom distributions
>>> registry.register_distribution("my_custom", MyCustomDistribution())
>>> dists = registry.get_distributions()  # Includes "my_custom"
get_exclusions() Set[str][source]

Get current set of excluded distributions.

Returns:

Set of excluded distribution names

has_custom_distributions() bool[source]

Check if any custom distributions are registered.

Returns:

True if at least one custom distribution is registered

register_distribution(name: str, distribution: rv_continuous, overwrite: bool = False) None[source]

Register a custom distribution for fitting.

Custom distributions must implement the scipy rv_continuous interface, specifically the fit(), pdf(), and cdf() methods. The distribution will be included in fitting alongside scipy.stats distributions.

Parameters:
  • name – Unique name for the distribution (used in results)

  • distribution – scipy rv_continuous instance or subclass. Must implement fit(), pdf(), cdf() methods.

  • overwrite – If True, overwrite existing distribution with same name. Default False raises ValueError if name exists.

Raises:
  • ValueError – If name already exists (and overwrite=False) or conflicts with a scipy.stats distribution name

  • TypeError – If distribution doesn’t implement required interface

Example

>>> from scipy.stats import rv_continuous
>>>
>>> class PowerDistribution(rv_continuous):
...     def _pdf(self, x, alpha):
...         return alpha * x ** (alpha - 1)
...     def _cdf(self, x, alpha):
...         return x ** alpha
>>>
>>> registry = DistributionRegistry()
>>> registry.register_distribution("power", PowerDistribution(a=0, b=1))
>>> distributions = registry.get_distributions()
>>> "power" in distributions
True
remove_exclusion(dist_name: str) None[source]

Remove a distribution from the exclusion list.

Parameters:

dist_name – Name of the distribution to include

reset_exclusions() None[source]

Reset exclusions to default set.

unregister_distribution(name: str) None[source]

Remove a custom distribution from the registry.

Parameters:

name – Name of the custom distribution to remove

Raises:

KeyError – If distribution not found in registry

Histogram

Distributed histogram computation without collecting raw data.

This module provides the HistogramComputer class that uses the backend abstraction for distributed histogram computation.

class spark_bestfit.histogram.HistogramComputer(backend: ExecutionBackend | None = None)[source]

Bases: object

Computes histograms efficiently using distributed aggregations.

This implementation avoids collecting raw data to the driver by using the backend’s distributed aggregation capabilities. Only the final histogram (typically ~100 values) is collected, not the raw dataset.

Supports multiple backends: - SparkBackend: Uses Spark ML Bucketizer + groupBy (scales to billions of rows) - LocalBackend: Uses numpy histogram (for testing and small datasets)

Example

>>> from spark_bestfit.backends.spark import SparkBackend
>>> backend = SparkBackend(spark)
>>> computer = HistogramComputer(backend)
>>> y_hist, bin_edges = computer.compute_histogram(
...     df, column='value', bins=50
... )
>>> # y_hist has 50 values, bin_edges has 51 values
>>> x_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.0  # Compute centers if needed
Auto-detection example (detects backend from DataFrame type):
>>> computer = HistogramComputer()
>>> y_hist, bin_edges = computer.compute_histogram(pandas_df, column='value')  # Uses LocalBackend
compute_histogram(df: Any, column: str, bins: int | ndarray = 50, use_rice_rule: bool = False, approx_count: int | None = None) Tuple[ndarray, ndarray][source]

Compute histogram using distributed aggregations.

This method computes the histogram WITHOUT collecting the raw data. It uses the backend’s distributed aggregation to compute bin counts, then collects only the aggregated histogram.

Parameters:
  • df – DataFrame containing data (Spark DataFrame or pandas DataFrame)

  • column – Column name to compute histogram for

  • bins – Number of bins (int) or array of bin edges

  • use_rice_rule – Use Rice rule to automatically determine bin count

  • approx_count – Approximate row count (avoids full count if provided)

Returns:

  • y_hist: Normalized frequency density for each bin

  • bin_edges: Array of bin edge values (len = n_bins + 1)

Return type:

Tuple of (y_hist, bin_edges) where

Example

>>> computer = HistogramComputer(backend)
>>> y, x = computer.compute_histogram(df, 'value', bins=100)
>>> # y and x are small numpy arrays (~100 elements)
compute_statistics(df: Any, column: str) dict[source]

Compute basic statistics for a column (useful for validation).

Parameters:
  • df – DataFrame (Spark DataFrame or pandas DataFrame)

  • column – Column name

Returns:

Dictionary with min, max, count (and optionally mean, stddev)

Plotting

Visualization utilities for fitted distributions.

spark_bestfit.plotting.plot_cdf_comparison(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Cumulative Probability', figsize: Tuple[int, int] = (10, 8), dpi: int = 100, empirical_color: str = 'steelblue', empirical_linewidth: float = 2.0, empirical_alpha: float = 0.8, theoretical_color: str = 'red', theoretical_linewidth: float = 2.0, theoretical_linestyle: str = '--', title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Plot empirical CDF overlaid with theoretical CDF from the fitted distribution.

The empirical CDF is computed from the sample data using the step function. The theoretical CDF is computed from the fitted distribution. A good fit shows close alignment between the two CDFs.

Parameters:
  • result – Fitted distribution result

  • data – Sample data array (1D numpy array)

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • empirical_color – Color of empirical CDF line

  • empirical_linewidth – Line width for empirical CDF

  • empirical_alpha – Transparency of empirical CDF line

  • theoretical_color – Color of theoretical CDF line

  • theoretical_linewidth – Line width for theoretical CDF

  • theoretical_linestyle – Line style for theoretical CDF

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> plot_cdf_comparison(best, data, title='CDF Comparison')
spark_bestfit.plotting.plot_comparison(results: List[DistributionFitResult], y_hist: ndarray, x_hist: ndarray, title: str = 'Distribution Comparison', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Plot multiple fitted distributions for comparison.

Parameters:
  • results – List of DistributionFitResult objects

  • y_hist – Histogram density values

  • x_hist – Histogram bin centers

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency

  • pdf_linewidth – PDF line width

  • title_fontsize – Title font size

  • label_fontsize – Label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency

  • save_path – Optional path to save figure

  • save_format – Save format

Returns:

Tuple of (figure, axis)

Example

>>> top_3 = results.best(n=3)
>>> fitter.plot_comparison(top_3, df, 'value')
spark_bestfit.plotting.plot_diagnostics(result: DistributionFitResult, data: ndarray, y_hist: ndarray | None = None, x_hist: ndarray | None = None, bins: int = 50, title: str = '', figsize: Tuple[int, int] = (14, 12), dpi: int = 100, title_fontsize: int = 16, subplot_title_fontsize: int = 12, label_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, ndarray][source]

Create a 2x2 diagnostic plot panel for assessing distribution fit quality.

Generates four diagnostic plots: - Q-Q Plot (top-left): Compares sample quantiles vs theoretical quantiles - P-P Plot (top-right): Compares empirical vs theoretical probabilities - Residual Histogram (bottom-left): Distribution of fit residuals - CDF Comparison (bottom-right): Empirical vs theoretical CDF overlay

Parameters:
  • result – Fitted distribution result

  • data – Sample data array (1D numpy array)

  • y_hist – Optional pre-computed histogram density values. If None, computed from data using specified bins.

  • x_hist – Optional pre-computed histogram bin centers. If None, computed from data using specified bins.

  • bins – Number of histogram bins (used if y_hist/x_hist not provided)

  • title – Overall figure title

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • title_fontsize – Main title font size

  • subplot_title_fontsize – Subplot title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, array of axes)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fig, axes = plot_diagnostics(best, data, title='Fit Diagnostics')
spark_bestfit.plotting.plot_discrete_distribution(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Probability', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.7, pmf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Plot fitted discrete distribution against data histogram.

Parameters:
  • result – Fitted discrete distribution result

  • data – Integer data array

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency (0-1)

  • pmf_linewidth – Line width for PMF markers

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

spark_bestfit.plotting.plot_distribution(result: DistributionFitResult, y_hist: ndarray, x_hist: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Plot fitted distribution against data histogram.

Parameters:
  • result – Fitted distribution result

  • y_hist – Histogram density values

  • x_hist – Histogram bin centers

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • show_histogram – Show data histogram

  • histogram_alpha – Histogram transparency (0-1)

  • pdf_linewidth – Line width for PDF curve

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • legend_fontsize – Legend font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot(best, df, 'value', title='Best Fit')
spark_bestfit.plotting.plot_pp(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Theoretical Probabilities', ylabel: str = 'Sample Probabilities', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Create a P-P (probability-probability) plot for goodness-of-fit assessment.

A P-P plot compares the empirical cumulative distribution function (CDF) of the sample data against the theoretical CDF of the fitted distribution. It is particularly useful for assessing fit in the center of the distribution.

Parameters:
  • result – Fitted distribution result

  • data – Sample data array (1D numpy array)

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • marker – Marker style for data points

  • marker_size – Size of markers

  • marker_alpha – Marker transparency (0-1)

  • marker_color – Color of markers

  • line_color – Color of reference line

  • line_style – Style of reference line

  • line_width – Width of reference line

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot_pp(best, df, 'value', title='P-P Plot')
spark_bestfit.plotting.plot_qq(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Theoretical Quantiles', ylabel: str = 'Sample Quantiles', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Create a Q-Q (quantile-quantile) plot for goodness-of-fit assessment.

A Q-Q plot compares the quantiles of the sample data against the theoretical quantiles of the fitted distribution. If the data follows the fitted distribution well, the points will fall approximately along the reference line.

Parameters:
  • result – Fitted distribution result

  • data – Sample data array (1D numpy array)

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • marker – Marker style for data points

  • marker_size – Size of markers

  • marker_alpha – Marker transparency (0-1)

  • marker_color – Color of markers

  • line_color – Color of reference line

  • line_style – Style of reference line

  • line_width – Width of reference line

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot_qq(best, df, 'value', title='Q-Q Plot')
spark_bestfit.plotting.plot_residual_histogram(result: DistributionFitResult, y_hist: ndarray, x_hist: ndarray, title: str = '', xlabel: str = 'Residual (Observed - Expected)', ylabel: str = 'Frequency', figsize: Tuple[int, int] = (10, 8), dpi: int = 100, bins: int = 30, histogram_alpha: float = 0.7, histogram_color: str = 'steelblue', show_zero_line: bool = True, zero_line_color: str = 'red', zero_line_style: str = '--', zero_line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') Tuple[None, None][source]

Plot a histogram of residuals (observed - expected density).

Residuals are computed as the difference between the empirical density (from histogram) and the theoretical density (from fitted distribution). A good fit should show residuals centered near zero.

Parameters:
  • result – Fitted distribution result

  • y_hist – Histogram density values (empirical density)

  • x_hist – Histogram bin centers

  • title – Plot title

  • xlabel – X-axis label

  • ylabel – Y-axis label

  • figsize – Figure size (width, height)

  • dpi – Dots per inch for saved figures

  • bins – Number of bins for the residual histogram

  • histogram_alpha – Histogram transparency (0-1)

  • histogram_color – Color of histogram bars

  • show_zero_line – Whether to show a vertical line at zero

  • zero_line_color – Color of the zero reference line

  • zero_line_style – Style of the zero reference line

  • zero_line_width – Width of the zero reference line

  • title_fontsize – Title font size

  • label_fontsize – Axis label font size

  • grid_alpha – Grid transparency (0-1)

  • save_path – Optional path to save figure

  • save_format – Save format (png, pdf, svg)

Returns:

Tuple of (figure, axis)

Example

>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> y_hist, x_edges = np.histogram(data, bins=50, density=True)
>>> x_hist = (x_edges[:-1] + x_edges[1:]) / 2
>>> plot_residual_histogram(best, y_hist, x_hist)

Utilities

Utility functions for spark-bestfit.

spark_bestfit.utils.get_spark_session(spark: SparkSession | None = None) SparkSession[source]

Get or create a SparkSession.

If a SparkSession is provided, it is returned as-is. If None is provided, attempts to get the active SparkSession.

Parameters:

spark – Optional SparkSession. If None, gets the active session.

Returns:

SparkSession instance

Raises:

RuntimeError – If no SparkSession is provided and no active session exists

Example

>>> # Use existing session
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> session = get_spark_session(spark)
>>>
>>> # Use active session
>>> session = get_spark_session()  # Gets active session