Source code for iterative_ensemble_smoother.esmda

"""
Ensemble Smoother with Multiple Data Assimilation (ES-MDA)
----------------------------------------------------------

Implementation of the 2013 paper "Ensemble smoother with multiple data assimilation"

This implementation follows the paper, but with some additions:

- Allows batching the parameters. This is useful if you have many parameters
  that you need to update, e.g. 10 million. In that case you might want to
  read 1 million from disk, update those, then write them back to disk, assimilate
  the next 1 million, etc.
- Deals with missing combinations of parameters in ensembles.

We take a layered approach in the implementation:

1. Computations that need to be done once are performed in class initialization.
   Example: computing the Cholesky factor of the observation covariance
2. Computations that are done once per smoothing iteration are performed in the
   method `.prepare_assimilation()`.
   Example: computing the inversion of (Y @ Y.T + C_D)
3. Computations that are done done once per parameter group are performed in the
   method `.assimilate_batch()`. If you wish to assimilate all parameters in
   one batch, then just pass all parameters to this method.
   Example: computing the full product C_MD @ inv(C_DD + alpha * C_D) @ (D - Y)

References
----------

- Emerick, A.A., Reynolds, A.C. History matching time-lapse seismic data using
  the ensemble Kalman filter with multiple data assimilations.
  Comput Geosci 16, 639–659 (2012). https://doi.org/10.1007/s10596-012-9275-5
- Alexandre A. Emerick, Albert C. Reynolds.
  Ensemble smoother with multiple data assimilation.
  Computers & Geosciences, Volume 55, 2013, Pages 3-15, ISSN 0098-3004,
  https://doi.org/10.1016/j.cageo.2012.03.011
- https://gitlab.com/antoinecollet5/pyesmda
- https://helper.ipam.ucla.edu/publications/oilws3/oilws3_14147.pdf

"""

import numbers
from abc import ABC
from typing import Union

import numpy as np
import numpy.typing as npt
import scipy as sp  # type: ignore

from iterative_ensemble_smoother.esmda_inversion import (
    invert_subspace,
    normalize_alpha,
)
from iterative_ensemble_smoother.utils import adjust_for_missing, sample_mvnormal


