-rw-r--r--  Lib/statistics.py            | 1513
-rw-r--r--  Lib/test/test_statistics.py  |    5
2 files changed, 783 insertions(+), 735 deletions(-)
diff --git a/Lib/statistics.py b/Lib/statistics.py
index c36145f..c64c6fa 100644
--- a/Lib/statistics.py
+++ b/Lib/statistics.py
@@ -147,327 +147,13 @@ from collections import Counter, namedtuple, defaultdict
_SQRT2 = sqrt(2.0)
_random = random
-# === Exceptions ===
+## Exceptions ##############################################################
class StatisticsError(ValueError):
pass
-# === Private utilities ===
-
-def _sum(data):
- """_sum(data) -> (type, sum, count)
-
- Return a high-precision sum of the given numeric data as a fraction,
- together with the type to be converted to and the count of items.
-
- Examples
- --------
-
- >>> _sum([3, 2.25, 4.5, -0.5, 0.25])
- (<class 'float'>, Fraction(19, 2), 5)
-
- Some sources of round-off error will be avoided:
-
- # Built-in sum returns zero.
- >>> _sum([1e50, 1, -1e50] * 1000)
- (<class 'float'>, Fraction(1000, 1), 3000)
-
- Fractions and Decimals are also supported:
-
- >>> from fractions import Fraction as F
- >>> _sum([F(2, 3), F(7, 5), F(1, 4), F(5, 6)])
- (<class 'fractions.Fraction'>, Fraction(63, 20), 4)
-
- >>> from decimal import Decimal as D
- >>> data = [D("0.1375"), D("0.2108"), D("0.3061"), D("0.0419")]
- >>> _sum(data)
- (<class 'decimal.Decimal'>, Fraction(6963, 10000), 4)
-
- Mixed types are currently treated as an error, except that int is
- allowed.
- """
- count = 0
- types = set()
- types_add = types.add
- partials = {}
- partials_get = partials.get
- for typ, values in groupby(data, type):
- types_add(typ)
- for n, d in map(_exact_ratio, values):
- count += 1
- partials[d] = partials_get(d, 0) + n
- if None in partials:
- # The sum will be a NAN or INF. We can ignore all the finite
- # partials, and just look at this special one.
- total = partials[None]
- assert not _isfinite(total)
- else:
- # Sum all the partial sums using builtin sum.
- total = sum(Fraction(n, d) for d, n in partials.items())
- T = reduce(_coerce, types, int) # or raise TypeError
- return (T, total, count)
-
-
-def _ss(data, c=None):
- """Return the exact mean and sum of square deviations of sequence data.
-
- Calculations are done in a single pass, allowing the input to be an iterator.
-
- If given, *c* is used as the mean; otherwise, the mean is calculated from the data.
- Use the *c* argument with care, as it can lead to garbage results.
-
- """
- if c is not None:
- T, ssd, count = _sum((d := x - c) * d for x in data)
- return (T, ssd, c, count)
- count = 0
- types = set()
- types_add = types.add
- sx_partials = defaultdict(int)
- sxx_partials = defaultdict(int)
- for typ, values in groupby(data, type):
- types_add(typ)
- for n, d in map(_exact_ratio, values):
- count += 1
- sx_partials[d] += n
- sxx_partials[d] += n * n
- if not count:
- ssd = c = Fraction(0)
- elif None in sx_partials:
- # The sum will be a NAN or INF. We can ignore all the finite
- # partials, and just look at this special one.
- ssd = c = sx_partials[None]
- assert not _isfinite(ssd)
- else:
- sx = sum(Fraction(n, d) for d, n in sx_partials.items())
- sxx = sum(Fraction(n, d*d) for d, n in sxx_partials.items())
- # This formula has poor numeric properties for floats,
- # but with fractions it is exact.
- ssd = (count * sxx - sx * sx) / count
- c = sx / count
- T = reduce(_coerce, types, int) # or raise TypeError
- return (T, ssd, c, count)
-
-
-def _isfinite(x):
- try:
- return x.is_finite() # Likely a Decimal.
- except AttributeError:
- return math.isfinite(x) # Coerces to float first.
-
-
-def _coerce(T, S):
- """Coerce types T and S to a common type, or raise TypeError.
-
- Coercion rules are currently an implementation detail. See the CoerceTest
- test class in test_statistics for details.
- """
- # See http://bugs.python.org/issue24068.
- assert T is not bool, "initial type T is bool"
- # If the types are the same, no need to coerce anything. Put this
- # first, so that the usual case (no coercion needed) happens as soon
- # as possible.
- if T is S: return T
- # Mixed int & other coerce to the other type.
- if S is int or S is bool: return T
- if T is int: return S
- # If one is a (strict) subclass of the other, coerce to the subclass.
- if issubclass(S, T): return S
- if issubclass(T, S): return T
- # Ints coerce to the other type.
- if issubclass(T, int): return S
- if issubclass(S, int): return T
- # Mixed fraction & float coerces to float (or float subclass).
- if issubclass(T, Fraction) and issubclass(S, float):
- return S
- if issubclass(T, float) and issubclass(S, Fraction):
- return T
- # Any other combination is disallowed.
- msg = "don't know how to coerce %s and %s"
- raise TypeError(msg % (T.__name__, S.__name__))
-
-
-def _exact_ratio(x):
- """Return Real number x to exact (numerator, denominator) pair.
-
- >>> _exact_ratio(0.25)
- (1, 4)
-
- x is expected to be an int, Fraction, Decimal or float.
- """
-
- # XXX We should revisit whether using fractions to accumulate exact
- # ratios is the right way to go.
-
- # The integer ratios for binary floats can have numerators or
- # denominators with over 300 decimal digits. The problem is more
- # acute with decimal floats where the default decimal context
- # supports a huge range of exponents from Emin=-999999 to
- # Emax=999999. When expanded with as_integer_ratio(), numbers like
- # Decimal('3.14E+5000') and Decimal('3.14E-5000') have large
- # numerators or denominators that will slow computation.
-
- # When the integer ratios are accumulated as fractions, the size
- # grows to cover the full range from the smallest magnitude to the
- # largest. For example, Fraction(3.14E+300) + Fraction(3.14E-300),
- # has a 616 digit numerator. Likewise,
- # Fraction(Decimal('3.14E+5000')) + Fraction(Decimal('3.14E-5000'))
- # has 10,003 digit numerator.
-
- # This doesn't seem to have been a problem in practice, but it is a
- # potential pitfall.
-
- try:
- return x.as_integer_ratio()
- except AttributeError:
- pass
- except (OverflowError, ValueError):
- # float NAN or INF.
- assert not _isfinite(x)
- return (x, None)
- try:
- # x may be an Integral ABC.
- return (x.numerator, x.denominator)
- except AttributeError:
- msg = f"can't convert type '{type(x).__name__}' to numerator/denominator"
- raise TypeError(msg)
-
-
-def _convert(value, T):
- """Convert value to given numeric type T."""
- if type(value) is T:
- # This covers the cases where T is Fraction, or where value is
- # a NAN or INF (Decimal or float).
- return value
- if issubclass(T, int) and value.denominator != 1:
- T = float
- try:
- # FIXME: what do we do if this overflows?
- return T(value)
- except TypeError:
- if issubclass(T, Decimal):
- return T(value.numerator) / T(value.denominator)
- else:
- raise
-
-
-def _fail_neg(values, errmsg='negative value'):
- """Iterate over values, failing if any are less than zero."""
- for x in values:
- if x < 0:
- raise StatisticsError(errmsg)
- yield x
-
-
-def _rank(data, /, *, key=None, reverse=False, ties='average', start=1) -> list[float]:
- """Rank order a dataset. The lowest value has rank 1.
-
- Ties are averaged so that equal values receive the same rank:
-
- >>> data = [31, 56, 31, 25, 75, 18]
- >>> _rank(data)
- [3.5, 5.0, 3.5, 2.0, 6.0, 1.0]
-
- The operation is idempotent:
-
- >>> _rank([3.5, 5.0, 3.5, 2.0, 6.0, 1.0])
- [3.5, 5.0, 3.5, 2.0, 6.0, 1.0]
-
- It is possible to rank the data in reverse order so that the
- highest value has rank 1. Also, a key-function can extract
- the field to be ranked:
-
- >>> goals = [('eagles', 45), ('bears', 48), ('lions', 44)]
- >>> _rank(goals, key=itemgetter(1), reverse=True)
- [2.0, 1.0, 3.0]
-
- Ranks are conventionally numbered starting from one; however,
- setting *start* to zero allows the ranks to be used as array indices:
-
- >>> prize = ['Gold', 'Silver', 'Bronze', 'Certificate']
- >>> scores = [8.1, 7.3, 9.4, 8.3]
- >>> [prize[int(i)] for i in _rank(scores, start=0, reverse=True)]
- ['Bronze', 'Certificate', 'Gold', 'Silver']
-
- """
- # If this function becomes public at some point, more thought
- # needs to be given to the signature. A list of ints is
- # plausible when ties is "min" or "max". When ties is "average",
- # either list[float] or list[Fraction] is plausible.
-
- # Default handling of ties matches scipy.stats.mstats.spearmanr.
- if ties != 'average':
- raise ValueError(f'Unknown tie resolution method: {ties!r}')
- if key is not None:
- data = map(key, data)
- val_pos = sorted(zip(data, count()), reverse=reverse)
- i = start - 1
- result = [0] * len(val_pos)
- for _, g in groupby(val_pos, key=itemgetter(0)):
- group = list(g)
- size = len(group)
- rank = i + (size + 1) / 2
- for value, orig_pos in group:
- result[orig_pos] = rank
- i += size
- return result
-
-
-def _integer_sqrt_of_frac_rto(n: int, m: int) -> int:
- """Square root of n/m, rounded to the nearest integer using round-to-odd."""
- # Reference: https://www.lri.fr/~melquion/doc/05-imacs17_1-expose.pdf
- a = math.isqrt(n // m)
- return a | (a*a*m != n)
-
-
-# For 53 bit precision floats, the bit width used in
-# _float_sqrt_of_frac() is 109.
-_sqrt_bit_width: int = 2 * sys.float_info.mant_dig + 3
-
-
-def _float_sqrt_of_frac(n: int, m: int) -> float:
- """Square root of n/m as a float, correctly rounded."""
- # See principle and proof sketch at: https://bugs.python.org/msg407078
- q = (n.bit_length() - m.bit_length() - _sqrt_bit_width) // 2
- if q >= 0:
- numerator = _integer_sqrt_of_frac_rto(n, m << 2 * q) << q
- denominator = 1
- else:
- numerator = _integer_sqrt_of_frac_rto(n << -2 * q, m)
- denominator = 1 << -q
- return numerator / denominator # Convert to float
-
-
-def _decimal_sqrt_of_frac(n: int, m: int) -> Decimal:
- """Square root of n/m as a Decimal, correctly rounded."""
- # Premise: For decimal, computing (n/m).sqrt() can be off
- # by 1 ulp from the correctly rounded result.
- # Method: Check the result, moving up or down a step if needed.
- if n <= 0:
- if not n:
- return Decimal('0.0')
- n, m = -n, -m
-
- root = (Decimal(n) / Decimal(m)).sqrt()
- nr, dr = root.as_integer_ratio()
-
- plus = root.next_plus()
- np, dp = plus.as_integer_ratio()
- # test: n / m > ((root + plus) / 2) ** 2
- if 4 * n * (dr*dp)**2 > m * (dr*np + dp*nr)**2:
- return plus
-
- minus = root.next_minus()
- nm, dm = minus.as_integer_ratio()
- # test: n / m < ((root + minus) / 2) ** 2
- if 4 * n * (dr*dm)**2 < m * (dr*nm + dm*nr)**2:
- return minus
-
- return root
-
-
-# === Measures of central tendency (averages) ===
+## Measures of central tendency (averages) #################################
def mean(data):
"""Return the sample arithmetic mean of data.
@@ -484,6 +170,7 @@ def mean(data):
Decimal('0.5625')
If ``data`` is empty, StatisticsError will be raised.
+
"""
T, total, n = _sum(data)
if n < 1:
@@ -499,8 +186,10 @@ def fmean(data, weights=None):
>>> fmean([3.5, 4.0, 5.25])
4.25
+
"""
if weights is None:
+
try:
n = len(data)
except TypeError:
@@ -510,18 +199,25 @@ def fmean(data, weights=None):
n = next(counter)
else:
total = fsum(data)
+
if not n:
raise StatisticsError('fmean requires at least one data point')
+
return total / n
+
if not isinstance(weights, (list, tuple)):
weights = list(weights)
+
try:
num = sumprod(data, weights)
except ValueError:
raise StatisticsError('data and weights must be the same length')
+
den = fsum(weights)
+
if not den:
raise StatisticsError('sum of weights must be non-zero')
+
return num / den
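
A quick hand-checked example of the weighted branch above: sumprod(data, weights) gives 4.525 and the weights sum exactly to 1.0, so:

    >>> fmean([3.5, 4.0, 5.25], weights=[0.2, 0.3, 0.5])
    4.525
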
@@ -538,9 +234,11 @@ def geometric_mean(data):
>>> round(geometric_mean([54, 24, 36]), 9)
36.0
+
"""
n = 0
found_zero = False
+
def count_positive(iterable):
nonlocal n, found_zero
for n, x in enumerate(iterable, start=1):
@@ -551,12 +249,14 @@ def geometric_mean(data):
else:
raise StatisticsError('No negative inputs allowed', x)
total = fsum(map(log, count_positive(data)))
+
if not n:
raise StatisticsError('Must have a non-empty dataset')
if math.isnan(total):
return math.nan
if found_zero:
return math.nan if total == math.inf else 0.0
+
return exp(total / n)
@@ -582,10 +282,13 @@ def harmonic_mean(data, weights=None):
If ``data`` is empty, or any element is less than zero,
``harmonic_mean`` will raise ``StatisticsError``.
+
"""
if iter(data) is data:
data = list(data)
+
errmsg = 'harmonic mean does not support negative values'
+
n = len(data)
if n < 1:
raise StatisticsError('harmonic_mean requires at least one data point')
@@ -597,6 +300,7 @@ def harmonic_mean(data, weights=None):
return x
else:
raise TypeError('unsupported type')
+
if weights is None:
weights = repeat(1, n)
sum_weights = n
@@ -606,16 +310,19 @@ def harmonic_mean(data, weights=None):
if len(weights) != n:
raise StatisticsError('Number of weights does not match data size')
_, sum_weights, _ = _sum(w for w in _fail_neg(weights, errmsg))
+
try:
data = _fail_neg(data, errmsg)
T, total, count = _sum(w / x if w else 0 for w, x in zip(weights, data))
except ZeroDivisionError:
return 0
+
if total <= 0:
raise StatisticsError('Weighted sum must be positive')
+
return _convert(sum_weights / total, T)
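
For the weighted path, a small hand-checked example: sum_weights is 35 and the weighted reciprocal sum is 5/40 + 30/60 = 5/8, so the result is 35 / (5/8):

    >>> harmonic_mean([40, 60], weights=[5, 30])
    56.0
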
-# FIXME: investigate ways to calculate medians without sorting? Quickselect?
+
def median(data):
"""Return the median (middle value) of numeric data.
@@ -652,6 +359,9 @@ def median_low(data):
3
"""
+ # Potentially the sorting step could be replaced with a quickselect.
+ # However, it would require an excellent implementation to beat our
+ # highly optimized builtin sort.
data = sorted(data)
n = len(data)
if n == 0:
@@ -795,6 +505,7 @@ def multimode(data):
['b', 'd', 'f']
>>> multimode('')
[]
+
"""
counts = Counter(iter(data))
if not counts:
@@ -803,308 +514,7 @@ def multimode(data):
return [value for value, count in counts.items() if count == maxcount]
-def kde(data, h, kernel='normal', *, cumulative=False):
- """Kernel Density Estimation: Create a continuous probability density
- function or cumulative distribution function from discrete samples.
-
- The basic idea is to smooth the data using a kernel function
- to help draw inferences about a population from a sample.
-
- The degree of smoothing is controlled by the scaling parameter h
- which is called the bandwidth. Smaller values emphasize local
- features while larger values give smoother results.
-
- The kernel determines the relative weights of the sample data
- points. Generally, the choice of kernel shape does not matter
- as much as the more influential bandwidth smoothing parameter.
-
- Kernels that give some weight to every sample point:
-
- normal (gauss)
- logistic
- sigmoid
-
- Kernels that only give weight to sample points within
- the bandwidth:
-
- rectangular (uniform)
- triangular
- parabolic (epanechnikov)
- quartic (biweight)
- triweight
- cosine
-
- If *cumulative* is true, a cumulative distribution function will be returned.
-
- A StatisticsError will be raised if the data sequence is empty.
-
- Example
- -------
-
- Given a sample of six data points, construct a continuous
- function that estimates the underlying probability density:
-
- >>> sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
- >>> f_hat = kde(sample, h=1.5)
-
- Compute the area under the curve:
-
- >>> area = sum(f_hat(x) for x in range(-20, 20))
- >>> round(area, 4)
- 1.0
-
- Plot the estimated probability density function at
- evenly spaced points from -6 to 10:
-
- >>> for x in range(-6, 11):
- ... density = f_hat(x)
- ... plot = ' ' * int(density * 400) + 'x'
- ... print(f'{x:2}: {density:.3f} {plot}')
- ...
- -6: 0.002 x
- -5: 0.009 x
- -4: 0.031 x
- -3: 0.070 x
- -2: 0.111 x
- -1: 0.125 x
- 0: 0.110 x
- 1: 0.086 x
- 2: 0.068 x
- 3: 0.059 x
- 4: 0.066 x
- 5: 0.082 x
- 6: 0.082 x
- 7: 0.058 x
- 8: 0.028 x
- 9: 0.009 x
- 10: 0.002 x
-
- Estimate P(4.5 < X <= 7.5), the probability that a new sample value
- will be between 4.5 and 7.5:
-
- >>> cdf = kde(sample, h=1.5, cumulative=True)
- >>> round(cdf(7.5) - cdf(4.5), 2)
- 0.22
-
- References
- ----------
-
- Kernel density estimation and its application:
- https://www.itm-conferences.org/articles/itmconf/pdf/2018/08/itmconf_sam2018_00037.pdf
-
- Kernel functions in common use:
- https://en.wikipedia.org/wiki/Kernel_(statistics)#kernel_functions_in_common_use
-
- Interactive graphical demonstration and exploration:
- https://demonstrations.wolfram.com/KernelDensityEstimation/
-
- Kernel estimation of cumulative distribution function of a random variable with bounded support
- https://www.econstor.eu/bitstream/10419/207829/1/10.21307_stattrans-2016-037.pdf
-
- """
-
- n = len(data)
- if not n:
- raise StatisticsError('Empty data sequence')
-
- if not isinstance(data[0], (int, float)):
- raise TypeError('Data sequence must contain ints or floats')
-
- if h <= 0.0:
- raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
-
- match kernel:
-
- case 'normal' | 'gauss':
- sqrt2pi = sqrt(2 * pi)
- sqrt2 = sqrt(2)
- K = lambda t: exp(-1/2 * t * t) / sqrt2pi
- W = lambda t: 1/2 * (1.0 + erf(t / sqrt2))
- support = None
-
- case 'logistic':
- # 1.0 / (exp(t) + 2.0 + exp(-t))
- K = lambda t: 1/2 / (1.0 + cosh(t))
- W = lambda t: 1.0 - 1.0 / (exp(t) + 1.0)
- support = None
-
- case 'sigmoid':
- # (2/pi) / (exp(t) + exp(-t))
- c1 = 1 / pi
- c2 = 2 / pi
- K = lambda t: c1 / cosh(t)
- W = lambda t: c2 * atan(exp(t))
- support = None
-
- case 'rectangular' | 'uniform':
- K = lambda t: 1/2
- W = lambda t: 1/2 * t + 1/2
- support = 1.0
-
- case 'triangular':
- K = lambda t: 1.0 - abs(t)
- W = lambda t: t*t * (1/2 if t < 0.0 else -1/2) + t + 1/2
- support = 1.0
-
- case 'parabolic' | 'epanechnikov':
- K = lambda t: 3/4 * (1.0 - t * t)
- W = lambda t: -1/4 * t**3 + 3/4 * t + 1/2
- support = 1.0
-
- case 'quartic' | 'biweight':
- K = lambda t: 15/16 * (1.0 - t * t) ** 2
- W = lambda t: sumprod((3/16, -5/8, 15/16, 1/2),
- (t**5, t**3, t, 1.0))
- support = 1.0
-
- case 'triweight':
- K = lambda t: 35/32 * (1.0 - t * t) ** 3
- W = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2),
- (t**7, t**5, t**3, t, 1.0))
- support = 1.0
-
- case 'cosine':
- c1 = pi / 4
- c2 = pi / 2
- K = lambda t: c1 * cos(c2 * t)
- W = lambda t: 1/2 * sin(c2 * t) + 1/2
- support = 1.0
-
- case _:
- raise StatisticsError(f'Unknown kernel name: {kernel!r}')
-
- if support is None:
-
- def pdf(x):
- return sum(K((x - x_i) / h) for x_i in data) / (len(data) * h)
-
- def cdf(x):
- return sum(W((x - x_i) / h) for x_i in data) / len(data)
-
- else:
-
- sample = sorted(data)
- bandwidth = h * support
-
- def pdf(x):
- nonlocal n, sample
- if len(data) != n:
- sample = sorted(data)
- n = len(data)
- i = bisect_left(sample, x - bandwidth)
- j = bisect_right(sample, x + bandwidth)
- supported = sample[i : j]
- return sum(K((x - x_i) / h) for x_i in supported) / (n * h)
-
- def cdf(x):
- nonlocal n, sample
- if len(data) != n:
- sample = sorted(data)
- n = len(data)
- i = bisect_left(sample, x - bandwidth)
- j = bisect_right(sample, x + bandwidth)
- supported = sample[i : j]
- return sum((W((x - x_i) / h) for x_i in supported), i) / n
-
- if cumulative:
- cdf.__doc__ = f'CDF estimate with {h=!r} and {kernel=!r}'
- return cdf
-
- else:
- pdf.__doc__ = f'PDF estimate with {h=!r} and {kernel=!r}'
- return pdf
-
-
-# Notes on methods for computing quantiles
-# ----------------------------------------
-#
-# There is no one perfect way to compute quantiles. Here we offer
-# two methods that serve common needs. Most other packages
-# surveyed offered at least one or both of these two, making them
-# "standard" in the sense of "widely-adopted and reproducible".
-# They are also easy to explain, easy to compute manually, and have
-# straight-forward interpretations that aren't surprising.
-
-# The default method is known as "R6", "PERCENTILE.EXC", or "expected
-# value of rank order statistics". The alternative method is known as
-# "R7", "PERCENTILE.INC", or "mode of rank order statistics".
-
-# For sample data where there is a positive probability for values
-# beyond the range of the data, the R6 exclusive method is a
-# reasonable choice. Consider a random sample of nine values from a
-# population with a uniform distribution from 0.0 to 1.0. The
-# distribution of the third ranked sample point is described by
-# betavariate(alpha=3, beta=7) which has mode=0.250, median=0.286, and
-# mean=0.300. Only the latter (which corresponds with R6) gives the
-# desired cut point with 30% of the population falling below that
-# value, making it comparable to a result from an inv_cdf() function.
-# The R6 exclusive method is also idempotent.
-
-# For describing population data where the end points are known to
-# be included in the data, the R7 inclusive method is a reasonable
-# choice. Instead of the mean, it uses the mode of the beta
-# distribution for the interior points. Per Hyndman & Fan, "One nice
-# property is that the vertices of Q7(p) divide the range into n - 1
-# intervals, and exactly 100p% of the intervals lie to the left of
-# Q7(p) and 100(1 - p)% of the intervals lie to the right of Q7(p)."
-
-# If needed, other methods could be added. However, for now, the
-# position is that fewer options make for easier choices and that
-# external packages can be used for anything more advanced.
-
-def quantiles(data, *, n=4, method='exclusive'):
- """Divide *data* into *n* continuous intervals with equal probability.
-
- Returns a list of (n - 1) cut points separating the intervals.
-
- Set *n* to 4 for quartiles (the default). Set *n* to 10 for deciles.
- Set *n* to 100 for percentiles, which gives the 99 cut points that
- separate *data* into 100 equal-sized groups.
-
- The *data* can be any iterable containing sample data.
- The cut points are linearly interpolated between data points.
-
- If *method* is set to *inclusive*, *data* is treated as population
- data. The minimum value is treated as the 0th percentile and the
- maximum value is treated as the 100th percentile.
- """
- if n < 1:
- raise StatisticsError('n must be at least 1')
- data = sorted(data)
- ld = len(data)
- if ld < 2:
- if ld == 1:
- return data * (n - 1)
- raise StatisticsError('must have at least one data point')
-
- if method == 'inclusive':
- m = ld - 1
- result = []
- for i in range(1, n):
- j, delta = divmod(i * m, n)
- interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
- result.append(interpolated)
- return result
-
- if method == 'exclusive':
- m = ld + 1
- result = []
- for i in range(1, n):
- j = i * m // n # rescale i to m/n
- j = 1 if j < 1 else ld-1 if j > ld-1 else j # clamp to 1 .. ld-1
- delta = i*m - j*n # exact integer math
- interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
- result.append(interpolated)
- return result
-
- raise ValueError(f'Unknown method: {method!r}')
-
-
-# === Measures of spread ===
-
-# See http://mathworld.wolfram.com/Variance.html
-# http://mathworld.wolfram.com/SampleVariance.html
-
+## Measures of spread ######################################################
def variance(data, xbar=None):
"""Return the sample variance of data.
@@ -1144,6 +554,8 @@ def variance(data, xbar=None):
Fraction(67, 108)
"""
+ # http://mathworld.wolfram.com/SampleVariance.html
+
T, ss, c, n = _ss(data, xbar)
if n < 2:
raise StatisticsError('variance requires at least two data points')
@@ -1185,6 +597,8 @@ def pvariance(data, mu=None):
Fraction(13, 72)
"""
+ # http://mathworld.wolfram.com/Variance.html
+
T, ss, c, n = _ss(data, mu)
if n < 1:
raise StatisticsError('pvariance requires at least one data point')
@@ -1227,46 +641,7 @@ def pstdev(data, mu=None):
return _float_sqrt_of_frac(mss.numerator, mss.denominator)
-def _mean_stdev(data):
- """In one pass, compute the mean and sample standard deviation as floats."""
- T, ss, xbar, n = _ss(data)
- if n < 2:
- raise StatisticsError('stdev requires at least two data points')
- mss = ss / (n - 1)
- try:
- return float(xbar), _float_sqrt_of_frac(mss.numerator, mss.denominator)
- except AttributeError:
- # Handle NaNs and Infs gracefully
- return float(xbar), float(xbar) / float(ss)
-
-def _sqrtprod(x: float, y: float) -> float:
- "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow."
- h = sqrt(x * y)
- if not isfinite(h):
- if isinf(h) and not isinf(x) and not isinf(y):
- # Finite inputs overflowed, so scale down, and recompute.
- scale = 2.0 ** -512 # sqrt(1 / sys.float_info.max)
- return _sqrtprod(scale * x, scale * y) / scale
- return h
- if not h:
- if x and y:
- # Non-zero inputs underflowed, so scale up, and recompute.
- # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon)
- scale = 2.0 ** 537
- return _sqrtprod(scale * x, scale * y) / scale
- return h
- # Improve accuracy with a differential correction.
- # https://www.wolframalpha.com/input/?i=Maclaurin+series+sqrt%28h**2+%2B+x%29+at+x%3D0
- d = sumprod((x, h), (y, -h))
- return h + d / (2.0 * h)
-
-
-# === Statistics for relations between two inputs ===
-
-# See https://en.wikipedia.org/wiki/Covariance
-# https://en.wikipedia.org/wiki/Pearson_correlation_coefficient
-# https://en.wikipedia.org/wiki/Simple_linear_regression
-
+## Statistics for relations between two inputs #############################
def covariance(x, y, /):
"""Covariance
@@ -1285,6 +660,7 @@ def covariance(x, y, /):
-7.5
"""
+ # https://en.wikipedia.org/wiki/Covariance
n = len(x)
if len(y) != n:
raise StatisticsError('covariance requires that both inputs have same number of data points')
@@ -1318,7 +694,10 @@ def correlation(x, y, /, *, method='linear'):
Spearman's rank correlation coefficient is appropriate for ordinal
data or for continuous data that doesn't meet the linear proportion
requirement for Pearson's correlation coefficient.
+
"""
+ # https://en.wikipedia.org/wiki/Pearson_correlation_coefficient
+ # https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient
n = len(x)
if len(y) != n:
raise StatisticsError('correlation requires that both inputs have same number of data points')
@@ -1326,6 +705,7 @@ def correlation(x, y, /, *, method='linear'):
raise StatisticsError('correlation requires at least two data points')
if method not in {'linear', 'ranked'}:
raise ValueError(f'Unknown method: {method!r}')
+
if method == 'ranked':
start = (n - 1) / -2 # Center rankings around zero
x = _rank(x, start=start)
@@ -1335,9 +715,11 @@ def correlation(x, y, /, *, method='linear'):
ybar = fsum(y) / n
x = [xi - xbar for xi in x]
y = [yi - ybar for yi in y]
+
sxy = sumprod(x, y)
sxx = sumprod(x, x)
syy = sumprod(y, y)
+
try:
return sxy / _sqrtprod(sxx, syy)
except ZeroDivisionError:
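
A hand-checked illustration of the 'ranked' method: the data below are related monotonically but not linearly, so both rank vectors come out identical (making sxy == sxx == syy) and Spearman's coefficient is exactly 1.0:

    >>> correlation([1, 2, 3, 4, 5], [1, 4, 9, 16, 25], method='ranked')
    1.0
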
@@ -1385,28 +767,445 @@ def linear_regression(x, y, /, *, proportional=False):
LinearRegression(slope=2.90475..., intercept=0.0)
"""
+ # https://en.wikipedia.org/wiki/Simple_linear_regression
n = len(x)
if len(y) != n:
raise StatisticsError('linear regression requires that both inputs have same number of data points')
if n < 2:
raise StatisticsError('linear regression requires at least two data points')
+
if not proportional:
xbar = fsum(x) / n
ybar = fsum(y) / n
x = [xi - xbar for xi in x] # List because used three times below
y = (yi - ybar for yi in y) # Generator because only used once below
+
sxy = sumprod(x, y) + 0.0 # Add zero to coerce result to a float
sxx = sumprod(x, x)
+
try:
slope = sxy / sxx # equivalent to: covariance(x, y) / variance(x)
except ZeroDivisionError:
raise StatisticsError('x is constant')
+
intercept = 0.0 if proportional else ybar - slope * xbar
return LinearRegression(slope=slope, intercept=intercept)
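
A minimal hand-checked example: for perfectly proportional data the centered cross products give sxy = 4.0 and sxx = 2.0, hence slope 2.0 and intercept 0.0:

    >>> linear_regression([1, 2, 3], [2, 4, 6])
    LinearRegression(slope=2.0, intercept=0.0)
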
-## Normal Distribution #####################################################
+## Kernel Density Estimation ###############################################
+
+_kernel_specs = {}
+
+def register(*kernels):
+ "Load the kernel's pdf, cdf, invcdf, and support into _kernel_specs."
+ def deco(builder):
+ spec = dict(zip(('pdf', 'cdf', 'invcdf', 'support'), builder()))
+ for kernel in kernels:
+ _kernel_specs[kernel] = spec
+ return builder
+ return deco
+
+@register('normal', 'gauss')
+def normal_kernel():
+ sqrt2pi = sqrt(2 * pi)
+ sqrt2 = sqrt(2)
+ pdf = lambda t: exp(-1/2 * t * t) / sqrt2pi
+ cdf = lambda t: 1/2 * (1.0 + erf(t / sqrt2))
+ invcdf = lambda t: _normal_dist_inv_cdf(t, 0.0, 1.0)
+ support = None
+ return pdf, cdf, invcdf, support
+
+@register('logistic')
+def logistic_kernel():
+ # 1.0 / (exp(t) + 2.0 + exp(-t))
+ pdf = lambda t: 1/2 / (1.0 + cosh(t))
+ cdf = lambda t: 1.0 - 1.0 / (exp(t) + 1.0)
+ invcdf = lambda p: log(p / (1.0 - p))
+ support = None
+ return pdf, cdf, invcdf, support
+
+@register('sigmoid')
+def sigmoid_kernel():
+ # (2/pi) / (exp(t) + exp(-t))
+ c1 = 1 / pi
+ c2 = 2 / pi
+ c3 = pi / 2
+ pdf = lambda t: c1 / cosh(t)
+ cdf = lambda t: c2 * atan(exp(t))
+ invcdf = lambda p: log(tan(p * c3))
+ support = None
+ return pdf, cdf, invcdf, support
+
+@register('rectangular', 'uniform')
+def rectangular_kernel():
+ pdf = lambda t: 1/2
+ cdf = lambda t: 1/2 * t + 1/2
+ invcdf = lambda p: 2.0 * p - 1.0
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+@register('triangular')
+def triangular_kernel():
+ pdf = lambda t: 1.0 - abs(t)
+ cdf = lambda t: t*t * (1/2 if t < 0.0 else -1/2) + t + 1/2
+ invcdf = lambda p: sqrt(2.0*p) - 1.0 if p < 1/2 else 1.0 - sqrt(2.0 - 2.0*p)
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+@register('parabolic', 'epanechnikov')
+def parabolic_kernel():
+ pdf = lambda t: 3/4 * (1.0 - t * t)
+ cdf = lambda t: sumprod((-1/4, 3/4, 1/2), (t**3, t, 1.0))
+ invcdf = lambda p: 2.0 * cos((acos(2.0*p - 1.0) + pi) / 3.0)
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
+ def f_inv(y):
+ "Return x such that f(x) ≈ y within the specified tolerance."
+ x = f_inv_estimate(y)
+ while abs(diff := f(x) - y) > tolerance:
+ x -= diff / f_prime(x)
+ return x
+ return f_inv
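
The helper inverts any smooth function given a rough starting estimate. As an illustration unrelated to the kernels below, a hypothetical cube root built from f(x) = x**3 and f'(x) = 3*x**2 (rounded because the iteration stops within the 1e-12 tolerance rather than at an exact value):

    >>> cbrt = _newton_raphson(lambda y: y ** (1/3),
    ...                        f=lambda x: x ** 3,
    ...                        f_prime=lambda x: 3.0 * x * x)
    >>> round(cbrt(8.0), 12)
    2.0
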
+def _quartic_invcdf_estimate(p):
+ sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+ x = (2.0 * p) ** 0.4258865685331 - 1.0
+ if 0.004 <= p < 0.499:
+ x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
+ return x * sign
+
+@register('quartic', 'biweight')
+def quartic_kernel():
+ pdf = lambda t: 15/16 * (1.0 - t * t) ** 2
+ cdf = lambda t: sumprod((3/16, -5/8, 15/16, 1/2),
+ (t**5, t**3, t, 1.0))
+ invcdf = _newton_raphson(_quartic_invcdf_estimate, f=cdf, f_prime=pdf)
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+def _triweight_invcdf_estimate(p):
+ sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+ x = (2.0 * p) ** 0.3400218741872791 - 1.0
+ return x * sign
+
+@register('triweight')
+def triweight_kernel():
+ pdf = lambda t: 35/32 * (1.0 - t * t) ** 3
+ cdf = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2),
+ (t**7, t**5, t**3, t, 1.0))
+ invcdf = _newton_raphson(_triweight_invcdf_estimate, f=cdf, f_prime=pdf)
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+@register('cosine')
+def cosine_kernel():
+ c1 = pi / 4
+ c2 = pi / 2
+ pdf = lambda t: c1 * cos(c2 * t)
+ cdf = lambda t: 1/2 * sin(c2 * t) + 1/2
+ invcdf = lambda p: 2.0 * asin(2.0 * p - 1.0) / pi
+ support = 1.0
+ return pdf, cdf, invcdf, support
+
+del register, normal_kernel, logistic_kernel, sigmoid_kernel
+del rectangular_kernel, triangular_kernel, parabolic_kernel
+del quartic_kernel, triweight_kernel, cosine_kernel
+
+
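
After the builders run, each kernel name (and alias) maps to a dict of callables plus its support; a quick hand-checked look-up against the parabolic pdf 3/4 * (1 - t*t):

    >>> spec = _kernel_specs['epanechnikov']   # alias registered for 'parabolic'
    >>> spec['support'], spec['pdf'](0.0)
    (1.0, 0.75)
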
+def kde(data, h, kernel='normal', *, cumulative=False):
+ """Kernel Density Estimation: Create a continuous probability density
+ function or cumulative distribution function from discrete samples.
+
+ The basic idea is to smooth the data using a kernel function
+ to help draw inferences about a population from a sample.
+
+ The degree of smoothing is controlled by the scaling parameter h
+ which is called the bandwidth. Smaller values emphasize local
+ features while larger values give smoother results.
+
+ The kernel determines the relative weights of the sample data
+ points. Generally, the choice of kernel shape does not matter
+ as much as the more influential bandwidth smoothing parameter.
+
+ Kernels that give some weight to every sample point:
+
+ normal (gauss)
+ logistic
+ sigmoid
+
+ Kernels that only give weight to sample points within
+ the bandwidth:
+
+ rectangular (uniform)
+ triangular
+ parabolic (epanechnikov)
+ quartic (biweight)
+ triweight
+ cosine
+
+ If *cumulative* is true, a cumulative distribution function will be returned.
+
+ A StatisticsError will be raised if the data sequence is empty.
+
+ Example
+ -------
+
+ Given a sample of six data points, construct a continuous
+ function that estimates the underlying probability density:
+
+ >>> sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+ >>> f_hat = kde(sample, h=1.5)
+
+ Compute the area under the curve:
+
+ >>> area = sum(f_hat(x) for x in range(-20, 20))
+ >>> round(area, 4)
+ 1.0
+
+ Plot the estimated probability density function at
+ evenly spaced points from -6 to 10:
+
+ >>> for x in range(-6, 11):
+ ... density = f_hat(x)
+ ... plot = ' ' * int(density * 400) + 'x'
+ ... print(f'{x:2}: {density:.3f} {plot}')
+ ...
+ -6: 0.002 x
+ -5: 0.009 x
+ -4: 0.031 x
+ -3: 0.070 x
+ -2: 0.111 x
+ -1: 0.125 x
+ 0: 0.110 x
+ 1: 0.086 x
+ 2: 0.068 x
+ 3: 0.059 x
+ 4: 0.066 x
+ 5: 0.082 x
+ 6: 0.082 x
+ 7: 0.058 x
+ 8: 0.028 x
+ 9: 0.009 x
+ 10: 0.002 x
+
+ Estimate P(4.5 < X <= 7.5), the probability that a new sample value
+ will be between 4.5 and 7.5:
+
+ >>> cdf = kde(sample, h=1.5, cumulative=True)
+ >>> round(cdf(7.5) - cdf(4.5), 2)
+ 0.22
+
+ References
+ ----------
+
+ Kernel density estimation and its application:
+ https://www.itm-conferences.org/articles/itmconf/pdf/2018/08/itmconf_sam2018_00037.pdf
+
+ Kernel functions in common use:
+ https://en.wikipedia.org/wiki/Kernel_(statistics)#kernel_functions_in_common_use
+
+ Interactive graphical demonstration and exploration:
+ https://demonstrations.wolfram.com/KernelDensityEstimation/
+
+ Kernel estimation of cumulative distribution function of a random variable with bounded support
+ https://www.econstor.eu/bitstream/10419/207829/1/10.21307_stattrans-2016-037.pdf
+
+ """
+
+ n = len(data)
+ if not n:
+ raise StatisticsError('Empty data sequence')
+
+ if not isinstance(data[0], (int, float)):
+ raise TypeError('Data sequence must contain ints or floats')
+
+ if h <= 0.0:
+ raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+
+ kernel_spec = _kernel_specs.get(kernel)
+ if kernel_spec is None:
+ raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+ K = kernel_spec['pdf']
+ W = kernel_spec['cdf']
+ support = kernel_spec['support']
+
+ if support is None:
+
+ def pdf(x):
+ return sum(K((x - x_i) / h) for x_i in data) / (len(data) * h)
+
+ def cdf(x):
+ return sum(W((x - x_i) / h) for x_i in data) / len(data)
+
+ else:
+
+ sample = sorted(data)
+ bandwidth = h * support
+
+ def pdf(x):
+ nonlocal n, sample
+ if len(data) != n:
+ sample = sorted(data)
+ n = len(data)
+ i = bisect_left(sample, x - bandwidth)
+ j = bisect_right(sample, x + bandwidth)
+ supported = sample[i : j]
+ return sum(K((x - x_i) / h) for x_i in supported) / (n * h)
+
+ def cdf(x):
+ nonlocal n, sample
+ if len(data) != n:
+ sample = sorted(data)
+ n = len(data)
+ i = bisect_left(sample, x - bandwidth)
+ j = bisect_right(sample, x + bandwidth)
+ supported = sample[i : j]
+ return sum((W((x - x_i) / h) for x_i in supported), i) / n
+
+ if cumulative:
+ cdf.__doc__ = f'CDF estimate with {h=!r} and {kernel=!r}'
+ return cdf
+
+ else:
+ pdf.__doc__ = f'PDF estimate with {h=!r} and {kernel=!r}'
+ return pdf
+
+
+def kde_random(data, h, kernel='normal', *, seed=None):
+ """Return a function that makes a random selection from the estimated
+ probability density function created by kde(data, h, kernel).
+
+ Providing a *seed* allows reproducible selections within a single
+ thread. The seed may be an integer, float, str, or bytes.
+
+ A StatisticsError will be raised if the *data* sequence is empty.
+
+ Example:
+
+ >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+ >>> rand = kde_random(data, h=1.5, seed=8675309)
+ >>> new_selections = [rand() for i in range(10)]
+ >>> [round(x, 1) for x in new_selections]
+ [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+
+ """
+ n = len(data)
+ if not n:
+ raise StatisticsError('Empty data sequence')
+
+ if not isinstance(data[0], (int, float)):
+ raise TypeError('Data sequence must contain ints or floats')
+
+ if h <= 0.0:
+ raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+
+ kernel_spec = _kernel_specs.get(kernel)
+ if kernel_spec is None:
+ raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+ invcdf = kernel_spec['invcdf']
+
+ prng = _random.Random(seed)
+ random = prng.random
+ choice = prng.choice
+
+ def rand():
+ return choice(data) + h * invcdf(random())
+
+ rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
+
+ return rand
+
+
+## Quantiles ###############################################################
+
+# There is no one perfect way to compute quantiles. Here we offer
+# two methods that serve common needs. Most other packages
+# surveyed offered at least one or both of these two, making them
+# "standard" in the sense of "widely-adopted and reproducible".
+# They are also easy to explain, easy to compute manually, and have
+# straight-forward interpretations that aren't surprising.
+
+# The default method is known as "R6", "PERCENTILE.EXC", or "expected
+# value of rank order statistics". The alternative method is known as
+# "R7", "PERCENTILE.INC", or "mode of rank order statistics".
+
+# For sample data where there is a positive probability for values
+# beyond the range of the data, the R6 exclusive method is a
+# reasonable choice. Consider a random sample of nine values from a
+# population with a uniform distribution from 0.0 to 1.0. The
+# distribution of the third ranked sample point is described by
+# betavariate(alpha=3, beta=7) which has mode=0.250, median=0.286, and
+# mean=0.300. Only the latter (which corresponds with R6) gives the
+# desired cut point with 30% of the population falling below that
+# value, making it comparable to a result from an inv_cdf() function.
+# The R6 exclusive method is also idempotent.
+
+# For describing population data where the end points are known to
+# be included in the data, the R7 inclusive method is a reasonable
+# choice. Instead of the mean, it uses the mode of the beta
+# distribution for the interior points. Per Hyndman & Fan, "One nice
+# property is that the vertices of Q7(p) divide the range into n - 1
+# intervals, and exactly 100p% of the intervals lie to the left of
+# Q7(p) and 100(1 - p)% of the intervals lie to the right of Q7(p)."
+
+# If needed, other methods could be added. However, for now, the
+# position is that fewer options make for easier choices and that
+# external packages can be used for anything more advanced.
+
+def quantiles(data, *, n=4, method='exclusive'):
+ """Divide *data* into *n* continuous intervals with equal probability.
+
+ Returns a list of (n - 1) cut points separating the intervals.
+
+ Set *n* to 4 for quartiles (the default). Set *n* to 10 for deciles.
+ Set *n* to 100 for percentiles, which gives the 99 cut points that
+ separate *data* into 100 equal-sized groups.
+
+ The *data* can be any iterable containing sample data.
+ The cut points are linearly interpolated between data points.
+
+ If *method* is set to *inclusive*, *data* is treated as population
+ data. The minimum value is treated as the 0th percentile and the
+ maximum value is treated as the 100th percentile.
+
+ """
+ if n < 1:
+ raise StatisticsError('n must be at least 1')
+
+ data = sorted(data)
+
+ ld = len(data)
+ if ld < 2:
+ if ld == 1:
+ return data * (n - 1)
+ raise StatisticsError('must have at least one data point')
+
+ if method == 'inclusive':
+ m = ld - 1
+ result = []
+ for i in range(1, n):
+ j, delta = divmod(i * m, n)
+ interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
+ result.append(interpolated)
+ return result
+
+ if method == 'exclusive':
+ m = ld + 1
+ result = []
+ for i in range(1, n):
+ j = i * m // n # rescale i to m/n
+ j = 1 if j < 1 else ld-1 if j > ld-1 else j # clamp to 1 .. ld-1
+ delta = i*m - j*n # exact integer math
+ interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
+ result.append(interpolated)
+ return result
+
+ raise ValueError(f'Unknown method: {method!r}')
+
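
A hand-checked contrast of the two methods on the same five data points (exclusive uses m = ld + 1 = 6, inclusive uses m = ld - 1 = 4):

    >>> quantiles([1, 2, 3, 4, 5])                      # exclusive default
    [1.5, 3.0, 4.5]
    >>> quantiles([1, 2, 3, 4, 5], method='inclusive')
    [2.0, 3.0, 4.0]
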
+
+## Normal Distribution #####################################################
def _normal_dist_inv_cdf(p, mu, sigma):
# There is no closed-form solution to the inverse CDF for the normal
@@ -1415,6 +1214,7 @@ def _normal_dist_inv_cdf(p, mu, sigma):
# Normal Distribution". Applied Statistics. Blackwell Publishing. 37
# (3): 477–484. doi:10.2307/2347330. JSTOR 2347330.
q = p - 0.5
+
if fabs(q) <= 0.425:
r = 0.180625 - q * q
# Hash sum: 55.88319_28806_14901_4439
@@ -1436,6 +1236,7 @@ def _normal_dist_inv_cdf(p, mu, sigma):
1.0)
x = num / den
return mu + (x * sigma)
+
r = p if q <= 0.0 else 1.0 - p
r = sqrt(-log(r))
if r <= 5.0:
@@ -1476,9 +1277,11 @@ def _normal_dist_inv_cdf(p, mu, sigma):
1.36929_88092_27358_05310e-1) * r +
5.99832_20655_58879_37690e-1) * r +
1.0)
+
x = num / den
if q < 0.0:
x = -x
+
return mu + (x * sigma)
@@ -1712,95 +1515,339 @@ class NormalDist:
self._mu, self._sigma = state
-## kde_random() ##############################################################
+## Private utilities #######################################################
-def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
- def f_inv(y):
- "Return x such that f(x) ≈ y within the specified tolerance."
- x = f_inv_estimate(y)
- while abs(diff := f(x) - y) > tolerance:
- x -= diff / f_prime(x)
- return x
- return f_inv
+def _sum(data):
+ """_sum(data) -> (type, sum, count)
-def _quartic_invcdf_estimate(p):
- sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
- x = (2.0 * p) ** 0.4258865685331 - 1.0
- if 0.004 <= p < 0.499:
- x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
- return x * sign
+ Return a high-precision sum of the given numeric data as a fraction,
+ together with the type to be converted to and the count of items.
-_quartic_invcdf = _newton_raphson(
- f_inv_estimate = _quartic_invcdf_estimate,
- f = lambda t: sumprod((3/16, -5/8, 15/16, 1/2), (t**5, t**3, t, 1.0)),
- f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)
+ Examples
+ --------
-def _triweight_invcdf_estimate(p):
- sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
- x = (2.0 * p) ** 0.3400218741872791 - 1.0
- return x * sign
+ >>> _sum([3, 2.25, 4.5, -0.5, 0.25])
+ (<class 'float'>, Fraction(19, 2), 5)
-_triweight_invcdf = _newton_raphson(
- f_inv_estimate = _triweight_invcdf_estimate,
- f = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2),
- (t**7, t**5, t**3, t, 1.0)),
- f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)
-
-_kernel_invcdfs = {
- 'normal': NormalDist().inv_cdf,
- 'logistic': lambda p: log(p / (1 - p)),
- 'sigmoid': lambda p: log(tan(p * pi/2)),
- 'rectangular': lambda p: 2*p - 1,
- 'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
- 'quartic': _quartic_invcdf,
- 'triweight': _triweight_invcdf,
- 'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
- 'cosine': lambda p: 2 * asin(2*p - 1) / pi,
-}
-_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
-_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
-_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
-_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']
+ Some sources of round-off error will be avoided:
-def kde_random(data, h, kernel='normal', *, seed=None):
- """Return a function that makes a random selection from the estimated
- probability density function created by kde(data, h, kernel).
+ # Built-in sum returns zero.
+ >>> _sum([1e50, 1, -1e50] * 1000)
+ (<class 'float'>, Fraction(1000, 1), 3000)
- Providing a *seed* allows reproducible selections within a single
- thread. The seed may be an integer, float, str, or bytes.
+ Fractions and Decimals are also supported:
- A StatisticsError will be raised if the *data* sequence is empty.
+ >>> from fractions import Fraction as F
+ >>> _sum([F(2, 3), F(7, 5), F(1, 4), F(5, 6)])
+ (<class 'fractions.Fraction'>, Fraction(63, 20), 4)
- Example:
+ >>> from decimal import Decimal as D
+ >>> data = [D("0.1375"), D("0.2108"), D("0.3061"), D("0.0419")]
+ >>> _sum(data)
+ (<class 'decimal.Decimal'>, Fraction(6963, 10000), 4)
- >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
- >>> rand = kde_random(data, h=1.5, seed=8675309)
- >>> new_selections = [rand() for i in range(10)]
- >>> [round(x, 1) for x in new_selections]
- [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+ Mixed types are currently treated as an error, except that int is
+ allowed.
"""
- n = len(data)
- if not n:
- raise StatisticsError('Empty data sequence')
+ count = 0
+ types = set()
+ types_add = types.add
+ partials = {}
+ partials_get = partials.get
+ for typ, values in groupby(data, type):
+ types_add(typ)
+ for n, d in map(_exact_ratio, values):
+ count += 1
+ partials[d] = partials_get(d, 0) + n
+ if None in partials:
+ # The sum will be a NAN or INF. We can ignore all the finite
+ # partials, and just look at this special one.
+ total = partials[None]
+ assert not _isfinite(total)
+ else:
+ # Sum all the partial sums using builtin sum.
+ total = sum(Fraction(n, d) for d, n in partials.items())
+ T = reduce(_coerce, types, int) # or raise TypeError
+ return (T, total, count)
- if not isinstance(data[0], (int, float)):
- raise TypeError('Data sequence must contain ints or floats')
- if h <= 0.0:
- raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+def _ss(data, c=None):
+ """Return the exact mean and sum of square deviations of sequence data.
- kernel_invcdf = _kernel_invcdfs.get(kernel)
- if kernel_invcdf is None:
- raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+ Calculations are done in a single pass, allowing the input to be an iterator.
- prng = _random.Random(seed)
- random = prng.random
- choice = prng.choice
+ If given, *c* is used as the mean; otherwise, the mean is calculated from the data.
+ Use the *c* argument with care, as it can lead to garbage results.
- def rand():
- return choice(data) + h * kernel_invcdf(random())
+ """
+ if c is not None:
+ T, ssd, count = _sum((d := x - c) * d for x in data)
+ return (T, ssd, c, count)
- rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
+ count = 0
+ types = set()
+ types_add = types.add
+ sx_partials = defaultdict(int)
+ sxx_partials = defaultdict(int)
+ for typ, values in groupby(data, type):
+ types_add(typ)
+ for n, d in map(_exact_ratio, values):
+ count += 1
+ sx_partials[d] += n
+ sxx_partials[d] += n * n
- return rand
+ if not count:
+ ssd = c = Fraction(0)
+ elif None in sx_partials:
+ # The sum will be a NAN or INF. We can ignore all the finite
+ # partials, and just look at this special one.
+ ssd = c = sx_partials[None]
+ assert not _isfinite(ssd)
+ else:
+ sx = sum(Fraction(n, d) for d, n in sx_partials.items())
+ sxx = sum(Fraction(n, d*d) for d, n in sxx_partials.items())
+ # This formula has poor numeric properties for floats,
+ # but with fractions it is exact.
+ ssd = (count * sxx - sx * sx) / count
+ c = sx / count
+
+ T = reduce(_coerce, types, int) # or raise TypeError
+ return (T, ssd, c, count)
+
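
A small hand-checked example of the exact arithmetic above: for [2, 4], sx = 6 and sxx = 20, so ssd = (2*20 - 36)/2 = 2 and the mean is 3:

    >>> _ss([2, 4])
    (<class 'int'>, Fraction(2, 1), Fraction(3, 1), 2)
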
+
+def _isfinite(x):
+ try:
+ return x.is_finite() # Likely a Decimal.
+ except AttributeError:
+ return math.isfinite(x) # Coerces to float first.
+
+
+def _coerce(T, S):
+ """Coerce types T and S to a common type, or raise TypeError.
+
+ Coercion rules are currently an implementation detail. See the CoerceTest
+ test class in test_statistics for details.
+
+ """
+ # See http://bugs.python.org/issue24068.
+ assert T is not bool, "initial type T is bool"
+ # If the types are the same, no need to coerce anything. Put this
+ # first, so that the usual case (no coercion needed) happens as soon
+ # as possible.
+ if T is S: return T
+ # Mixed int & other coerce to the other type.
+ if S is int or S is bool: return T
+ if T is int: return S
+ # If one is a (strict) subclass of the other, coerce to the subclass.
+ if issubclass(S, T): return S
+ if issubclass(T, S): return T
+ # Ints coerce to the other type.
+ if issubclass(T, int): return S
+ if issubclass(S, int): return T
+ # Mixed fraction & float coerces to float (or float subclass).
+ if issubclass(T, Fraction) and issubclass(S, float):
+ return S
+ if issubclass(T, float) and issubclass(S, Fraction):
+ return T
+ # Any other combination is disallowed.
+ msg = "don't know how to coerce %s and %s"
+ raise TypeError(msg % (T.__name__, S.__name__))
+
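
Two hand-checked consequences of the rules above: int always defers to the other type, and a float/Fraction mix lands on float:

    >>> _coerce(int, float)
    <class 'float'>
    >>> _coerce(float, Fraction)
    <class 'float'>
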
+
+def _exact_ratio(x):
+ """Return Real number x to exact (numerator, denominator) pair.
+
+ >>> _exact_ratio(0.25)
+ (1, 4)
+
+ x is expected to be an int, Fraction, Decimal or float.
+
+ """
+ try:
+ return x.as_integer_ratio()
+ except AttributeError:
+ pass
+ except (OverflowError, ValueError):
+ # float NAN or INF.
+ assert not _isfinite(x)
+ return (x, None)
+
+ try:
+ # x may be an Integral ABC.
+ return (x.numerator, x.denominator)
+ except AttributeError:
+ msg = f"can't convert type '{type(x).__name__}' to numerator/denominator"
+ raise TypeError(msg)
+
+
+def _convert(value, T):
+ """Convert value to given numeric type T."""
+ if type(value) is T:
+ # This covers the cases where T is Fraction, or where value is
+ # a NAN or INF (Decimal or float).
+ return value
+ if issubclass(T, int) and value.denominator != 1:
+ T = float
+ try:
+ # FIXME: what do we do if this overflows?
+ return T(value)
+ except TypeError:
+ if issubclass(T, Decimal):
+ return T(value.numerator) / T(value.denominator)
+ else:
+ raise
+
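
Hand-checked examples of the conversion paths, including the fallback to float when an int target can't represent a non-integral ratio:

    >>> _convert(Fraction(1, 2), float)
    0.5
    >>> _convert(Fraction(3, 1), int)
    3
    >>> _convert(Fraction(1, 3), int)
    0.3333333333333333
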
+
+def _fail_neg(values, errmsg='negative value'):
+ """Iterate over values, failing if any are less than zero."""
+ for x in values:
+ if x < 0:
+ raise StatisticsError(errmsg)
+ yield x
+
+
+def _rank(data, /, *, key=None, reverse=False, ties='average', start=1) -> list[float]:
+ """Rank order a dataset. The lowest value has rank 1.
+
+ Ties are averaged so that equal values receive the same rank:
+
+ >>> data = [31, 56, 31, 25, 75, 18]
+ >>> _rank(data)
+ [3.5, 5.0, 3.5, 2.0, 6.0, 1.0]
+
+ The operation is idempotent:
+
+ >>> _rank([3.5, 5.0, 3.5, 2.0, 6.0, 1.0])
+ [3.5, 5.0, 3.5, 2.0, 6.0, 1.0]
+
+ It is possible to rank the data in reverse order so that the
+ highest value has rank 1. Also, a key-function can extract
+ the field to be ranked:
+
+ >>> goals = [('eagles', 45), ('bears', 48), ('lions', 44)]
+ >>> _rank(goals, key=itemgetter(1), reverse=True)
+ [2.0, 1.0, 3.0]
+
+ Ranks are conventionally numbered starting from one; however,
+ setting *start* to zero allows the ranks to be used as array indices:
+
+ >>> prize = ['Gold', 'Silver', 'Bronze', 'Certificate']
+ >>> scores = [8.1, 7.3, 9.4, 8.3]
+ >>> [prize[int(i)] for i in _rank(scores, start=0, reverse=True)]
+ ['Bronze', 'Certificate', 'Gold', 'Silver']
+
+ """
+ # If this function becomes public at some point, more thought
+ # needs to be given to the signature. A list of ints is
+ # plausible when ties is "min" or "max". When ties is "average",
+ # either list[float] or list[Fraction] is plausible.
+
+ # Default handling of ties matches scipy.stats.mstats.spearmanr.
+ if ties != 'average':
+ raise ValueError(f'Unknown tie resolution method: {ties!r}')
+ if key is not None:
+ data = map(key, data)
+ val_pos = sorted(zip(data, count()), reverse=reverse)
+ i = start - 1
+ result = [0] * len(val_pos)
+ for _, g in groupby(val_pos, key=itemgetter(0)):
+ group = list(g)
+ size = len(group)
+ rank = i + (size + 1) / 2
+ for value, orig_pos in group:
+ result[orig_pos] = rank
+ i += size
+ return result
+
+
+def _integer_sqrt_of_frac_rto(n: int, m: int) -> int:
+ """Square root of n/m, rounded to the nearest integer using round-to-odd."""
+ # Reference: https://www.lri.fr/~melquion/doc/05-imacs17_1-expose.pdf
+ a = math.isqrt(n // m)
+ return a | (a*a*m != n)
+
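
Round-to-odd in action, worked by hand: exact roots pass through unchanged, while inexact roots have their low bit forced to 1 so that a later rounding step cannot introduce a double-rounding error:

    >>> _integer_sqrt_of_frac_rto(9, 1)    # isqrt(9) == 3, exact
    3
    >>> _integer_sqrt_of_frac_rto(8, 1)    # isqrt(8) == 2, inexact, low bit set
    3
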
+
+# For 53 bit precision floats, the bit width used in
+# _float_sqrt_of_frac() is 109.
+_sqrt_bit_width: int = 2 * sys.float_info.mant_dig + 3
+
+
+def _float_sqrt_of_frac(n: int, m: int) -> float:
+ """Square root of n/m as a float, correctly rounded."""
+ # See principle and proof sketch at: https://bugs.python.org/msg407078
+ q = (n.bit_length() - m.bit_length() - _sqrt_bit_width) // 2
+ if q >= 0:
+ numerator = _integer_sqrt_of_frac_rto(n, m << 2 * q) << q
+ denominator = 1
+ else:
+ numerator = _integer_sqrt_of_frac_rto(n << -2 * q, m)
+ denominator = 1 << -q
+ return numerator / denominator # Convert to float
+
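
Two hand-checked cases: an exactly representable root, and agreement with math.sqrt (which is also correctly rounded) on an irrational one:

    >>> _float_sqrt_of_frac(1, 4)
    0.5
    >>> _float_sqrt_of_frac(2, 1) == sqrt(2.0)
    True
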
+
+def _decimal_sqrt_of_frac(n: int, m: int) -> Decimal:
+ """Square root of n/m as a Decimal, correctly rounded."""
+ # Premise: For decimal, computing (n/m).sqrt() can be off
+ # by 1 ulp from the correctly rounded result.
+ # Method: Check the result, moving up or down a step if needed.
+ if n <= 0:
+ if not n:
+ return Decimal('0.0')
+ n, m = -n, -m
+
+ root = (Decimal(n) / Decimal(m)).sqrt()
+ nr, dr = root.as_integer_ratio()
+
+ plus = root.next_plus()
+ np, dp = plus.as_integer_ratio()
+ # test: n / m > ((root + plus) / 2) ** 2
+ if 4 * n * (dr*dp)**2 > m * (dr*np + dp*nr)**2:
+ return plus
+
+ minus = root.next_minus()
+ nm, dm = minus.as_integer_ratio()
+ # test: n / m < ((root + minus) / 2) ** 2
+ if 4 * n * (dr*dm)**2 < m * (dr*nm + dm*nr)**2:
+ return minus
+
+ return root
+
+
+def _mean_stdev(data):
+ """In one pass, compute the mean and sample standard deviation as floats."""
+ T, ss, xbar, n = _ss(data)
+ if n < 2:
+ raise StatisticsError('stdev requires at least two data points')
+ mss = ss / (n - 1)
+ try:
+ return float(xbar), _float_sqrt_of_frac(mss.numerator, mss.denominator)
+ except AttributeError:
+ # Handle NaNs and Infs gracefully
+ return float(xbar), float(xbar) / float(ss)
+
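
A hand-checked example: for [1.0, 2.0, 3.0] the sum of squared deviations is 2, so the sample variance is 2/(3 - 1) = 1:

    >>> _mean_stdev([1.0, 2.0, 3.0])
    (2.0, 1.0)
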
+
+def _sqrtprod(x: float, y: float) -> float:
+ "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow."
+
+ h = sqrt(x * y)
+
+ if not isfinite(h):
+ if isinf(h) and not isinf(x) and not isinf(y):
+ # Finite inputs overflowed, so scale down, and recompute.
+ scale = 2.0 ** -512 # sqrt(1 / sys.float_info.max)
+ return _sqrtprod(scale * x, scale * y) / scale
+ return h
+
+ if not h:
+ if x and y:
+ # Non-zero inputs underflowed, so scale up, and recompute.
+ # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon)
+ scale = 2.0 ** 537
+ return _sqrtprod(scale * x, scale * y) / scale
+ return h
+
+ # Improve accuracy with a differential correction.
+ # https://www.wolframalpha.com/input/?i=Maclaurin+series+sqrt%28h**2+%2B+x%29+at+x%3D0
+ d = sumprod((x, h), (y, -h))
+ return h + d / (2.0 * h)
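
Two hand-checked cases, one ordinary and one where a naive sqrt(x * y) would overflow to infinity before the square root is taken:

    >>> _sqrtprod(3.0, 12.0)
    6.0
    >>> _sqrtprod(1e300, 1e300)
    1e+300
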
diff --git a/Lib/test/test_statistics.py b/Lib/test/test_statistics.py
index cded8ab..0b28459 100644
--- a/Lib/test/test_statistics.py
+++ b/Lib/test/test_statistics.py
@@ -2435,12 +2435,13 @@ class TestKDE(unittest.TestCase):
self.assertGreater(f_hat(100), 0.0)
def test_kde_kernel_invcdfs(self):
- kernel_invcdfs = statistics._kernel_invcdfs
+ kernel_specs = statistics._kernel_specs
kde = statistics.kde
# Verify that cdf / invcdf will round trip
xarr = [i/100 for i in range(-100, 101)]
- for kernel, invcdf in kernel_invcdfs.items():
+ for kernel, spec in kernel_specs.items():
+ invcdf = spec['invcdf']
with self.subTest(kernel=kernel):
cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
for x in xarr: