"""
Functions to enable simple energy-base triggering in a network setting where \
different stations have different noise parameters.
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import getpass
import ast
import logging
from multiprocessing import Pool, cpu_count
from obspy.core.util import AttribDict
from obspy import UTCDateTime
from obspy.signal.trigger import trigger_onset, recursive_sta_lta
import eqcorrscan
from eqcorrscan.utils.despike import median_filter
Logger = logging.getLogger(__name__)
[docs]
class TriggerParameters(AttribDict):
"""
Base class for trigger parameters.
>>> from eqcorrscan.utils.trigger import TriggerParameters
>>> defaults = TriggerParameters()
>>> defaults.station = 'TEST'
>>> # Or use dictionaries
>>> defaults['station'] = 'ALF'
>>> defaults = TriggerParameters({'station': 'TEST',
... 'channel': 'SHZ',
... 'sta_len': 0.3,
... 'lta_len': 10.0,
... 'thr_on': 10,
... 'thr_off': 3,
... 'lowcut': 2,
... 'highcut': 20})
>>> print(defaults.station)
TEST
"""
defaults = {'station': '', 'channel': '', 'sta_len': 0.0, 'lta_len': 0.0,
'thr_on': 0.0, 'thr_off': 0.0, 'lowcut': 0.0, 'highcut': 0.0}
[docs]
def __init__(self, header={}):
"""
"""
super(TriggerParameters, self).__init__(header)
def __setitem__(self, key, value):
"""
"""
if key in ['station', 'channel']:
# Ensure these are string type
value = str(value)
elif key in ['lowcut', 'highcut']:
if value:
value = float(value)
else:
value = float(value)
if isinstance(value, dict):
print('Setting dict')
super(TriggerParameters, self).__setitem__(key, AttribDict(value))
else:
super(TriggerParameters, self).__setitem__(key, value)
__setattr__ = __setitem__
def __repr__(self):
return "TriggerParameters()"
def __str__(self):
print_str = ' '.join(["TriggerParameters:",
"\n station:", str(self.station),
"\n channel:", str(self.channel),
"\n sta_len:", str(self.sta_len),
"\n lta_len:", str(self.lta_len),
"\n thr_on:", str(self.thr_on),
"\n thr_off:", str(self.thr_off),
"\n lowcut:", str(self.lowcut),
"\n highcut:", str(self.highcut),
])
return print_str
def write(self, filename, append=True):
"""Write the parameters to a file as a human-readable series of dicts.
:type filename: str
:param filename: File to write to
:type append: bool
:param append: Append to already existing file or over-write.
"""
header = ' '.join(['# User:', getpass.getuser(),
'\n# Creation date:', str(UTCDateTime()),
'\n# EQcorrscan version:',
str(eqcorrscan.__version__),
'\n\n\n'])
if append:
f = open(filename, 'a')
else:
f = open(filename, 'w')
f.write(header)
parameters = self.__dict__
f.write(str(parameters))
f.write('\n')
f.close()
return
[docs]
def read_trigger_parameters(filename):
"""Read the trigger parameters into trigger_parameter classes.
:type filename: str
:param filename: Parameter file
:returns: List of :class:`eqcorrscan.utils.trigger.TriggerParameters`
:rtype: list
.. rubric:: Example
>>> from eqcorrscan.utils.trigger import read_trigger_parameters
>>> parameters = read_trigger_parameters('parameters') # doctest: +SKIP
"""
parameters = []
f = open(filename, 'r')
print('Reading parameters with the following header:')
for line in f:
if line[0] == '#':
print(line.rstrip('\n').lstrip('\n'))
else:
parameter_dict = ast.literal_eval(line)
# convert the dictionary to the class
trig_par = TriggerParameters(parameter_dict)
parameters.append(trig_par)
f.close()
return parameters
def _channel_loop(tr, parameters, max_trigger_length=60,
despike=False):
"""
Internal loop for parellel processing.
:type tr: obspy.core.trace
:param tr: Trace to look for triggers in.
:type parameters: list
:param parameters: List of TriggerParameter class for trace.
:type max_trigger_length: float
:type despike: bool
:return: trigger
:rtype: list
"""
for par in parameters:
if par['station'] == tr.stats.station and \
par['channel'] == tr.stats.channel:
parameter = par
break
else:
Logger.warning(
'No parameters set for station ' + str(tr.stats.station))
return []
triggers = []
Logger.debug(tr)
tr.detrend('simple')
if despike:
median_filter(tr)
if parameter['lowcut'] and parameter['highcut']:
tr.filter('bandpass', freqmin=parameter['lowcut'],
freqmax=parameter['highcut'])
elif parameter['lowcut']:
tr.filter('highpass', freq=parameter['lowcut'])
elif parameter['highcut']:
tr.filter('lowpass', freq=parameter['highcut'])
# find triggers for each channel using recursive_sta_lta
df = tr.stats.sampling_rate
cft = recursive_sta_lta(tr.data, int(parameter['sta_len'] * df),
int(parameter['lta_len'] * df))
if max_trigger_length:
trig_args = {'max_len_delete': True}
trig_args['max_len'] = int(max_trigger_length *
df + 0.5)
tmp_trigs = trigger_onset(cft, float(parameter['thr_on']),
float(parameter['thr_off']),
**trig_args)
for on, off in tmp_trigs:
cft_peak = tr.data[on:off].max()
cft_std = tr.data[on:off].std()
on = tr.stats.starttime + \
float(on) / tr.stats.sampling_rate
off = tr.stats.starttime + \
float(off) / tr.stats.sampling_rate
triggers.append((on.timestamp, off.timestamp,
tr.id, cft_peak,
cft_std))
return triggers
[docs]
def network_trigger(st, parameters, thr_coincidence_sum, moveout,
max_trigger_length=60, despike=True, parallel=True):
"""
Main function to compute triggers for a network of stations.
Computes single-channel characteristic functions using given parameters,
then combines these to find network triggers on a number of stations
within a set moveout window.
:type st: obspy.core.stream.Stream
:param st: Stream to compute detections within
:type parameters: list
:param parameters: List of parameter class
:type thr_coincidence_sum: int
:param thr_coincidence_sum:
Minimum number of stations required to raise a network trigger.
:type moveout: float
:param moveout: Window to find triggers within in the network detection \
stage.
:type max_trigger_length: float
:param max_trigger_length: Maximum trigger length in seconds, used to \
remove long triggers - can set to False to not use.
:type despike: bool
:param despike: Whether to apply simple despiking routine or not
:type parallel: bool
:param parallel: Whether to run in parallel or not
:returns: List of triggers
:rtype: list
.. rubric:: Example
>>> from obspy import read
>>> from eqcorrscan.utils.trigger import TriggerParameters, network_trigger
>>> st = read("https://examples.obspy.org/" +
... "example_local_earthquake_3stations.mseed")
>>> parameters = []
>>> for tr in st:
... parameters.append(TriggerParameters({'station': tr.stats.station,
... 'channel': tr.stats.channel,
... 'sta_len': 0.5,
... 'lta_len': 10.0,
... 'thr_on': 10.0,
... 'thr_off': 3.0,
... 'lowcut': 2.0,
... 'highcut': 15.0}))
>>> triggers = network_trigger(st=st, parameters=parameters,
... thr_coincidence_sum=5, moveout=30,
... max_trigger_length=60, despike=False)
>>> print(len(triggers))
1
"""
triggers = []
trace_ids = [tr.id for tr in st]
trace_ids = dict.fromkeys(trace_ids, 1)
if not parallel:
# Don't run in parallel
for tr in st:
triggers += _channel_loop(
tr=tr, parameters=parameters,
max_trigger_length=max_trigger_length, despike=despike)
else:
# Needs to be pickleable
parameters = [par.__dict__ for par in parameters]
pool = Pool(processes=cpu_count())
results = [pool.apply_async(_channel_loop,
args=(tr, parameters, max_trigger_length,
despike))
for tr in st]
pool.close()
triggers = [p.get() for p in results]
pool.join()
triggers = [item for sublist in triggers for item in sublist]
triggers.sort()
Logger.info('Looking for coincidence triggers ...')
# the coincidence triggering and coincidence sum computation
coincidence_triggers = []
last_off_time = 0.0
while triggers:
# remove first trigger from list and look for overlaps
on, off, tr_id, cft_peak, cft_std = triggers.pop(0)
event = {}
event['time'] = UTCDateTime(on)
event['stations'] = [tr_id.split(".")[1]]
event['trace_ids'] = [tr_id]
event['coincidence_sum'] = trace_ids[tr_id]
# compile the list of stations that overlap with the current trigger
for trigger in triggers:
tmp_on, tmp_off, tmp_tr_id, tmp_cft_peak, tmp_cft_std = trigger
# skip retriggering of already present station in current
# coincidence trigger
if tmp_tr_id in event['trace_ids']:
continue
# check for overlapping trigger
if tmp_on <= off + moveout:
event['stations'].append(tmp_tr_id.split(".")[1])
event['trace_ids'].append(tmp_tr_id)
event['coincidence_sum'] += trace_ids[tmp_tr_id]
# allow sets of triggers that overlap only on subsets of all
# stations (e.g. A overlaps with B and B overlaps w/ C => ABC)
off = max(off, tmp_off)
# break if there is a gap in between the two triggers
else:
break
# skip if coincidence sum threshold is not met
if event['coincidence_sum'] < thr_coincidence_sum:
continue
# skip coincidence trigger if it is just a subset of the previous
# (determined by a shared off-time, this is a bit sloppy)
if off == last_off_time:
continue
event['duration'] = off - on
coincidence_triggers.append(event)
last_off_time = off
Logger.debug('Coincidence triggers : {0}'.format(coincidence_triggers))
Logger.info('Found %s Coincidence triggers' % len(coincidence_triggers))
return coincidence_triggers
if __name__ == '__main__':
import doctest
doctest.testmod()