Source code for qp.metrics.metrics

"""This module implements some performance metrics for distribution parameterization"""
import logging
from collections import namedtuple
from functools import partial

import numpy as np
from deprecated import deprecated

from qp.metrics import array_metrics
from qp.metrics.brier import Brier
from qp.metrics.goodness_of_fit import goodness_of_fit_metrics
from qp.utils import epsilon

Grid = namedtuple(
    "Grid", ["grid_values", "cardinality", "resolution", "hist_bin_edges", "limits"]
)


def _calculate_grid_parameters(limits, dx: float = 0.01) -> Grid:
    """
    Create a grid of points and return parameters describing it.

    Args:
        limits (Iterable) often a 2-tuple or numpy array with shape (2,)
            the max and min values of the 1d grid
        dx (float, optional):
            the desired delta between points. Used to define the cardinality. Defaults to 0.01.

    Returns:
        Grid: a namedtuple containing a 1d grid's values and attributes.
            grid_values: np.array with size = cardinality
            cardinality: int, number of elements in grid_value
            resolution: float, equal to grid_values[i] - grid_values[i-1]
            hist_bin_edges: np.array with size = cardinality+1.
                Equally spaced histogram bin edges starting at limit-resolution/2.
                Assumes that grid_value[i] should be centered in the bin defined by
                (hist_bin_edge[i], hist_bin_edge[i+1]).
            limits: 2-tuple, the limits passed in and used in this function
    """
    cardinality = int((limits[-1] - limits[0]) / dx)
    grid_values = np.linspace(limits[0], limits[1], cardinality)
    resolution = (limits[-1] - limits[0]) / (cardinality - 1)
    hist_bin_edges = np.histogram_bin_edges(
        (limits[0] - resolution / 2, limits[1] + resolution / 2), cardinality
    )

    return Grid(grid_values, cardinality, resolution, hist_bin_edges, limits)