class BaseESMDA(ABC):
    """Base class for all ESMDA classes.

    This class defines every method, apart from `assimilate_batch()`,
    which each subclass implements in their own way.
    """

    def __init__(
        self,
        covariance: npt.NDArray[np.floating],
        observations: npt.NDArray[np.floating],
        alpha: Union[int, npt.NDArray[np.floating]] = 5,
        seed: Union[np.random.Generator, int, None] = None,
    ) -> None:
        """
        Parameters
        ----------
        covariance : np.ndarray
            Either a 1D array of diagonal covariances, or a 2D covariance matrix.
            The shape is (num_observations,) or (num_observations, num_observations).
            This is C_D in Emerick (2013), and represents observation or measurement
            errors. We observe d from the real world, y from the model g(x), and
            assume that d = y + e, where the error e is multivariate normal with
            covariance given by `covariance`.
        observations : np.ndarray
            1D array of shape (num_observations,) representing real-world observations.
            This is d_obs in Emerick (2013).
        alpha : int or 1D np.ndarray, optional
            Multiplicative factor for the covariance.
            If an integer `alpha` is given, an array with length `alpha` and
            elements `alpha` is constructed. If an 1D array is given, it is
            normalized so sum_i 1/alpha_i = 1 and used. The default is 5, which
            corresponds to np.array([5, 5, 5, 5, 5]).
        seed : integer or numpy.random.Generator, optional
            A seed or numpy.random.Generator used for random number
            generation. The argument is passed to numpy.random.default_rng().
            The default is None.
        """

        # Validate inputs
        if not (isinstance(covariance, np.ndarray) and covariance.ndim in (1, 2)):
            raise TypeError(
                "Argument `covariance` must be a NumPy array of dimension 1 or 2."
            )

        if covariance.ndim == 2 and covariance.shape[0] != covariance.shape[1]:
            raise ValueError("Argument `covariance` must be square if it's 2D.")

        if not (isinstance(observations, np.ndarray) and observations.ndim == 1):
            raise TypeError("Argument `observations` must be a 1D NumPy array.")

        if not observations.shape[0] == covariance.shape[0]:
            raise ValueError("Shapes of `observations` and `covariance` must match.")

        if not (isinstance(seed, (int, np.random.Generator)) or seed is None):
            raise TypeError(
                "Argument `seed` must be an integer or numpy.random.Generator."
            )

        if not np.issubdtype(observations.dtype, np.floating):
            raise TypeError(
                f"'observations' must have a floating dtype, got {observations.dtype}"
            )
        if not np.issubdtype(covariance.dtype, np.floating):
            raise TypeError(
                f"'covariance' must have a floating dtype, got {covariance.dtype}"
            )
        if observations.dtype != covariance.dtype:
            raise ValueError(
                f"dtype mismatch: 'observations' is {observations.dtype}, "
                f"'covariance' is {covariance.dtype}"
            )

        if covariance.ndim == 1 and not np.all(covariance > 0):
            raise ValueError(
                "All elements of `covariance` must be strictly positive "
                "when it is a 1D array."
            )

        # Store data
        self.observations = observations
        self.iteration = -1
        self.rng = np.random.default_rng(seed)

        # Only compute the covariance factorization once
        # If it's a full matrix, we gain speedup by only computing cholesky once
        # If it's a diagonal, we gain speedup by never having to compute cholesky
        self.C_D = covariance.copy()
        if covariance.ndim == 2:
            self.C_D_L = sp.linalg.cholesky(self.C_D, lower=False)
        elif covariance.ndim == 1:
            self.C_D_L = np.sqrt(self.C_D)
        else:
            raise TypeError("Argument `covariance` must be 1D or 2D array")

        if not (
            (isinstance(alpha, np.ndarray) and alpha.ndim == 1)
            or isinstance(alpha, numbers.Integral)
        ):
            raise TypeError("Argument `alpha` must be an integer or a 1D NumPy array.")

        # Alpha can either be an integer (num iterations) or a list of weights
        if isinstance(alpha, np.ndarray) and alpha.ndim == 1:
            self.alpha = normalize_alpha(alpha)
        elif isinstance(alpha, numbers.Integral):
            self.alpha = normalize_alpha(np.ones(alpha))
            assert np.allclose(self.alpha, normalize_alpha(self.alpha))
        else:
            raise TypeError("Alpha must be integer or 1D array.")

        self.alpha = self.alpha

    def num_assimilations(self) -> int:
        return len(self.alpha)

    def perturb_observations(
        self, *, ensemble_size: int, alpha: float
    ) -> npt.NDArray[np.floating]:
        """Create a matrix D with perturbed observations.

        In the Emerick (2013) paper, the matrix D is defined in section 6.
        See section 2(b) of the ES-MDA algorithm in the paper.

        Parameters
        ----------
        ensemble_size : int
            The ensemble size, i.e., the number of columns in the returned array,
            which is of shape (num_observations, ensemble_size).
        alpha : float
            The covariance inflation factor. The sequence of alphas should
            obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
            in this method call. The user/caller is responsible for this.

        Returns
        -------
        D : np.ndarray
            Each column consists of perturbed observations,
            observation std is scaled by sqrt(alpha).
        """
        # Draw samples from zero-centered multivariate normal with cov=alpha * C_D,
        # and add them to the observations. Notice that
        # if C_D = L @ L.T by the cholesky factorization, then drawing y from
        # a zero cented normal means that y := L @ z, where z ~ norm(0, 1).
        # Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha).
        samples = sample_mvnormal(
            C_dd_cholesky=self.C_D_L, rng=self.rng, size=ensemble_size
        ).astype(self.C_D_L.dtype)

        D: npt.NDArray[np.floating] = (
            self.observations[:, np.newaxis] + (alpha**0.5) * samples
        )
        assert D.shape == (len(self.observations), ensemble_size)
        return D

    def prepare_assimilation(
        self,
        *,
        Y: npt.NDArray[np.floating],
        truncation: float = 0.99,
        overwrite: bool = False,
    ) -> None:
        r"""Prepare assimilation of one or several batches of parameters.

        This method call pre-computes everything that is needed to assimilate
        a set of batches once. This saves time since we do not have to repeat
        the same computations for every single batch.

        Parameters
        ----------
        Y : np.ndarray
            2D array of shape (num_observations, ensemble_size), containing
            responses when evaluating the model at X. In other words, Y = g(X),
            where g is the forward model.
        truncation : float
            How large a fraction of the singular values to keep in the inversion
            routine (if the inversion routine supports it). Must be a float in
            the range (0, 1]. A lower number means a more approximate answer and a
            slightly faster computation. The default is 0.99, which is recommended
            by Emerick in the reference paper.
        overwrite: bool
            If False (the default), the input array will not be overwritten (mutated).
            If True, the method may overwrite the input array.

        Returns
        -------
        self
            The instance with mutated state.

        Notes
        -----
        In the equation:

            M + (\delta M)
                (\delta D)^T [(\delta D) (\delta D)^T + \alpha (N_e - 1) C_D]^(-1)
                (D_obs - D)

        This method call corresponds to computing:

            - The next-to-last factors: (\delta D)^T [(\delta D) (\delta D)^T +
                                                      \alpha (N_e - 1) C_D]^(-1)
            - The last factor: (D_obs - D)

        The total internal storage is: 2 * ensemble_size * num_observations,
        since shapes are:

            - The next-to-last factors: (ensemble_size, num_observations)
            - The last factor: (num_observations, ensemble_size)
        """
        assert Y.ndim == 2
        assert 0 < truncation <= 1.0
        if not np.issubdtype(Y.dtype, np.floating):
            raise TypeError("Argument `Y` must contain floats")

        if Y.dtype != self.observations.dtype:
            raise ValueError(
                f"'Y' must have dtype {self.observations.dtype}, got {Y.dtype}"
            )

        if self.iteration >= self.num_assimilations():
            raise Exception("No more assimilation steps to run.")

        if not overwrite:
            Y = Y.copy()

        self.truncation = truncation
        self.iteration += 1

        D = Y  # Switch from API notation to paper notation
        N_d, N_e = D.shape  # (num_observations, ensemble_size)
        assert N_e >= 2, "Must have at least two ensemble members"
        assert N_d == self.observations.shape[0], "Shape mismatch"

        # Compute the last factor
        alpha = self.alpha[self.iteration]
        D_obs = self.perturb_observations(ensemble_size=N_e, alpha=alpha)
        self.D_obs_minus_D = D_obs - D

        # Compute parts of the Kalman gain
        D -= np.mean(D, axis=1, keepdims=True)  # Center observations
        self.delta_DT, self.term_diag, self.termT = invert_subspace(
            delta_D=D, C_D_L=self.C_D_L, alpha=alpha, truncation=truncation
        )

    def _compute_delta_M(
        self,
        *,
        X: npt.NDArray[np.floating],
        missing: Union[npt.NDArray[np.bool_], None] = None,
    ) -> npt.NDArray[np.floating]:
        """Prepare delta_M := X - center(X), dealing with missing values
        as needed."""
        if not np.issubdtype(X.dtype, np.floating):
            raise TypeError("Argument `X` must contain floats")
        if not (missing is None or np.issubdtype(missing.dtype, np.bool_)):
            raise TypeError("Argument `missing_mask` must contain booleans")
        if missing is not None and (not X.shape == missing.shape):
            raise ValueError(f"Shapes must match: {X.shape=} != {missing.shape}")
        if not X.ndim == 2:
            raise ValueError("X must have shape (num_parameters_batch, ensemble_size)")

        assert X.ndim == 2
        N_m, N_e = X.shape  # (num_parameters, ensemble_size)

        # Center the parameters, possibly accounting for missing data
        if missing is not None:
            return adjust_for_missing(X, missing=missing)
        result: npt.NDArray[np.floating] = X - np.mean(X, axis=1, keepdims=True)
        return result


