123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645 |
- """Module for random variables.
- This module contains classes for random variables and exceptions.
- Classes:
- ParameterException: Exception for invalid parameters.
- RandomVariable: Abstract class for random variables.
- Discrete: Class for discrete random variables.
- Gaussian: Class for Gaussian random variables.
- """
- from abc import ABC, abstractmethod, abstractproperty, abstractclassmethod
- import numpy as np
- class ParameterException(Exception):
- """Exception for invalid parameters."""
- pass
- class RandomVariable(ABC):
- """Abstract base class for all random variables."""
- @abstractclassmethod
- def unity(cls, *args):
- pass
- @abstractproperty
- def dim(self):
- pass
- @abstractmethod
- def __str__(self):
- pass
- @abstractmethod
- def __add__(self):
- pass
- @abstractmethod
- def __sub__(self):
- pass
- @abstractmethod
- def __mul__(self):
- pass
- @abstractmethod
- def __iadd__(self):
- pass
- @abstractmethod
- def __isub__(self):
- pass
- @abstractmethod
- def __imul__(self):
- pass
- @abstractmethod
- def __eq__(self):
- pass
- @abstractmethod
- def normalize(self):
- pass
- @abstractmethod
- def marginalize(self):
- pass
- @abstractmethod
- def maximize(self):
- pass
- @abstractmethod
- def argmax(self):
- pass
- @abstractmethod
- def log(self):
- pass
- class Discrete(RandomVariable):
- """Class for discrete random variables.
- A discrete random variable is defined by a single- or multi-dimensional
- probability mass function. In addition, each dimension of the probability
- mass function has to be associated with a variable. The variable is
- represented by a variable node of the comprehensive factor graph.
- """
- def __init__(self, raw_pmf, *args):
- """Initialize a discrete random variable.
- Create a new discrete random variable with the given probability
- mass function over the given variable nodes.
- Args:
- raw_pmf: A Numpy array representing the probability mass function.
- The probability mass function does not need to be normalized.
- *args: Instances of the class VNode representing the variables of
- the probability mass function. The number of the positional
- arguments must match the number of dimensions of the Numpy
- array.
- Raises:
- ParameterException: An error occurred initializing with invalid
- parameters.
- """
- pmf = np.asarray(raw_pmf, dtype=np.float64)
- # Set probability mass function
- self._pmf = pmf
- # Set variable nodes for dimensions
- if np.ndim(pmf) != len(args):
- raise ParameterException('Dimension mismatch.')
- else:
- self._dim = args
- @classmethod
- def unity(cls, *args):
- """Initialize unit element of a discrete random variable.
- Args:
- *args: Instances of the class VNode representing the variables of
- the probability mass function. The number of the positional
- arguments must match the number of dimensions of the Numpy
- array.
- Raises:
- ParameterException: An error occurred initializing with invalid
- parameters.
- """
- n = len(args)
- return cls(np.ones((1,) * n), *args)
- @property
- def pmf(self):
- return self._pmf
- @property
- def dim(self):
- return self._dim
- def __str__(self):
- """Return string representation of the discrete random variable."""
- return str(self.pmf)
- def __add__(self, other):
- """Add other to self and return the result.
- Args:
- other: Summand for the discrete random variable.
- Returns:
- A new discrete random variable representing the summation.
- """
- # Verify dimensions of summand and summand.
- if len(self.dim) < len(other.dim):
- self._expand(other.dim, other.pmf.shape)
- elif len(self.dim) > len(other.dim):
- other._expand(self.dim, self.pmf.shape)
- pmf = self.pmf + other.pmf
- return Discrete(pmf, *self.dim)
- def __sub__(self, other):
- """Subtract other from self and return the result.
- Args:
- other: Subtrahend for the discrete random variable.
- Returns:
- A new discrete random variable representing the subtraction.
- """
- # Verify dimensions of minuend and subtrahend.
- if len(self.dim) < len(other.dim):
- self._expand(other.dim, other.pmf.shape)
- elif len(self.dim) > len(other.dim):
- other._expand(self.dim, self.pmf.shape)
- pmf = self.pmf - other.pmf
- return Discrete(pmf, *self.dim)
- def __mul__(self, other):
- """Multiply other with self and return the result.
- Args:
- other: Multiplier for the discrete random variable.
- Returns:
- A new discrete random variable representing the multiplication.
- """
- # Verify dimensions of multiplicand and multiplier.
- if len(self.dim) < len(other.dim):
- self._expand(other.dim, other.pmf.shape)
- elif len(self.dim) > len(other.dim):
- other._expand(self.dim, self.pmf.shape)
- pmf = self.pmf * other.pmf
- return Discrete(pmf, *self.dim)
- def __iadd__(self, other):
- """Method for augmented addition.
- Args:
- other: Summand for the discrete random variable.
- Returns:
- A new discrete random variable representing the summation.
- """
- return self.__add__(other)
- def __isub__(self, other):
- """Method for augmented subtraction.
- Args:
- other: Subtrahend for the discrete random variable.
- Returns:
- A new discrete random variable representing the subtraction.
- """
- return self.__sub__(other)
- def __imul__(self, other):
- """Method for augmented multiplication.
- Args:
- other: Multiplier for the discrete random variable.
- Returns:
- A new discrete random variable representing the multiplication.
- """
- return self.__mul__(other)
- def __eq__(self, other):
- """Compare self with other and return the boolean result.
- Two discrete random variables are equal only if the probability mass
- functions are equal and the order of dimensions are equal.
- """
- return np.allclose(self.pmf, other.pmf) \
- and self.dim == other.dim
- def _expand(self, dims, states):
- """Expand dimensions.
- Expand the discrete random variable along the given new dimensions.
- Args:
- dims: List of discrete random variables.
- """
- reps = [1, ] * len(dims)
- # Extract missing dimensions
- diff = [i for i, d in enumerate(dims) if d not in self.dim]
- # Expand missing dimensions
- for d in diff:
- self._pmf = np.expand_dims(self.pmf, axis=d)
- reps[d] = states[d]
- # Repeat missing dimensions
- self._pmf = np.tile(self.pmf, reps)
- self._dim = dims
- def normalize(self):
- """Normalize probability mass function."""
- pmf = self.pmf / np.sum(self.pmf)
- return Discrete(pmf, *self.dim)
- def marginalize(self, *dims, normalize=True):
- """Return the marginal for given dimensions.
- The probability mass function of the discrete random variable
- is marginalized along the given dimensions.
- Args:
- *dims: Instances of discrete random variables, which should be
- marginalized out.
- normalize: Boolean flag if probability mass function should be
- normalized after marginalization.
- Returns:
- A new discrete random variable representing the marginal.
- """
- axis = tuple(idx for idx, d in enumerate(self.dim) if d in dims)
- pmf = np.sum(self.pmf, axis)
- if normalize:
- pmf /= np.sum(pmf)
- new_dims = tuple(d for d in self.dim if d not in dims)
- return Discrete(pmf, *new_dims)
- def maximize(self, *dims, normalize=True):
- """Return the maximum for given dimensions.
- The probability mass function of the discrete random variable
- is maximized along the given dimensions.
- Args:
- *dims: Instances of discrete random variables, which should be
- maximized out.
- normalize: Boolean flag if probability mass function should be
- normalized after marginalization.
- Returns:
- A new discrete random variable representing the maximum.
- """
- axis = tuple(idx for idx, d in enumerate(self.dim) if d in dims)
- pmf = np.amax(self.pmf, axis)
- if normalize:
- pmf /= np.sum(pmf)
- new_dims = tuple(d for d in self.dim if d not in dims)
- return Discrete(pmf, *new_dims)
- def argmax(self, dim=None):
- """Return the dimension index of the maximum.
- Args:
- dim: An optional discrete random variable along a marginalization
- should be performed and the maximum is searched over the
- remaining dimensions. In the case of None, the maximum is
- search along all dimensions.
- Returns:
- An integer representing the dimension of the maximum.
- """
- if dim is None:
- return np.unravel_index(self.pmf.argmax(), self.pmf.shape)
- m = self.marginalize(dim)
- return np.argmax(m.pmf)
- def log(self):
- """Natural logarithm of the discrete random variable.
- Returns:
- A new discrete random variable with the natural logarithm of the
- probablitiy mass function.
- """
- return Discrete(np.log(self.pmf), *self.dim)
- class Gaussian(RandomVariable):
- """Class for Gaussian random variables.
- A Gaussian random variable is defined by a mean vector and a covariance
- matrix. In addition, each dimension of the mean vector and the covariance
- matrix has to be associated with a variable. The variable is
- represented by a variable node of the comprehensive factor graph.
- """
- def __init__(self, raw_mean, raw_cov, *args):
- """Initialize a Gaussian random variable.
- Create a new Gaussian random variable with the given mean vector and
- the given covariance matrix over the given variable nodes.
- Args:
- raw_mean: A Numpy array representing the mean vector.
- raw_cov: A Numpy array representing the covariance matrix.
- *args: Instances of the class VNode representing the variables of
- the mean vector and covariance matrix, respectively. The number
- of the positional arguments must match the number of dimensions
- of the Numpy arrays.
- Raises:
- ParameterException: An error occurred initializing with invalid
- parameters.
- """
- if raw_mean is not None and raw_cov is not None:
- mean = np.asarray(raw_mean, dtype=np.float64)
- cov = np.asarray(raw_cov, dtype=np.float64)
- # Set mean vector and covariance matrix
- if mean.shape[0] != cov.shape[0]:
- raise ParameterException('Dimension mismatch.')
- else:
- # Precision matrix
- self._W = np.linalg.inv(np.asarray(cov))
- # Precision-mean vector
- self._Wm = np.dot(self._W, np.asarray(mean))
- # Set variable nodes for dimensions
- if cov.shape[0] != len(args):
- raise ParameterException('Dimension mismatch.')
- else:
- self._dim = args
- else:
- self._dim = args
- @classmethod
- def unity(cls, *args):
- """Initialize unit element of a Gaussian random variable.
- Args:
- *args: Instances of the class VNode representing the variables of
- the mean vector and covariance matrix, respectively. The number
- of the positional arguments must match the number of dimensions
- of the Numpy arrays.
- Raises:
- ParameterException: An error occurred initializing with invalid
- parameters.
- """
- n = len(args)
- return cls(np.diag(np.zeros(n)), np.diag(np.ones(n) * np.Inf), *args)
- @classmethod
- def inf_form(cls, raw_W, raw_Wm, *args):
- """Initialize a Gaussian random variable using the information form.
- Create a new Gaussian random variable with the given mean vector and
- the given covariance matrix over the given variable nodes.
- Args:
- raw_W: A Numpy array representing the precision matrix.
- raw_Wm: A Numpy array representing the precision-mean vector.
- *args: Instances of the class VNode representing the variables of
- the mean vector and covariance matrix, respectively. The number
- of the positional arguments must match the number of dimensions
- of the Numpy arrays.
- Raises:
- ParameterException: An error occurred initializing with invalid
- parameters.
- """
- g = cls(None, None, *args)
- g._W = np.asarray(raw_W, dtype=np.float64)
- g._Wm = np.asarray(raw_Wm, dtype=np.float64)
- return g
- @property
- def mean(self):
- return np.dot(np.linalg.inv(self._W), self._Wm)
- @property
- def cov(self):
- return np.linalg.inv(self._W)
- @property
- def dim(self):
- return self._dim
- def __str__(self):
- """Return string representation of the Gaussian random variable."""
- return "%s %s" % (self.mean, self.cov)
- def __add__(self, other):
- """Add other to self and return the result.
- Args:
- other: Summand for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the summation.
- """
- return Gaussian(self.mean + other.mean,
- self.cov + other.cov,
- *self.dim)
- def __sub__(self, other):
- """Subtract other from self and return the result.
- Args:
- other: Subrahend for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the subtraction.
- """
- return Gaussian(self.mean - other.mean,
- self.cov - other.cov,
- *self.dim)
- def __mul__(self, other):
- """Multiply other with self and return the result.
- Args:
- other: Multiplier for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the multiplication.
- """
- W = self._W + other._W
- Wm = self._Wm + other._Wm
- return Gaussian.inf_form(W, Wm, *self.dim)
- def __iadd__(self, other):
- """Method for augmented addition.
- Args:
- other: Summand for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the summation.
- """
- return self.__add__(other)
- def __isub__(self, other):
- """Method for augmented subtraction.
- Args:
- other: Subtrahend for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the subtraction.
- """
- return self.__sub__(other)
- def __imul__(self, other):
- """Method for augmented multiplication.
- Args:
- other: Multiplier for the Gaussian random variable.
- Returns:
- A new Gaussian random variable representing the multiplication.
- """
- return self.__mul__(other)
- def __eq__(self, other):
- """Compare self with other and return the boolean result.
- Two Gaussian random variables are equal only if the mean vectors and
- the covariance matrices are equal and the order of dimensions are
- equal.
- """
- return np.allclose(self._W, other._W) \
- and np.allclose(self._Wm, other._Wm) \
- and self.dim == other.dim
- def normalize(self):
- """Normalize probability density function."""
- return self
- def marginalize(self, *dims):
- """Return the marginal for given dimensions.
- The probability density function of the Gaussian random variable
- is marginalized along the given dimensions.
- Args:
- *dims: Instances of Gaussian random variables, which should be
- marginalized out.
- Returns:
- A new Gaussian random variable representing the marginal.
- """
- axis = tuple(idx for idx, d in enumerate(self.dim) if d not in dims)
- mean = self.mean[np.ix_(axis, [0])]
- cov = self.cov[np.ix_(axis, axis)]
- new_dims = tuple(d for d in self.dim if d not in dims)
- return Gaussian(mean, cov, *new_dims)
- def maximize(self, *dims):
- """Return the maximum for given dimensions.
- The probability density function of the Gaussian random variable
- is maximized along the given dimensions.
- Args:
- *dims: Instances of Gaussian random variables, which should be
- maximized out.
- Returns:
- A new Gaussian random variable representing the maximum.
- """
- axis = tuple(idx for idx, d in enumerate(self.dim) if d not in dims)
- mean = self.mean[np.ix_(axis, [0])]
- cov = self.cov[np.ix_(axis, axis)]
- new_dims = tuple(d for d in self.dim if d not in dims)
- return Gaussian(mean, cov, *new_dims)
- def argmax(self, dim=None):
- """Return the dimension index of the maximum.
- Args:
- dim: An optional Gaussian random variable along a marginalization
- should be performed and the maximum is searched over the
- remaining dimensions. In the case of None, the maximum is
- search along all dimensions.
- Returns:
- An integer representing the dimension of the maximum.
- """
- if dim is None:
- return self.mean
- m = self.marginalize(dim)
- return m.mean
- def log(self):
- """Natural logarithm of the Gaussian random variable.
- Returns:
- A new Gaussian random variable with the natural logarithm of the
- probability density function.
- """
- raise NotImplementedError
|