summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaymond Hettinger <rhettinger@users.noreply.github.com>2024-05-04 04:13:36 (GMT)
committerGitHub <noreply@github.com>2024-05-04 04:13:36 (GMT)
commit42dc5b4ace39a3983cd9853719527f4724693adc (patch)
tree9c19d33694a49367e64864111e57bafa8e86b88b
parent1b7e5e6e60e0d22b2a928cbbb36ebb989183450f (diff)
downloadcpython-42dc5b4ace39a3983cd9853719527f4724693adc.zip
cpython-42dc5b4ace39a3983cd9853719527f4724693adc.tar.gz
cpython-42dc5b4ace39a3983cd9853719527f4724693adc.tar.bz2
gh-115532 Add kde_random() to the statistics module (#118210)
-rw-r--r--Doc/library/statistics.rst84
-rw-r--r--Doc/whatsnew/3.13.rst3
-rw-r--r--Lib/statistics.py103
-rw-r--r--Lib/test/test_statistics.py80
4 files changed, 207 insertions, 63 deletions
diff --git a/Doc/library/statistics.rst b/Doc/library/statistics.rst
index cc72396..d5a316e 100644
--- a/Doc/library/statistics.rst
+++ b/Doc/library/statistics.rst
@@ -77,6 +77,7 @@ or sample.
:func:`geometric_mean` Geometric mean of data.
:func:`harmonic_mean` Harmonic mean of data.
:func:`kde` Estimate the probability density distribution of the data.
+:func:`kde_random` Random sampling from the PDF generated by kde().
:func:`median` Median (middle value) of data.
:func:`median_low` Low median of data.
:func:`median_high` High median of data.
@@ -311,6 +312,30 @@ However, for reading convenience, most of the examples show sorted sequences.
.. versionadded:: 3.13
+.. function:: kde_random(data, h, kernel='normal', *, seed=None)
+
+ Return a function that makes a random selection from the estimated
+ probability density function produced by ``kde(data, h, kernel)``.
+
+ Providing a *seed* allows reproducible selections. In the future, the
+ values may change slightly as more accurate kernel inverse CDF estimates
+ are implemented. The seed may be an integer, float, str, or bytes.
+
+ A :exc:`StatisticsError` will be raised if the *data* sequence is empty.
+
+ Continuing the example for :func:`kde`, we can use
+ :func:`kde_random` to generate new random selections from an
+ estimated probability density function:
+
+ >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+ >>> rand = kde_random(data, h=1.5, seed=8675309)
+ >>> new_selections = [rand() for i in range(10)]
+ >>> [round(x, 1) for x in new_selections]
+ [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+
+ .. versionadded:: 3.13
+
+
.. function:: median(data)
Return the median (middle value) of numeric data, using the common "mean of
@@ -1148,65 +1173,6 @@ The final prediction goes to the largest posterior. This is known as the
'female'
-Sampling from kernel density estimation
-***************************************
-
-The :func:`kde()` function creates a continuous probability density
-function from discrete samples. Some applications need a way to make
-random selections from that distribution.
-
-The technique is to pick a sample from a bandwidth scaled kernel
-function and recenter the result around a randomly chosen point from
-the input data. This can be done with any kernel that has a known or
-accurately approximated inverse cumulative distribution function.
-
-.. testcode::
-
- from random import choice, random, seed
- from math import sqrt, log, pi, tan, asin, cos, acos
- from statistics import NormalDist
-
- kernel_invcdfs = {
- 'normal': NormalDist().inv_cdf,
- 'logistic': lambda p: log(p / (1 - p)),
- 'sigmoid': lambda p: log(tan(p * pi/2)),
- 'rectangular': lambda p: 2*p - 1,
- 'triangular': lambda p: sqrt(2*p) - 1 if p < 0.5 else 1 - sqrt(2 - 2*p),
- 'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
- 'cosine': lambda p: 2*asin(2*p - 1)/pi,
- }
-
- def kde_random(data, h, kernel='normal'):
- 'Return a function that samples from kde() smoothed data.'
- kernel_invcdf = kernel_invcdfs[kernel]
- def rand():
- return h * kernel_invcdf(random()) + choice(data)
- return rand
-
-For example:
-
-.. doctest::
-
- >>> discrete_samples = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
- >>> rand = kde_random(discrete_samples, h=1.5)
- >>> seed(8675309)
- >>> selections = [rand() for i in range(10)]
- >>> [round(x, 1) for x in selections]
- [4.7, 7.4, 1.2, 7.8, 6.9, -1.3, 5.8, 0.2, -1.4, 5.7]
-
-.. testcode::
- :hide:
-
- from statistics import kde
- from math import isclose
-
- # Verify that cdf / invcdf will round trip
- xarr = [i/100 for i in range(-100, 101)]
- for kernel, invcdf in kernel_invcdfs.items():
- cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
- for x in xarr:
- assert isclose(invcdf(cdf(x)), x, abs_tol=1E-9)
-
..
   # This modeline must appear within the last ten lines of the file.
kate: indent-width 3; remove-trailing-space on; replace-tabs on; encoding utf-8;
diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst
index d996cf6..269a7cc 100644
--- a/Doc/whatsnew/3.13.rst
+++ b/Doc/whatsnew/3.13.rst
@@ -745,7 +745,8 @@ statistics
* Add :func:`statistics.kde` for kernel density estimation.
This makes it possible to estimate a continuous probability density function
- from a fixed number of discrete samples.
+ from a fixed number of discrete samples. Also added :func:`statistics.kde_random`
+ for sampling from the estimated probability density function.
(Contributed by Raymond Hettinger in :gh:`115863`.)
.. _whatsnew313-subprocess:
diff --git a/Lib/statistics.py b/Lib/statistics.py
index fc00891..f3ce2d8 100644
--- a/Lib/statistics.py
+++ b/Lib/statistics.py
@@ -113,6 +113,7 @@ __all__ = [
'geometric_mean',
'harmonic_mean',
'kde',
+ 'kde_random',
'linear_regression',
'mean',
'median',
@@ -138,12 +139,13 @@ from decimal import Decimal
from itertools import count, groupby, repeat
from bisect import bisect_left, bisect_right
from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum, sumprod
-from math import isfinite, isinf, pi, cos, sin, cosh, atan
+from math import isfinite, isinf, pi, cos, sin, tan, cosh, asin, atan, acos
from functools import reduce
from operator import itemgetter
from collections import Counter, namedtuple, defaultdict
_SQRT2 = sqrt(2.0)
+_random = random
# === Exceptions ===
@@ -978,11 +980,9 @@ def kde(data, h, kernel='normal', *, cumulative=False):
return sum(K((x - x_i) / h) for x_i in data) / (n * h)
def cdf(x):
-
n = len(data)
return sum(W((x - x_i) / h) for x_i in data) / n
-
else:
sample = sorted(data)
@@ -1078,6 +1078,7 @@ def quantiles(data, *, n=4, method='exclusive'):
if ld == 1:
return data * (n - 1)
raise StatisticsError('must have at least one data point')
+
if method == 'inclusive':
m = ld - 1
result = []
@@ -1086,6 +1087,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
result.append(interpolated)
return result
+
if method == 'exclusive':
m = ld + 1
result = []
@@ -1096,6 +1098,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
result.append(interpolated)
return result
+
raise ValueError(f'Unknown method: {method!r}')
@@ -1709,3 +1712,97 @@ class NormalDist:
def __setstate__(self, state):
self._mu, self._sigma = state
+
+
+## kde_random() ##############################################################
+
+def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
+ def f_inv(y):
+ "Return x such that f(x) ≈ y within the specified tolerance."
+ x = f_inv_estimate(y)
+ while abs(diff := f(x) - y) > tolerance:
+ x -= diff / f_prime(x)
+ return x
+ return f_inv
+
+def _quartic_invcdf_estimate(p):
+ sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+ x = (2.0 * p) ** 0.4258865685331 - 1.0
+ if p >= 0.004 < 0.499:
+ x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
+ return x * sign
+
+_quartic_invcdf = _newton_raphson(
+ f_inv_estimate = _quartic_invcdf_estimate,
+ f = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2,
+ f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)
+
+def _triweight_invcdf_estimate(p):
+ sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+ x = (2.0 * p) ** 0.3400218741872791 - 1.0
+ return x * sign
+
+_triweight_invcdf = _newton_raphson(
+ f_inv_estimate = _triweight_invcdf_estimate,
+ f = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2,
+ f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)
+
+_kernel_invcdfs = {
+ 'normal': NormalDist().inv_cdf,
+ 'logistic': lambda p: log(p / (1 - p)),
+ 'sigmoid': lambda p: log(tan(p * pi/2)),
+ 'rectangular': lambda p: 2*p - 1,
+ 'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
+ 'quartic': _quartic_invcdf,
+ 'triweight': _triweight_invcdf,
+ 'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
+ 'cosine': lambda p: 2 * asin(2*p - 1) / pi,
+}
+_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
+_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
+_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
+_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']
+
+def kde_random(data, h, kernel='normal', *, seed=None):
+ """Return a function that makes a random selection from the estimated
+ probability density function created by kde(data, h, kernel).
+
+ Providing a *seed* allows reproducible selections within a single
+ thread. The seed may be an integer, float, str, or bytes.
+
+ A StatisticsError will be raised if the *data* sequence is empty.
+
+ Example:
+
+ >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+ >>> rand = kde_random(data, h=1.5, seed=8675309)
+ >>> new_selections = [rand() for i in range(10)]
+ >>> [round(x, 1) for x in new_selections]
+ [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+
+ """
+ n = len(data)
+ if not n:
+ raise StatisticsError('Empty data sequence')
+
+ if not isinstance(data[0], (int, float)):
+ raise TypeError('Data sequence must contain ints or floats')
+
+ if h <= 0.0:
+ raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+
+ try:
+ kernel_invcdf = _kernel_invcdfs[kernel]
+ except KeyError:
+ raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+
+ prng = _random.Random(seed)
+ random = prng.random
+ choice = prng.choice
+
+ def rand():
+ return choice(data) + h * kernel_invcdf(random())
+
+ rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
+
+ return rand
diff --git a/Lib/test/test_statistics.py b/Lib/test/test_statistics.py
index 204787a..fe6c59c 100644
--- a/Lib/test/test_statistics.py
+++ b/Lib/test/test_statistics.py
@@ -2426,6 +2426,86 @@ class TestKDE(unittest.TestCase):
self.assertEqual(f_hat(-1.0), 1/2)
self.assertEqual(f_hat(1.0), 1/2)
+ def test_kde_kernel_invcdfs(self):
+ kernel_invcdfs = statistics._kernel_invcdfs
+ kde = statistics.kde
+
+ # Verify that cdf / invcdf will round trip
+ xarr = [i/100 for i in range(-100, 101)]
+ for kernel, invcdf in kernel_invcdfs.items():
+ with self.subTest(kernel=kernel):
+ cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
+ for x in xarr:
+ self.assertAlmostEqual(invcdf(cdf(x)), x, places=5)
+
+ def test_kde_random(self):
+ kde_random = statistics.kde_random
+ StatisticsError = statistics.StatisticsError
+ kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular',
+ 'uniform', 'triangular', 'parabolic', 'epanechnikov',
+ 'quartic', 'biweight', 'triweight', 'cosine']
+ sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+
+ # Smoke test
+
+ for kernel in kernels:
+ with self.subTest(kernel=kernel):
+ rand = kde_random(sample, h=1.5, kernel=kernel)
+ selections = [rand() for i in range(10)]
+
+ # Check error cases
+
+ with self.assertRaises(StatisticsError):
+ kde_random([], h=1.0) # Empty dataset
+ with self.assertRaises(TypeError):
+ kde_random(['abc', 'def'], 1.5) # Non-numeric data
+ with self.assertRaises(TypeError):
+ kde_random(iter(sample), 1.5) # Data is not a sequence
+ with self.assertRaises(StatisticsError):
+ kde_random(sample, h=0.0) # Zero bandwidth
+ with self.assertRaises(StatisticsError):
+ kde_random(sample, h=0.0) # Negative bandwidth
+ with self.assertRaises(TypeError):
+ kde_random(sample, h='str') # Wrong bandwidth type
+ with self.assertRaises(StatisticsError):
+ kde_random(sample, h=1.0, kernel='bogus') # Invalid kernel
+
+ # Test name and docstring of the generated function
+
+ h = 1.5
+ kernel = 'cosine'
+ prng = kde_random(sample, h, kernel)
+ self.assertEqual(prng.__name__, 'rand')
+ self.assertIn(kernel, prng.__doc__)
+ self.assertIn(repr(h), prng.__doc__)
+
+ # Approximate distribution test: Compare a random sample to the expected distribution
+
+ data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2, 7.8, 14.3, 15.1, 15.3, 15.8, 17.0]
+ n = 1_000_000
+ h = 1.75
+ dx = 0.1
+
+ def p_expected(x):
+ return F_hat(x + dx) - F_hat(x - dx)
+
+ def p_observed(x):
+ # P(x-dx <= X < x+dx) / (2*dx)
+ i = bisect.bisect_left(big_sample, x - dx)
+ j = bisect.bisect_right(big_sample, x + dx)
+ return (j - i) / len(big_sample)
+
+ for kernel in kernels:
+ with self.subTest(kernel=kernel):
+
+ F_hat = statistics.kde(data, h, kernel, cumulative=True)
+ rand = kde_random(data, h, kernel, seed=8675309**2)
+ big_sample = sorted([rand() for i in range(n)])
+
+ for x in range(-40, 190):
+ x /= 10
+ self.assertTrue(math.isclose(p_observed(x), p_expected(x), abs_tol=0.001))
+
class TestQuantiles(unittest.TestCase):