#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Class managing the actual window selection process.
:copyright:
Lion Krischer (krischer@geophysik.uni-muenchen.de), 2014
:license:
GNU General Public License, Version 3
(http://www.gnu.org/copyleft/gpl.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
from future import standard_library
import copy
import json
import numpy as np
import obspy
import obspy.geodetics
from obspy.signal.filter import envelope
from obspy.taup import TauPyModel
import os
import warnings
from . import PyflexError, PyflexWarning, utils, logger, Event, Station
from .stalta import sta_lta
from .window import Window
from .interval_scheduling import schedule_weighted_intervals
with standard_library.hooks():
import itertools
[docs]class WindowSelector(object):
"""
Low level window selector internally used by Pyflex.
"""
def __init__(self, observed, synthetic, config, event=None, station=None):
"""
:param observed: The preprocessed, observed waveform.
:type observed: :class:`~obspy.core.trace.Trace` or single component
:class:`~obspy.core.stream.Stream`
:param synthetic: The preprocessed, synthetic waveform.
:type synthetic: :class:`~obspy.core.trace.Trace` or single component
:class:`~obspy.core.stream.Stream`
:param config: Configuration object.
:type config: :class:`~.config.Config`
:param event: The event information. Either a Pyflex Event object,
or an ObsPy Catalog or Event object. If not given this information
will be extracted from the data traces if either originates from a
SAC file.
:type event: A Pyflex :class:`~pyflex.Event` object, an ObsPy
:class:`~obspy.core.event.Catalog` object, or an ObsPy
:class:`~obspy.core.event.Event` object
:param station: The station information. Either a Pyflex Station
object, or an ObsPy Inventory. If not given this information
will be extracted from the data traces if either originates from
a SAC file.
:type station: A Pyflex :class:`~pyflex.Station` object or an ObsPy
:class:`~obspy.core.inventory.Inventory` object
"""
self.observed = observed
self.synthetic = synthetic
self._sanity_checks()
self.event = event
self.station = station
self._parse_event_and_station()
# Copy to not modify the original data.
self.observed = self.observed.copy()
self.synthetic = self.synthetic.copy()
self.observed.data = np.ascontiguousarray(self.observed.data)
self.synthetic.data = np.ascontiguousarray(self.synthetic.data)
self.config = copy.deepcopy(config)
self.config._convert_to_array(npts=self.observed.stats.npts)
self.ttimes = []
self.windows = []
self.taupy_model = TauPyModel(model=self.config.earth_model)
[docs] def load(self, filename):
"""
Load windows from a JSON file and attach them to the current window
selector object.
:param filename: The filename or file-like object to load.
:type filename: str or file-like object
"""
if hasattr(filename, "read"):
obj = json.load(filename)
else:
if os.path.exists(filename):
with open(filename, "r") as fh:
obj = json.load(fh)
else:
obj = json.loads(filename)
if "windows" not in obj:
raise ValueError("Not a valid Windows JSON file.")
windows = obj["windows"]
window_objects = []
for win in windows:
win_obj = Window._load_from_json_content(win)
# Perform a number of checks.
if win_obj.channel_id != self.observed.id:
raise PyflexError(
"The window has channel id '%s' whereas the observed "
"data has channel id '%s'." % (
win_obj.channel_id, self.observed.id))
if abs(win_obj.dt - self.observed.stats.delta) / \
self.observed.stats.delta >= 0.001:
raise PyflexError(
"The sample interval specified in the window is %g whereas"
" the sample interval in the observed data is %g." % (
win_obj.delta, self.observed_stats.delta))
if abs(win_obj.time_of_first_sample -
self.observed.stats.starttime) > \
0.5 * self.observed.stats.delta:
raise PyflexError(
"The window expects the data to start with at %s whereas "
"the observed data starts at %s." % (
win.time_of_first_sample,
self.observed.stats.starttime))
# Collect in temporary list and not directly attach to not
# modify the window object in case a later window raises an
# exception. Either all or nothing.
window_objects.append(win_obj)
self.windows.extend(window_objects)
# Recalculate window criteria.
for win in self.windows:
win._calc_criteria(self.observed.data, self.synthetic.data)
[docs] def write(self, filename):
"""
Write windows to the specified filename or file like object.
Will be written as a custom JSON file.
:param filename: Name or object to write to.
:type filename: str or file-like object
"""
windows = [_i._get_json_content() for _i in self.windows]
info = {"windows": windows}
class WindowEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, obspy.UTCDateTime):
return str(obj)
# Numpy objects also require explicit handling.
elif isinstance(obj, np.int64):
return int(obj)
elif isinstance(obj, np.int32):
return int(obj)
elif isinstance(obj, np.float64):
return float(obj)
elif isinstance(obj, np.float32):
return float(obj)
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
if not hasattr(filename, "write"):
with open(filename, "wb") as fh:
j = json.dumps(
info, cls=WindowEncoder, sort_keys=True, indent=4,
separators=(',', ': '))
try:
fh.write(j)
except TypeError:
fh.write(j.encode())
else:
j = json.dumps(
info, cls=WindowEncoder, sort_keys=True, indent=4,
separators=(',', ': '))
try:
filename.write(j)
except TypeError:
filename.write(j.encode())
def _parse_event_and_station(self):
"""
Parse the event and station information.
"""
# Parse the event.
if self.event and not isinstance(self.event, Event):
# It might be an ObsPy event catalog.
if isinstance(self.event, obspy.core.event.Catalog):
if len(self.event) != 1:
raise PyflexError("The event catalog must contain "
"exactly one event.")
self.event = self.event[0]
# It might be an ObsPy event object.
if isinstance(self.event, obspy.core.event.Event):
if not self.event.origins:
raise PyflexError("Event does not contain an origin.")
origin = self.event.preferred_origin() or self.event.origins[0]
self.event = Event(latitude=float(origin.latitude),
longitude=float(origin.longitude),
depth_in_m=float(origin.depth),
origin_time=origin.time)
else:
raise PyflexError("Could not parse the event. Unknown type.")
# Parse the station information if it is an obspy inventory object.
if isinstance(self.station, obspy.core.inventory.Inventory):
net = self.observed.stats.network
sta = self.observed.stats.station
# Workaround for ObsPy 0.9.2 Newer version have a get
# coordinates method...
for network in self.station:
if network.code == net:
break
else:
raise PyflexError("Could not find the network of the "
"observed data in the inventory object.")
for station in network:
if station.code == sta:
break
else:
raise PyflexError("Could not find the station of the "
"observed data in the inventory object.")
self.station = Station(latitude=float(station.latitude),
longitude=float(station.longitude))
# Last resort, if either is not set, and the observed or synthetics
# are sac files, get the information from there.
if not self.station or not self.event:
if hasattr(self.observed.stats, "sac"):
tr = self.observed
ftype = "observed"
elif hasattr(self.synthetic.stats, "sac"):
tr = self.synthetic
ftype = "synthetic"
else:
return
sac = tr.stats.sac
values = (sac.evla, sac.evlo, sac.evdp, sac.stla, sac.stlo, sac.b)
# Invalid value in sac.
if -12345.0 in values:
return
if not self.station:
self.station = Station(latitude=values[3], longitude=values[4])
logger.info("Extracted station information from %s SAC file."
% ftype)
if not self.event:
self.event = Event(
latitude=values[0], longitude=values[1],
depth_in_m=values[2] * 1000.0,
origin_time=self.observed.stats.starttime - values[5])
logger.info("Extracted event information from %s SAC file." %
ftype)
[docs] def calculate_preliminiaries(self):
"""
Calculates the envelope, STA/LTA and the finds the local extrema.
"""
logger.info("Calculating envelope of synthetics.")
self.synthetic_envelope = envelope(self.synthetic.data)
logger.info("Calculating STA/LTA.")
self.stalta = sta_lta(self.synthetic_envelope,
self.observed.stats.delta,
self.config.min_period)
self.peaks, self.troughs = utils.find_local_extrema(self.stalta)
if not len(self.peaks) and len(self.troughs):
return
if self.ttimes:
offset = self.event.origin_time - self.observed.stats.starttime
min_time = self.ttimes[0]["time"] - \
self.config.max_time_before_first_arrival + offset
min_idx = int(min_time / self.observed.stats.delta)
dist_in_km = obspy.geodetics.calc_vincenty_inverse(
self.station.latitude, self.station.longitude,
self.event.latitude, self.event.longitude)[0] / 1000.0
max_time = dist_in_km / self.config.min_surface_wave_velocity + \
offset + self.config.max_period
max_idx = int(max_time / self.observed.stats.delta)
# Reject all peaks and troughs before the minimal allowed start
# time and after the maximum allowed end time.
first_trough, last_trough = self.troughs[0], self.troughs[-1]
self.troughs = self.troughs[(self.troughs >= min_idx) &
(self.troughs <= max_idx)]
# If troughs have been removed, readd them add the boundaries.
if len(self.troughs):
if first_trough != self.troughs[0]:
self.troughs = np.concatenate([
np.array([min_idx], dtype=self.troughs.dtype),
self.troughs])
if last_trough != self.troughs[-1]:
self.troughs = np.concatenate([
self.troughs,
np.array([max_idx], dtype=self.troughs.dtype)])
# Make sure peaks are inside the troughs!
min_trough, max_trough = self.troughs[0], self.troughs[-1]
self.peaks = self.peaks[(self.peaks > min_trough) &
(self.peaks < max_trough)]
[docs] def select_windows(self):
"""
Launch the window selection.
"""
# Fill self.ttimes.
if self.event and self.station:
self.calculate_ttimes()
self.calculate_preliminiaries()
# Perform all window selection steps.
self.initial_window_selection()
# Reject windows based on traveltime if event and station
# information is given. This will also fill self.ttimes.
if self.event and self.station:
self.reject_on_traveltimes()
else:
msg = "No rejection based on traveltime possible. Event and/or " \
"station information is not available."
logger.warning(msg)
warnings.warn(msg, PyflexWarning)
self.determine_signal_and_noise_indices()
if self.config.check_global_data_quality:
self.check_data_quality()
self.reject_windows_based_on_minimum_length()
self.reject_on_minima_water_level()
self.reject_on_prominence_of_central_peak()
self.reject_on_phase_separation()
self.curtail_length_of_windows()
self.remove_duplicates()
# Call once again as curtailing might change the length of some
# windows. Very cheap so can easily be called more than once.
self.reject_windows_based_on_minimum_length()
self.reject_based_on_signal_to_noise_ratio()
self.reject_based_on_data_fit_criteria()
if self.config.resolution_strategy == "interval_scheduling":
self.schedule_weighted_intervals()
elif self.config.resolution_strategy == "merge":
self.merge_windows()
else:
raise NotImplementedError
if self.ttimes:
self.attach_phase_arrivals_to_windows()
return self.windows
[docs] def attach_phase_arrivals_to_windows(self):
"""
Attaches the theoretical phase arrivals to the windows.
"""
offset = self.event.origin_time - self.observed.stats.starttime
for win in self.windows:
left = win.relative_starttime - offset
right = win.relative_endtime - offset
win.phase_arrivals = [
_i for _i in self.ttimes if left <= _i["time"] <= right]
[docs] def merge_windows(self):
"""
Merge overlapping windows. Will also recalculate the data fit criteria.
"""
# Sort by starttime.
self.windows = sorted(self.windows, key=lambda x: x.left)
windows = [self.windows.pop(0)]
for right_win in self.windows:
left_win = windows[-1]
if (left_win.right + 1) < right_win.left:
windows.append(right_win)
continue
left_win.right = right_win.right
self.windows = windows
for win in self.windows:
# Recenter windows
win.center = int(win.left + (win.right - win.left) / 2.0)
# Recalculate criteria.
win._calc_criteria(self.observed.data, self.synthetic.data)
logger.info("Merging windows resulted in %i windows." %
len(self.windows))
[docs] def determine_signal_and_noise_indices(self):
"""
Calculate the time range of the noise and the signal respectively if
not yet specified by the user.
"""
if self.config.noise_end_index is None:
if not self.ttimes:
logger.warning("Cannot calculate the end of the noise as "
"event and/or station information is not given "
"and thus the theoretical arrival times cannot "
"be calculated")
else:
self.config.noise_end_index = \
int(self.ttimes[0]["time"] - self.config.min_period)
if self.config.signal_start_index is None:
self.config.signal_start_index = self.config.noise_end_index
[docs] def reject_based_on_signal_to_noise_ratio(self):
"""
Rejects windows based on their signal to noise amplitude ratio.
"""
if self.config.noise_end_index is None:
logger.warning("Cannot reject windows based on their signal to "
"noise ratio. Please give station and event "
"information or information about the temporal "
"range of the noise.")
return
noise = self.observed.data[self.config.noise_start_index:
self.config.noise_end_index]
if self.config.window_signal_to_noise_type == "amplitude":
noise_amp = np.abs(noise).max()
def filter_window_noise(win):
win_signal = self.observed.data[win.left: win.right]
win_noise_amp = np.abs(win_signal).max() / noise_amp
if win_noise_amp < self.config.s2n_limit[win.center]:
return False
return True
elif self.config.window_signal_to_noise_type == "energy":
noise_energy = np.sum(noise ** 2) / len(noise)
def filter_window_noise(win):
data = self.observed.data[win.left: win.right]
win_energy = np.sum(data ** 2) / len(data)
win_noise_amp = win_energy / noise_energy
if win_noise_amp < self.config.s2n_limit[win.center]:
return False
return True
else:
raise NotImplementedError
self.windows = list(filter(filter_window_noise, self.windows))
logger.info("SN amplitude ratio window rejection retained %i windows" %
len(self.windows))
[docs] def check_data_quality(self):
"""
Checks the data quality by estimating signal to noise ratios.
"""
if self.config.noise_end_index is None:
raise PyflexError(
"Cannot check data quality as the noise end index is not "
"given and station and/or event information is not "
"available so the theoretical arrival times cannot be "
"calculated.")
noise = self.observed.data[self.config.noise_start_index:
self.config.noise_end_index]
signal = self.observed.data[self.config.signal_start_index:
self.config.signal_end_index]
noise_int = np.sum(noise ** 2) / len(noise)
noise_amp = np.abs(noise).max()
signal_int = np.sum(signal ** 2) / len(signal)
signal_amp = np.abs(signal).max()
# Calculate ratios.
snr_int = signal_int / noise_int
snr_amp = signal_amp / noise_amp
if snr_int < self.config.snr_integrate_base:
msg = ("Whole waveform rejected as the integrated signal to "
"noise ratio (%f) is above the threshold (%f)." %
(snr_int, self.config.snr_integrate_base))
logger.warn(msg)
warnings.warn(msg, PyflexWarning)
return False
if snr_amp < self.config.snr_max_base:
msg = ("Whole waveform rejected as the signal to noise amplitude "
"ratio (%f) is above the threshold (%f)." % (
snr_amp, self.config.snr_max_base))
logger.warn(msg)
warnings.warn(msg, PyflexWarning)
return False
logger.info("Global SNR checks passed. Integrated SNR: %f, Amplitude "
"SNR: %f" % (snr_int, snr_amp))
return True
[docs] def calculate_ttimes(self):
"""
Calculate theoretical travel times. Only call if station and event
information is available!
"""
dist_in_deg = obspy.geodetics.locations2degrees(
self.station.latitude, self.station.longitude,
self.event.latitude, self.event.longitude)
tts = self.taupy_model.get_travel_times(
source_depth_in_km=self.event.depth_in_m / 1000.0,
distance_in_degree=dist_in_deg)
self.ttimes = [{"time": _i.time, "name": _i.name} for _i in tts]
logger.info("Calculated travel times.")
[docs] def reject_on_traveltimes(self):
"""
Reject based on traveltimes. Will reject windows containing only
data before a minimum period before the first arrival and windows
only containing data after the minimum allowed surface wave speed.
Only call if station and event information is available!
"""
dist_in_km = obspy.geodetics.calc_vincenty_inverse(
self.station.latitude, self.station.longitude, self.event.latitude,
self.event.longitude)[0] / 1000.0
offset = self.event.origin_time - self.observed.stats.starttime
min_time = self.ttimes[0]["time"] - self.config.min_period + offset
max_time = dist_in_km / self.config.min_surface_wave_velocity + offset
self.windows = [win for win in self.windows
if (win.relative_endtime >= min_time) and
(win.relative_starttime <= max_time)]
logger.info("Rejection based on travel times retained %i windows." %
len(self.windows))
[docs] def initial_window_selection(self):
"""
Find all possible windows. This is equivalent to the setup_M_L_R()
function in flexwin.
"""
for peak in self.peaks:
# only continue if there are available minima on either side
if peak <= self.troughs[0] or peak >= self.troughs[-1]:
continue
# only continue if this maximum is above the water level
if self.stalta[peak] <= self.config.stalta_waterlevel[peak]:
continue
smaller_troughs = self.troughs[self.troughs < peak]
larger_troughs = self.troughs[self.troughs > peak]
for left, right in itertools.product(smaller_troughs,
larger_troughs):
self.windows.append(Window(
left=left, right=right, center=peak,
channel_id=self.observed.id,
time_of_first_sample=self.synthetic.stats.starttime,
dt=self.observed.stats.delta,
min_period=self.config.min_period,
weight_function=self.config.window_weight_fct))
logger.info("Initial window selection yielded %i possible windows." %
len(self.windows))
[docs] def remove_duplicates(self):
"""
Filter to remove duplicate windows based on left and right bounds.
This function will also change the middle to actually be in the
center of the window. This should result in better results for the
following stages as lots of thresholds are evaluated at the center
of a window.
"""
new_windows = {}
for window in self.windows:
tag = (window.left, window.right)
if tag not in new_windows:
window.center = \
int(window.left + (window.right - window.left) / 2.0)
new_windows[tag] = window
self.windows = sorted(new_windows.values(), key=lambda x: x.left)
logger.info("Removing duplicates retains %i windows." % len(
self.windows))
[docs] def schedule_weighted_intervals(self):
"""
Run the weighted interval scheduling.
"""
self.windows = schedule_weighted_intervals(self.windows)
logger.info("Weighted interval schedule optimzation retained %i "
"windows." % len(self.windows))
[docs] def reject_on_minima_water_level(self):
"""
Filter function rejecting windows whose internal minima are below the
water level of the windows peak. This is equivalent to the
reject_on_water_level() function in flexwin.
"""
def filter_window_minima(win):
waterlevel_midpoint = \
self.config.c_0 * self.config.stalta_waterlevel[win.center]
internal_minima = win._get_internal_indices(self.troughs)
return not np.any(self.stalta[internal_minima] <=
waterlevel_midpoint)
self.windows = list(filter(filter_window_minima, self.windows))
logger.info("Water level rejection retained %i windows" %
len(self.windows))
[docs] def reject_on_prominence_of_central_peak(self):
"""
Equivalent to reject_on_prominence() in the original flexwin code.
"""
# The fine tuning constant is often set to 0. Nothing to do in this
# case as all windows will then pass the criteria by definition.
if not self.config.c_2:
return self.windows
def filter_windows_maximum_prominence(win):
smaller_troughs = self.troughs[self.troughs < win.center]
larger_troughs = self.troughs[self.troughs > win.center]
if not len(smaller_troughs) or not len(larger_troughs):
return False
left = self.stalta[smaller_troughs[-1]]
right = self.stalta[larger_troughs[0]]
center = self.stalta[win.center]
delta_left = center - left
delta_right = center - right
if (delta_left < self.config.c_2 * center) or \
(delta_right < self.config.c_2 * center):
return False
return True
self.windows = list(filter(filter_windows_maximum_prominence,
self.windows))
logger.info("Prominence of central peak rejection retained "
"%i windows." % len(self.windows))
[docs] def reject_on_phase_separation(self):
"""
Reject windows based on phase seperation. Equivalent to
reject_on_phase_separation() in the original flexwin code.
"""
def filter_phase_rejection(win):
# Find the lowest minimum within the window.
internal_minima = self.troughs[
(self.troughs >= win.left) & (self.troughs <= win.right)]
stalta_min = self.stalta[internal_minima].min()
# find the height of the central maximum above this minimum value
d_stalta_center = self.stalta[win.center] - stalta_min
# Find all internal maxima.
internal_maxima = self.peaks[
(self.peaks >= win.left) & (self.peaks <= win.right) &
(self.peaks != win.center)]
for max_index in internal_maxima:
# find height of current maximum above lowest minimum
d_stalta = self.stalta[max_index] - stalta_min
# find scaled time between current maximum and central maximum
d_time = abs(win.center - max_index) * \
self.observed.stats.delta / self.config.min_period
# find value of time decay function.
# The paper has a square root in the numinator of the
# exponent as well. Not the case here as it is not the case
# in the original flexwin code.
if (d_time >= self.config.c_3b):
f_time = np.exp(-((d_time - self.config.c_3b) /
self.config.c_3b) ** 2)
else:
f_time = 1.0
# check condition
if d_stalta > (self.config.c_3a * d_stalta_center * f_time):
break
else:
return True
return False
self.windows = list(filter(
filter_phase_rejection, self.windows))
logger.info("Single phase group rejection retained %i windows" %
len(self.windows))
[docs] def curtail_length_of_windows(self):
"""
Curtail the window length. Equivalent to a call to
curtail_window_length() in the original flexwin code.
"""
dt = self.observed.stats.delta
def curtail_window_length(win):
time_decay_left = self.config.min_period * self.config.c_4a / dt
time_decay_right = self.config.min_period * self.config.c_4b / dt
# Find all internal maxima.
internal_maxima = self.peaks[
(self.peaks >= win.left) & (self.peaks <= win.right) &
(self.peaks != win.center)]
if len(internal_maxima) < 2:
return win
i_left = internal_maxima[0]
i_right = internal_maxima[-1]
delta_left = i_left - win.left
delta_right = win.right - i_right
# check condition
if delta_left > time_decay_left:
logger.info("Curtailing left")
win.left = int(i_left - time_decay_left)
if delta_right > time_decay_right:
logger.info("Curtailing right")
win.right = int(i_right + time_decay_right)
return win
self.windows = [curtail_window_length(i) for i in self.windows]
@property
def minimum_window_length(self):
"""
Minimum acceptable window length.
"""
return self.config.c_1 * self.config.min_period / \
self.observed.stats.delta
[docs] def reject_windows_based_on_minimum_length(self):
"""
Reject windows smaller than the minimal window length.
"""
self.windows = list(filter(
lambda x: (x.right - x.left) >= self.minimum_window_length,
self.windows))
logger.info("Rejection based on minimum window length retained %i "
"windows." % len(self.windows))
[docs] def reject_based_on_data_fit_criteria(self):
"""
Rejects windows based on similarity between data and synthetics.
"""
# First calculate the criteria for all remaining windows.
for win in self.windows:
win._calc_criteria(self.observed.data, self.synthetic.data)
def reject_based_on_criteria(win):
tshift_min = self.config.tshift_reference - \
self.config.tshift_acceptance_level[win.center]
tshift_max = self.config.tshift_reference + \
self.config.tshift_acceptance_level[win.center]
dlnA_min = self.config.dlna_reference - \
self.config.dlna_acceptance_level[win.center]
dlnA_max = self.config.dlna_reference + \
self.config.dlna_acceptance_level[win.center]
if not (tshift_min < win.cc_shift *
self.observed.stats.delta < tshift_max):
logger.debug("Window rejected due to time shift: %f" %
win.cc_shift)
return False
if not (dlnA_min < win.dlnA < dlnA_max):
logger.debug("Window rejected due to amplitude fit: %f" %
win.dlnA)
return False
if win.max_cc_value < self.config.cc_acceptance_level[win.center]:
logger.debug("Window rejected due to CC value: %f" %
win.max_cc_value)
return False
return True
self.windows = list(filter(reject_based_on_criteria, self.windows))
logger.info("Rejection based on data fit criteria retained %i windows."
% len(self.windows))
def _sanity_checks(self):
"""
Perform a number of basic sanity checks to assure the data is valid
in a certain sense.
It checks the types of both, the starttime, sampling rate, number of
samples, ...
"""
if not isinstance(self.observed, obspy.Trace):
# Also accept Stream objects.
if isinstance(self.observed, obspy.Stream) and \
len(self.observed) == 1:
self.observed = self.observed[0]
else:
raise PyflexError(
"Observed data must be an ObsPy Trace object.")
if not isinstance(self.synthetic, obspy.Trace):
if isinstance(self.synthetic, obspy.Stream) and \
len(self.synthetic) == 1:
self.synthetic = self.synthetic[0]
else:
raise PyflexError(
"Synthetic data must be an ObsPy Trace object.")
if self.observed.stats.npts != self.synthetic.stats.npts:
raise PyflexError("Observed and synthetic data must have the same "
"number of samples.")
sr1 = self.observed.stats.sampling_rate
sr2 = self.synthetic.stats.sampling_rate
if abs(sr1 - sr2) / sr1 >= 1E-5:
raise PyflexError("Observed and synthetic data must have the same "
"sampling rate.")
# Make sure data and synthetics start within half a sample interval.
if abs(self.observed.stats.starttime -
self.synthetic.stats.starttime) > self.observed.stats.delta * \
0.5:
raise PyflexError("Observed and synthetic data must have the same "
"starttime.")
ptp = sorted([self.observed.data.ptp(), self.synthetic.data.ptp()])
if ptp[1] / ptp[0] >= 5:
warnings.warn("The amplitude difference between data and "
"synthetic is fairly large.")
# Also check the components of the data to avoid silly mistakes of
# users.
if len(set([self.observed.stats.channel[-1].upper(),
self.synthetic.stats.channel[-1].upper()])) != 1:
warnings.warn("The orientation code of synthetic and observed "
"data is not equal.")
[docs] def plot(self, filename=None):
"""
Plot the current state of the windows.
:param filename: If given, the plot will be written to this file,
otherwise the plot will be shown.
:type filename: str
"""
# Lazy imports to not import matplotlib all the time.
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
# Use an offset to have the seconds since the event as the time axis.
if self.event:
offset = self.event.origin_time - self.observed.stats.starttime
else:
offset = 0
plt.figure(figsize=(15, 5))
plt.axes([0.025, 0.92, 0.95, 0.07])
times = self.observed.times() - offset
# Plot theoretical arrivals.
ax = plt.gca()
ax.spines['right'].set_color('none')
ax.spines['left'].set_color('none')
ax.spines['top'].set_color('none')
ax.spines['bottom'].set_color('none')
ax.set_xticks([])
ax.set_yticks([])
ax.set_xlim(times[0], times[-1])
ylim = plt.ylim()
for tt in self.ttimes:
if tt["name"].lower().startswith("p"):
color = "#008c28"
else:
color = "#950000"
# Don't need an offset as the time axis corresponds to time
# since event.
plt.vlines(tt["time"], ylim[0], ylim[1], color=color)
plt.ylim(*ylim)
plt.text(0.01, 0.92, 'Phase Arrivals', horizontalalignment='left',
verticalalignment='top', transform=ax.transAxes)
plt.axes([0.025, 0.51, 0.95, 0.4])
plt.plot(times, self.observed.data, color="black")
plt.plot(times, self.synthetic.data, color="red")
plt.xlim(times[0], times[-1])
ax = plt.gca()
ax.spines['right'].set_color('none')
ax.spines['left'].set_color('none')
ax.spines['top'].set_color('none')
ax.spines['bottom'].set_color('none')
ax.set_xticks([])
ax.set_yticks([])
plt.text(0.01, 0.99, 'Seismograms', horizontalalignment='left',
verticalalignment='top', transform=ax.transAxes)
buf = 0.003 * (plt.xlim()[1] - plt.xlim()[0])
for win in self.windows:
l = win.relative_starttime - offset
r = win.relative_endtime - offset
re = Rectangle((l, plt.ylim()[0]), r - l,
plt.ylim()[1] - plt.ylim()[0], color="blue",
alpha=(win.max_cc_value ** 2) * 0.25)
plt.gca().add_patch(re)
plt.text(l + buf, plt.ylim()[1],
"CC=%.2f\ndT=%.2f\ndA=%.2f" %
(win.max_cc_value,
win.cc_shift * self.observed.stats.delta,
win.dlnA),
horizontalalignment="left",
verticalalignment="top", rotation="vertical",
size="small", multialignment="right")
plt.axes([0.025, 0.1, 0.95, 0.4])
plt.plot(times, self.stalta, color="blue")
plt.plot(times, self.config.stalta_waterlevel, linestyle="dashed",
color="blue")
plt.xlim(times[0], times[-1])
if self.event:
plt.xlabel("Time [s] since event")
else:
plt.xlabel("Time [s]")
ax = plt.gca()
ax.spines['right'].set_color('none')
ax.spines['left'].set_color('none')
ax.spines['top'].set_color('none')
ax.set_yticks([])
ax.xaxis.set_ticks_position('bottom')
plt.text(0.01, 0.99, 'STA/LTA', horizontalalignment='left',
verticalalignment='top', transform=ax.transAxes)
for win in self.windows:
l = win.relative_starttime - offset
r = win.relative_endtime - offset
re = Rectangle((l, plt.ylim()[0]), r - l,
plt.ylim()[1] - plt.ylim()[0], color="blue",
alpha=(win.max_cc_value ** 2) * 0.25)
plt.gca().add_patch(re)
plt.ylim(0, plt.ylim()[1])
if filename is None:
plt.show()
else:
plt.savefig(filename)