Skip to content

Measurement Models

The measurement module provides classes for defining ODE constraints and observation models used in probabilistic ODE solvers. The design separates concerns cleanly:

  • ODE classes define the dynamical system constraint
  • Constraint dataclasses define additional constraints (conservation laws, measurements)
  • Black-box models allow arbitrary user-defined measurement functions
  • Transformed models apply nonlinear state transformations with proper Jacobians

Class Hierarchy

ODE Information Classes

The module provides four ODE information classes, organized by ODE order and whether hidden states are present:

Class ODE Order Hidden States Vector Field Signature
ODEInformation 1st No vf(x, *, t) -> dx/dt
ODEInformationWithHidden 1st Yes vf(x, u, *, t) -> dx/dt
SecondOrderODEInformation 2nd No vf(x, v, *, t) -> d^2x/dt^2
SecondOrderODEInformationWithHidden 2nd Yes vf(x, v, u, *, t) -> d^2x/dt^2

Flexible Measurement Classes

Class Description
BlackBoxMeasurement User-defined g(state, t) with autodiff Jacobian
TransformedMeasurement Wraps any model with nonlinear state transformation

This separation ensures:

  • No runtime conditionals - Each class has a fixed code path, optimal for JAX JIT
  • Explicit signatures - The vector field type is clear from the class choice
  • Single responsibility - Each class handles exactly one case

Composable Constraints

Additional constraints are added via frozen dataclasses:

  • Conservation: Time-invariant linear constraints A @ x = p
  • Measurement: Time-varying observations A @ x = z[t] at specified times

These can be combined freely with any ODE class.

Usage Examples

First-Order ODE

from ode_filters.priors import IWP
from ode_filters.measurement import ODEInformation

prior = IWP(q=2, d=1)

def vf(x, *, t):
    return -x  # exponential decay

model = ODEInformation(vf, prior.E0, prior.E1)

Second-Order ODE (e.g., Harmonic Oscillator)

from ode_filters.priors import IWP
from ode_filters.measurement import SecondOrderODEInformation

prior = IWP(q=3, d=1)
omega = 2.0

def vf(x, v, *, t):
    return -(omega**2) * x  # harmonic oscillator

model = SecondOrderODEInformation(vf, prior.E0, prior.E1, prior.E2)

Joint State-Parameter Estimation

For estimating unknown parameters alongside the state, use JointPrior with the hidden state classes:

from ode_filters.priors import IWP, JointPrior
from ode_filters.measurement import SecondOrderODEInformationWithHidden, Measurement

# Prior for state x and unknown damping parameter u
prior_x = IWP(q=2, d=1)
prior_u = IWP(q=2, d=1)
prior_joint = JointPrior(prior_x, prior_u)

# Damped oscillator with unknown damping
def vf(x, v, u, *, t):
    omega = 1.0
    return -(omega**2) * x - u * v

# Observations of position
A_obs = prior_joint.E0[:1, :]  # observe x only
obs = Measurement(A=A_obs, z=observations, z_t=obs_times, noise=0.01)

model = SecondOrderODEInformationWithHidden(
    vf,
    E0=prior_joint.E0_x,        # extracts x
    E1=prior_joint.E1,          # extracts dx/dt
    E2=prior_joint.E2,          # extracts d^2x/dt^2
    E0_hidden=prior_joint.E0_hidden,  # extracts u
    constraints=[obs]
)

Adding Conservation Laws

from ode_filters.measurement import ODEInformation, Conservation
import jax.numpy as np

# Energy conservation: x1 + x2 = 1
cons = Conservation(A=np.array([[1.0, 1.0]]), p=np.array([1.0]))

model = ODEInformation(vf, E0, E1, constraints=[cons])

Black-Box Measurement Models

For cases where the standard ODE structure doesn't fit, use BlackBoxMeasurement to define an arbitrary differentiable measurement function. The Jacobian is computed automatically via JAX autodiff.

Example:

from ode_filters.measurement import BlackBoxMeasurement
import jax.numpy as np

# Custom nonlinear observation: observe squared position and velocity
def custom_g(state, *, t):
    x, v = state[0], state[1]
    return np.array([x**2, v])

model = BlackBoxMeasurement(
    g_func=custom_g,
    state_dim=6,      # full state dimension (e.g., q=2, d=2 -> D=6)
    obs_dim=2,        # observation dimension
    noise=0.01        # measurement noise variance
)

# Use like any other measurement model
H, c = model.linearize(state, t=0.0)
R = model.get_noise(t=0.0)

Transformed Measurement Models

TransformedMeasurement wraps any existing measurement model with a nonlinear state transformation sigma(state). The Jacobian is computed correctly via the chain rule: J_total = J_g(sigma(state)) @ J_sigma(state).

Use cases:

  • Nonlinear coordinate transformations (e.g., polar to Cartesian)
  • Applying constraints like softmax normalization
  • Feature extraction before measurement

Example with autodiff Jacobian:

from ode_filters.measurement import ODEInformation, TransformedMeasurement
from ode_filters.priors import IWP
import jax
import jax.numpy as np

# Base ODE model
def vf(x, *, t):
    return -x

prior = IWP(q=2, d=2)
base_model = ODEInformation(vf, prior.E0, prior.E1)

# Apply softmax to ensure state components sum to 1
def softmax_transform(state):
    x = state[:2]  # extract position components
    x_normalized = jax.nn.softmax(x)
    return state.at[:2].set(x_normalized)

model = TransformedMeasurement(base_model, softmax_transform)

# Jacobian includes chain rule automatically
H, c = model.linearize(state, t=0.0)

Example with explicit Jacobian:

For performance-critical applications, you can provide a custom Jacobian:

def sigma_jacobian(state):
    # Custom Jacobian implementation
    return jax.jacfwd(softmax_transform)(state)

model = TransformedMeasurement(
    base_model,
    softmax_transform,
    use_autodiff_jacobian=False,
    sigma_jacobian=sigma_jacobian
)

API Reference

Measurement model utilities for ODE filtering.

BlackBoxMeasurement

Black-box measurement model with autodiff Jacobian computation.

Allows users to define an arbitrary measurement function g(state, *, t) and automatically computes the Jacobian via JAX autodiff.

Parameters:

Name Type Description Default
g_func Callable[[Array], Array]

Measurement function g(state, *, t) -> observation. Must be a differentiable function compatible with JAX.

required
state_dim int

Dimension of the state vector.

required
obs_dim int

Dimension of the observation vector.

required
noise float | ArrayLike

Measurement noise (scalar, 1D diagonal, or 2D covariance matrix). Default is 0.0 (no noise).

0.0
Example

def custom_g(state, , t): ... # Nonlinear observation: squared position + velocity ... return jnp.array([state[0]*2, state[1]]) measure = BlackBoxMeasurement(custom_g, state_dim=4, obs_dim=2)

