#!/usr/bin/env python3
"""
Multitaper based phase and amplitude misfit and adjoint source.
:authors:
adjTomo Dev Team (adjtomo@gmail.com), 2022
Youyi Ruan (youyir@princeton.edu), 2016
Matthieu Lefebvre (ml15@princeton.edu), 2016
Yanhua O. Yuan (yanhuay@princeton.edu), 2015
:license:
BSD 3-Clause ("BSD New" or "BSD Simplified")
"""
import numpy as np
from scipy.integrate import simps
from pyadjoint import logger
from pyadjoint.utils.dpss import dpss_windows
from pyadjoint.utils.cctm import (calculate_cc_shift, calculate_cc_adjsrc,
calculate_dd_cc_shift, calculate_dd_cc_adjsrc,
)
from pyadjoint.utils.signal import (window_taper, get_window_info,
process_cycle_skipping)
[docs]class MultitaperMisfit:
"""
A class to house the machinery of the multitaper misfit calculation. This is
done with a class rather than a function to reduce the amount of unnecessary
parameter passing between functions.
"""
def __init__(self, observed, synthetic, config, windows,
observed_2=None, synthetic_2=None, windows_2=None
):
"""
Initialize Multitaper Misfit adjoint source creator
:type observed: obspy.core.trace.Trace
:param observed: observed waveform to calculate adjoint source
:type synthetic: obspy.core.trace.Trace
:param synthetic: synthetic waveform to calculate adjoint source
:type config: pyadjoint.config.Multitaper
:param config: Config class with parameters to control processing
:type windows: list of tuples
:param windows: [(left, right),...] representing left and right window
borders to be used to calculate misfit and adjoint sources
:type observed_2: obspy.core.trace.Trace
:param observed_2: second observed waveform to calculate adjoint sources
from station pairs
:type synthetic_2: obspy.core.trace.Trace
:param synthetic_2: second synthetic waveform to calculate adjoint sources
from station pairs
:type windows_2: list of tuples
:param windows_2: [(left, right),...] representing left and right window
borders to be tapered in units of seconds since first sample in data
array. Used to window `observed_2` and `synthetic_2`
"""
assert (config.__class__.__name__ == "ConfigMultitaper"), \
"Incorrect configuration class passed to CCTraveltime misfit"
self.observed = observed
self.synthetic = synthetic
self.config = config
self.windows = windows
# For optional double-difference measurements
self.observed_2 = observed_2
self.synthetic_2 = synthetic_2
self.windows_2 = windows_2
# Calculate some information to be used for MTM measurements
# Assumed that second set of waveforms (if provided) have same qualities
self.nlen_f = 2 ** self.config.lnpt
self.nlen_data = len(synthetic.data) # length in samples
self.dt = synthetic.stats.delta # sampling rate
self.tlen_data = self.nlen_data * self.dt # length in time [s]
[docs] def calculate_adjoint_source(self):
"""
Main processing function to calculate adjoint source for MTM.
The logic here is that the function will perform a series of checks/
calculations to check if MTM is valid for each given window. If
any check/calculation fails, it will fall back to CCTM misfit for the
given window.
"""
# Arrays for adjoint sources w.r.t time shift (p) and amplitude (q)
fp = np.zeros(self.nlen_data)
fq = np.zeros(self.nlen_data)
misfit_sum_p = 0.0
misfit_sum_q = 0.0
win_stats = []
# Loop over time windows and calculate misfit for each window range
for window in self.windows:
# `is_mtm` determines whether we use MTM (T) or CC (F) for misfit
is_mtm = True
left_sample, right_sample, nlen_w = get_window_info(window, self.dt)
fp_t = np.zeros(nlen_w)
fq_t = np.zeros(nlen_w)
misfit_p = 0.
misfit_q = 0.
# Pre-allocate arrays for memory efficiency
d = np.zeros(nlen_w)
s = np.zeros(nlen_w)
# d and s represent the windowed data and synthetic arrays
d[0: nlen_w] = self.observed.data[left_sample: right_sample]
s[0: nlen_w] = self.synthetic.data[left_sample: right_sample]
# Taper windowed signals in place
window_taper(d, taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
window_taper(s, taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
# Calculate cross correlation time shift, amplitude anomaly and
# errors. Config passed as **kwargs to control constants required
# by function
cc_tshift, cc_dlna, sigma_dt_cc, sigma_dlna_cc = calculate_cc_shift(
d=d, s=s, dt=self.dt, **vars(self.config)
)
# Perform a series of checks to see if MTM is valid for the data
# This will only loop once, but allows us to break if a check fail
while is_mtm is True:
is_mtm = self.check_time_series_acceptability(
cc_tshift=cc_tshift, nlen_w=nlen_w)
if is_mtm is False:
break
# Shift and scale observed data 'd' to match synthetics, make
# sure the time shift doesn't go passed time series' bounds
d, is_mtm = self.prepare_data_for_mtm(d=d, tshift=cc_tshift,
dlna=cc_dlna,
window=window)
if is_mtm is False:
logger.info(f"reject MTM: adjusted CC shift: {cc_tshift} is"
f"out of bounds of time series")
logger.debug(f"win = [{left_sample * self.dt}, "
f"{right_sample * self.dt}]")
break
# Determine FFT information related to frequency bands
# TODO: Sampling rate was set to observed delta, is dt the same?
freq = np.fft.fftfreq(n=self.nlen_f, d=self.dt)
df = freq[1] - freq[0] # delta_f: frequency step
wvec = freq * 2 * np.pi # omega vector: angular frequency
# dw = wvec[1] - wvec[0] # TODO: check to see if dw is not used
logger.debug("delta_f (frequency sampling) = {df}")
# Check for sufficient frequency range given taper bandwith
nfreq_min, nfreq_max, is_mtm = self.calculate_freq_limits(df)
if is_mtm is False:
logger.info("reject MTM: frequency range narrower than "
"half taper bandwith")
break
# Determine taper bandwith in frequency domain
tapert, eigens = dpss_windows(
n=nlen_w, half_nbw=self.config.mt_nw,
k_max=self.config.num_taper, low_bias=False
)
is_mtm = np.isfinite(eigens).all()
if is_mtm is False:
logger.warning("reject MTM: error constructing DPSS tapers")
logger.debug(f"eigenvalues: {eigens}")
break
# Check if tapers are properly generated. In rare cases
# (e.g., [nw=2.5, nlen=61] or [nw=4.0, nlen=15]) certain
# eigenvalues can not be found and associated eigentaper is NaN
tapers = tapert.T * np.sqrt(nlen_w)
phi_mtm, abs_mtm, dtau_mtm, dlna_mtm = \
self.calculate_multitaper(
d=d, s=s, tapers=tapers, wvec=wvec, nfreq_min=nfreq_min,
nfreq_max=nfreq_max, cc_tshift=cc_tshift,
cc_dlna=cc_dlna
)
# Calculate multi-taper error estimation if requested
if self.config.use_mt_error:
sigma_phi_mt, sigma_abs_mt, sigma_dtau_mt, \
sigma_dlna_mt = self.calculate_mt_error(
d=d, s=s, tapers=tapers, wvec=wvec,
nfreq_min=nfreq_min, nfreq_max=nfreq_max,
cc_tshift=cc_tshift, cc_dlna=cc_dlna,
phi_mtm=phi_mtm, abs_mtm=abs_mtm,
dtau_mtm=dtau_mtm, dlna_mtm=dlna_mtm)
else:
sigma_dtau_mt = np.zeros(self.nlen_f)
sigma_dlna_mt = np.zeros(self.nlen_f)
# Check if the multitaper measurements fail selection criteria
is_mtm = self.check_mtm_time_shift_acceptability(
nfreq_min=nfreq_min, nfreq_max=nfreq_max, df=df,
cc_tshift=cc_tshift, dtau_mtm=dtau_mtm,
sigma_dtau_mt=sigma_dtau_mt)
if is_mtm is False:
break
# We made it! If the loop is still running after this point,
# then we will use MTM for adjoint source calculation
# Frequency domain taper weighted by measurement error
wp_w, wq_w = self.calculate_freq_domain_taper(
nfreq_min=nfreq_min, nfreq_max=nfreq_max, df=df,
dtau_mtm=dtau_mtm, dlna_mtm=dlna_mtm, err_dt_cc=sigma_dt_cc,
err_dlna_cc=sigma_dlna_cc, err_dtau_mt=sigma_dtau_mt,
err_dlna_mt=sigma_dlna_mt,
)
# Misfit is defined as the error-weighted measurements
dtau_mtm_weigh_sqr = dtau_mtm ** 2 * wp_w
dlna_mtm_weigh_sqr = dlna_mtm ** 2 * wq_w
misfit_p = 0.5 * 2.0 * simps(y=dtau_mtm_weigh_sqr, dx=df)
misfit_q = 0.5 * 2.0 * simps(y=dlna_mtm_weigh_sqr, dx=df)
logger.info("calculating misfit and adjoint source with MTM")
fp_t, fq_t = self.calculate_mt_adjsrc(
s=s, tapers=tapers, nfreq_min=nfreq_min,
nfreq_max=nfreq_max, dtau_mtm=dtau_mtm, dlna_mtm=dlna_mtm,
wp_w=wp_w, wq_w=wq_w
)
win_stats.append(
{"left": left_sample * self.dt,
"right": right_sample * self.dt,
"type": "multitaper",
"measurement_type": self.config.measure_type,
"misfit_dt": misfit_p,
"misfit_dlna": misfit_q,
"sigma_dt": sigma_dt_cc,
"sigma_dlna": sigma_dlna_cc,
"tshift": np.mean(dtau_mtm[nfreq_min:nfreq_max]),
"dlna": np.mean(dlna_mtm[nfreq_min:nfreq_max]),
}
)
break
# If at some point MTM broke out of the loop, this code block will
# execute and calculate a CC adjoint source and misfit instead
if is_mtm is False:
logger.info("calculating misfit and adjoint source with CCTM")
misfit_p, misfit_q, fp_t, fq_t = \
calculate_cc_adjsrc(s=s, tshift=cc_tshift, dlna=cc_dlna,
dt=self.dt, sigma_dt=sigma_dt_cc,
sigma_dlna=sigma_dlna_cc)
win_stats.append(
{"left": left_sample * self.dt,
"right": right_sample * self.dt,
"type": "cross_correlation_traveltime",
"measurement_type": self.config.measure_type,
"misfit_dt": misfit_p,
"misfit_dlna": misfit_q,
"sigma_dt": sigma_dt_cc,
"sigma_dlna": sigma_dlna_cc,
"dt": cc_tshift,
"dlna": cc_dlna,
}
)
# Taper windowed adjoint source before including in final array
window_taper(fp_t[0:nlen_w],
taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
window_taper(fq_t[0:nlen_w],
taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
# Place windowed adjoint source within the correct time location
# w.r.t the entire synthetic seismogram
fp_wind = np.zeros(len(self.synthetic.data))
fq_wind = np.zeros(len(self.synthetic.data))
fp_wind[left_sample: right_sample] = fp_t[0:nlen_w]
fq_wind[left_sample: right_sample] = fq_t[0:nlen_w]
# Add the windowed adjoint source to the full adjoint source
fp += fp_wind
fq += fq_wind
# Increment total misfit value by misfit of windows
misfit_sum_p += misfit_p
misfit_sum_q += misfit_q
return misfit_sum_p, misfit_sum_q, fp, fq, win_stats
[docs] def calculate_dd_adjoint_source(self):
"""
Process double difference adjoint source. Requires second set of
waveforms and windows.
.. note::
amplitude measurement stuff has been mostly left in the function
(i.e., commented out) even it is not used, so that hopefully it is
easier for someone in the future to implement it if they want.
:rtype: (float, np.array, np.array, dict)
:return: (misfit_sum_p, fp, fp_2, win_stats) == (
total phase misfit for the measurement,
adjoint source for first data-synthetic pair,
adjoint source for second data-synthetic pair,
measurement information dictionary
)
"""
# Arrays for adjoint sources w.r.t time shift (p) and amplitude (q)
fp = np.zeros(self.nlen_data)
fp_2 = np.zeros(self.nlen_data)
misfit_sum_p = 0.0
# misfit_sum_q = 0.0
win_stats = []
# Loop over time windows and calculate misfit for each window range
for window, window_2 in zip(self.windows, self.windows_2):
# `is_mtm` determines whether we use MTM (T) or CC (F) for misfit
is_mtm = True
# Prepare first set of waveforms
left_sample, right_sample, nlen_w = get_window_info(window, self.dt)
fp_t = np.zeros(nlen_w)
misfit_p = 0.
# misfit_q = 0.
# Pre-allocate arrays for memory efficiency
d = np.zeros(nlen_w)
s = np.zeros(nlen_w)
# d and s represent the windowed data and synthetic arrays
d[0: nlen_w] = self.observed.data[left_sample: right_sample]
s[0: nlen_w] = self.synthetic.data[left_sample: right_sample]
# Prepare second set of waveforms
left_sample_2, right_sample_2, nlen_w_2 = \
get_window_info(window_2, self.dt)
fp_2_t = np.zeros(nlen_w_2)
# Pre-allocate arrays for memory efficiency
d_2 = np.zeros(nlen_w)
s_2 = np.zeros(nlen_w)
# d and s represent the windowed data and synthetic arrays
d_2[0: nlen_w_2] = \
self.observed_2.data[left_sample_2: right_sample_2]
s_2[0: nlen_w_2] = \
self.synthetic_2.data[left_sample_2: right_sample_2]
# Taper windowed signals (modify arrays in place)
for arr in [d, s, d_2, s_2]:
window_taper(arr, taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
# Calculate double difference cross correlation time shift for
# both sets of waveforms
cc_tshift, cc_tshift_obs, cc_tshift_syn, cc_dlna_obs, cc_dlna_syn, \
sigma_dt_cc, sigma_dlna_cc = calculate_dd_cc_shift(
d=d, s=s, d_2=d_2, s_2=s_2, dt=self.dt, **vars(self.config)
)
# Perform a series of checks to see if MTM is valid for the data
# This will only loop once, but allows us to break if a check fails
while is_mtm is True:
is_mtm = self.check_time_series_acceptability(
cc_tshift=cc_tshift, nlen_w=nlen_w)
if is_mtm is False:
break
# Shift and scale observed data 'd' to match second set: `d_2`
d, is_mtm_obs = self.prepare_data_for_mtm(
d=d, tshift=cc_tshift_obs, dlna=cc_dlna_obs, window=window
)
# Shift and scale synthetics 's' to match second set: `s_2`
s, is_mtm_syn = self.prepare_data_for_mtm(
d=s, tshift=cc_tshift_syn, dlna=cc_dlna_syn, window=window
)
if is_mtm_obs is False or is_mtm_syn is False:
logger.info(f"reject MTM: adjusted CC shift: {cc_tshift} is"
f"out of bounds of time series")
logger.debug(f"win = [{left_sample * self.dt}, "
f"{right_sample * self.dt}]")
break
# Determine FFT information related to frequency bands
# TODO: Sampling rate was set to observed delta, is dt the same?
freq = np.fft.fftfreq(n=self.nlen_f, d=self.dt)
df = freq[1] - freq[0] # delta_f: frequency step
wvec = freq * 2 * np.pi # omega vector: angular frequency
logger.debug("delta_f (frequency sampling) = {df}")
# Check for sufficient frequency range given taper bandwith
nfreq_min, nfreq_max, is_mtm = self.calculate_freq_limits(df)
if is_mtm is False:
logger.info("reject MTM: frequency range narrower than "
"half taper bandwith")
break
# Determine taper bandwith in frequency domain
tapert, eigens = dpss_windows(
n=nlen_w, half_nbw=self.config.mt_nw,
k_max=self.config.num_taper, low_bias=False
)
is_mtm = np.isfinite(eigens).all()
if is_mtm is False:
logger.warning("reject MTM: error constructing DPSS tapers")
logger.debug(f"eigenvalues: {eigens}")
break
# Check if tapers are properly generated. In rare cases
# (e.g., [nw=2.5, nlen=61] or [nw=4.0, nlen=15]) certain
# eigenvalues can not be found and associated eigentaper is NaN
tapers = tapert.T * np.sqrt(nlen_w)
phi_mtm_obs, abs_mtm_obs, dtau_mtm_obs, dlna_mtm_obs = \
self.calculate_multitaper(
d=d, s=d_2, tapers=tapers, wvec=wvec,
nfreq_min=nfreq_min, nfreq_max=nfreq_max,
cc_tshift=cc_tshift_obs, cc_dlna=cc_dlna_obs
)
phi_mtm_syn, abs_mtm_syn, dtau_mtm_syn, dlna_mtm_syn = \
self.calculate_multitaper(
d=s, s=s_2, tapers=tapers, wvec=wvec,
nfreq_min=nfreq_min, nfreq_max=nfreq_max,
cc_tshift=cc_tshift_syn, cc_dlna=cc_dlna_syn
)
# Measurements are difference between double differences
# phi_mtm = phi_mtm_syn - phi_mtm_obs
# abs_mtm = abs_mtm_syn - abs_mtm_obs
dtau_mtm = dtau_mtm_syn - dtau_mtm_obs
dlna_mtm = dlna_mtm_syn - dlna_mtm_obs
# Calculate multi-taper error estimation if requested
# FIXME: should dtau_mtm and dlna_mtm be the 'obs' or 'diff' v.?
if self.config.use_mt_error:
sigma_phi_mt, sigma_abs_mt, sigma_dtau_mt, \
sigma_dlna_mt = self.calculate_mt_error(
d=d, s=s, tapers=tapers, wvec=wvec,
nfreq_min=nfreq_min, nfreq_max=nfreq_max,
cc_tshift=cc_tshift, cc_dlna=cc_dlna_obs,
phi_mtm=phi_mtm_obs, abs_mtm=abs_mtm_obs,
# dtau_mtm=dtau_mtm_obs, dlna_mtm=dlna_mtm_obs # ?
dtau_mtm=dtau_mtm, dlna_mtm=dlna_mtm
)
else:
sigma_dtau_mt = np.zeros(self.nlen_f)
sigma_dlna_mt = np.zeros(self.nlen_f)
# Check if the multitaper measurements fail selection criteria
is_mtm = self.check_mtm_time_shift_acceptability(
nfreq_min=nfreq_min, nfreq_max=nfreq_max, df=df,
cc_tshift=cc_tshift, dtau_mtm=dtau_mtm,
sigma_dtau_mt=sigma_dtau_mt)
if is_mtm is False:
break
# We made it! If the loop is still running after this point,
# then we will use MTM for adjoint source calculation
# Frequency domain taper weighted by measurement error
wp_w, wq_w = self.calculate_freq_domain_taper(
nfreq_min=nfreq_min, nfreq_max=nfreq_max, df=df,
dtau_mtm=dtau_mtm, dlna_mtm=dlna_mtm, err_dt_cc=sigma_dt_cc,
err_dlna_cc=sigma_dlna_cc, err_dtau_mt=sigma_dtau_mt,
err_dlna_mt=sigma_dlna_mt,
)
# Misfit is defined as the error-weighted measurements
# TODO dlna misfit not calculated, only phase (dtau)
dtau_mtm_weigh_sqr = dtau_mtm ** 2 * wp_w
misfit_p = 0.5 * 2.0 * simps(y=dtau_mtm_weigh_sqr, dx=df)
logger.info("calculate double difference adjoint source w/ MTM")
fp_t, fp_2_t = self.calculate_dd_mt_adjsrc(
s=s, s_2=s_2, tapers=tapers, nfreq_min=nfreq_min,
nfreq_max=nfreq_max, df=df, dtau_mtm=dtau_mtm,
dlna_mtm=dlna_mtm, wp_w=wp_w, wq_w=wq_w
)
win_stats.append(
{"left": left_sample * self.dt,
"right": right_sample * self.dt,
"type": "dd_multitaper",
"measurement_type": self.config.measure_type,
"misfit_dt": misfit_p,
"sigma_dt": sigma_dt_cc,
"sigma_dlna": sigma_dlna_cc,
"tshift": np.mean(dtau_mtm[nfreq_min:nfreq_max]),
"dlna": np.mean(dlna_mtm[nfreq_min:nfreq_max]),
}
)
break
# If at some point MTM broke out of the loop, this code block will
# execute and calculate a CC adjoint source and misfit instead
if is_mtm is False:
logger.info("calculating adjoint source with double diff. CCTM")
misfit_p, misfit_q, fp_t, fp_2_t, fq_t, fq_2_t = \
calculate_dd_cc_adjsrc(s=s, s_2=s_2, tshift=cc_tshift,
dlna=cc_dlna_obs, dt=self.dt,
sigma_dt=sigma_dt_cc,
sigma_dlna=sigma_dlna_cc)
win_stats.append(
{"left": left_sample * self.dt,
"right": right_sample * self.dt,
"type": "dd_cross_correlation_traveltime",
"measurement_type": self.config.measure_type,
"misfit_dt": misfit_p,
"misfit_dlna": misfit_q,
"sigma_dt": sigma_dt_cc,
"sigma_dlna": sigma_dlna_cc,
"dt": cc_tshift,
"dlna": cc_dlna_obs,
}
)
# Taper windowed adjoint source before including in final array
window_taper(fp_t[0:nlen_w],
taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
window_taper(fp_2_t[0:nlen_w],
taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
# Place windowed adjoint source within the correct time location
# w.r.t the entire synthetic seismogram
fp_wind = np.zeros(len(self.synthetic.data))
fp_2_wind = np.zeros(len(self.synthetic.data))
fp_wind[left_sample: right_sample] = fp_t[0:nlen_w]
fp_2_wind[left_sample: right_sample] = fp_2_t[0:nlen_w]
# Add the windowed adjoint source to the full adjoint source
fp += fp_wind
fp_2 += fp_2_wind
# Increment total misfit value by misfit of windows
misfit_sum_p += misfit_p
return misfit_sum_p, fp, fp_2, win_stats
[docs] def calculate_mt_adjsrc(self, s, tapers, nfreq_min, nfreq_max,
dtau_mtm, dlna_mtm, wp_w, wq_w):
"""
Calculate the adjoint source for a multitaper measurement, which
tapers synthetics in various windowed frequency-dependent tapers and
scales them by phase dependent travel time measurements (which
incorporate the observed data).
:type s: np.array
:param s: synthetic data array
:type tapers: np.array
:param tapers: array of DPPS windows shaped (num_taper, nlen_w)
:type nfreq_min: int
:param nfreq_min: minimum frequency for suitable MTM measurement
:type nfreq_max: int
:param nfreq_max: maximum frequency for suitable MTM measurement
:type dtau_mtm: np.array
:param dtau_mtm: phase dependent travel time measurements from mtm
:type dlna_mtm: np.array
:param dlna_mtm: phase dependent amplitude anomaly
:type wp_w: np.array
:param wp_w: phase-misfit error weighted frequency domain taper
:type wq_w: np.array
:param wq_w: amplitude-misfit error weighted frequency domain taper
"""
nlen_t = len(s)
ntaper = len(tapers[0])
# Start pieceing together transfer functions that will be applie to
# the synthetics
bottom_p = np.zeros(self.nlen_f, dtype=complex)
bottom_q = np.zeros(self.nlen_f, dtype=complex)
s_tw = np.zeros((self.nlen_f, ntaper), dtype=complex)
s_tvw = np.zeros((self.nlen_f, ntaper), dtype=complex)
# Construct the bottom term of the adjoint formula which requires
# summed contributions from each of the taper bands
for itaper in range(0, ntaper):
taper = np.zeros(self.nlen_f)
taper[0:nlen_t] = tapers[0:nlen_t, itaper]
# Taper synthetics (s_t) and take the derivative (s_tv)
s_t = s * taper[0:nlen_t]
s_tv = np.gradient(s_t, self.dt)
# Apply FFT to tapered measurements to get to freq. domain.
s_tw[:, itaper] = np.fft.fft(s_t, self.nlen_f)[:] * self.dt
s_tvw[:, itaper] = np.fft.fft(s_tv, self.nlen_f)[:] * self.dt
# Calculate bottom term of the adjoint equation
bottom_p[:] = (
bottom_p[:] +
s_tvw[:, itaper] * s_tvw[:, itaper].conjugate()
)
bottom_q[:] = (
bottom_q[:] +
s_tw[:, itaper] * s_tw[:, itaper].conjugate()
)
# Now we generate the adjoint sources using each of the tapers
fp_t = np.zeros(nlen_t)
fq_t = np.zeros(nlen_t)
for itaper in range(0, ntaper):
taper = np.zeros(self.nlen_f)
taper[0: nlen_t] = tapers[0:nlen_t, itaper]
# Calculate the full adjoint terms pj(w), qj(w)
p_w = np.zeros(self.nlen_f, dtype=complex)
q_w = np.zeros(self.nlen_f, dtype=complex)
p_w[nfreq_min:nfreq_max] = (
s_tvw[nfreq_min:nfreq_max, itaper] /
bottom_p[nfreq_min:nfreq_max]
)
q_w[nfreq_min:nfreq_max] = (
-1 * s_tw[nfreq_min:nfreq_max, itaper] /
bottom_q[nfreq_min:nfreq_max]
)
# weight the adjoint terms by the phase + amplitude measurements
p_w *= dtau_mtm * wp_w # phase
q_w *= dlna_mtm * wq_w # amplitude
# inverse FFT of weighted adjoint to get back to the time domain
p_wt = np.fft.ifft(p_w, self.nlen_f).real * 2. / self.dt
q_wt = np.fft.ifft(q_w, self.nlen_f).real * 2. / self.dt
# Taper adjoint term before adding it back to full adj source
fp_t[0:nlen_t] += p_wt[0:nlen_t] * taper[0:nlen_t]
fq_t[0:nlen_t] += q_wt[0:nlen_t] * taper[0:nlen_t]
return fp_t, fq_t
[docs] def calculate_dd_mt_adjsrc(self, s, s_2, tapers, nfreq_min, nfreq_max, df,
dtau_mtm, dlna_mtm, wp_w, wq_w):
"""
Calculate the double difference adjoint source for multitaper
measurement. Almost the same as `calculate_mt_adjsrc` but only addresses
phase misfit and requres a second set of synthetics `s_2` which is
processed in the same way as the first set `s`
:type s: np.array
:param s: synthetic data array
:type s_2: np.array
:param s_2: optional 2nd set of synthetics for double difference
measurements only. This will change the output
:type tapers: np.array
:param tapers: array of DPPS windows shaped (num_taper, nlen_w)
:type nfreq_min: int
:param nfreq_min: minimum frequency for suitable MTM measurement
:type nfreq_max: int
:param nfreq_max: maximum frequency for suitable MTM measurement
:type df: floats
:param df: step length of frequency bins for FFT
:type dtau_mtm: np.array
:param dtau_mtm: phase dependent travel time measurements from mtm
:type dlna_mtm: np.array
:param dlna_mtm: phase dependent amplitude anomaly
:type wp_w: np.array
:param wp_w: phase-misfit error weighted frequency domain taper
:type wq_w: np.array
:param wq_w: amplitude-misfit error weighted frequency domain taper
"""
nlen_t = len(s)
ntaper = len(tapers[0])
# Set up to piece together transfer functions that will be applied to
# the synthetics. Sets up arrays for memory efficiency
s_tw = np.zeros((self.nlen_f, ntaper), dtype=complex)
s_tvw = np.zeros((self.nlen_f, ntaper), dtype=complex)
s_2_tw = np.zeros((self.nlen_f, ntaper), dtype=complex)
s_2_tvw = np.zeros((self.nlen_f, ntaper), dtype=complex)
bottom_p = np.zeros(self.nlen_f, dtype=complex)
bottom_p_2 = np.zeros(self.nlen_f, dtype=complex)
# Construct the bottom term of the adjoint formula which requires
# summed contributions from each of the taper bands
for itaper in range(0, ntaper):
taper = np.zeros(self.nlen_f)
taper[0:nlen_t] = tapers[0:nlen_t, itaper]
# Taper synthetics (s_t) and take the derivative (s_tv)
s_t = s * taper[0:nlen_t]
s_tv = np.gradient(s_t, self.dt)
# Apply FFT to tapered measurements to get to freq. domain.
s_tw[:, itaper] = np.fft.fft(s_t, self.nlen_f)[:] * self.dt
s_tvw[:, itaper] = np.fft.fft(s_tv, self.nlen_f)[:] * self.dt
# Perform same tasks but for second set synthetics
s_2_t = s_2 * taper[0:nlen_t]
s_2_tv = np.gradient(s_2_t, self.dt)
s_2_tw[:, itaper] = np.fft.fft(s_2_t, self.nlen_f)[:] * self.dt
s_2_tvw[:, itaper] = np.fft.fft(s_2_tv,
self.nlen_f)[:] * self.dt
# Calculate bottom term of the adjoint equation
bottom_p[:] = (
bottom_p[:] +
s_tvw[:, itaper] * s_2_tvw[:, itaper].conjugate()
)
bottom_p_2[:] = (
bottom_p_2[:] +
s_2_tvw[:, itaper] * s_tvw[:, itaper].conjugate()
)
# Now we generate the adjoint sources using each of the tapers
fp_t = np.zeros(nlen_t)
fp_2_t = np.zeros(nlen_t)
for itaper in range(0, ntaper):
taper = np.zeros(self.nlen_f)
taper[0: nlen_t] = tapers[0:nlen_t, itaper]
# Calculate the full adjoint terms pj(w), qj(w)
p_w = np.zeros(self.nlen_f, dtype=complex)
p_2_w = np.zeros(self.nlen_f, dtype=complex)
p_w[nfreq_min:nfreq_max] = (
-1. * s_2_tvw[nfreq_min:nfreq_max, itaper] /
bottom_p_2[nfreq_min:nfreq_max]
)
p_2_w[nfreq_min:nfreq_max] = (
1. * s_tvw[nfreq_min:nfreq_max, itaper] /
bottom_p[nfreq_min:nfreq_max]
)
# weight the adjoint terms by the phase measurements
p_w *= dtau_mtm * wp_w # phase
p_2_w *= dtau_mtm * wq_w # amplitude
# inverse FFT of weighted adjoint to get back to the time domain
p_wt = np.fft.ifft(p_w, self.nlen_f).real * 2. / self.dt
p_2_wt = np.fft.ifft(p_2_w, self.nlen_f).real * 2. / self.dt
# Taper adjoint term before adding it back to full adj source
fp_t[0:nlen_t] += p_wt[0:nlen_t] * taper[0:nlen_t]
fp_2_t[0:nlen_t] += p_2_wt[0:nlen_t] * taper[0:nlen_t]
return fp_t, fp_2_t
[docs] def calculate_freq_domain_taper(self, nfreq_min, nfreq_max, df,
dtau_mtm, dlna_mtm, err_dt_cc,
err_dlna_cc, err_dtau_mt, err_dlna_mt):
"""
Calculates frequency domain taper weighted by misfit (either CC or MTM)
.. note::
Frequency-domain tapers are based on adjusted frequency band and
error estimation. They are not one of the filtering processes that
needs to be applied to the adjoint source but rather a frequency
domain weighting function for adjoint source and misfit function.
:type nfreq_min: int
:param nfreq_min: minimum frequency for suitable MTM measurement
:type nfreq_max: int
:param nfreq_max: maximum frequency for suitable MTM measurement
:type df: floats
:param df: step length of frequency bins for FFT
:type dtau_mtm: np.array
:param dtau_mtm: phase dependent travel time measurements from mtm
:type dlna_mtm: np.array
:param dlna_mtm: phase dependent amplitude anomaly
:type err_dt_cc: float
:param err_dt_cc: cross correlation time shift error
:type err_dlna_cc: float
:param err_dlna_cc: cross correlation amplitude anomaly error
:type err_dtau_mt: np.array
:param err_dtau_mt: phase-dependent timeshift error
:type err_dlna_mt: np.array
:param err_dlna_mt: phase-dependent amplitude error
"""
w_taper = np.zeros(self.nlen_f)
win_taper_len = nfreq_max - nfreq_min
win_taper = np.ones(win_taper_len)
# Createsa cosine taper over a range of frequencies in freq. domain
window_taper(win_taper, taper_percentage=1.0, taper_type="cos_p10")
w_taper[nfreq_min: nfreq_max] = win_taper[0:win_taper_len]
# Normalization factor, factor 2 is needed for integration -inf to inf
ffac = 2.0 * df * np.sum(w_taper[nfreq_min: nfreq_max])
logger.debug(f"frequency bound (idx): [{nfreq_min}, {nfreq_max - 1}] "
f"(Hz) [{df * (nfreq_min - 1)}, {df * nfreq_max}]"
)
logger.debug(f"frequency domain taper normalization coeff: {ffac}")
logger.debug(f"frequency domain sampling length df={df}")
if ffac <= 0.0:
logger.warning("frequency band too narrow:")
logger.warning(f"fmin={nfreq_min}, fmax={nfreq_max}, ffac={ffac}")
# Normalized, tapered window in the frequency domain
wp_w = w_taper / ffac
wq_w = w_taper / ffac
# Choose whether to scale by CC error or to by calculated MT errors
if self.config.use_cc_error:
wp_w /= err_dt_cc ** 2
wq_w /= err_dlna_cc ** 2
elif self.config.use_mt_error:
dtau_wtr = (
self.config.water_threshold *
np.sum(np.abs(dtau_mtm[nfreq_min: nfreq_max])) /
(nfreq_max - nfreq_min)
)
dlna_wtr = (
self.config.water_threshold *
np.sum(np.abs(dlna_mtm[nfreq_min: nfreq_max])) /
(nfreq_max - nfreq_min)
)
err_dtau_mt[nfreq_min: nfreq_max] = \
err_dtau_mt[nfreq_min: nfreq_max] + dtau_wtr * \
(err_dtau_mt[nfreq_min: nfreq_max] < dtau_wtr)
err_dlna_mt[nfreq_min: nfreq_max] = \
err_dlna_mt[nfreq_min: nfreq_max] + dlna_wtr * \
(err_dlna_mt[nfreq_min: nfreq_max] < dlna_wtr)
wp_w[nfreq_min: nfreq_max] = (
wp_w[nfreq_min: nfreq_max] /
((err_dtau_mt[nfreq_min: nfreq_max]) ** 2)
)
wq_w[nfreq_min: nfreq_max] = (
wq_w[nfreq_min: nfreq_max] /
((err_dlna_mt[nfreq_min: nfreq_max]) ** 2)
)
return wp_w, wq_w
[docs] def calculate_multitaper(self, d, s, tapers, wvec, nfreq_min, nfreq_max,
cc_tshift, cc_dlna):
"""
Measure phase-dependent time shifts and amplitude anomalies using
the multitaper method
.. note::
Formerly `mt_measure`. Renamed for additional clarity and to match
the CCTM function names
:type d: np.array
:param d: observed data array
:type s: np.array
:param s: synthetic data array
:type tapers: np.array
:param tapers: array of DPPS windows shaped (num_taper, nlen_w)
:type wvec: np.array
:param wvec: angular frequency array generated from Discrete Fourier
Transform sample frequencies
:type nfreq_min: int
:param nfreq_min: minimum frequency for suitable MTM measurement
:type nfreq_max: int
:param nfreq_max: maximum frequency for suitable MTM measurement
:type cc_tshift: float
:param cc_tshift: cross correlation time shift
:type cc_dlna: float
:param cc_dlna: amplitude anomaly from cross correlation
:rtype: tuple of np.array
:return: (phi_w, abs_w, dtau_w, dlna_w);
(frequency dependent phase anomaly,
phase dependent amplitude anomaly,
phase dependent cross-correlation time shift,
phase dependent cross-correlation amplitude anomaly)
"""
# Initialize some constants for convenience
nlen_t = len(d)
ntaper = len(tapers[0])
fnum = int(self.nlen_f / 2 + 1)
# Initialize empty arrays to be filled by FFT calculations
top_tf = np.zeros(self.nlen_f, dtype=complex)
bot_tf = np.zeros(self.nlen_f, dtype=complex)
# Multitaper measurements
for itaper in range(0, ntaper):
taper = np.zeros(nlen_t)
taper[0:nlen_t] = tapers[0:nlen_t, itaper]
# Apply time-domain multi-tapered measurements
d_t = np.zeros(nlen_t, dtype=complex)
s_t = np.zeros(nlen_t, dtype=complex)
d_t[0:nlen_t] = d[0:nlen_t] * taper[0:nlen_t]
s_t[0:nlen_t] = s[0:nlen_t] * taper[0:nlen_t]
d_tw = np.fft.fft(d_t, self.nlen_f) * self.dt
s_tw = np.fft.fft(s_t, self.nlen_f) * self.dt
# Calculate top and bottom of MT transfer function
top_tf[:] = top_tf[:] + d_tw[:] * s_tw[:].conjugate()
bot_tf[:] = bot_tf[:] + s_tw[:] * s_tw[:].conjugate()
# Calculate water level for transfer function
wtr_use = (max(abs(bot_tf[0:fnum])) *
self.config.transfunc_waterlevel ** 2)
# Create transfer function
trans_func = np.zeros(self.nlen_f, dtype=complex)
for i in range(nfreq_min, nfreq_max):
if abs(bot_tf[i]) < wtr_use:
trans_func[i] = top_tf[i] / bot_tf[i]
else:
trans_func[i] = top_tf[i] / (bot_tf[i] + wtr_use)
# Estimate phase and amplitude anomaly from transfer function
phi_w = np.zeros(self.nlen_f)
abs_w = np.zeros(self.nlen_f)
dtau_w = np.zeros(self.nlen_f)
dlna_w = np.zeros(self.nlen_f)
# Calculate the phase anomaly
phi_w[nfreq_min:nfreq_max] = np.arctan2(
trans_func[nfreq_min:nfreq_max].imag,
trans_func[nfreq_min:nfreq_max].real
)
phi_w = process_cycle_skipping(phi_w=phi_w, nfreq_max=nfreq_max,
nfreq_min=nfreq_min, wvec=wvec,
phase_step=self.config.phase_step)
# Calculate amplitude anomaly
abs_w[nfreq_min:nfreq_max] = np.abs(trans_func[nfreq_min:nfreq_max])
# Add the CC measurements to the transfer function
dtau_w[0] = cc_tshift
dtau_w[max(nfreq_min, 1): nfreq_max] = \
- 1.0 / wvec[max(nfreq_min, 1): nfreq_max] * \
phi_w[max(nfreq_min, 1): nfreq_max] + cc_tshift
dlna_w[nfreq_min:nfreq_max] = np.log(
abs_w[nfreq_min:nfreq_max]) + cc_dlna
return phi_w, abs_w, dtau_w, dlna_w
[docs] def calculate_mt_error(self, d, s, tapers, wvec, nfreq_min, nfreq_max,
cc_tshift, cc_dlna, phi_mtm, abs_mtm, dtau_mtm,
dlna_mtm):
"""
Calculate multitaper error with Jackknife MT estimates.
The jackknife estimator of a parameter is found by systematically
leaving out each observation from a dataset and calculating the
parameter estimate over the remaining observations and then aggregating
these calculations.
:type d: np.array
:param d: observed data array
:type s: np.array
:param s: synthetic data array
:type tapers: np.array
:param tapers: array of DPPS windows shaped (num_taper, nlen_w)
:type wvec: np.array
:param wvec: angular frequency array generated from Discrete Fourier
Transform sample frequencies
:type nfreq_min: int
:param nfreq_min: minimum frequency for suitable MTM measurement
:type nfreq_max: int
:param nfreq_max: maximum frequency for suitable MTM measurement
:type cc_tshift: float
:param cc_tshift: cross correlation time shift
:type cc_dlna: float
:param cc_dlna: amplitude anomaly from cross correlation
:type phi_mtm: np.array
:param phi_mtm: frequency dependent phase anomaly
:type abs_mtm: np.array
:param abs_mtm: phase dependent amplitude anomaly
:type dtau_mtm: np.array
:param dtau_mtm: phase dependent cross-correlation time shift
:type dlna_mtm: np.array
:param dlna_mtm: phase dependent cross-correlation amplitude anomaly)
:rtype: tuple of np.array
:return: (err_phi, err_abs, err_dtau, err_dlna),
(error in frequency dependent phase anomaly,
error in phase dependent amplitude anomaly,
error in phase dependent cross-correlation time shift,
error in phase dependent cross-correlation amplitude anomaly)
"""
nlen_t = len(d)
ntaper = len(tapers[0])
logger.debug("Number of tapers used: %d" % ntaper)
# Jacknife MT estimates. Initialize arrays for memory efficiency
phi_mul = np.zeros((self.nlen_f, ntaper))
abs_mul = np.zeros((self.nlen_f, ntaper))
dtau_mul = np.zeros((self.nlen_f, ntaper))
dlna_mul = np.zeros((self.nlen_f, ntaper))
ephi_ave = np.zeros(self.nlen_f)
eabs_ave = np.zeros(self.nlen_f)
edtau_ave = np.zeros(self.nlen_f)
edlna_ave = np.zeros(self.nlen_f)
err_phi = np.zeros(self.nlen_f)
err_abs = np.zeros(self.nlen_f)
err_dtau = np.zeros(self.nlen_f)
err_dlna = np.zeros(self.nlen_f)
# Loop through all tapers
for itaper in range(0, ntaper):
# Delete one taper at a time
tapers_om = np.zeros((nlen_t, ntaper - 1))
tapers_om[0:self.nlen_f, 0:ntaper - 1] = \
np.delete(tapers, itaper, 1)
# FIXME Recalculate MT measurements with deleted taper list
phi_om, abs_om, dtau_om, dlna_om = self.calculate_multitaper(
d=d, s=s, tapers=tapers_om, wvec=wvec, nfreq_min=nfreq_min,
nfreq_max=nfreq_max, cc_tshift=cc_tshift, cc_dlna=cc_dlna
)
phi_mul[0:self.nlen_f, itaper] = phi_om[0:self.nlen_f]
abs_mul[0:self.nlen_f, itaper] = abs_om[0:self.nlen_f]
dtau_mul[0:self.nlen_f, itaper] = dtau_om[0:self.nlen_f]
dlna_mul[0:self.nlen_f, itaper] = dlna_om[0:self.nlen_f]
# Error estimation
ephi_ave[nfreq_min: nfreq_max] = (
ephi_ave[nfreq_min: nfreq_max] +
ntaper * phi_mtm[nfreq_min: nfreq_max] -
(ntaper - 1) * phi_mul[nfreq_min: nfreq_max, itaper]
)
eabs_ave[nfreq_min:nfreq_max] = (
eabs_ave[nfreq_min: nfreq_max] +
ntaper * abs_mtm[nfreq_min: nfreq_max] -
(ntaper - 1) * abs_mul[nfreq_min: nfreq_max, itaper]
)
edtau_ave[nfreq_min: nfreq_max] = (
edtau_ave[nfreq_min: nfreq_max] +
ntaper * dtau_mtm[nfreq_min: nfreq_max] -
(ntaper - 1) * dtau_mul[nfreq_min: nfreq_max, itaper]
)
edlna_ave[nfreq_min: nfreq_max] = (
edlna_ave[nfreq_min: nfreq_max] +
ntaper * dlna_mtm[nfreq_min: nfreq_max] -
(ntaper - 1) * dlna_mul[nfreq_min: nfreq_max, itaper]
)
# Take average over each taper band
ephi_ave /= ntaper
eabs_ave /= ntaper
edtau_ave /= ntaper
edlna_ave /= ntaper
# Calculate deviation
for itaper in range(0, ntaper):
err_phi[nfreq_min:nfreq_max] += \
(phi_mul[nfreq_min: nfreq_max, itaper] -
ephi_ave[nfreq_min: nfreq_max]) ** 2
err_abs[nfreq_min:nfreq_max] += \
(abs_mul[nfreq_min: nfreq_max, itaper] -
eabs_ave[nfreq_min: nfreq_max]) ** 2
err_dtau[nfreq_min:nfreq_max] += \
(dtau_mul[nfreq_min: nfreq_max, itaper] -
edtau_ave[nfreq_min: nfreq_max]) ** 2
err_dlna[nfreq_min:nfreq_max] += \
(dlna_mul[nfreq_min: nfreq_max, itaper] -
edlna_ave[nfreq_min: nfreq_max]) ** 2
# Calculate standard deviation
err_phi[nfreq_min: nfreq_max] = np.sqrt(
err_phi[nfreq_min: nfreq_max] / (ntaper * (ntaper - 1)))
err_abs[nfreq_min: nfreq_max] = np.sqrt(
err_abs[nfreq_min: nfreq_max] / (ntaper * (ntaper - 1)))
err_dtau[nfreq_min: nfreq_max] = np.sqrt(
err_dtau[nfreq_min: nfreq_max] / (ntaper * (ntaper - 1)))
err_dlna[nfreq_min: nfreq_max] = np.sqrt(
err_dlna[nfreq_min: nfreq_max] / (ntaper * (ntaper - 1)))
return err_phi, err_abs, err_dtau, err_dlna
[docs] def calculate_freq_limits(self, df):
"""
Determine if a given window is suitable for multitaper measurements.
If so, finds the maximum frequency range for the measurement using a
spectrum of tapered synthetic waveforms
First check if the window is suitable for mtm measurements, then
find the maximum frequency point for measurement using the spectrum of
tapered synthetics.
.. note::
formerly `frequency_limit`. renamed to be more descriptive. also
split off earlier cycle check from this function into
`check_sufficient_number_of_wavelengths`
:type df: float
:param df: step length of frequency bins for FFT
:rtype: tuple
:return (float, float, bool);
(minimumum frequency, maximum frequency, continue with MTM?)
"""
# Calculate the frequency limits based on FFT of synthetics
fnum = int(self.nlen_f / 2 + 1)
s_spectra = np.fft.fft(self.synthetic.data, self.nlen_f) * self.dt
# Calculate the maximum amplitude of the spectra for the given frequency
ampmax = max(abs(s_spectra[0: fnum]))
i_ampmax = np.argmax(abs(s_spectra[0: fnum]))
# Scale the maximum amplitude by some constant water level
scaled_wl = self.config.water_threshold * ampmax
# Default starting values for min/max freq. bands
ifreq_min = int(1.0 / (self.config.max_period * df)) # default fmin
ifreq_max = int(1.0 / (self.config.min_period * df)) # default fmax
# Get the maximum frequency limit by searching valid frequencies
nfreq_max = fnum - 1
is_search = True
for iw in range(0, fnum):
if iw > i_ampmax:
nfreq_max, is_search = self._search_frequency_limit(
is_search=is_search, index=iw, nfreq_limit=nfreq_max,
spectra=s_spectra, water_threshold=scaled_wl
)
# Make sure `nfreq_max` does not go beyond the Nyquist frequency
nfreq_max = min(nfreq_max, ifreq_max, int(1.0 / (2 * self.dt) / df) - 1)
# Get the minimum frequency limit by searchjing valid frequencies
nfreq_min = 0
is_search = True
for iw in range(fnum - 1, 0, -1):
if iw < i_ampmax:
nfreq_min, is_search = self._search_frequency_limit(
is_search=is_search, index=iw, nfreq_limit=nfreq_min,
spectra=s_spectra, water_threshold=scaled_wl
)
# Limit `nfreq_min` by assuming at least N cycles within the window
nfreq_min = max(
nfreq_min, ifreq_min,
int(self.config.min_cycle_in_window / self.tlen_data / df) - 1
)
# Reject mtm if the chosen frequency band is narrower than quarter of
# the multi-taper bandwidth
half_taper_bandwidth = self.config.mt_nw / (4.0 * self.tlen_data)
chosen_bandwidth = (nfreq_max - nfreq_min) * df
if chosen_bandwidth < half_taper_bandwidth:
logger.debug(f"chosen bandwidth ({chosen_bandwidth}) < "
f"half taper bandwidth ({half_taper_bandwidth})")
nfreq_min = None
nfreq_max = None
is_mtm = False
else:
is_mtm = True
return nfreq_min, nfreq_max, is_mtm
[docs] def prepare_data_for_mtm(self, d, tshift, dlna, window):
"""
Re-window observed data to center on the optimal time shift, and
scale by amplitude anomaly to get best matching waveforms for MTM
:return:
"""
left_sample, right_sample, nlen_w = get_window_info(window, self.dt)
ishift = int(tshift / self.dt) # time shift in samples
left_sample_d = max(left_sample + ishift, 0)
right_sample_d = min(right_sample + ishift, self.nlen_data)
nlen_d = right_sample_d - left_sample_d
if nlen_d == nlen_w:
# TODO: No need to correct `cc_dlna` in multitaper measurements?
d[0:nlen_w] = self.observed.data[left_sample_d:right_sample_d]
d *= np.exp(-dlna)
window_taper(d, taper_percentage=self.config.taper_percentage,
taper_type=self.config.taper_type)
is_mtm = True
# If the shifted time window is now out of bounds of the time series
# we will not be able to use MTM
else:
is_mtm = False
return d, is_mtm
[docs] def check_time_series_acceptability(self, cc_tshift, nlen_w):
"""
Checking acceptability of the time series characteristics for MTM
:type cc_tshift: float
:param cc_tshift: time shift in unit [s]
:type nlen_w: int
:param nlen_w: window length in samples
:rtype: bool
:return: True if time series OK for MTM, False if fall back to CC
"""
# Check length of the time shift w.r.t time step
if abs(cc_tshift) <= self.dt:
logger.info(f"reject MTM: time shift {cc_tshift} <= "
f"dt ({self.dt})")
return False
# Check for sufficient number of wavelengths in window
elif bool(self.config.min_cycle_in_window * self.config.min_period >
nlen_w):
logger.info("reject MTM: too few cycles within time window")
logger.debug(f"min_period: {self.config.min_period:.2f}s; "
f"window length: {nlen_w:.2f}s")
return False
else:
return True
[docs] def check_mtm_time_shift_acceptability(self, nfreq_min, nfreq_max, df,
cc_tshift, dtau_mtm, sigma_dtau_mt):
"""
Check MTM time shift measurements to see if they are within allowable
bounds set by the config. If any of the phases used in MTM do not
meet criteria, we will fall back to CC measurement.
.. note::
formerly `mt_measure_select`, renamed for clarity
:type nfreq_max: int
:param nfreq_max: maximum in frequency domain
:type nfreq_min: int
:param nfreq_min: minimum in frequency domain
:type df: floats
:param df: step length of frequency bins for FFT
:type cc_tshift: float
:param cc_tshift: c.c. time shift
:type dtau_mtm: np.array
:param dtau_mtm: phase dependent travel time measurements from mtm
:type sigma_dtau_mt: np.array
:param sigma_dtau_mt: phase-dependent error of multitaper measurement
:rtype: bool
:return: flag for whether any of the MTM phases failed check
"""
# True unless set False
is_mtm = True
# If any MTM measurements is out of the resonable range, switch to CC
for j in range(nfreq_min, nfreq_max):
# dt larger than 1/dt_fac of the wave period
if np.abs(dtau_mtm[j]) > 1. / (self.config.dt_fac * j * df):
logger.info("reject MTM: `dt` measurements is too large")
is_mtm = False
# Error larger than 1/err_fac of wave period
if sigma_dtau_mt[j] > 1. / (self.config.err_fac * j * df):
logger.debug("reject MTM: `dt` error is too large")
is_mtm = False
# dt larger than the maximum allowable time shift
if np.abs(dtau_mtm[j]) > self.config.dt_max_scale * abs(cc_tshift):
logger.debug("reject MTM: dt is larger than the maximum "
"allowable time shift")
is_mtm = False
return is_mtm
[docs] def rewindow(self, data, left_sample, right_sample, shift):
"""
Align data in a window according to a given time shift. Will not fully
shift if shifted window hits bounds of the data array
:type data: np.array
:param data: full data array to cut with shifted window
:type left_sample: int
:param left_sample: left window border
:type right_sample: int
:param right_sample: right window border
:type shift: int
:param shift: overall time shift in units of samples
"""
nlen_data = len(data)
nlen = right_sample - left_sample
lindex = 0
left_shifted = left_sample + shift
if left_shifted < 0:
logger.warn("Re-windowing due to left shift is out of bounds.")
lindex = -1 * left_shifted
left_shifted = 0
rindex = nlen
right_shifted = right_sample + shift
if right_shifted > nlen_data:
logger.warn("Re-windowing due to right shift is out of bounds.")
rindex = rindex - (right_shifted - nlen_data)
right_shifted = nlen_data
data_shifted = np.zeros(nlen)
data_shifted[lindex:rindex] = data[left_shifted:right_shifted]
return data_shifted, left_shifted, right_shifted
@staticmethod
[docs] def _search_frequency_limit(is_search, index, nfreq_limit, spectra,
water_threshold, c=10):
"""
Search valid frequency range of spectra. If the spectra larger than
10 * `water_threshold` it will trigger the search again, works like the
heating thermostat.
:type is_search: bool
:param is_search: Logic switch
:type index: int
:param index: index of spectra
:type nfreq_limit: int
:param nfreq_limit: index of freqency limit searched
:type spectra: int
:param spectra: spectra of signal
:type water_threshold: float
:param water_threshold: optional triggering value to stop the search,
if not given, defaults to Config value
:type c: int
:param c: constant scaling factor for water threshold.
"""
if abs(spectra[index]) < water_threshold and is_search:
is_search = False
nfreq_limit = index
if abs(spectra[index]) > c * water_threshold and not is_search:
is_search = True
nfreq_limit = index
return nfreq_limit, is_search
[docs]def calculate_adjoint_source(observed, synthetic, config, windows,
observed_2=None, synthetic_2=None, windows_2=None):
"""
Convenience wrapper function for MTM class to match the expected format
of Pyadjoint. Contains the logic for what to return to User.
:type observed: obspy.core.trace.Trace
:param observed: observed waveform to calculate adjoint source
:type synthetic: obspy.core.trace.Trace
:param synthetic: synthetic waveform to calculate adjoint source
:type config: pyadjoint.config.ConfigCCTraveltime
:param config: Config class with parameters to control processing
:type windows: list of tuples
:param windows: [(left, right),...] representing left and right window
borders to be used to calculate misfit and adjoint sources
:type observed_2: obspy.core.trace.Trace
:param observed_2: second observed waveform to calculate adjoint sources
from station pairs
:type synthetic_2: obspy.core.trace.Trace
:param synthetic_2: second synthetic waveform to calculate adjoint sources
from station pairs
:type windows_2: list of tuples
:param windows_2: [(left, right),...] representing left and right window
borders to be tapered in units of seconds since first sample in data
array. Used to window `observed_2` and `synthetic_2`
"""
if config.double_difference:
for val in [observed_2, synthetic_2, windows_2]:
assert val is not None, (
"Double difference measurements require a second set of "
"waveforms and windows (`observed_2`, `synthetic_2`, "
"`windows_2`)"
)
# Standard Multitaper Misfit approach, single waveform set
if config.double_difference is False:
ret_val_p = {}
ret_val_q = {}
# Use the MTM class to generate misfit and adjoint sources
mtm = MultitaperMisfit(observed=observed, synthetic=synthetic,
config=config, windows=windows)
misfit_sum_p, misfit_sum_q, fp, fq, stats = \
mtm.calculate_adjoint_source()
# Append information on the misfit for phase and amplitude
ret_val_p["misfit"] = misfit_sum_p
ret_val_q["misfit"] = misfit_sum_q
# Reverse adjoint source in time w.r.t synthetics
ret_val_p["adjoint_source"] = fp[::-1]
ret_val_q["adjoint_source"] = fq[::-1]
if config.measure_type == "dt":
ret_val = ret_val_p
elif config.measure_type == "am":
ret_val = ret_val_q
ret_val["window_stats"] = stats
# Double difference multitaper misfit, two sets of waveforms
elif config.double_difference is True:
ret_val = {}
# Use the MTM class to generate misfit and adjoint sources
mtm = MultitaperMisfit(observed=observed, synthetic=synthetic,
config=config, windows=windows,
observed_2=observed_2, synthetic_2=synthetic_2,
windows_2=windows_2)
misfit_sum_p, fp, fp_2, stats = mtm.calculate_dd_adjoint_source()
ret_val["misfit"] = misfit_sum_p
ret_val["adjoint_source"] = fp[::-1]
ret_val["adjoint_source_2"] = fp_2[::-1]
ret_val["window_stats"] = stats
else:
raise NotImplementedError
return ret_val