"""
Functions for network matched-filter detection of seismic data.
Designed to cross-correlate templates generated by the template_gen function
with data and output the detections.
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import copy
import getpass
import glob
import os
import pickle
import shutil
import tarfile
import tempfile
import traceback
import uuid
import logging
from multiprocessing import Process, Queue, cpu_count
from queue import Empty
from obspy import Catalog, Stream, read, read_events
from obspy.core.event import Comment, CreationInfo
from eqcorrscan.core import template_gen
from eqcorrscan.core.match_filter.template import (
Template, quick_group_templates)
from eqcorrscan.core.match_filter.party import Party
from eqcorrscan.core.match_filter.family import Family
from eqcorrscan.core.match_filter.helpers import (
_safemembers, _par_read, get_waveform_client,
_remove_duplicates, _moveout)
from eqcorrscan.core.match_filter.helpers.tribe import _wildcard_fill
from eqcorrscan.utils.pre_processing import (
_quick_copy_stream, _prep_data_for_correlation)
Logger = logging.getLogger(__name__)
class Tribe(object):
"""
Holder for multiple templates.
:type templates: List of Template
:param templates: The templates within the Tribe.
"""
_timeout = 1
def __init__(self, templates=None):
self.templates = []
if isinstance(templates, Template):
templates = [templates]
if templates:
self.templates.extend(templates)
# Managers for Processes and Queues to be killed on errors
self._processes = dict()
self._queues = dict()
# Assign unique ids
self.__unique_ids()
def __repr__(self):
"""
Print information about the tribe.
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='a')])
>>> print(tribe)
Tribe of 1 templates
"""
return 'Tribe of %i templates' % self.__len__()
def __add__(self, other):
"""
Add two Tribes or a Tribe and a Template together. '+'
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='a')])
>>> tribe_ab = tribe + Tribe(templates=[Template(name='b')])
>>> print(tribe_ab)
Tribe of 2 templates
>>> tribe_abc = tribe_ab + Template(name='c')
>>> print(tribe_abc)
Tribe of 3 templates
"""
return self.copy().__iadd__(other)
def __iadd__(self, other):
"""
Add in place: '+='
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='a')])
>>> tribe += Tribe(templates=[Template(name='b')])
>>> print(tribe)
Tribe of 2 templates
>>> tribe += Template(name='c')
>>> print(tribe)
Tribe of 3 templates
"""
assert isinstance(other, (Tribe, Template)), \
"Must be either Template or Tribe"
self.__unique_ids(other)
if isinstance(other, Tribe):
self.templates += other.templates
elif isinstance(other, Template):
self.templates.append(other)
return self
def __eq__(self, other):
"""
Test for equality. Rich comparison operator '=='
.. rubric:: Example
>>> tribe_a = Tribe(templates=[Template(name='a')])
>>> tribe_b = Tribe(templates=[Template(name='b')])
>>> tribe_a == tribe_b
False
>>> tribe_a == tribe_a
True
"""
if self.sort().templates != other.sort().templates:
return False
return True
def __ne__(self, other):
"""
Test for inequality. Rich comparison operator '!='
.. rubric:: Example
>>> tribe_a = Tribe(templates=[Template(name='a')])
>>> tribe_b = Tribe(templates=[Template(name='b')])
>>> tribe_a != tribe_b
True
>>> tribe_a != tribe_a
False
"""
return not self.__eq__(other)
def __len__(self):
"""
Number of Templates in Tribe. len(tribe)
.. rubric:: Example
>>> tribe_a = Tribe(templates=[Template(name='a')])
>>> len(tribe_a)
1
"""
return len(self.templates)
def __iter__(self):
"""
Iterator for the Tribe.
"""
return list(self.templates).__iter__()
def __getitem__(self, index):
"""
Support slicing to get Templates from Tribe.
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='a'), Template(name='b'),
... Template(name='c')])
>>> tribe[1] # doctest: +NORMALIZE_WHITESPACE
Template b:
0 channels;
lowcut: None Hz;
highcut: None Hz;
sampling rate None Hz;
filter order: None;
process length: None s
>>> tribe[0:2]
Tribe of 2 templates
>>> tribe["d"]
[]
"""
if isinstance(index, slice):
return self.__class__(templates=self.templates.__getitem__(index))
elif isinstance(index, int):
return self.templates.__getitem__(index)
else:
_index = [i for i, t in enumerate(self.templates)
if t.name == index]
try:
return self.templates.__getitem__(_index[0])
except IndexError:
Logger.warning('Template: %s not in tribe' % index)
return []
def __unique_ids(self, other=None):
""" Check that template names are unique. """
template_names = [t.name for t in self.templates]
if other:
assert isinstance(other, (Template, Tribe)), (
"Can only test against tribes or templates")
if isinstance(other, Template):
template_names.append(other.name)
else:
template_names.extend([t.name for t in other])
unique_names = set(template_names)
if len(unique_names) < len(template_names):
non_unique_names = [name for name in unique_names
if template_names.count(name) > 1]
raise NotImplementedError(
"Multiple templates found with the same name. Template names "
"must be unique. Non-unique templates: "
f"{', '.join(non_unique_names)}")
return
@property
def _stream_dir(self):
""" Location for temporary streams """
return f".streams_{os.getpid()}"
def sort(self):
"""
Sort the tribe, sorts by template name.
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
... Template(name='a')])
>>> tribe.sort()
Tribe of 3 templates
>>> tribe[0] # doctest: +NORMALIZE_WHITESPACE
Template a:
0 channels;
lowcut: None Hz;
highcut: None Hz;
sampling rate None Hz;
filter order: None;
process length: None s
"""
self.templates = sorted(self.templates, key=lambda x: x.name)
return self
def select(self, template_name):
"""
Select a particular template from the tribe.
:type template_name: str
:param template_name: Template name to look-up
:return: Template
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
... Template(name='a')])
>>> tribe.select('b') # doctest: +NORMALIZE_WHITESPACE
Template b:
0 channels;
lowcut: None Hz;
highcut: None Hz;
sampling rate None Hz;
filter order: None;
process length: None s
"""
return [t for t in self.templates if t.name == template_name][0]
def remove(self, template):
"""
Remove a template from the tribe.
:type template: :class:`eqcorrscan.core.match_filter.Template`
:param template: Template to remove from tribe
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
... Template(name='a')])
>>> tribe.remove(tribe.templates[0])
Tribe of 2 templates
"""
self.templates = [t for t in self.templates if t != template]
return self
def copy(self):
"""
Copy the Tribe.
.. rubric:: Example
>>> tribe_a = Tribe(templates=[Template(name='a')])
>>> tribe_b = tribe_a.copy()
>>> tribe_a == tribe_b
True
"""
# We can't copy processes, so we need to just copy the templates
other = Tribe(copy.deepcopy(self.templates))
return other
def write(self, filename, compress=True, catalog_format="QUAKEML"):
"""
Write the tribe to a file using tar archive formatting.
:type filename: str
:param filename:
Filename to write to; if it already exists it will be overwritten.
:type compress: bool
:param compress:
Whether to compress the tar archive or not, if False then will
just be files in a folder.
:type catalog_format: str
:param catalog_format:
What format to write the catalog of template events in. Only Nordic,
SC3ML, QUAKEML are supported. Note that not all information is
written for all formats (QUAKEML is the most complete, but is
slow for IO).
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='c', st=read())])
>>> tribe.write('test_tribe')
Tribe of 1 templates
>>> tribe.write(
... "this_wont_work.bob",
... catalog_format="BOB") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
TypeError: BOB is not supported
"""
from eqcorrscan.core.match_filter import CAT_EXT_MAP
if catalog_format not in CAT_EXT_MAP.keys():
raise TypeError("{0} is not supported".format(catalog_format))
dirname, ext = os.path.splitext(filename)
if not os.path.isdir(dirname):
os.makedirs(dirname)
self._par_write(dirname)
tribe_cat = Catalog()
for t in self.templates:
if t.event is not None:
# Check that the name in the comment matches the template name
for comment in t.event.comments:
if not comment.text:
comment.text = "eqcorrscan_template_{0}".format(t.name)
elif comment.text.startswith("eqcorrscan_template_"):
comment.text = "eqcorrscan_template_{0}".format(t.name)
tribe_cat.append(t.event)
if len(tribe_cat) > 0:
tribe_cat.write(
os.path.join(dirname, 'tribe_cat.{0}'.format(
CAT_EXT_MAP[catalog_format])), format=catalog_format)
for template in self.templates:
template.st.write(
os.path.join(dirname, '{0}.ms'.format(template.name)),
format='MSEED')
if compress:
if not filename.endswith(".tgz"):
Logger.info("Appending '.tgz' to filename.")
filename += ".tgz"
with tarfile.open(filename, "w:gz") as tar:
tar.add(dirname, arcname=os.path.basename(dirname))
shutil.rmtree(dirname)
return self
def _par_write(self, dirname):
"""
Internal write function to write a formatted parameter file.
:type dirname: str
:param dirname: Directory to write the parameter file to.
"""
filename = os.path.join(dirname, 'template_parameters.csv')
with open(filename, 'w') as parfile:
for template in self.templates:
for key in template.__dict__.keys():
if key not in ['st', 'event']:
parfile.write(key + ': ' +
str(template.__dict__[key]) + ', ')
parfile.write('\n')
return self
def _temporary_template_db(self, template_dir: str = None) -> dict:
"""
Write a temporary template database of pickled templates to disk.
:param template_dir:
Directory to write to - if None will make a temporary directory.
"""
# We use template names for filenames - check that these are unique
self.__unique_ids()
# Make sure that the template directory exists, or make a tempdir
if template_dir:
if not os.path.isdir(template_dir):
os.makedirs(template_dir)
else:
template_dir = tempfile.mkdtemp()
template_files = dict()
for template in self.templates:
t_file = os.path.join(template_dir, f"{template.name}.pkl")
with open(t_file, "wb") as f:
pickle.dump(template, f)
template_files.update({template.name: t_file})
return template_files
def read(self, filename):
"""
Read a tribe of templates from a tar formatted file.
:type filename: str
:param filename: File to read templates from.
.. rubric:: Example
>>> tribe = Tribe(templates=[Template(name='c', st=read())])
>>> tribe.write('test_tribe')
Tribe of 1 templates
>>> tribe_back = Tribe().read('test_tribe.tgz')
>>> tribe_back == tribe
True
>>> # This can also read pickled templates
>>> import pickle
>>> with open("test_tribe.pkl", "wb") as f:
... pickle.dump(tribe, f)
>>> tribe_back = Tribe().read("test_tribe.pkl")
>>> tribe_back == tribe
True
"""
if filename.endswith(".pkl"):
with open(filename, "rb") as f:
self.__iadd__(pickle.load(f))
return self
with tarfile.open(filename, "r:*") as arc:
temp_dir = tempfile.mkdtemp()
arc.extractall(path=temp_dir, members=_safemembers(arc))
tribe_dir = glob.glob(temp_dir + os.sep + '*')[0]
self._read_from_folder(dirname=tribe_dir)
shutil.rmtree(temp_dir)
# Assign unique ids
self.__unique_ids()
return self
def _read_from_folder(self, dirname):
"""
Internal folder reader.
:type dirname: str
:param dirname: Folder to read from.
"""
templates = _par_read(dirname=dirname, compressed=False)
t_files = glob.glob(dirname + os.sep + '*.ms')
tribe_cat_file = glob.glob(os.path.join(dirname, "tribe_cat.*"))
if len(tribe_cat_file) != 0:
tribe_cat = read_events(tribe_cat_file[0])
else:
tribe_cat = Catalog()
previous_template_names = [t.name for t in self.templates]
for template in templates:
if template.name in previous_template_names:
# Don't read in templates that we already have.
continue
for event in tribe_cat:
for comment in event.comments:
if comment.text == 'eqcorrscan_template_' + template.name:
template.event = event
t_file = [t for t in t_files
if t.split(os.sep)[-1] == template.name + '.ms']
if len(t_file) == 0:
Logger.error('No waveform for template: ' + template.name)
continue
elif len(t_file) > 1:
Logger.warning('Multiple waveforms found, using: ' + t_file[0])
template.st = read(t_file[0])
# Remove templates that do not have streams
templates = [t for t in templates if t.st is not None]
self.templates.extend(templates)
return
def construct(self, method, lowcut, highcut, samp_rate, filt_order,
length, prepick, swin="all", process_len=86400,
all_horiz=False, delayed=True, plot=False, plotdir=None,
min_snr=None, parallel=False, num_cores=False,
skip_short_chans=False, save_progress=False, **kwargs):
"""
Generate a Tribe of Templates.
:type method: str
:param method:
Method of Tribe generation. Possible options are: `from_client`,
`from_meta_file`. See below on the additional required arguments
for each method.
:type lowcut: float
:param lowcut:
Low cut (Hz), if set to None will not apply a lowcut
:type highcut: float
:param highcut:
High cut (Hz), if set to None will not apply a highcut.
:type samp_rate: float
:param samp_rate:
New sampling rate in Hz.
:type filt_order: int
:param filt_order:
Filter level (number of corners).
:type length: float
:param length: Length of template waveform in seconds.
:type prepick: float
:param prepick: Pre-pick time in seconds
:type swin: str
:param swin:
P, S, P_all, S_all or all, defaults to all: see note in
:func:`eqcorrscan.core.template_gen.template_gen`
:type process_len: int
:param process_len: Length of data in seconds to download and process.
:type all_horiz: bool
:param all_horiz:
To use both horizontal channels even if there is only a pick on
one of them. Defaults to False.
:type delayed: bool
:param delayed: If True, each channel will begin relative to its own
pick-time, if set to False, each channel will begin at the same
time.
:type plot: bool
:param plot: Plot templates or not.
:type plotdir: str
:param plotdir:
The path to save plots to. If `plotdir=None` (default) then the
figure will be shown on screen.
:type min_snr: float
:param min_snr:
Minimum signal-to-noise ratio for a channel to be included in the
template, where signal-to-noise ratio is calculated as the ratio
of the maximum amplitude in the template window to the rms
amplitude in the whole window given.
:type parallel: bool
:param parallel: Whether to process data in parallel or not.
:type num_cores: int
:param num_cores:
Number of cores to try and use, if False and parallel=True,
will use either all your cores, or as many traces as in the data
(whichever is smaller).
:type save_progress: bool
:param save_progress:
Whether to save the resulting template set at every data step or
not. Useful for long-running processes.
:type skip_short_chans: bool
:param skip_short_chans:
Whether to ignore channels that have insufficient length data or
not. Useful when the quality of data is not known, e.g. when
downloading old, possibly triggered data from a datacentre.
.. note::
*Method specific arguments:*
- `from_client` requires:
:param str client_id:
string passable by obspy to generate Client, or any object
with a `get_waveforms` method, including a Client instance.
:param `obspy.core.event.Catalog` catalog:
Catalog of events to generate template for
:param float data_pad: Pad length for data-downloads in seconds
- `from_meta_file` requires:
:param str meta_file:
Path to obspy-readable event file, or an obspy Catalog
:param `obspy.core.stream.Stream` st:
Stream containing waveform data for template. Note that
this should be the same length of stream as you will use
for the continuous detection, e.g. if you detect in
day-long files, give this a day-long file!
:param bool process:
Whether to process the data or not, defaults to True.
.. Note::
Method `from_sac` is not supported by Tribe.construct; use
Template.construct instead.
.. Note:: Templates will be named according to their start-time.
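.. rubric:: Example
A minimal sketch of client-based construction (the client name,
dates and processing parameters below are illustrative, not
recommendations):
>>> from obspy import UTCDateTime
>>> from obspy.clients.fdsn import Client
>>> client = Client("GEONET")  # doctest: +SKIP
>>> cat = client.get_events(
...     starttime=UTCDateTime(2016, 9, 4),
...     endtime=UTCDateTime(2016, 9, 5),
...     minmagnitude=4.0)  # doctest: +SKIP
>>> tribe = Tribe().construct(
...     method="from_client", client_id="GEONET", catalog=cat,
...     lowcut=2.0, highcut=9.0, samp_rate=20.0, filt_order=4,
...     length=3.0, prepick=0.15, swin="all",
...     process_len=3600)  # doctest: +SKIP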
"""
templates, catalog, process_lengths = template_gen.template_gen(
method=method, lowcut=lowcut, highcut=highcut, length=length,
filt_order=filt_order, samp_rate=samp_rate, prepick=prepick,
return_event=True, save_progress=save_progress, swin=swin,
process_len=process_len, all_horiz=all_horiz, plotdir=plotdir,
delayed=delayed, plot=plot, min_snr=min_snr, parallel=parallel,
num_cores=num_cores, skip_short_chans=skip_short_chans,
**kwargs)
for template, event, process_len in zip(templates, catalog,
process_lengths):
t = Template()
if len(template) == 0:
Logger.error('Empty Template')
continue
t.st = template
t.name = template.sort(['starttime'])[0].stats.starttime.strftime(
'%Y_%m_%dt%H_%M_%S')
t.lowcut = lowcut
t.highcut = highcut
t.filt_order = filt_order
t.samp_rate = samp_rate
t.process_length = process_len
t.prepick = prepick
event.comments.append(Comment(
text="eqcorrscan_template_" + t.name,
creation_info=CreationInfo(agency='eqcorrscan',
author=getpass.getuser())))
t.event = event
self.templates.append(t)
return self
def _template_channel_ids(self, wildcards: bool = False):
template_channel_ids = set()
for template in self.templates:
for tr in template.st:
# Cope with missing info and convert to wildcards
net, sta, loc, chan = tr.id.split('.')
if wildcards:
net, sta, loc, chan = _wildcard_fill(
net, sta, loc, chan)
template_channel_ids.add((net, sta, loc, chan))
return template_channel_ids
def cluster(self, method, **kwargs):
"""
Cluster the tribe.
Cluster templates within a tribe: returns multiple tribes each of
which could be stacked.
:type method: str
:param method:
Method of clustering, see :mod:`eqcorrscan.utils.clustering`
:return: List of tribes.
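.. rubric:: Example
A hedged sketch, assuming the templates carry events with origins
(`d_thresh` is an illustrative inter-event distance threshold,
assumed here to be in km):
>>> tribe = Tribe().read('test_tribe.tgz')  # doctest: +SKIP
>>> tribes = tribe.cluster(
...     method='space_cluster', d_thresh=10.0)  # doctest: +SKIP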
"""
from eqcorrscan.utils import clustering
tribes = []
func = getattr(clustering, method)
if method in ['space_cluster', 'space_time_cluster']:
cat = Catalog([t.event for t in self.templates])
groups = func(cat, **kwargs)
for group in groups:
new_tribe = Tribe()
for event in group:
new_tribe.templates.extend([t for t in self.templates
if t.event == event])
tribes.append(new_tribe)
return tribes
def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
plotdir=None, daylong=False, parallel_process=True,
xcorr_func=None, concurrency=None, cores=None,
concurrent_processing=False, ignore_length=False,
ignore_bad_data=False, group_size=None, overlap="calculate",
full_peaks=False, save_progress=False, process_cores=None,
pre_processed=False, check_processing=True,
**kwargs):
"""
Detect using a Tribe of templates within a continuous stream.
:type stream: `Queue` or `obspy.core.stream.Stream`
:param stream:
Queue of streams of continuous data to detect within using the
Templates, or just the continuous data itself.
:type threshold: float
:param threshold:
Threshold level, if using `threshold_type='MAD'` then this will be
the multiple of the median absolute deviation.
:type threshold_type: str
:param threshold_type:
The type of threshold to be used, can be MAD, absolute or
av_chan_corr. See Note on thresholding below.
:type trig_int: float
:param trig_int:
Minimum gap between detections from one template in seconds.
If multiple detections occur within trig_int of one-another, the
one with the highest cross-correlation sum will be selected.
:type plot: bool
:param plot: Turn plotting on or off.
:type plotdir: str
:param plotdir:
The path to save plots to. If `plotdir=None` (default) then the
figure will be shown on screen.
:type daylong: bool
:param daylong:
Set to True to use the
:func:`eqcorrscan.utils.pre_processing.dayproc` routine, which
performs additional checks and is more efficient for day-long data
than other methods.
:type parallel_process: bool
:param parallel_process: Whether to pre-process the data in parallel.
:type xcorr_func: str or callable
:param xcorr_func:
A str of a registered xcorr function or a callable for implementing
a custom xcorr function. For more information see:
:func:`eqcorrscan.utils.correlate.register_array_xcorr`
:type concurrency: str
:param concurrency:
The type of concurrency to apply to the xcorr function. Options are
'multithread', 'multiprocess', 'concurrent'. For more details see
:func:`eqcorrscan.utils.correlate.get_stream_xcorr`
:type cores: int
:param cores: Number of workers for processing and detection.
:type concurrent_processing: bool
:param concurrent_processing:
Whether to process steps in detection workflow concurrently or not.
See https://github.com/eqcorrscan/EQcorrscan/pull/544 for
benchmarking.
:type ignore_length: bool
:param ignore_length:
If using daylong=True, then dayproc will check that the data
are there for at least 80% of the day; if you don't want this check
(which will raise an error if too much data are missing) then set
ignore_length=True. This is not recommended!
:type ignore_bad_data: bool
:param ignore_bad_data:
If False (default), errors will be raised if data are excessively
gappy or are mostly zeros. If True then no error will be raised,
but an empty trace will be returned (and not used in detection).
:type group_size: int
:param group_size:
Maximum number of templates to run at once, use to reduce memory
consumption, if unset will use all templates.
:type overlap: float
:param overlap:
Either None, "calculate" or a float of number of seconds to
overlap detection streams by. This is to counter the effects of
the delay-and-stack in calculating cross-correlation sums. Setting
overlap = "calculate" will work out the appropriate overlap based
on the maximum lags within templates.
:type full_peaks: bool
:param full_peaks: See `eqcorrscan.utils.findpeaks.find_peaks2_short`
:type save_progress: bool
:param save_progress:
Whether to save the resulting party at every data step or not.
Useful for long-running processes.
:type process_cores: int
:param process_cores:
Number of processes to use for pre-processing (if different to
`cores`).
:type pre_processed: bool
:param pre_processed:
Whether the stream has been pre-processed or not to match the
templates.
:type check_processing: bool
:param check_processing:
Whether to check that all templates were processed the same.
:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
detections.
.. Note::
When using the "fftw" correlation backend the length of the fft
can be set. See :mod:`eqcorrscan.utils.correlate` for more info.
.. Note::
`stream` must not be pre-processed. If your data contain gaps
you should *NOT* fill those gaps before using this method.
The pre-process functions (called within) will fill the gaps
internally prior to processing, process the data, then re-fill
the gaps with zeros to ensure correlations are not incorrectly
calculated within gaps. If your data have gaps you should pass a
merged stream without the `fill_value` argument
(e.g.: `stream = stream.merge()`).
.. note::
**Data overlap:**
Internally this routine shifts and trims the data according to the
offsets in the template (e.g. if trace 2 starts 2 seconds after
trace 1 in the template then the continuous data will be shifted
by 2 seconds to align peak correlations prior to summing).
Because of this, detections at the start and end of continuous
data streams **may be missed**. The maximum time-period that
might be missing detections is the maximum offset in the template.
To work around this, if you are conducting matched-filter
detections through long-duration continuous data, we suggest
using some overlap (a few seconds, on the order of the maximum
offset in the templates) in the continuous data. You will then
need to post-process the detections (which should be done anyway
to remove duplicates). See below note for how `overlap` argument
affects data internally if `stream` is longer than the processing
length.
.. Note::
If `stream` is longer than processing length, this routine will
ensure that data overlap between loops, which will lead to no
missed detections at data start-stop points (see above note).
This will result in end-time not being strictly
honoured, so detections may occur after the end-time set. This is
because data must be run in the correct process-length.
.. note::
**Thresholding:**
**MAD** threshold is calculated as the:
.. math::
threshold {\\times} (median(abs(cccsum)))
where :math:`cccsum` is the cross-correlation sum for a given
template.
**absolute** threshold is a true absolute threshold based on the
cccsum value.
**av_chan_corr** is based on the mean values of single-channel
cross-correlations assuming all data are present as required for
the template, e.g.:
.. math::
av\_chan\_corr\_thresh=threshold \\times (cccsum /
len(template))
where :math:`template` is a single template from the input and the
length is the number of channels within this template.
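A small numeric illustration of the MAD threshold (made-up values,
with threshold=8.0):
>>> import numpy as np
>>> cccsum = np.array([0.1, -0.2, 0.05, 0.8, -0.1])
>>> float(8.0 * np.median(np.abs(cccsum)))
0.8
.. rubric:: Example
A hedged usage sketch, assuming `tribe` was built to match day-long
continuous data and `st` is an unprocessed day of data (the filename
and threshold values are illustrative):
>>> st = read("continuous_day.ms")  # doctest: +SKIP
>>> party = tribe.detect(
...     stream=st, threshold=8.0, threshold_type="MAD",
...     trig_int=6.0, daylong=True)  # doctest: +SKIP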
"""
# Check that template names are unique
self.__unique_ids()
# We should not need to copy the stream, it is copied in chunks by
# _group_process
# Argument handling
if overlap is None:
overlap = 0.0
elif not isinstance(overlap, float) and str(overlap) == "calculate":
overlap = max(
_moveout(template.st) for template in self.templates)
elif not isinstance(overlap, float):
raise NotImplementedError(
"%s is not a recognised overlap type" % str(overlap))
assert overlap < self.templates[0].process_length, (
f"Overlap {overlap} must be less than process length "
f"{self.templates[0].process_length}")
# Copy because we need to muck around with them.
inner_kwargs = copy.copy(kwargs)
plot_format = inner_kwargs.pop("plot_format", "png")
export_cccsums = inner_kwargs.pop('export_cccsums', False)
peak_cores = inner_kwargs.pop('peak_cores',
process_cores) or cpu_count()
groups = inner_kwargs.pop("groups", None)
if peak_cores == 1:
parallel = False
else:
parallel = True
if check_processing:
assert len(quick_group_templates(self.templates)) == 1, (
"Inconsistent template processing parameters found, this is no"
" longer supported. \nSplit your tribe using "
"eqcorrscan.core.match_filter.template.quick_group_templates "
"and re-run for each grouped tribe")
sampling_rate = self.templates[0].samp_rate
# Used for sanity checking seed id overlap
template_ids = set(
tr.id for template in self.templates for tr in template.st)
args = (stream, template_ids, pre_processed, parallel_process,
process_cores, daylong, ignore_length, overlap,
ignore_bad_data, group_size, groups, sampling_rate, threshold,
threshold_type, save_progress, xcorr_func, concurrency, cores,
export_cccsums, parallel, peak_cores, trig_int, full_peaks,
plot, plotdir, plot_format,)
if concurrent_processing:
party = self._detect_concurrent(*args, **inner_kwargs)
else:
party = self._detect_serial(*args, **inner_kwargs)
Logger.info("Ensuring all templates are in party")
additional_families = []
for template in self.templates:
if template.name in party._template_dict.keys():
continue
additional_families.append(
Family(template=template, detections=[]))
party.families.extend(additional_families)
# Post-process
if len(party) > 0:
Logger.info("Removing duplicates")
party = _remove_duplicates(party)
return party
def _detect_serial(
self, stream, template_ids, pre_processed, parallel_process,
process_cores, daylong, ignore_length, overlap, ignore_bad_data,
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
**kwargs
):
""" Internal serial detect workflow. """
from eqcorrscan.core.match_filter.helpers.tribe import (
_pre_process, _group, _corr_and_peaks, _detect, _make_party)
party = Party()
assert isinstance(stream, Stream), (
f"Serial detection requires stream to be a stream, not"
f" a {type(stream)}")
# We need to copy data here to keep the users input safe.
st_chunks = _pre_process(
st=stream.copy(), template_ids=template_ids,
pre_processed=pre_processed,
filt_order=self.templates[0].filt_order,
highcut=self.templates[0].highcut,
lowcut=self.templates[0].lowcut,
samp_rate=self.templates[0].samp_rate,
process_length=self.templates[0].process_length,
parallel=parallel_process, cores=process_cores, daylong=daylong,
ignore_length=ignore_length, ignore_bad_data=ignore_bad_data,
overlap=overlap, **kwargs)
chunk_files = []
for st_chunk in st_chunks:
starttime = st_chunk[0].stats.starttime
delta = st_chunk[0].stats.delta
template_groups = _group(
sids={tr.id for tr in st_chunk},
templates=self.templates, group_size=group_size, groups=groups)
for i, template_group in enumerate(template_groups):
templates = [_quick_copy_stream(t.st) for t in template_group]
template_names = [t.name for t in template_group]
Logger.info(
f"Prepping {len(templates)} templates for correlation")
# We need to copy the stream here.
_st, templates, template_names = _prep_data_for_correlation(
stream=_quick_copy_stream(st_chunk), templates=templates,
template_names=template_names)
all_peaks, thresholds, no_chans, chans = _corr_and_peaks(
templates=templates, template_names=template_names,
stream=_st, xcorr_func=xcorr_func, concurrency=concurrency,
cores=cores, i=i, export_cccsums=export_cccsums,
parallel=parallel, peak_cores=peak_cores,
threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, sampling_rate=sampling_rate,
full_peaks=full_peaks, plot=plot, plotdir=plotdir,
plot_format=plot_format, prepped=False, **kwargs)
detections = _detect(
template_names=template_names,
all_peaks=all_peaks, starttime=starttime,
delta=delta, no_chans=no_chans,
chans=chans, thresholds=thresholds)
chunk_file = _make_party(
detections=detections, threshold=threshold,
threshold_type=threshold_type,
templates=self.templates, chunk_start=starttime,
chunk_id=i, save_progress=save_progress)
chunk_files.append(chunk_file)
# Rebuild
for _chunk_file in chunk_files:
Logger.info(f"Adding party from {_chunk_file} to party")
with open(_chunk_file, "rb") as _f:
party += pickle.load(_f)
if not save_progress:
try:
os.remove(_chunk_file)
except FileNotFoundError:
pass
Logger.info(f"Added party from {_chunk_file}, party now "
f"contains {len(party)} detections")
if os.path.isdir(self._stream_dir):
shutil.rmtree(self._stream_dir)
return party
def _detect_concurrent(
self, stream, template_ids, pre_processed, parallel_process,
process_cores, daylong, ignore_length, overlap, ignore_bad_data,
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
**kwargs
):
""" Internal concurrent detect workflow. """
from eqcorrscan.core.match_filter.helpers.processes import (
_pre_processor, _prepper, _make_detections, _check_for_poison,
_get_and_check, Poison)
from eqcorrscan.core.match_filter.helpers.tribe import _corr_and_peaks
if isinstance(stream, Stream):
Logger.info("Copying stream to keep your original data safe")
st_queue = Queue(maxsize=2)
st_queue.put(stream.copy())
# Close off queue
st_queue.put(None)
stream = st_queue
else:
# Note that if a queue has been passed we do not try to keep
# data safe
Logger.warning("Streams in queue will be edited in-place, you "
"should not re-use them")
# To reduce load copying templates between processes we dump them to
# disk and pass the dictionary of files
template_dir = f".template_db_{uuid.uuid4()}"
template_db = self._temporary_template_db(template_dir)
# Set up processes and queues
poison_queue = kwargs.get('poison_queue', Queue())
if not pre_processed:
processed_stream_queue = Queue(maxsize=1)
else:
processed_stream_queue = stream
# Prepped queue contains templates and stream (and extras)
prepped_queue = Queue(maxsize=1)
# Output queues
peaks_queue = Queue()
party_file_queue = Queue()
# Set up processes
if not pre_processed:
pre_processor_process = Process(
target=_pre_processor,
kwargs=dict(
input_stream_queue=stream,
temp_stream_dir=self._stream_dir,
template_ids=template_ids,
pre_processed=pre_processed,
filt_order=self.templates[0].filt_order,
highcut=self.templates[0].highcut,
lowcut=self.templates[0].lowcut,
samp_rate=self.templates[0].samp_rate,
process_length=self.templates[0].process_length,
parallel=parallel_process,
cores=process_cores,
daylong=daylong,
ignore_length=ignore_length,
overlap=overlap,
ignore_bad_data=ignore_bad_data,
output_filename_queue=processed_stream_queue,
poison_queue=poison_queue,
),
name="ProcessProcess"
)
prepper_process = Process(
target=_prepper,
kwargs=dict(
input_stream_filename_queue=processed_stream_queue,
group_size=group_size,
groups=groups,
templates=template_db,
output_queue=prepped_queue,
poison_queue=poison_queue,
xcorr_func=xcorr_func,
),
name="PrepProcess"
)
detector_process = Process(
target=_make_detections,
kwargs=dict(
input_queue=peaks_queue,
delta=1 / sampling_rate,
templates=template_db,
threshold=threshold,
threshold_type=threshold_type,
save_progress=save_progress,
output_queue=party_file_queue,
poison_queue=poison_queue,
),
name="DetectProcess"
)
# Cope with old tribes
if not hasattr(self, '_processes'):
self._processes = dict()
if not hasattr(self, '_queues'):
self._queues = dict()
# Put these processes into the namespace
self._processes.update({
"prepper": prepper_process,
"detector": detector_process,
})
self._queues.update({
"poison": poison_queue,
"stream": stream,
"prepped": prepped_queue,
"peaks": peaks_queue,
"party_file": party_file_queue,
})
if not pre_processed:
Logger.info("Starting preprocessor")
self._queues.update({"processed_stream": processed_stream_queue})
self._processes.update({"pre-processor": pre_processor_process})
pre_processor_process.start()
# Start your engines!
prepper_process.start()
detector_process.start()
# Loop over input streams and template groups
while True:
killed = _check_for_poison(poison_queue)
if killed:
Logger.info("Killed in main loop")
break
try:
to_corr = _get_and_check(prepped_queue, poison_queue)
if to_corr is None:
Logger.info("Ran out of streams, exiting correlation")
break
elif isinstance(to_corr, Poison):
Logger.info("Killed in main loop")
break
starttime, i, stream, template_names, templates, \
*extras = to_corr
inner_kwargs = copy.copy(kwargs) # We will mangle them
# Correlation specific handling to reduce single-threaded time
if xcorr_func == "fmf":
weights, pads, chans, no_chans = extras
inner_kwargs.update({
'weights': weights, 'pads': pads, "no_chans": no_chans,
"chans": chans, "prepped": True})
elif xcorr_func in (None, 'fftw'):
pads, seed_ids = extras
inner_kwargs.update({
"pads": pads, "seed_ids": seed_ids, "prepped": True})
Logger.info(f"Got stream of {len(stream)} channels")
Logger.info(f"Starting correlation from {starttime}")
all_peaks, thresholds, no_chans, chans = _corr_and_peaks(
templates=templates, template_names=template_names,
stream=stream, xcorr_func=xcorr_func,
concurrency=concurrency,
cores=cores, i=i, export_cccsums=export_cccsums,
parallel=parallel, peak_cores=peak_cores,
threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, sampling_rate=sampling_rate,
full_peaks=full_peaks, plot=plot, plotdir=plotdir,
plot_format=plot_format, **inner_kwargs
)
peaks_queue.put(
(starttime, all_peaks, thresholds, no_chans, chans,
template_names))
except Exception as e:
Logger.error(
f"Caught exception in correlator:\n {e}")
traceback.print_tb(e.__traceback__)
poison_queue.put(e)
break # We need to break in Main
i += 1
Logger.debug("Putting None into peaks queue.")
peaks_queue.put(None)
# Get the party back
Logger.info("Collecting party")
party = Party()
while True:
killed = _check_for_poison(poison_queue)
if killed:
Logger.error("Killed")
break
pf = _get_and_check(party_file_queue, poison_queue)
if pf is None:
# Fin - Queue has been finished with.
break
if isinstance(pf, Poison):
Logger.error("Killed while checking for party")
killed = True
break
with open(pf, "rb") as f:
party += pickle.load(f)
if not save_progress:
try:
os.remove(pf)
except FileNotFoundError:
pass
# Check for exceptions
if killed:
internal_error = poison_queue.get()
if isinstance(internal_error, Poison):
internal_error = internal_error.value
Logger.error(f"Raising error {internal_error} in main process")
# Now we can raise the error
if internal_error:
# Clean the template db
if os.path.isdir(template_dir):
shutil.rmtree(template_dir)
if os.path.isdir(self._stream_dir):
shutil.rmtree(self._stream_dir)
self._on_error(internal_error)
# Shut down the processes and close the queues
shutdown = kwargs.get("shutdown", True)
# Allow client_detect to take control
if shutdown:
Logger.info("Shutting down")
self._close_queues()
self._close_processes()
if os.path.isdir(template_dir):
shutil.rmtree(template_dir)
if os.path.isdir(self._stream_dir):
shutil.rmtree(self._stream_dir)
return party
def client_detect(self, client, starttime, endtime, threshold,
threshold_type, trig_int, plot=False, plotdir=None,
min_gap=None, daylong=False, parallel_process=True,
xcorr_func=None, concurrency=None, cores=None,
concurrent_processing=False, ignore_length=False,
ignore_bad_data=False, group_size=None,
return_stream=False, full_peaks=False,
save_progress=False, process_cores=None, retries=3,
check_processing=True, **kwargs):
"""
Detect using a Tribe of templates within a continuous stream.
:type client: `obspy.clients.*.Client`
:param client:
Any obspy client (or client-like object) with a dataselect service.
:type starttime: :class:`obspy.core.UTCDateTime`
:param starttime: Start-time for detections.
:type endtime: :class:`obspy.core.UTCDateTime`
:param endtime: End-time for detections
:type threshold: float
:param threshold:
Threshold level, if using `threshold_type='MAD'` then this will be
the multiple of the median absolute deviation.
:type threshold_type: str
:param threshold_type:
The type of threshold to be used, can be MAD, absolute or
av_chan_corr. See Note on thresholding below.
:type trig_int: float
:param trig_int:
Minimum gap between detections from one template in seconds.
If multiple detections occur within trig_int of one-another, the
one with the highest cross-correlation sum will be selected.
:type plot: bool
:param plot: Turn plotting on or off.
:type plotdir: str
:param plotdir:
The path to save plots to. If `plotdir=None` (default) then the
figure will be shown on screen.
:type min_gap: float
:param min_gap:
Minimum gap allowed in data - used to remove traces with known
issues.
:type daylong: bool
:param daylong:
Set to True to use the
:func:`eqcorrscan.utils.pre_processing.dayproc` routine, which
performs additional checks and is more efficient for day-long data
than other methods.
:type parallel_process: bool
:param parallel_process: Whether to pre-process the data in parallel.
:type xcorr_func: str or callable
:param xcorr_func:
A str of a registered xcorr function or a callable for implementing
a custom xcorr function. For more information see:
:func:`eqcorrscan.utils.correlate.register_array_xcorr`
:type concurrency: str
:param concurrency:
The type of concurrency to apply to the xcorr function. Options are
'multithread', 'multiprocess', 'concurrent'. For more details see
:func:`eqcorrscan.utils.correlate.get_stream_xcorr`
:type cores: int
:param cores: Number of workers for processing and detection.
:type concurrent_processing: bool
:param concurrent_processing:
Whether to process steps in detection workflow concurrently or not.
See https://github.com/eqcorrscan/EQcorrscan/pull/544 for
benchmarking.
:type ignore_length: bool
:param ignore_length:
If using daylong=True, then dayproc will check that the data
are there for at least 80% of the day; if you don't want this check
(which will raise an error if too much data are missing) then set
ignore_length=True. This is not recommended!
:type ignore_bad_data: bool
:param ignore_bad_data:
If False (default), errors will be raised if data are excessively
gappy or are mostly zeros. If True then no error will be raised,
but an empty trace will be returned (and not used in detection).
:type group_size: int
:param group_size:
Maximum number of templates to run at once, use to reduce memory
consumption, if unset will use all templates.
:type full_peaks: bool
:param full_peaks: See `eqcorrscan.utils.findpeaks.find_peaks2_short`
:type save_progress: bool
:param save_progress:
Whether to save the resulting party at every data step or not.
Useful for long-running processes.
:type process_cores: int
:param process_cores:
Number of processes to use for pre-processing (if different to
`cores`).
:type return_stream: bool
:param return_stream:
Whether to also output the stream downloaded, useful if you plan
to use the stream for something else, e.g. lag_calc.
:type retries: int
:param retries:
Number of attempts allowed for downloading - allows for transient
server issues.
:type check_processing: bool
:param check_processing:
Whether to check that all templates were processed the same.
:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
detections.
.. Note::
When using the "fftw" correlation backend the length of the fft
can be set. See :mod:`eqcorrscan.utils.correlate` for more info.
.. Note::
Ensures that data overlap between loops, which will lead to no
missed detections at data start-stop points (see note for
:meth:`eqcorrscan.core.match_filter.Tribe.detect` method).
This will result in end-time not being strictly
honoured, so detections may occur after the end-time set. This is
because data must be run in the correct process-length.
.. warning::
Plotting within the match-filter routine uses the Agg backend
with interactive plotting turned off. This is because the function
is designed to work in bulk. If you wish to turn interactive
plotting on you must import matplotlib in your script first;
when you then import match_filter you will get a warning that
the call to matplotlib has no effect, meaning that
match_filter has not changed the plotting behaviour.
.. note::
**Thresholding:**
**MAD** threshold is calculated as the:
.. math::
threshold {\\times} (median(abs(cccsum)))
where :math:`cccsum` is the cross-correlation sum for a given
template.
**absolute** threshold is a true absolute threshold based on the
cccsum value.
**av_chan_corr** is based on the mean values of single-channel
cross-correlations assuming all data are present as required for
the template, e.g.:
.. math::
av\_chan\_corr\_thresh=threshold \\times (cccsum /
len(template))
where :math:`template` is a single template from the input and the
length is the number of channels within this template.
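.. rubric:: Example
A hedged usage sketch, assuming `tribe` is an existing Tribe (the
client, dates and threshold values are illustrative):
>>> from obspy import UTCDateTime
>>> from obspy.clients.fdsn import Client
>>> party = tribe.client_detect(
...     client=Client("GEONET"),
...     starttime=UTCDateTime(2017, 1, 1),
...     endtime=UTCDateTime(2017, 1, 2),
...     threshold=8.0, threshold_type="MAD",
...     trig_int=6.0)  # doctest: +SKIP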
"""
from eqcorrscan.core.match_filter.helpers.tribe import _download_st
from eqcorrscan.core.match_filter.helpers.processes import (
_get_detection_stream)
# This uses get_waveforms_bulk to get data - not all client types have
# this, so we check and monkey patch here.
if not hasattr(client, "get_waveforms_bulk"):
assert hasattr(client, "get_waveforms"), (
f"client {client} must have at least a get_waveforms method")
Logger.info(f"Client {client} does not have a get_waveforms_bulk "
"method, monkey-patching this")
client = get_waveform_client(client)
if check_processing:
assert len(quick_group_templates(self.templates)) == 1, (
"Inconsistent template processing parameters found, this is no"
" longer supported. Split your tribe using "
"eqcorrscan.core.match_filter.template.quick_group_templates "
"and re-run for each group")
groups = kwargs.get("groups", None)
# Hard-coded buffer for downloading data - downloaded data are often
# not the correct length
buff = 300
data_length = max([t.process_length for t in self.templates])
# Calculate overlap
overlap = max(_moveout(template.st) for template in self.templates)
assert overlap < data_length, (
f"Overlap of {overlap} s is larger than the length of data to "
f"be downloaded: {data_length} s - this won't work.")
# Work out start and end times of chunks to download
chunk_start, time_chunks = starttime, []
while chunk_start < endtime:
time_chunks.append((chunk_start, chunk_start + data_length + 20))
chunk_start += data_length - overlap
full_stream_dir = None
if return_stream:
full_stream_dir = tempfile.mkdtemp()
poison_queue = Queue()
detector_kwargs = dict(
threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, plot=plot, plotdir=plotdir,
daylong=daylong, parallel_process=parallel_process,
xcorr_func=xcorr_func, concurrency=concurrency, cores=cores,
ignore_length=ignore_length, ignore_bad_data=ignore_bad_data,
group_size=group_size, overlap=None, full_peaks=full_peaks,
process_cores=process_cores, save_progress=save_progress,
return_stream=return_stream, check_processing=False,
poison_queue=poison_queue, shutdown=False,
concurrent_processing=concurrent_processing, groups=groups)
if not concurrent_processing:
Logger.warning("Using concurrent_processing=True can be faster if"
"downloading your data takes a long time. See "
"https://github.com/eqcorrscan/EQcorrscan/pull/544"
"for benchmarks.")
party = Party()
if return_stream:
full_st = Stream()
for _starttime, _endtime in time_chunks:
st = _download_st(
starttime=_starttime, endtime=_endtime,
buff=buff, min_gap=min_gap,
template_channel_ids=self._template_channel_ids(),
client=client, retries=retries)
if len(st) == 0:
Logger.warning(f"No suitable data between {_starttime} "
f"and {_endtime}, skipping")
continue
party += self.detect(stream=st, pre_processed=False,
**detector_kwargs)
if return_stream:
full_st += st
if return_stream:
return party, full_st
return party
# Get data in advance
time_queue = Queue()
stream_queue = Queue(maxsize=1)
downloader = Process(
target=_get_detection_stream,
kwargs=dict(
input_time_queue=time_queue,
client=client,
retries=retries,
min_gap=min_gap,
buff=buff,
output_filename_queue=stream_queue,
poison_queue=poison_queue,
temp_stream_dir=self._stream_dir,
full_stream_dir=full_stream_dir,
pre_process=True, parallel_process=parallel_process,
process_cores=process_cores, daylong=daylong,
overlap=0.0, ignore_length=ignore_length,
ignore_bad_data=ignore_bad_data,
filt_order=self.templates[0].filt_order,
highcut=self.templates[0].highcut,
lowcut=self.templates[0].lowcut,
samp_rate=self.templates[0].samp_rate,
process_length=self.templates[0].process_length,
template_channel_ids=self._template_channel_ids(),
),
name="DownloadProcess"
)
# Cope with old tribes
if not hasattr(self, '_processes'):
self._processes = dict()
if not hasattr(self, '_queues'):
self._queues = dict()
# Put processes and queues into shared state
self._processes.update({
"downloader": downloader,
})
self._queues.update({
"times": time_queue,
"poison": poison_queue,
"stream": stream_queue,
})
# Fill time queue
for time_chunk in time_chunks:
time_queue.put(time_chunk)
# Close off queue
time_queue.put(None)
# Start up processes
downloader.start()
party = self.detect(
stream=stream_queue, pre_processed=True, **detector_kwargs)
# Close and join processes
self._close_processes()
self._close_queues()
if return_stream:
full_st = read(os.path.join(full_stream_dir, "*"))
shutil.rmtree(full_stream_dir)
return party, full_st
return party
def _close_processes(
self,
terminate: bool = False,
processes: dict = None
):
processes = processes or self._processes
for p_name, p in processes.items():
if terminate:
Logger.warning(f"Terminating {p_name}")
p.terminate()
continue
try:
Logger.info(f"Joining {p_name}")
p.join(timeout=self._timeout)
except Exception as e:
Logger.error(f"Failed to join due to {e}: terminating")
p.terminate()
Logger.info(f"Closing {p_name}")
try:
p.close()
except Exception as e:
Logger.error(
f"Failed to close {p_name} due to {e}, terminating")
p.terminate()
Logger.info("Finished closing processes")
return
def _close_queues(self, queues: dict = None):
queues = queues or self._queues
for q_name, q in queues.items():
if q._closed:
continue
Logger.info(f"Emptying {q_name}")
while True:
try:
q.get_nowait()
except Empty:
break
Logger.info(f"Closing {q_name}")
q.close()
Logger.info("Finished closing queues")
return
def _on_error(self, error):
""" Gracefully close all child processes and queues and raise error """
self._close_processes(terminate=True)
self._close_queues()
Logger.info("Admin complete, raising error")
raise error
def read_tribe(fname):
"""
Read a Tribe of templates from a tar archive.
:param fname: Filename to read from
:return: :class:`eqcorrscan.core.match_filter.Tribe`
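.. rubric:: Example
A short usage sketch (the filename is illustrative):
>>> tribe = read_tribe("test_tribe.tgz")  # doctest: +SKIP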
"""
tribe = Tribe()
tribe.read(filename=fname)
return tribe
if __name__ == "__main__":
import doctest
doctest.testmod()
# List files to be removed after doctest
cleanup = ['test_tribe.tgz', "test_tribe.pkl"]
for f in cleanup:
if os.path.isfile(f):
os.remove(f)
elif os.path.isdir(f):
shutil.rmtree(f)