Source code for qp.interp_pdf

"""This module implements a PDT distribution sub-class using interpolated grids
"""

import numpy as np
from scipy.stats import rv_continuous

from qp.conversion_funcs import extract_vals_at_x, extract_xy_sparse, extract_xy_vals
from qp.factory import add_class
from qp.pdf_gen import Pdf_rows_gen
from qp.plotting import get_axes_and_xlims, plot_pdf_on_axes
from qp.test_data import TEST_XVALS, XARRAY, XBINS, YARRAY
from qp.utils import (
    interpolate_multi_x_multi_y,
    interpolate_multi_x_y,
    interpolate_x_multi_y,
    normalize_interp1d,
    reshape_to_pdf_size,
)


class interp_gen(Pdf_rows_gen):
    """Interpolator-based distribution

    Notes
    -----
    This implements a PDF using a set of interpolated values.

    This version uses the same xvals for all of the PDFs, which allows for
    much faster evaluation and reduces the memory usage by a factor of 2.

    The relevant data members are:

    xvals: (n) x values

    yvals: (npdf, n) y values

    Inside the range xvals[0], xvals[-1] it simply takes a set of x and y
    values and uses `scipy.interpolate.interp1d` to build the PDF.

    Outside the range xvals[0], xvals[-1] the pdf() will return 0.

    The cdf() is constructed by analytically computing the cumulative sum at
    the xvals grid points and interpolating between them.  This will give a
    slight discrepancy with the true integral of the pdf(), but is much, much
    faster to evaluate.  Outside the range xvals[0], xvals[-1] the cdf() will
    return 0 or 1, respectively.

    The ppf() is computed by inverting the cdf().
    ppf(0) will return xvals[0]
    ppf(1) will return xvals[-1]
    """

    # pylint: disable=protected-access

    name = "interp"
    version = 0

    _support_mask = rv_continuous._support_mask

    def __init__(self, xvals, yvals, *args, **kwargs):
        """
        Create a new distribution by interpolating the given values

        Parameters
        ----------
        xvals : array_like
            The x-values used to do the interpolation
        yvals : array_like
            The y-values used to do the interpolation
        """
        if np.size(xvals) != np.shape(yvals)[-1]:  # pragma: no cover
            raise ValueError(
                "Shape of xbins in xvals (%s) != shape of xbins in yvals (%s)"
                % (np.size(xvals), np.shape(yvals)[-1])
            )
        self._xvals = np.asarray(xvals)

        # Set support
        self._xmin = self._xvals[0]
        self._xmax = self._xvals[-1]
        kwargs["shape"] = np.shape(yvals)[:-1]

        self._yvals = reshape_to_pdf_size(yvals, -1)

        check_input = kwargs.pop("check_input", True)
        if check_input:
            self._compute_ycumul()
            # Normalize so the total integral (last cumulative value) is 1
            self._yvals = (self._yvals.T / self._ycumul[:, -1]).T
            self._ycumul = (self._ycumul.T / self._ycumul[:, -1]).T
        else:  # pragma: no cover
            self._ycumul = None

        super().__init__(*args, **kwargs)
        self._addmetadata("xvals", self._xvals)
        self._addobjdata("yvals", self._yvals)

    def _compute_ycumul(self):
        # Trapezoid-rule cumulative integral of yvals on the shared xvals grid
        copy_shape = np.array(self._yvals.shape)
        self._ycumul = np.ndarray(copy_shape)
        self._ycumul[:, 0] = 0.5 * self._yvals[:, 0] * (self._xvals[1] - self._xvals[0])
        self._ycumul[:, 1:] = np.cumsum(
            (self._xvals[1:] - self._xvals[:-1])
            * 0.5
            * np.add(self._yvals[:, 1:], self._yvals[:, :-1]),
            axis=1,
        )

    @property
    def xvals(self):
        """Return the x-values used to do the interpolation"""
        return self._xvals

    @property
    def yvals(self):
        """Return the y-values used to do the interpolation"""
        return self._yvals

    def _pdf(self, x, row):
        # pylint: disable=arguments-differ
        return interpolate_x_multi_y(
            x, row, self._xvals, self._yvals, bounds_error=False, fill_value=0.0
        ).ravel()

    def _cdf(self, x, row):
        # pylint: disable=arguments-differ
        if self._ycumul is None:  # pragma: no cover
            self._compute_ycumul()
        return interpolate_x_multi_y(
            x, row, self._xvals, self._ycumul, bounds_error=False, fill_value=(0.0, 1.0)
        ).ravel()

    def _ppf(self, x, row):
        # pylint: disable=arguments-differ
        if self._ycumul is None:  # pragma: no cover
            self._compute_ycumul()
        return interpolate_multi_x_y(
            x,
            row,
            self._ycumul,
            self._xvals,
            bounds_error=False,
            fill_value=(self._xmin, self._xmax),
        ).ravel()

    def _munp(self, m, *args):
        """compute moments"""
        # pylint: disable=arguments-differ
        # Silence floating point warnings from integration.
        with np.errstate(all="ignore"):
            vals = self.custom_generic_moment(m)
        return vals

    def custom_generic_moment(self, m):
        """Compute the mth moment"""
        m = np.asarray(m)
        # Note: assumes a uniformly spaced xvals grid
        dx = self._xvals[1] - self._xvals[0]
        return np.sum(self._xvals**m * self._yvals, axis=1) * dx

    def _updated_ctor_param(self):
        """
        Set the bins as additional constructor argument
        """
        dct = super()._updated_ctor_param()
        dct["xvals"] = self._xvals
        dct["yvals"] = self._yvals
        return dct

    @classmethod
    def get_allocation_kwds(cls, npdf, **kwargs):
        """Return the keywords necessary to create an 'empty' hdf5 file with npdf entries
        for iterative file writeout.  We only need to allocate the objdata columns, as
        the metadata can be written when we finalize the file.

        Parameters
        ----------
        npdf : int
            Number of *total* PDFs that will be written out
        kwargs : dict
            Dictionary of kwargs needed to create the ensemble
        """
        if "xvals" not in kwargs:  # pragma: no cover
            raise ValueError("required argument xvals not included in kwargs")
        ngrid = np.shape(kwargs["xvals"])[-1]
        return dict(yvals=((npdf, ngrid), "f4"))

    @classmethod
    def plot_native(cls, pdf, **kwargs):
        """Plot the PDF in a way that is particular to this type of distribution

        For an interpolated PDF this uses the interpolation points
        """
        axes, _, kw = get_axes_and_xlims(**kwargs)
        return plot_pdf_on_axes(axes, pdf, pdf.dist.xvals, **kw)

    @classmethod
    def add_mappings(cls):
        """
        Add this class's mappings to the conversion dictionary
        """
        cls._add_creation_method(cls.create, None)
        cls._add_extraction_method(extract_vals_at_x, None)

    @classmethod
    def make_test_data(cls):
        """Make data for unit tests"""
        cls.test_data = dict(
            interp=dict(
                gen_func=interp,
                ctor_data=dict(xvals=XBINS, yvals=YARRAY),
                convert_data=dict(xvals=XBINS),
                test_xvals=TEST_XVALS,
            )
        )


interp = interp_gen.create
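
# A hedged usage sketch (illustration only, not part of this module): the
# standard way to use this representation is through `qp.Ensemble`, which
# wraps the frozen distribution returned by the `interp` factory above.
# The names `xgrid`, `yvals`, `x` and `q` below are placeholders.
#
#     >>> import qp
#     >>> ens = qp.Ensemble(qp.interp, data=dict(xvals=xgrid, yvals=yvals))
#     >>> ens.pdf(x)    # 0 outside [xgrid[0], xgrid[-1]]
#     >>> ens.cdf(x)    # interpolated cumulative sum, clipped to [0, 1]
#     >>> ens.ppf(q)    # inverse of the interpolated cdf
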
class interp_irregular_gen(Pdf_rows_gen):
    """Interpolator-based distribution

    Notes
    -----
    This implements a PDF using a set of interpolated values.

    This version uses different xvals for each of the PDFs, which allows for
    more precision.

    The relevant data members are:

    xvals: (npdf, n) x values

    yvals: (npdf, n) y values

    Inside the range xvals[:,0], xvals[:,-1] it simply takes a set of x and y
    values and uses `scipy.interpolate.interp1d` to build the PDF.

    Outside the range xvals[:,0], xvals[:,-1] the pdf() will return 0.

    The cdf() is constructed by analytically computing the cumulative sum at
    the xvals grid points and interpolating between them.  This will give a
    slight discrepancy with the true integral of the pdf(), but is much, much
    faster to evaluate.  Outside the range xvals[:,0], xvals[:,-1] the cdf()
    will return 0 or 1, respectively.

    The ppf() is computed by inverting the cdf().
    ppf(0) will return min(xvals)
    ppf(1) will return max(xvals)
    """

    # pylint: disable=protected-access

    name = "interp_irregular"
    version = 0

    _support_mask = rv_continuous._support_mask

    def __init__(self, xvals, yvals, *args, **kwargs):
        """
        Create a new distribution by interpolating the given values

        Parameters
        ----------
        xvals : array_like
            The x-values used to do the interpolation
        yvals : array_like
            The y-values used to do the interpolation
        """
        if np.shape(xvals) != np.shape(yvals):  # pragma: no cover
            raise ValueError(
                "Shape of xvals (%s) != shape of yvals (%s)"
                % (np.shape(xvals), np.shape(yvals))
            )
        self._xvals = reshape_to_pdf_size(xvals, -1)

        # Set support
        self._xmin = np.min(self._xvals)
        self._xmax = np.max(self._xvals)
        kwargs["shape"] = np.shape(xvals)[:-1]

        check_input = kwargs.pop("check_input", True)
        self._yvals = reshape_to_pdf_size(yvals, -1)
        if check_input:
            self._yvals = normalize_interp1d(self._xvals, self._yvals)
        self._ycumul = None

        super().__init__(*args, **kwargs)
        self._addobjdata("xvals", self._xvals)
        self._addobjdata("yvals", self._yvals)

    def _compute_ycumul(self):
        # Cumulative integral of yvals on each per-PDF xvals grid
        copy_shape = np.array(self._yvals.shape)
        self._ycumul = np.ndarray(copy_shape)
        self._ycumul[:, 0] = 0.0
        self._ycumul[:, 1:] = np.cumsum(
            self._xvals[:, 1:] * self._yvals[:, 1:]
            - self._xvals[:, :-1] * self._yvals[:, 1:],
            axis=1,
        )

    @property
    def xvals(self):
        """Return the x-values used to do the interpolation"""
        return self._xvals

    @property
    def yvals(self):
        """Return the y-values used to do the interpolation"""
        return self._yvals

    def _pdf(self, x, row):
        # pylint: disable=arguments-differ
        return interpolate_multi_x_multi_y(
            x, row, self._xvals, self._yvals, bounds_error=False, fill_value=0.0
        ).ravel()

    def _cdf(self, x, row):
        # pylint: disable=arguments-differ
        if self._ycumul is None:  # pragma: no cover
            self._compute_ycumul()
        return interpolate_multi_x_multi_y(
            x, row, self._xvals, self._ycumul, bounds_error=False, fill_value=(0.0, 1.0)
        ).ravel()

    def _ppf(self, x, row):
        # pylint: disable=arguments-differ
        if self._ycumul is None:  # pragma: no cover
            self._compute_ycumul()
        return interpolate_multi_x_multi_y(
            x,
            row,
            self._ycumul,
            self._xvals,
            bounds_error=False,
            fill_value=(self._xmin, self._xmax),
        ).ravel()

    def _updated_ctor_param(self):
        """
        Set the bins as additional constructor argument
        """
        dct = super()._updated_ctor_param()
        dct["xvals"] = self._xvals
        dct["yvals"] = self._yvals
        return dct

    @classmethod
    def get_allocation_kwds(cls, npdf, **kwargs):
        """Return the keywords necessary to create an 'empty' hdf5 file with npdf entries
        for iterative file writeout.  We only need to allocate the objdata columns, as
        the metadata can be written when we finalize the file.

        Parameters
        ----------
        npdf : int
            Number of *total* PDFs that will be written out
        kwargs : dict
            Dictionary of kwargs needed to create the ensemble
        """
        if "xvals" not in kwargs:  # pragma: no cover
            raise ValueError("required argument xvals not included in kwargs")
        ngrid = np.shape(kwargs["xvals"])[-1]
        return dict(xvals=((npdf, ngrid), "f4"), yvals=((npdf, ngrid), "f4"))

    @classmethod
    def plot_native(cls, pdf, **kwargs):
        """Plot the PDF in a way that is particular to this type of distribution

        For an interpolated PDF this uses the interpolation points
        """
        axes, _, kw = get_axes_and_xlims(**kwargs)
        xvals_row = pdf.dist.xvals
        return plot_pdf_on_axes(axes, pdf, xvals_row, **kw)

    @classmethod
    def add_mappings(cls):
        """
        Add this class's mappings to the conversion dictionary
        """
        cls._add_creation_method(cls.create, None)
        cls._add_extraction_method(extract_xy_vals, None)
        cls._add_extraction_method(extract_xy_sparse, None)

    @classmethod
    def make_test_data(cls):
        """Make data for unit tests"""
        cls.test_data = dict(
            interp_irregular=dict(
                gen_func=interp_irregular,
                ctor_data=dict(xvals=XARRAY, yvals=YARRAY),
                convert_data=dict(xvals=XBINS),
                test_xvals=TEST_XVALS,
            )
        )


interp_irregular = interp_irregular_gen.create

add_class(interp_gen)
add_class(interp_irregular_gen)
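

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch added for illustration; it is not part of the
    # qp library.  It builds three Gaussian-shaped PDFs, once on a shared grid
    # (`interp`) and once on per-PDF grids (`interp_irregular`), and checks
    # that the interpolated cdf() reaches ~1 at the upper edge of the support.
    # Output shapes follow qp's row-broadcasting conventions.
    xgrid = np.linspace(-5.0, 5.0, 101)          # shared grid, shape (n,)
    means = np.array([[-1.0], [0.0], [1.0]])     # one mean per PDF, shape (npdf, 1)
    ygrid = np.exp(-0.5 * (xgrid - means) ** 2)  # unnormalized Gaussians, shape (npdf, n)

    shared = interp(xvals=xgrid, yvals=ygrid)               # same grid for every PDF
    per_pdf = interp_irregular(
        xvals=np.tile(xgrid, (len(means), 1)), yvals=ygrid  # one grid per PDF
    )

    print(shared.cdf(xgrid[-1]))   # ~1 for each of the three PDFs
    print(per_pdf.cdf(xgrid[-1]))  # ~1, up to the coarser cumulative-sum rule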