Source code in ode_filters/measurement/measurement_models.py
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
class BlackBoxMeasurement:
    """Black-box measurement model with autodiff Jacobian computation.

    Allows users to define an arbitrary measurement function g(state, *, t)
    and automatically computes the Jacobian via JAX autodiff.

    Args:
        g_func: Measurement function g(state, *, t) -> observation.
            Must be a differentiable function compatible with JAX.
        state_dim: Dimension of the state vector.
        obs_dim: Dimension of the observation vector.
        noise: Measurement noise (scalar, 1D diagonal, or 2D covariance matrix).
            Default is 0.0 (no noise).

    Example:
        >>> def custom_g(state, *, t):
        ...     # Nonlinear observation: squared position + velocity
        ...     return jnp.array([state[0]**2, state[1]])
        >>> measure = BlackBoxMeasurement(custom_g, state_dim=4, obs_dim=2)
    """

    def __init__(
        self,
        g_func: Callable[[Array], Array],
        state_dim: int,
        obs_dim: int,
        noise: float | ArrayLike = 0.0,
    ):
        if not callable(g_func):
            raise TypeError("'g_func' must be callable.")
        if not isinstance(state_dim, int) or state_dim <= 0:
            raise ValueError("'state_dim' must be a positive integer.")
        if not isinstance(obs_dim, int) or obs_dim <= 0:
            raise ValueError("'obs_dim' must be a positive integer.")

        self._g_func = g_func
        self._state_dim = state_dim
        self._obs_dim = obs_dim
        self._jacobian_g_func = jax.jacrev(lambda s, t: g_func(s, t=t))

        # Initialize noise matrix
        noise_arr = np.asarray(noise)
        if noise_arr.ndim == 0:
            self._R = noise_arr * np.eye(obs_dim)
        elif noise_arr.ndim == 1:
            if noise_arr.shape[0] != obs_dim:
                raise ValueError(
                    f"Diagonal noise must have length {obs_dim}, got {noise_arr.shape[0]}."
                )
            self._R = np.diag(noise_arr)
        elif noise_arr.ndim == 2:
            if noise_arr.shape != (obs_dim, obs_dim):
                raise ValueError(
                    f"Noise matrix must have shape ({obs_dim}, {obs_dim}), "
                    f"got {noise_arr.shape}."
                )
            self._R = noise_arr
        else:
            raise ValueError("'noise' must be scalar, 1D, or 2D array.")

    @property
    def R(self) -> Array:
        """Measurement noise covariance matrix."""
        return self._R

    @R.setter
    def R(self, value: ArrayLike) -> None:
        """Set the measurement noise covariance matrix."""
        R_arr = np.asarray(value)
        if R_arr.ndim == 0:
            self._R = R_arr * np.eye(self._obs_dim)
        elif R_arr.ndim == 1:
            if R_arr.shape[0] != self._obs_dim:
                raise ValueError(
                    f"Diagonal noise must have length {self._obs_dim}, "
                    f"got {R_arr.shape[0]}."
                )
            self._R = np.diag(R_arr)
        elif R_arr.ndim == 2:
            if R_arr.shape != (self._obs_dim, self._obs_dim):
                raise ValueError(
                    f"Noise matrix must have shape ({self._obs_dim}, {self._obs_dim}), "
                    f"got {R_arr.shape}."
                )
            self._R = R_arr
        else:
            raise ValueError("'noise' must be scalar, 1D, or 2D array.")

    def g(self, state: Array, *, t: float) -> Array:
        """Evaluate the measurement function.

        Args:
            state: State vector of length state_dim.
            t: Current time.

        Returns:
            Observation vector of length obs_dim.
        """
        state_arr = self._validate_state(state)
        return self._g_func(state_arr, t=t)

    def jacobian_g(self, state: Array, *, t: float) -> Array:
        """Compute Jacobian of the measurement function via autodiff.

        Args:
            state: State vector of length state_dim.
            t: Current time.

        Returns:
            Jacobian matrix of shape (obs_dim, state_dim).
        """
        state_arr = self._validate_state(state)
        return self._jacobian_g_func(state_arr, t)

    def get_noise(self, *, t: float) -> Array:
        """Return the square root of the measurement noise covariance.

        Returns an upper-triangular matrix L such that L.T @ L ≈ R.

        Args:
            t: Current time (unused, included for API consistency).

        Returns:
            Upper-triangular square root of the noise covariance matrix.
        """
        return _safe_cholesky_sqr(self._R, self._obs_dim)

    def measurement_times(self) -> Array:
        """No fixed measurement times (the model is always active)."""
        return onp.empty((0,), dtype=float)

    def linearize(self, state: Array, *, t: float) -> tuple[Array, Array]:
        """Linearize the measurement model around the given state.

        Args:
            state: State vector to linearize around.
            t: Current time.

        Returns:
            Tuple of (H_t, c_t) where:
            - H_t is the Jacobian matrix (shape [obs_dim, state_dim])
            - c_t is the constant term (observation offset)
        """
        state_arr = self._validate_state(state)
        # Use VJP to get value and Jacobian in a single forward pass
        g_val, vjp_fn = jax.vjp(lambda s: self._g_func(s, t=t), state_arr)
        H_t = jax.vmap(lambda v: vjp_fn(v)[0])(np.eye(self._obs_dim))
        c_t = g_val - H_t @ state_arr
        return H_t, c_t

    def _validate_state(self, state: Array) -> Array:
        """Validate and convert state to required format."""
        state_arr = np.asarray(state)
        if state_arr.ndim != 1:
            raise ValueError("'state' must be a one-dimensional array.")
        if state_arr.shape[0] != self._state_dim:
            raise ValueError(
                f"'state' must have length {self._state_dim}, got {state_arr.shape[0]}."
            )
        return state_arr

R property writable

Measurement noise covariance matrix.

g(state, *, t)

Evaluate the measurement function.

Parameters:

Name Type Description Default
state Array

State vector of length state_dim.

required
t float

Current time.

required

Returns:

Type Description
Array

Observation vector of length obs_dim.

Source code in ode_filters/measurement/measurement_models.py
951
952
953
954
955
956
957
958
959
960
961
962
def g(self, state: Array, *, t: float) -> Array:
    """Evaluate the measurement function.

    Args:
        state: State vector of length state_dim.
        t: Current time.

    Returns:
        Observation vector of length obs_dim.
    """
    state_arr = self._validate_state(state)
    return self._g_func(state_arr, t=t)

get_noise(*, t)

Return the square root of the measurement noise covariance.

Returns an upper-triangular matrix L such that L.T @ L ≈ R.

Parameters:

Name Type Description Default
t float

Current time (unused, included for API consistency).

required

Returns:

Type Description
Array

Upper-triangular square root of the noise covariance matrix.

Source code in ode_filters/measurement/measurement_models.py
977
978
979
980
981
982
983
984
985
986
987
988
def get_noise(self, *, t: float) -> Array:
    """Return the square root of the measurement noise covariance.

    Returns an upper-triangular matrix L such that L.T @ L ≈ R.

    Args:
        t: Current time (unused, included for API consistency).

    Returns:
        Upper-triangular square root of the noise covariance matrix.
    """
    return _safe_cholesky_sqr(self._R, self._obs_dim)

jacobian_g(state, *, t)

Compute Jacobian of the measurement function via autodiff.

Parameters:

Name Type Description Default
state Array

State vector of length state_dim.

required
t float

Current time.

required

Returns:

Type Description
Array

Jacobian matrix of shape (obs_dim, state_dim).

Source code in ode_filters/measurement/measurement_models.py
964
965
966
967
968
969
970
971
972
973
974
975
def jacobian_g(self, state: Array, *, t: float) -> Array:
    """Compute Jacobian of the measurement function via autodiff.

    Args:
        state: State vector of length state_dim.
        t: Current time.

    Returns:
        Jacobian matrix of shape (obs_dim, state_dim).
    """
    state_arr = self._validate_state(state)
    return self._jacobian_g_func(state_arr, t)

linearize(state, *, t)

Linearize the measurement model around the given state.

Parameters:

Name Type Description Default
state Array

State vector to linearize around.

required
t float

Current time.

required

Returns:

Type Description
Array

Tuple of (H_t, c_t) where:

Array
  • H_t is the Jacobian matrix (shape [obs_dim, state_dim])
