"""
This module contains functions relevant to executing subspace detection
for earthquake catalogs.
We recommend that you read Harris' detailed report on subspace detection
theory which can be found here: https://e-reports-ext.llnl.gov/pdf/335299.pdf
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import numpy as np
import logging
import h5py
import getpass
import eqcorrscan
import copy
import scipy
from timeit import default_timer
from obspy import Trace, UTCDateTime, Stream
from obspy.core.event import (
Event, CreationInfo, ResourceIdentifier, Comment, WaveformStreamID, Pick)
from eqcorrscan.utils.clustering import svd
from eqcorrscan.utils import findpeaks, pre_processing, stacking, plotting
from eqcorrscan.core.match_filter import Detection
from eqcorrscan.core.match_filter.helpers import extract_from_stream
from eqcorrscan.utils.plotting import subspace_detector_plot, subspace_fc_plot
Logger = logging.getLogger(__name__)
[docs]
class Detector(object):
"""
Class to serve as the base for subspace detections.
:type name: str
:param name: Name of subspace detector, used for book-keeping
:type sampling_rate: float
:param sampling_rate: Sampling rate in Hz of original waveforms
:type multiplex: bool
:param multiplex: Is this detector multiplexed.
:type stachans: list
:param stachans:
List of tuples of (station, channel) used in detector.
If multiplexed, these must be in the order that multiplexing was done.
:type lowcut: float
:param lowcut: Lowcut filter in Hz
:type highcut: float
:param highcut: Highcut filter in Hz
:type filt_order: int
:param filt_order: Number of corners for filtering
:type data: numpy.ndarray
:param data: The actual detector
:type u: numpy.ndarray
:param u: Full rank U matrix of left (input) singular vectors.
:type sigma: numpy.ndarray
:param sigma: Full rank vector of singular values.
:type v: numpy.ndarray
:param v: Full rank right (output) singular vectors.
:type dimension: int
:param dimension: Dimension of data.
.. warning::
Changing between scipy.linalg.svd solvers (obvious changes between
scipy version 0.18.x and 0.19.0) result in sign changes in svd results.
You should only run a detector created using the same scipy version
as you currently run.
"""
[docs]
def __init__(self, name=None, sampling_rate=None, multiplex=None,
stachans=None, lowcut=None, highcut=None,
filt_order=None, data=None, u=None, sigma=None, v=None,
dimension=None):
self.name = name
self.sampling_rate = sampling_rate
self.multiplex = multiplex
self.stachans = stachans
# self.delays = delays
self.lowcut = lowcut
self.highcut = highcut
self.filt_order = filt_order
self.data = data
self.u = u
self.sigma = sigma
self.v = v
self.dimension = dimension
def __repr__(self):
if self.name:
out = 'Detector: ' + self.name
else:
out = 'Empty Detector object'
return out
def __str__(self):
out = 'Detector object: \n'
for key in ['name', 'sampling_rate', 'multiplex', 'lowcut', 'highcut',
'filt_order', 'dimension']:
if self.__getattribute__(key):
out += ('\t' + key + ': ' + str(self.__getattribute__(key)) +
'\n')
return out
def __eq__(self, other):
if not isinstance(other, Detector):
return False
for key in ['name', 'sampling_rate', 'multiplex', 'lowcut', 'highcut',
'filt_order', 'dimension', 'stachans']:
if not self.__getattribute__(key) == other.__getattribute__(key):
return False
for key in ['data', 'u', 'v', 'sigma']:
list_item = self.__getattribute__(key)
other_list = other.__getattribute__(key)
if not len(list_item) == len(other_list):
return False
for item, other_item in zip(list_item, other_list):
if not np.allclose(np.absolute(item),
np.absolute(other_item), atol=0.001):
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __len__(self):
return len(self.data)
[docs]
def construct(self, streams, lowcut, highcut, filt_order,
sampling_rate, multiplex, name, align, shift_len=0,
reject=0.3, no_missed=True, plot=False):
"""
Construct a subspace detector from a list of streams, full rank.
Subspace detector will be full-rank, further functions can be used \
to select the desired dimensions.
:type streams: list
:param streams:
List of :class:`obspy.core.stream.Stream` to be used to generate
the subspace detector. These should be pre-clustered and aligned.
:type lowcut: float
:param lowcut: Lowcut in Hz, can be None to not apply filter
:type highcut: float
:param highcut: Highcut in Hz, can be None to not apply filter
:type filt_order: int
:param filt_order: Number of corners for filter.
:type sampling_rate: float
:param sampling_rate: Desired sampling rate in Hz
:type multiplex: bool
:param multiplex:
Whether to multiplex the data or not. Data are multiplexed
according to the method of Harris, see the multi function for
details.
:type name: str
:param name: Name of the detector, used for book-keeping.
:type align: bool
:param align:
Whether to align the data or not - needs to be done at some point
:type shift_len: float
:param shift_len: Maximum shift allowed for alignment in seconds.
:type reject: float
:param reject:
Minimum correlation to include traces - only used if align=True.
:type no_missed: bool
:param no_missed:
Reject streams with missed traces, defaults to True. A missing
trace from lots of events will reduce the quality of the subspace
detector if multiplexed. Only used when multi is set to True.
:type plot: bool
:param plot: Whether to plot the alignment stage or not.
.. note::
The detector will be normalized such that the data, before
computing the singular-value decomposition, will have unit energy.
e.g. We divide the amplitudes of the data by the L1 norm of the
data.
.. warning::
EQcorrscan's alignment will attempt to align over the whole data
window given. For long (more than 2s) chunks of data this can give
poor results and you might be better off using the
:func:`eqcorrscan.utils.stacking.align_traces` function externally,
focusing on a smaller window of data. To do this you would align
the data prior to running construct.
"""
self.lowcut = lowcut
self.highcut = highcut
self.filt_order = filt_order
self.sampling_rate = sampling_rate
self.name = name
self.multiplex = multiplex
# Pre-process data
p_streams, stachans = _subspace_process(
streams=copy.deepcopy(streams), lowcut=lowcut, highcut=highcut,
filt_order=filt_order, sampling_rate=sampling_rate,
multiplex=multiplex, align=align, shift_len=shift_len,
reject=reject, plot=plot, no_missed=no_missed)
# Compute the SVD, use the cluster.SVD function
u, sigma, v, svd_stachans = svd(stream_list=p_streams, full=True)
self.stachans = stachans
# self.delays = delays
self.u = u
self.v = v
self.sigma = sigma
self.data = copy.deepcopy(u) # Set the data matrix to be full rank U.
self.dimension = np.inf
return self
[docs]
def partition(self, dimension):
"""
Partition subspace into desired dimension.
:type dimension: int
:param dimension: Maximum dimension to use.
"""
# Take leftmost 'dimension' input basis vectors
for i, channel in enumerate(self.u):
if self.v[i].shape[1] < dimension:
raise IndexError('Channel is max dimension %s'
% self.v[i].shape[1])
self.data[i] = channel[:, 0:dimension]
self.dimension = dimension
return self
[docs]
def energy_capture(self, stachans='all', size=(10, 7), show=False):
"""
Calculate the average percentage energy capture for this subspace.
:return: Percentage energy capture
:rtype: float
"""
if show:
return subspace_fc_plot(detector=self, stachans=stachans,
size=size, show=show)
percent_capture = 0
if np.isinf(self.dimension):
return 100
for channel in self.sigma:
fc = np.sum(channel[0:self.dimension]) / np.sum(channel)
percent_capture += fc
else:
return 100 * (percent_capture / len(self.sigma))
[docs]
def detect(self, st, threshold, trig_int, moveout=0, min_trig=0,
process=True, extract_detections=False, cores=1):
"""
Detect within continuous data using the subspace method.
:type st: obspy.core.stream.Stream
:param st:
Un-processed stream to detect within using the subspace detector.
:type threshold: float
:param threshold: Threshold value for detections between 0-1
:type trig_int: float
:param trig_int: Minimum trigger interval in seconds.
:type moveout: float
:param moveout:
Maximum allowable moveout window for non-multiplexed, network
detection. See note.
:type min_trig: int
:param min_trig:
Minimum number of stations exceeding threshold for non-multiplexed,
network detection. See note.
:type process: bool
:param process:
Whether or not to process the stream according to the parameters
defined by the detector. Default is True, which will process the
data.
:type extract_detections: bool
:param extract_detections:
Whether to extract waveforms for each detection or not, if True
will return detections and streams.
:type cores: int
:param cores: Number of threads to process data with.
:return: list of :class:`eqcorrscan.core.match_filter.Detection`
:rtype: list
.. warning::
Subspace is currently in beta, see note in the subspace tutorial
for information.
.. note::
If running in bulk with detectors that all have the same
parameters then you can pre-process the data and set process to
False. This will speed up this detect function dramatically.
.. warning::
If the detector and stream are multiplexed then they must
contain the same channels and multiplexed in the same order. This
is handled internally when process=True, but if running in bulk
you must take care.
.. note::
Non-multiplexed, network detection. When the detector is
not multiplexed, but there are multiple channels within the
detector, we do not stack the single-channel detection statistics
because we do not have a one-size-fits-all solution for computing
delays for a subspace detector (if you want to implement one, then
please contribute it!). Therefore, these parameters provide a
means for declaring a network coincidence trigger using
single-channel detection statistics, in a similar fashion to the
commonly used network-coincidence trigger with energy detection
statistics.
"""
return _detect(detector=self, st=st, threshold=threshold,
trig_int=trig_int, moveout=moveout, min_trig=min_trig,
process=process, extract_detections=extract_detections,
cores=cores)
[docs]
def write(self, filename):
"""
Write detector to a file - uses HDF5 file format.
Meta-data are stored alongside numpy data arrays. See h5py.org for \
details of the methods.
:type filename: str
:param filename: Filename to save the detector to.
"""
f = h5py.File(filename, "w")
# Must store eqcorrscan version number, username would be useful too.
data_group = f.create_group(name="data")
for i, data in enumerate(self.data):
dset = data_group.create_dataset(name="data_" + str(i),
shape=data.shape,
dtype=data.dtype)
dset[...] = data
data_group.attrs['length'] = len(self.data)
data_group.attrs['name'] = self.name.encode("ascii", "ignore")
data_group.attrs['sampling_rate'] = self.sampling_rate
data_group.attrs['multiplex'] = self.multiplex
data_group.attrs['lowcut'] = self.lowcut
data_group.attrs['highcut'] = self.highcut
data_group.attrs['filt_order'] = self.filt_order
data_group.attrs['dimension'] = self.dimension
data_group.attrs['user'] = getpass.getuser()
data_group.attrs['eqcorrscan_version'] = str(eqcorrscan.__version__)
# Convert station-channel list to something writable
ascii_stachans = ['.'.join(stachan).encode("ascii", "ignore")
for stachan in self.stachans]
stachans = f.create_dataset(name="stachans",
shape=(len(ascii_stachans),),
dtype='S10')
stachans[...] = ascii_stachans
u_group = f.create_group("u")
for i, u in enumerate(self.u):
uset = u_group.create_dataset(name="u_" + str(i),
shape=u.shape, dtype=u.dtype)
uset[...] = u
u_group.attrs['length'] = len(self.u)
sigma_group = f.create_group("sigma")
for i, sigma in enumerate(self.sigma):
sigmaset = sigma_group.create_dataset(name="sigma_" + str(i),
shape=sigma.shape,
dtype=sigma.dtype)
sigmaset[...] = sigma
sigma_group.attrs['length'] = len(self.sigma)
v_group = f.create_group("v")
for i, v in enumerate(self.v):
vset = v_group.create_dataset(name="v_" + str(i),
shape=v.shape, dtype=v.dtype)
vset[...] = v
v_group.attrs['length'] = len(self.v)
f.flush()
f.close()
return self
[docs]
def read(self, filename):
"""
Read detector from a file, must be HDF5 format.
Reads a Detector object from an HDF5 file, usually created by \
eqcorrscan.
:type filename: str
:param filename: Filename to save the detector to.
"""
f = h5py.File(filename, "r")
self.data = []
for i in range(f['data'].attrs['length']):
self.data.append(f['data']['data_' + str(i)][...])
self.u = []
for i in range(f['u'].attrs['length']):
self.u.append(f['u']['u_' + str(i)][...])
self.sigma = []
for i in range(f['sigma'].attrs['length']):
self.sigma.append(f['sigma']['sigma_' + str(i)][...])
self.v = []
for i in range(f['v'].attrs['length']):
self.v.append(f['v']['v_' + str(i)][...])
self.stachans = [tuple(stachan.decode('ascii').split('.'))
for stachan in f['stachans'][...]]
self.dimension = f['data'].attrs['dimension']
self.filt_order = f['data'].attrs['filt_order']
self.highcut = f['data'].attrs['highcut']
self.lowcut = f['data'].attrs['lowcut']
self.multiplex = bool(f['data'].attrs['multiplex'])
self.sampling_rate = f['data'].attrs['sampling_rate']
if isinstance(f['data'].attrs['name'], str):
self.name = f['data'].attrs['name']
else:
self.name = f['data'].attrs['name'].decode('ascii')
return self
[docs]
def plot(self, stachans='all', size=(10, 7), show=True, *args, **kwargs):
"""
Plot the output basis vectors for the detector at the given dimension.
Corresponds to the first n horizontal vectors of the V matrix.
:type stachans: list
:param stachans: list of tuples of station, channel pairs to plot.
:type stachans: list
:param stachans: List of tuples of (station, channel) to use. Can set\
to 'all' to use all the station-channel pairs available. If \
detector is multiplexed, will just plot that.
:type size: tuple
:param size: Figure size.
:type show: bool
:param show: Whether or not to show the figure.
:returns: Figure
:rtype: matplotlib.pyplot.Figure
.. Note::
See :func:`eqcorrscan.utils.plotting.subspace_detector_plot`
for example.
"""
return subspace_detector_plot(detector=self, stachans=stachans,
size=size, show=show, *args, **kwargs)
def _detect(detector, st, threshold, trig_int, moveout=0, min_trig=0,
process=True, extract_detections=False, cores=1):
"""
Detect within continuous data using the subspace method.
Not to be called directly, use the detector.detect method.
:type detector: eqcorrscan.core.subspace.Detector
:param detector: Detector to use.
:type st: obspy.core.stream.Stream
:param st: Un-processed stream to detect within using the subspace \
detector
:type threshold: float
:param threshold: Threshold value for detections between 0-1
:type trig_int: float
:param trig_int: Minimum trigger interval in seconds.
:type moveout: float
:param moveout: Maximum allowable moveout window for non-multiplexed,
network detection. See note.
:type min_trig: int
:param min_trig: Minimum number of stations exceeding threshold for \
non-multiplexed, network detection. See note.
:type process: bool
:param process: Whether or not to process the stream according to the \
parameters defined by the detector. Default is to process the \
data (True).
:type extract_detections: bool
:param extract_detections: Whether to extract waveforms for each \
detection or not, if true will return detections and streams.
:return: list of detections
:rtype: list of eqcorrscan.core.match_filter.Detection
"""
detections = []
# First process the stream
if process:
Logger.info('Processing Stream')
stream, stachans = _subspace_process(
streams=[st.copy()], lowcut=detector.lowcut,
highcut=detector.highcut, filt_order=detector.filt_order,
sampling_rate=detector.sampling_rate, multiplex=detector.multiplex,
stachans=detector.stachans, parallel=True, align=False,
shift_len=None, reject=False, cores=cores)
else:
# Check the sampling rate at the very least
for tr in st:
if not tr.stats.sampling_rate == detector.sampling_rate:
raise ValueError('Sampling rates do not match.')
stream = [st]
stachans = detector.stachans
outtic = default_timer()
# If multiplexed, how many samples do we increment by?
if detector.multiplex:
Nc = len(detector.stachans)
else:
Nc = 1
# Here do all ffts
fft_vars = _do_ffts(detector, stream, Nc)
Logger.info('Computing detection statistics')
Logger.info('Preallocating stats matrix')
stats = np.zeros((len(stream[0]),
(len(stream[0][0]) // Nc) - (fft_vars[4] // Nc) + 1))
for det_freq, data_freq_sq, data_freq, i in zip(fft_vars[0], fft_vars[1],
fft_vars[2],
np.arange(len(stream[0]))):
# Calculate det_statistic in frequency domain
stats[i] = _det_stat_freq(det_freq, data_freq_sq, data_freq,
fft_vars[3], Nc, fft_vars[4], fft_vars[5])
Logger.info('Stats matrix is shape %s' % str(stats[i].shape))
trig_int_samples = detector.sampling_rate * trig_int
Logger.info('Finding peaks')
peaks = []
for i in range(len(stream[0])):
peaks.append(findpeaks.find_peaks2_short(
arr=stats[i], thresh=threshold, trig_int=trig_int_samples))
if not detector.multiplex:
# Conduct network coincidence triggering
peaks = findpeaks.coin_trig(
peaks=peaks, samp_rate=detector.sampling_rate, moveout=moveout,
min_trig=min_trig, stachans=stachans, trig_int=trig_int)
else:
peaks = peaks[0]
if len(peaks) > 0:
for peak in peaks:
detecttime = st[0].stats.starttime + \
(peak[1] / detector.sampling_rate)
rid = ResourceIdentifier(
id=detector.name + '_' + str(detecttime), prefix='smi:local')
ev = Event(resource_id=rid)
cr_i = CreationInfo(
author='EQcorrscan', creation_time=UTCDateTime())
ev.creation_info = cr_i
# All detection info in Comments for lack of a better idea
thresh_str = 'threshold=' + str(threshold)
ccc_str = 'detect_val=' + str(peak[0])
used_chans = 'channels used: ' +\
' '.join([str(pair) for pair in detector.stachans])
ev.comments.append(Comment(text=thresh_str))
ev.comments.append(Comment(text=ccc_str))
ev.comments.append(Comment(text=used_chans))
for stachan in detector.stachans:
tr = st.select(station=stachan[0], channel=stachan[1])
if tr:
net_code = tr[0].stats.network
else:
net_code = ''
pick_tm = detecttime
wv_id = WaveformStreamID(
network_code=net_code, station_code=stachan[0],
channel_code=stachan[1])
ev.picks.append(Pick(time=pick_tm, waveform_id=wv_id))
detections.append(
Detection(template_name=detector.name, detect_time=detecttime,
no_chans=len(detector.stachans), detect_val=peak[0],
threshold=threshold, typeofdet='subspace',
threshold_type='abs', threshold_input=threshold,
chans=detector.stachans, event=ev))
outtoc = default_timer()
Logger.info('Detection took %s seconds' % str(outtoc - outtic))
if extract_detections:
detection_streams = extract_from_stream(st, detections)
return detections, detection_streams
return detections
def _do_ffts(detector, stream, Nc):
"""
Perform ffts on data, detector and denominator boxcar
:type detector: eqcorrscan.core.subspace.Detector
:param detector: Detector object for doing detecting
:type stream: list of obspy.core.stream.Stream
:param stream: List of streams processed according to detector
:type Nc: int
:param Nc: Number of channels in data. 1 for non-multiplexed
:return: list of time-reversed detector(s) in freq domain
:rtype: list
:return: list of squared data stream(s) in freq domain
:rtype: list
:return: list of data stream(s) in freq domain
:return: detector-length boxcar in freq domain
:rtype: numpy.ndarray
:return: length of detector
:rtype: int
:return: length of data
:rtype: int
"""
min_fftlen = int(stream[0][0].data.shape[0] +
detector.data[0].shape[0] - Nc)
fftlen = scipy.fftpack.next_fast_len(min_fftlen)
mplen = stream[0][0].data.shape[0]
ulen = detector.data[0].shape[0]
num_st_fd = [np.fft.rfft(tr.data, n=fftlen)
for tr in stream[0]]
denom_st_fd = [np.fft.rfft(np.square(tr.data), n=fftlen)
for tr in stream[0]]
# Frequency domain of boxcar
w = np.fft.rfft(np.ones(detector.data[0].shape[0]),
n=fftlen)
# This should go into the detector object as in Detex
detector_fd = []
for dat_mat in detector.data:
detector_fd.append(np.array([np.fft.rfft(col[::-1], n=fftlen)
for col in dat_mat.T]))
return detector_fd, denom_st_fd, num_st_fd, w, ulen, mplen
def _det_stat_freq(det_freq, data_freq_sq, data_freq, w, Nc, ulen, mplen):
"""
Compute detection statistic in the frequency domain
:type det_freq: numpy.ndarray
:param det_freq: detector in freq domain
:type data_freq_sq: numpy.ndarray
:param data_freq_sq: squared data in freq domain
:type data_freq: numpy.ndarray
:param data_freq: data in freq domain
:type w: numpy.ndarray
:param w: boxcar in freq domain
:type Nc: int
:param Nc: number of channels in data stream
:type ulen: int
:param ulen: length of detector
:type mplen: int
:param mplen: length of data
:return: Array of detection statistics
:rtype: numpy.ndarray
"""
num_cor = np.multiply(det_freq, data_freq) # Numerator convolution
den_cor = np.multiply(w, data_freq_sq) # Denominator convolution
# Do inverse fft
# First and last Nt - 1 samples are invalid; clip them off
num_ifft = np.real(np.fft.irfft(num_cor))[:, ulen-1:mplen:Nc]
denominator = np.real(np.fft.irfft(den_cor))[ulen-1:mplen:Nc]
# Ratio of projected to envelope energy = det_stat across all channels
result = np.sum(np.square(num_ifft), axis=0) / denominator
return result
def _subspace_process(streams, lowcut, highcut, filt_order, sampling_rate,
multiplex, align, shift_len, reject, no_missed=True,
stachans=None, parallel=False, plot=False, cores=1):
"""
Process stream data, internal function.
:type streams: list
:param streams: List of obspy.core.stream.Stream to be used to \
generate the subspace detector. These should be pre-clustered \
and aligned.
:type lowcut: float
:param lowcut: Lowcut in Hz, can be None to not apply filter
:type highcut: float
:param highcut: Highcut in Hz, can be None to not apply filter
:type filt_order: int
:param filt_order: Number of corners for filter.
:type sampling_rate: float
:param sampling_rate: Desired sampling rate in Hz
:type multiplex: bool
:param multiplex: Whether to multiplex the data or not. Data are \
multiplexed according to the method of Harris, see the multi \
function for details.
:type stachans: list of tuple
:param stachans: list of tuples of (station, channel) to use.
:type align: bool
:param align: Whether to align the data or not - needs to be done \
at some point
:type shift_len: float
:param shift_len: Maximum shift allowed for alignment in seconds.
:type reject: float
:param reject: Minimum correlation for traces, only used if align=True.
:type no_missed: bool
:param: no_missed: Reject streams with missed traces, defaults to True. \
A missing trace from lots of events will reduce the quality of the \
subspace detector if multiplexed. Only used when multi is set to True.
:type plot: bool
:param plot: Passed down to align traces - used to check alignment process.
:return: Processed streams
:rtype: list
:return: Station, channel pairs in order
:rtype: list of tuple
:return: List of delays
:rtype: list
"""
processed_streams = []
if not stachans:
input_stachans = list(set([(tr.stats.station, tr.stats.channel)
for st in streams for tr in st.sort()]))
else:
input_stachans = stachans
input_stachans.sort() # Make sure stations and channels are in order
# Check that all channels are the same length in seconds
first_length = len(streams[0][0].data) /\
streams[0][0].stats.sampling_rate
for st in streams:
for tr in st:
if not len(tr) / tr.stats.sampling_rate == first_length:
raise IOError(
'All channels of all streams must be the same length')
for st in streams:
processed = pre_processing.multi_process(
st=st, lowcut=lowcut, highcut=highcut, filt_order=filt_order,
samp_rate=sampling_rate, seisan_chan_names=False)
# Add in empty channels if needed and sort
processed_stream = Stream()
for stachan in input_stachans:
tr = processed.select(station=stachan[0], channel=stachan[1])
if len(tr) == 0:
tr = Trace(np.zeros(int(first_length * sampling_rate)))
tr.stats.station = stachan[0]
tr.stats.channel = stachan[1]
tr.stats.sampling_rate = sampling_rate
tr.stats.starttime = st[0].stats.starttime
# Do this to make more sensible plots
Logger.warning('Padding stream with zero trace for ' +
'station ' + stachan[0] + '.' + stachan[1])
processed_stream += tr
processed_stream += tr
assert [(tr.stats.station, tr.stats.channel)
for tr in processed_stream] == input_stachans
processed_streams.append(processed_stream)
if no_missed and multiplex:
for tr in processed_stream:
if np.count_nonzero(tr.data) == 0:
processed_streams.remove(processed_stream)
Logger.info('Removed stream with empty trace')
break
if align:
processed_streams = align_design(
design_set=processed_streams, shift_len=shift_len, reject=reject,
multiplex=multiplex, plot=plot, no_missed=no_missed)
output_streams = []
for processed_stream in processed_streams:
if len(processed_stream) == 0:
# If we have removed all of the traces then onwards!
continue
# Need to order the stream according to input_stachans
_st = Stream()
for stachan in input_stachans:
tr = processed_stream.select(
station=stachan[0], channel=stachan[1])
if len(tr) >= 1:
_st += tr[0]
elif multiplex and len(tr) == 0:
raise IndexError(
'Missing data for %s.%s' % (stachan[0], stachan[1]))
if multiplex:
st = multi(stream=_st)
st = Stream(Trace(st))
st[0].stats.station = 'Multi'
st[0].stats.sampling_rate = sampling_rate
else:
st = _st
for tr in st:
# Normalize the data
norm = np.linalg.norm(tr.data)
if not norm == 0:
tr.data /= norm
output_streams.append(st)
return output_streams, input_stachans
[docs]
def read_detector(filename):
"""
Read detector from a filename.
:type filename: str
:param filename: Filename to save the detector to.
:return: Detector object
:rtype: eqcorrscan.core.subspace.Detector
"""
detector = Detector()
detector.read(filename=filename)
return detector
[docs]
def multi(stream):
"""
Internal multiplexer for multiplex_detect.
:type stream: obspy.core.stream.Stream
:param stream: Stream to multiplex
:return: trace of multiplexed data
:rtype: obspy.core.trace.Trace
.. Note: Requires all channels to be the same length.
Maps a standard multiplexed stream of seismic data to a single traces of \
multiplexed data as follows:
Input:
x = [x1, x2, x3, ...]
y = [y1, y2, y3, ...]
z = [z1, z2, z3, ...]
Output:
xyz = [x1, y1, z1, x2, y2, z2, x3, y3, z3, ...]
"""
stack = stream[0].data
for tr in stream[1:]:
stack = np.vstack((stack, tr.data))
multiplex = stack.T.reshape(stack.size, )
return multiplex
def align_design(design_set, shift_len, reject, multiplex, no_missed=True,
plot=False):
"""
Align individual traces within streams of the design set.
Perform before Detector.construct to align traces before computing the \
singular value decomposition.
:type design_set: list
:param design_set: List of obspy.core.stream.Stream to be aligned
:type shift_len: float
:param shift_len: Maximum shift (plus/minus) in seconds.
:type reject: float
:param reject: Minimum correlation for traces, only used if align=True.
:type multiplex: bool
:param multiplex: If you are going to multiplex the data, then there has \
to be data for all channels, so we will pad with zeros, otherwise \
there is no need.
:type no_missed: bool
:param: no_missed: Reject streams with missed traces, defaults to True. \
A missing trace from lots of events will reduce the quality of the \
subspace detector if multiplexed. Only used when multi is set to True.
:type plot: bool
:param plot: Whether to plot the aligned traces as we go or not.
:rtype: list
:return: List of obspy.core.stream.Stream of aligned streams
.. Note:: Assumes only one trace for each channel for each stream in the \
design_set. If more are present will only use the first one.
.. Note:: Will cut all traces to be the same length as required for the \
svd, this length will be the shortest trace length - 2 * shift_len
"""
trace_lengths = [tr.stats.endtime - tr.stats.starttime for st in design_set
for tr in st]
clip_len = min(trace_lengths) - (2 * shift_len)
stachans = list(set([(tr.stats.station, tr.stats.channel)
for st in design_set for tr in st]))
remove_set = []
for stachan in stachans:
trace_list = []
trace_ids = []
for i, st in enumerate(design_set):
tr = st.select(station=stachan[0], channel=stachan[1])
if len(tr) > 0:
trace_list.append(tr[0])
trace_ids.append(i)
if len(tr) > 1:
Logger.warning(
'Too many matches for %s %s' % (stachan[0], stachan[1]))
shift_len_samples = int(shift_len * trace_list[0].stats.sampling_rate)
shifts, cccs = stacking.align_traces(
trace_list=trace_list, shift_len=shift_len_samples, positive=True)
for i, shift in enumerate(shifts):
st = design_set[trace_ids[i]]
start_t = st.select(
station=stachan[0], channel=stachan[1])[0].stats.starttime
start_t += shift_len
start_t -= shift
st.select(
station=stachan[0], channel=stachan[1])[0].trim(
start_t, start_t + clip_len)
if cccs[i] < reject:
if multiplex and not no_missed:
st.select(station=stachan[0],
channel=stachan[1])[0].data = np.zeros(
int(clip_len * (st.select(
station=stachan[0],
channel=stachan[1])[0].stats.sampling_rate) + 1))
Logger.warning('Padding stream with zero trace for ' +
'station ' + stachan[0] + '.' + stachan[1])
Logger.debug('zero padding')
elif multiplex and no_missed:
remove_set.append(st)
Logger.warning('Will remove stream due to low-correlation')
continue
else:
st.remove(st.select(station=stachan[0],
channel=stachan[1])[0])
Logger.warning(
'Removed channel with correlation at %s' % cccs[i])
continue
if no_missed:
for st in remove_set:
if st in design_set:
design_set.remove(st)
if plot:
for stachan in stachans:
trace_list = []
for st in design_set:
tr = st.select(station=stachan[0], channel=stachan[1])
if len(tr) > 0:
trace_list.append(tr[0])
if len(trace_list) > 1:
plotting.multi_trace_plot(traces=trace_list, corr=True,
stack=None, title='.'.join(stachan))
else:
Logger.error(
'No plot for you, only one trace left after rejection')
return design_set
[docs]
def subspace_detect(detectors, stream, threshold, trig_int, moveout=0,
min_trig=1, parallel=True, num_cores=None):
"""
Conduct subspace detection with chosen detectors.
:type detectors: list
:param detectors:
list of :class:`eqcorrscan.core.subspace.Detector` to be used
for detection.
:type stream: obspy.core.stream.Stream
:param stream: Stream to detect within.
:type threshold: float
:param threshold:
Threshold between 0 and 1 for detection, see :func:`Detector.detect`
:type trig_int: float
:param trig_int: Minimum trigger interval in seconds.
:type moveout: float
:param moveout:
Maximum allowable moveout window for non-multiplexed, network
detection. See note.
:type min_trig: int
:param min_trig:
Minimum number of stations exceeding threshold for non-multiplexed,
network detection. See note in :func:`Detector.detect`.
:type parallel: bool
:param parallel: Whether to run detectors in parallel in groups.
:type num_cores: int
:param num_cores:
How many cpu cores to use if parallel==True. If set to None (default),
will use all available cores.
:rtype: list
:return:
List of :class:`eqcorrscan.core.match_filter.Detection` detections.
.. Note::
This will loop through your detectors using their detect method.
If the detectors are multiplexed it will run groups of detectors with
the same channels at the same time.
"""
from multiprocessing import Pool, cpu_count
# First check that detector parameters are the same
parameters = []
detections = []
for detector in detectors:
parameter = (detector.lowcut, detector.highcut,
detector.filt_order, detector.sampling_rate,
detector.multiplex, detector.stachans)
if parameter not in parameters:
parameters.append(parameter)
for parameter_set in parameters:
parameter_detectors = []
for detector in detectors:
det_par = (detector.lowcut, detector.highcut, detector.filt_order,
detector.sampling_rate, detector.multiplex,
detector.stachans)
if det_par == parameter_set:
parameter_detectors.append(detector)
stream, stachans = \
_subspace_process(
streams=[stream.copy()], lowcut=parameter_set[0],
highcut=parameter_set[1], filt_order=parameter_set[2],
sampling_rate=parameter_set[3], multiplex=parameter_set[4],
stachans=parameter_set[5], parallel=True, align=False,
shift_len=None, reject=False)
if not parallel:
for detector in parameter_detectors:
detections += _detect(
detector=detector, st=stream[0], threshold=threshold,
trig_int=trig_int, moveout=moveout, min_trig=min_trig,
process=False, extract_detections=False)
else:
if num_cores:
ncores = num_cores
else:
ncores = cpu_count()
pool = Pool(processes=ncores)
results = [pool.apply_async(
_detect, args=(detector, stream[0], threshold, trig_int,
moveout, min_trig, False, False))
for detector in parameter_detectors]
pool.close()
try:
_detections = [p.get() for p in results]
except KeyboardInterrupt as e: # pragma: no cover
pool.terminate()
raise e
pool.join()
for d in _detections:
if isinstance(d, list):
detections += d
else:
detections.append(d)
return detections