API Reference
Core
Core distribution fitting engine for Spark - re-exports for backward compatibility.
This module provides backward-compatible imports for:
- DistributionFitter (continuous distributions)
- DiscreteDistributionFitter (discrete/count data)
- TruncatedFrozenDist (truncated distribution wrapper)
- Default exclusion constants
The actual implementations are in:
- spark_bestfit.continuous_fitter (DistributionFitter)
- spark_bestfit.discrete_fitter (DiscreteDistributionFitter)
- spark_bestfit.truncated (TruncatedFrozenDist)
- class spark_bestfit.core.DiscreteDistributionFitter(spark: SparkSession | None = None, excluded_distributions: Tuple[str, ...] | None = None, random_seed: int = 42, backend: ExecutionBackend | None = None)[source]¶
Bases: BaseFitter
Spark distribution fitting engine for discrete (count) data.
Efficiently fits scipy.stats discrete distributions to integer data using Spark’s parallel processing capabilities. Uses MLE optimization since scipy discrete distributions don’t have a built-in fit() method.
- Metric Selection:
For discrete distributions, AIC is recommended for model selection:
- aic: Proper model selection criterion with complexity penalty
- bic: Similar to AIC but stronger penalty for complex models
- ks_statistic: Valid for ranking, but p-values are not reliable
- sse: Simple comparison metric
The K-S test assumes continuous distributions. For discrete data, the K-S statistic can rank fits, but p-values are conservative and should not be used for hypothesis testing.
Example
>>> from pyspark.sql import SparkSession
>>> from spark_bestfit import DiscreteDistributionFitter
>>>
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> df = spark.createDataFrame([(x,) for x in count_data], ['counts'])
>>>
>>> fitter = DiscreteDistributionFitter(spark)
>>> results = fitter.fit(df, column='counts')
>>>
>>> # Use AIC for model selection (recommended)
>>> best = results.best(n=1, metric='aic')[0]
>>> print(f"Best: {best.distribution} (AIC={best.aic:.2f})")
- fit(df: DataFrame, column: str | None = None, columns: List[str] | None = None, config: FitterConfig | None = None, *, max_distributions: int | None = None, enable_sampling: bool = True, sample_fraction: float | None = None, max_sample_size: int = 1000000, sample_threshold: int = 10000000, num_partitions: int | None = None, progress_callback: Callable[[int, int, float], None] | None = None, bounded: bool = False, lower_bound: float | Dict[str, float] | None = None, upper_bound: float | Dict[str, float] | None = None, lazy_metrics: bool = False, prefilter: bool | str = False) EagerFitResults | LazyFitResults[source]¶
Fit discrete distributions to integer data column(s).
- Parameters:
df – Spark DataFrame containing integer count data
column – Name of single column to fit distributions to
columns – List of column names for multi-column fitting
config – FitterConfig object (v2.2.0). Provides a cleaner way to configure fitting with many parameters. If provided, individual parameters below are ignored (except progress_callback which can override the config’s callback). Note: bins, use_rice_rule, support_at_zero, and prefilter in config are ignored for discrete fitting.
max_distributions – Limit number of distributions (for testing)
enable_sampling – Enable sampling for large datasets
sample_fraction – Fraction to sample (None = auto-determine)
max_sample_size – Maximum rows to sample when auto-determining
sample_threshold – Row count above which sampling is applied
num_partitions – Spark partitions (None = auto-determine)
progress_callback – Optional callback for progress updates. Called with (completed_tasks, total_tasks, percent_complete). The callback is invoked from a background thread, so it must be thread-safe.
bounded – Enable bounded distribution fitting. When True, bounds are auto-detected from data or use explicit lower_bound/upper_bound.
lower_bound – Lower bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s minimum.
upper_bound – Upper bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s maximum.
lazy_metrics – If True, defer computation of expensive KS metrics until accessed (v1.5.0). Improves fitting performance when only using AIC/BIC/SSE for model selection. Default False for backward compatibility.
prefilter – Pre-filter distributions (v1.6.0). Currently only supported for continuous distributions. For discrete, this parameter is accepted but ignored (logs a warning if enabled).
- Returns:
FitResults object with fitted distributions (an EagerFitResults or LazyFitResults instance, per the signature)
- Raises:
ValueError – If column not found, DataFrame empty, or invalid params
TypeError – If column is not numeric
Example
>>> # Using FitterConfig (v2.2.0)
>>> from spark_bestfit import FitterConfigBuilder
>>> config = (FitterConfigBuilder()
...     .with_bounds(lower=0, upper=100)
...     .with_sampling(fraction=0.1)
...     .build())
>>> results = fitter.fit(df, column='counts', config=config)
>>>
>>> # Single column (backward compatible)
>>> results = fitter.fit(df, column='counts')
>>> best = results.best(n=1, metric='aic')
>>>
>>> # Multi-column
>>> results = fitter.fit(df, columns=['counts1', 'counts2'])
>>> best_per_col = results.best_per_column(n=1, metric='aic')
>>>
>>> # Bounded fitting
>>> results = fitter.fit(df, column='counts', bounded=True, lower_bound=0, upper_bound=100)
>>>
>>> # Lazy metrics for faster fitting when only using AIC/BIC (v1.5.0)
>>> results = fitter.fit(df, 'counts', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast, no KS computed
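Because progress_callback is invoked from a background thread, any shared state it updates should be guarded by a lock. The following is a minimal sketch that assumes only the documented (completed_tasks, total_tasks, percent_complete) signature. ProgressTracker is a hypothetical helper, not part of spark_bestfit, and the 0-100 percent scale used in the simulation is an assumption.

```python
import threading


class ProgressTracker:
    """Hypothetical helper (not part of spark_bestfit) that records progress safely."""

    def __init__(self):
        self._lock = threading.Lock()
        self.completed = 0
        self.total = 0
        self.percent = 0.0
        self.calls = 0

    def __call__(self, completed_tasks: int, total_tasks: int, percent_complete: float) -> None:
        # The lock guards shared state because the caller may be a background thread.
        with self._lock:
            self.calls += 1
            # Keep the furthest progress seen, in case updates arrive out of order.
            if completed_tasks >= self.completed:
                self.completed = completed_tasks
                self.total = total_tasks
                self.percent = percent_complete


tracker = ProgressTracker()

# Simulate updates arriving from several background threads
# (assumed scale: percent_complete in the range 0-100).
threads = [
    threading.Thread(target=tracker, args=(i, 10, 100.0 * i / 10))
    for i in range(1, 11)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Such an object could then be passed as progress_callback=tracker in a call to fit().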
- plot(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, title: str = '', xlabel: str = 'Value', ylabel: str = 'Probability', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.7, pmf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]¶
Plot fitted discrete distribution against data histogram.
- Parameters:
result – DistributionFitResult to plot
df – DataFrame with data. If None, uses cached sample from result (v2.10.0). When a cached sample exists and force_recompute is False, the cached sample is used and df is ignored (a warning is emitted).
column – Column name. If None, uses column_name from result.
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
show_histogram – Show data histogram
histogram_alpha – Histogram transparency (0-1)
pmf_linewidth – Line width for PMF curve
title_fontsize – Title font size
label_fontsize – Axis label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency (0-1)
save_path – Path to save figure (optional)
save_format – Save format (png, pdf, svg)
force_recompute – If True, ignore cached sample and recompute from df. Default False (v3.0.2).
- Returns:
Tuple of (figure, axis) from matplotlib
Example
>>> best = results.best(n=1)[0]
>>> # v3.0.2: instant plotting using cached sample (default)
>>> fitter.plot(best, title='Instant Plot')
>>> # Force recompute from DataFrame
>>> fitter.plot(best, df, 'value', title='Recomputed', force_recompute=True)
- class spark_bestfit.core.DistributionFitter(spark: SparkSession | None = None, excluded_distributions: Tuple[str, ...] | None = None, random_seed: int = 42, backend: ExecutionBackend | None = None)[source]¶
Bases: BaseFitter
Modern Spark distribution fitting engine.
Efficiently fits ~90 scipy.stats distributions to data using Spark’s parallel processing capabilities. Uses broadcast variables and Pandas UDFs to avoid data collection and minimize serialization overhead.
Example
>>> from pyspark.sql import SparkSession
>>> from spark_bestfit import DistributionFitter
>>>
>>> # Create your own SparkSession
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> df = spark.createDataFrame([(float(x),) for x in data], ['value'])
>>>
>>> # Simple usage
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, column='value')
>>> best = results.best(n=1)[0]
>>> print(f"Best: {best.distribution} with SSE={best.sse}")
>>>
>>> # With custom parameters
>>> fitter = DistributionFitter(spark, random_seed=123)
>>> results = fitter.fit(df, 'value', bins=100, support_at_zero=True)
>>>
>>> # Plot the best fit
>>> fitter.plot(best, df, 'value', title='Best Fit')
- fit(df: DataFrame, column: str | None = None, columns: List[str] | None = None, config: FitterConfig | None = None, *, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, support_at_zero: bool = False, max_distributions: int | None = None, enable_sampling: bool = True, sample_fraction: float | None = None, max_sample_size: int = 1000000, sample_threshold: int = 10000000, num_partitions: int | None = None, progress_callback: Callable[[int, int, float], None] | None = None, bounded: bool = False, lower_bound: float | Dict[str, float] | None = None, upper_bound: float | Dict[str, float] | None = None, lazy_metrics: bool = False, prefilter: bool | str = False, estimation_method: str = 'mle') EagerFitResults | LazyFitResults[source]¶
Fit distributions to data column(s).
- Parameters:
df – Spark DataFrame containing data
column – Name of single column to fit distributions to
columns – List of column names for multi-column fitting
config – FitterConfig object (v2.2.0). Provides a cleaner way to configure fitting with many parameters. If provided, individual parameters below are ignored (except progress_callback which can override the config’s callback). Use FitterConfigBuilder for fluent configuration.
bins – Number of histogram bins or tuple of bin edges
use_rice_rule – Use Rice rule to auto-determine bin count
support_at_zero – Only fit non-negative distributions
max_distributions – Limit number of distributions (for testing)
enable_sampling – Enable sampling for large datasets
sample_fraction – Fraction to sample (None = auto-determine)
max_sample_size – Maximum rows to sample when auto-determining
sample_threshold – Row count above which sampling is applied
num_partitions – Spark partitions (None = auto-determine)
progress_callback – Optional callback for progress updates. Called with (completed_tasks, total_tasks, percent_complete). The callback is invoked from a background thread, so it must be thread-safe.
bounded – If True, fit truncated distributions (v1.4.0). When enabled, distributions are truncated to [lower_bound, upper_bound] using scipy.stats.truncate(). Requires scipy >= 1.14.0.
lower_bound – Lower bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s minimum.
upper_bound – Upper bound for truncated distribution fitting. Can be a float (applied to all columns) or a dict mapping column names to bounds (v1.5.0). If None and bounded=True, auto-detects from each column’s maximum.
lazy_metrics – If True, defer computation of expensive KS/AD metrics until accessed (v1.5.0). Improves fitting performance when only using AIC/BIC/SSE for model selection. Default False for backward compatibility.
prefilter – Pre-filter distributions based on data characteristics (v1.6.0). Skips distributions that are mathematically incompatible with the data, reducing fitting time by 30-70% for non-normal data.
- False (default): No pre-filtering, fit all distributions
- True: Safe mode - filters by support bounds and skewness sign
- 'aggressive': Also filters by kurtosis (may skip valid distributions)
Pre-filtering uses scipy's distribution support bounds (dist.a, dist.b) and sample moments. Filtered distributions are logged for transparency.
estimation_method – Parameter estimation method (v2.5.0):
- "mle": Maximum Likelihood Estimation (default). Fast and accurate for most distributions. Uses scipy.stats.fit().
- "mse": Maximum Spacing Estimation. More robust for heavy-tailed distributions (Pareto, Cauchy, etc.) where MLE may fail.
- "auto": Automatically select MSE for heavy-tailed data based on kurtosis and extreme value analysis, MLE otherwise.
- Returns:
FitResults object with fitted distributions (an EagerFitResults or LazyFitResults instance, per the signature)
- Raises:
ValueError – If column not found, DataFrame empty, or invalid params
TypeError – If column is not numeric
Example
>>> # Using FitterConfig (recommended for complex configs, v2.2.0)
>>> from spark_bestfit import FitterConfigBuilder
>>> config = (FitterConfigBuilder()
...     .with_bins(100)
...     .with_bounds(lower=0, upper=100)
...     .with_sampling(fraction=0.1)
...     .build())
>>> results = fitter.fit(df, column='value', config=config)
>>>
>>> # Single column (backward compatible)
>>> results = fitter.fit(df, column='value')
>>> results = fitter.fit(df, 'value', bins=100, support_at_zero=True)
>>>
>>> # Multi-column
>>> results = fitter.fit(df, columns=['col1', 'col2', 'col3'])
>>> best_col1 = results.for_column('col1').best(n=1)[0]
>>> best_per_col = results.best_per_column(n=1)
>>>
>>> # Bounded fitting (v1.4.0)
>>> results = fitter.fit(df, 'value', bounded=True)  # Auto-detect bounds
>>> results = fitter.fit(df, 'value', bounded=True, lower_bound=0, upper_bound=100)
>>>
>>> # Lazy metrics for faster fitting when only using AIC/BIC (v1.5.0)
>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast, no KS/AD computed
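To give a feel for what the "mse" estimation method refers to, the sketch below applies maximum spacing estimation to an exponential distribution: it picks the rate that maximizes the mean log of the CDF differences between consecutive sorted observations. This is an illustration only. The function name, the grid search, and the choice of distribution are assumptions, and nothing here reflects how spark_bestfit implements the method internally.

```python
import math
import random


def max_spacing_rate(data, candidates=800, lo=1e-2, hi=1e2):
    """Estimate an exponential rate by maximizing the mean log-spacing.

    Spacings are differences of the fitted CDF at consecutive order
    statistics, with F = 0 before the first point and F = 1 after the last.
    """
    xs = sorted(data)

    def mean_log_spacing(rate):
        # Survival values S(x) = exp(-rate * x); F(b) - F(a) = S(a) - S(b).
        surv = [1.0] + [math.exp(-rate * x) for x in xs] + [0.0]
        total = 0.0
        for a, b in zip(surv, surv[1:]):
            # Floor tiny spacings so that log() stays finite.
            total += math.log(max(a - b, 1e-300))
        return total / (len(xs) + 1)

    # Grid search over log-spaced rates: crude, but dependency-free.
    step = (math.log(hi) - math.log(lo)) / (candidates - 1)
    grid = [math.exp(math.log(lo) + i * step) for i in range(candidates)]
    return max(grid, key=mean_log_spacing)


rng = random.Random(0)
sample = [rng.expovariate(2.0) for _ in range(500)]  # true rate is 2.0
rate_hat = max_spacing_rate(sample)
```

Unlike the likelihood, the spacing objective is bounded above for any continuous model, which is one reason the approach is often preferred for heavy-tailed or otherwise awkward likelihoods.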
- get_custom_distributions() dict[source]¶
Get all registered custom distributions.
- Returns:
Dict mapping distribution names to rv_continuous objects
- plot(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, title: str = '', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]¶
Plot fitted distribution against data histogram.
- Parameters:
result – DistributionFitResult to plot
df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.
column – Column name. If None, uses column_name from result.
bins – Number of histogram bins
use_rice_rule – Use Rice rule for bins
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
show_histogram – Show data histogram
histogram_alpha – Histogram transparency (0-1)
pdf_linewidth – Line width for PDF curve
title_fontsize – Title font size
label_fontsize – Axis label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency (0-1)
save_path – Path to save figure (optional)
save_format – Save format (png, pdf, svg)
force_recompute – If True, ignore cached sample and recompute histogram from df (requires df to be provided). Default False.
- Returns:
Tuple of (figure, axis) from matplotlib
Example
>>> # Instant plot from cached sample (recommended)
>>> fitter.plot(best, title='Instant Plot')
>>> # Explicit DataFrame (recomputes histogram via Spark)
>>> fitter.plot(best, df, 'value', title='Best Fit', force_recompute=True)
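The use_rice_rule option refers to the Rice rule for choosing a histogram bin count, which in its textbook form is ceil(2 * n^(1/3)) for n observations. A minimal sketch of that formula follows. The function name is hypothetical, and whether spark_bestfit rounds or caps the result the same way is an assumption.

```python
import math


def rice_rule_bins(n_rows: int) -> int:
    """Rice rule: number of bins = ceil(2 * n^(1/3))."""
    if n_rows < 1:
        raise ValueError("n_rows must be positive")
    return math.ceil(2 * n_rows ** (1 / 3))


bins_small = rice_rule_bins(500)
bins_large = rice_rule_bins(1000)
```

The bin count grows with the cube root of the row count, so doubling the data adds only about 26% more bins.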
- plot_comparison(results: List[DistributionFitResult], df: DataFrame | None = None, column: str | None = None, bins: int | Tuple[float, ...] = 50, use_rice_rule: bool = True, title: str = 'Distribution Comparison', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]¶
Plot multiple distributions for comparison.
- Parameters:
results – List of DistributionFitResult objects
df – DataFrame with data. Optional when results contain a cached sample. When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.
column – Column name. If None, uses column_name from the first result.
bins – Number of histogram bins
use_rice_rule – Use Rice rule for bins
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch
show_histogram – Show data histogram
histogram_alpha – Histogram transparency
pdf_linewidth – PDF line width
title_fontsize – Title font size
label_fontsize – Label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency
save_path – Path to save figure
save_format – Save format
force_recompute – If True, ignore cached samples and recompute histogram from df (requires df to be provided). Default False.
- Returns:
Tuple of (figure, axis)
Example
>>> top_3 = results.best(n=3)
>>> # Instant comparison from cached sample (recommended)
>>> fitter.plot_comparison(top_3)
>>> # Explicit DataFrame (recomputes histogram via Spark)
>>> fitter.plot_comparison(top_3, df, 'value', force_recompute=True)
- plot_pp(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, max_points: int = 1000, title: str = '', xlabel: str = 'Theoretical Probabilities', ylabel: str = 'Sample Probabilities', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]¶
Create a P-P plot to assess goodness-of-fit.
A P-P (probability-probability) plot compares the empirical CDF of the sample data against the theoretical CDF of the fitted distribution. Points falling close to the reference line indicate a good fit, particularly in the center of the distribution.
- Parameters:
result – DistributionFitResult to plot
df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.
column – Column name. If None, uses column_name from result.
max_points – Maximum data points to sample for plotting
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
marker – Marker style for data points
marker_size – Size of markers
marker_alpha – Marker transparency (0-1)
marker_color – Color of markers
line_color – Color of reference line
line_style – Style of reference line
line_width – Width of reference line
title_fontsize – Title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Path to save figure (optional)
save_format – Save format (png, pdf, svg)
force_recompute – If True, ignore cached sample and resample from df (requires df to be provided). Default False.
- Returns:
Tuple of (figure, axis) from matplotlib
Example
>>> best = results.best(n=1)[0]
>>> # Instant P-P plot from cached sample (recommended)
>>> fitter.plot_pp(best, title='Instant P-P Plot')
>>> # Explicit DataFrame (resamples via Spark)
>>> fitter.plot_pp(best, df, 'value', title='P-P Plot', force_recompute=True)
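The points on a P-P plot can be computed by hand: sort the sample, evaluate the fitted CDF at each value, and pair it with the empirical probability of that rank. The sketch below uses the standard library's NormalDist as a stand-in for a fitted distribution. The helper name and the (i - 0.5) / n plotting position are assumptions, not necessarily what plot_pp uses.

```python
from statistics import NormalDist


def pp_points(sample, dist):
    """Return (theoretical, empirical) probability pairs for a P-P plot."""
    xs = sorted(sample)
    n = len(xs)
    # Empirical probabilities use the plotting position (i - 0.5) / n.
    empirical = [(i - 0.5) / n for i in range(1, n + 1)]
    theoretical = [dist.cdf(x) for x in xs]
    return theoretical, empirical


fitted = NormalDist(mu=0.0, sigma=1.0)
sample = fitted.samples(1000, seed=1)
theo, emp = pp_points(sample, fitted)

# Largest vertical distance from the diagonal reference line y = x.
max_gap = max(abs(t - e) for t, e in zip(theo, emp))
```

Both axes are confined to [0, 1], which is why a P-P plot is most sensitive to discrepancies in the center of the distribution.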
- plot_qq(result: DistributionFitResult, df: DataFrame | None = None, column: str | None = None, max_points: int = 1000, title: str = '', xlabel: str = 'Theoretical Quantiles', ylabel: str = 'Sample Quantiles', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png', force_recompute: bool = False)[source]¶
Create a Q-Q plot to assess goodness-of-fit.
A Q-Q (quantile-quantile) plot compares sample quantiles against theoretical quantiles from the fitted distribution. Points falling close to the reference line indicate a good fit.
- Parameters:
result – DistributionFitResult to plot
df – DataFrame with data. Optional when result contains a cached sample (the default after fitting). When both a cached sample and df are provided, the cached sample is used unless force_recompute=True.
column – Column name. If None, uses column_name from result.
max_points – Maximum data points to sample for plotting
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
marker – Marker style for data points
marker_size – Size of markers
marker_alpha – Marker transparency (0-1)
marker_color – Color of markers
line_color – Color of reference line
line_style – Style of reference line
line_width – Width of reference line
title_fontsize – Title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Path to save figure (optional)
save_format – Save format (png, pdf, svg)
force_recompute – If True, ignore cached sample and resample from df (requires df to be provided). Default False.
- Returns:
Tuple of (figure, axis) from matplotlib
Example
>>> best = results.best(n=1)[0]
>>> # Instant Q-Q plot from cached sample (recommended)
>>> fitter.plot_qq(best, title='Instant Q-Q Plot')
>>> # Explicit DataFrame (resamples via Spark)
>>> fitter.plot_qq(best, df, 'value', title='Q-Q Plot', force_recompute=True)
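A Q-Q plot pairs each sorted sample value with the fitted distribution's quantile at the matching probability. The sketch below shows one way to compute those pairs, again with NormalDist standing in for a fitted distribution. The helper name and the (i - 0.5) / n plotting position are assumptions rather than the library's documented behavior.

```python
from statistics import NormalDist


def qq_points(sample, dist):
    """Return (theoretical_quantiles, sample_quantiles) for a Q-Q plot."""
    xs = sorted(sample)
    n = len(xs)
    # Inverse CDF evaluated at the plotting positions (i - 0.5) / n.
    theoretical = [dist.inv_cdf((i - 0.5) / n) for i in range(1, n + 1)]
    return theoretical, xs


fitted = NormalDist(mu=10.0, sigma=2.0)
sample = fitted.samples(1000, seed=3)
theo_q, samp_q = qq_points(sample, fitted)
```

Because the axes are in data units, a Q-Q plot tends to reveal tail mismatches more clearly than a P-P plot does.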
- register_distribution(name: str, distribution: rv_continuous, overwrite: bool = False) DistributionFitter[source]¶
Register a custom distribution for fitting.
Custom distributions must implement the scipy rv_continuous interface, specifically the fit(), pdf(), and cdf() methods. The distribution will be included in fitting alongside scipy.stats distributions.
- Parameters:
name – Unique name for the distribution (used in results)
distribution – scipy rv_continuous instance or subclass. Must implement fit(), pdf(), cdf() methods.
overwrite – If True, overwrite existing distribution with same name. Default False raises ValueError if name exists.
- Returns:
Self (for method chaining)
- Raises:
ValueError – If name already exists (and overwrite=False) or conflicts with a scipy.stats distribution name
TypeError – If distribution doesn’t implement required interface
Example
>>> from scipy.stats import rv_continuous
>>>
>>> class PowerDistribution(rv_continuous):
...     def _pdf(self, x, alpha):
...         return alpha * x ** (alpha - 1)
...     def _cdf(self, x, alpha):
...         return x ** alpha
>>>
>>> fitter = DistributionFitter(spark)
>>> fitter.register_distribution("power", PowerDistribution(a=0, b=1))
>>> results = fitter.fit(df, "column")
>>> # Results will include "power" if it fits well
- unregister_distribution(name: str) DistributionFitter[source]¶
Remove a custom distribution from the registry.
- Parameters:
name – Name of the custom distribution to remove
- Returns:
Self (for method chaining)
- Raises:
KeyError – If distribution not found in registry
- class spark_bestfit.core.TruncatedFrozenDist(frozen_dist, lb: float, ub: float, *, raise_on_empty: bool = True)[source]¶
Bases: object
Wrapper for frozen scipy distributions with truncation bounds.
Implements truncation for arbitrary scipy.stats frozen distributions using CDF inversion for sampling and proper normalization for PDF/CDF.
This is needed because scipy.stats.truncate() only works with the new distribution infrastructure (scipy 1.14+), not with traditional rv_frozen objects.
- Parameters:
frozen_dist – Frozen scipy.stats distribution
lb – Lower bound (-np.inf for no lower bound)
ub – Upper bound (np.inf for no upper bound)
raise_on_empty – If True, raise ValueError when truncation has no probability mass. If False, methods return zeros/empty results silently. Default True.
Example
>>> import numpy as np
>>> from scipy import stats
>>> from spark_bestfit import TruncatedFrozenDist
>>> # Create a normal distribution truncated to [0, inf)
>>> frozen = stats.norm(loc=0, scale=1)
>>> truncated = TruncatedFrozenDist(frozen, lb=0, ub=np.inf)
>>> truncated.pdf(0.5)  # Evaluate PDF at x=0.5
- cdf(x)[source]¶
Evaluate cumulative distribution function.
Returns 0 for x < lower_bound, 1 for x > upper_bound.
- logpdf(x)[source]¶
Evaluate log probability density function.
Returns -inf for values outside the truncation bounds.
- mean()[source]¶
Compute mean of truncated distribution.
Uses analytical formulas for norm, expon, and uniform distributions. Falls back to Monte Carlo for other distributions.
- pdf(x)[source]¶
Evaluate probability density function.
Returns 0 for values outside the truncation bounds.
- ppf(q)[source]¶
Evaluate percent point function (inverse CDF).
- Parameters:
q – Quantile(s) in [0, 1]
- Returns:
Value(s) at the given quantile(s) within the truncated distribution
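The normalization behind pdf(), cdf(), and ppf() can be sketched with the standard library alone. With F the base CDF and mass = F(ub) - F(lb), the truncated PDF is pdf(x) / mass, the truncated CDF is (F(x) - F(lb)) / mass, and the quantile function inverts the base CDF at F(lb) + q * mass. TruncatedNormalSketch below is a hypothetical stand-in restricted to normal distributions. It is not the TruncatedFrozenDist implementation.

```python
import math
from statistics import NormalDist


class TruncatedNormalSketch:
    """Hypothetical stand-in illustrating the normalization; not TruncatedFrozenDist."""

    def __init__(self, base: NormalDist, lb: float, ub: float):
        self.base, self.lb, self.ub = base, lb, ub
        self.cdf_lb = base.cdf(lb)
        self.cdf_ub = base.cdf(ub)
        # Probability mass the base distribution places inside [lb, ub].
        self.mass = self.cdf_ub - self.cdf_lb
        if self.mass <= 0.0:
            raise ValueError("truncation interval has no probability mass")

    def pdf(self, x: float) -> float:
        if x < self.lb or x > self.ub:
            return 0.0
        return self.base.pdf(x) / self.mass

    def cdf(self, x: float) -> float:
        if x < self.lb:
            return 0.0
        if x > self.ub:
            return 1.0
        return (self.base.cdf(x) - self.cdf_lb) / self.mass

    def ppf(self, q: float) -> float:
        # CDF inversion: map q into the base distribution's probability range.
        # This sketch expects q strictly between 0 and 1.
        return self.base.inv_cdf(self.cdf_lb + q * self.mass)


# Standard normal truncated to [0, inf), i.e. a half-normal distribution.
half_normal = TruncatedNormalSketch(NormalDist(0.0, 1.0), 0.0, math.inf)
density = half_normal.pdf(0.5)
median = half_normal.ppf(0.5)
```

Sampling by CDF inversion follows directly: draw q uniformly on (0, 1) and return ppf(q).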
- spark_bestfit.core.DEFAULT_EXCLUDED_DISTRIBUTIONS = ('kstwobign', 'vonmises', 'dpareto_lognorm', 'mielke', 'exponpow', 'geninvgauss', 'ncf', 'studentized_range', 'ksone', 'gausshyper', 'ncx2', 'kstwo', 'vonmises_line', 'genhyperbolic', 'kappa4', 'nct', 'recipinvgauss', 'levy_stable', 'wald', 'tukeylambda')¶
- spark_bestfit.core.DEFAULT_EXCLUDED_DISCRETE_DISTRIBUTIONS = ('nchypergeom_fisher', 'randint', 'poisson_binom', 'bernoulli', 'nchypergeom_wallenius')¶
Discrete Fitting
Discrete distribution fitting using MLE optimization and Pandas UDFs.
- spark_bestfit.discrete_fitting.bootstrap_discrete_confidence_intervals(dist_name: str, data: ndarray, alpha: float = 0.05, n_bootstrap: int = 1000, random_seed: int | None = None) Dict[str, Tuple[float, float]][source]¶
Compute bootstrap confidence intervals for discrete distribution parameters.
Uses the percentile bootstrap method: resample data with replacement, refit the distribution using MLE, and compute confidence intervals from the empirical distribution of fitted parameters.
- Parameters:
dist_name – Name of scipy.stats discrete distribution
data – Integer data array used for fitting
alpha – Significance level (default 0.05 for 95% CI)
n_bootstrap – Number of bootstrap samples (default 1000)
random_seed – Random seed for reproducibility
- Returns:
Dictionary mapping parameter names to (lower, upper) bounds
Example
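>>> import numpy as np
>>> from spark_bestfit.discrete_fitting import bootstrap_discrete_confidence_intervals
>>> data = np.random.poisson(lam=7, size=1000)
>>> ci = bootstrap_discrete_confidence_intervals("poisson", data, alpha=0.05)
>>> print(ci)
{'mu': (6.75, 7.25)}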
Note
Bootstrap fitting may fail for some resamples. Failed fits are skipped.
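The percentile bootstrap described above can be sketched without any dependencies for the Poisson case, where the MLE of mu is simply the sample mean. The function names, the Knuth sampler used to generate test data, and the nearest-rank percentile rule are assumptions made for this illustration. The library function refits via numerical MLE and supports other distributions.

```python
import math
import random


def poisson_sample(rng, mu, size):
    """Draw Poisson variates with Knuth's multiplication method (fine for small mu)."""
    limit = math.exp(-mu)
    draws = []
    for _ in range(size):
        k, prod = 0, rng.random()
        while prod > limit:
            k += 1
            prod *= rng.random()
        draws.append(k)
    return draws


def bootstrap_poisson_ci(data, alpha=0.05, n_bootstrap=500, random_seed=None):
    """Percentile bootstrap interval for the Poisson mean."""
    rng = random.Random(random_seed)
    n = len(data)
    estimates = []
    for _ in range(n_bootstrap):
        resample = rng.choices(data, k=n)    # resample with replacement
        estimates.append(sum(resample) / n)  # Poisson MLE of mu is the mean
    estimates.sort()
    # Nearest-rank percentiles of the bootstrap estimates.
    lo_idx = int((alpha / 2) * (n_bootstrap - 1))
    hi_idx = int(round((1 - alpha / 2) * (n_bootstrap - 1)))
    return {"mu": (estimates[lo_idx], estimates[hi_idx])}


data = poisson_sample(random.Random(7), mu=7.0, size=500)
ci = bootstrap_poisson_ci(data, alpha=0.05, random_seed=0)
```

Fixing random_seed makes the interval reproducible, which mirrors the purpose of the random_seed parameter documented above.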
- spark_bestfit.discrete_fitting.compute_discrete_histogram(data: ndarray) Tuple[ndarray, ndarray][source]¶
Compute histogram for discrete (integer) data.
Unlike continuous histograms, discrete histograms use integer-aligned bins and compute empirical probability mass function (PMF).
- Parameters:
data – Integer data array
- Returns:
Tuple of (values, pmf) where
values: unique integer values in data
pmf: empirical probability mass at each value
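An empirical PMF of this shape can be computed by counting occurrences of each distinct integer and dividing by the sample size. The sketch below uses plain lists and a hypothetical function name. The library function works on and returns NumPy arrays.

```python
from collections import Counter


def empirical_pmf(data):
    """Return (values, pmf): sorted unique integers and their relative frequencies."""
    counts = Counter(data)
    values = sorted(counts)
    n = len(data)
    pmf = [counts[v] / n for v in values]
    return values, pmf


values, pmf = empirical_pmf([0, 1, 1, 2, 2, 2, 5])
```

Only values that actually occur are returned, so gaps in the support (3 and 4 in this sample) do not appear with zero mass.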
- spark_bestfit.discrete_fitting.compute_discrete_information_criteria(dist: Any, params: Tuple[float, ...], data: ndarray, dist_name: str) Tuple[float, float][source]¶
Compute AIC and BIC for discrete distribution.
- Parameters:
dist – scipy.stats discrete distribution object
params – Fitted distribution parameters
data – Original integer data
dist_name – Name of distribution
- Returns:
Tuple of (aic, bic)
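For reference, the standard definitions are AIC = 2k - 2 ln L and BIC = k ln(n) - 2 ln L, where k is the number of fitted parameters, n the number of observations, and ln L the maximized log-likelihood. The sketch below evaluates both for a Poisson fit using only the standard library. The function names are hypothetical, and the library's handling of edge cases such as zero-probability observations is not shown.

```python
import math


def poisson_log_likelihood(data, mu):
    """Sum of log PMF values: x*ln(mu) - mu - ln(x!)."""
    return sum(x * math.log(mu) - mu - math.lgamma(x + 1) for x in data)


def information_criteria(log_likelihood, n_params, n_obs):
    """Return (aic, bic) from a maximized log-likelihood."""
    aic = 2 * n_params - 2 * log_likelihood
    bic = n_params * math.log(n_obs) - 2 * log_likelihood
    return aic, bic


data = [3, 5, 4, 6, 2, 4, 5, 3]
mu_hat = sum(data) / len(data)  # Poisson MLE is the sample mean
log_lik = poisson_log_likelihood(data, mu_hat)
aic, bic = information_criteria(log_lik, n_params=1, n_obs=len(data))
```

BIC penalizes each parameter by ln(n) instead of 2, so it exceeds AIC for the same fit once n is larger than about 7.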
- spark_bestfit.discrete_fitting.compute_discrete_ks_statistic(dist: Any, params: Tuple[float, ...], data: ndarray, dist_name: str) Tuple[float, float][source]¶
Compute Kolmogorov-Smirnov statistic for discrete distribution.
Computes the two-sided KS statistic D_n = max(D+, D-) which measures the maximum distance between empirical and theoretical CDFs.
Note
The standard KS test assumes continuous distributions. For discrete distributions, the KS statistic is valid for comparing fits, but p-values are conservative and should not be used for formal hypothesis testing. Use AIC/BIC for model selection instead.
- Parameters:
dist – scipy.stats discrete distribution object
params – Fitted distribution parameters
data – Original integer data
dist_name – Name of distribution
- Returns:
Tuple of (ks_statistic, pvalue) where pvalue is approximate only
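The two-sided statistic D_n = max(D+, D-) can be sketched as follows, with D+ the largest amount by which the empirical CDF exceeds the fitted CDF and D- the largest amount by which it falls short. The helper names are hypothetical, and applying the continuous-case formula directly to tied integer data is an assumption about tie handling, not a statement of how the library treats ties.

```python
import math


def poisson_cdf(k, mu):
    """P(X <= k) for a Poisson distribution, by direct summation."""
    if k < 0:
        return 0.0
    return sum(math.exp(-mu) * mu ** i / math.factorial(i) for i in range(int(k) + 1))


def ks_statistic(data, cdf):
    """Two-sided KS statistic D_n = max(D+, D-) against a fitted CDF."""
    xs = sorted(data)
    n = len(xs)
    cdf_vals = [cdf(x) for x in xs]
    # D+: empirical CDF just after each point minus the fitted CDF.
    d_plus = max((i + 1) / n - f for i, f in enumerate(cdf_vals))
    # D-: fitted CDF minus the empirical CDF just before each point.
    d_minus = max(f - i / n for i, f in enumerate(cdf_vals))
    return max(d_plus, d_minus)


d_n = ks_statistic([0, 1, 1, 2], lambda k: poisson_cdf(k, 1.0))
```

For this tiny sample the maximum comes from D- at the first occurrence of the value 1, where the fitted CDF is 2/e and the empirical CDF just before the point is 0.25.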
- spark_bestfit.discrete_fitting.compute_discrete_sse(dist: Any, params: Tuple[float, ...], x_values: ndarray, empirical_pmf: ndarray, dist_name: str) float[source]¶
Compute sum of squared errors between empirical and fitted PMF.
- Parameters:
dist – scipy.stats discrete distribution object
params – Fitted distribution parameters
x_values – Integer values where PMF is evaluated
empirical_pmf – Empirical probability mass at each x value
dist_name – Name of distribution
- Returns:
Sum of squared errors
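The SSE metric is the sum over x_values of (empirical_pmf - fitted_pmf)^2. A minimal sketch against a Poisson PMF follows. The helper names are hypothetical and the fitted PMF is computed by hand here, not through scipy.

```python
import math


def poisson_pmf(k, mu):
    """Poisson probability mass at integer k."""
    return math.exp(-mu) * mu ** k / math.factorial(k)


def pmf_sse(empirical_pmf, fitted_pmf):
    """Sum of squared differences between two PMFs on the same support points."""
    if len(empirical_pmf) != len(fitted_pmf):
        raise ValueError("PMF sequences must have the same length")
    return sum((e - f) ** 2 for e, f in zip(empirical_pmf, fitted_pmf))


x_values = [0, 1, 2]
empirical = [0.25, 0.5, 0.25]
fitted = [poisson_pmf(k, 1.0) for k in x_values]
sse = pmf_sse(empirical, fitted)
```

Unlike AIC and BIC, this metric carries no penalty for the number of parameters, which is why it is described above as a simple comparison metric.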
- spark_bestfit.discrete_fitting.compute_ks_ad_metrics_discrete(dist_name: str, params: List[float], data_sample: ndarray, lower_bound: float | None = None, upper_bound: float | None = None) Tuple[float | None, float | None, float | None, float | None][source]¶
Compute KS metrics for a fitted discrete distribution.
This is the core computation function used for lazy metric evaluation with discrete distributions.
Note: Anderson-Darling is not computed for discrete distributions (AD test is for continuous distributions only).
- Parameters:
dist_name – Name of scipy.stats discrete distribution
params – Fitted distribution parameters
data_sample – Integer data sample for metric computation
lower_bound – Optional lower bound (unused for discrete, for API compatibility)
upper_bound – Optional upper bound (unused for discrete, for API compatibility)
- Returns:
Tuple of (ks_statistic, pvalue, ad_statistic, ad_pvalue). ad_statistic and ad_pvalue are always None for discrete distributions. Returns (None, None, None, None) if computation fails.
- spark_bestfit.discrete_fitting.create_discrete_fitting_udf(histogram_broadcast: Broadcast[Tuple[ndarray, ndarray]], data_sample_broadcast: Broadcast[ndarray], column_name: str | None = None, data_stats: Dict[str, float] | None = None, lower_bound: float | None = None, upper_bound: float | None = None, lazy_metrics: bool = False) Callable[[Series], DataFrame][source]¶
Factory function to create Pandas UDF for discrete distribution fitting.
- Parameters:
histogram_broadcast – Broadcast variable containing (x_values, empirical_pmf)
data_sample_broadcast – Broadcast variable containing integer data sample
column_name – Name of the column being fitted (for result tracking)
data_stats – Pre-computed summary statistics (data_min, data_max, etc.)
lower_bound – Optional lower bound for truncated distribution
upper_bound – Optional upper bound for truncated distribution
lazy_metrics – If True, skip expensive KS computation during fitting. These metrics will be computed on-demand when accessed via FitResults.best() or DistributionFitResult properties. (v1.5.0)
- Returns:
Pandas UDF function for fitting discrete distributions
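The idea behind lazy_metrics, skipping an expensive computation during fitting and running it on first access, can be illustrated with functools.cached_property. LazyResultSketch is a hypothetical class written for this illustration. It does not describe the internals of LazyFitResults or DistributionFitResult.

```python
from functools import cached_property


class LazyResultSketch:
    """Hypothetical illustration of deferred metrics; not the library's result class."""

    def __init__(self, aic, compute_ks):
        self.aic = aic                 # cheap metric, stored eagerly
        self._compute_ks = compute_ks  # expensive metric, stored as a callable

    @cached_property
    def ks_statistic(self):
        # Runs at most once, on first access; the value is cached afterwards.
        return self._compute_ks()


calls = []


def expensive_ks():
    calls.append(1)  # record each real computation
    return 0.042


result = LazyResultSketch(aic=123.4, compute_ks=expensive_ks)
calls_before_access = len(calls)  # reading only AIC triggers no KS work
first = result.ks_statistic
second = result.ks_statistic
```

Ranking by AIC, BIC, or SSE never touches the deferred attribute, which is where the performance benefit described above comes from.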
- spark_bestfit.discrete_fitting.create_discrete_sample_data(data_full: ndarray, sample_size: int = 10000, random_seed: int = 42) ndarray[source]¶
Create a sample of discrete data for distribution fitting.
- Parameters:
data_full – Full integer dataset
sample_size – Target sample size
random_seed – Random seed for reproducibility
- Returns:
Sampled integer data
- spark_bestfit.discrete_fitting.evaluate_pmf(dist: Any, params: Tuple[float, ...], x: ndarray, dist_name: str) ndarray[source]¶
Evaluate probability mass function at given integer points.
- Parameters:
dist – scipy.stats discrete distribution object
params – Distribution parameters
x – Integer points at which to evaluate PMF
dist_name – Name of distribution (for special handling)
- Returns:
PMF values at x
- spark_bestfit.discrete_fitting.fit_discrete_mle(dist_name: str, data: ndarray, initial_params: List[float], bounds: List[Tuple[float, float]]) Tuple[ndarray, float][source]¶
Fit a discrete distribution using maximum likelihood estimation.
Since scipy discrete distributions don’t have a fit() method, we use scipy.optimize.minimize to find parameters that maximize the likelihood.
- Parameters:
dist_name – Name of the scipy.stats discrete distribution
data – Integer data to fit
initial_params – Initial parameter guesses
bounds – Parameter bounds as list of (min, max) tuples
- Returns:
Tuple of (fitted_params, negative_log_likelihood)
- Raises:
ValueError – If optimization fails to converge
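The idea of fitting by minimizing a negative log-likelihood can be sketched without any dependencies. The example below fits a Poisson rate with a golden-section search, which here is a stand-in for scipy.optimize.minimize; the helper names `poisson_nll` and `minimize_scalar_golden` and the sample data are assumptions made for illustration, not part of spark_bestfit.

```python
import math


def poisson_nll(mu: float, data) -> float:
    """Negative log-likelihood of Poisson(mu) for integer data."""
    return -sum(k * math.log(mu) - mu - math.lgamma(k + 1) for k in data)


def minimize_scalar_golden(f, lo: float, hi: float, tol: float = 1e-8) -> float:
    """Golden-section search for the minimizer of a unimodal f on [lo, hi]."""
    inv_phi = (math.sqrt(5) - 1) / 2
    a, b = lo, hi
    c = b - inv_phi * (b - a)
    d = a + inv_phi * (b - a)
    while b - a > tol:
        if f(c) < f(d):
            # Minimum lies in [a, d]; reuse c as the new upper probe.
            b, d = d, c
            c = b - inv_phi * (b - a)
        else:
            # Minimum lies in [c, b]; reuse d as the new lower probe.
            a, c = c, d
            d = a + inv_phi * (b - a)
    return (a + b) / 2


# Hypothetical count data; the bounds play the role of the `bounds` argument.
data = [2, 3, 1, 4, 2, 0, 3, 5, 2, 3]
bounds = (1e-6, max(data) + 1.0)
mu_hat = minimize_scalar_golden(lambda mu: poisson_nll(mu, data), *bounds)
fitted = ([mu_hat], poisson_nll(mu_hat, data))  # (fitted_params, negative_log_likelihood)
```

For the Poisson case the maximum-likelihood estimate is known in closed form (the sample mean), which makes it a convenient check that the numerical search converges to the right place.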
- spark_bestfit.discrete_fitting.fit_single_discrete_distribution(dist_name: str, data_sample: ndarray, x_values: ndarray, empirical_pmf: ndarray, registry: DiscreteDistributionRegistry, column_name: str | None = None, data_stats: Dict[str, float] | None = None, lower_bound: float | None = None, upper_bound: float | None = None, lazy_metrics: bool = False) Dict[str, Any][source]¶
Fit a single discrete distribution and compute goodness-of-fit metrics.
- Parameters:
dist_name – Name of scipy.stats discrete distribution
data_sample – Sample of integer data for parameter fitting
x_values – Unique integer values in data
empirical_pmf – Empirical PMF at each x value
registry – DiscreteDistributionRegistry for parameter configs
column_name – Name of the column being fitted (for multi-column support)
data_stats – Pre-computed summary statistics (data_min, data_max, etc.)
lower_bound – Optional lower bound for truncated distribution
upper_bound – Optional upper bound for truncated distribution
lazy_metrics – If True, skip expensive KS computation. These metrics will be None in the result and computed on-demand later. (v1.5.0)
- Returns:
Dictionary with fit result fields including data_min, data_max, etc.
- spark_bestfit.discrete_fitting.get_discrete_param_names(dist_name: str) List[str][source]¶
Get parameter names for a discrete scipy distribution.
- Parameters:
dist_name – Name of scipy.stats discrete distribution
- Returns:
List of parameter names
Example
>>> get_discrete_param_names("poisson")
['mu']
>>> get_discrete_param_names("binom")
['n', 'p']
>>> get_discrete_param_names("nbinom")
['n', 'p']
Results¶
- class spark_bestfit.results.DistributionFitResult(distribution: str, parameters: List[float], sse: float, column_name: str | None = None, aic: float | None = None, bic: float | None = None, ks_statistic: float | None = None, pvalue: float | None = None, ad_statistic: float | None = None, ad_pvalue: float | None = None, data_min: float | None = None, data_max: float | None = None, data_mean: float | None = None, data_stddev: float | None = None, data_count: float | None = None, data_kurtosis: float | None = None, data_skewness: float | None = None, cached_sample: ndarray | None = None, lower_bound: float | None = None, upper_bound: float | None = None)[source]
Bases: object
Result from fitting a single distribution.
- distribution
Name of the scipy.stats distribution
- Type:
str
- parameters
Fitted parameters (shape params + loc + scale)
- Type:
List[float]
- sse
Sum of Squared Errors
- Type:
float
- column_name
Name of the column that was fitted (for multi-column support)
- Type:
str | None
- aic
Akaike Information Criterion (lower is better)
- Type:
float | None
- bic
Bayesian Information Criterion (lower is better)
- Type:
float | None
- ks_statistic
Kolmogorov-Smirnov statistic (lower is better)
- Type:
float | None
- pvalue
P-value from KS test (higher indicates better fit)
- Type:
float | None
- ad_statistic
Anderson-Darling statistic (lower is better)
- Type:
float | None
- ad_pvalue
P-value from A-D test (only for norm, expon, logistic, gumbel_r, gumbel_l)
- Type:
float | None
- data_min
Minimum value in the data used for fitting
- Type:
float | None
- data_max
Maximum value in the data used for fitting
- Type:
float | None
- data_mean
Mean of the data used for fitting
- Type:
float | None
- data_stddev
Standard deviation of the data used for fitting
- Type:
float | None
- data_count
Number of samples in the data used for fitting
- Type:
float | None
- data_kurtosis
Excess kurtosis of the data used for fitting (v2.3.0)
- Type:
float | None
- data_skewness
Skewness of the data used for fitting (v2.3.0)
- Type:
float | None
- cached_sample
Cached sample data for instant plotting (v2.10.0)
- Type:
numpy.ndarray | None
- lower_bound
Lower bound for truncated distribution fitting (v1.4.0). When set, the distribution is truncated at this lower limit.
- Type:
float | None
- upper_bound
Upper bound for truncated distribution fitting (v1.4.0). When set, the distribution is truncated at this upper limit.
- Type:
float | None
Note
The p-value from the KS test is approximate when parameters are estimated from the same data being tested. It tends to be conservative (larger than it should be). Use it for rough guidance, not strict hypothesis testing. The ks_statistic is valid for ranking fits.
The ad_pvalue is only available for 5 distributions (norm, expon, logistic, gumbel_r, gumbel_l) where scipy has critical value tables. For other distributions, ad_pvalue will be None but ad_statistic is still valid for ranking fits.
When bounds are set (lower_bound and/or upper_bound), methods like sample(), pdf(), cdf(), and ppf() automatically use scipy.stats.truncate() to return values respecting the bounded domain.
- cdf(x: ndarray) ndarray[source]
Evaluate cumulative distribution function at given points.
- Parameters:
x – Points at which to evaluate CDF
- Returns:
CDF values at x. If bounds are set, the CDF is adjusted for the truncated domain (0 at lower_bound, 1 at upper_bound).
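The adjustment described above corresponds to the usual truncation formula F_T(x) = (F(x) - F(a)) / (F(b) - F(a)) for bounds a and b. The sketch below applies it to an exponential CDF using only the standard library; `expon_cdf` and `truncated_cdf` are hypothetical names, and the code is an illustration of the formula rather than the library's own wrapper.

```python
import math


def expon_cdf(x: float, scale: float = 1.0) -> float:
    """CDF of an exponential distribution with the given scale."""
    return 1.0 - math.exp(-x / scale) if x > 0 else 0.0


def truncated_cdf(x, cdf, lower=None, upper=None) -> float:
    """CDF of a distribution restricted to [lower, upper].

    A missing bound is treated as unbounded on that side.
    """
    f_lo = cdf(lower) if lower is not None else 0.0
    f_hi = cdf(upper) if upper is not None else 1.0
    mass = f_hi - f_lo  # probability the untruncated variable falls inside the bounds
    if mass <= 0:
        raise ValueError("bounds enclose no probability mass")
    if lower is not None and x <= lower:
        return 0.0
    if upper is not None and x >= upper:
        return 1.0
    return (cdf(x) - f_lo) / mass


inside = truncated_cdf(2.0, expon_cdf, lower=1.0, upper=3.0)
at_lower = truncated_cdf(1.0, expon_cdf, lower=1.0, upper=3.0)
at_upper = truncated_cdf(3.0, expon_cdf, lower=1.0, upper=3.0)
```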
- confidence_intervals(df, column: str, alpha: float = 0.05, n_bootstrap: int = 1000, max_samples: int = 10000, random_seed: int | None = None) Dict[str, Tuple[float, float]][source]
Compute bootstrap confidence intervals for fitted parameters.
Uses the percentile bootstrap method: resample data with replacement, refit the distribution, and compute confidence intervals from the empirical distribution of fitted parameters.
- Parameters:
df – DataFrame containing the data (Spark DataFrame, pandas DataFrame, or Ray Dataset)
column – Column name containing the data
alpha – Significance level (default 0.05 for 95% CI)
n_bootstrap – Number of bootstrap samples (default 1000)
max_samples – Maximum rows to collect from DataFrame (default 10000)
random_seed – Random seed for reproducibility
- Returns:
Dictionary mapping parameter names to (lower, upper) bounds
Example
>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> ci = result.confidence_intervals(df, 'value', alpha=0.05, random_seed=42)
>>> print(result.distribution)
gamma
>>> for param, (lower, upper) in ci.items():
...     print(f" {param}: [{lower:.4f}, {upper:.4f}]")
 a: [2.3500, 2.6500]
 loc: [-0.1200, 0.0800]
 scale: [3.0500, 3.3500]
Note
Bootstrap computation can be slow for large n_bootstrap values. The default 1000 iterations provides reasonable precision.
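The percentile bootstrap itself is short enough to sketch with the standard library. The example below resamples with replacement, refits a one-parameter exponential model (whose scale MLE is the sample mean), and reads the interval off the sorted estimates. The function names, the synthetic data, and the simple index rule for the percentiles are all assumptions made for illustration.

```python
import random
import statistics


def fit_expon_scale(sample) -> float:
    """MLE of the exponential scale parameter, which is the sample mean."""
    return statistics.fmean(sample)


def bootstrap_ci(data, fit, alpha=0.05, n_bootstrap=1000, random_seed=None):
    """Percentile bootstrap interval for a single fitted parameter."""
    rng = random.Random(random_seed)
    n = len(data)
    # Resample with replacement, refit, and sort the resulting estimates.
    estimates = sorted(fit(rng.choices(data, k=n)) for _ in range(n_bootstrap))
    lower = estimates[int((alpha / 2) * (n_bootstrap - 1))]
    upper = estimates[int((1 - alpha / 2) * (n_bootstrap - 1))]
    return lower, upper


# Synthetic data drawn with scale 3.0, used only to exercise the sketch.
data_rng = random.Random(0)
data = [data_rng.expovariate(1 / 3.0) for _ in range(200)]
ci = {"scale": bootstrap_ci(data, fit_expon_scale, random_seed=42)}
```

The cost grows linearly with n_bootstrap because every replicate requires a full refit, which is why larger values slow the computation down.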
- diagnostics(data: ndarray, y_hist: ndarray | None = None, x_hist: ndarray | None = None, bins: int = 50, title: str = '', figsize: Tuple[int, int] = (14, 12), dpi: int = 100, title_fontsize: int = 16, subplot_title_fontsize: int = 12, label_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png')[source]
Create a 2x2 diagnostic plot panel for assessing distribution fit quality.
Generates four diagnostic plots: - Q-Q Plot (top-left): Compares sample quantiles vs theoretical quantiles - P-P Plot (top-right): Compares empirical vs theoretical probabilities - Residual Histogram (bottom-left): Distribution of fit residuals - CDF Comparison (bottom-right): Empirical vs theoretical CDF overlay
- Parameters:
data – Sample data array (1D numpy array)
y_hist – Optional pre-computed histogram density values. If None, computed from data using specified bins.
x_hist – Optional pre-computed histogram bin centers. If None, computed from data using specified bins.
bins – Number of histogram bins (used if y_hist/x_hist not provided)
title – Overall figure title
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
title_fontsize – Main title font size
subplot_title_fontsize – Subplot title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, array of axes)
Example
>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> fig, axes = result.diagnostics(data, title='Fit Diagnostics')
>>> plt.show()
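The coordinates behind a Q-Q and a P-P plot can be computed in a few lines. This sketch uses statistics.NormalDist from the standard library as the fitted model and the plotting positions (i + 0.5) / n, which are one common convention and an assumption here; `qq_pp_points` is a hypothetical helper, not a spark_bestfit function.

```python
from statistics import NormalDist


def qq_pp_points(data, dist):
    """Return the point lists for a Q-Q plot and a P-P plot."""
    xs = sorted(data)
    n = len(xs)
    probs = [(i + 0.5) / n for i in range(n)]  # empirical plotting positions
    # Q-Q: theoretical quantile versus sample quantile.
    qq = [(dist.inv_cdf(p), x) for p, x in zip(probs, xs)]
    # P-P: theoretical probability versus empirical probability.
    pp = [(dist.cdf(x), p) for p, x in zip(probs, xs)]
    return qq, pp


# Synthetic normal data and a normal model fitted to it.
data = NormalDist(0.0, 1.0).samples(200, seed=1)
fitted = NormalDist.from_samples(data)
qq, pp = qq_pp_points(data, fitted)
```

For a good fit, both point sets lie close to the diagonal; systematic curvature in the Q-Q points usually indicates a mismatch in the tails.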
- distribution: str
- get_param_names() List[str][source]
Get parameter names for this distribution.
- Returns:
List of parameter names in order matching self.parameters
Example
>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> print(result.distribution)
gamma
>>> print(result.get_param_names())
['a', 'loc', 'scale']
>>> print(dict(zip(result.get_param_names(), result.parameters)))
{'a': 2.5, 'loc': 0.0, 'scale': 3.2}
- get_scipy_dist(frozen: bool = True)[source]
Get scipy distribution object.
- Parameters:
frozen – If True (default), return a frozen distribution with parameters applied. If False, return the unfrozen distribution class.
- Returns:
scipy.stats distribution object. If bounds are set and frozen=True, returns a TruncatedFrozenDist wrapper that handles truncation.
Note
When bounds are set (lower_bound and/or upper_bound), the returned distribution is truncated. This ensures that sampling and PDF/CDF evaluation respect the bounds.
- classmethod load(path: str | Path) DistributionFitResult[source]
Load fitted distribution from file.
Reconstructs a DistributionFitResult from a previously saved file. The loaded result can be used for sampling, PDF/CDF evaluation, etc.
- Parameters:
path – File path. Format is detected from extension (.json, .pkl, .pickle).
- Returns:
Reconstructed DistributionFitResult
- Raises:
SerializationError – If file format is invalid or distribution is unknown.
FileNotFoundError – If file does not exist.
Example
>>> import numpy as np
>>> loaded = DistributionFitResult.load("model.json")
>>> samples = loaded.sample(size=1000)
>>> pdf_values = loaded.pdf(np.linspace(0, 100, 100))
Warning
Only load pickle files from trusted sources.
- pdf(x: ndarray) ndarray[source]
Evaluate probability density function at given points.
- Parameters:
x – Points at which to evaluate PDF
- Returns:
PDF values at x. If bounds are set, the PDF is normalized to integrate to 1 over the bounded domain.
Example
>>> import numpy as np
>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> x = np.linspace(0, 10, 100)
>>> y = result.pdf(x)
- ppf(q: ndarray) ndarray[source]
Evaluate percent point function (inverse CDF) at given quantiles.
- Parameters:
q – Quantiles at which to evaluate PPF (0 to 1)
- Returns:
PPF values at q. If bounds are set, values are guaranteed to be within [lower_bound, upper_bound].
- sample(size: int = 1000, random_state: int | None = None) ndarray[source]
Generate random samples from the fitted distribution.
- Parameters:
size – Number of samples to generate
random_state – Random seed for reproducibility
- Returns:
Array of random samples. If bounds are set, samples are guaranteed to be within [lower_bound, upper_bound].
Example
>>> result = fitter.fit(df, 'value').best(n=1)[0]
>>> samples = result.sample(size=10000, random_state=42)
- save(path: str | Path, format: Literal['json', 'pickle'] | None = None, indent: int | None = 2) None[source]
Save fitted distribution to file.
Serializes the distribution parameters and metrics to JSON or pickle format. JSON is recommended for human-readable, version-safe output. Pickle is available for faster serialization when human-readability is not required.
- Parameters:
path – File path. Format is detected from extension if not specified.
format – Output format - ‘json’ (human-readable) or ‘pickle’. If None, detected from file extension (.json, .pkl, .pickle).
indent – JSON indentation level (default 2). Use None for compact output. Ignored for pickle format.
- Raises:
SerializationError – If format cannot be determined or write fails.
Example
>>> best = results.best(n=1)[0]
>>> best.save("model.json")
>>> best.save("model.pkl", format="pickle")
>>> best.save("compact.json", indent=None)
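A JSON round trip of the kind described above can be sketched with a small dataclass. `FitRecord`, `save_json`, and `load_json` are hypothetical stand-ins, and the field layout shown is not the library's actual file schema; the point is only how `indent` controls readable versus compact output.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import List, Optional


@dataclass
class FitRecord:
    """Hypothetical, reduced stand-in for a fitted-distribution result."""

    distribution: str
    parameters: List[float]
    sse: float
    aic: Optional[float] = None


def save_json(record: FitRecord, path: Path, indent: Optional[int] = 2) -> None:
    """Write the record as JSON; indent=None produces compact output."""
    path.write_text(json.dumps(asdict(record), indent=indent))


def load_json(path: Path) -> FitRecord:
    """Rebuild a record from a file written by save_json."""
    return FitRecord(**json.loads(path.read_text()))


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "model.json"
    original = FitRecord("gamma", [2.5, 0.0, 3.2], sse=0.001, aic=1234.5)
    save_json(original, path)
    restored = load_json(path)
    compact = json.dumps(asdict(original), indent=None)
```

Unlike pickle, a JSON file written this way contains only data, which is one reason the text above recommends it as the safer default.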
- sse: float
- class spark_bestfit.results.BaseFitResults(results_df: DataFrame | DataFrame, samples: Dict[str, ndarray] | None = None)[source]¶
Bases: ABC
Abstract base class for distribution fit results.
Provides convenient methods for accessing, filtering, and analyzing fitted distributions. Wraps a Spark DataFrame but provides a pandas-like interface for common operations.
- Subclasses:
EagerFitResults: All metrics pre-computed during fitting
LazyFitResults: KS/AD metrics computed on-demand
Example
>>> results = fitter.fit(df, 'value')
>>> # Get the best distribution
>>> best = results.best(n=1)[0]
>>> # Get top 5 by AIC
>>> top_aic = results.best(n=5, metric='aic')
>>> # Convert to pandas for analysis
>>> df_pandas = results.df.toPandas()
>>> # Filter by SSE threshold
>>> good_fits = results.filter(sse_threshold=0.01)
- abstractmethod best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]¶
Get top n distributions by specified metric.
- Parameters:
n – Number of results to return
metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’). Defaults to ‘ks_statistic’ (Kolmogorov-Smirnov statistic).
warn_if_poor – If True, emit a warning when the best fit has a p-value below pvalue_threshold, indicating a potentially poor fit.
pvalue_threshold – P-value threshold for poor fit warning (default 0.05). Only used when warn_if_poor=True.
- Returns:
List of DistributionFitResult objects
Example
>>> best = results.best(n=1)[0]
>>> top_5 = results.best(n=5, metric='aic')
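Conceptually, selecting the top n fits is a "smallest value wins" ranking on one metric column. The sketch below shows that logic on plain dictionaries; the rows and their metric values are made up, and skipping rows whose metric is None is an assumption of this sketch rather than documented behavior.

```python
import heapq


def best(rows, n=1, metric="ks_statistic"):
    """Return the n rows with the smallest value of the chosen metric."""
    scored = [row for row in rows if row.get(metric) is not None]
    return heapq.nsmallest(n, scored, key=lambda row: row[metric])


# Hypothetical fit results; lower is better for every metric shown.
rows = [
    {"distribution": "norm", "sse": 0.020, "aic": 510.0, "ks_statistic": 0.08},
    {"distribution": "gamma", "sse": 0.004, "aic": 498.0, "ks_statistic": 0.03},
    {"distribution": "expon", "sse": 0.150, "aic": 530.0, "ks_statistic": None},
]

top_by_aic = best(rows, n=1, metric="aic")
top_two_by_ks = best(rows, n=2, metric="ks_statistic")
```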
- best_per_column(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic') Dict[str, List[DistributionFitResult]][source]¶
Get top n distributions for each column.
- Parameters:
n – Number of results per column
metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)
- Returns:
Dict mapping column_name -> List[DistributionFitResult]
Example
>>> results = fitter.fit(df, columns=["col1", "col2", "col3"])
>>> best_per_col = results.best_per_column(n=1)
>>> for col, fits in best_per_col.items():
...     print(f"{col}: {fits[0].distribution}")
- property column_names: List[str]¶
Get list of unique column names in results.
- Returns:
List of column names that have fit results
Example
>>> results = fitter.fit(df, columns=["col1", "col2"])
>>> print(results.column_names)
['col1', 'col2']
- property df: DataFrame¶
Get underlying Spark DataFrame.
- Returns:
Spark DataFrame with results
- abstractmethod filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) BaseFitResults[source]¶
Filter results by metric thresholds.
- Parameters:
sse_threshold – Maximum SSE to include
aic_threshold – Maximum AIC to include
bic_threshold – Maximum BIC to include
ks_threshold – Maximum K-S statistic to include
pvalue_threshold – Minimum p-value to include (higher = better fit)
ad_threshold – Maximum A-D statistic to include
- Returns:
New FitResults with filtered data (same type as self)
Example
>>> good_fits = results.filter(sse_threshold=0.01)
- abstractmethod for_column(column_name: str) BaseFitResults[source]¶
Filter results to a single column.
- Parameters:
column_name – Column to filter for
- Returns:
New FitResults containing only results for the specified column (same type as self).
Example
>>> col1_results = results.for_column("col1")
- abstract property is_lazy: bool¶
Check if lazy metrics are available for on-demand computation.
- Returns:
True if this is a LazyFitResults with lazy contexts, False if this is an EagerFitResults with all metrics computed.
- property is_spark_df: bool¶
Check if the underlying DataFrame is a Spark DataFrame.
- Returns:
True if Spark DataFrame, False if pandas DataFrame.
- abstractmethod materialize() EagerFitResults[source]¶
Force computation of all lazy metrics.
When lazy_metrics=True was used during fitting, this method computes KS and AD statistics for all distributions. Call this before unpersisting the source DataFrame if you need the metrics later.
- Returns:
EagerFitResults with all metrics computed.
- Raises:
RuntimeError – If the source DataFrame is no longer available (LazyFitResults only).
Example
>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> # Fast: only AIC/BIC/SSE computed
>>> best_aic = results.best(n=1, metric='aic')[0]
>>>
>>> # Before unpersisting, materialize all metrics
>>> materialized = results.materialize()
>>> df.unpersist()  # Safe now
>>>
>>> # Access KS on materialized results
>>> best_ks = materialized.best(n=1, metric='ks_statistic')[0]
- quality_report(n: int = 5, pvalue_threshold: float = 0.05, ks_threshold: float = 0.1, ad_threshold: float = 2.0) Dict[str, List[DistributionFitResult] | Dict[str, float] | List[str]][source]¶
Generate a quality assessment report for the fitting results.
Provides a comprehensive view of fit quality including the top fits, summary statistics, and any quality concerns.
- Parameters:
n – Number of top distributions to include (default 5)
pvalue_threshold – Minimum p-value for acceptable fit (default 0.05)
ks_threshold – Maximum K-S statistic for acceptable fit (default 0.10)
ad_threshold – Maximum A-D statistic for acceptable fit (default 2.0)
- Returns:
Dictionary with:
'top_fits': List of top n DistributionFitResult objects
'summary': Dict with summary statistics (min/max/mean for key metrics)
'warnings': List of warning messages about fit quality
'n_acceptable': Number of distributions meeting all thresholds
Example
>>> report = results.quality_report()
>>> print(f"Top fit: {report['top_fits'][0].distribution}")
>>> print(f"Warnings: {report['warnings']}")
>>> if report['warnings']:
...     print("Consider reviewing fit quality")
- summary() DataFrame[source]¶
Get summary statistics of fit quality.
- Returns:
DataFrame with min, mean, max for each metric
Example
>>> results.summary()
   min_sse  mean_sse  max_sse  min_ks  mean_ks  max_ks  min_ad  mean_ad  max_ad  count
0    0.001      0.15     2.34    0.02     0.08    0.25    0.10     0.50     2.0     95
- unpersist(blocking: bool = False) BaseFitResults[source]¶
Release the cached DataFrame from memory.
Call this method when you no longer need the FitResults to free executor memory. This is especially useful in notebook sessions where multiple fits accumulate cached DataFrames.
Note
If lazy_metrics=True was used during fitting and you haven’t called materialize(), you should do so before unpersisting if you need KS/AD metrics later. After unpersisting, methods like best(), filter(), etc. may trigger recomputation from source.
- Parameters:
blocking – If True, block until unpersist completes. Default False.
- Returns:
Self for method chaining.
Example
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=3)  # Get what you need
>>> results.unpersist()  # Release memory
>>>
>>> # With lazy metrics, materialize first
>>> lazy_results = fitter.fit(df, 'value', lazy_metrics=True)
>>> materialized = lazy_results.materialize()
>>> lazy_results.unpersist()  # Release lazy version
- class spark_bestfit.results.EagerFitResults(results_df: DataFrame | DataFrame, samples: Dict[str, ndarray] | None = None)[source]¶
Bases: BaseFitResults
Fit results with all metrics pre-computed.
This class represents distribution fit results where all metrics (SSE, AIC, BIC, KS, AD) have been computed during fitting.
Example
>>> results = fitter.fit(df, 'value')  # Default: eager evaluation
>>> best = results.best(n=1)[0]
>>> print(f"KS: {best.ks_statistic:.4f}")
- best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]¶
Get top n distributions by specified metric.
- Parameters:
n – Number of results to return
metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)
warn_if_poor – If True, warn when best fit has poor p-value
pvalue_threshold – P-value threshold for poor fit warning
- Returns:
List of DistributionFitResult objects
- filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) EagerFitResults[source]¶
Filter results by metric thresholds.
- Parameters:
sse_threshold – Maximum SSE to include
aic_threshold – Maximum AIC to include
bic_threshold – Maximum BIC to include
ks_threshold – Maximum K-S statistic to include
pvalue_threshold – Minimum p-value to include
ad_threshold – Maximum A-D statistic to include
- Returns:
New EagerFitResults with filtered data
- for_column(column_name: str) EagerFitResults[source]¶
Filter results to a single column.
- Parameters:
column_name – Column to filter for
- Returns:
New EagerFitResults for the specified column
- materialize() EagerFitResults[source]¶
Return self - already materialized.
For eager results, this is a no-op since all metrics are already computed.
- Returns:
Self (no copy needed).
- class spark_bestfit.results.LazyFitResults(results_df: DataFrame | DataFrame, lazy_contexts: Dict[str, LazyMetricsContext], samples: Dict[str, ndarray] | None = None)[source]¶
Bases: BaseFitResults
Fit results with lazy KS/AD metric computation.
This class represents distribution fit results where only fast metrics (SSE, AIC, BIC) are pre-computed. KS and AD statistics are computed on-demand when first accessed via best() with those metrics.
Important
The source DataFrame must remain valid (not unpersisted) for lazy metric computation to work. Call materialize() before unpersisting the source DataFrame if you need the metrics later.
Example
>>> results = fitter.fit(df, 'value', lazy_metrics=True)
>>> best_aic = results.best(n=1, metric='aic')[0]  # Fast
>>> best_ks = results.best(n=1, metric='ks_statistic')[0]  # Computes on-demand
>>>
>>> # Before unpersisting source, materialize all metrics
>>> materialized = results.materialize()
>>> df.unpersist()  # Safe now
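The lazy pattern, and why the source must stay available until the first access, can be illustrated with functools.cached_property. `LazyResult` is a hypothetical class, the dictionary stands in for a source DataFrame, and the `compute` callable is a placeholder for a real KS computation.

```python
from functools import cached_property


class LazyResult:
    """Hypothetical stand-in: cheap metric stored eagerly, costly one deferred."""

    def __init__(self, aic, source, compute):
        self.aic = aic  # cheap metric, available immediately
        self._source = source  # dict standing in for the source DataFrame
        self._compute = compute  # placeholder for the expensive metric
        self.evaluations = 0

    @cached_property
    def ks_statistic(self):
        data = self._source.get("data")
        if data is None:
            raise RuntimeError("source data is no longer available")
        self.evaluations += 1
        return self._compute(data)


source = {"data": [1, 2, 5]}
result = LazyResult(aic=10.0, source=source, compute=lambda d: max(d) - min(d))

first = result.ks_statistic  # computed now, on first access
second = result.ks_statistic  # served from the cache

source["data"] = None  # simulate releasing the source
still_cached = result.ks_statistic  # fine: the value was computed before release
```

A result whose metric was never accessed before the source was released would raise RuntimeError instead, which is the situation materialize() is meant to avoid.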
- best(n: int = 1, metric: Literal['sse', 'aic', 'bic', 'ks_statistic', 'ad_statistic'] = 'ks_statistic', warn_if_poor: bool = False, pvalue_threshold: float = 0.05) List[DistributionFitResult][source]¶
Get top n distributions by specified metric.
For KS and AD metrics, computation happens on-demand using the stored lazy context.
- Parameters:
n – Number of results to return
metric – Metric to sort by (‘ks_statistic’, ‘sse’, ‘aic’, ‘bic’, or ‘ad_statistic’)
warn_if_poor – If True, warn when best fit has poor p-value
pvalue_threshold – P-value threshold for poor fit warning
- Returns:
List of DistributionFitResult objects
- filter(sse_threshold: float | None = None, aic_threshold: float | None = None, bic_threshold: float | None = None, ks_threshold: float | None = None, pvalue_threshold: float | None = None, ad_threshold: float | None = None) LazyFitResults[source]¶
Filter results by metric thresholds.
Note
Filtering by KS/AD thresholds with lazy metrics will exclude all results since those metrics are None. Use AIC/BIC/SSE thresholds or call materialize() first.
- Returns:
New LazyFitResults with filtered data (preserves lazy contexts)
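The note above follows directly from how a threshold comparison treats missing values. In this sketch a row passes a threshold only if the metric is present and does not exceed the limit, so rows with ks_statistic set to None are all dropped by a KS filter. The rows and the `filter_rows` helper are hypothetical.

```python
def filter_rows(rows, ks_threshold=None, aic_threshold=None):
    """Keep rows whose metrics are present and within every given threshold."""

    def keep(row):
        checks = (("ks_statistic", ks_threshold), ("aic", aic_threshold))
        for key, limit in checks:
            if limit is None:
                continue  # no threshold requested for this metric
            value = row.get(key)
            if value is None or value > limit:
                return False  # a missing metric cannot satisfy a threshold
        return True

    return [row for row in rows if keep(row)]


# Lazy-style rows: AIC is present, KS has not been computed yet.
lazy_rows = [
    {"distribution": "gamma", "aic": 498.0, "ks_statistic": None},
    {"distribution": "norm", "aic": 510.0, "ks_statistic": None},
]

by_ks = filter_rows(lazy_rows, ks_threshold=0.1)
by_aic = filter_rows(lazy_rows, aic_threshold=500.0)
```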
- for_column(column_name: str) LazyFitResults[source]¶
Filter results to a single column.
- Parameters:
column_name – Column to filter for
- Returns:
New LazyFitResults for the specified column (preserves lazy context)
- is_source_available() bool[source]¶
Check if source DataFrames are still accessible.
Use this to verify that lazy metric computation can still succeed.
- Returns:
True if all source DataFrames can be accessed, False otherwise.
- materialize() EagerFitResults[source]¶
Force computation of all lazy metrics.
Computes KS and AD statistics for all distributions, returning an EagerFitResults that no longer depends on the source DataFrame.
- Returns:
EagerFitResults with all metrics computed.
- Raises:
RuntimeError – If the source DataFrame is no longer available.
- spark_bestfit.results.create_fit_results(results_df: DataFrame | DataFrame, lazy_contexts: Dict[str, LazyMetricsContext] | None = None, samples: Dict[str, ndarray] | None = None) EagerFitResults | LazyFitResults[source]¶
Factory function for creating FitResults.
Creates the appropriate FitResults variant based on whether lazy contexts are provided.
- Parameters:
results_df – Spark DataFrame or pandas DataFrame with fit results
lazy_contexts – Optional dict mapping column names to LazyMetricsContext for on-demand KS/AD computation
samples – Optional dict mapping column names to data samples
- Returns:
LazyFitResults if lazy_contexts provided, EagerFitResults otherwise
Example
>>> # From fitter (automatic)
>>> results = fitter.fit(df, 'value')  # Returns EagerFitResults
>>> lazy = fitter.fit(df, 'value', lazy_metrics=True)  # Returns LazyFitResults
>>>
>>> # Direct construction (rare)
>>> eager = create_fit_results(df)  # EagerFitResults
>>> lazy = create_fit_results(df, lazy_contexts={...})  # LazyFitResults
Sampling¶
Distributed sampling for fitted distributions.
This module provides functions for generating samples from fitted distributions using the backend abstraction for distributed or local execution.
- spark_bestfit.sampling.sample_distributed(distribution: str, parameters: List[float], n: int, backend: ExecutionBackend, num_partitions: int | None = None, random_seed: int | None = None, column_name: str = 'sample') Any[source]¶
Generate samples from a fitted distribution using backend abstraction.
Uses the backend’s parallelism to generate samples, enabling generation of millions of samples efficiently with SparkBackend or local execution with LocalBackend.
- Parameters:
distribution – scipy.stats distribution name (e.g., “norm”, “expon”)
parameters – Distribution parameters (shape, loc, scale)
n – Total number of samples to generate
backend – Execution backend (SparkBackend, LocalBackend, etc.)
num_partitions – Number of partitions to use. Defaults to backend parallelism.
random_seed – Random seed for reproducibility. Each partition uses seed + partition_id.
column_name – Name for the output column (default: “sample”)
- Returns:
Backend-specific DataFrame with single column containing samples (Spark DataFrame for SparkBackend, pandas DataFrame for LocalBackend)
Example
>>> from spark_bestfit.backends.spark import SparkBackend
>>> backend = SparkBackend(spark)
>>> df = sample_distributed("norm", [0.0, 1.0], n=1_000_000, backend=backend)
>>> df.show(5)
+-------------------+
|             sample|
+-------------------+
| 0.4691122931291924|
|-0.2828633018445851|
| 1.0093545783546243|
+-------------------+
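The "seed + partition_id" rule can be shown with plain Python, using a loop in place of real partitions. Everything here is a sketch under assumptions: the helper names are hypothetical, random.gauss stands in for the fitted distribution, and the even split of n across partitions is one reasonable choice rather than the library's documented behavior.

```python
import random


def partition_sizes(n: int, num_partitions: int):
    """Split n samples as evenly as possible across partitions."""
    base, extra = divmod(n, num_partitions)
    return [base + (1 if i < extra else 0) for i in range(num_partitions)]


def sample_partition(partition_id, size, random_seed, mu=0.0, sigma=1.0):
    """Draw one partition's samples from its own independently seeded generator."""
    seed = None if random_seed is None else random_seed + partition_id
    rng = random.Random(seed)
    return [rng.gauss(mu, sigma) for _ in range(size)]


def sample_all(n, num_partitions, random_seed=None):
    """Concatenate the samples of all partitions (sequentially, for illustration)."""
    samples = []
    for partition_id, size in enumerate(partition_sizes(n, num_partitions)):
        samples.extend(sample_partition(partition_id, size, random_seed))
    return samples


run_a = sample_all(10, num_partitions=3, random_seed=42)
run_b = sample_all(10, num_partitions=3, random_seed=42)
```

Offsetting the seed per partition keeps runs reproducible while preventing every partition from emitting the same sequence. One consequence is that changing num_partitions changes the generated values even when random_seed is unchanged.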
Distributions¶
Distribution registry and management for scipy.stats distributions.
- class spark_bestfit.distributions.DiscreteDistributionRegistry(custom_exclusions: Set[str] | None = None)[source]¶
Bases: object
Registry for managing scipy.stats discrete distributions.
Unlike continuous distributions, discrete distributions in scipy do not have a built-in fit() method. This registry provides parameter configuration (initial values, bounds, estimation functions) needed for MLE fitting via optimization.
Example
>>> registry = DiscreteDistributionRegistry()
>>> distributions = registry.get_distributions()
>>> len(distributions)
~15
>>> # Get parameter config for fitting
>>> config = registry.get_param_config("poisson")
>>> initial = config["initial"](data)
>>> bounds = config["bounds"](data)
- ALL_DISTRIBUTIONS = ['bernoulli', 'betabinom', 'betanbinom', 'binom', 'boltzmann', 'dlaplace', 'geom', 'hypergeom', 'logser', 'nbinom', 'nchypergeom_fisher', 'nchypergeom_wallenius', 'nhypergeom', 'planck', 'poisson', 'poisson_binom', 'randint', 'skellam', 'yulesimon', 'zipf', 'zipfian']¶
- DEFAULT_EXCLUSIONS = {'bernoulli', 'nchypergeom_fisher', 'nchypergeom_wallenius', 'poisson_binom', 'randint'}¶
- get_distributions(additional_exclusions: List[str] | None = None) List[str][source]¶
Get filtered list of discrete distributions.
Only returns distributions that have parameter configurations defined.
- Parameters:
additional_exclusions – Additional distribution names to exclude
- Returns:
List of distribution names that can be fitted
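The filtering rule described here amounts to a set difference followed by a membership check. The sketch below reproduces that logic on a shortened list of names; the `PARAM_CONFIGS` set is an assumption standing in for "distributions that have parameter configurations defined", and the function is not the registry's actual code.

```python
# Shortened, illustrative subsets of the registry constants shown above.
ALL_DISTRIBUTIONS = ["bernoulli", "binom", "geom", "nbinom", "poisson", "randint"]
DEFAULT_EXCLUSIONS = {"bernoulli", "randint"}

# Assumption: names for which a parameter configuration exists.
PARAM_CONFIGS = {"binom", "geom", "nbinom", "poisson"}


def get_distributions(additional_exclusions=None):
    """Return fit-ready names: not excluded and backed by a parameter config."""
    excluded = DEFAULT_EXCLUSIONS | set(additional_exclusions or [])
    return [
        name
        for name in ALL_DISTRIBUTIONS
        if name not in excluded and name in PARAM_CONFIGS
    ]


default_list = get_distributions()
without_geom = get_distributions(additional_exclusions=["geom"])
```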
- class spark_bestfit.distributions.DistributionRegistry(custom_exclusions: Set[str] | None = None)[source]¶
Bases: object
Registry for managing scipy.stats continuous distributions.
Handles filtering of distributions based on exclusions and support constraints. All scipy.stats continuous distributions are available by default, with sensible exclusions for slow-computing distributions.
Example
>>> registry = DistributionRegistry()
>>> distributions = registry.get_distributions()
>>> len(distributions)
~100
>>> # Only non-negative distributions
>>> pos_distributions = registry.get_distributions(support_at_zero=True)
>>> # Add custom exclusions
>>> distributions = registry.get_distributions(
...     additional_exclusions=["ncf", "ncx2"]
... )
- ALL_DISTRIBUTIONS = ['alpha', 'anglit', 'arcsine', 'argus', 'beta', 'betaprime', 'bradford', 'burr', 'burr12', 'cauchy', 'chi', 'chi2', 'cosine', 'crystalball', 'dgamma', 'dpareto_lognorm', 'dweibull', 'erlang', 'expon', 'exponnorm', 'exponpow', 'exponweib', 'f', 'fatiguelife', 'fisk', 'foldcauchy', 'foldnorm', 'gamma', 'gausshyper', 'genexpon', 'genextreme', 'gengamma', 'genhalflogistic', 'genhyperbolic', 'geninvgauss', 'genlogistic', 'gennorm', 'genpareto', 'gibrat', 'gompertz', 'gumbel_l', 'gumbel_r', 'halfcauchy', 'halfgennorm', 'halflogistic', 'halfnorm', 'hypsecant', 'invgamma', 'invgauss', 'invweibull', 'irwinhall', 'jf_skew_t', 'johnsonsb', 'johnsonsu', 'kappa3', 'kappa4', 'ksone', 'kstwo', 'kstwobign', 'landau', 'laplace', 'laplace_asymmetric', 'levy', 'levy_l', 'levy_stable', 'loggamma', 'logistic', 'loglaplace', 'lognorm', 'loguniform', 'lomax', 'maxwell', 'mielke', 'moyal', 'nakagami', 'ncf', 'nct', 'ncx2', 'norm', 'norminvgauss', 'pareto', 'pearson3', 'powerlaw', 'powerlognorm', 'powernorm', 'rayleigh', 'rdist', 'recipinvgauss', 'reciprocal', 'rel_breitwigner', 'rice', 'semicircular', 'skewcauchy', 'skewnorm', 'studentized_range', 't', 'trapezoid', 'triang', 'truncexpon', 'truncnorm', 'truncpareto', 'truncweibull_min', 'tukeylambda', 'uniform', 'vonmises', 'vonmises_line', 'wald', 'weibull_max', 'weibull_min', 'wrapcauchy']¶
- DEFAULT_EXCLUSIONS = {'dpareto_lognorm', 'exponpow', 'gausshyper', 'genhyperbolic', 'geninvgauss', 'kappa4', 'ksone', 'kstwo', 'kstwobign', 'levy_stable', 'mielke', 'ncf', 'nct', 'ncx2', 'recipinvgauss', 'studentized_range', 'tukeylambda', 'vonmises', 'vonmises_line', 'wald'}¶
- SLOW_DISTRIBUTIONS: Set[str] = {'burr', 'burr12', 'exponweib', 'fisk', 'genexpon', 'gengamma', 'invweibull', 'jf_skew_t', 'johnsonsb', 'johnsonsu', 'norminvgauss', 'pearson3', 'powerlognorm', 'rice', 't', 'truncweibull_min'}¶
- add_exclusion(dist_name: str) -> None
Add a distribution to the exclusion list.
- Parameters:
dist_name – Name of the distribution to exclude
- get_custom_distributions() -> Dict[str, rv_continuous]
Get a copy of all registered custom distributions.
- Returns:
Dict mapping distribution names to rv_continuous objects
Note
Returns a shallow copy - modifying the dict won’t affect the registry, but modifying distribution objects will.
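The shallow-copy behaviour described in the note can be illustrated with a plain dict. This is a minimal sketch: `FakeDist` and the `registered` mapping are hypothetical stand-ins for illustration and are not part of spark_bestfit.

```python
import copy


class FakeDist:
    """Hypothetical stand-in for a registered distribution object."""

    def __init__(self, label):
        self.label = label


# Stand-in for the registry's internal mapping (an assumption for illustration).
registered = {"power": FakeDist("original")}

# A shallow copy duplicates the dict itself but shares the value objects.
snapshot = copy.copy(registered)

# Adding a key to the copy leaves the original mapping untouched...
snapshot["extra"] = FakeDist("new")
keys_after_insert = sorted(registered)

# ...but mutating a shared object is visible through both dicts.
snapshot["power"].label = "mutated"
shared_label = registered["power"].label
```

If independent distribution objects are needed, copying them explicitly (for example with `copy.deepcopy`) before modification avoids the shared-state effect.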
- get_distribution_object(name: str) -> rv_continuous
Get a distribution object by name.
Looks up both scipy.stats built-in distributions and registered custom distributions.
- Parameters:
name – Distribution name (scipy.stats name or custom registered name)
- Returns:
scipy rv_continuous distribution object
- Raises:
ValueError – If distribution not found
Example
>>> registry = DistributionRegistry()
>>> norm_dist = registry.get_distribution_object("norm")
>>> # Also works for custom distributions
>>> registry.register_distribution("custom", MyDist())
>>> my_dist = registry.get_distribution_object("custom")
- get_distributions(support_at_zero: bool = False, additional_exclusions: List[str] | None = None, include_custom: bool = True) -> List[str]
Get filtered list of distributions based on criteria.
- Parameters:
support_at_zero – If True, only include distributions with support at zero (non-negative distributions)
additional_exclusions – Additional distribution names to exclude
include_custom – If True, include registered custom distributions (default True)
- Returns:
List of distribution names meeting the criteria
Example
>>> registry = DistributionRegistry()
>>> # Get all non-excluded distributions
>>> dists = registry.get_distributions()
>>> # Get only non-negative distributions
>>> pos_dists = registry.get_distributions(support_at_zero=True)
>>> # Exclude more distributions
>>> filtered = registry.get_distributions(
...     additional_exclusions=["norm", "expon"]
... )
>>> # Register and include custom distributions
>>> registry.register_distribution("my_custom", MyCustomDistribution())
>>> dists = registry.get_distributions()  # Includes "my_custom"
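The exclusion part of this filtering can be pictured as a set difference. The sketch below is an illustration only: `filter_distributions` is a hypothetical helper, the name lists are small subsets of the constants above, and it does not model `support_at_zero` or custom distributions. Preserving the input order is also an assumption.

```python
from typing import Iterable, List, Optional, Set


def filter_distributions(
    all_names: Iterable[str],
    exclusions: Set[str],
    additional_exclusions: Optional[List[str]] = None,
) -> List[str]:
    """Return the names that appear in neither exclusion collection."""
    blocked = set(exclusions) | set(additional_exclusions or [])
    return [name for name in all_names if name not in blocked]


# Small subsets of ALL_DISTRIBUTIONS and DEFAULT_EXCLUSIONS, for illustration.
all_names = ["expon", "gamma", "levy_stable", "ncf", "norm"]
default_exclusions = {"levy_stable", "ncf"}

default_result = filter_distributions(all_names, default_exclusions)
narrowed = filter_distributions(all_names, default_exclusions, ["norm", "expon"])
```

Passing a name that is already excluded by default, such as "ncf", has no further effect under this model.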
- get_exclusions() -> Set[str]
Get current set of excluded distributions.
- Returns:
Set of excluded distribution names
- has_custom_distributions() -> bool
Check if any custom distributions are registered.
- Returns:
True if at least one custom distribution is registered
- register_distribution(name: str, distribution: rv_continuous, overwrite: bool = False) -> None
Register a custom distribution for fitting.
Custom distributions must implement the scipy rv_continuous interface, specifically the fit(), pdf(), and cdf() methods. The distribution will be included in fitting alongside scipy.stats distributions.
- Parameters:
name – Unique name for the distribution (used in results)
distribution – scipy rv_continuous instance or subclass. Must implement fit(), pdf(), cdf() methods.
overwrite – If True, overwrite existing distribution with same name. Default False raises ValueError if name exists.
- Raises:
ValueError – If name already exists (and overwrite=False) or conflicts with a scipy.stats distribution name
TypeError – If distribution doesn’t implement required interface
Example
>>> from scipy.stats import rv_continuous
>>>
>>> class PowerDistribution(rv_continuous):
...     def _pdf(self, x, alpha):
...         return alpha * x ** (alpha - 1)
...     def _cdf(self, x, alpha):
...         return x ** alpha
>>>
>>> registry = DistributionRegistry()
>>> registry.register_distribution("power", PowerDistribution(a=0, b=1))
>>> distributions = registry.get_distributions()
>>> "power" in distributions
True
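The documented error conditions (name conflicts, the `overwrite` flag, and the required `fit()`, `pdf()`, `cdf()` methods) can be mimicked with a toy class. This is a sketch under assumptions: `ToyRegistry`, `Complete`, `Incomplete`, and `outcome` are hypothetical names, the built-in name set is a stand-in for scipy.stats, and the order in which the checks run is a guess rather than the library's actual behaviour.

```python
class ToyRegistry:
    """Hypothetical registry mirroring the documented error rules."""

    REQUIRED_METHODS = ("fit", "pdf", "cdf")

    def __init__(self, builtin_names):
        self._builtin = set(builtin_names)  # stand-in for scipy.stats names
        self._custom = {}

    def register(self, name, distribution, overwrite=False):
        if name in self._builtin:
            raise ValueError(f"{name!r} conflicts with a built-in distribution")
        if name in self._custom and not overwrite:
            raise ValueError(f"{name!r} is already registered")
        missing = [
            m for m in self.REQUIRED_METHODS
            if not callable(getattr(distribution, m, None))
        ]
        if missing:
            raise TypeError(f"distribution lacks required methods: {missing}")
        self._custom[name] = distribution


class Complete:
    """Provides all three required methods (bodies omitted)."""
    def fit(self, data): ...
    def pdf(self, x): ...
    def cdf(self, x): ...


class Incomplete:
    """Lacks fit() and cdf()."""
    def pdf(self, x): ...


def outcome(registry, *args, **kwargs):
    """Return 'ok' or the name of the exception raised by register()."""
    try:
        registry.register(*args, **kwargs)
        return "ok"
    except (ValueError, TypeError) as exc:
        return type(exc).__name__


registry = ToyRegistry(builtin_names={"norm", "expon"})
outcomes = [
    outcome(registry, "power", Complete()),                  # first registration
    outcome(registry, "power", Complete()),                  # duplicate name
    outcome(registry, "power", Complete(), overwrite=True),  # explicit overwrite
    outcome(registry, "norm", Complete()),                   # built-in conflict
    outcome(registry, "broken", Incomplete()),               # missing methods
]
```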
Histogram
Distributed histogram computation without collecting raw data.
This module provides the HistogramComputer class that uses the backend abstraction for distributed histogram computation.
- class spark_bestfit.histogram.HistogramComputer(backend: ExecutionBackend | None = None)
Bases: object
Computes histograms efficiently using distributed aggregations.
This implementation avoids collecting raw data to the driver by using the backend’s distributed aggregation capabilities. Only the final histogram (typically ~100 values) is collected, not the raw dataset.
Supports multiple backends:
- SparkBackend: Uses Spark ML Bucketizer + groupBy (scales to billions of rows)
- LocalBackend: Uses numpy histogram (for testing and small datasets)
Example
>>> from spark_bestfit.backends.spark import SparkBackend
>>> backend = SparkBackend(spark)
>>> computer = HistogramComputer(backend)
>>> y_hist, bin_edges = computer.compute_histogram(
...     df, column='value', bins=50
... )
>>> # y_hist has 50 values, bin_edges has 51 values
>>> x_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.0  # Compute centers if needed
- Auto-detection example (detects backend from DataFrame type):
>>> computer = HistogramComputer()
>>> y_hist, bin_edges = computer.compute_histogram(pandas_df, column='value')  # Uses LocalBackend
- compute_histogram(df: Any, column: str, bins: int | ndarray = 50, use_rice_rule: bool = False, approx_count: int | None = None) -> Tuple[ndarray, ndarray]
Compute histogram using distributed aggregations.
This method computes the histogram WITHOUT collecting the raw data. It uses the backend’s distributed aggregation to compute bin counts, then collects only the aggregated histogram.
- Parameters:
df – DataFrame containing data (Spark DataFrame or pandas DataFrame)
column – Column name to compute histogram for
bins – Number of bins (int) or array of bin edges
use_rice_rule – Use Rice rule to automatically determine bin count
approx_count – Approximate row count (avoids full count if provided)
- Returns:
Tuple of (y_hist, bin_edges), where y_hist is the normalized frequency density for each bin and bin_edges is the array of bin edge values (len = n_bins + 1)
- Return type:
Tuple[ndarray, ndarray]
Example
>>> computer = HistogramComputer(backend)
>>> y, x = computer.compute_histogram(df, 'value', bins=100)
>>> # y and x are small numpy arrays (~100 elements)
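The idea of aggregating bin counts per partition and collecting only the totals can be sketched in pure Python. This is not the library's implementation (the Spark backend is described above as using Bucketizer + groupBy). The helper names `rice_bins`, `bin_counts`, and `merged_density` are hypothetical, the Rice rule is taken in its usual form of ceil(2 * n^(1/3)), and the input is assumed to be non-empty.

```python
import bisect
import math
from typing import List, Sequence


def rice_bins(n: int) -> int:
    """Rice rule: number of bins = ceil(2 * n^(1/3))."""
    return math.ceil(2 * n ** (1 / 3))


def bin_counts(values: Sequence[float], edges: Sequence[float]) -> List[int]:
    """Count values per bin; the last bin includes its right edge."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        if v < edges[0] or v > edges[-1]:
            continue  # outside the histogram range
        # bisect_right - 1 is the bin index; clamp the right edge into the last bin
        idx = min(bisect.bisect_right(edges, v) - 1, len(counts) - 1)
        counts[idx] += 1
    return counts


def merged_density(partitions, edges) -> List[float]:
    """Sum per-partition counts, then normalize to a density."""
    totals = [0] * (len(edges) - 1)
    for part in partitions:
        for i, c in enumerate(bin_counts(part, edges)):
            totals[i] += c
    n = sum(totals)
    return [c / (n * (edges[i + 1] - edges[i])) for i, c in enumerate(totals)]


edges = [0.0, 1.0, 2.0, 3.0, 4.0]
partitions = [[0.5, 1.5, 1.7], [2.2, 3.9, 4.0, 0.1, 1.1]]  # two "partitions"

density = merged_density(partitions, edges)
centers = [(lo + hi) / 2 for lo, hi in zip(edges[:-1], edges[1:])]
area = sum(d * (hi - lo) for d, lo, hi in zip(density, edges[:-1], edges[1:]))
```

Only the per-bin totals cross partition boundaries here, which is why the collected result stays small regardless of the number of rows.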
Plotting
Visualization utilities for fitted distributions.
- spark_bestfit.plotting.plot_cdf_comparison(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Cumulative Probability', figsize: Tuple[int, int] = (10, 8), dpi: int = 100, empirical_color: str = 'steelblue', empirical_linewidth: float = 2.0, empirical_alpha: float = 0.8, theoretical_color: str = 'red', theoretical_linewidth: float = 2.0, theoretical_linestyle: str = '--', title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Plot empirical CDF overlaid with theoretical CDF from the fitted distribution.
The empirical CDF is computed from the sample data as a step function. The theoretical CDF is evaluated from the fitted distribution. A good fit shows close alignment between the two CDFs.
- Parameters:
result – Fitted distribution result
data – Sample data array (1D numpy array)
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
empirical_color – Color of empirical CDF line
empirical_linewidth – Line width for empirical CDF
empirical_alpha – Transparency of empirical CDF line
theoretical_color – Color of theoretical CDF line
theoretical_linewidth – Line width for theoretical CDF
theoretical_linestyle – Line style for theoretical CDF
title_fontsize – Title font size
label_fontsize – Axis label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
Example
>>> from spark_bestfit import DistributionFitter
>>> from spark_bestfit.plotting import plot_cdf_comparison
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> plot_cdf_comparison(best, data, title='CDF Comparison')
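The empirical step function behind this plot can be sketched with the standard library alone. The helper `empirical_cdf` is hypothetical, and `statistics.NormalDist` stands in for a fitted distribution. The sample values and parameters are made up for illustration.

```python
from statistics import NormalDist


def empirical_cdf(data):
    """Return the sorted values and the ECDF height (i / n) at each of them."""
    xs = sorted(data)
    n = len(xs)
    return xs, [(i + 1) / n for i in range(n)]


sample = [2.1, 0.4, 1.3, 3.0]
xs, heights = empirical_cdf(sample)

# Stand-in for a fitted distribution (parameters are illustrative).
fitted = NormalDist(mu=1.7, sigma=1.0)

# Largest vertical gap between the two curves at the sample points.
max_gap = max(abs(h - fitted.cdf(x)) for x, h in zip(xs, heights))
```

A small `max_gap` corresponds to the "close alignment" described above. This quantity is related to, but not identical with, the K-S statistic, which also considers the ECDF value just before each jump.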
- spark_bestfit.plotting.plot_comparison(results: List[DistributionFitResult], y_hist: ndarray, x_hist: ndarray, title: str = 'Distribution Comparison', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Plot multiple fitted distributions for comparison.
- Parameters:
results – List of DistributionFitResult objects
y_hist – Histogram density values
x_hist – Histogram bin centers
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch
show_histogram – Show data histogram
histogram_alpha – Histogram transparency
pdf_linewidth – PDF line width
title_fontsize – Title font size
label_fontsize – Label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency
save_path – Optional path to save figure
save_format – Save format
- Returns:
Tuple of (figure, axis)
Example
>>> top_3 = results.best(n=3)
>>> fitter.plot_comparison(top_3, df, 'value')
- spark_bestfit.plotting.plot_diagnostics(result: DistributionFitResult, data: ndarray, y_hist: ndarray | None = None, x_hist: ndarray | None = None, bins: int = 50, title: str = '', figsize: Tuple[int, int] = (14, 12), dpi: int = 100, title_fontsize: int = 16, subplot_title_fontsize: int = 12, label_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, ndarray]
Create a 2x2 diagnostic plot panel for assessing distribution fit quality.
Generates four diagnostic plots:
- Q-Q Plot (top-left): Compares sample quantiles vs theoretical quantiles
- P-P Plot (top-right): Compares empirical vs theoretical probabilities
- Residual Histogram (bottom-left): Distribution of fit residuals
- CDF Comparison (bottom-right): Empirical vs theoretical CDF overlay
- Parameters:
result – Fitted distribution result
data – Sample data array (1D numpy array)
y_hist – Optional pre-computed histogram density values. If None, computed from data using specified bins.
x_hist – Optional pre-computed histogram bin centers. If None, computed from data using specified bins.
bins – Number of histogram bins (used if y_hist/x_hist not provided)
title – Overall figure title
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
title_fontsize – Main title font size
subplot_title_fontsize – Subplot title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, array of axes)
Example
>>> from spark_bestfit import DistributionFitter
>>> from spark_bestfit.plotting import plot_diagnostics
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fig, axes = plot_diagnostics(best, data, title='Fit Diagnostics')
- spark_bestfit.plotting.plot_discrete_distribution(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Probability', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.7, pmf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Plot fitted discrete distribution against data histogram.
- Parameters:
result – Fitted discrete distribution result
data – Integer data array
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
show_histogram – Show data histogram
histogram_alpha – Histogram transparency (0-1)
pmf_linewidth – Line width for PMF markers
title_fontsize – Title font size
label_fontsize – Axis label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
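This entry has no example, so the following sketch shows the two quantities such a plot compares: empirical relative frequencies and a fitted PMF. It uses only the standard library. The Poisson model, the `poisson_pmf` helper, and the sample counts are assumptions for illustration, not the library's fitting procedure.

```python
import math
from collections import Counter


def poisson_pmf(k: int, lam: float) -> float:
    """Poisson probability mass at k for rate lam."""
    return math.exp(-lam) * lam ** k / math.factorial(k)


counts_data = [0, 1, 1, 2, 2, 2, 3, 4]  # illustrative integer data
n = len(counts_data)
freq = Counter(counts_data)

# For a Poisson model, the maximum-likelihood rate is the sample mean.
lam = sum(counts_data) / n

support = range(min(counts_data), max(counts_data) + 1)
empirical = {k: freq.get(k, 0) / n for k in support}  # bar heights
fitted = {k: poisson_pmf(k, lam) for k in support}    # PMF markers
```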
- spark_bestfit.plotting.plot_distribution(result: DistributionFitResult, y_hist: ndarray, x_hist: ndarray, title: str = '', xlabel: str = 'Value', ylabel: str = 'Density', figsize: Tuple[int, int] = (12, 8), dpi: int = 100, show_histogram: bool = True, histogram_alpha: float = 0.5, pdf_linewidth: int = 2, title_fontsize: int = 14, label_fontsize: int = 12, legend_fontsize: int = 10, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Plot fitted distribution against data histogram.
- Parameters:
result – Fitted distribution result
y_hist – Histogram density values
x_hist – Histogram bin centers
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
show_histogram – Show data histogram
histogram_alpha – Histogram transparency (0-1)
pdf_linewidth – Line width for PDF curve
title_fontsize – Title font size
label_fontsize – Axis label font size
legend_fontsize – Legend font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
Example
>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot(best, df, 'value', title='Best Fit')
- spark_bestfit.plotting.plot_pp(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Theoretical Probabilities', ylabel: str = 'Sample Probabilities', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Create a P-P (probability-probability) plot for goodness-of-fit assessment.
A P-P plot compares the empirical cumulative distribution function (CDF) of the sample data against the theoretical CDF of the fitted distribution. It is particularly useful for assessing fit in the center of the distribution.
- Parameters:
result – Fitted distribution result
data – Sample data array (1D numpy array)
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
marker – Marker style for data points
marker_size – Size of markers
marker_alpha – Marker transparency (0-1)
marker_color – Color of markers
line_color – Color of reference line
line_style – Style of reference line
line_width – Width of reference line
title_fontsize – Title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
Example
>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot_pp(best, df, 'value', title='P-P Plot')
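The point pairs of a P-P plot can be computed without any plotting library. In this sketch `pp_points` is a hypothetical helper, `statistics.NormalDist` stands in for the fitted distribution, and the plotting position (i + 0.5) / n is one common convention. Which convention the library uses is not stated here.

```python
from statistics import NormalDist


def pp_points(data, cdf):
    """Return (theoretical, empirical) probabilities for a P-P plot."""
    xs = sorted(data)
    n = len(xs)
    empirical = [(i + 0.5) / n for i in range(n)]  # plotting position: an assumption
    theoretical = [cdf(x) for x in xs]
    return theoretical, empirical


fitted = NormalDist(mu=0.0, sigma=1.0)  # stand-in for a fitted distribution

# A sample placed exactly at the model's quantiles falls on the diagonal.
n = 9
ideal_sample = [fitted.inv_cdf((i + 0.5) / n) for i in range(n)]
theoretical, empirical = pp_points(ideal_sample, fitted.cdf)
max_deviation = max(abs(t - e) for t, e in zip(theoretical, empirical))
```

Real data will scatter around the reference line; systematic bowing away from it suggests a poor fit.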
- spark_bestfit.plotting.plot_qq(result: DistributionFitResult, data: ndarray, title: str = '', xlabel: str = 'Theoretical Quantiles', ylabel: str = 'Sample Quantiles', figsize: Tuple[int, int] = (10, 10), dpi: int = 100, marker: str = 'o', marker_size: int = 30, marker_alpha: float = 0.6, marker_color: str = 'steelblue', line_color: str = 'red', line_style: str = '--', line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Create a Q-Q (quantile-quantile) plot for goodness-of-fit assessment.
A Q-Q plot compares the quantiles of the sample data against the theoretical quantiles of the fitted distribution. If the data follows the fitted distribution well, the points will fall approximately along the reference line.
- Parameters:
result – Fitted distribution result
data – Sample data array (1D numpy array)
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
marker – Marker style for data points
marker_size – Size of markers
marker_alpha – Marker transparency (0-1)
marker_color – Color of markers
line_color – Color of reference line
line_style – Style of reference line
line_width – Width of reference line
title_fontsize – Title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
Example
>>> from spark_bestfit import DistributionFitter
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> fitter.plot_qq(best, df, 'value', title='Q-Q Plot')
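A Q-Q plot pairs each sorted sample value with the model quantile at a matching probability. The sketch below assumes a standard normal as the fitted distribution and the (i + 0.5) / n plotting position. `qq_points` is a hypothetical helper, not a spark_bestfit function.

```python
from statistics import NormalDist


def qq_points(data, inv_cdf):
    """Return (theoretical quantiles, sorted sample quantiles)."""
    xs = sorted(data)
    n = len(xs)
    theoretical = [inv_cdf((i + 0.5) / n) for i in range(n)]  # assumed convention
    return theoretical, xs


fitted = NormalDist(mu=0.0, sigma=1.0)  # stand-in for a fitted distribution
sample = [0.4, -1.2, 1.5, -0.3, 0.1]    # illustrative data

theoretical_q, sample_q = qq_points(sample, fitted.inv_cdf)
```

Plotting `sample_q` against `theoretical_q` gives the scatter. Points near the line y = x indicate agreement, and deviations at the ends highlight tail mismatch.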
- spark_bestfit.plotting.plot_residual_histogram(result: DistributionFitResult, y_hist: ndarray, x_hist: ndarray, title: str = '', xlabel: str = 'Residual (Observed - Expected)', ylabel: str = 'Frequency', figsize: Tuple[int, int] = (10, 8), dpi: int = 100, bins: int = 30, histogram_alpha: float = 0.7, histogram_color: str = 'steelblue', show_zero_line: bool = True, zero_line_color: str = 'red', zero_line_style: str = '--', zero_line_width: float = 1.5, title_fontsize: int = 14, label_fontsize: int = 12, grid_alpha: float = 0.3, save_path: str | None = None, save_format: str = 'png') -> Tuple[None, None]
Plot a histogram of residuals (observed - expected density).
Residuals are computed as the difference between the empirical density (from histogram) and the theoretical density (from fitted distribution). A good fit should show residuals centered near zero.
- Parameters:
result – Fitted distribution result
y_hist – Histogram density values (empirical density)
x_hist – Histogram bin centers
title – Plot title
xlabel – X-axis label
ylabel – Y-axis label
figsize – Figure size (width, height)
dpi – Dots per inch for saved figures
bins – Number of bins for the residual histogram
histogram_alpha – Histogram transparency (0-1)
histogram_color – Color of histogram bars
show_zero_line – Whether to show a vertical line at zero
zero_line_color – Color of the zero reference line
zero_line_style – Style of the zero reference line
zero_line_width – Width of the zero reference line
title_fontsize – Title font size
label_fontsize – Axis label font size
grid_alpha – Grid transparency (0-1)
save_path – Optional path to save figure
save_format – Save format (png, pdf, svg)
- Returns:
Tuple of (figure, axis)
Example
>>> import numpy as np
>>> from spark_bestfit import DistributionFitter
>>> from spark_bestfit.plotting import plot_residual_histogram
>>> fitter = DistributionFitter(spark)
>>> results = fitter.fit(df, 'value')
>>> best = results.best(n=1)[0]
>>> y_hist, x_edges = np.histogram(data, bins=50, density=True)
>>> x_hist = (x_edges[:-1] + x_edges[1:]) / 2
>>> plot_residual_histogram(best, y_hist, x_hist)
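The residuals themselves are a per-bin subtraction. A minimal sketch, assuming a standard normal as the fitted distribution and made-up histogram values:

```python
from statistics import NormalDist

fitted = NormalDist(mu=0.0, sigma=1.0)  # stand-in for a fitted distribution

x_hist = [-1.0, 0.0, 1.0]    # bin centers (illustrative)
y_hist = [0.25, 0.40, 0.22]  # empirical densities (illustrative)

# Residual = observed density - expected density at each bin center.
residuals = [y - fitted.pdf(x) for x, y in zip(x_hist, y_hist)]
mean_residual = sum(residuals) / len(residuals)
```

Residuals that cluster around zero with no strong skew are what the description above calls a good fit.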
Utilities
Utility functions for spark-bestfit.
- spark_bestfit.utils.get_spark_session(spark: SparkSession | None = None) -> SparkSession
Get or create a SparkSession.
If a SparkSession is provided, it is returned as-is. If None is provided, the function attempts to get the active SparkSession.
- Parameters:
spark – Optional SparkSession. If None, gets the active session.
- Returns:
SparkSession instance
- Raises:
RuntimeError – If no SparkSession is provided and no active session exists
Example
>>> # Use existing session
>>> spark = SparkSession.builder.appName("my-app").getOrCreate()
>>> session = get_spark_session(spark)
>>>
>>> # Use active session
>>> session = get_spark_session()  # Gets active session