Source code for spark_bestfit.truncated

"""Truncated distribution wrapper for scipy frozen distributions.

This module provides analytical truncated moments for common distributions
(norm, expon, uniform) and falls back to Monte Carlo for others.
"""

import numpy as np


[docs] class TruncatedFrozenDist: """Wrapper for frozen scipy distributions with truncation bounds. Implements truncation for arbitrary scipy.stats frozen distributions using CDF inversion for sampling and proper normalization for PDF/CDF. This is needed because scipy.stats.truncate() only works with the new distribution infrastructure (scipy 1.14+), not with traditional rv_frozen objects. Args: frozen_dist: Frozen scipy.stats distribution lb: Lower bound (-np.inf for no lower bound) ub: Upper bound (np.inf for no upper bound) raise_on_empty: If True, raise ValueError when truncation has no probability mass. If False, methods return zeros/empty results silently. Default True. Example: >>> from scipy import stats >>> from spark_bestfit import TruncatedFrozenDist >>> # Create a normal distribution truncated to [0, inf) >>> frozen = stats.norm(loc=0, scale=1) >>> truncated = TruncatedFrozenDist(frozen, lb=0, ub=np.inf) >>> truncated.pdf(0.5) # Evaluate PDF at x=0.5 """ def __init__(self, frozen_dist, lb: float, ub: float, *, raise_on_empty: bool = True): """Initialize truncated distribution.""" self._dist = frozen_dist self._lb = lb self._ub = ub self._raise_on_empty = raise_on_empty # Pre-compute normalization constant self._cdf_lb = frozen_dist.cdf(lb) if np.isfinite(lb) else 0.0 self._cdf_ub = frozen_dist.cdf(ub) if np.isfinite(ub) else 1.0 self._norm = self._cdf_ub - self._cdf_lb if self._norm <= 0 and raise_on_empty: raise ValueError(f"Invalid truncation: no probability mass in [{lb}, {ub}]") @property def bounds(self) -> tuple: """Return (lower_bound, upper_bound) tuple.""" return (self._lb, self._ub)
[docs] def pdf(self, x): """Evaluate probability density function. Returns 0 for values outside the truncation bounds. """ x = np.asarray(x) result = np.zeros_like(x, dtype=float) if self._norm <= 0: return result # Only compute PDF for values within bounds mask = (x >= self._lb) & (x <= self._ub) if np.any(mask): result[mask] = self._dist.pdf(x[mask]) / self._norm return result
[docs] def logpdf(self, x): """Evaluate log probability density function. Returns -inf for values outside the truncation bounds. """ x = np.asarray(x) result = np.full_like(x, -np.inf, dtype=float) if self._norm <= 0: return result # Only compute logPDF for values within bounds mask = (x >= self._lb) & (x <= self._ub) if np.any(mask): result[mask] = self._dist.logpdf(x[mask]) - np.log(self._norm) return result
[docs] def cdf(self, x): """Evaluate cumulative distribution function. Returns 0 for x < lower_bound, 1 for x > upper_bound. """ x = np.asarray(x) result = np.zeros_like(x, dtype=float) # Below lower bound: 0 below = x < self._lb result[below] = 0.0 # Above upper bound: 1 above = x > self._ub result[above] = 1.0 # Within bounds: scaled CDF between = ~below & ~above if np.any(between) and self._norm > 0: result[between] = (self._dist.cdf(x[between]) - self._cdf_lb) / self._norm return result
[docs] def ppf(self, q): """Evaluate percent point function (inverse CDF). Args: q: Quantile(s) in [0, 1] Returns: Value(s) at the given quantile(s) within the truncated distribution """ q = np.asarray(q) # Map quantile to the truncated range q_mapped = self._cdf_lb + q * self._norm return self._dist.ppf(q_mapped)
[docs] def rvs(self, size=1, random_state=None): """Generate random samples using inverse CDF method. Args: size: Number of samples to generate random_state: Random seed or numpy Generator for reproducibility Returns: Array of random samples from the truncated distribution """ rng = np.random.default_rng(random_state) u = rng.uniform(0, 1, size=size) return self.ppf(u)
def _get_dist_name(self) -> str: """Get the distribution name from the frozen distribution.""" return self._dist.dist.name def _get_dist_params(self) -> tuple: """Extract loc, scale, and shape params from frozen distribution.""" args = self._dist.args kwds = self._dist.kwds loc = kwds.get("loc", 0.0) scale = kwds.get("scale", 1.0) return args, loc, scale
[docs] def mean(self): """Compute mean of truncated distribution. Uses analytical formulas for norm, expon, and uniform distributions. Falls back to Monte Carlo for other distributions. """ dist_name = self._get_dist_name() args, loc, scale = self._get_dist_params() lb, ub = self._lb, self._ub if dist_name == "norm": return self._mean_truncated_norm(loc, scale, lb, ub) elif dist_name == "expon": return self._mean_truncated_expon(loc, scale, lb, ub) elif dist_name == "uniform": return self._mean_truncated_uniform(loc, scale, lb, ub) else: # Fall back to Monte Carlo samples = self.rvs(size=10000, random_state=42) return np.mean(samples)
[docs] def std(self): """Compute standard deviation of truncated distribution. Uses analytical formulas for norm, expon, and uniform distributions. Falls back to Monte Carlo for other distributions. """ dist_name = self._get_dist_name() args, loc, scale = self._get_dist_params() lb, ub = self._lb, self._ub if dist_name == "norm": return self._std_truncated_norm(loc, scale, lb, ub) elif dist_name == "expon": return self._std_truncated_expon(loc, scale, lb, ub) elif dist_name == "uniform": return self._std_truncated_uniform(loc, scale, lb, ub) else: # Fall back to Monte Carlo samples = self.rvs(size=10000, random_state=42) return np.std(samples)
def _mean_truncated_norm(self, mu: float, sigma: float, lb: float, ub: float) -> float: """Analytical mean for truncated normal distribution. E[X] = mu + sigma * (phi(alpha) - phi(beta)) / Z where alpha = (lb - mu) / sigma, beta = (ub - mu) / sigma phi = standard normal PDF, Z = Phi(beta) - Phi(alpha) """ alpha = (lb - mu) / sigma if np.isfinite(lb) else -np.inf beta = (ub - mu) / sigma if np.isfinite(ub) else np.inf # Standard normal PDF at alpha and beta phi_alpha = np.exp(-0.5 * alpha**2) / np.sqrt(2 * np.pi) if np.isfinite(alpha) else 0.0 phi_beta = np.exp(-0.5 * beta**2) / np.sqrt(2 * np.pi) if np.isfinite(beta) else 0.0 # Z is already computed as self._norm (CDF(ub) - CDF(lb)) Z = self._norm if Z <= 0: return mu return mu + sigma * (phi_alpha - phi_beta) / Z def _std_truncated_norm(self, mu: float, sigma: float, lb: float, ub: float) -> float: """Analytical standard deviation for truncated normal distribution. Var[X] = sigma^2 * [1 + (alpha*phi(alpha) - beta*phi(beta))/Z - ((phi(alpha) - phi(beta))/Z)^2] """ alpha = (lb - mu) / sigma if np.isfinite(lb) else -np.inf beta = (ub - mu) / sigma if np.isfinite(ub) else np.inf # Standard normal PDF at alpha and beta phi_alpha = np.exp(-0.5 * alpha**2) / np.sqrt(2 * np.pi) if np.isfinite(alpha) else 0.0 phi_beta = np.exp(-0.5 * beta**2) / np.sqrt(2 * np.pi) if np.isfinite(beta) else 0.0 # Handle infinite bounds for alpha*phi(alpha) and beta*phi(beta) alpha_phi_alpha = alpha * phi_alpha if np.isfinite(alpha) else 0.0 beta_phi_beta = beta * phi_beta if np.isfinite(beta) else 0.0 Z = self._norm if Z <= 0: return sigma term1 = (alpha_phi_alpha - beta_phi_beta) / Z term2 = ((phi_alpha - phi_beta) / Z) ** 2 variance = sigma**2 * (1 + term1 - term2) return np.sqrt(max(0, variance)) def _mean_truncated_expon(self, loc: float, scale: float, lb: float, ub: float) -> float: """Analytical mean for truncated exponential distribution. For exponential with rate lambda = 1/scale, truncated to [a, b]: E[X] = loc + scale * [1 - (b-a)/scale * exp(-(b-a)/scale) / (1 - exp(-(b-a)/scale))] when lb >= loc. """ # Effective bounds relative to loc a = max(lb - loc, 0) if np.isfinite(lb) else 0.0 b = ub - loc if np.isfinite(ub) else np.inf if not np.isfinite(b): # One-sided truncation from below # E[X|X > a] = loc + a + scale return loc + a + scale # Two-sided truncation lam = 1.0 / scale exp_term = np.exp(-lam * (b - a)) if exp_term >= 1.0: # Degenerate case return loc + (a + b) / 2 denom = 1 - exp_term if denom <= 0: return loc + (a + b) / 2 # Mean of truncated exponential mean_shifted = (1 / lam) - (b - a) * exp_term / denom return loc + a + mean_shifted def _std_truncated_expon(self, loc: float, scale: float, lb: float, ub: float) -> float: """Analytical standard deviation for truncated exponential distribution.""" # Effective bounds relative to loc a = max(lb - loc, 0) if np.isfinite(lb) else 0.0 b = ub - loc if np.isfinite(ub) else np.inf if not np.isfinite(b): # One-sided truncation: Std = scale (memoryless property) return scale # Two-sided truncation: use variance formula lam = 1.0 / scale exp_term = np.exp(-lam * (b - a)) denom = 1 - exp_term if denom <= 0: return 0.0 # E[X] and E[X^2] for variance calculation delta = b - a mean_rel = (1 / lam) - delta * exp_term / denom # E[X^2] for truncated exponential e_x2 = (2 / lam**2) - (delta**2 + 2 * delta / lam) * exp_term / denom variance = e_x2 - mean_rel**2 return np.sqrt(max(0, variance)) def _mean_truncated_uniform(self, loc: float, scale: float, lb: float, ub: float) -> float: """Analytical mean for truncated uniform distribution. For uniform on [loc, loc+scale], truncated to [lb, ub]: Mean = (effective_lb + effective_ub) / 2 """ # Original support is [loc, loc + scale] effective_lb = max(lb, loc) if np.isfinite(lb) else loc effective_ub = min(ub, loc + scale) if np.isfinite(ub) else loc + scale return (effective_lb + effective_ub) / 2 def _std_truncated_uniform(self, loc: float, scale: float, lb: float, ub: float) -> float: """Analytical standard deviation for truncated uniform distribution. Std = (effective_ub - effective_lb) / sqrt(12) """ # Original support is [loc, loc + scale] effective_lb = max(lb, loc) if np.isfinite(lb) else loc effective_ub = min(ub, loc + scale) if np.isfinite(ub) else loc + scale width = effective_ub - effective_lb return width / np.sqrt(12)
def create_truncated_dist(frozen_dist, lb: float, ub: float) -> TruncatedFrozenDist: """Create a truncated distribution wrapper. Convenience function that creates a TruncatedFrozenDist with raise_on_empty=False so fitting continues silently when truncation bounds contain no probability mass. Args: frozen_dist: Frozen scipy.stats distribution lb: Lower bound ub: Upper bound Returns: Truncated distribution wrapper with pdf, logpdf, cdf methods """ return TruncatedFrozenDist(frozen_dist, lb, ub, raise_on_empty=False) __all__ = ["TruncatedFrozenDist", "create_truncated_dist"]