Source code for pyadjoint.adjoint_source_types.exponentiated_phase_misfit

#!/usr/bin/env python3
"""
Exponentiated phase misfit function and adjoint source.

:authors:
    adjtomo Dev Team (adjtomo@gmail.com), 2022
    Yanhua O. Yuan (yanhuay@princeton.edu), 2016
:license:
    BSD 3-Clause ("BSD New" or "BSD Simplified")
"""
import numpy as np

from scipy import signal
from scipy.integrate import simps

from pyadjoint.utils.signal import get_window_info, window_taper


[docs]def calculate_adjoint_source(observed, synthetic, config, windows, adjoint_src=True, window_stats=True, **kwargs): """ Calculate adjoint source for the exponentiated phase misfit. :type observed: obspy.core.trace.Trace :param observed: observed waveform to calculate adjoint source :type synthetic: obspy.core.trace.Trace :param synthetic: synthetic waveform to calculate adjoint source :type config: pyadjoint.config.ConfigExponentiatedPhase :param config: Config class with parameters to control processing :type windows: list of tuples :param windows: [(left, right),...] representing left and right window borders to be used to calculate misfit and adjoint sources :type adjoint_src: bool :param adjoint_src: flag to calculate adjoint source, if False, will only calculate misfit :type window_stats: bool :param window_stats: flag to return stats for individual misfit windows used to generate the adjoint source """ assert(config.__class__.__name__ == "ConfigExponentiatedPhase"), \ "Incorrect configuration class passed to Exponentiated Phase misfit" if config.double_difference: raise NotImplementedError( "Exponentiated phase misfit does not have double difference " "capabilities" ) # Dictionary of return values related to exponentiated phase ret_val = {} # List of windows and some measurement values for each win_stats = [] # Initiate constants and empty return values to fill nlen_w_data = len(synthetic.data) dt = synthetic.stats.delta f = np.zeros(nlen_w_data) # adjoint source misfit_sum = 0.0 # loop over time windows for window in windows: left_sample, right_sample, nlen_w = get_window_info(window, dt) # Initiate empty window arrays for memory efficiency d = np.zeros(nlen_w) s = np.zeros(nlen_w) d[0: nlen_w] = observed.data[left_sample: right_sample] s[0: nlen_w] = synthetic.data[left_sample: right_sample] # Taper window to get rid of kinks at two ends window_taper(d, taper_percentage=config.taper_percentage, taper_type=config.taper_type) window_taper(s, taper_percentage=config.taper_percentage, taper_type=config.taper_type) # Calculate envelope and hilbert transform of data, synthetics env_s = abs(signal.hilbert(s)) env_d = abs(signal.hilbert(d)) hilbt_s = np.imag(signal.hilbert(s)) hilbt_d = np.imag(signal.hilbert(d)) # Determine water level threshold thresh_s = config.wtr_env * env_s.max() thresh_d = config.wtr_env * env_d.max() env_s_wtr = env_s + thresh_s env_d_wtr = env_d + thresh_d # Calculate differences between data and synthetic acct for waterlevel diff_real = d/env_d_wtr - s/env_s_wtr diff_imag = hilbt_d/env_d_wtr - hilbt_s/env_s_wtr # Integrate with the composite Simpson's rule. misfit_real = 0.5 * simps(y=diff_real**2, dx=dt) misfit_imag = 0.5 * simps(y=diff_imag**2, dx=dt) misfit_sum += misfit_real + misfit_imag env_s_wtr_cubic = env_s_wtr**3 adj_real = ( -1 * (diff_real * hilbt_s ** 2 / env_s_wtr_cubic) - np.imag(signal.hilbert(diff_real * s * hilbt_s / env_s_wtr_cubic)) ) adj_imag = ( diff_imag * s * hilbt_s / env_s_wtr_cubic + np.imag(signal.hilbert(diff_imag * s**2 / env_s_wtr_cubic)) ) # Re-taper newly generated adjoint source window_taper(adj_real, taper_percentage=config.taper_percentage, taper_type=config.taper_type) window_taper(adj_imag, taper_percentage=config.taper_percentage, taper_type=config.taper_type) f[left_sample:right_sample] = adj_real[0:nlen_w] + adj_imag[0:nlen_w] win_stats.append( {"type": config.adjsrc_type, "measurement_type": "dt", "left": left_sample * dt, "right": right_sample * dt, "diff_real": np.mean(diff_real[0:nlen_w]), "diff_imag": np.mean(diff_imag[0:nlen_w]), "misfit_real": misfit_real, "misfit_imag": misfit_imag } ) # Place return values in output dictionary ret_val["misfit"] = misfit_sum if window_stats: ret_val["window_stats"] = win_stats # Time reverse adjoint sources w.r.t synthetic waveforms if adjoint_src is True: ret_val["adjoint_source"] = f[::-1] return ret_val