tuple[Array, Array]
  • c_t is the constant term (observation offset)
Source code in ode_filters/measurement/measurement_models.py
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
def linearize(self, state: Array, *, t: float) -> tuple[Array, Array]:
    """Linearize the measurement model around the given state.

    Args:
        state: State vector to linearize around.
        t: Current time.

    Returns:
        Tuple of (H_t, c_t) where:
        - H_t is the Jacobian matrix (shape [obs_dim, state_dim])
        - c_t is the constant term (observation offset)
    """
    state_arr = self._validate_state(state)
    # Use VJP to get value and Jacobian in a single forward pass
    g_val, vjp_fn = jax.vjp(lambda s: self._g_func(s, t=t), state_arr)
    H_t = jax.vmap(lambda v: vjp_fn(v)[0])(np.eye(self._obs_dim))
    c_t = g_val - H_t @ state_arr
    return H_t, c_t

measurement_times()

No fixed measurement times (the model is always active).

Source code in ode_filters/measurement/measurement_models.py
990
991
992
def measurement_times(self) -> Array:
    """No fixed measurement times (the model is always active)."""
    return onp.empty((0,), dtype=float)

Conservation dataclass

Conservation constraint: A @ x = p (always active).

Parameters:

Name Type Description Default
A Array

Constraint matrix (shape [k, d] or [k, state_dim] if full_state=True).

required
p Array

Target values (shape [k]).

required
full_state bool

If True, A operates on the full state X instead of x = E0 @ X.

False
Source code in ode_filters/measurement/measurement_models.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@dataclass(frozen=True)
class Conservation:
    """Conservation constraint: A @ x = p (always active).

    Args:
        A: Constraint matrix (shape [k, d] or [k, state_dim] if full_state=True).
        p: Target values (shape [k]).
        full_state: If True, A operates on the full state X instead of x = E0 @ X.
    """

    A: Array
    p: Array
    full_state: bool = False

    def __post_init__(self) -> None:
        if self.A.shape[0] != self.p.shape[0]:
            raise ValueError(
                f"A.shape[0] ({self.A.shape[0]}) must match p.shape[0] ({self.p.shape[0]})."
            )

    def residual(self, x: Array) -> Array:
        """Compute residual: A @ x - p."""
        return self.A @ x - self.p

    def jacobian(self) -> Array:
        """Return Jacobian (constant): A."""
        return self.A

    @property
    def dim(self) -> int:
        """Dimension of constraint output."""
        return int(self.p.shape[0])

dim property

Dimension of constraint output.

jacobian()

Return Jacobian (constant): A.

Source code in ode_filters/measurement/measurement_models.py
202
203
204
def jacobian(self) -> Array:
    """Return Jacobian (constant): A."""
    return self.A

residual(x)

Compute residual: A @ x - p.

Source code in ode_filters/measurement/measurement_models.py
198
199
200
def residual(self, x: Array) -> Array:
    """Compute residual: A @ x - p."""
    return self.A @ x - self.p

Measurement dataclass

Time-varying linear measurement: A @ x = z[t] (active only at specified times).

This constraint is only active when the filter's current time matches one of the measurement times in z_t. Time matching uses tolerance-based comparison (not exact equality) to handle floating-point discrepancies.

Important

When creating time grids, use the same linspace implementation (preferably jax.numpy.linspace) for both measurement times (z_t) and filter time steps. NumPy and JAX linspace can produce slightly different values (~1e-8 differences) which may cause measurements to be missed with exact comparison.

Parameters:

Name Type Description Default
A Array

Measurement matrix (shape [k, d] or [k, state_dim] if full_state=True).

required
z Array

Measurement values (shape [n, k]).

required
z_t Array

Measurement times (shape [n]). Should use jax.numpy.linspace for consistency with the filter's internal time grid.

required
noise float | Array

Measurement noise variance (scalar or [k] or [k, k]).

DEFAULT_MEASUREMENT_NOISE
full_state bool

If True, A operates on the full state X instead of x = E0 @ X.

False
Source code in ode_filters/measurement/measurement_models.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
@dataclass(frozen=True)
class Measurement:
    """Time-varying linear measurement: A @ x = z[t] (active only at specified times).

    This constraint is only active when the filter's current time matches one of
    the measurement times in z_t. Time matching uses tolerance-based comparison
    (not exact equality) to handle floating-point discrepancies.

    Important:
        When creating time grids, use the same linspace implementation (preferably
        jax.numpy.linspace) for both measurement times (z_t) and filter time steps.
        NumPy and JAX linspace can produce slightly different values (~1e-8 differences)
        which may cause measurements to be missed with exact comparison.

    Args:
        A: Measurement matrix (shape [k, d] or [k, state_dim] if full_state=True).
        z: Measurement values (shape [n, k]).
        z_t: Measurement times (shape [n]). Should use jax.numpy.linspace for
            consistency with the filter's internal time grid.
        noise: Measurement noise variance (scalar or [k] or [k, k]).
        full_state: If True, A operates on the full state X instead of x = E0 @ X.
    """

    A: Array
    z: Array
    z_t: Array
    noise: float | Array = DEFAULT_MEASUREMENT_NOISE
    full_state: bool = False

    def __post_init__(self) -> None:
        if self.A.ndim != 2:
            raise ValueError(f"'A' must be 2D, got {self.A.ndim}D.")
        if self.z.ndim != 2 or self.z.shape[1] != self.A.shape[0]:
            raise ValueError(
                f"'z' must be 2D with shape (n, {self.A.shape[0]}), got {self.z.shape}."
            )
        # Normalize z_t to 1D
        z_t = self.z_t
        if z_t.ndim == 2 and z_t.shape[1] == 1:
            object.__setattr__(self, "z_t", z_t.reshape(-1))
        elif z_t.ndim != 1:
            raise ValueError(
                f"'z_t' must be 1D shape (n,) or 2D shape (n, 1), got {z_t.shape}."
            )
        if self.z_t.shape[0] != self.z.shape[0]:
            raise ValueError(
                f"'z_t' length must match number of measurements {self.z.shape[0]}, "
                f"got {self.z_t.shape[0]}."
            )

    def find_index(
        self,
        t: float,
        rtol: float = MEASUREMENT_TIME_RTOL,
        atol: float = MEASUREMENT_TIME_ATOL,
    ) -> int | None:
        """Find measurement index for time t, or None if not found.

        Uses binary search with tolerance for robust floating-point comparison.
        Times are assumed to be sorted.

        Note: Default tolerances are set to handle discrepancies between
        NumPy and JAX linspace implementations, which can differ by ~1e-8
        for typical time grids.
        """
        z_t = onp.asarray(self.z_t)
        t_val = float(t)
        idx = int(onp.searchsorted(z_t, t_val))
        # Check the found position and neighbors for a close match
        for i in [idx, idx - 1]:
            if 0 <= i < len(z_t) and onp.isclose(z_t[i], t_val, rtol=rtol, atol=atol):
                return i
        return None

    def residual(self, x: Array, t: float) -> Array | None:
        """Compute residual: A @ x - z[t], or None if no measurement at t."""
        idx = self.find_index(t)
        if idx is None:
            return None
        return self.A @ x - self.z[idx]

    def jacobian(self, t: float) -> Array | None:
        """Return Jacobian (constant): A, or None if no measurement at t."""
        if self.find_index(t) is None:
            return None
        return self.A

    @property
    def dim(self) -> int:
        """Dimension of measurement output."""
        return int(self.A.shape[0])

    def get_noise_matrix(self) -> Array:
        """Get noise covariance matrix."""
        noise = np.asarray(self.noise)
        if noise.ndim == 0:
            return noise * np.eye(self.dim)
        elif noise.ndim == 1:
            return np.diag(noise)
        return noise

