Source code for ofex.sampling_simulation.sampling_base

"""
This module provides tools for representing and manipulating probability distributions.

Classes:

- ProbDist: Represents a single probability distribution. Contains methods for sampling,
    computing statistical metrics such as mean, variance, and standard deviation.

- JointProbDist: Extends ProbDist to handle multidimensional events (joint distributions),
    supporting covariance computation and other joint distribution functionalities.

These classes are designed for flexibility and support various operations related to
probabilistic modeling and numerical simulations.
"""

import math
from itertools import product
from typing import List, Any, Optional, Union, Dict, Tuple

import numpy as np
from openfermion.config import EQ_TOLERANCE

from ofex.exceptions import OfexTypeError

__all__ = ["ProbDist", "JointProbDist"]

EventType = Union[int, float, complex]

[docs] class ProbDist(dict): """ Represents a probability distribution as a dictionary where keys are events and values are their probabilities. Provides functionalities for sampling, computing statistical moments, and related operations. """ # _cumulative: np.ndarray # _cumulative_events: List[Any] _true_average: Optional[Any] = None _true_variance: Optional[Any] = None def __init__(self, *args, **kwargs): """ Initialize the probability distribution by checking normalization and extracting events and their probabilities. Args: *args: Arguments to initialize the dictionary. **kwargs: Keyword arguments to initialize the dictionary. Raises: ValueError: If the sum of probabilities is not 1 within tolerance. """ super().__init__(*args, **kwargs) if not np.isclose(sum(self.values()), 1.0, atol=EQ_TOLERANCE): raise ValueError("Not completed prob.", sum(self.values())) self.event = list(self.keys()) self.prob = [self[k] for k in self.event]
[docs] def sample_num(self, shots: int = 1, seed: Optional[int]=None)\ -> Dict[EventType, float]: """ Sample event occurrences based on the probability distribution. Args: shots (int): Number of samples to draw. seed (Optional[int]): Seed for the random number generator. Returns: Dict[EventType, float]: A dictionary mapping events to occurrences. """ if seed is not None: np.random.seed(seed) try: dist = np.random.multinomial(shots, self.prob, size=1)[0] except ValueError as e: print(shots) raise e return dict(zip(self.event, dist))
@property def true_average(self) -> EventType: """ Calculate the true average (mean) of the probability distribution. Returns: EventType: The computed true average value. """ if self._true_average is None: ret = 0.0 for ev, prob in self.items(): ret += ev * prob self._true_average = ret return self._true_average @property def true_variance(self) -> float: """ Calculate the true variance of the probability distribution. Returns: float: The computed true variance value. """ if self._true_variance is None: avg = 0.0 sq_avg = 0.0 for ev, prob in self.items(): avg += ev * prob sq_avg += (abs(ev) ** 2) * prob self._true_variance = sq_avg - (abs(avg) ** 2) return self._true_variance @property def true_std(self) -> float: """ Calculate the true standard deviation of the probability distribution. Returns: float: The computed true standard deviation value. """ return np.sqrt(self.true_variance)
[docs] def empirical_average(self, shots: Union[int, float], seed: Optional[int]=None)\ -> EventType: """ Calculate the empirical average of the probability distribution. Args: shots (Union[int, float]): The number of samples to draw for computing the average. If infinite, returns the true average. seed (Optional[int]): Seed for the random number generator. Returns: EventType: The computed empirical average value. """ if math.isinf(shots): return self.true_average elif isinstance(shots, float): shots = int(shots) if shots == 0: raise NotImplementedError samples = self.sample_num(shots, seed) avg = 0.0 for ev, occ in samples.items(): avg += ev * occ return avg / shots
[docs] def batched_empirical_average(self, shots: Union[int, float], n_batch: int, seed: Optional[int]=None)\ -> List[EventType]: """ Compute a batched empirical average over multiple iterations. Args: shots (Union[int, float]): The number of samples to draw for each batch. n_batch (int): The number of batches to compute. seed (Optional[int]): Seed for the random number generator. Returns: List[EventType]: A list of empirical averages for each batch. """ if seed is not None: np.random.seed(seed) output = list() for _ in range(n_batch): output.append(self.empirical_average(shots, seed=None)) return output
[docs] def empirical_variance(self, shots: int, seed: Optional[int]=None)\ -> float: """ Calculate the empirical variance of the probability distribution. Args: shots (int): The number of samples to draw for computing the variance. If infinite, returns the true variance. seed (Optional[int]): Seed for the random number generator. Returns: float: The computed empirical variance value. """ if math.isinf(shots): return self.true_variance elif isinstance(shots, float): shots = int(shots) if shots == 0: raise NotImplementedError samples = self.sample_num(shots, seed) avg = 0.0 sq_avg = 0.0 for ev, occ in samples.items(): avg += ev * occ sq_avg += (abs(ev) ** 2) * occ avg /= shots sq_avg /= shots return sq_avg - (abs(avg) ** 2)
[docs] def empirical_std(self, shots: int, seed:Optional[int]=None)\ -> EventType: """ Compute the empirical standard deviation of the probability distribution. Args: shots (int): The number of samples to draw for computing the standard deviation. seed (Optional[int]): Seed for the random number generator. Returns: EventType: The computed empirical standard deviation value. """ return np.sqrt(self.empirical_variance(shots, seed))
[docs] @classmethod def unpickle(cls, pickled_obj: Dict[EventType, float]): """ Unpickle a serialized probability distribution. Args: pickled_obj (Dict[EventType, float]): The serialized probability distribution object. Returns: ProbDist: A reconstructed ProbDist object. """ return cls(pickled_obj)
[docs] def pickle(self) -> Dict[EventType, float]: """ Serialize the probability distribution into a dictionary. Returns: Dict[EventType, float]: A dictionary containing the serialized probability distribution. """ return dict(self)
[docs] class JointProbDist(ProbDist): """ Represents a joint probability distribution over multiple variables with additional capabilities to compute covariance and handle multidimensional events. """ _true_covariance = None def __init__(self, keywords: List[str], distr: Dict[Tuple[EventType, ...], float], dtype=float): """ Initialize a joint probability distribution with events and their probabilities. Args: keywords (List[str]): Descriptive labels for each variable in the joint distribution. distr (Dict[Tuple[EventType, ...], float]): Dictionary mapping multidimensional events (tuples) to their associated probabilities. dtype (type): Data type for computations (default is float). Raises: OfexTypeError: If an event in the distribution is not a tuple. ValueError: If the number of event dimensions does not match the number of keywords. """ super().__init__(distr) for ev in self.event: if not isinstance(ev, tuple): raise OfexTypeError(ev) if len(ev) != len(keywords): raise ValueError(f"The length of the keywords to describe the events are not the same.") self.keywords = keywords self.dtype = dtype
[docs] @classmethod def unpickle(cls, pickled_obj: Tuple[List[str], Dict[Tuple[EventType, ...], float], str]): """ Deserialize a joint probability distribution. Args: pickled_obj (Tuple[List[str], Dict[Tuple[EventType, ...], float], str]): The serialized object containing keywords, event probabilities, and data type. Returns: JointProbDist: A reconstructed JointProbDist instance. Raises: ValueError: If the data type string is invalid. """ keywords, distr, dtype = pickled_obj if 'float' in dtype: dtype = float elif 'complex' in dtype: dtype = complex elif 'int' in dtype: dtype = int else: raise ValueError(f"Invalid dtype {dtype}") return cls(keywords, distr, dtype)
[docs] def pickle(self) -> Tuple[List[str], Dict[Tuple[EventType, ...], float], str]: """ Serialize the joint probability distribution into a tuple. Returns: Tuple[List[str], Dict[Tuple[EventType, ...], float], str]: A tuple containing the keywords, event probabilities, and data type of the distribution. """ return self.keywords, dict(self), str(self.dtype)
[docs] def sample_num(self, shots: int = 1, seed: Optional[int] = None) \ -> Dict[Tuple[EventType], float]: """ Sample event occurrences based on the probability distribution. Args: shots (int): Number of samples to draw. seed (Optional[int]): Seed for the random number generator. Returns: Dict[Tuple[EventType], float]: A dictionary mapping events to occurrences. """ if seed is not None: np.random.seed(seed) try: dist = np.random.multinomial(shots, self.prob, size=1)[0] except ValueError as e: print(shots) raise e return dict(zip(self.event, dist))
@property def true_average(self) -> Dict[str, EventType]: """ Compute the true average for each variable in the joint distribution. Returns: Dict[str, EventType]: A dictionary mapping each variable label to its mean value. """ if self._true_average is None: ret = np.zeros(len(self.keywords), dtype=self.dtype) for ev, prob in self.items(): ret += np.array(ev) * prob self._true_average = {k: val for k, val in zip(self.keywords, ret)} return self._true_average @property def true_variance(self) -> Dict[str, float]: """ Compute the true variance for each variable in the joint distribution. Returns: Dict[str, float]: A dictionary mapping each variable label to its variance. """ if self._true_variance is None: self._true_variance = {k: self.true_covariance[(k, k)] for k in self.keywords} return self._true_variance @property def true_covariance(self) -> Dict[Tuple[str, str], EventType]: """ Compute the true covariance matrix over all variables in the joint distribution. Returns: Dict[Tuple[str, str], EventType]: A dictionary mapping pairs of variables to their covariance values. """ if self._true_covariance is None: cov_mat = np.zeros((len(self.keywords), len(self.keywords)), dtype=self.dtype) for ev, prob in self.items(): cov_mat += np.array([[ev1.conjugate() * ev2 * prob for ev1 in ev] for ev2 in ev]) cov_mat -= np.array([[self.true_average[k1].conjugate() * self.true_average[k2] for k1 in self.keywords] for k2 in self.keywords]) self._true_covariance = {(k1, k2): cov_mat[i, j] for (i, k1), (j, k2) in product(enumerate(self.keywords), repeat=2)} return self._true_covariance @property def true_std(self) -> Dict[str, float]: """ Compute the true standard deviation for each variable in the joint distribution. Returns: Dict[str, float]: A dictionary mapping each variable label to its standard deviation. """ return {k: np.sqrt(x) for k, x in self.true_variance}
[docs] def empirical_average(self, shots: Union[int, float], seed: Optional[int] = None) \ -> Dict[str, EventType]: """ Compute the empirical average for each variable based on sampled data. Args: shots (Union[int, float]): Number of samples to draw. If infinite, returns the true average. seed (Optional[int]): Seed for the random number generator. Returns: Dict[str, EventType]: A dictionary mapping variable labels to their empirical averages. """ if math.isinf(shots): return self.true_average elif isinstance(shots, float): shots = int(shots) if shots == 0: return {k: 0.0 for k in self.keywords} samples = self.sample_num(shots, seed) avg = np.zeros(len(self.keywords), dtype=self.dtype) for ev, occ in samples.items(): avg += np.array(ev) * occ return {k: val / shots for k, val in zip(self.keywords, avg)}
[docs] def empirical_covariance(self, shots: int, seed: Optional[int] = None) \ -> Dict[Tuple[str, str], complex]: """ Compute the empirical covariance matrix using sampled data. Args: shots (int): Number of samples to use for the computation. seed (Optional[int]): Seed for the random number generator. Returns: Dict[Tuple[str, str], EventType]: Empirical covariance between all variable pairs. Raises: NotImplementedError: If the number of shots is zero. """ if math.isinf(shots): return self.true_covariance elif isinstance(shots, float): shots = int(shots) if shots == 0: raise NotImplementedError samples = self.sample_num(shots, seed) marg_avg = np.zeros(len(self.keywords), dtype=self.dtype) for ev, occ in samples.items(): marg_avg += np.array(ev) * occ / shots cov_mat = np.zeros((len(self.keywords), len(self.keywords)), dtype=self.dtype) for ev, occ in samples.items(): cov_mat += np.array([[ev1.conjugate() * ev2 * occ / shots for ev1 in ev] for ev2 in ev]) cov_mat -= np.array([[marg_avg[k1].conjugate() * marg_avg[k2] for k1 in range(len(self.keywords))] for k2 in range(len(self.keywords))]) return {(k1, k2): complex(cov_mat[i, j]) for (i, k1), (j, k2) in product(enumerate(self.keywords), repeat=2)}
[docs] def empirical_variance(self, shots: int, seed: Optional[int] = None) \ -> Dict[str, float]: """ Compute the empirical variance of each variable. Args: shots (int): Number of samples to use for the computation. seed (Optional[int]): Seed for the random number generator. Returns: Dict[str, float]: A dictionary mapping each variable label to its variance. Raises: NotImplementedError: If the number of shots is zero. """ if math.isinf(shots): return self.true_variance elif isinstance(shots, float): shots = int(shots) if shots == 0: raise NotImplementedError samples = self.sample_num(shots, seed) avgs = np.zeros(len(self.keywords), dtype=self.dtype) for ev, occ in samples.items(): avgs += np.array(ev) * occ avgs = avgs / shots variance = np.zeros(len(self.keywords), dtype=self.dtype) for ev, occ in samples.items(): variance += np.abs(np.array(ev)) ** 2 * occ variance = variance / shots variance -= abs(avgs) ** 2 return {k: float(variance[i]) for i, k in enumerate(self.keywords)}
[docs] def empirical_std(self, shots: int, seed: Optional[int] = None) \ -> Dict[str, float]: """ Compute the empirical standard deviation for each variable. Args: shots (int): Number of samples to use for the computation. seed (Optional[int]): Seed for the random number generator. Returns: Dict[str, float]: A dictionary mapping each variable label to its standard deviation. """ return {k: np.sqrt(v) for k, v in self.empirical_variance(shots, seed).items()}