Source code for eqcorrscan.utils.stacking
"""
Utility module of the EQcorrscan package to allow for different methods of \
stacking of seismic signal in one place.
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import logging
import numpy as np
from scipy.signal import hilbert
from copy import deepcopy
Logger = logging.getLogger(__name__)
[docs]
def linstack(streams, normalize=True):
"""
Compute the linear stack of a series of seismic streams of \
multiplexed data.
:type streams: list
:param streams: List of streams to stack
:type normalize: bool
:param normalize: Normalize traces before stacking, normalizes by the RMS \
amplitude.
:returns: stacked data
:rtype: :class:`obspy.core.stream.Stream`
"""
stack = streams[np.argmax([len(stream) for stream in streams])].copy()
if normalize:
for tr in stack:
tr.data = tr.data / np.sqrt(np.mean(np.square(tr.data)))
tr.data = np.nan_to_num(tr.data)
for i in range(1, len(streams)):
for tr in stack:
matchtr = streams[i].select(station=tr.stats.station,
channel=tr.stats.channel)
if matchtr:
# Normalize the data before stacking
if normalize:
norm = matchtr[0].data /\
np.sqrt(np.mean(np.square(matchtr[0].data)))
norm = np.nan_to_num(norm)
else:
norm = matchtr[0].data
tr.data = np.sum((norm, tr.data), axis=0)
return stack
[docs]
def PWS_stack(streams, weight=2, normalize=True):
"""
Compute the phase weighted stack of a series of streams.
.. note:: It is recommended to align the traces before stacking.
:type streams: list
:param streams: List of :class:`obspy.core.stream.Stream` to stack.
:type weight: float
:param weight: Exponent to the phase stack used for weighting.
:type normalize: bool
:param normalize: Normalize traces before stacking.
:return: Stacked stream.
:rtype: :class:`obspy.core.stream.Stream`
"""
# First get the linear stack which we will weight by the phase stack
Linstack = linstack(streams)
# Compute the instantaneous phase
instaphases = []
Logger.debug("Computing instantaneous phase")
for stream in streams:
instaphase = stream.copy()
for tr in instaphase:
analytic = hilbert(tr.data)
envelope = np.sqrt(np.sum((np.square(analytic),
np.square(tr.data)), axis=0))
tr.data = analytic / envelope
instaphases.append(instaphase)
# Compute the phase stack
Logger.debug("Computing the phase stack")
Phasestack = linstack(instaphases, normalize=normalize)
# Compute the phase-weighted stack
for tr in Phasestack:
tr.data = Linstack.select(station=tr.stats.station)[0].data *\
np.abs(tr.data ** weight)
return Phasestack
[docs]
def align_traces(trace_list, shift_len, master=False, positive=False,
plot=False):
"""
Align traces relative to each other based on their cross-correlation value.
Uses the :func:`eqcorrscan.core.match_filter.normxcorr2` function to find
the optimum shift to align traces relative to a master event. Either uses
a given master to align traces, or uses the trace with the highest MAD
amplitude.
:type trace_list: list
:param trace_list: List of traces to align
:type shift_len: int
:param shift_len: Length to allow shifting within in samples
:type master: obspy.core.trace.Trace
:param master: Master trace to align to, if set to False will align to \
the largest amplitude trace (default)
:type positive: bool
:param positive: Return the maximum positive cross-correlation, or the \
absolute maximum, defaults to False (absolute maximum).
:type plot: bool
:param plot: If true, will plot each trace aligned with the master.
:returns: list of shifts and correlations for best alignment in seconds.
:rtype: list
"""
from eqcorrscan.core.match_filter import normxcorr2
from eqcorrscan.utils.plotting import xcorr_plot
traces = deepcopy(trace_list)
if not master:
# Use trace with largest MAD amplitude as master
master = traces[0]
MAD_master = np.median(np.abs(master.data))
for i in range(1, len(traces)):
if np.median(np.abs(traces[i].data)) > MAD_master:
master = traces[i]
MAD_master = np.median(np.abs(master.data))
else:
Logger.info('Using master given by user')
shifts = []
ccs = []
for i in range(len(traces)):
if not master.stats.sampling_rate == traces[i].stats.sampling_rate:
raise ValueError('Sampling rates not the same')
cc_vec = normxcorr2(template=traces[i].data.
astype(np.float32)[shift_len:-shift_len],
image=master.data.astype(np.float32))
cc_vec = cc_vec[0]
shift = np.abs(cc_vec).argmax()
cc = cc_vec[shift]
if plot:
xcorr_plot(template=traces[i].data.
astype(np.float32)[shift_len:-shift_len],
image=master.data.astype(np.float32), shift=shift,
cc=cc)
shift -= shift_len
if cc < 0 and positive:
cc = cc_vec.max()
shift = cc_vec.argmax() - shift_len
shifts.append(shift / master.stats.sampling_rate)
ccs.append(cc)
return shifts, ccs
if __name__ == "__main__":
import doctest
doctest.testmod()