dim property

Dimension of measurement output.

find_index(t, rtol=MEASUREMENT_TIME_RTOL, atol=MEASUREMENT_TIME_ATOL)

Find measurement index for time t, or None if not found.

Uses binary search with tolerance for robust floating-point comparison. Times are assumed to be sorted.

Note: Default tolerances are set to handle discrepancies between NumPy and JAX linspace implementations, which can differ by ~1e-8 for typical time grids.

Source code in ode_filters/measurement/measurement_models.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def find_index(
    self,
    t: float,
    rtol: float = MEASUREMENT_TIME_RTOL,
    atol: float = MEASUREMENT_TIME_ATOL,
) -> int | None:
    """Find measurement index for time t, or None if not found.

    Uses binary search with tolerance for robust floating-point comparison.
    Times are assumed to be sorted.

    Note: Default tolerances are set to handle discrepancies between
    NumPy and JAX linspace implementations, which can differ by ~1e-8
    for typical time grids.
    """
    z_t = onp.asarray(self.z_t)
    t_val = float(t)
    idx = int(onp.searchsorted(z_t, t_val))
    # Check the found position and neighbors for a close match
    for i in [idx, idx - 1]:
        if 0 <= i < len(z_t) and onp.isclose(z_t[i], t_val, rtol=rtol, atol=atol):
            return i
    return None

get_noise_matrix()

Get noise covariance matrix.

Source code in ode_filters/measurement/measurement_models.py
304
305
306
307
308
309
310
311
def get_noise_matrix(self) -> Array:
    """Get noise covariance matrix."""
    noise = np.asarray(self.noise)
    if noise.ndim == 0:
        return noise * np.eye(self.dim)
    elif noise.ndim == 1:
        return np.diag(noise)
    return noise

jacobian(t)

Return Jacobian (constant): A, or None if no measurement at t.

Source code in ode_filters/measurement/measurement_models.py
293
294
295
296
297
def jacobian(self, t: float) -> Array | None:
    """Return Jacobian (constant): A, or None if no measurement at t."""
    if self.find_index(t) is None:
        return None
    return self.A

residual(x, t)

Compute residual: A @ x - z[t], or None if no measurement at t.

Source code in ode_filters/measurement/measurement_models.py
286
287
288
289
290
291
def residual(self, x: Array, t: float) -> Array | None:
    """Compute residual: A @ x - z[t], or None if no measurement at t."""
    idx = self.find_index(t)
    if idx is None:
        return None
    return self.A @ x - self.z[idx]

ODEInformation

Bases: BaseODEInformation

First-order ODE measurement model: dx/dt = f(x, t).

Parameters:

Name Type Description Default
vf Callable[[Array], Array]

Vector field function vf(x, *, t) -> dx/dt.

required
E0 ArrayLike

State extraction matrix (shape [d, D]).

required
E1 ArrayLike

First derivative extraction matrix (shape [d, D]).

required
constraints list[Conservation | Measurement] | None

Optional list of Conservation and/or Measurement constraints.