[docs]def calculate_moment(p, N, limits, dx=0.01): """ Calculates a moment of a qp.Ensemble object Parameters ---------- p: qp.Ensemble object the collection of PDFs whose moment will be calculated N: int order of the moment to be calculated limits: tuple of floats endpoints of integration interval over which to calculate moments dx: float resolution of integration grid Returns ------- M: float value of the moment """ # Make a grid from the limits and resolution grid = _calculate_grid_parameters(limits, dx) # Evaluate the functions on the grid pe = p.gridded(grid.grid_values)[1] # calculate the moment grid_to_N = grid.grid_values**N M = array_metrics.quick_moment(pe, grid_to_N, grid.resolution) return M
[docs]def calculate_kld(p, q, limits, dx=0.01): """ Calculates the Kullback-Leibler Divergence between two qp.Ensemble objects. Parameters ---------- p: Ensemble object probability distribution closer to the truth q: Ensemble object probability distribution that approximates p limits: tuple of floats endpoints of integration interval in which to calculate KLD dx: float resolution of integration grid Returns ------- Dpq: float the value of the Kullback-Leibler Divergence from `q` to `p` Notes ----- TO DO: have this take number of points not dx! """ if p.shape != q.shape: raise ValueError( "Cannot calculate KLD between two ensembles with different shapes" ) # Make a grid from the limits and resolution grid = _calculate_grid_parameters(limits, dx) # Evaluate the functions on the grid and normalize pe = p.gridded(grid.grid_values) pn = pe[1] qe = q.gridded(grid.grid_values) qn = qe[1] # Calculate the KLD from q to p Dpq = array_metrics.quick_kld( pn, qn, grid.resolution ) # np.dot(pn * logquotient, np.ones(len(grid)) * dx) if np.any(Dpq < 0.0): # pragma: no cover print("broken KLD: " + str((Dpq, pn, qn, grid.resolution))) Dpq = epsilon * np.ones(Dpq.shape) return Dpq
[docs]def calculate_rmse(p, q, limits, dx=0.01): """ Calculates the Root Mean Square Error between two qp.Ensemble objects. Parameters ---------- p: qp.Ensemble object probability distribution function whose distance between its truth and the approximation of `q` will be calculated. q: qp.Ensemble object probability distribution function whose distance between its approximation and the truth of `p` will be calculated. limits: tuple of floats endpoints of integration interval in which to calculate RMS dx: float resolution of integration grid Returns ------- rms: float the value of the RMS error between `q` and `p` Notes ----- TO DO: change dx to N """ if p.shape != q.shape: raise ValueError( "Cannot calculate RMSE between two ensembles with different shapes" ) # Make a grid from the limits and resolution grid = _calculate_grid_parameters(limits, dx) # Evaluate the functions on the grid pe = p.gridded(grid.grid_values)[1] qe = q.gridded(grid.grid_values)[1] # Calculate the RMS between p and q rms = array_metrics.quick_rmse( pe, qe, grid.cardinality ) # np.sqrt(dx * np.sum((pe - qe) ** 2)) return rms
[docs]def calculate_rbpe(p, limits=(np.inf, np.inf)): """ Calculates the risk based point estimates of a qp.Ensemble object. Algorithm as defined in 4.2 of 'Photometric redshifts for Hyper Suprime-Cam Subaru Strategic Program Data Release 1' (Tanaka et al. 2018). Parameters ---------- p: qp.Ensemble object Ensemble of PDFs to be evalutated limits: tuple The limits at which to evaluate possible z_best estimates. If custom limits are not provided then all potential z value will be considered using the scipy.optimize.minimize_scalar function. Returns ------- rbpes: array of floats The risk based point estimates of the provided ensemble. """ rbpes = [] def evaluate_pdf_at_z(z, dist): return dist.pdf(z)[0][0] for n in range(0, p.npdf): if p[n].npdf != 1: raise ValueError( "quick_rbpe only handles Ensembles with a single PDF " "for ensembles with more than one PDF, use the qp.metrics.risk_based_point_estimate function." ) this_dist_pdf_at_z = partial(evaluate_pdf_at_z, dist=p[n]) integration_bounds = (p[n].ppf(0.01)[0][0], p[n].ppf(0.99)[0][0]) rbpes.append( array_metrics.quick_rbpe(this_dist_pdf_at_z, integration_bounds, limits) ) return np.array(rbpes)
def _prepare_for_brier(p, truth, limits, dx=0.01): # Ensure that the number of distributions objects in the Ensemble # is equal to the length of the truth array if p.npdf != len(truth): raise ValueError( "Number of distributions in the Ensemble (%d) is not equal to the number of truth values (%d)" % (p.npdf, len(truth)) ) # Values of truth that are outside the defined limits will not appear truth_array. # Consider expanding the limits or using numpy.clip to restrict truth values to the limits. if np.any(np.less(truth, limits[0])) or np.any(np.greater(truth, limits[1])): raise ValueError("Input truth values exceed the defined limits") # Make a grid object that defines grid values and histogram bin edges using limits and dx grid = _calculate_grid_parameters(limits, dx) # Evaluate the pdf of the distributions on the grid. # The value returned from p.gridded is a 2-tuple. The 0th index is the array of grid points, # the 1st index is the array of PDF values. Thus we call p.gridded(...)[1] pdf_values = p.gridded(grid.grid_values)[1] # Create the NxM truth_array. # Note np.histogram returns a 2-tuple. The 0th index is the histogram array, # thus we call np.squeeze to remove extra dimensions. truth_array = np.squeeze([np.histogram(t, grid.hist_bin_edges)[0] for t in truth]) # instantiate the Brier metric object return Brier(pdf_values, truth_array)
[docs]def calculate_brier(p, truth, limits, dx=0.01): """This function will do the following: 1) Generate a Mx1 sized grid based on `limits` and `dx`. 2) Produce an NxM array by evaluating the pdf for each of the N distribution objects in the Ensemble p on the grid. 3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1. 4) Create a Brier metric evaluation object 5) Return the result of the Brier metric calculation. Parameters ---------- p: qp.Ensemble object of N distributions probability distribution functions that will be gridded and compared against truth. truth: Nx1 sequence the list of true values, 1 per distribution in p. limits: 2-tuple of floats endpoints grid to evaluate the PDFs for the distributions in p dx: float resolution of the grid Defaults to 0.01. Returns ------- Brier_metric: float """ brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx) # return the results of evaluating the Brier metric return brier_metric_evaluation.evaluate()
[docs]def calculate_brier_for_accumulation(p, truth, limits, dx=0.01): brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx) brier_sum = brier_metric_evaluation.accumulate() return (brier_sum, p.npdf)
[docs]@deprecated( reason=""" This implementation is deprecated for performance reasons and does not accommodate N vs 1 comparisons. Please use calculate_goodness_of_fit instead. This method is for testing purposes only. """, category=DeprecationWarning, ) def calculate_anderson_darling( p, scipy_distribution="norm", num_samples=100, _random_state=None ): # pylint: disable=unused-argument """This function is deprecated and will be completely removed in a later version. Please use `calculate_goodness_of_fit` instead. Returns ------- logger.warning """ logging.warning( "This function is deprecated, please use `calculate_goodness_of_fit` with `fit_metric='ad'`" ) # pragma: no cover
[docs]@deprecated( reason=""" This implementation is deprecated for performance reasons and does not accommodate N vs 1 comparisons. Please use calculate_goodness_of_fit instead. This method is for testing purposes only. """, category=DeprecationWarning, ) def calculate_cramer_von_mises( p, q, num_samples=100, _random_state=None, **kwargs ): # pylint: disable=unused-argument """This function is deprecated and will be completely removed in a later version. Please use `calculate_goodness_of_fit` instead. Returns ------- logger.warning """ logging.warning( "This function is deprecated, please use `calculate_goodness_of_fit` with `fit_metric='cvm'`" ) # pragma: no cover
[docs]@deprecated( reason=""" This implementation is deprecated for performance reasons and does not accommodate N vs 1 comparisons. Please use calculate_goodness_of_fit instead. This method is for testing purposes only. """, category=DeprecationWarning, ) def calculate_kolmogorov_smirnov( p, q, num_samples=100, _random_state=None ): # pylint: disable=unused-argument """This function is deprecated and will be completely removed in a later version. Please use `calculate_goodness_of_fit` instead. Returns ------- logger.warning """ logging.warning( "This function is deprecated, please use `calculate_goodness_of_fit` with `fit_metric='ks'`" ) # pragma: no cover
[docs]def calculate_outlier_rate(p, lower_limit=0.0001, upper_limit=0.9999): """Fraction of outliers in each distribution Parameters ---------- p : qp.Ensemble A collection of N distributions. This implementation expects that Ensembles are not nested. lower_limit : float, optional Lower bound CDF for outliers, by default 0.0001 upper_limit : float, optional Upper bound CDF for outliers, by default 0.9999 Returns ------- [float] 1xN array where each element is the percent of outliers for a distribution in the Ensemble. """ # Validate that all the distributions in the Ensemble are single distributions - i.e. no nested Ensembles try: _check_ensemble_is_not_nested(p) except ValueError: #pragma: no cover - unittest coverage for _check_ensemble_is_not_nested is complete logging.warning("Each element in the ensemble `p` must be a single distribution.") outlier_rates = np.array([(dist.cdf(lower_limit) + (1. - dist.cdf(upper_limit)))[0][0] for dist in p]) return outlier_rates
[docs]def calculate_goodness_of_fit( estimate, reference, fit_metric="ks", num_samples=100, _random_state=None ): """This method calculates goodness of fit between the distributions in the `estimate` and `reference` Ensembles using the specified fit_metric. Parameters ---------- estimate : Ensemble containing N distributions Random variate samples will be drawn from this Ensemble reference : Ensemble containing N or 1 distributions The CDF of the distributions in this Ensemble are used in the goodness of fit calculation. fit_metric : string, optional The goodness of fit metric to use. One of ['ad', 'cvm', 'ks']. For clarity, 'ad' = Anderson-Darling, 'cvm' = Cramer-von Mises, and 'ks' = Kolmogorov-Smirnov, by default 'ks' num_samples : int, optional Number of random variates to draw from each distribution in `estimate`, by default 100 _random_state : _type_, optional Used for testing to create reproducible sets of random variates, by default None Returns ------- output: [float] A array of floats where each element is the result of the statistic calculation. Raises ------ KeyError If the requested `fit_metric` is not contained in `goodness_of_fit_metrics` dictionary, raise a KeyError. Notes ----- The calculation of the goodness of fit metrics is not symmetric. i.e. `calculate_goodness_of_fit(p, q, ...) != calculate_goodness_of_fit(q, p, ...)` In the future, we should be able to do this directly from the PDFs without needing to take random variates from the `estimate` Ensemble. The vectorized implementations of fit metrics are copied over (unmodified) from the developer branch of Scipy 1.10.0dev. When Scipy 1.10 is released, we can replace the copied implementation with the ones in Scipy. """ try: _check_ensembles_contain_correct_number_of_distributions(estimate, reference) except ( ValueError ): # pragma: no cover - unittest coverage for _check_ensembles_contain_correct_number_of_distributions is complete pylint: disable=line-too-long logging.warning( "The ensemble `reference` should have 1 or N distributions. " "With N = number of distributions in the ensemble `estimate`." ) try: _check_ensemble_is_not_nested(estimate) except ( ValueError ): # pragma: no cover - unittest coverage for _check_ensemble_is_not_nested is complete logging.warning( "Each element in the ensemble `estimate` must be a single distribution." ) try: _check_ensemble_is_not_nested(reference) except ( ValueError ): # pragma: no cover - unittest coverage for _check_ensemble_is_not_nested is complete logging.warning( "Each element in the ensemble `reference` must be a single distribution." ) if fit_metric not in goodness_of_fit_metrics: metrics = list(goodness_of_fit_metrics.keys()) raise KeyError(f"`fit_metric` should be one of {metrics}.") return goodness_of_fit_metrics[fit_metric]( reference, np.squeeze(estimate.rvs(size=num_samples, random_state=_random_state)), )
def _check_ensembles_are_same_size(p, q): """This utility function ensures checks that two Ensembles contain equal numbers of distributions" Args: p qp.Ensemble: An Ensemble containing 0 or more distributions q qp.Ensemble: A second Ensemble containing 0 or more distributions Raises: ValueError: If the result of evaluating qp.Ensemble.npdf on each Ensemble is not the same. """ if p.npdf != q.npdf: raise ValueError("Input ensembles should have the same number of distributions") def _check_ensemble_is_not_nested(p): """This utility function ensures that each element in the Ensemble is a single distribution. Args: p qp.Ensemble: An Ensemble that could contain nested Ensembles with multiple distributions in each Raises: ValueError: If there are any elements of the input Ensemble that contain more than 1 PDF. """ for dist in p: if dist.npdf != 1: raise ValueError( "Each element in the input Ensemble should be a single distribution." ) def _check_ensembles_contain_correct_number_of_distributions(estimate, reference): """This utility function ensures that the number of distributions in the ensembles matches expectations. estimate can contain 1 to N distributions. reference should contain either 1 or the same number of distributions as estimate. Example logic: estimate=N, reference=N (1 <= N) -> Pass estimate=N, reference=1 (1 <= N) -> Pass estimate=1, reference=N (N != 1) -> Raise ValueError estimate=N, reference=M (N != M) -> Raise ValueError Parameters ---------- estimate : Ensemble Used to calculate goodness of fit metrics, random variates will be produced from the distributions in this ensemble. reference : Ensemble The CDFs of the distributions in this ensemble will be used to calculate goodness of fit metrics. Raises ------ ValueError If the number of distributions in the reference ensemble does not meet the requirements, raise a ValueError. """ if estimate.npdf == reference.npdf: pass elif reference.npdf == 1: pass else: raise ValueError( "`reference` should contain either 1 distribution " "or the same number of distributions as `estimate`." )