"""
OPS.py
---------
Defines the OpticalParticleSpectrometer class for simulating Mie
scattering intensity collected by the OPS instrument. The class includes
methods to compute the truncated scattering cross-section based on the
instrument's geometry and optical properties. The implementation uses
numerical integration over the instrument's angular acceptance,
leveraging Mie scattering calculations from mie_modules.py and geometric
transformations from geometry.py.
"""
from warnings import warn
import numpy as np
from numpy.typing import ArrayLike, NDArray
import scipy
from miepython.core import S1_S2
from . import detector
from .geometry import ptz2r_sc
from .mirror import mirror_depth
[docs]
class OpticalParticleSpectrometer:
"""Class representing the OPS instrument for scattering simulations."""
def __init__(
self,
laser_wavelength: float = 0.405,
laser_power: float = 70,
laser_polarization: str = "horizontal",
mirror_radius: float = 12.5,
mirror_radius_of_curvature: float = 20.0,
aerosol_mirror_separation: float = 14.2290,
pmt_control_voltage: float = 0.619,
dark_current: float = detector.H10720_110_DARK_CURRENT,
bandwidth: float = detector.BANDWIDTH,
input_current_noise: float = detector.TIA60_INPUT_CURRENT_NOISE,
):
"""Initialize the OPS instrument parameters.
Parameters
----------
laser_wavelength : float
Wavelength of the incident light in micrometers.
laser_power : float, default 70
Laser power in milliwatts.
laser_polarization : str, default 'horizontal'
Polarization state of the incident laser light. Options are
'unpolarized', 'horizontal', or 'vertical'. Default is
'horizontal'.
mirror_radius : float, default 12.5
Radius of the spherical mirror in millimeters.
mirror_radius_of_curvature : float, default 20.0
Radius of curvature of the spherical mirror in millimeters.
aerosol_mirror_separation : float, default 14.2290
Separation between the aerosol and the center of the mirror
in millimeters.
pmt_control_voltage : float, default 0.619
Control voltage of the PMT in volts. Default is 0.619 V,
which was measured on a Handix POPS. Adjust for different
instruments.
dark_current : float, default 1e-9 (see detector.py)
Dark current of the detector in Amperes.
bandwidth : float, default 4e6 (see detector.py)
Bandwidth of the detector in Hertz.
input_current_noise : float, default 4.8e-12 (see detector.py)
Input current noise of the detector in Amperes per square
root Hertz.
"""
# Ensure inputs are valid
if np.any(laser_wavelength <= 0):
raise ValueError("All laser wavelengths must be positive.")
if np.any(laser_power <= 0):
raise ValueError("All laser powers must be positive.")
if laser_polarization not in ["unpolarized", "horizontal", "vertical"]:
raise ValueError(
"laser_polarization must be 'unpolarized', 'horizontal', or 'vertical'."
)
if mirror_radius <= 0 or mirror_radius_of_curvature <= 0:
raise ValueError("Mirror radius and radius of curvature must be positive.")
if aerosol_mirror_separation <= 0:
raise ValueError("Aerosol-mirror separation must be positive.")
# Ensure variables are close to expected magnitude
if np.any(laser_wavelength > 2) or np.any(laser_wavelength < 0.2):
warn(
"Laser wavelength outside of expected range. "
"Verify laser wavelength is in micrometers."
)
if np.any(laser_power > 1000) or np.any(laser_power < 1):
warn(
"Laser power outside of expected range. "
"Verify laser power is in milliwatts."
)
if mirror_radius > 100 or mirror_radius < 1:
warn(
"Mirror radius outside of expected range. "
"Verify mirror radius is in millimeters."
)
if mirror_radius_of_curvature > 200 or mirror_radius_of_curvature < 5:
warn(
"Mirror radius of curvature outside of expected range. "
"Verify radius of curvature is in millimeters."
)
if aerosol_mirror_separation > 100 or aerosol_mirror_separation < 5:
warn(
"Aerosol-mirror separation outside of expected range. "
"Verify separation is in millimeters."
)
[docs]
self.laser_wavelength = laser_wavelength
[docs]
self.laser_power = laser_power
[docs]
self.laser_polarization = laser_polarization
[docs]
self.mirror_radius = mirror_radius
[docs]
self.mirror_radius_of_curvature = mirror_radius_of_curvature
[docs]
self.aerosol_mirror_separation = aerosol_mirror_separation
[docs]
self.y0 = aerosol_mirror_separation
# Axial distance from aerosol stream to the top edge of mirror.
[docs]
self.h = aerosol_mirror_separation - mirror_depth(
mirror_radius=mirror_radius, radius_of_curvature=mirror_radius_of_curvature
)
[docs]
self.pmt_control_voltage = pmt_control_voltage
[docs]
self.dark_current = dark_current
[docs]
self.bandwidth = bandwidth
[docs]
self.input_current_noise = input_current_noise
[docs]
self.mirror_reflectivity = 0.80 # Edmund Optics #43-464 at 405 nm
[docs]
self.anode_radiant_sensitivity = detector.anode_radiant_sensitivity(
self.pmt_control_voltage
)
[docs]
def truncated_scattering_cross_section(
self,
ri: complex,
diameters: float | ArrayLike,
n_theta: int = 51,
n_phi: int = 41,
) -> NDArray[np.float64]:
"""
Simulates OPS scattering and computed truncated cross-sections.
This function performs a numerical integration of Mie-scattered
intensity over the instrument's angular field of view, computing
the total light collected by the OPS mirror.
Parameters
----------
ri : complex
Complex refractive index of the particle.
diameters : float | ArrayLike
Diameter of the particle in micrometers.
n_theta : int, default 51
Number of polar angle samples for integration. Should be an
odd integer for Simpson's rule.
n_phi : int, default 41
Number of azimuthal angle samples for integration.Should be
an odd integer for Simpson's rule.
Returns
-------
np.ndarray
Truncated scattering cross-section in square micrometers.
"""
# Validate inputs
if np.any(np.imag(ri) < 0) or np.any(np.real(ri) < 1):
raise ValueError(
"Imaginary part of refractive indices must be non-negative and real"
"part must be >= 1."
)
try:
if isinstance(diameters, (float, int)):
diameters = np.array([diameters], dtype=float)
else:
diameters = np.asarray(diameters, dtype=float)
except Exception as e:
raise TypeError(
"Diameters must be convertible to a numpy array of floats."
) from e
if np.any(diameters <= 0):
raise ValueError("Diameters must be positive values.")
if np.any(diameters < 0.001) or np.any(diameters > 100):
warn(
"Some diameters are outside of expected range. Verify diameters are in micrometers."
)
if not isinstance(n_theta, int) or n_theta <= 0:
raise ValueError("n_theta must be a positive integer.")
if n_theta % 2 == 0:
warn("n_theta should be an odd integer for Simpson's rule.")
if not isinstance(n_phi, int) or n_phi <= 0:
raise ValueError("n_phi must be a positive integer.")
if n_phi % 2 == 0:
warn("n_phi should be an odd integer for Simpson's rule.")
### Mirror Integration ###
# Derived quantities
theta_max = np.arctan(self.mirror_radius / self.h)
theta_values = np.linspace(
np.pi / 2 - theta_max, np.pi / 2 + theta_max, n_theta
)
r_min = np.sqrt(self.mirror_radius**2 + self.h**2)
# Build complete grid of (theta, phi) pairs
# Preallocate numpy arrays
n_points = n_theta * n_phi
all_thetas = np.zeros(n_points)
all_phis = np.zeros(n_points)
theta_indices = np.zeros(n_points, dtype=int)
phi_values_per_theta = [] # Store phi arrays for later
idx = 0
for j, theta in enumerate(theta_values):
phi_max = np.arccos(
np.clip(self.h / (r_min * np.sqrt(1 - np.cos(theta) ** 2)), -1, 1)
)
phi_values = np.linspace(-phi_max, phi_max, n_phi)
phi_values_per_theta.append(phi_values)
# Fill arrays using slicing
all_thetas[idx : idx + n_phi] = theta
all_phis[idx : idx + n_phi] = phi_values
theta_indices[idx : idx + n_phi] = j
idx += n_phi
# Single vectorized call for ALL geometry calculations
_, _, _, _, ws, wp, _ = ptz2r_sc(
ops=self,
phi=all_phis,
theta=all_thetas,
)
### Scattering Integration ###
# Compute size parameters and geometric cross sections
size_parameters = np.pi / self.laser_wavelength * diameters
geometric_cross_sections = np.pi * (diameters / 2) ** 2
# Loop through each diameter and compute S1, S2, and integrand for all points
truncated_cscas = []
for size_parameter, geometric_cross_section in zip(
size_parameters, geometric_cross_sections
):
# Compute S1, S2
mp_s1s2 = S1_S2(
m=ri, x=size_parameter, mu=np.cos(theta_values), norm="qsca"
)
s1_sq = np.abs(mp_s1s2[0]) ** 2
s2_sq = np.abs(mp_s1s2[1]) ** 2
# Compute integrand for all points
integrand = ws * s1_sq[theta_indices] + wp * s2_sq[theta_indices]
# Reshape back to (n_theta, n_phi) grid
integrand_grid = integrand.reshape(n_theta, n_phi)
# Use Simpson's rule for both dimensions
theta_integrand = np.zeros(n_theta)
for j in range(n_theta):
# Integrate over phi using Simpson's rule
theta_integrand[j] = scipy.integrate.simpson(
integrand_grid[j, :], x=phi_values_per_theta[j]
) * np.sin(theta_values[j])
# Integrate over theta using Simpson's rule
total_signal = scipy.integrate.simpson(theta_integrand, x=theta_values)
# Compute truncated cross section
truncated_cscas.append(
np.array(total_signal * geometric_cross_section, dtype=np.float64)
)
return np.array(truncated_cscas, dtype=np.float64)
[docs]
def estimate_signal_noise(
self,
ri: complex,
diameters: float | ArrayLike,
n_theta: int | None = None,
n_phi: int | None = None,
) -> tuple[float | np.ndarray, float | np.ndarray]:
"""
Estimate the signal amplitude from the scattered light incident
on the OPS photomultiplier tube (PMT).
Parameters
----------
ri : complex
Complex refractive index of the particle.
diameters : float | np.ndarray
Diameter of the particle in micrometers.
n_theta : int | None
Number of polar angle samples for integration. If None, uses
default value in truncated_scattering_cross_section.
n_phi : int | None
Number of azimuthal angle samples for integration. If None,
uses default value in truncated_scattering_cross_section.
Returns
-------
signal : float or np.ndarray
Estimated signal amplitude in units of Amperes (A).
noise : float or np.ndarray
Estimated noise amplitude in units of Amperes (A).
"""
try:
if isinstance(diameters, (float, int)):
diameters = np.array([diameters], dtype=float)
else:
diameters = np.asarray(diameters, dtype=float)
except Exception as e:
raise TypeError(
"Diameters must be convertible to a numpy array of floats"
) from e
kwargs = {}
if n_theta is not None:
kwargs["n_theta"] = n_theta
if n_phi is not None:
kwargs["n_phi"] = n_phi
trunc_csca = self.truncated_scattering_cross_section(ri, diameters, **kwargs)
signal, noise = detector.estimate_signal_noise(self, trunc_csca)
return signal, noise
[docs]
def digitize_signal(
signal_current: float | NDArray[np.float64],
max_voltage: float = 5,
digitizer_bins: int = 65536,
feedback_resistor: float = 2050,
) -> float | NDArray[np.float64]:
"""Convert signal current to digitizer bins.
Parameters
----------
signal_current : float or np.ndarray
Signal current in Amperes (A).
max_voltage : float, default 5
Maximum voltage of the digitizer in Volts (V).
digitizer_bins : int, default 65536
Number of bins in the digitizer (16-bit digitizer).
feedback_resistor : float, default 2050
Feedback resistor value in Ohms (Ω) used in the transimpedance
amplifier. Default is 2050 Ω, based on correspondence with
Handix Scientific.
Returns
-------
float or np.ndarray
Signal amplitude in digitizer bins.
"""
if np.any(signal_current < 0):
raise ValueError("Signal current must be non-negative.")
return signal_current * feedback_resistor * (digitizer_bins / max_voltage)