None
Source code in ode_filters/measurement/measurement_models.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
class ODEInformation(BaseODEInformation):
    """First-order ODE measurement model: dx/dt = f(x, t).

    Args:
        vf: Vector field function vf(x, *, t) -> dx/dt.
        E0: State extraction matrix (shape [d, D]).
        E1: First derivative extraction matrix (shape [d, D]).
        constraints: Optional list of Conservation and/or Measurement constraints.
    """

    def __init__(
        self,
        vf: Callable[[Array], Array],
        E0: ArrayLike,
        E1: ArrayLike,
        constraints: list[Conservation | Measurement] | None = None,
    ):
        self._vf = vf
        self._E0 = np.asarray(E0)
        self._E1 = np.asarray(E1)
        self._d = self._E0.shape[0]
        self._state_dim = self._E0.shape[1]
        self._E_constraint = self._E1
        self._base_R = np.zeros((self._d, self._d))
        self._jacobian_vf = jax.jacfwd(self._vf)
        self._init_constraints(constraints)

    def _ode_residual(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        vf_eval = self._vf(x, t=t)
        return self._E_constraint @ state - vf_eval

    def _ode_jacobian(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        jac_vf = self._jacobian_vf(x, t=t)
        return self._E_constraint - jac_vf @ self._E0

ODEInformationWithHidden

Bases: BaseODEInformation

First-order ODE with hidden states: dx/dt = f(x, u, t).

For joint state-parameter estimation where u is a hidden parameter that appears in the dynamics but evolves according to its own prior.

Parameters:

Name Type Description Default
vf Callable[[Array, Array], Array]

Vector field function vf(x, u, *, t) -> dx/dt.

required
E0 ArrayLike

State extraction matrix for x (shape [d_x, D]).

required
E1 ArrayLike

First derivative extraction matrix (shape [d_x, D]).

required
E0_hidden ArrayLike

Hidden state extraction matrix for u (shape [d_u, D]).

required
constraints list[Conservation | Measurement] | None

Optional list of Conservation and/or Measurement constraints.

None
Source code in ode_filters/measurement/measurement_models.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
class ODEInformationWithHidden(BaseODEInformation):
    """First-order ODE with hidden states: dx/dt = f(x, u, t).

    For joint state-parameter estimation where u is a hidden parameter
    that appears in the dynamics but evolves according to its own prior.

    Args:
        vf: Vector field function vf(x, u, *, t) -> dx/dt.
        E0: State extraction matrix for x (shape [d_x, D]).
        E1: First derivative extraction matrix (shape [d_x, D]).
        E0_hidden: Hidden state extraction matrix for u (shape [d_u, D]).
        constraints: Optional list of Conservation and/or Measurement constraints.
    """

    def __init__(
        self,
        vf: Callable[[Array, Array], Array],
        E0: ArrayLike,
        E1: ArrayLike,
        E0_hidden: ArrayLike,
        constraints: list[Conservation | Measurement] | None = None,
    ):
        self._vf = vf
        self._E0 = np.asarray(E0)
        self._E1 = np.asarray(E1)
        self._E0_hidden = np.asarray(E0_hidden)
        self._d = self._E0.shape[0]
        self._state_dim = self._E0.shape[1]
        self._E_constraint = self._E1
        self._base_R = np.zeros((self._d, self._d))
        self._jacobian_vf_x = jax.jacfwd(self._vf, argnums=0)
        self._jacobian_vf_u = jax.jacfwd(self._vf, argnums=1)
        self._init_constraints(constraints)

    def _ode_residual(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        u = self._E0_hidden @ state
        vf_eval = self._vf(x, u, t=t)
        return self._E_constraint @ state - vf_eval

    def _ode_jacobian(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        u = self._E0_hidden @ state
        jac_vf_x = self._jacobian_vf_x(x, u, t=t)
        jac_vf_u = self._jacobian_vf_u(x, u, t=t)
        return self._E_constraint - jac_vf_x @ self._E0 - jac_vf_u @ self._E0_hidden

ObsModel

Bases: NamedTuple

Pre-computed observation data for sequential filtering.

Describes linear, time-invariant observations where the observation matrix H and noise covariance are constant across time and only the measurement values vary per step. Built by :func:prepare_observations from a list of :class:Measurement objects and a time grid.

Attributes:

Name Type Description
H Array

Observation Jacobian (constant), shape [obs_dim, state_dim].

R_sqr Array

Square-root noise covariance for active observations (constant), shape [obs_dim, obs_dim].

c_seq Array

Observation offsets per step, shape [N, obs_dim]. Equal to -z[idx] when the observation is active, zero otherwise.

mask Array

Boolean mask for active dimensions, shape [N, obs_dim].

Source code in ode_filters/measurement/measurement_models.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ObsModel(NamedTuple):
    """Pre-computed observation data for sequential filtering.

    Describes linear, time-invariant observations where the observation
    matrix ``H`` and noise covariance are constant across time and only
    the measurement values vary per step.  Built by
    :func:`prepare_observations` from a list of :class:`Measurement`
    objects and a time grid.

    Attributes:
        H: Observation Jacobian (constant), shape ``[obs_dim, state_dim]``.
        R_sqr: Square-root noise covariance for active observations
            (constant), shape ``[obs_dim, obs_dim]``.
        c_seq: Observation offsets per step, shape ``[N, obs_dim]``.
            Equal to ``-z[idx]`` when the observation is active, zero
            otherwise.
        mask: Boolean mask for active dimensions, shape ``[N, obs_dim]``.
    """

    H: Array
    R_sqr: Array
    c_seq: Array
    mask: Array

SecondOrderODEInformation

Bases: BaseODEInformation

Second-order ODE measurement model: d^2x/dt^2 = f(x, v, t).

Parameters:

Name Type Description Default
vf Callable[[Array, Array], Array]

Vector field function vf(x, v, *, t) -> d^2x/dt^2.

required
E0 ArrayLike

State extraction matrix (shape [d, D]).

required
E1 ArrayLike

First derivative extraction matrix (shape [d, D]).

required
E2 ArrayLike

Second derivative extraction matrix (shape [d, D]).

required
constraints list[Conservation | Measurement] | None

Optional list of Conservation and/or Measurement constraints.

None
Source code in ode_filters/measurement/measurement_models.py
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
class SecondOrderODEInformation(BaseODEInformation):
    """Second-order ODE measurement model: d^2x/dt^2 = f(x, v, t).

    Args:
        vf: Vector field function vf(x, v, *, t) -> d^2x/dt^2.
        E0: State extraction matrix (shape [d, D]).
        E1: First derivative extraction matrix (shape [d, D]).
        E2: Second derivative extraction matrix (shape [d, D]).
        constraints: Optional list of Conservation and/or Measurement constraints.
    """

    def __init__(
        self,
        vf: Callable[[Array, Array], Array],
        E0: ArrayLike,
        E1: ArrayLike,
        E2: ArrayLike,
        constraints: list[Conservation | Measurement] | None = None,
    ):
        self._vf = vf
        self._E0 = np.asarray(E0)
        self._E1 = np.asarray(E1)
        self._E2 = np.asarray(E2)
        self._d = self._E0.shape[0]
        self._state_dim = self._E0.shape[1]
        self._E_constraint = self._E2
        self._base_R = np.zeros((self._d, self._d))
        self._jacobian_vf_x = jax.jacfwd(self._vf, argnums=0)
        self._jacobian_vf_v = jax.jacfwd(self._vf, argnums=1)
        self._init_constraints(constraints)

    def _ode_residual(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        v = self._E1 @ state
        vf_eval = self._vf(x, v, t=t)
        return self._E_constraint @ state - vf_eval

    def _ode_jacobian(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        v = self._E1 @ state
        jac_x = self._jacobian_vf_x(x, v, t=t)
        jac_v = self._jacobian_vf_v(x, v, t=t)
        return self._E_constraint - jac_x @ self._E0 - jac_v @ self._E1

SecondOrderODEInformationWithHidden

Bases: BaseODEInformation

Second-order ODE with hidden states: d^2x/dt^2 = f(x, v, u, t).

For joint state-parameter estimation where u is a hidden parameter that appears in the dynamics but evolves according to its own prior.

Parameters:

Name Type Description Default
vf Callable[[Array, Array, Array], Array]

Vector field function vf(x, v, u, *, t) -> d^2x/dt^2.

required
E0 ArrayLike

State extraction matrix for x (shape [d_x, D]).

required
E1 ArrayLike

First derivative extraction matrix (shape [d_x, D]).

required
E2 ArrayLike

Second derivative extraction matrix (shape [d_x, D]).

required
E0_hidden ArrayLike

Hidden state extraction matrix for u (shape [d_u, D]).

required
constraints list[Conservation | Measurement] | None

Optional list of Conservation and/or Measurement constraints.

None
Source code in ode_filters/measurement/measurement_models.py
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
class SecondOrderODEInformationWithHidden(BaseODEInformation):
    """Second-order ODE with hidden states: d^2x/dt^2 = f(x, v, u, t).

    For joint state-parameter estimation where u is a hidden parameter
    that appears in the dynamics but evolves according to its own prior.

    Args:
        vf: Vector field function vf(x, v, u, *, t) -> d^2x/dt^2.
        E0: State extraction matrix for x (shape [d_x, D]).
        E1: First derivative extraction matrix (shape [d_x, D]).
        E2: Second derivative extraction matrix (shape [d_x, D]).
        E0_hidden: Hidden state extraction matrix for u (shape [d_u, D]).
        constraints: Optional list of Conservation and/or Measurement constraints.
    """

    def __init__(
        self,
        vf: Callable[[Array, Array, Array], Array],
        E0: ArrayLike,
        E1: ArrayLike,
        E2: ArrayLike,
        E0_hidden: ArrayLike,
        constraints: list[Conservation | Measurement] | None = None,
    ):
        self._vf = vf
        self._E0 = np.asarray(E0)
        self._E1 = np.asarray(E1)
        self._E2 = np.asarray(E2)
        self._E0_hidden = np.asarray(E0_hidden)
        self._d = self._E0.shape[0]
        self._state_dim = self._E0.shape[1]
        self._E_constraint = self._E2
        self._base_R = np.zeros((self._d, self._d))
        self._jacobian_vf_x = jax.jacfwd(self._vf, argnums=0)
        self._jacobian_vf_v = jax.jacfwd(self._vf, argnums=1)
        self._jacobian_vf_u = jax.jacfwd(self._vf, argnums=2)
        self._init_constraints(constraints)

    def _ode_residual(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        v = self._E1 @ state
        u = self._E0_hidden @ state
        vf_eval = self._vf(x, v, u, t=t)
        return self._E_constraint @ state - vf_eval

    def _ode_jacobian(self, state: Array, *, t: float) -> Array:
        x = self._E0 @ state
        v = self._E1 @ state
        u = self._E0_hidden @ state
        jac_x = self._jacobian_vf_x(x, v, u, t=t)
        jac_v = self._jacobian_vf_v(x, v, u, t=t)
        jac_u = self._jacobian_vf_u(x, v, u, t=t)
        return (
            self._E_constraint
            - jac_x @ self._E0
            - jac_v @ self._E1
            - jac_u @ self._E0_hidden
        )

TransformedMeasurement

Wrapper that applies a nonlinear state transformation before measurement.

Given a base measurement model and a transformation sigma(state), this class computes g(sigma(state)) with proper chain-rule Jacobian: J_total = J_g(sigma(state)) @ J_sigma(state)

This is useful for: - Nonlinear coordinate transformations - Feature extraction before measurement - Applying learned transformations to the state

Parameters:

Name Type Description Default
base_model BaseODEInformation | BlackBoxMeasurement

Base measurement model with g, jacobian_g, get_noise, linearize.

required
sigma Callable[[Array], Array]

State transformation function sigma(state) -> transformed_state. Must be differentiable and compatible with JAX.

required
use_autodiff_jacobian bool

If True (default), compute J_sigma via autodiff. If False, expect sigma_jacobian to be provided.

True
sigma_jacobian Callable[[Array], Array] | None

Optional explicit Jacobian function for sigma. If provided and use_autodiff_jacobian=False, this will be used instead of autodiff.

None
Example

Apply softmax transformation to state before ODE measurement

def softmax_transform(state): ... # Transform first 3 components via softmax ... x = state[:3] ... x_soft = jax.nn.softmax(x) ... return state.at[:3].set(x_soft) base = ODEInformation(vf, E0, E1) transformed = TransformedMeasurement(base, softmax_transform)

Source code in ode_filters/measurement/measurement_models.py
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
class TransformedMeasurement:
    """Wrapper that applies a nonlinear state transformation before measurement.

    Given a base measurement model and a transformation sigma(state),
    this class computes g(sigma(state)) with proper chain-rule Jacobian:
        J_total = J_g(sigma(state)) @ J_sigma(state)

    This is useful for:
    - Nonlinear coordinate transformations
    - Feature extraction before measurement
    - Applying learned transformations to the state

    Args:
        base_model: Base measurement model with g, jacobian_g, get_noise, linearize.
        sigma: State transformation function sigma(state) -> transformed_state.
            Must be differentiable and compatible with JAX.
        use_autodiff_jacobian: If True (default), compute J_sigma via autodiff.
            If False, expect sigma_jacobian to be provided.
        sigma_jacobian: Optional explicit Jacobian function for sigma.
            If provided and use_autodiff_jacobian=False, this will be used
            instead of autodiff.

    Example:
        >>> # Apply softmax transformation to state before ODE measurement
        >>> def softmax_transform(state):
        ...     # Transform first 3 components via softmax
        ...     x = state[:3]
        ...     x_soft = jax.nn.softmax(x)
        ...     return state.at[:3].set(x_soft)
        >>> base = ODEInformation(vf, E0, E1)
        >>> transformed = TransformedMeasurement(base, softmax_transform)
    """

    def __init__(
        self,
        base_model: BaseODEInformation | BlackBoxMeasurement,
        sigma: Callable[[Array], Array],
        use_autodiff_jacobian: bool = True,
        sigma_jacobian: Callable[[Array], Array] | None = None,
    ):
        if not callable(sigma):
            raise TypeError("'sigma' must be callable.")

        self._base = base_model
        self._sigma = sigma
        self._use_autodiff = use_autodiff_jacobian

        if use_autodiff_jacobian:
            self._jacobian_sigma = jax.jacfwd(sigma)
        elif sigma_jacobian is not None:
            self._jacobian_sigma = sigma_jacobian
        else:
            raise ValueError(
                "Must provide 'sigma_jacobian' when 'use_autodiff_jacobian=False'."
            )

    @property
    def R(self) -> Array:
        """Measurement noise covariance matrix (delegated to base model)."""
        return self._base.R

    @R.setter
    def R(self, value: ArrayLike) -> None:
        """Set the measurement noise covariance matrix on the base model."""
        self._base.R = value

    def g(self, state: Array, *, t: float) -> Array:
        """Evaluate the measurement function on transformed state.

        Computes g(sigma(state), t).

        Args:
            state: Original state vector.
            t: Current time.

        Returns:
            Observation vector.
        """
        transformed = self._sigma(state)
        return self._base.g(transformed, t=t)

    def jacobian_g(self, state: Array, *, t: float) -> Array:
        """Compute Jacobian with chain rule: J_g(sigma(state)) @ J_sigma(state).

        Args:
            state: Original state vector.
            t: Current time.

        Returns:
            Jacobian matrix of the composed measurement model.
        """
        transformed = self._sigma(state)
        J_base = self._base.jacobian_g(transformed, t=t)
        J_sigma = self._jacobian_sigma(state)
        return J_base @ J_sigma

    def get_noise(self, *, t: float) -> Array:
        """Return the square root of the noise covariance (delegated to base model).

        Args:
            t: Current time.

        Returns:
            Upper-triangular square root of the noise covariance matrix.
        """
        return self._base.get_noise(t=t)

    def measurement_times(self) -> Array:
        """Forward the base model's fixed measurement times, if any."""
        base_fn = getattr(self._base, "measurement_times", None)
        if base_fn is None:
            return onp.empty((0,), dtype=float)
        return onp.asarray(base_fn())

    def linearize(self, state: Array, *, t: float) -> tuple[Array, Array]:
        """Linearize the transformed measurement model.

        Args:
            state: Original state vector to linearize around.
            t: Current time.

        Returns:
            Tuple of (H_t, c_t) where:
            - H_t is the Jacobian of the composed model
            - c_t is the constant term (observation offset)
        """
        transformed = self._sigma(state)
        J_sigma = self._jacobian_sigma(state)
        J_base = self._base.jacobian_g(transformed, t=t)
        g_val = self._base.g(transformed, t=t)
        H_t = J_base @ J_sigma
        c_t = g_val - H_t @ state
        return H_t, c_t

R property writable

Measurement noise covariance matrix (delegated to base model).

g(state, *, t)

Evaluate the measurement function on transformed state.

Computes g(sigma(state), t).

Parameters:

Name Type Description Default
state Array

Original state vector.

required
t float

Current time.

required

Returns:

Type Description
Array

Observation vector.

Source code in ode_filters/measurement/measurement_models.py
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
def g(self, state: Array, *, t: float) -> Array:
    """Evaluate the measurement function on transformed state.

    Computes g(sigma(state), t).

    Args:
        state: Original state vector.
        t: Current time.

    Returns:
        Observation vector.
    """
    transformed = self._sigma(state)
    return self._base.g(transformed, t=t)

get_noise(*, t)

Return the square root of the noise covariance (delegated to base model).

Parameters:

Name Type Description Default
t float

Current time.

required

Returns:

Type Description
Array

Upper-triangular square root of the noise covariance matrix.

Source code in ode_filters/measurement/measurement_models.py
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
def get_noise(self, *, t: float) -> Array:
    """Return the square root of the noise covariance (delegated to base model).

    Args:
        t: Current time.

    Returns:
        Upper-triangular square root of the noise covariance matrix.
    """
    return self._base.get_noise(t=t)

jacobian_g(state, *, t)

Compute Jacobian with chain rule: J_g(sigma(state)) @ J_sigma(state).

Parameters:

Name Type Description Default
state Array

Original state vector.

required
t float

Current time.

required

Returns:

Type Description
Array

Jacobian matrix of the composed measurement model.

Source code in ode_filters/measurement/measurement_models.py
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
def jacobian_g(self, state: Array, *, t: float) -> Array:
    """Compute Jacobian with chain rule: J_g(sigma(state)) @ J_sigma(state).

    Args:
        state: Original state vector.
        t: Current time.

    Returns:
        Jacobian matrix of the composed measurement model.
    """
    transformed = self._sigma(state)
    J_base = self._base.jacobian_g(transformed, t=t)
    J_sigma = self._jacobian_sigma(state)
    return J_base @ J_sigma

linearize(state, *, t)

Linearize the transformed measurement model.

Parameters:

Name Type Description Default
state Array

Original state vector to linearize around.

required
t float

Current time.

required

Returns:

Type Description
Array

Tuple of (H_t, c_t) where:

Array
  • H_t is the Jacobian of the composed model
tuple[Array, Array]
  • c_t is the constant term (observation offset)
Source code in ode_filters/measurement/measurement_models.py
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
def linearize(self, state: Array, *, t: float) -> tuple[Array, Array]:
    """Linearize the transformed measurement model.

    Args:
        state: Original state vector to linearize around.
        t: Current time.

    Returns:
        Tuple of (H_t, c_t) where:
        - H_t is the Jacobian of the composed model
        - c_t is the constant term (observation offset)
    """
    transformed = self._sigma(state)
    J_sigma = self._jacobian_sigma(state)
    J_base = self._base.jacobian_g(transformed, t=t)
    g_val = self._base.g(transformed, t=t)
    H_t = J_base @ J_sigma
    c_t = g_val - H_t @ state
    return H_t, c_t

measurement_times()

Forward the base model's fixed measurement times, if any.

Source code in ode_filters/measurement/measurement_models.py
1132
1133
1134
1135
1136
1137
def measurement_times(self) -> Array:
    """Forward the base model's fixed measurement times, if any."""
    base_fn = getattr(self._base, "measurement_times", None)
    if base_fn is None:
        return onp.empty((0,), dtype=float)
    return onp.asarray(base_fn())

ODEconservation(vf, E0, E1, A, p)

Create first-order ODE model with conservation constraint.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, *, t) -> dx/dt.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

Derivative extraction matrix.

required
A Array

Conservation constraint matrix (shape [k, d]).

required
p Array

Conservation target values (shape [k]).

required

Returns:

Type Description
ODEInformation

ODEInformation with conservation constraint.

Source code in ode_filters/measurement/measurement_models.py
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
def ODEconservation(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    A: Array,
    p: Array,
) -> ODEInformation:
    """Create first-order ODE model with conservation constraint.

    Args:
        vf: Vector field function vf(x, *, t) -> dx/dt.
        E0: State extraction matrix.
        E1: Derivative extraction matrix.
        A: Conservation constraint matrix (shape [k, d]).
        p: Conservation target values (shape [k]).

    Returns:
        ODEInformation with conservation constraint.
    """
    return ODEInformation(vf, E0, E1, constraints=[Conservation(A, p)])

ODEconservationmeasurement(vf, E0, E1, C, p, A, z, z_t, measurement_noise=DEFAULT_MEASUREMENT_NOISE)

Create first-order ODE model with conservation and measurements.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, *, t) -> dx/dt.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

Derivative extraction matrix.

required
C Array

Conservation constraint matrix (shape [m, d]).

required
p Array

Conservation target values (shape [m]).

required
A Array

Measurement matrix (shape [k, d]).

required
z Array

Measurement values (shape [n, k]).

required
z_t Array

Measurement times (shape [n]).

required
measurement_noise float

Measurement noise variance.

DEFAULT_MEASUREMENT_NOISE

Returns:

Type Description
ODEInformation

ODEInformation with conservation and measurement constraints.

Source code in ode_filters/measurement/measurement_models.py
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
def ODEconservationmeasurement(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    C: Array,
    p: Array,
    A: Array,
    z: Array,
    z_t: Array,
    measurement_noise: float = DEFAULT_MEASUREMENT_NOISE,
) -> ODEInformation:
    """Create first-order ODE model with conservation and measurements.

    Args:
        vf: Vector field function vf(x, *, t) -> dx/dt.
        E0: State extraction matrix.
        E1: Derivative extraction matrix.
        C: Conservation constraint matrix (shape [m, d]).
        p: Conservation target values (shape [m]).
        A: Measurement matrix (shape [k, d]).
        z: Measurement values (shape [n, k]).
        z_t: Measurement times (shape [n]).
        measurement_noise: Measurement noise variance.

    Returns:
        ODEInformation with conservation and measurement constraints.
    """
    return ODEInformation(
        vf,
        E0,
        E1,
        constraints=[Conservation(C, p), Measurement(A, z, z_t, measurement_noise)],
    )

ODEmeasurement(vf, E0, E1, A, z, z_t, measurement_noise=DEFAULT_MEASUREMENT_NOISE)

Create first-order ODE model with linear measurements.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, *, t) -> dx/dt.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

Derivative extraction matrix.

required
A Array

Measurement matrix (shape [k, d]).

required
z Array

Measurement values (shape [n, k]).

required
z_t Array

Measurement times (shape [n]).

required
measurement_noise float

Measurement noise variance.

DEFAULT_MEASUREMENT_NOISE

Returns:

Type Description
ODEInformation

ODEInformation with measurement constraint.

Source code in ode_filters/measurement/measurement_models.py
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
def ODEmeasurement(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    A: Array,
    z: Array,
    z_t: Array,
    measurement_noise: float = DEFAULT_MEASUREMENT_NOISE,
) -> ODEInformation:
    """Create first-order ODE model with linear measurements.

    Args:
        vf: Vector field function vf(x, *, t) -> dx/dt.
        E0: State extraction matrix.
        E1: Derivative extraction matrix.
        A: Measurement matrix (shape [k, d]).
        z: Measurement values (shape [n, k]).
        z_t: Measurement times (shape [n]).
        measurement_noise: Measurement noise variance.

    Returns:
        ODEInformation with measurement constraint.
    """
    return ODEInformation(
        vf, E0, E1, constraints=[Measurement(A, z, z_t, measurement_noise)]
    )

SecondOrderODEconservation(vf, E0, E1, E2, A, p)

Create second-order ODE model with conservation constraint.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, v, *, t) -> d^2x/dt^2.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

First derivative extraction matrix.

required
E2 ArrayLike

Second derivative extraction matrix.

required
A Array

Conservation constraint matrix (shape [k, d]).

required
p Array

Conservation target values (shape [k]).

required

Returns:

Type Description
SecondOrderODEInformation

SecondOrderODEInformation with conservation constraint.

Source code in ode_filters/measurement/measurement_models.py
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
def SecondOrderODEconservation(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    E2: ArrayLike,
    A: Array,
    p: Array,
) -> SecondOrderODEInformation:
    """Create second-order ODE model with conservation constraint.

    Args:
        vf: Vector field function vf(x, v, *, t) -> d^2x/dt^2.
        E0: State extraction matrix.
        E1: First derivative extraction matrix.
        E2: Second derivative extraction matrix.
        A: Conservation constraint matrix (shape [k, d]).
        p: Conservation target values (shape [k]).

    Returns:
        SecondOrderODEInformation with conservation constraint.
    """
    return SecondOrderODEInformation(vf, E0, E1, E2, constraints=[Conservation(A, p)])

SecondOrderODEconservationmeasurement(vf, E0, E1, E2, C, p, A, z, z_t, measurement_noise=DEFAULT_MEASUREMENT_NOISE)

Create second-order ODE model with conservation and measurements.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, v, *, t) -> d^2x/dt^2.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

First derivative extraction matrix.

required
E2 ArrayLike

Second derivative extraction matrix.

required
C Array

Conservation constraint matrix (shape [m, d]).

required
p Array

Conservation target values (shape [m]).

required
A Array

Measurement matrix (shape [k, d]).

required
z Array

Measurement values (shape [n, k]).

required
z_t Array

Measurement times (shape [n]).

required
measurement_noise float

Measurement noise variance.

DEFAULT_MEASUREMENT_NOISE

Returns:

Type Description
SecondOrderODEInformation

SecondOrderODEInformation with conservation and measurement constraints.

Source code in ode_filters/measurement/measurement_models.py
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
def SecondOrderODEconservationmeasurement(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    E2: ArrayLike,
    C: Array,
    p: Array,
    A: Array,
    z: Array,
    z_t: Array,
    measurement_noise: float = DEFAULT_MEASUREMENT_NOISE,
) -> SecondOrderODEInformation:
    """Create second-order ODE model with conservation and measurements.

    Args:
        vf: Vector field function vf(x, v, *, t) -> d^2x/dt^2.
        E0: State extraction matrix.
        E1: First derivative extraction matrix.
        E2: Second derivative extraction matrix.
        C: Conservation constraint matrix (shape [m, d]).
        p: Conservation target values (shape [m]).
        A: Measurement matrix (shape [k, d]).
        z: Measurement values (shape [n, k]).
        z_t: Measurement times (shape [n]).
        measurement_noise: Measurement noise variance.

    Returns:
        SecondOrderODEInformation with conservation and measurement constraints.
    """
    return SecondOrderODEInformation(
        vf,
        E0,
        E1,
        E2,
        constraints=[Conservation(C, p), Measurement(A, z, z_t, measurement_noise)],
    )

SecondOrderODEmeasurement(vf, E0, E1, E2, A, z, z_t, measurement_noise=DEFAULT_MEASUREMENT_NOISE)

Create second-order ODE model with linear measurements.

Parameters:

Name Type Description Default
vf Callable

Vector field function vf(x, v, *, t) -> d^2x/dt^2.

required
E0 ArrayLike

State extraction matrix.

required
E1 ArrayLike

First derivative extraction matrix.

required
E2 ArrayLike

Second derivative extraction matrix.

required
A Array

Measurement matrix (shape [k, d]).

required
z Array

Measurement values (shape [n, k]).

required
z_t Array

Measurement times (shape [n]).

required
measurement_noise float

Measurement noise variance.

DEFAULT_MEASUREMENT_NOISE

Returns:

Type Description
SecondOrderODEInformation

SecondOrderODEInformation with measurement constraint.

Source code in ode_filters/measurement/measurement_models.py
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
def SecondOrderODEmeasurement(
    vf: Callable,
    E0: ArrayLike,
    E1: ArrayLike,
    E2: ArrayLike,
    A: Array,
    z: Array,
    z_t: Array,
    measurement_noise: float = DEFAULT_MEASUREMENT_NOISE,
) -> SecondOrderODEInformation:
    """Create second-order ODE model with linear measurements.

    Args:
        vf: Vector field function vf(x, v, *, t) -> d^2x/dt^2.
        E0: State extraction matrix.
        E1: First derivative extraction matrix.
        E2: Second derivative extraction matrix.
        A: Measurement matrix (shape [k, d]).
        z: Measurement values (shape [n, k]).
        z_t: Measurement times (shape [n]).
        measurement_noise: Measurement noise variance.

    Returns:
        SecondOrderODEInformation with measurement constraint.
    """
    return SecondOrderODEInformation(
        vf, E0, E1, E2, constraints=[Measurement(A, z, z_t, measurement_noise)]
    )

build_obs_at_time(observations, E0, t)

Build an observation update tuple for a single time step.

Returns (H, c, R_sqr) for the active observations at time t, or None when no observation is active.

Parameters:

Name Type Description Default
observations list[Measurement]

List of Measurement constraints.

required
E0 ArrayLike

State extraction matrix, shape [d, state_dim].

required
t float

Current time.

required

Returns:

Type Description
tuple[Array, Array, Array] | None

Tuple (H, c, R_sqr) or None.

Source code in ode_filters/measurement/measurement_models.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def build_obs_at_time(
    observations: list[Measurement],
    E0: ArrayLike,
    t: float,
) -> tuple[Array, Array, Array] | None:
    """Build an observation update tuple for a single time step.

    Returns ``(H, c, R_sqr)`` for the active observations at time *t*,
    or ``None`` when no observation is active.

    Args:
        observations: List of Measurement constraints.
        E0: State extraction matrix, shape ``[d, state_dim]``.
        t: Current time.

    Returns:
        Tuple ``(H, c, R_sqr)`` or ``None``.
    """
    E0 = np.asarray(E0)
    active: list[tuple[Measurement, int]] = []
    for m in observations:
        idx = m.find_index(t)
        if idx is not None:
            active.append((m, idx))
    if not active:
        return None

    jacobians = []
    offsets = []
    for m, idx in active:
        jacobians.append(m.A if m.full_state else m.A @ E0)
        offsets.append(-m.z[idx])

    H = np.concatenate(jacobians) if len(jacobians) > 1 else jacobians[0]
    c = np.concatenate(offsets) if len(offsets) > 1 else offsets[0]

    obs_dim = H.shape[0]
    R = np.zeros((obs_dim, obs_dim))
    off = 0
    for m, _ in active:
        R = R.at[off : off + m.dim, off : off + m.dim].set(m.get_noise_matrix())
        off += m.dim
    R_sqr = _safe_cholesky_sqr(R, obs_dim)

    return H, c, R_sqr

prepare_observations(observations, E0, ts)

Build an :class:ObsModel from :class:Measurement objects.

Assumes linear, time-invariant observation matrices (A and noise are constant across time; only the measurement values z vary).

Parameters:

Name Type Description Default
observations list[Measurement]

List of Measurement constraints.

required
E0 ArrayLike

State extraction matrix, shape [d, state_dim].

required
ts ArrayLike

Time grid of shape [N+1] (includes initial time t0). The filter uses ts[1:] for the N filter steps.

required

Returns:

Name Type Description
An ObsModel | None

class:ObsModel, or None when observations is empty.

Source code in ode_filters/measurement/measurement_models.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def prepare_observations(
    observations: list[Measurement],
    E0: ArrayLike,
    ts: ArrayLike,
) -> ObsModel | None:
    """Build an :class:`ObsModel` from :class:`Measurement` objects.

    Assumes linear, time-invariant observation matrices (``A`` and noise
    are constant across time; only the measurement values ``z`` vary).

    Args:
        observations: List of Measurement constraints.
        E0: State extraction matrix, shape ``[d, state_dim]``.
        ts: Time grid of shape ``[N+1]`` (includes initial time ``t0``).
            The filter uses ``ts[1:]`` for the ``N`` filter steps.

    Returns:
        An :class:`ObsModel`, or ``None`` when *observations* is empty.
    """
    if not observations:
        return None

    E0 = np.asarray(E0)
    ts = np.asarray(ts)
    N = len(ts) - 1
    state_dim = E0.shape[1]
    max_obs_dim = sum(m.dim for m in observations)

    # --- Constant Jacobian and noise (computed once) ---
    H = np.zeros((max_obs_dim, state_dim))
    R_block = np.zeros((max_obs_dim, max_obs_dim))
    offset = 0
    for m in observations:
        H_m = m.A if m.full_state else m.A @ E0
        H = H.at[offset : offset + m.dim, :].set(H_m)
        R_block = R_block.at[offset : offset + m.dim, offset : offset + m.dim].set(
            m.get_noise_matrix()
        )
        offset += m.dim

    R_sqr = _safe_cholesky_sqr(R_block, max_obs_dim)

    # --- Per-step offsets and masks ---
    c_list: list[Array] = []
    mask_list: list[Array] = []

    for i in range(N):
        t = float(ts[i + 1])
        c_i = np.zeros(max_obs_dim)
        mask_i = np.zeros(max_obs_dim, dtype=bool)

        offset = 0
        for m in observations:
            idx = m.find_index(t)
            if idx is not None:
                c_i = c_i.at[offset : offset + m.dim].set(-m.z[idx])
                mask_i = mask_i.at[offset : offset + m.dim].set(True)
            offset += m.dim

        c_list.append(c_i)
        mask_list.append(mask_i)

    return ObsModel(
        H=H,
        R_sqr=R_sqr,
        c_seq=np.stack(c_list),
        mask=np.stack(mask_list),
    )

handler: python options: show_object_full_path: true show_source: false members_order: source show_signature_annotations: true