[docs] class ESMDA(BaseESMDA): """ Implement an Ensemble Smoother with Multiple Data Assimilation (ES-MDA). The implementation follows :cite:t:`EMERICK2013`. Examples -------- A full example where the forward model maps 10 parameters to 3 outputs. We will use 100 realizations. First we define the forward model: >>> rng = np.random.default_rng(42) >>> A = rng.normal(size=(3, 10)) >>> def forward_model(x): ... return A @ x Then we set up the ESMDA instance and the prior realizations X: >>> covariance = np.ones(3, dtype=float) # Covariance of the observations / outputs >>> observations = np.array([1, 2, 3], dtype=float) # The observed data >>> esmda = ESMDA(covariance, observations, alpha=3, seed=42) >>> X = rng.normal(size=(10, 100)) To assimilate data, we iterate over the assimilation steps: >>> for iteration in range(esmda.num_assimilations()): ... # Apply the forward model in each realization in the ensemble ... Y = np.array([forward_model(x) for x in X.T]).T ... esmda.prepare_assimilation(Y=Y) ... X = esmda.assimilate_batch(X=X) # Update X """
[docs] def assimilate_batch( self, *, X: npt.NDArray[np.floating], missing: Union[npt.NDArray[np.bool_], None] = None, overwrite: bool = False, ) -> npt.NDArray[np.floating]: """Assimilate a batch of parameters against all observations. The internal storage used by the class is 2 * ensemble_size * num_observations, so a good batch size that is of the same order of magnitude as the internal storage is 2 * num_observations. This is only a rough guideline. Parameters ---------- X : np.ndarray A 2D array of shape (num_parameters_batch, ensemble_size). Each row corresponds to a parameter in the model, and each column corresponds to an ensemble member (realization). missing : np.ndarray or None A boolean 2D array of shape (num_parameters_batch, ensemble_size). If an entry is True, then that value is assumed missing. This can happen if the ensemble members use different grids, where each ensemble member has a slightly different grid layout. If None, then all entries are assumed to be valid. overwrite: bool If False (the default), the input arrays will not be overwritten (mutated). If True, the method may overwrite the input arrays. Returns ------- X_posterior : np.ndarray 2D array of shape (num_parameters_batch, ensemble_size). """ if not overwrite: X = X.copy() if not hasattr(self, "delta_DT"): raise Exception("The method `prepare_assmilation` must be called.") N_m, N_e = X.shape # (num_parameters, ensemble_size) assert N_e == self.delta_DT.shape[0], "Dimension mismatch" # In standard ESMDA, we compute the product in a good order. # First compute the ensemble-space update in observation precision # (small: N_e × N_e), then cast to X's dtype before multiplying with # delta_M so that no parameter-sized float64 intermediate is created. input_dtype = X.dtype delta_M = self._compute_delta_M(X=X, missing=missing) ensemble_update = np.linalg.multi_dot( [self.delta_DT, self.term_diag, self.termT, self.D_obs_minus_D] ) X += delta_M @ ensemble_update.astype(input_dtype) return X
if __name__ == "__main__": import pytest pytest.main( args=[ __file__, "--doctest-modules", "-v", ] )