Source code for qp.parameterizations.quant.dual_spline_average

from __future__ import annotations

from typing import List

import numpy as np
from scipy.interpolate import interp1d

from .abstract_pdf_constructor import AbstractQuantilePdfConstructor


[docs] class DualSplineAverage(AbstractQuantilePdfConstructor): """Implementation of the "area-under-the-curve" using the average of the bounding splines fit to the CDF derivative. By using the difference between quantiles to solve for the area under the PDF, we can create an approximation of the original PDF. However, because we use a piecewise linear approximation for the continuous PDF, our approximated p(z) values will always be different that the original distribution. In practice they typically oscillate above and below the original curve as each calculation attempts to correct for over or undershooting of the prior calculation. If we fit two splines, one to the odd and one to the even approximated points, then take the average, the resulting average of those splines tend to fit the original distribution well. This constructor implements that algorithmic approach. """ def __init__(self, quantiles: List[float], locations: List[List[float]]) -> None: """Constructor to instantiate this class. Parameters ---------- quantiles : List[float] List of n quantile values in the range (0,1). locations : List[List[float]] List of m Lists, each containing n values corresponding to the y-value of the PPF function at the same quantile index. """ self._quantiles = quantiles self._locations = np.atleast_2d(locations) self._p_of_zs = None self.y1 = None self.y2 = None
[docs] def prepare_constructor(self) -> None: """This method solves for the area under the PDF via a stepwise algorithm. Given that the difference between any two quantile values is equal to the area under the PDF between the corresponding pair of locations, _and_ given that we know the p(z) value at 1 of those locations, we can solve for the unknown p(z) value at the other location. We approximate the area under the curve as a trapezoid with the following area: (q_i+1 - q_i) = (loc_i+1 - loc_i) * p(z_i) + (1/2) * (loc_i+1 - loc_i) * p(z_i+1 = p(z_i)) Solving for p(z_i+1), we have: p(z_i+1) = [2 * (q_i+1 - q_i) / (loc_i+1 - loc_i)] - p(z_i) The first term in this equation is calculated as `first_term`. After that we step along all distributions simultaneously for each location, using the previous p(z) value to calculate the next. """ # Prepare an empty container for the output self._p_of_zs = np.zeros(self._locations.shape) # Prepare an all-zero list, used in the step-wise calculation to prevent negative values zeros_for_comparison = np.zeros(self._locations.shape[0]) # Calculate the first term first_term = 2 * np.diff(self._quantiles) / np.diff(self._locations) # Perform the step-wise calculation for all distributions simultaneously for i in range(1, np.shape(self._p_of_zs)[-1]): self._p_of_zs[:, i] = np.maximum( zeros_for_comparison, first_term[:, i - 1] - self._p_of_zs[:, i - 1] ) # Set any negative values to 0. self._p_of_zs = np.maximum(np.zeros(self._locations.shape), self._p_of_zs)
[docs] def construct_pdf( self, grid: List[float], row: List[int] = None ) -> List[List[float]]: """This method utilizes intermediate calculations from `prepare_constructor` along with the provided grid (i.e. x) values to return corresponding y values to construct the PDF approximation. Parameters ---------- grid : List[float] x values used to calculate corresponding y values row : List[int], optional A list of indexes of the original distribution to return, used as a filter. By default None will do no filtering. Returns ------- List[List[float]] The lists of y values returned from self._interpolation_functions """ if self._p_of_zs is None: self.prepare_constructor() # Support the use of `row` as a filter. If row is None, do nothing, # otherwise, return a subset of the rows. # Using `map` alone will return an iterator that will be completely consumed after the first # list comprehension. Thus we convert the map to a list so that it can be used multiple times. filtered_p_of_zs = self._p_of_zs filtered_locations = self._locations if row is not None: filtered_p_of_zs = list(map(self._p_of_zs.__getitem__, np.unique(row))) filtered_locations = list(map(self._locations.__getitem__, np.unique(row))) # Create a list of interpolated splines for the even and odd pairs of # (specific_locations, specific_p_of_zs) f1 = np.asarray( [ interp1d( np.squeeze(specific_locations[0::2]), np.squeeze(specific_p_of_zs[0::2]), bounds_error=False, fill_value=0.0, kind="cubic", ) for specific_p_of_zs, specific_locations in zip( filtered_p_of_zs, filtered_locations ) ] ) f2 = np.asarray( [ interp1d( np.squeeze(specific_locations[1::2]), np.squeeze(specific_p_of_zs[1::2]), bounds_error=False, fill_value=0.0, kind="cubic", ) for specific_p_of_zs, specific_locations in zip( filtered_p_of_zs, filtered_locations ) ] ) # Evaluate all the splines at the input grid values self.y1 = np.asarray([func_1(grid) for func_1 in f1]) self.y2 = np.asarray([func_2(grid) for func_2 in f2]) # Return the average of the spline values at each of the evaluated points. return (self.y1 + self.y2) / 2
[docs] def debug(self): """This is a debugging utility that is meant to return intermediate calculation values to make it easier to visualize and debug the reconstruction algorithm. Returns ------- _quantiles : Input during constructor instantiation _locations : Input during constructor instantiation _p_of_zs : Resulting p(z) values found after calculating the area of trapezoids based on the difference between adjacent quantile values y1 : One of two splines fit to alternating pairs of (_locations, _p_of_zs) y2 : One of two splines fit to alternating pairs of (_locations, _p_of_zs) """ return self._quantiles, self._locations, self._p_of_zs, self.y1, self.y2